/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriterWithAvailabilityHelper;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.TestBoundedOneInputStreamOperator;
import org.apache.flink.streaming.runtime.tasks.TestBoundedTwoInputOperator;
import org.apache.flink.streaming.runtime.tasks.TestFinishedOnRestoreStreamOperator;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.OutputTag;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectArrayAssert;
import org.junit.jupiter.api.Test;

class TwoInputStreamTaskTest {
    /** No-argument, package-private constructor; it performs no work beyond the superclass call. */
    TwoInputStreamTaskTest() {
        super();
    }

    /**
     * Runs a {@link CoStreamMap} wrapping {@link TestOpenCloseMapFunction} through a complete task
     * run: one record on each of the two inputs, followed by end of input.
     *
     * <p>Once the task has completed, the function's static {@code closeCalled} flag must be set
     * and the output must equal the two expected records, each carrying the timestamp of the
     * element that was fed in.
     */
    @Test
    void testOpenCloseAndTimestamps() throws Exception {
        final long baseTime = 0L;

        TwoInputStreamTaskTestHarness<String, Integer, String> harness =
                new TwoInputStreamTaskTestHarness<>(
                        TwoInputStreamTask::new,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.INT_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);
        harness.setupOutputForSingletonOperatorChain();

        StreamConfig config = harness.getStreamConfig();
        // Constructing the function resets its static openCalled/closeCalled flags.
        CoStreamMap<String, Integer, String> mapOperator =
                new CoStreamMap<>(new TestOpenCloseMapFunction());
        config.setStreamOperator(mapOperator);
        config.setOperatorID(new OperatorID());

        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

        harness.invoke();
        harness.waitForTaskRunning();

        // String record on the first input.
        harness.processElement(new StreamRecord<>("Hello", baseTime + 1L), 0, 0);
        expected.add(new StreamRecord<>("Hello", baseTime + 1L));
        harness.waitForInputProcessing();

        // Integer record on the second input; the expected output is its string form.
        harness.processElement(new StreamRecord<>(1337, baseTime + 2L), 1, 0);
        expected.add(new StreamRecord<>("1337", baseTime + 2L));
        harness.waitForInputProcessing();

        harness.endInput();
        harness.waitForTaskCompletion();

        Assertions.assertThat(TestOpenCloseMapFunction.closeCalled)
                .as("RichFunction methods were not called.")
                .isTrue();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
    }

    /**
     * Feeds watermarks, records and watermark-status events into a {@link CoStreamMap} over
     * {@link IdentityMap} and compares the task output with the expected sequence after every
     * step.
     *
     * <p>Elements are addressed by the two trailing index arguments of {@code processElement}
     * (presumably input gate and channel -- confirm against the harness). The scenario uses the
     * four combinations (0,0), (0,1), (1,0) and (1,1).
     */
    @Test
    void testWatermarkAndWatermarkStatusForwarding() throws Exception {
        final long baseTime = 0L;
        final String failureMessage = "Output was not correct.";

        TwoInputStreamTaskTestHarness<String, Integer, String> harness =
                new TwoInputStreamTaskTestHarness<>(
                        TwoInputStreamTask::new,
                        2,
                        2,
                        new int[] {1, 2},
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.INT_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);
        harness.setupOutputForSingletonOperatorChain();

        StreamConfig config = harness.getStreamConfig();
        CoStreamMap<String, Integer, String> mapOperator = new CoStreamMap<>(new IdentityMap());
        config.setStreamOperator(mapOperator);
        config.setOperatorID(new OperatorID());

        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

        harness.invoke();
        harness.waitForTaskRunning();

        // Watermarks on three of the four positions: nothing is expected in the output yet.
        harness.processElement(new Watermark(baseTime), 0, 0);
        harness.processElement(new Watermark(baseTime), 0, 1);
        harness.processElement(new Watermark(baseTime), 1, 0);
        harness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals(failureMessage, expected, harness.getOutput());

        // The fourth position catches up: the watermark is expected downstream.
        harness.processElement(new Watermark(baseTime), 1, 1);
        harness.waitForInputProcessing();
        expected.add(new Watermark(baseTime));
        TestHarnessUtil.assertOutputEquals(failureMessage, expected, harness.getOutput());

        // Plain records are expected to pass straight through.
        harness.processElement(new StreamRecord<>("Hello", baseTime), 0, 0);
        harness.processElement(new StreamRecord<>(42, baseTime), 1, 1);
        harness.waitForInputProcessing();
        expected.add(new StreamRecord<>("Hello", baseTime));
        expected.add(new StreamRecord<>("42", baseTime));
        TestHarnessUtil.assertOutputEquals(failureMessage, expected, harness.getOutput());

        // Latest watermarks are now 4, 3, 3 and 2: the minimum (2) is expected.
        harness.processElement(new Watermark(baseTime + 4L), 0, 0);
        harness.processElement(new Watermark(baseTime + 3L), 0, 1);
        harness.processElement(new Watermark(baseTime + 3L), 1, 0);
        harness.processElement(new Watermark(baseTime + 2L), 1, 1);
        harness.waitForInputProcessing();
        expected.add(new Watermark(baseTime + 2L));
        TestHarnessUtil.assertOutputEquals(failureMessage, expected, harness.getOutput());

        // Raising the lowest position to 4 moves the minimum to 3.
        harness.processElement(new Watermark(baseTime + 4L), 1, 1);
        harness.waitForInputProcessing();
        expected.add(new Watermark(baseTime + 3L));
        TestHarnessUtil.assertOutputEquals(failureMessage, expected, harness.getOutput());

        // Raising the two remaining positions to 4 moves the minimum to 4.
        harness.processElement(new Watermark(baseTime + 4L), 0, 1);
        harness.processElement(new Watermark(baseTime + 4L), 1, 0);
        harness.waitForInputProcessing();
        expected.add(new Watermark(baseTime + 4L));
        TestHarnessUtil.assertOutputEquals(failureMessage, expected, harness.getOutput());

        // Two positions go idle, the other two advance to 6 and 5, then (1,1) goes idle as well.
        // Watermarks 5 and 6 are expected, in that order.
        harness.processElement(WatermarkStatus.IDLE, 0, 1);
        harness.processElement(WatermarkStatus.IDLE, 1, 0);
        harness.processElement(new Watermark(baseTime + 6L), 0, 0);
        harness.processElement(new Watermark(baseTime + 5L), 1, 1);
        harness.processElement(WatermarkStatus.IDLE, 1, 1);
        harness.waitForInputProcessing();
        expected.add(new Watermark(baseTime + 5L));
        expected.add(new Watermark(baseTime + 6L));
        TestHarnessUtil.assertOutputEquals(failureMessage, expected, harness.getOutput());

        // The last active position goes idle: a single IDLE status is expected.
        harness.processElement(WatermarkStatus.IDLE, 0, 0);
        harness.waitForInputProcessing();
        expected.add(WatermarkStatus.IDLE);
        TestHarnessUtil.assertOutputEquals(failureMessage, expected, harness.getOutput());

        // Two positions become active again: only one ACTIVE status is expected.
        harness.processElement(WatermarkStatus.ACTIVE, 1, 0);
        harness.processElement(WatermarkStatus.ACTIVE, 0, 1);
        harness.waitForInputProcessing();
        expected.add(WatermarkStatus.ACTIVE);
        TestHarnessUtil.assertOutputEquals(failureMessage, expected, harness.getOutput());

        harness.endInput();
        harness.waitForTaskCompletion();

        // Only the two plain records count as raw elements.
        List<String> rawElements = TestHarnessUtil.getRawElementsFromOutput(harness.getOutput());
        Assertions.assertThat(rawElements).hasSize(2);
    }

    /**
     * Sends a checkpoint barrier on position (0,0) first, then records on other positions, and
     * finally the same barrier on the three remaining positions.
     *
     * <p>The three records are expected in the output before any barrier; after all four barriers
     * have been fed and the task has completed, exactly one barrier is expected after them.
     */
    @Test
    void testCheckpointBarriers() throws Exception {
        final long baseTime = 0L;
        final String failureMessage = "Output was not correct.";
        final CheckpointOptions options = CheckpointOptions.forCheckpointWithDefaultLocation();

        TwoInputStreamTaskTestHarness<String, Integer, String> harness =
                new TwoInputStreamTaskTestHarness<>(
                        TwoInputStreamTask::new,
                        2,
                        2,
                        new int[] {1, 2},
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.INT_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);
        harness.setupOutputForSingletonOperatorChain();

        StreamConfig config = harness.getStreamConfig();
        CoStreamMap<String, Integer, String> mapOperator = new CoStreamMap<>(new IdentityMap());
        config.setStreamOperator(mapOperator);
        config.setOperatorID(new OperatorID());

        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

        harness.invoke();
        harness.waitForTaskRunning();

        harness.processEvent(new CheckpointBarrier(0L, 0L, options), 0, 0);

        // A record on a position that has not seen the barrier yet.
        harness.processElement(new StreamRecord<>("Ciao-0-0", baseTime), 0, 1);
        expected.add(new StreamRecord<>("Ciao-0-0", baseTime));
        harness.waitForInputProcessing();

        // Both records go to the same position so their relative order is fixed.
        harness.processElement(new StreamRecord<>(11, baseTime), 1, 1);
        harness.processElement(new StreamRecord<>(111, baseTime), 1, 1);
        expected.add(new StreamRecord<>("11", baseTime));
        expected.add(new StreamRecord<>("111", baseTime));
        harness.waitForInputProcessing();

        // Poll for up to 20 x 100 ms until the output has caught up with the expectation.
        int attemptsLeft = 20;
        while (attemptsLeft-- > 0 && harness.getOutput().size() < expected.size()) {
            Thread.sleep(100L);
        }

        // Only the three records are expected so far, no barrier.
        TestHarnessUtil.assertOutputEquals(failureMessage, expected, harness.getOutput());

        harness.processEvent(new CheckpointBarrier(0L, 0L, options), 0, 1);
        harness.processEvent(new CheckpointBarrier(0L, 0L, options), 1, 0);
        harness.processEvent(new CheckpointBarrier(0L, 0L, options), 1, 1);
        harness.waitForInputProcessing();

        harness.endInput();
        harness.waitForTaskCompletion();

        // With the barrier seen on every position, it is expected once in the output.
        expected.add(new CheckpointBarrier(0L, 0L, options));
        TestHarnessUtil.assertOutputEquals(failureMessage, expected, harness.getOutput());

        List<String> rawElements = TestHarnessUtil.getRawElementsFromOutput(harness.getOutput());
        Assertions.assertThat(rawElements).hasSize(3);
    }

    /**
     * Starts checkpoint 0 on a single position, then delivers the barrier of checkpoint 1 on all
     * four positions before checkpoint 0 has been completed.
     *
     * <p>The expected output is the two records, a {@link CancelCheckpointMarker} for checkpoint
     * 0 and the barrier of checkpoint 1. Barriers of checkpoint 0 that arrive afterwards must not
     * add anything to the output.
     */
    @Test
    void testOvertakingCheckpointBarriers() throws Exception {
        final long baseTime = 0L;
        final String failureMessage = "Output was not correct.";
        final CheckpointOptions options = CheckpointOptions.forCheckpointWithDefaultLocation();

        TwoInputStreamTaskTestHarness<String, Integer, String> harness =
                new TwoInputStreamTaskTestHarness<>(
                        TwoInputStreamTask::new,
                        2,
                        2,
                        new int[] {1, 2},
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.INT_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);
        harness.setupOutputForSingletonOperatorChain();

        StreamConfig config = harness.getStreamConfig();
        CoStreamMap<String, Integer, String> mapOperator = new CoStreamMap<>(new IdentityMap());
        config.setStreamOperator(mapOperator);
        config.setOperatorID(new OperatorID());

        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

        harness.invoke();
        harness.waitForTaskRunning();

        // Checkpoint 0 begins on one position only.
        harness.processEvent(new CheckpointBarrier(0L, 0L, options), 0, 0);

        // Records on a position without a barrier are expected to go through.
        harness.processElement(new StreamRecord<>(42, baseTime), 1, 1);
        harness.processElement(new StreamRecord<>(1337, baseTime), 1, 1);
        expected.add(new StreamRecord<>("42", baseTime));
        expected.add(new StreamRecord<>("1337", baseTime));
        harness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals(failureMessage, expected, harness.getOutput());

        // Checkpoint 1 arrives on every position and overtakes checkpoint 0.
        harness.processEvent(new CheckpointBarrier(1L, 1L, options), 0, 1);
        harness.processEvent(new CheckpointBarrier(1L, 1L, options), 0, 0);
        harness.processEvent(new CheckpointBarrier(1L, 1L, options), 1, 0);
        harness.processEvent(new CheckpointBarrier(1L, 1L, options), 1, 1);
        expected.add(new CancelCheckpointMarker(0L));
        expected.add(new CheckpointBarrier(1L, 1L, options));
        harness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals(failureMessage, expected, harness.getOutput());

        // Late barriers of checkpoint 0: the expected output stays unchanged.
        harness.processEvent(new CheckpointBarrier(0L, 0L, options), 0, 1);
        harness.processEvent(new CheckpointBarrier(0L, 0L, options), 1, 0);
        harness.processEvent(new CheckpointBarrier(0L, 0L, options), 1, 1);
        harness.waitForInputProcessing();

        harness.endInput();
        harness.waitForTaskCompletion();

        TestHarnessUtil.assertOutputEquals(failureMessage, expected, harness.getOutput());
    }

    /**
     * Checks the task-level {@code numRecordsIn}/{@code numRecordsOut} counters for a chain of
     * three operators (one two-input head followed by two chained one-input operators).
     *
     * <p>Five records are fed to the first input and three to the second. The input counter must
     * equal the number of records fed. The output counter must equal that number times eight,
     * which presumably reflects each of the three duplicating operators doubling its input --
     * the operators' implementations are not shown here, so confirm against them.
     */
    @Test
    void testOperatorMetricReuse() throws Exception {
        final int numRecords1 = 5;
        final int numRecords2 = 3;

        TwoInputStreamTaskTestHarness<String, String, String> testHarness =
                new TwoInputStreamTaskTestHarness<>(
                        TwoInputStreamTask::new,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);

        testHarness
                .setupOperatorChain(new OperatorID(), new DuplicatingOperator())
                .chain(
                        new OperatorID(),
                        new OneInputStreamTaskTest.DuplicatingOperator(),
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new SerializerConfigImpl()))
                .chain(
                        new OperatorID(),
                        new OneInputStreamTaskTest.DuplicatingOperator(),
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new SerializerConfigImpl()))
                .finish();

        final TaskMetricGroup taskMetricGroup =
                TaskManagerMetricGroup.createTaskManagerMetricGroup(
                                NoOpMetricRegistry.INSTANCE, "host", ResourceID.generate())
                        .addJob(new JobID(), "jobname")
                        .addTask(ExecutionGraphTestUtils.createExecutionAttemptId(), "task");

        // Environment that hands out the metric group created above so its counters can be read.
        StreamMockEnvironment env =
                new StreamMockEnvironment(
                        testHarness.jobConfig,
                        testHarness.taskConfig,
                        testHarness.memorySize,
                        new MockInputSplitProvider(),
                        testHarness.bufferSize,
                        new TestTaskStateManager()) {

                    @Override
                    public TaskMetricGroup getMetricGroup() {
                        return taskMetricGroup;
                    }
                };

        Counter numRecordsInCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
        Counter numRecordsOutCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();

        testHarness.invoke(env);
        testHarness.waitForTaskRunning();

        for (int x = 0; x < numRecords1; x++) {
            testHarness.processElement(new StreamRecord<>("hello"), 0, 0);
        }
        for (int x = 0; x < numRecords2; x++) {
            testHarness.processElement(new StreamRecord<>("hello"), 1, 0);
        }
        testHarness.waitForInputProcessing();

        // Expected values are derived from the record counts: 8 in, 8 * 2 * 2 * 2 = 64 out.
        Assertions.assertThat(numRecordsInCounter.getCount()).isEqualTo(numRecords1 + numRecords2);
        Assertions.assertThat(numRecordsOutCounter.getCount())
                .isEqualTo((numRecords1 + numRecords2) * 2 * 2 * 2);

        testHarness.endInput();
        testHarness.waitForTaskCompletion();
    }

    /**
     * Builds a two-input task whose state snapshot is {@link
     * TaskStateSnapshot#FINISHED_ON_RESTORE}, feeds {@link Watermark#MAX_WATERMARK} to both
     * inputs and waits for the task to complete.
     *
     * <p>The output must then contain exactly the max watermark followed by {@code
     * EndOfData(StopMode.DRAIN)}.
     *
     * <p>The builder is created with the diamond operator on purpose: on a raw builder the
     * {@code config -> ...} lambda would be typed against a raw functional interface, so {@code
     * config} would be an {@code Object} and {@code setCheckpointingEnabled} would not resolve.
     */
    @Test
    void testSkipExecutionsIfFinishedOnRestore() throws Exception {
        OperatorID nonSourceOperatorId = new OperatorID();

        try (StreamTaskMailboxTestHarness<String> testHarness =
                new StreamTaskMailboxTestHarnessBuilder<>(
                                TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .setCollectNetworkEvents()
                        .modifyStreamConfig(config -> config.setCheckpointingEnabled(true))
                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
                        .setTaskStateSnapshot(1L, TaskStateSnapshot.FINISHED_ON_RESTORE)
                        .setupOperatorChain(
                                nonSourceOperatorId, new TestFinishedOnRestoreStreamOperator())
                        .finishForSingletonOperatorChain(StringSerializer.INSTANCE)
                        .build()) {

            testHarness.processElement(Watermark.MAX_WATERMARK, 0);
            testHarness.processElement(Watermark.MAX_WATERMARK, 1);
            testHarness.waitForTaskCompletion();

            Assertions.assertThat(testHarness.getOutput())
                    .containsExactly(Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN));
        }
    }

    /**
     * Verifies the watermark gauges registered for a two-operator chain: a {@link CoStreamMap}
     * head over {@link IdentityMap} followed by a chained {@code WatermarkMetricOperator}.
     *
     * <p>Five gauges are looked up: the task's {@code currentInputWatermark} and, for each
     * operator, {@code currentInputWatermark} and {@code currentOutputWatermark}. They must be
     * five distinct objects, all start at {@code Long.MIN_VALUE}, and take the asserted values
     * after each watermark is fed to one of the two inputs.
     */
    @Test
    @SuppressWarnings("unchecked") // the intercepting groups hand back untyped metrics
    void testWatermarkMetrics() throws Exception {
        TwoInputStreamTaskTestHarness<String, Integer, String> harness =
                new TwoInputStreamTaskTestHarness<>(
                        TwoInputStreamTask::new,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.INT_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);

        CoStreamMap<String, Integer, String> headOperator = new CoStreamMap<>(new IdentityMap());
        final OperatorID headOperatorId = new OperatorID();

        OneInputStreamTaskTest.WatermarkMetricOperator chainedOperator =
                new OneInputStreamTaskTest.WatermarkMetricOperator();
        final OperatorID chainedOperatorId = new OperatorID();

        harness.setupOperatorChain(headOperatorId, headOperator)
                .chain(
                        chainedOperatorId,
                        chainedOperator,
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new SerializerConfigImpl()))
                .finish();

        final InterceptingOperatorMetricGroup headGroup = new InterceptingOperatorMetricGroup();
        final InterceptingOperatorMetricGroup chainedGroup = new InterceptingOperatorMetricGroup();

        // Route each operator to its own intercepting group; anything else uses the default.
        final InterceptingTaskMetricGroup taskGroup =
                new InterceptingTaskMetricGroup() {

                    @Override
                    public InternalOperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
                        if (id.equals(headOperatorId)) {
                            return headGroup;
                        }
                        if (id.equals(chainedOperatorId)) {
                            return chainedGroup;
                        }
                        return super.getOrAddOperator(id, name);
                    }
                };

        StreamMockEnvironment env =
                new StreamMockEnvironment(
                        harness.jobConfig,
                        harness.taskConfig,
                        harness.memorySize,
                        new MockInputSplitProvider(),
                        harness.bufferSize,
                        new TestTaskStateManager()) {

                    @Override
                    public TaskMetricGroup getMetricGroup() {
                        return taskGroup;
                    }
                };

        harness.invoke(env);
        harness.waitForTaskRunning();

        Gauge<Long> taskInput = (Gauge<Long>) taskGroup.get("currentInputWatermark");
        Gauge<Long> headInput = (Gauge<Long>) headGroup.get("currentInputWatermark");
        Gauge<Long> headOutput = (Gauge<Long>) headGroup.get("currentOutputWatermark");
        Gauge<Long> chainedInput = (Gauge<Long>) chainedGroup.get("currentInputWatermark");
        Gauge<Long> chainedOutput = (Gauge<Long>) chainedGroup.get("currentOutputWatermark");

        Assertions.assertThat(
                        new HashSet<>(
                                Arrays.asList(
                                        taskInput,
                                        headInput,
                                        headOutput,
                                        chainedInput,
                                        chainedOutput)))
                .as("A metric was registered multiple times.")
                .hasSize(5);

        // Before any watermark every gauge reports Long.MIN_VALUE.
        Assertions.assertThat(taskInput.getValue()).isEqualTo(Long.MIN_VALUE);
        Assertions.assertThat(headInput.getValue()).isEqualTo(Long.MIN_VALUE);
        Assertions.assertThat(headOutput.getValue()).isEqualTo(Long.MIN_VALUE);
        Assertions.assertThat(chainedInput.getValue()).isEqualTo(Long.MIN_VALUE);
        Assertions.assertThat(chainedOutput.getValue()).isEqualTo(Long.MIN_VALUE);

        // A watermark on the first input only: all gauges are expected to stay unchanged.
        harness.processElement(new Watermark(1L), 0, 0);
        harness.waitForInputProcessing();
        Assertions.assertThat(taskInput.getValue()).isEqualTo(Long.MIN_VALUE);
        Assertions.assertThat(headInput.getValue()).isEqualTo(Long.MIN_VALUE);
        Assertions.assertThat(headOutput.getValue()).isEqualTo(Long.MIN_VALUE);
        Assertions.assertThat(chainedInput.getValue()).isEqualTo(Long.MIN_VALUE);
        Assertions.assertThat(chainedOutput.getValue()).isEqualTo(Long.MIN_VALUE);

        // Inputs now at 1 and 2: gauges are expected at 1, the chained output at 2.
        harness.processElement(new Watermark(2L), 1, 0);
        harness.waitForInputProcessing();
        Assertions.assertThat(taskInput.getValue()).isOne();
        Assertions.assertThat(headInput.getValue()).isOne();
        Assertions.assertThat(headOutput.getValue()).isOne();
        Assertions.assertThat(chainedInput.getValue()).isOne();
        Assertions.assertThat(chainedOutput.getValue()).isEqualTo(2L);

        // Inputs now at 3 and 2: gauges are expected at 2, the chained output at 4.
        harness.processElement(new Watermark(3L), 0, 0);
        harness.waitForInputProcessing();
        Assertions.assertThat(taskInput.getValue()).isEqualTo(2L);
        Assertions.assertThat(headInput.getValue()).isEqualTo(2L);
        Assertions.assertThat(headOutput.getValue()).isEqualTo(2L);
        Assertions.assertThat(chainedInput.getValue()).isEqualTo(2L);
        Assertions.assertThat(chainedOutput.getValue()).isEqualTo(4L);

        harness.endInput();
        harness.waitForTaskCompletion();
    }

    /**
     * Chains a {@code TestBoundedTwoInputOperator} ("Operator0") with a {@code
     * TestBoundedOneInputStreamOperator} ("Operator1"), feeds one record to each input and ends
     * the inputs one after the other.
     *
     * <p>The output must equal, in order, the seven records listed in {@code expected}: per-input
     * record and end-of-input messages for Operator0, its finish message, then the end-of-input
     * and finish messages of Operator1.
     */
    @Test
    void testClosingAllOperatorsOnChainProperly() throws Exception {
        TwoInputStreamTaskTestHarness<String, String, String> harness =
                new TwoInputStreamTaskTestHarness<>(
                        TwoInputStreamTask::new,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);

        harness.setupOperatorChain(new OperatorID(), new TestBoundedTwoInputOperator("Operator0"))
                .chain(
                        new OperatorID(),
                        new TestBoundedOneInputStreamOperator("Operator1"),
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new SerializerConfigImpl()))
                .finish();

        harness.invoke();
        harness.waitForTaskRunning();

        // First input: one record, then end of input.
        harness.processElement(new StreamRecord<>("Hello-1"), 0, 0);
        harness.endInput(0, 0);
        harness.waitForInputProcessing();

        // Second input: one record, then end of input.
        harness.processElement(new StreamRecord<>("Hello-2"), 1, 0);
        harness.endInput(1, 0);

        harness.waitForTaskCompletion();

        StreamRecord<?>[] expected = {
            new StreamRecord<>("[Operator0-1]: Hello-1"),
            new StreamRecord<>("[Operator0-1]: End of input"),
            new StreamRecord<>("[Operator0-2]: Hello-2"),
            new StreamRecord<>("[Operator0-2]: End of input"),
            new StreamRecord<>("[Operator0]: Finish"),
            new StreamRecord<>("[Operator1]: End of input"),
            new StreamRecord<>("[Operator1]: Finish")
        };

        Object[] actual = harness.getOutput().toArray();
        Assertions.assertThat(actual).as("Output was not correct.").isEqualTo((Object) expected);
    }

    /**
     * Starts a two-input task with a task metric group that records its metrics into a map and
     * checks that, once the task is running, the map contains the keys {@code
     * checkpointAlignmentTime} and {@code checkpointStartDelayNanos}.
     */
    @Test
    void testCheckpointBarrierMetrics() throws Exception {
        TwoInputStreamTaskTestHarness<String, Integer, String> harness =
                new TwoInputStreamTaskTestHarness<>(
                        TwoInputStreamTask::new,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.INT_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);

        StreamConfig config = harness.getStreamConfig();
        CoStreamMap<String, Integer, String> mapOperator = new CoStreamMap<>(new IdentityMap());
        harness.setupOutputForSingletonOperatorChain();
        config.setStreamOperator(mapOperator);

        // Metrics registered on the task metric group end up in this map.
        final ConcurrentHashMap<String, Metric> registeredMetrics = new ConcurrentHashMap<>();
        final TaskMetricGroup taskMetricGroup =
                StreamTaskTestHarness.createTaskMetricGroup(registeredMetrics);

        final StreamMockEnvironment environment = harness.createEnvironment();
        environment.setTaskMetricGroup(taskMetricGroup);

        harness.invoke(environment);
        harness.waitForTaskRunning();

        Assertions.assertThat(registeredMetrics)
                .containsKeys("checkpointAlignmentTime", "checkpointStartDelayNanos");

        harness.endInput();
        harness.waitForTaskCompletion();
    }

    /**
     * Probes the {@code CanEmitBatchOfRecordsChecker} of a {@link TwoInputStreamTask} in several
     * states: output available, output unavailable, output available again, with a mail pending
     * in the mailbox, and after that mail has been processed.
     *
     * <p>The checker is asserted to return {@code false} in every one of these states.
     */
    @Test
    void testCanEmitBatchOfRecords() throws Exception {
        AvailabilityProvider.AvailabilityHelper availability =
                new AvailabilityProvider.AvailabilityHelper();

        try (StreamTaskMailboxTestHarness<String> harness =
                new StreamTaskMailboxTestHarnessBuilder<>(
                                TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput(BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput(BasicTypeInfo.STRING_TYPE_INFO)
                        .addAdditionalOutput(
                                new ResultPartitionWriterWithAvailabilityHelper(availability))
                        .setupOperatorChain(new DuplicatingOperator())
                        .finishForSingletonOperatorChain(IntSerializer.INSTANCE)
                        .build()) {

            StreamTask.CanEmitBatchOfRecordsChecker checker =
                    harness.streamTask.getCanEmitBatchOfRecords();
            harness.processAll();

            // Output available.
            availability.resetAvailable();
            Assertions.assertThat(checker.check()).isFalse();

            // Output unavailable.
            availability.resetUnavailable();
            Assertions.assertThat(checker.check()).isFalse();

            // Output available again.
            availability.resetAvailable();
            Assertions.assertThat(checker.check()).isFalse();

            // A mail is waiting in the mailbox.
            harness.streamTask.mainMailboxExecutor.execute(() -> {}, "mail");
            Assertions.assertThat(checker.check()).isFalse();

            // The mailbox has been drained.
            harness.processAll();
            Assertions.assertThat(checker.check()).isFalse();
        }
    }

    /**
     * Checks the task-level {@code numRecordsIn}/{@code numRecordsOut} counters for a chain that
     * has several non-chained outputs: a pass-through head, an {@code OddEvenOperator} with a
     * side output tagged {@code "odd"}, and a {@code DuplicatingOperator}.
     *
     * <p>Five even and three odd integers are fed in. The input counter must equal the number of
     * records fed (8). The output counter must equal 27, expressed below as the sum of three
     * contributions (3 + 8 + 16). The attribution of each term to an operator output follows the
     * variable names only -- the operators' implementations are not shown here, so confirm
     * against them.
     *
     * <p>The three partition writers are set up before the harness is built and are closed in the
     * {@code finally} block, i.e. after the harness itself has been closed.
     */
    @Test
    void testTaskSideOutputStatistics() throws Exception {
        TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();

        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
        for (int i = 0; i < partitionWriters.length; ++i) {
            partitionWriters[i] = new RecordOrEventCollectingResultPartitionWriter(new ArrayDeque(), (TypeSerializer)new StreamElementSerializer(BasicTypeInfo.INT_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl())));
            partitionWriters[i].setup();
        }

        try (StreamTaskMailboxTestHarness testHarness = ((StreamTaskMailboxTestHarnessBuilder)new StreamTaskMailboxTestHarnessBuilder(TwoInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
                .addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO)
                .addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO)
                .addAdditionalOutput(partitionWriters)
                .setupOperatorChain(new OperatorID(), (StreamOperator<?>)new PassThroughOperator())
                .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()))
                .setOperatorFactory((StreamOperatorFactory)SimpleOperatorFactory.of((StreamOperator)new OneInputStreamTaskTest.OddEvenOperator()))
                .addNonChainedOutputsCount(new OutputTag("odd", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO), 2)
                .addNonChainedOutputsCount(1)
                .build()
                .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()))
                .setOperatorFactory((StreamOperatorFactory)SimpleOperatorFactory.of((StreamOperator)new OneInputStreamTaskTest.DuplicatingOperator()))
                .addNonChainedOutputsCount(1)
                .build()
                .finish())
                .setTaskMetricGroup(taskMetricGroup)
                .build();){
            Counter numRecordsInCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
            Counter numRecordsOutCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();

            final int numEvenRecords = 5;
            final int numOddRecords = 3;

            // Even values 0, 2, 4, 6, 8.
            for (int x = 0; x < numEvenRecords; x++) {
                testHarness.processElement(new StreamRecord((Object)(2 * x)));
            }
            // Odd values 1, 3, 5.
            for (int x = 0; x < numOddRecords; x++) {
                testHarness.processElement(new StreamRecord((Object)(2 * x + 1)));
            }

            // Expected counts are derived from the inputs rather than hard-coded.
            final int oddEvenOperatorOutputsWithOddTag = numOddRecords;
            final int oddEvenOperatorOutputsWithoutTag = numOddRecords + numEvenRecords;
            final int duplicatingOperatorOutput = (numOddRecords + numEvenRecords) * 2;

            Assertions.assertThat((long)numRecordsInCounter.getCount()).isEqualTo(numOddRecords + numEvenRecords);
            Assertions.assertThat((long)numRecordsOutCounter.getCount()).isEqualTo(oddEvenOperatorOutputsWithOddTag + oddEvenOperatorOutputsWithoutTag + duplicatingOperatorOutput);
            testHarness.waitForTaskCompletion();
        }
        finally {
            for (ResultPartitionWriter partitionWriter : partitionWriters) {
                partitionWriter.close();
            }
        }
    }

    /**
     * Co-map function that maps both inputs to {@code String}: first-input strings are
     * returned as they are, second-input integers are converted with {@code toString()}.
     */
    private static class IdentityMap
    implements CoMapFunction<String, Integer, String> {
        private static final long serialVersionUID = 1L;

        private IdentityMap() {
        }

        /** Returns the first-input element unchanged. */
        public String map1(String text) {
            return text;
        }

        /** Returns the string form of the second-input element. */
        public String map2(Integer number) {
            final String rendered = number.toString();
            return rendered;
        }
    }

    /**
     * Rich co-map function that records, in static flags, whether {@code open} and
     * {@code close} have been called, and asserts the expected ordering of the lifecycle
     * calls relative to {@code map1} / {@code map2}.
     *
     * <p>The flags are static, so they are shared by every instance of this class; creating a
     * new instance resets both of them to {@code false}.
     */
    private static class TestOpenCloseMapFunction
    extends RichCoMapFunction<String, Integer, String> {
        private static final long serialVersionUID = 1L;
        /** Set to {@code true} once {@link #open(OpenContext)} has run. */
        public static boolean openCalled = false;
        /** Set to {@code true} once {@link #close()} has run. */
        public static boolean closeCalled = false;

        /** Resets both lifecycle flags. */
        TestOpenCloseMapFunction() {
            openCalled = false;
            closeCalled = false;
        }

        /**
         * Asserts that {@code close} has not run yet, then marks the function as opened.
         *
         * @throws Exception if the superclass {@code open} fails
         */
        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            // Check the flag the message talks about: a close() that ran before open().
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)closeCalled).as("Close called before open.", new Object[0])).isFalse();
            openCalled = true;
        }

        /**
         * Asserts that {@code open} has already run, then marks the function as closed.
         *
         * @throws Exception if the superclass {@code close} fails
         */
        public void close() throws Exception {
            super.close();
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)openCalled).as("Open was not called before close.", new Object[0])).isTrue();
            closeCalled = true;
        }

        /** Asserts that {@code open} has run and returns the first-input element unchanged. */
        public String map1(String value) {
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)openCalled).as("Open was not called before run.", new Object[0])).isTrue();
            return value;
        }

        /** Asserts that {@code open} has run and returns the second-input element as a string. */
        public String map2(Integer value) {
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)openCalled).as("Open was not called before run.", new Object[0])).isTrue();
            return value.toString();
        }
    }

    /**
     * Two-input operator over integers. Each record from either input is emitted twice: once
     * under the "even" or "odd" output tag, chosen by the parity of its value, and once
     * without a tag.
     */
    static class OddEvenOperator
    extends AbstractStreamOperator<Integer>
    implements TwoInputStreamOperator<Integer, Integer, Integer> {
        private final OutputTag<Integer> oddOutputTag = new OutputTag("odd", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);
        private final OutputTag<Integer> evenOutputTag = new OutputTag("even", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);

        OddEvenOperator() {
        }

        /** Handles a record from the first input; both inputs are treated the same way. */
        public void processElement1(StreamRecord<Integer> element) throws Exception {
            this.processElement(element);
        }

        /** Handles a record from the second input; both inputs are treated the same way. */
        public void processElement2(StreamRecord<Integer> element) throws Exception {
            this.processElement(element);
        }

        /** Emits the record under its parity tag first, then without a tag. */
        private void processElement(StreamRecord<Integer> record) {
            final boolean even = (Integer)record.getValue() % 2 == 0;
            final OutputTag<Integer> parityTag = even ? this.evenOutputTag : this.oddOutputTag;
            this.output.collect(parityTag, record);
            this.output.collect(record);
        }
    }

    /**
     * Two-input operator that emits every record it receives, from either input, exactly once
     * and unmodified.
     *
     * @param <T> element type of both inputs and of the output
     */
    static class PassThroughOperator<T>
    extends AbstractStreamOperator<T>
    implements TwoInputStreamOperator<T, T, T> {
        PassThroughOperator() {
        }

        /** Forwards a record from the first input. */
        public void processElement1(StreamRecord<T> element) throws Exception {
            this.emitUnchanged(element);
        }

        /** Forwards a record from the second input. */
        public void processElement2(StreamRecord<T> element) throws Exception {
            this.emitUnchanged(element);
        }

        /** Hands the record to the operator output as it is. */
        private void emitUnchanged(StreamRecord<T> record) {
            this.output.collect(record);
        }
    }

    /**
     * Two-input operator over strings that emits every record it receives, from either input,
     * two times. Its input selection is always {@code InputSelection.ALL}.
     */
    static class DuplicatingOperator
    extends AbstractStreamOperator<String>
    implements TwoInputStreamOperator<String, String, String>,
    InputSelectable {
        DuplicatingOperator() {
        }

        /** Emits a first-input record twice. */
        public void processElement1(StreamRecord<String> element) {
            this.emitTwice(element);
        }

        /** Emits a second-input record twice. */
        public void processElement2(StreamRecord<String> element) {
            this.emitTwice(element);
        }

        /** Always selects all inputs. */
        public InputSelection nextSelection() {
            return InputSelection.ALL;
        }

        /** Collects the same record object into the output two times in a row. */
        private void emitTwice(StreamRecord<String> record) {
            for (int copy = 0; copy < 2; ++copy) {
                this.output.collect(record);
            }
        }
    }
}

