package org.apache.flink.streaming.runtime.tasks;

import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamTaskExternallyInducedSourceInput;
import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.class */
public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, ?>> {
    private AsyncDataOutputToOutput<T> output;
    private SortedMap<Long, UntriggeredCheckpoint> untriggeredCheckpoints;
    private SortedSet<Long> triggeredCheckpoints;
    private CompletableFuture<Void> waitForRPC;
    private StreamTaskExternallyInducedSourceInput<T> externallyInducedSourceInput;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask$AsyncDataOutputToOutput.class */
    public static class AsyncDataOutputToOutput<T> implements PushingAsyncDataInput.DataOutput<T> {
        private final Output<StreamRecord<T>> output;
        private final InternalSourceReaderMetricGroup metricGroup;

        @Nullable
        private final WatermarkGauge inputWatermarkGauge;

        public AsyncDataOutputToOutput(Output<StreamRecord<T>> output, InternalSourceReaderMetricGroup internalSourceReaderMetricGroup, @Nullable WatermarkGauge watermarkGauge) {
            this.output = (Output) Preconditions.checkNotNull(output);
            this.inputWatermarkGauge = watermarkGauge;
            this.metricGroup = internalSourceReaderMetricGroup;
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitRecord(StreamRecord<T> streamRecord) {
            this.metricGroup.recordEmitted(streamRecord.getTimestamp());
            this.output.collect(streamRecord);
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            this.output.emitLatencyMarker(latencyMarker);
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitRecordAttributes(RecordAttributes recordAttributes) {
            this.output.emitRecordAttributes(recordAttributes);
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitWatermark(Watermark watermark) {
            long timestamp = watermark.getTimestamp();
            if (this.inputWatermarkGauge != null) {
                this.inputWatermarkGauge.setCurrentWatermark(timestamp);
            }
            this.metricGroup.watermarkEmitted(timestamp);
            this.output.emitWatermark(watermark);
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
            this.output.emitWatermarkStatus(watermarkStatus);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask$UntriggeredCheckpoint.class */
    public static class UntriggeredCheckpoint {
        private final CheckpointMetaData metadata;
        private final CheckpointOptions checkpointOptions;

        private UntriggeredCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
            this.metadata = checkpointMetaData;
            this.checkpointOptions = checkpointOptions;
        }

        public CheckpointMetaData getMetadata() {
            return this.metadata;
        }

        public CheckpointOptions getCheckpointOptions() {
            return this.checkpointOptions;
        }
    }

    public SourceOperatorStreamTask(Environment environment) throws Exception {
        super(environment);
        this.untriggeredCheckpoints = new TreeMap();
        this.triggeredCheckpoints = new TreeSet();
        this.waitForRPC = FutureUtils.completedVoidFuture();
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    public void init() throws Exception {
        StreamTaskSourceInput streamTaskSourceInput;
        SourceOperator sourceOperator = (SourceOperator) this.mainOperator;
        sourceOperator.initReader();
        SourceReader sourceReader = sourceOperator.getSourceReader();
        if (this.operatorChain.isTaskDeployedAsFinished()) {
            streamTaskSourceInput = new StreamTaskFinishedOnRestoreSourceInput(sourceOperator, 0, 0);
        } else if (sourceReader instanceof ExternallyInducedSourceReader) {
            this.externallyInducedSourceInput = new StreamTaskExternallyInducedSourceInput<>(sourceOperator, (v1) -> {
                triggerCheckpointForExternallyInducedSource(v1);
            }, 0, 0);
            streamTaskSourceInput = this.externallyInducedSourceInput;
        } else {
            streamTaskSourceInput = new StreamTaskSourceInput(sourceOperator, 0, 0);
        }
        this.output = new AsyncDataOutputToOutput<>(this.operatorChain.getMainOperatorOutput(), sourceOperator.getSourceMetricGroup(), null);
        this.inputProcessor = new StreamOneInputProcessor(streamTaskSourceInput, this.output, this.operatorChain);
        getEnvironment().getMetricGroup().getIOMetricGroup().gauge(MetricNames.CHECKPOINT_START_DELAY_TIME, this::getAsyncCheckpointStartDelayNanos);
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask, org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask
    public CompletableFuture<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
        if (!isExternallyInducedSource()) {
            return triggerCheckpointNowAsync(checkpointMetaData, checkpointOptions);
        }
        CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
        this.mainMailboxExecutor.execute(() -> {
            triggerCheckpointOnExternallyInducedSource(checkpointMetaData, checkpointOptions, completableFuture);
        }, "SourceOperatorStreamTask#triggerCheckpointAsync(%s, %s)", new Object[]{checkpointMetaData, checkpointOptions});
        return completableFuture;
    }

    private boolean isExternallyInducedSource() {
        return this.externallyInducedSourceInput != null;
    }

    private void triggerCheckpointOnExternallyInducedSource(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CompletableFuture<Boolean> completableFuture) {
        if (!$assertionsDisabled && !this.mailboxProcessor.isMailboxThread()) {
            throw new AssertionError();
        }
        if (this.triggeredCheckpoints.remove(Long.valueOf(checkpointMetaData.getCheckpointId()))) {
            FutureUtils.forward(triggerCheckpointNowAsync(checkpointMetaData, checkpointOptions), completableFuture);
            cleanupOldCheckpoints(checkpointMetaData.getCheckpointId());
        } else {
            this.untriggeredCheckpoints.put(Long.valueOf(checkpointMetaData.getCheckpointId()), new UntriggeredCheckpoint(checkpointMetaData, checkpointOptions));
            completableFuture.complete(Boolean.valueOf(isRunning()));
        }
    }

    private CompletableFuture<Boolean> triggerCheckpointNowAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
        return isSynchronous(checkpointOptions.getCheckpointType()) ? triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions) : super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
    }

    private boolean isSynchronous(SnapshotType snapshotType) {
        return snapshotType.isSavepoint() && ((SavepointType) snapshotType).isSynchronous();
    }

    private CompletableFuture<Boolean> triggerStopWithSavepointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
        CompletableFuture completableFuture = new CompletableFuture();
        this.mainMailboxExecutor.execute(() -> {
            setSynchronousSavepoint(checkpointMetaData.getCheckpointId());
            FutureUtils.forward(((SourceOperator) this.mainOperator).stop(((SavepointType) checkpointOptions.getCheckpointType()).shouldDrain() ? StopMode.DRAIN : StopMode.NO_DRAIN), completableFuture);
        }, "stop Flip-27 source for stop-with-savepoint");
        return completableFuture.thenCompose(r7 -> {
            return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
        });
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void advanceToEndOfEventTime() {
        this.output.emitWatermark(Watermark.MAX_WATERMARK);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    public void declineCheckpoint(long j) {
        cleanupCheckpoint(j);
        super.declineCheckpoint(j);
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask, org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask
    public Future<Void> notifyCheckpointAbortAsync(long j, long j2) {
        this.mainMailboxExecutor.execute(() -> {
            cleanupCheckpoint(j);
        }, "Cleanup checkpoint %d", new Object[]{Long.valueOf(j)});
        return super.notifyCheckpointAbortAsync(j, j2);
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask, org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask
    public Future<Void> notifyCheckpointSubsumedAsync(long j) {
        this.mainMailboxExecutor.execute(() -> {
            cleanupCheckpoint(j);
        }, "Cleanup checkpoint %d", new Object[]{Long.valueOf(j)});
        return super.notifyCheckpointSubsumedAsync(j);
    }

    private void triggerCheckpointForExternallyInducedSource(long j) {
        UntriggeredCheckpoint remove = this.untriggeredCheckpoints.remove(Long.valueOf(j));
        if (remove != null) {
            triggerCheckpointNowAsync(remove.getMetadata(), remove.getCheckpointOptions());
            cleanupOldCheckpoints(j);
            return;
        }
        this.triggeredCheckpoints.add(Long.valueOf(j));
        if (this.waitForRPC.isDone()) {
            this.waitForRPC = new CompletableFuture<>();
            this.externallyInducedSourceInput.blockUntil(this.waitForRPC);
        }
    }

    private void cleanupOldCheckpoints(long j) {
        if (!$assertionsDisabled && !this.mailboxProcessor.isMailboxThread()) {
            throw new AssertionError();
        }
        this.triggeredCheckpoints.headSet(Long.valueOf(j)).clear();
        this.untriggeredCheckpoints.headMap(Long.valueOf(j)).clear();
        maybeResumeProcessing();
    }

    private void maybeResumeProcessing() {
        if (!$assertionsDisabled && !this.mailboxProcessor.isMailboxThread()) {
            throw new AssertionError();
        }
        if (this.triggeredCheckpoints.isEmpty()) {
            this.waitForRPC.complete(null);
        }
    }

    private void cleanupCheckpoint(long j) {
        if (!$assertionsDisabled && !this.mailboxProcessor.isMailboxThread()) {
            throw new AssertionError();
        }
        this.triggeredCheckpoints.remove(Long.valueOf(j));
        this.untriggeredCheckpoints.remove(Long.valueOf(j));
        maybeResumeProcessing();
    }

    static {
        $assertionsDisabled = !SourceOperatorStreamTask.class.desiredAssertionStatus();
    }
}
