package org.apache.flink.runtime.operators.coordination;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
import org.apache.flink.runtime.util.Runnables;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.class */
/**
 * {@link OperatorCoordinator.SubtaskGateway} implementation that delivers operator events to a
 * single subtask through a {@link SubtaskAccess}.
 *
 * <p>The gateway can be "closed" for one or more checkpoint ids. While at least one checkpoint id
 * has an entry in {@link #blockedEventsMap}, events are not sent but buffered in the list that
 * belongs to the highest such checkpoint id. Buffered events are released when the gateway is
 * re-opened for the lowest closed checkpoint id, or for all checkpoints at once.
 *
 * <p>{@link #sendEvent(OperatorEvent)} hands its work over to the main-thread executor. The methods
 * that read or modify the blocking state assert that they run in that executor's main thread (see
 * each method for details).
 */
public class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway {

    /** Format of the failover message; arguments are the lost event and the subtask name. */
    private static final String EVENT_LOSS_ERROR_MESSAGE =
            "An OperatorEvent from an OperatorCoordinator to a task was lost. "
                    + "Triggering task failover to ensure consistency. Event: '%s', targetTask: %s";

    /** Initial value of {@link #latestAttemptedCheckpointId}; lower than any id that can be marked. */
    private static final long NO_CHECKPOINT = Long.MIN_VALUE;

    private final SubtaskAccess subtaskAccess;
    private final ComponentMainThreadExecutor mainThreadExecutor;
    private final IncompleteFuturesTracker incompleteFuturesTracker;

    /** Buffered events per checkpoint id for which the gateway is currently closed. */
    private final TreeMap<Long, List<BlockedEvent>> blockedEventsMap = new TreeMap<>();

    /** Checkpoint ids that were marked and not yet unmarked. */
    private final TreeSet<Long> currentMarkedCheckpointIds = new TreeSet<>();

    /** Highest checkpoint id accepted by {@link #markForCheckpoint(long)} so far. */
    private long latestAttemptedCheckpointId = NO_CHECKPOINT;

    /** A buffered event: the action that performs the send and the future to complete with it. */
    public static final class BlockedEvent {
        private final Callable<CompletableFuture<Acknowledge>> sendAction;
        private final CompletableFuture<Acknowledge> future;

        BlockedEvent(
                Callable<CompletableFuture<Acknowledge>> sendAction,
                CompletableFuture<Acknowledge> future) {
            this.sendAction = sendAction;
            this.future = future;
        }
    }

    /**
     * Creates a gateway for one subtask.
     *
     * @param subtaskAccess access to the target subtask (send actions, state, failover)
     * @param componentMainThreadExecutor executor on which sends and failure handling run
     * @param incompleteFuturesTracker tracker that is handed every send result future
     */
    public SubtaskGatewayImpl(
            SubtaskAccess subtaskAccess,
            ComponentMainThreadExecutor componentMainThreadExecutor,
            IncompleteFuturesTracker incompleteFuturesTracker) {
        this.subtaskAccess = subtaskAccess;
        this.mainThreadExecutor = componentMainThreadExecutor;
        this.incompleteFuturesTracker = incompleteFuturesTracker;
    }

    /**
     * Serializes the event and schedules its delivery on the main-thread executor.
     *
     * <p>If the send fails while the subtask is still running, a task failover is triggered with a
     * {@link FlinkException} wrapping the failure.
     *
     * @param operatorEvent the event to send
     * @return a future that completes with the outcome of the send
     * @throws FlinkRuntimeException if the subtask has not switched to running yet, or if the
     *     event cannot be serialized
     */
    @Override
    public CompletableFuture<Acknowledge> sendEvent(OperatorEvent operatorEvent) {
        if (!isReady()) {
            throw new FlinkRuntimeException("SubtaskGateway is not ready, task not yet running.");
        }

        // Only serialization / creation of the send action is guarded; nothing below can raise
        // an IOException.
        final Callable<CompletableFuture<Acknowledge>> sendAction;
        try {
            sendAction =
                    this.subtaskAccess.createEventSendAction(new SerializedValue<>(operatorEvent));
        } catch (IOException e) {
            throw new FlinkRuntimeException("Cannot serialize operator event", e);
        }

        // Properly parameterized so that the callback below sees (Acknowledge, Throwable).
        final CompletableFuture<Acknowledge> sendResult = new CompletableFuture<>();
        final CompletableFuture<Acknowledge> result =
                sendResult.whenCompleteAsync(
                        (ack, failure) -> {
                            if (failure != null && this.subtaskAccess.isStillRunning()) {
                                final String message =
                                        String.format(
                                                EVENT_LOSS_ERROR_MESSAGE,
                                                operatorEvent,
                                                this.subtaskAccess.subtaskName());
                                Runnables.assertNoException(
                                        () ->
                                                this.subtaskAccess.triggerTaskFailover(
                                                        new FlinkException(message, failure)));
                            }
                        },
                        (Executor) this.mainThreadExecutor);

        this.mainThreadExecutor.execute(
                () -> {
                    sendEventInternal(sendAction, sendResult);
                    this.incompleteFuturesTracker.trackFutureWhileIncomplete(result);
                });

        return result;
    }

    /**
     * Sends the event right away if no checkpoint currently blocks the gateway; otherwise buffers
     * it under the highest blocked checkpoint id. Asserts main-thread execution.
     */
    private void sendEventInternal(
            Callable<CompletableFuture<Acknowledge>> sendAction,
            CompletableFuture<Acknowledge> result) {
        checkRunsInMainThread();

        if (this.blockedEventsMap.isEmpty()) {
            callSendAction(sendAction, result);
        } else {
            this.blockedEventsMap.lastEntry().getValue().add(new BlockedEvent(sendAction, result));
        }
    }

    /**
     * Runs the send action and forwards its outcome to {@code result}. Non-fatal throwables raised
     * by the action complete {@code result} exceptionally; fatal errors are rethrown.
     */
    private void callSendAction(
            Callable<CompletableFuture<Acknowledge>> sendAction,
            CompletableFuture<Acknowledge> result) {
        try {
            FutureUtils.forward(sendAction.call(), result);
        } catch (Throwable t) {
            ExceptionUtils.rethrowIfFatalError(t);
            result.completeExceptionally(t);
        }
    }

    /** Returns the current execution attempt as reported by the subtask access. */
    @Override
    public ExecutionAttemptID getExecution() {
        return this.subtaskAccess.currentAttempt();
    }

    /** Returns the subtask index as reported by the subtask access. */
    @Override
    public int getSubtask() {
        return this.subtaskAccess.getSubtaskIndex();
    }

    /** Whether the "switched to running" future of the subtask is done. */
    private boolean isReady() {
        return this.subtaskAccess.hasSwitchedToRunning().isDone();
    }

    /**
     * Marks the gateway for the given checkpoint id. Asserts main-thread execution.
     *
     * @param checkpointId id to mark; must be greater than every previously marked id
     * @throws IllegalStateException if the id is not greater than the latest marked id, unless
     *     the id is {@code -1}, which is silently ignored in that case
     */
    public void markForCheckpoint(long checkpointId) {
        checkRunsInMainThread();

        if (checkpointId > this.latestAttemptedCheckpointId) {
            this.currentMarkedCheckpointIds.add(checkpointId);
            this.latestAttemptedCheckpointId = checkpointId;
            return;
        }

        // NOTE(review): -1 is exempt from the regression check; presumably a sentinel id used
        // when no real checkpointing takes place - confirm against the callers.
        if (checkpointId != -1L) {
            throw new IllegalStateException(
                    String.format(
                            "Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d",
                            this.latestAttemptedCheckpointId, checkpointId));
        }
    }

    /**
     * Closes the gateway for the given checkpoint id if that id is currently marked. Closing an
     * already closed id keeps its buffered events. Asserts main-thread execution.
     *
     * @param checkpointId id to close the gateway for
     * @return {@code true} if the id is marked (gateway is now closed for it), {@code false}
     *     otherwise
     */
    public boolean tryCloseGateway(long checkpointId) {
        checkRunsInMainThread();

        if (!this.currentMarkedCheckpointIds.contains(checkpointId)) {
            return false;
        }
        this.blockedEventsMap.putIfAbsent(checkpointId, new LinkedList<>());
        return true;
    }

    /**
     * Re-opens the gateway for the given checkpoint id and unmarks it. Does nothing if the id is
     * not currently marked. Asserts main-thread execution.
     *
     * <p>If the id is the lowest closed one, its buffered events are sent. Otherwise they are
     * appended to the buffer of the next lower closed id, so they stay blocked behind it.
     *
     * @param checkpointId id to open the gateway for
     * @throws IllegalStateException if the id is greater than the latest marked id
     */
    public void openGatewayAndUnmarkCheckpoint(long checkpointId) {
        checkRunsInMainThread();

        if (this.latestAttemptedCheckpointId < checkpointId) {
            throw new IllegalStateException(
                    String.format(
                            "Trying to open gateway for unseen checkpoint: latest known checkpoint = %d, incoming checkpoint = %d",
                            this.latestAttemptedCheckpointId, checkpointId));
        }

        if (!this.currentMarkedCheckpointIds.contains(checkpointId)) {
            return;
        }

        final List<BlockedEvent> buffered = this.blockedEventsMap.get(checkpointId);
        if (buffered != null) {
            // Greatest closed id strictly below this one; null means this is the lowest.
            final Long lowerClosedId = this.blockedEventsMap.lowerKey(checkpointId);
            if (lowerClosedId == null) {
                for (BlockedEvent event : buffered) {
                    callSendAction(event.sendAction, event.future);
                }
            } else {
                this.blockedEventsMap.get(lowerClosedId).addAll(buffered);
            }
            // Removed only after the events were handled, as before.
            this.blockedEventsMap.remove(checkpointId);
        }

        this.currentMarkedCheckpointIds.remove(checkpointId);
    }

    /**
     * Sends every buffered event, in ascending order of closed checkpoint id, then clears all
     * closed and marked checkpoint ids. Asserts main-thread execution.
     */
    public void openGatewayAndUnmarkAllCheckpoint() {
        checkRunsInMainThread();

        for (List<BlockedEvent> buffered : this.blockedEventsMap.values()) {
            for (BlockedEvent event : buffered) {
                callSendAction(event.sendAction, event.future);
            }
        }

        this.blockedEventsMap.clear();
        this.currentMarkedCheckpointIds.clear();
    }

    /**
     * Opens the gateway for the highest currently marked checkpoint id and unmarks it; returns
     * immediately when nothing is marked.
     */
    public void openGatewayAndUnmarkLastCheckpointIfAny() {
        if (!this.currentMarkedCheckpointIds.isEmpty()) {
            openGatewayAndUnmarkCheckpoint(this.currentMarkedCheckpointIds.last());
        }
    }

    /** Delegates to the main-thread executor's main-thread assertion. */
    private void checkRunsInMainThread() {
        this.mainThreadExecutor.assertRunningInMainThread();
    }
}
