/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.connector.source.mocks.MockSourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.LifeCycleMonitor;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTestBase;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.runtime.tasks.TestFinishedOnRestoreStreamOperator;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.function.FunctionWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

class SourceOperatorStreamTaskTest
extends SourceStreamTaskTestBase {
    /** Operator ID under which the source operator is registered and addressed by operator events. */
    private static final OperatorID OPERATOR_ID = new OperatorID();

    /**
     * Number of records per checkpoint round.
     *
     * <p>NOTE(review): presumably the value that appears as the inlined literal {@code 10} at
     * call sites in this decompiled file; confirm against the original source.
     */
    private static final int NUM_RECORDS = 10;

    /** Storage location reference used by the savepoint flavoured {@link CheckpointOptions}. */
    public static final CheckpointStorageLocationReference SAVEPOINT_LOCATION =
            new CheckpointStorageLocationReference(
                    // Explicit charset: the encoded bytes must not depend on the platform default.
                    "Savepoint".getBytes(java.nio.charset.StandardCharsets.UTF_8));

    /** Storage location reference used by the checkpoint flavoured {@link CheckpointOptions}. */
    public static final CheckpointStorageLocationReference CHECKPOINT_LOCATION =
            new CheckpointStorageLocationReference(
                    "Checkpoint".getBytes(java.nio.charset.StandardCharsets.UTF_8));

    /** Package-private no-argument constructor; performs no initialization of its own. */
    SourceOperatorStreamTaskTest() {
    }

    /**
     * Delegates to the inherited three-argument {@code testMetrics} with a
     * {@link SourceOperatorStreamTask} factory, a {@link SourceOperatorFactory} around a bounded
     * single-split {@link MockSource}, and a check that the busy time is at most 1,000,000.
     */
    @Test
    void testMetrics() throws Exception {
        FunctionWithException<Environment, ? extends StreamTask<Integer, ?>, Exception> taskFactory =
                (FunctionWithException<Environment, ? extends StreamTask<Integer, ?>, Exception>)
                        ((FunctionWithException) SourceOperatorStreamTask::new);

        StreamOperatorFactory<?> operatorFactory =
                (StreamOperatorFactory<?>)
                        new SourceOperatorFactory(
                                (Source) new MockSource(Boundedness.BOUNDED, 1),
                                WatermarkStrategy.noWatermarks());

        testMetrics(
                taskFactory,
                operatorFactory,
                busyTime -> busyTime.isLessThanOrEqualTo(1_000_000.0));
    }

    /**
     * Runs the task once from scratch up to checkpoint 1, then starts a second run from the
     * snapshot reported by the first one and takes checkpoint 2. The first run is expected to emit
     * the values 0..9, the restored run the values 10..19.
     */
    @Test
    void testSnapshotAndRestore() throws Exception {
        // First run: no initial state.
        final IntStream firstBatch = IntStream.range(0, NUM_RECORDS);
        final TaskStateSnapshot stateOfFirstRun = executeAndWaitForCheckpoint(1L, null, firstBatch);

        // Second run: restore from the state captured above.
        final IntStream secondBatch = IntStream.range(NUM_RECORDS, 2 * NUM_RECORDS);
        executeAndWaitForCheckpoint(2L, stateOfFirstRun, secondBatch);
    }

    /**
     * Triggers a terminating (canonical format) savepoint and verifies the task output consists of
     * {@link Watermark#MAX_WATERMARK}, an {@link EndOfData} event in {@link StopMode#DRAIN} mode
     * and finally the {@link CheckpointBarrier} of that savepoint.
     */
    @Test
    void testSnapshotAndAdvanceToEndOfEventTime() throws Exception {
        // Single source of truth for the checkpoint id used by harness, trigger and barrier.
        final long checkpointId = 1L;
        try (StreamTaskMailboxTestHarness<Integer> testHarness =
                createTestHarness(checkpointId, null)) {
            getAndMaybeAssignSplit(testHarness);

            final CheckpointOptions checkpointOptions =
                    new CheckpointOptions(
                            SavepointType.terminate(SavepointFormatType.CANONICAL),
                            CheckpointStorageLocationReference.getDefault());
            triggerCheckpointWaitForFinish(testHarness, checkpointId, checkpointOptions);

            final LinkedList<Object> expectedOutput = new LinkedList<>();
            expectedOutput.add(Watermark.MAX_WATERMARK);
            expectedOutput.add(new EndOfData(StopMode.DRAIN));
            expectedOutput.add(new CheckpointBarrier(checkpointId, checkpointId, checkpointOptions));

            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Processes everything the default harness has to offer, finishes processing and verifies the
     * output is exactly {@link Watermark#MAX_WATERMARK} followed by an {@link EndOfData} event in
     * {@link StopMode#DRAIN} mode.
     */
    @Test
    void testEmittingMaxWatermarkAfterReadingAllRecords() throws Exception {
        try (StreamTaskMailboxTestHarness<Integer> harness = createTestHarness()) {
            harness.processAll();
            harness.finishProcessing();

            final List<Object> expected = new ArrayList<>();
            expected.add(Watermark.MAX_WATERMARK);
            expected.add(new EndOfData(StopMode.DRAIN));

            Assertions.assertThat(harness.getOutput()).containsExactlyElementsOf(expected);
        }
    }

    /**
     * Cancels the stream task before anything is processed, finishes processing and verifies that
     * the harness output contains no elements at all.
     */
    @Test
    void testNotEmittingMaxWatermarkAfterCancelling() throws Exception {
        try (StreamTaskMailboxTestHarness<Integer> harness = createTestHarness()) {
            // Cancel first, then let the harness wind down.
            harness.getStreamTask().cancel();
            harness.finishProcessing();

            Assertions.assertThat(harness.getOutput()).hasSize(0);
        }
    }

    static Stream<?> provideExternallyInducedParameters() {
        return Stream.of(CheckpointOptions.alignedNoTimeout((SnapshotType)SavepointType.savepoint((SavepointFormatType)SavepointFormatType.CANONICAL), (CheckpointStorageLocationReference)SAVEPOINT_LOCATION), CheckpointOptions.alignedNoTimeout((SnapshotType)SavepointType.terminate((SavepointFormatType)SavepointFormatType.CANONICAL), (CheckpointStorageLocationReference)SAVEPOINT_LOCATION), CheckpointOptions.alignedNoTimeout((SnapshotType)SavepointType.suspend((SavepointFormatType)SavepointFormatType.CANONICAL), (CheckpointStorageLocationReference)SAVEPOINT_LOCATION), CheckpointOptions.alignedNoTimeout((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CHECKPOINT_LOCATION), CheckpointOptions.alignedWithTimeout((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CHECKPOINT_LOCATION, (long)123L), CheckpointOptions.unaligned((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CHECKPOINT_LOCATION), CheckpointOptions.notExactlyOnce((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CHECKPOINT_LOCATION)).flatMap(options -> Stream.of({options, true}, {options, false}));
    }

    /**
     * Verifies checkpointing with a source reader that decides itself when a checkpoint is taken.
     *
     * <p>Two orderings are covered. With {@code rpcFirst} the checkpoint RPC is issued before any
     * input is processed. Otherwise the task is stepped until the reader asks for a checkpoint, at
     * which point the input processor must be unavailable; the RPC is then issued and stepped to
     * completion, after which the input processor must be available again.
     *
     * <p>In both cases the reader must have been snapshotted with the expected checkpoint id after
     * exactly {@code numEventsBeforeCheckpoint} events, and the matching barrier must be in the
     * output. For synchronous savepoints only {@code numEventsBeforeCheckpoint} events are expected
     * to have been emitted in total, otherwise {@code totalNumEvents}.
     *
     * @param checkpointOptions options of the checkpoint/savepoint to trigger
     * @param rpcFirst whether the trigger RPC arrives before the reader requests the checkpoint
     */
    @ParameterizedTest
    @MethodSource("provideExternallyInducedParameters")
    void testExternallyInducedSource(CheckpointOptions checkpointOptions, boolean rpcFirst) throws Exception {
        final int numEventsBeforeCheckpoint = 10;
        final int totalNumEvents = 20;
        final TestingExternallyInducedSourceReader testingReader =
                new TestingExternallyInducedSourceReader(numEventsBeforeCheckpoint, totalNumEvents);

        try (StreamTaskMailboxTestHarness<Integer> testHarness =
                createTestHarness(new TestingExternallyInducedSource(testingReader), 0L, null)) {
            // Assert against the reader instance the operator actually holds.
            final TestingExternallyInducedSourceReader runtimeTestingReader =
                    (TestingExternallyInducedSourceReader)
                            ((SourceOperator<?, ?>) testHarness.getStreamTask().mainOperator)
                                    .getSourceReader();

            final CheckpointMetaData checkpointMetaData =
                    new CheckpointMetaData(TestingExternallyInducedSourceReader.CHECKPOINT_ID, 2L);

            if (rpcFirst) {
                testHarness.streamTask.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
                testHarness.processAll();
            } else {
                // Step until the reader itself asks for the checkpoint.
                do {
                    testHarness.processSingleStep();
                } while (!runtimeTestingReader.shouldTriggerCheckpoint().isPresent());

                // Input must be blocked until the checkpoint RPC arrives.
                Assertions.assertThat(testHarness.streamTask.inputProcessor.isAvailable()).isFalse();

                final CompletableFuture<Boolean> triggerCheckpointAsync =
                        testHarness.streamTask.triggerCheckpointAsync(
                                checkpointMetaData, checkpointOptions);
                while (!triggerCheckpointAsync.isDone()) {
                    testHarness.processSingleStep();
                }

                // The checkpoint was triggered, so the input is unblocked again.
                Assertions.assertThat(testHarness.streamTask.inputProcessor.isAvailable()).isTrue();
                testHarness.processAll();
            }

            final boolean synchronousSavepoint =
                    checkpointOptions.getCheckpointType().isSavepoint()
                            && ((SavepointType) checkpointOptions.getCheckpointType()).isSynchronous();
            final int expectedEvents =
                    synchronousSavepoint ? numEventsBeforeCheckpoint : totalNumEvents;

            Assertions.assertThat(runtimeTestingReader.numEmittedEvents).isEqualTo(expectedEvents);
            Assertions.assertThat(runtimeTestingReader.checkpointed).isTrue();
            Assertions.assertThat(runtimeTestingReader.checkpointedId)
                    .isEqualTo(TestingExternallyInducedSourceReader.CHECKPOINT_ID);
            Assertions.assertThat(runtimeTestingReader.checkpointedAt)
                    .isEqualTo(numEventsBeforeCheckpoint);
            Assertions.assertThat(testHarness.getOutput())
                    .contains(
                            new CheckpointBarrier(
                                    checkpointMetaData.getCheckpointId(),
                                    checkpointMetaData.getTimestamp(),
                                    checkpointOptions));
        }
    }

    /**
     * Restores the task from {@link TaskStateSnapshot#FINISHED_ON_RESTORE} and verifies that the
     * additional output only receives {@link Watermark#MAX_WATERMARK} and an {@link EndOfData}
     * event in {@link StopMode#DRAIN} mode, and that none of the life-cycle phases tracked by
     * {@link LifeCycleMonitor} was ever invoked on the source reader.
     */
    @Test
    void testSkipExecutionIfFinishedOnRestore() throws Exception {
        final TaskStateSnapshot taskStateSnapshot = TaskStateSnapshot.FINISHED_ON_RESTORE;

        final LifeCycleMonitorSource testingSource =
                new LifeCycleMonitorSource(Boundedness.CONTINUOUS_UNBOUNDED, 10);
        final SourceOperatorFactory<Integer> sourceOperatorFactory =
                new SourceOperatorFactory<>(testingSource, WatermarkStrategy.noWatermarks());

        final List<Object> output = new ArrayList<>();
        try (StreamTaskMailboxTestHarness<Integer> testHarness =
                new StreamTaskMailboxTestHarnessBuilder<>(
                                SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
                        .setTaskStateSnapshot(1L, taskStateSnapshot)
                        .addAdditionalOutput(
                                new RecordOrEventCollectingResultPartitionWriter<StreamElement>(
                                        output,
                                        new StreamElementSerializer<>(IntSerializer.INSTANCE)) {
                                    // Surface the end-of-data notification as an event in the
                                    // collected output so it can be asserted on below.
                                    @Override
                                    public void notifyEndOfData(StopMode mode) throws IOException {
                                        broadcastEvent(new EndOfData(mode), false);
                                    }
                                })
                        .setupOperatorChain(sourceOperatorFactory)
                        .chain(new TestFinishedOnRestoreStreamOperator(), StringSerializer.INSTANCE)
                        .finish()
                        .build()) {
            testHarness.getStreamTask().invoke();
            testHarness.processAll();

            Assertions.assertThat(output)
                    .containsExactly(Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN));

            final LifeCycleMonitorSourceReader sourceReader =
                    (LifeCycleMonitorSourceReader)
                            ((SourceOperator<?, ?>) testHarness.getStreamTask().getMainOperator())
                                    .getSourceReader();
            sourceReader
                    .getLifeCycleMonitor()
                    .assertCallTimes(0, LifeCycleMonitor.LifeCyclePhase.values());
        }
    }

    /**
     * Triggers a terminating savepoint (id 2) on an unbounded {@link MockSource} and verifies that
     * the task runs to completion once the checkpoint was acknowledged and its completion was
     * notified back to the task.
     *
     * <p>The acknowledgement is observed through a {@link TestCheckpointResponder} subclass that
     * completes a future, which in turn fires {@code notifyCheckpointCompleteAsync(2)}.
     */
    @Test
    void testTriggeringStopWithSavepointWithDrain() throws Exception {
        final SourceOperatorFactory<Integer> sourceOperatorFactory =
                new SourceOperatorFactory<>(
                        new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 2),
                        WatermarkStrategy.noWatermarks());

        final CompletableFuture<Boolean> checkpointCompleted = new CompletableFuture<>();
        final TestCheckpointResponder checkpointResponder =
                new TestCheckpointResponder() {
                    @Override
                    public void acknowledgeCheckpoint(
                            JobID jobID,
                            ExecutionAttemptID executionAttemptID,
                            long checkpointId,
                            CheckpointMetrics checkpointMetrics,
                            TaskStateSnapshot subtaskState) {
                        super.acknowledgeCheckpoint(
                                jobID,
                                executionAttemptID,
                                checkpointId,
                                checkpointMetrics,
                                subtaskState);
                        // Signal the test that the task acknowledged the savepoint.
                        checkpointCompleted.complete(null);
                    }
                };

        try (StreamTaskMailboxTestHarness<Integer> testHarness =
                new StreamTaskMailboxTestHarnessBuilder<>(
                                SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
                        .setupOutputForSingletonOperatorChain(sourceOperatorFactory)
                        .setCheckpointResponder(checkpointResponder)
                        .build()) {
            final CompletableFuture<Boolean> triggerResult =
                    testHarness.streamTask.triggerCheckpointAsync(
                            new CheckpointMetaData(2L, 2L),
                            CheckpointOptions.alignedNoTimeout(
                                    SavepointType.terminate(SavepointFormatType.CANONICAL),
                                    SAVEPOINT_LOCATION));

            // Once acknowledged, tell the task that the savepoint completed.
            checkpointCompleted.whenComplete(
                    (ignored, exception) ->
                            testHarness.streamTask.notifyCheckpointCompleteAsync(2L));

            testHarness.waitForTaskCompletion();
            testHarness.finishProcessing();

            Assertions.assertThat(triggerResult.isDone()).isTrue();
            Assertions.assertThat(triggerResult.get()).isTrue();
            Assertions.assertThat(checkpointCompleted.isDone()).isTrue();
        }
    }

    /**
     * Runs one task instance up to a completed checkpoint and validates what it emitted.
     *
     * <p>A harness is created (optionally restored from {@code initialSnapshot}), a split is looked
     * up or assigned, {@link #NUM_RECORDS} records are added to it and processed, and a checkpoint
     * with default location is triggered. The output must then consist of one
     * {@link StreamRecord} (timestamp {@link Long#MIN_VALUE}) per expected value followed by the
     * {@link CheckpointBarrier}.
     *
     * @param checkpointId id of the checkpoint to take; also used as the barrier timestamp and as
     *     the id passed along with {@code initialSnapshot}
     * @param initialSnapshot state to restore from, or {@code null} to start without state
     * @param expectedRecords values expected in the output ahead of the barrier, in order
     * @return the last task state snapshot reported to the job manager by the harness
     */
    private TaskStateSnapshot executeAndWaitForCheckpoint(long checkpointId, TaskStateSnapshot initialSnapshot, IntStream expectedRecords) throws Exception {
        try (StreamTaskMailboxTestHarness<Integer> testHarness =
                createTestHarness(checkpointId, initialSnapshot)) {
            final MockSourceSplit split = getAndMaybeAssignSplit(testHarness);
            addRecords(split, NUM_RECORDS);
            testHarness.processAll();

            final CheckpointOptions checkpointOptions =
                    CheckpointOptions.forCheckpointWithDefaultLocation();
            triggerCheckpointWaitForFinish(testHarness, checkpointId, checkpointOptions);

            // The queue mixes stream records and the barrier, so it has to be typed as Object.
            final LinkedList<Object> expectedOutput = new LinkedList<>();
            expectedRecords.forEach(
                    r -> expectedOutput.offer(new StreamRecord<>(r, Long.MIN_VALUE)));
            expectedOutput.add(new CheckpointBarrier(checkpointId, checkpointId, checkpointOptions));

            Assertions.assertThat(testHarness.taskStateManager.getReportedCheckpointId())
                    .isEqualTo(checkpointId);
            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct.", expectedOutput, testHarness.getOutput());

            return testHarness.taskStateManager.getLastJobManagerTaskStateSnapshot();
        }
    }

    /**
     * Triggers a checkpoint on the task and drives the mailbox until it is fully done.
     *
     * <p>Steps: reset the state manager's report latch, trigger the checkpoint, mark the source
     * reader available, step the mailbox until the trigger future is done, notify checkpoint
     * completion, step the mailbox until that notification is done, and finally wait on the
     * report latch.
     *
     * @param testHarness harness owning the task under test
     * @param checkpointId id of the checkpoint; also used as its timestamp
     * @param checkpointOptions options the checkpoint is triggered with
     */
    private void triggerCheckpointWaitForFinish(StreamTaskMailboxTestHarness<Integer> testHarness, long checkpointId, CheckpointOptions checkpointOptions) throws Exception {
        testHarness.taskStateManager.getWaitForReportLatch().reset();

        final CheckpointMetaData checkpointMetaData =
                new CheckpointMetaData(checkpointId, checkpointId);
        final CompletableFuture<Boolean> checkpointFuture =
                testHarness.getStreamTask().triggerCheckpointAsync(checkpointMetaData, checkpointOptions);

        getSourceReaderFromTask(testHarness).markAvailable();
        processUntil(testHarness, checkpointFuture::isDone);

        final Future<?> checkpointNotified =
                testHarness.getStreamTask().notifyCheckpointCompleteAsync(checkpointId);
        processUntil(testHarness, checkpointNotified::isDone);

        testHarness.taskStateManager.getWaitForReportLatch().await();
    }

    /**
     * Runs mailbox steps of the harness' stream task until {@code condition} yields {@code true}.
     * At least one step is always executed, because the condition is only consulted after a step.
     *
     * @param testHarness harness whose stream task is stepped
     * @param condition evaluated after every step; stepping stops once it returns {@code true}
     */
    private void processUntil(StreamTaskMailboxTestHarness testHarness, Supplier<Boolean> condition) throws Exception {
        while (true) {
            testHarness.getStreamTask().runMailboxStep();
            if (condition.get()) {
                return;
            }
        }
    }

    /**
     * Creates a harness with checkpoint id {@code 0} and no initial state.
     *
     * @return the harness built by the two-argument overload
     */
    private StreamTaskMailboxTestHarness<Integer> createTestHarness() throws Exception {
        final long checkpointId = 0L;
        final TaskStateSnapshot noInitialState = null;
        return createTestHarness(checkpointId, noInitialState);
    }

    /**
     * Creates a harness around a bounded {@link MockSource} constructed with the arguments
     * {@code (Boundedness.BOUNDED, 1)}.
     *
     * @param checkpointId checkpoint id handed on together with {@code snapshot}
     * @param snapshot initial state, may be {@code null}
     * @return the harness built by the three-argument overload
     */
    private StreamTaskMailboxTestHarness<Integer> createTestHarness(long checkpointId, TaskStateSnapshot snapshot) throws Exception {
        final MockSource boundedSource = new MockSource(Boundedness.BOUNDED, 1);
        return createTestHarness(boundedSource, checkpointId, snapshot);
    }

    /**
     * Builds a mailbox test harness running a {@link SourceOperatorStreamTask} whose single
     * operator is a source operator for {@code source}, registered under {@link #OPERATOR_ID},
     * with no watermarks and with network events collected into the output.
     *
     * @param source the source to wrap into a {@link SourceOperatorFactory}
     * @param checkpointId checkpoint id associated with {@code snapshot}; ignored if
     *     {@code snapshot} is {@code null}
     * @param snapshot initial task state, or {@code null} to start without state
     * @return the built harness; the caller is responsible for closing it
     */
    private StreamTaskMailboxTestHarness<Integer> createTestHarness(MockSource source, long checkpointId, TaskStateSnapshot snapshot) throws Exception {
        final SourceOperatorFactory<Integer> sourceOperatorFactory =
                new SourceOperatorFactory<>(source, WatermarkStrategy.noWatermarks());

        final StreamTaskMailboxTestHarnessBuilder<Integer> builder =
                new StreamTaskMailboxTestHarnessBuilder<>(
                        SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO);
        if (snapshot != null) {
            // Only restore when an initial snapshot was actually supplied.
            builder.setTaskStateSnapshot(checkpointId, snapshot);
        }

        return builder.setCollectNetworkEvents()
                .setupOutputForSingletonOperatorChain(sourceOperatorFactory, OPERATOR_ID)
                .build();
    }

    /**
     * Returns the first split assigned to the task's source reader, assigning a fresh one first
     * if the reader has none yet.
     *
     * <p>When no split is assigned, a {@code MockSourceSplit(0, 0)} is wrapped into an
     * {@link AddSplitEvent}, dispatched to the operator registered under {@link #OPERATOR_ID},
     * and the mailbox is stepped until the reader reports the split. The reader is then marked
     * available.
     *
     * @param testHarness harness owning the task under test
     * @return the first assigned split of the source reader
     */
    private MockSourceSplit getAndMaybeAssignSplit(StreamTaskMailboxTestHarness<Integer> testHarness) throws Exception {
        final List<MockSourceSplit> assignedSplits =
                getSourceReaderFromTask(testHarness).getAssignedSplits();

        if (assignedSplits.isEmpty()) {
            final MockSourceSplit split = new MockSourceSplit(0, 0);
            final AddSplitEvent<MockSourceSplit> addSplitEvent =
                    new AddSplitEvent<>(
                            Collections.singletonList(split), new MockSourceSplitSerializer());
            testHarness
                    .getStreamTask()
                    .dispatchOperatorEvent(OPERATOR_ID, new SerializedValue<>(addSplitEvent));

            // The list obtained above is re-checked after each step, so it is expected to
            // reflect the assignment as soon as the event was handled.
            while (assignedSplits.isEmpty()) {
                testHarness.getStreamTask().runMailboxStep();
            }

            getSourceReaderFromTask(testHarness).markAvailable();
        }

        return assignedSplits.get(0);
    }

    /**
     * Adds {@code numRecords} consecutive integer records to {@code split}, starting at the value
     * returned by {@code split.index()} at the time of the call.
     *
     * @param split split to append the records to
     * @param numRecords number of consecutive values to add
     */
    private void addRecords(MockSourceSplit split, int numRecords) {
        // Read the index once up front; every added value is derived from this starting point.
        final int firstValue = split.index();
        IntStream.range(firstValue, firstValue + numRecords).forEach(split::addRecord);
    }

    /**
     * Extracts the source reader from the main operator of the harness' stream task.
     *
     * @param testHarness harness owning the task under test
     * @return the main operator's source reader, cast to {@link MockSourceReader}
     */
    private MockSourceReader getSourceReaderFromTask(StreamTaskMailboxTestHarness<Integer> testHarness) {
        final SourceOperator sourceOperator =
                (SourceOperator) testHarness.getStreamTask().mainOperator;
        return (MockSourceReader) sourceOperator.getSourceReader();
    }

    static class LifeCycleMonitorSourceReader
    extends MockSourceReader {
        private final LifeCycleMonitor lifeCycleMonitor = new LifeCycleMonitor();

        LifeCycleMonitorSourceReader() {
        }

        public void start() {
            this.lifeCycleMonitor.incrementCallTime(LifeCycleMonitor.LifeCyclePhase.OPEN);
            super.start();
        }

        public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) throws Exception {
            this.lifeCycleMonitor.incrementCallTime(LifeCycleMonitor.LifeCyclePhase.PROCESS_ELEMENT);
            return super.pollNext(sourceOutput);
        }

        public void close() throws Exception {
            this.lifeCycleMonitor.incrementCallTime(LifeCycleMonitor.LifeCyclePhase.CLOSE);
            super.close();
        }

        public LifeCycleMonitor getLifeCycleMonitor() {
            return this.lifeCycleMonitor;
        }
    }

    static class LifeCycleMonitorSource
    extends MockSource {
        public LifeCycleMonitorSource(Boundedness boundedness, int numSplits) {
            super(boundedness, numSplits);
        }

        public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext readerContext) {
            return new LifeCycleMonitorSourceReader();
        }
    }

    private static class TestingExternallyInducedSourceReader
    implements ExternallyInducedSourceReader<Integer, MockSourceSplit>,
    Serializable {
        private static final long CHECKPOINT_ID = 1234L;
        private final int numEventsBeforeCheckpoint;
        private final int totalNumEvents;
        private int numEmittedEvents;
        private boolean checkpointed;
        private int checkpointedAt;
        private long checkpointedId;

        TestingExternallyInducedSourceReader(int numEventsBeforeCheckpoint, int totalNumEvents) {
            this.numEventsBeforeCheckpoint = numEventsBeforeCheckpoint;
            this.totalNumEvents = totalNumEvents;
            this.numEmittedEvents = 0;
            this.checkpointed = false;
            this.checkpointedAt = -1;
        }

        public Optional<Long> shouldTriggerCheckpoint() {
            if (this.numEmittedEvents == this.numEventsBeforeCheckpoint && !this.checkpointed) {
                return Optional.of(1234L);
            }
            return Optional.empty();
        }

        public void start() {
        }

        public InputStatus pollNext(ReaderOutput<Integer> output) throws Exception {
            if (this.numEmittedEvents == this.numEventsBeforeCheckpoint - 1) {
                ++this.numEmittedEvents;
                return InputStatus.NOTHING_AVAILABLE;
            }
            if (this.numEmittedEvents < this.totalNumEvents) {
                ++this.numEmittedEvents;
                return InputStatus.MORE_AVAILABLE;
            }
            return InputStatus.END_OF_INPUT;
        }

        public List<MockSourceSplit> snapshotState(long checkpointId) {
            this.checkpointed = true;
            this.checkpointedAt = this.numEmittedEvents;
            this.checkpointedId = checkpointId;
            return Collections.emptyList();
        }

        public CompletableFuture<Void> isAvailable() {
            return CompletableFuture.completedFuture(null);
        }

        public void addSplits(List<MockSourceSplit> splits) {
        }

        public void notifyNoMoreSplits() {
        }

        public void close() throws Exception {
        }
    }

    private static class TestingExternallyInducedSource
    extends MockSource {
        private static final long serialVersionUID = 3078454109555893721L;
        private final TestingExternallyInducedSourceReader reader;

        private TestingExternallyInducedSource(TestingExternallyInducedSourceReader reader) {
            super(Boundedness.CONTINUOUS_UNBOUNDED, 1);
            this.reader = reader;
        }

        public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext readerContext) {
            return this.reader;
        }
    }
}

