/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.event.WatermarkEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamTaskExternallyInducedSourceInput;
import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskFinishedOnRestoreSourceInput;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;

@Internal
public class SourceOperatorStreamTask<T>
extends StreamTask<T, SourceOperator<T, ?>> {
    /**
     * Adapter that forwards emitted elements to the main operator's output. Assigned in {@link
     * #init()}.
     */
    private AsyncDataOutputToOutput<T> output;

    /**
     * Checkpoints, keyed by checkpoint id, for which {@link #triggerCheckpointAsync} was called
     * before the externally induced source input asked for them. Their metadata and options are
     * kept here until the source input's callback picks them up.
     */
    private final SortedMap<Long, UntriggeredCheckpoint> untriggeredCheckpoints = new TreeMap<>();

    /**
     * Ids of checkpoints that the externally induced source input already asked for, but for which
     * the matching {@link #triggerCheckpointAsync} call has not been processed yet.
     */
    private final SortedSet<Long> triggeredCheckpoints = new TreeSet<>();

    /**
     * Future handed to the externally induced source input via {@code blockUntil(...)} while
     * {@link #triggeredCheckpoints} is non-empty. It starts out completed, is replaced by a fresh
     * future when a checkpoint id is added to {@link #triggeredCheckpoints}, and is completed again
     * once that set is empty. The methods that complete it assert that they run on the mailbox
     * thread.
     */
    private CompletableFuture<Void> waitForRPC = FutureUtils.completedVoidFuture();

    /**
     * Set in {@link #init()} only when the source reader is an {@link
     * ExternallyInducedSourceReader}; {@code null} otherwise.
     */
    private StreamTaskExternallyInducedSourceInput<T> externallyInducedSourceInput;

    /**
     * Creates the task by passing the given environment straight to the {@link StreamTask}
     * constructor.
     *
     * @param env environment forwarded to the superclass
     * @throws Exception if the superclass constructor fails
     */
    public SourceOperatorStreamTask(Environment env) throws Exception {
        super(env);
    }

    /**
     * Initializes the source reader and wires up this task's input, output and input processor.
     *
     * <p>The input is chosen as follows:
     *
     * <ul>
     *   <li>a {@link StreamTaskFinishedOnRestoreSourceInput} if the operator chain reports that the
     *       task was deployed as finished,
     *   <li>a {@link StreamTaskExternallyInducedSourceInput} if the reader is an {@link
     *       ExternallyInducedSourceReader}; it is also remembered in {@code
     *       externallyInducedSourceInput}, which switches {@link #triggerCheckpointAsync} to the
     *       externally induced code path,
     *   <li>a plain {@link StreamTaskSourceInput} otherwise.
     * </ul>
     *
     * <p>Finally a {@code checkpointStartDelayNanos} gauge is registered on the task's IO metric
     * group.
     *
     * @throws Exception if initializing the reader or any of the wiring fails
     */
    @Override
    public void init() throws Exception {
        final SourceOperator<T, ?> sourceOperator = this.mainOperator;
        // The reader must exist before the input is created, because the kind of input depends
        // on the reader's type.
        sourceOperator.initReader();
        final SourceReader<T, ?> sourceReader = sourceOperator.getSourceReader();

        final StreamTaskSourceInput<T> input;
        if (operatorChain.isTaskDeployedAsFinished()) {
            input = new StreamTaskFinishedOnRestoreSourceInput<>(sourceOperator, 0, 0);
        } else if (sourceReader instanceof ExternallyInducedSourceReader) {
            externallyInducedSourceInput =
                    new StreamTaskExternallyInducedSourceInput<>(
                            sourceOperator,
                            this::triggerCheckpointForExternallyInducedSource,
                            0,
                            0);
            input = externallyInducedSourceInput;
        } else {
            input = new StreamTaskSourceInput<>(sourceOperator, 0, 0);
        }

        // No input watermark gauge is supplied for this task, hence the null argument.
        output =
                new AsyncDataOutputToOutput<>(
                        operatorChain.getMainOperatorOutput(),
                        sourceOperator.getSourceMetricGroup(),
                        null);
        inputProcessor = new StreamOneInputProcessor<>(input, output, operatorChain);
        getEnvironment()
                .getMetricGroup()
                .getIOMetricGroup()
                .gauge("checkpointStartDelayNanos", this::getAsyncCheckpointStartDelayNanos);
    }

    /**
     * Triggers the given checkpoint.
     *
     * <p>When this task has no externally induced source input, the checkpoint is triggered
     * directly through {@link #triggerCheckpointNowAsync}. Otherwise the request is enqueued as a
     * mail on the main mailbox executor and handled there by {@link
     * #triggerCheckpointOnExternallyInducedSource}, which also completes the returned future.
     *
     * @param checkpointMetaData metadata of the checkpoint to trigger
     * @param checkpointOptions options of the checkpoint to trigger
     * @return future carrying the outcome of the trigger request
     */
    @Override
    public CompletableFuture<Boolean> triggerCheckpointAsync(
            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
        if (isExternallyInducedSource()) {
            final CompletableFuture<Boolean> result = new CompletableFuture<>();
            // Hand the request over to the mailbox; the bookkeeping methods it reaches assert
            // that they run on the mailbox thread.
            mainMailboxExecutor.execute(
                    () ->
                            triggerCheckpointOnExternallyInducedSource(
                                    checkpointMetaData, checkpointOptions, result),
                    "SourceOperatorStreamTask#triggerCheckpointAsync(%s, %s)",
                    checkpointMetaData,
                    checkpointOptions);
            return result;
        }
        return triggerCheckpointNowAsync(checkpointMetaData, checkpointOptions);
    }

    /**
     * Tells whether {@link #init()} installed an externally induced source input.
     *
     * @return {@code true} if {@code externallyInducedSourceInput} has been set
     */
    private boolean isExternallyInducedSource() {
        return externallyInducedSourceInput != null;
    }

    /**
     * Handles a trigger request for an externally induced source. Asserts that it runs on the
     * mailbox thread.
     *
     * <p>If the source input has already asked for this checkpoint (its id is found in {@code
     * triggeredCheckpoints}), the checkpoint is triggered now, its outcome is forwarded to {@code
     * triggerFuture}, and older bookkeeping entries are cleaned up. Otherwise the metadata and
     * options are stored in {@code untriggeredCheckpoints} for later, and {@code triggerFuture} is
     * completed right away with the current value of {@code isRunning()}.
     *
     * @param checkpointMetaData metadata of the requested checkpoint
     * @param checkpointOptions options of the requested checkpoint
     * @param triggerFuture future to complete with the outcome of the request
     */
    private void triggerCheckpointOnExternallyInducedSource(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CompletableFuture<Boolean> triggerFuture) {
        assert mailboxProcessor.isMailboxThread();

        final long checkpointId = checkpointMetaData.getCheckpointId();
        final boolean sourceTriggeredFirst = triggeredCheckpoints.remove(checkpointId);
        if (sourceTriggeredFirst) {
            FutureUtils.forward(
                    triggerCheckpointNowAsync(checkpointMetaData, checkpointOptions),
                    triggerFuture);
            cleanupOldCheckpoints(checkpointId);
            return;
        }

        // The source input has not asked for this checkpoint yet: remember it for later.
        untriggeredCheckpoints.put(
                checkpointId, new UntriggeredCheckpoint(checkpointMetaData, checkpointOptions));
        triggerFuture.complete(isRunning());
    }

    /**
     * Triggers the checkpoint immediately, choosing the stop-with-savepoint path for synchronous
     * savepoints and the superclass implementation for everything else.
     *
     * @param checkpointMetaData metadata of the checkpoint to trigger
     * @param checkpointOptions options of the checkpoint to trigger
     * @return future carrying the outcome of the trigger request
     */
    private CompletableFuture<Boolean> triggerCheckpointNowAsync(
            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
        final boolean stopWithSavepoint = isSynchronous(checkpointOptions.getCheckpointType());
        return stopWithSavepoint
                ? triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions)
                : super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
    }

    /**
     * Tells whether the given snapshot type is a synchronous savepoint.
     *
     * @param checkpointType snapshot type to inspect
     * @return {@code true} only if the type is a savepoint and that savepoint is synchronous
     */
    private boolean isSynchronous(SnapshotType checkpointType) {
        if (!checkpointType.isSavepoint()) {
            return false;
        }
        return ((SavepointType) checkpointType).isSynchronous();
    }

    /**
     * Stops the source operator and, once it has finished, triggers the savepoint.
     *
     * <p>A mail is enqueued on the main mailbox executor that marks the checkpoint as the
     * synchronous savepoint and asks the main operator to stop, using {@link StopMode#DRAIN} or
     * {@link StopMode#NO_DRAIN} depending on the savepoint type. The checkpoint itself is
     * triggered through the superclass only after the operator's stop future has completed.
     *
     * @param checkpointMetaData metadata of the savepoint to trigger
     * @param checkpointOptions options of the savepoint; the caller only takes this path when
     *     its checkpoint type is a synchronous {@link SavepointType}
     * @return future carrying the outcome of the trigger request
     */
    private CompletableFuture<Boolean> triggerStopWithSavepointAsync(
            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
        final CompletableFuture<Void> operatorFinished = new CompletableFuture<>();
        mainMailboxExecutor.execute(
                () -> {
                    setSynchronousSavepoint(checkpointMetaData.getCheckpointId());
                    final SavepointType savepointType =
                            (SavepointType) checkpointOptions.getCheckpointType();
                    final StopMode stopMode =
                            savepointType.shouldDrain() ? StopMode.DRAIN : StopMode.NO_DRAIN;
                    FutureUtils.forward(mainOperator.stop(stopMode), operatorFinished);
                },
                "stop Flip-27 source for stop-with-savepoint");

        return operatorFinished.thenCompose(
                ignore -> super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions));
    }

    /** Emits {@link Watermark#MAX_WATERMARK} through this task's output. */
    @Override
    protected void advanceToEndOfEventTime() {
        output.emitWatermark(Watermark.MAX_WATERMARK);
    }

    /**
     * Drops this task's bookkeeping for the checkpoint and then lets the superclass decline it.
     *
     * @param checkpointId id of the checkpoint being declined
     */
    @Override
    protected void declineCheckpoint(long checkpointId) {
        // Forget the checkpoint locally first, then delegate.
        cleanupCheckpoint(checkpointId);
        super.declineCheckpoint(checkpointId);
    }

    /**
     * Enqueues a mail that drops this task's bookkeeping for the aborted checkpoint, then
     * delegates the notification to the superclass.
     *
     * @param checkpointId id of the aborted checkpoint
     * @param latestCompletedCheckpointId passed through to the superclass unchanged
     * @return the future returned by the superclass
     */
    @Override
    public Future<Void> notifyCheckpointAbortAsync(
            long checkpointId, long latestCompletedCheckpointId) {
        // The cleanup asserts that it runs on the mailbox thread, so it is scheduled as a mail.
        mainMailboxExecutor.execute(
                () -> cleanupCheckpoint(checkpointId),
                "Cleanup checkpoint %d",
                checkpointId);
        return super.notifyCheckpointAbortAsync(checkpointId, latestCompletedCheckpointId);
    }

    /**
     * Enqueues a mail that drops this task's bookkeeping for the subsumed checkpoint, then
     * delegates the notification to the superclass.
     *
     * @param checkpointId id of the subsumed checkpoint
     * @return the future returned by the superclass
     */
    @Override
    public Future<Void> notifyCheckpointSubsumedAsync(long checkpointId) {
        // The cleanup asserts that it runs on the mailbox thread, so it is scheduled as a mail.
        mainMailboxExecutor.execute(
                () -> cleanupCheckpoint(checkpointId),
                "Cleanup checkpoint %d",
                checkpointId);
        return super.notifyCheckpointSubsumedAsync(checkpointId);
    }

    /**
     * Callback handed to the externally induced source input in {@link #init()}; it receives the
     * id of the checkpoint to trigger.
     *
     * <p>If the matching trigger request has already been stored in {@code
     * untriggeredCheckpoints}, the checkpoint is triggered now with the stored metadata and
     * options, and older bookkeeping entries are cleaned up. Otherwise the id is recorded in
     * {@code triggeredCheckpoints}, and, unless a block is already in place, the source input is
     * blocked on a fresh {@code waitForRPC} future.
     *
     * @param checkpointId id of the checkpoint the source input wants to trigger
     */
    private void triggerCheckpointForExternallyInducedSource(long checkpointId) {
        final UntriggeredCheckpoint pending = untriggeredCheckpoints.remove(checkpointId);
        if (pending != null) {
            triggerCheckpointNowAsync(pending.getMetadata(), pending.getCheckpointOptions());
            cleanupOldCheckpoints(checkpointId);
            return;
        }

        triggeredCheckpoints.add(checkpointId);
        // A not-yet-completed future means the input is already blocked; keep that one.
        if (waitForRPC.isDone()) {
            waitForRPC = new CompletableFuture<>();
            externallyInducedSourceInput.blockUntil(waitForRPC);
        }
    }

    /**
     * Removes all bookkeeping entries whose checkpoint id is strictly smaller than the given id,
     * then resumes processing if nothing is left to wait for. Asserts that it runs on the mailbox
     * thread.
     *
     * @param checkpointId exclusive upper bound of the ids to remove
     */
    private void cleanupOldCheckpoints(long checkpointId) {
        assert mailboxProcessor.isMailboxThread();

        // headSet/headMap are views that exclude the bound; clearing them removes the entries
        // from the backing collections.
        final SortedSet<Long> olderTriggered = triggeredCheckpoints.headSet(checkpointId);
        olderTriggered.clear();
        final SortedMap<Long, UntriggeredCheckpoint> olderUntriggered =
                untriggeredCheckpoints.headMap(checkpointId);
        olderUntriggered.clear();

        maybeResumeProcessing();
    }

    /**
     * Completes {@code waitForRPC} when {@code triggeredCheckpoints} is empty; does nothing
     * otherwise. Asserts that it runs on the mailbox thread.
     */
    private void maybeResumeProcessing() {
        assert mailboxProcessor.isMailboxThread();

        if (!triggeredCheckpoints.isEmpty()) {
            return;
        }
        // Completing an already completed future has no effect.
        waitForRPC.complete(null);
    }

    /**
     * Removes the bookkeeping kept for a single checkpoint from both collections, then resumes
     * processing if nothing is left to wait for. Asserts that it runs on the mailbox thread.
     *
     * @param checkpointId id of the checkpoint to forget
     */
    private void cleanupCheckpoint(long checkpointId) {
        assert mailboxProcessor.isMailboxThread();

        triggeredCheckpoints.remove(checkpointId);
        untriggeredCheckpoints.remove(checkpointId);

        maybeResumeProcessing();
    }

    /**
     * Holder for the metadata and options of a checkpoint whose trigger request was stored for
     * later instead of being executed immediately.
     */
    private static class UntriggeredCheckpoint {
        private final CheckpointMetaData metadata;
        private final CheckpointOptions checkpointOptions;

        /**
         * @param checkpointMetaData metadata of the stored checkpoint
         * @param options options of the stored checkpoint
         */
        private UntriggeredCheckpoint(
                CheckpointMetaData checkpointMetaData, CheckpointOptions options) {
            metadata = checkpointMetaData;
            checkpointOptions = options;
        }

        /** Returns the stored checkpoint metadata. */
        public CheckpointMetaData getMetadata() {
            return metadata;
        }

        /** Returns the stored checkpoint options. */
        public CheckpointOptions getCheckpointOptions() {
            return checkpointOptions;
        }
    }

    /**
     * Adapter from {@link PushingAsyncDataInput.DataOutput} to an {@link Output}. Every element is
     * forwarded to the wrapped output; records and watermarks are additionally reported to the
     * source reader metric group, and watermarks to the optional input watermark gauge.
     *
     * @param <T> type of the records carried by the stream
     */
    public static class AsyncDataOutputToOutput<T>
            implements PushingAsyncDataInput.DataOutput<T> {

        private final Output<StreamRecord<T>> output;
        private final InternalSourceReaderMetricGroup metricGroup;
        @Nullable private final WatermarkGauge inputWatermarkGauge;

        /**
         * @param output output that receives every element; must not be {@code null}
         * @param metricGroup metric group notified about emitted records and watermarks
         * @param inputWatermarkGauge gauge updated with each emitted watermark, or {@code null}
         *     to skip that update
         */
        public AsyncDataOutputToOutput(
                Output<StreamRecord<T>> output,
                InternalSourceReaderMetricGroup metricGroup,
                @Nullable WatermarkGauge inputWatermarkGauge) {
            this.output = Preconditions.checkNotNull(output);
            this.metricGroup = metricGroup;
            this.inputWatermarkGauge = inputWatermarkGauge;
        }

        /** Reports the record's timestamp to the metric group, then collects the record. */
        @Override
        public void emitRecord(StreamRecord<T> streamRecord) {
            metricGroup.recordEmitted(streamRecord.getTimestamp());
            output.collect(streamRecord);
        }

        /** Forwards the latency marker to the wrapped output. */
        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            output.emitLatencyMarker(latencyMarker);
        }

        /** Forwards the record attributes to the wrapped output. */
        @Override
        public void emitRecordAttributes(RecordAttributes recordAttributes) {
            output.emitRecordAttributes(recordAttributes);
        }

        /** Forwards the watermark event to the wrapped output without touching any metrics. */
        @Override
        public void emitWatermark(WatermarkEvent watermark) throws Exception {
            output.emitWatermark(watermark);
        }

        /**
         * Updates the input watermark gauge (if one was supplied) and the metric group with the
         * watermark's timestamp, then forwards the watermark to the wrapped output.
         */
        @Override
        public void emitWatermark(Watermark watermark) {
            final long timestamp = watermark.getTimestamp();
            if (inputWatermarkGauge != null) {
                inputWatermarkGauge.setCurrentWatermark(timestamp);
            }
            metricGroup.watermarkEmitted(timestamp);
            output.emitWatermark(watermark);
        }

        /** Forwards the watermark status to the wrapped output. */
        @Override
        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
            output.emitWatermarkStatus(watermarkStatus);
        }
    }
}

