package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

@Internal
@Deprecated
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceStreamTask.class */
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>> extends StreamTask<OUT, OP> {
    /** Thread that executes the source operator; created in the constructor and started by {@code processInput}. */
    private final SourceStreamTask<OUT, SRC, OP>.LegacySourceFunctionThread sourceThread;
    /**
     * Lock object shared with the synchronized action executor passed to the super constructor,
     * handed to the source operator's {@code run} call, and held while answering externally
     * induced checkpoint triggers.
     */
    private final Object lock;
    /** Set to {@code true} in {@code init} when the user function is an {@link ExternallyInducedSource}. */
    private volatile boolean externallyInducedCheckpoints;
    /** Ensures {@code cancelTask} cancels the operator at most once. */
    private final AtomicBoolean stopped;
    /**
     * Why the source is finishing. Starts as {@code END_OF_DATA}, is overwritten on
     * stop-with-savepoint, and is read by the source thread to pick the {@link StopMode}.
     */
    private volatile FinishingReason finishingReason;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceStreamTask$FinishingReason.class */
    /**
     * Reason why the legacy source stops producing data. Each reason carries the {@link StopMode}
     * that is used when the task ends its data.
     */
    public enum FinishingReason {
        /** The initial reason set by the task constructor; mapped to {@link StopMode#DRAIN}. */
        END_OF_DATA(StopMode.DRAIN),
        /** Stop-with-savepoint requested with draining; mapped to {@link StopMode#DRAIN}. */
        STOP_WITH_SAVEPOINT_DRAIN(StopMode.DRAIN),
        /** Stop-with-savepoint requested without draining; mapped to {@link StopMode#NO_DRAIN}. */
        STOP_WITH_SAVEPOINT_NO_DRAIN(StopMode.NO_DRAIN);

        /** Stop mode associated with this reason. */
        private final StopMode stopMode;

        FinishingReason(StopMode mode) {
            stopMode = mode;
        }

        /** Returns the stop mode associated with this finishing reason. */
        StopMode toStopMode() {
            return stopMode;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceStreamTask$LegacySourceFunctionThread.class */
    /**
     * Thread that runs the source operator of the enclosing task and reports the outcome through
     * a completion future.
     */
    public class LegacySourceFunctionThread extends Thread {
        /** Completed normally when the source finished, exceptionally when {@link #run()} caught a throwable. */
        private final CompletableFuture<Void> completionFuture = new CompletableFuture<>();

        LegacySourceFunctionThread() {
        }

        /**
         * Runs the source operator unless the task was deployed as already finished, then ends the
         * data on the mailbox thread and completes the completion future.
         *
         * <p>Any throwable is routed into the completion future instead of escaping the thread.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                if (SourceStreamTask.this.operatorChain.isTaskDeployedAsFinished()) {
                    // Only log "skip execution" when the source really is skipped.
                    StreamTask.LOG.debug("Legacy source {} skip execution since the task is finished on restore", SourceStreamTask.this.getTaskNameWithSubtaskAndId());
                } else {
                    ((StreamSource) SourceStreamTask.this.mainOperator).run(SourceStreamTask.this.lock, SourceStreamTask.this.operatorChain);
                }
                completeProcessing();
                this.completionFuture.complete(null);
            } catch (Throwable th) {
                // An interruption observed while the task is canceled is reported as a cancellation
                // rather than as a failure; note that th itself may be the InterruptedException.
                if (SourceStreamTask.this.isCanceled() && ExceptionUtils.findThrowable(th, InterruptedException.class).isPresent()) {
                    this.completionFuture.completeExceptionally(new CancelTaskException(th));
                } else {
                    this.completionFuture.completeExceptionally(th);
                }
            }
        }

        /**
         * Submits the end-of-data handling to the main mailbox executor and blocks until it ran.
         * Does nothing when the task is canceled or failing.
         *
         * @throws InterruptedException if interrupted while waiting for the mailbox action
         * @throws ExecutionException if the mailbox action failed
         */
        private void completeProcessing() throws InterruptedException, ExecutionException {
            if (SourceStreamTask.this.isCanceled() || SourceStreamTask.this.isFailing()) {
                return;
            }
            SourceStreamTask.this.mainMailboxExecutor.submit(() -> {
                StopMode stopMode = SourceStreamTask.this.finishingReason.toStopMode();
                // endInput is only signalled to the operator chain when draining.
                if (stopMode == StopMode.DRAIN) {
                    SourceStreamTask.this.operatorChain.endInput(1);
                }
                SourceStreamTask.this.endData(stopMode);
            }, "SourceStreamTask finished processing data.").get();
        }

        /**
         * Names this thread after the task it serves.
         *
         * @param str task description appended to the fixed thread-name prefix
         */
        public void setTaskDescription(String str) {
            setName("Legacy Source Thread - " + str);
        }

        /**
         * Returns the future tracking this thread's outcome. When the task is failing and this
         * thread is not alive, an already completed future is returned instead.
         */
        CompletableFuture<Void> getCompletionFuture() {
            return (!SourceStreamTask.this.isFailing() || isAlive()) ? this.completionFuture : CompletableFuture.completedFuture(null);
        }
    }

    /**
     * Creates the task with a freshly allocated lock object.
     *
     * @param environment the task environment passed on to the base class
     * @throws Exception if the delegated constructor throws
     */
    public SourceStreamTask(Environment environment) throws Exception {
        this(environment, new Object());
    }

    /**
     * Creates the task using {@code lockObject} both for the synchronized action executor and as
     * this task's lock, prepares the (not yet started) source thread, and disables the busy-time
     * metric on the IO metric group.
     *
     * @param environment the task environment passed on to the base class
     * @param lockObject the lock object; must not be {@code null}
     * @throws Exception if the base class constructor throws
     */
    private SourceStreamTask(Environment environment, Object lockObject) throws Exception {
        super(environment, null, FatalExitExceptionHandler.INSTANCE, StreamTaskActionExecutor.synchronizedExecutor(lockObject));
        this.lock = Preconditions.checkNotNull(lockObject);
        this.sourceThread = new LegacySourceFunctionThread();
        this.stopped = new AtomicBoolean(false);
        this.finishingReason = FinishingReason.END_OF_DATA;
        getEnvironment().getMetricGroup().getIOMetricGroup().setEnableBusyTime(false);
    }

    /**
     * Initializes the task: installs a checkpoint trigger on externally induced sources,
     * registers the {@code checkpointStartDelayNanos} gauge, and sets the record writer's
     * maximum overdraft buffers per gate to zero.
     */
    @Override
    protected void init() {
        SourceFunction<?> userFunction = (SourceFunction<?>) ((StreamSource) this.mainOperator).getUserFunction();
        if (userFunction instanceof ExternallyInducedSource) {
            this.externallyInducedCheckpoints = true;

            // Hook the source calls when it decides a checkpoint should be taken.
            ExternallyInducedSource.CheckpointTrigger trigger = new ExternallyInducedSource.CheckpointTrigger() {
                @Override
                public void triggerCheckpoint(long checkpointId) throws FlinkException {
                    CheckpointOptions options = CheckpointOptions.forConfig(
                            CheckpointType.CHECKPOINT,
                            CheckpointStorageLocationReference.getDefault(),
                            SourceStreamTask.this.configuration.isExactlyOnceCheckpointMode(),
                            SourceStreamTask.this.configuration.isUnalignedCheckpointsEnabled(),
                            SourceStreamTask.this.configuration.getAlignedCheckpointTimeout().toMillis());
                    long timestamp = System.currentTimeMillis();
                    CheckpointMetaData metaData = new CheckpointMetaData(checkpointId, timestamp, timestamp);
                    try {
                        // Calls the base class trigger directly and waits for its result.
                        SourceStreamTask.super.triggerCheckpointAsync(metaData, options).get();
                    } catch (RuntimeException unchecked) {
                        throw unchecked;
                    } catch (Exception checked) {
                        throw new FlinkException(checked.getMessage(), checked);
                    }
                }
            };
            ((ExternallyInducedSource<?, ?>) userFunction).setCheckpointTrigger(trigger);
        }

        getEnvironment()
                .getMetricGroup()
                .getIOMetricGroup()
                .gauge("checkpointStartDelayNanos", this::getAsyncCheckpointStartDelayNanos);
        this.recordWriter.setMaxOverdraftBuffersPerGate(0);
    }

    /**
     * Emits {@link Watermark#MAX_WATERMARK} on the main operator's output.
     *
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void advanceToEndOfEventTime() throws Exception {
        operatorChain
                .getMainOperatorOutput()
                .emitWatermark(Watermark.MAX_WATERMARK);
    }

    /** No task-specific clean-up is performed by this task. */
    @Override
    protected void cleanUpInternal() {
        // intentionally empty
    }

    /**
     * Suspends the mailbox default action, starts the source thread, and registers a callback on
     * the thread's completion future: a failure is reported to the mailbox processor, a normal
     * completion notifies end of data and suspends the mailbox processor.
     *
     * @param controller controller used to suspend the default action
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
        controller.suspendDefaultAction();

        sourceThread.setTaskDescription(getName());
        sourceThread.start();

        CompletableFuture<Void> sourceDone = sourceThread.getCompletionFuture();
        sourceDone.whenComplete((ignored, failure) -> {
            if (failure == null) {
                notifyEndOfData();
                mailboxProcessor.suspend();
                return;
            }
            mailboxProcessor.reportThrowable(failure);
        });
    }

    /** Cancels the source operator; only the first invocation has an effect. */
    @Override
    protected void cancelTask() {
        boolean firstCall = stopped.compareAndSet(false, true);
        if (!firstCall) {
            return;
        }
        cancelOperator();
    }

    /**
     * Cancels the source operator (if one is set) and then makes sure the source thread's
     * completion future can finish: a live thread is interrupted, and for a thread that is not
     * alive the still-pending future is completed manually.
     */
    private void cancelOperator() {
        try {
            if (this.mainOperator != null) {
                ((StreamSource) this.mainOperator).cancel();
            }
        } finally {
            // Runs even if cancel() throws, so the completion future is never left dangling.
            if (this.sourceThread.isAlive()) {
                interruptSourceThread();
            } else if (!this.sourceThread.getCompletionFuture().isDone()) {
                // The thread is not running and did not complete the future itself
                // (e.g. it was never started), so complete it here.
                this.sourceThread.getCompletionFuture().complete(null);
            }
        }
    }

    /**
     * Delegates to the base implementation and afterwards also interrupts the source thread.
     *
     * @param thread passed through to the base implementation
     * @param str passed through to the base implementation; may be {@code null}
     * @param l passed through to the base implementation; may be {@code null}
     */
    @Override
    public void maybeInterruptOnCancel(Thread thread, @Nullable String str, @Nullable Long l) {
        super.maybeInterruptOnCancel(thread, str, l);
        // The source function runs on its own thread, which needs a separate interrupt.
        this.interruptSourceThread();
    }

    /**
     * Interrupts the source thread if it is alive, unless the operator chain reports that the
     * task was deployed as finished.
     */
    private void interruptSourceThread() {
        if (operatorChain != null && operatorChain.isTaskDeployedAsFinished()) {
            return;
        }
        if (sourceThread.isAlive()) {
            sourceThread.interrupt();
        }
    }

    /** Returns the completion future exposed by the source thread. */
    @Override
    protected CompletableFuture<Void> getCompletionFuture() {
        CompletableFuture<Void> sourceThreadFuture = sourceThread.getCompletionFuture();
        return sourceThreadFuture;
    }

    /**
     * Triggers a checkpoint for this task.
     *
     * <p>With externally induced checkpoints nothing is triggered here; the returned future only
     * states whether the task is running, and a full checkpoint request is rejected. Otherwise
     * synchronous savepoints go through the stop-with-savepoint path and everything else is
     * delegated to the base class.
     *
     * @param checkpointMetaData metadata of the checkpoint to trigger
     * @param checkpointOptions options of the checkpoint to trigger
     * @return future holding the trigger result
     * @throws IllegalStateException if a full checkpoint is requested while checkpoints are
     *     externally induced
     */
    @Override
    public CompletableFuture<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
        if (this.externallyInducedCheckpoints) {
            if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
                throw new IllegalStateException("Using externally induced sources, we can not enforce taking a full checkpoint.If you are restoring from a snapshot in NO_CLAIM mode, please use CLAIM mode.");
            }
            // Only report whether the task is running, evaluated while holding the task lock.
            synchronized (this.lock) {
                return CompletableFuture.completedFuture(Boolean.valueOf(isRunning()));
            }
        }

        if (isSynchronousSavepoint(checkpointOptions.getCheckpointType())) {
            return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
        }
        return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
    }

    /**
     * Tells whether the given snapshot type is a savepoint that is marked synchronous.
     *
     * @param snapshotType the snapshot type to inspect
     * @return {@code true} only for synchronous savepoints
     */
    private boolean isSynchronousSavepoint(SnapshotType snapshotType) {
        if (!snapshotType.isSavepoint()) {
            return false;
        }
        SavepointType savepointType = (SavepointType) snapshotType;
        return savepointType.isSynchronous();
    }

    /**
     * Handles a stop-with-savepoint request: enqueues a mailbox action that stops the source
     * operator, and triggers the actual checkpoint through the base class once the source
     * thread's completion future completes.
     *
     * <p>Only called for synchronous savepoints, so the checkpoint type is a
     * {@link SavepointType}.
     *
     * @param checkpointMetaData metadata of the savepoint to take
     * @param checkpointOptions options of the savepoint to take
     * @return future holding the result of the checkpoint trigger
     */
    private CompletableFuture<Boolean> triggerStopWithSavepointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
        this.mainMailboxExecutor.execute(() -> {
            // Cast to SavepointType to read the drain flag, mirroring isSynchronousSavepoint.
            stopOperatorForStopWithSavepoint(checkpointMetaData.getCheckpointId(), ((SavepointType) checkpointOptions.getCheckpointType()).shouldDrain());
        }, "stop legacy source for stop-with-savepoint --drain");
        return this.sourceThread.getCompletionFuture().thenCompose(ignored -> {
            return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
        });
    }

    /**
     * Registers the synchronous savepoint, records the matching finishing reason, and stops the
     * source operator if one is set.
     *
     * @param j id of the synchronous savepoint
     * @param z {@code true} to finish with draining, {@code false} to finish without draining
     */
    private void stopOperatorForStopWithSavepoint(long j, boolean z) {
        setSynchronousSavepoint(j);
        // The source thread reads this to pick the stop mode when it ends the data.
        this.finishingReason = z ? FinishingReason.STOP_WITH_SAVEPOINT_DRAIN : FinishingReason.STOP_WITH_SAVEPOINT_NO_DRAIN;
        if (this.mainOperator != null) {
            ((StreamSource) this.mainOperator).stop();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Declines the given checkpoint through the base class, except when checkpoints are
     * externally induced, in which case the call is ignored.
     *
     * @param j id of the checkpoint to decline
     */
    @Override
    public void declineCheckpoint(long j) {
        if (!this.externallyInducedCheckpoints) {
            super.declineCheckpoint(j);
        }
    }
}
