package org.apache.flink.runtime.source.coordinator;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.InternalSplitEnumeratorMetricGroup;
import org.apache.flink.runtime.operators.coordination.ComponentClosingUtils;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.IsProcessingBacklogEvent;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TernaryBoolean;
import org.apache.flink.util.ThrowableCatchingRunnable;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SplitEnumeratorContext} implementation backing a source coordinator.
 *
 * <p>The context keeps track of the registered source readers (per subtask and per execution
 * attempt), of the gateways used to send events to those attempts, and of the split assignments
 * recorded in a {@link SplitAssignmentTracker}.
 *
 * <p>Two executors are used: the <em>coordinator executor</em>, on which split assignment and
 * event sending are performed (see {@link #callInCoordinatorThread(Callable, String)}), and the
 * <em>worker executor</em>, which is handed to the {@link ExecutorNotifier} that serves the
 * {@code callAsync} methods. Both executors are shut down by {@link #close()}.
 *
 * @param <SplitT> the type of the splits handled by the source
 */
@Internal
/* loaded from: input_file:org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.class */
public class SourceCoordinatorContext<SplitT extends SourceSplit> implements SplitEnumeratorContext<SplitT>, SupportsIntermediateNoMoreSplits, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinatorContext.class);

    /** Executor handed to the {@link ExecutorNotifier}; shut down in {@link #close()}. */
    private final ScheduledExecutorService workerExecutor;

    /** Executor on which coordinator-thread work is run; shut down in {@link #close()}. */
    private final ScheduledExecutorService coordinatorExecutor;

    /** Serves the {@code callAsync} methods. */
    private final ExecutorNotifier notifier;

    /** The context of the enclosing operator coordinator. */
    private final OperatorCoordinator.Context operatorCoordinatorContext;

    /** Serializer used when wrapping splits into an {@link AddSplitEvent}. */
    private final SimpleVersionedSerializer<SplitT> splitSerializer;

    /** Registered readers, keyed by subtask index and then by attempt number. */
    private final ConcurrentMap<Integer, ConcurrentMap<Integer, ReaderInfo>> registeredReaders;

    /** Records split assignments and their checkpoint state. */
    private final SplitAssignmentTracker<SplitT> assignmentTracker;

    /** Used to tell whether the current thread is the coordinator thread. */
    private final SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory;

    /** Gateways per subtask; created lazily by {@link #checkAndLazyInitialize()}. */
    private SubtaskGateways subtaskGateways;

    /** Name of the coordinator thread, used in log messages. */
    private final String coordinatorThreadName;

    /** Whether several execution attempts of one subtask may be registered at the same time. */
    private final boolean supportsConcurrentExecutionAttempts;

    /** Per-subtask "no more splits" flag; created lazily by {@link #checkAndLazyInitialize()}. */
    private boolean[] subtaskHasNoMoreSplits;

    /** Set to {@code true} at the beginning of {@link #close()}. */
    private volatile boolean closed;

    /** Last value passed to {@link #setIsProcessingBacklog(boolean)}, or UNDEFINED if never set. */
    private volatile TernaryBoolean backlog;

    /**
     * Holds, for every subtask index, the gateways of its execution attempts keyed by attempt
     * number.
     */
    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext$SubtaskGateways.class */
    public static class SubtaskGateways {
        /** One map (attempt number to gateway) per subtask index. */
        private final Map<Integer, OperatorCoordinator.SubtaskGateway>[] gateways;

        /**
         * Creates an empty gateway map for each of the given number of subtasks.
         *
         * @param parallelism the number of subtasks
         */
        @SuppressWarnings("unchecked") // generic array creation: only Map<Integer, SubtaskGateway> is ever stored
        private SubtaskGateways(int parallelism) {
            this.gateways = new Map[parallelism];
            for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
                this.gateways[subtaskIndex] = new HashMap<>();
            }
        }

        /**
         * Registers the gateway under its subtask index and attempt number.
         *
         * @param subtaskGateway the gateway to register
         * @throws IllegalStateException if a gateway for the same subtask and attempt is
         *     already registered
         */
        /* JADX INFO: Access modifiers changed from: private */
        public void registerSubtaskGateway(OperatorCoordinator.SubtaskGateway subtaskGateway) {
            final int subtaskIndex = subtaskGateway.getSubtask();
            final int attemptNumber = subtaskGateway.getExecution().getAttemptNumber();
            Preconditions.checkState(
                    !this.gateways[subtaskIndex].containsKey(attemptNumber),
                    "Already have a subtask gateway for %s (#%s).",
                    subtaskIndex,
                    attemptNumber);
            this.gateways[subtaskIndex].put(attemptNumber, subtaskGateway);
        }

        /**
         * Removes the gateway of the given attempt, if present.
         *
         * @param subtaskIndex the subtask index
         * @param attemptNumber the attempt number
         */
        /* JADX INFO: Access modifiers changed from: private */
        public void unregisterSubtaskGateway(int subtaskIndex, int attemptNumber) {
            this.gateways[subtaskIndex].remove(attemptNumber);
        }

        /**
         * Returns the single gateway registered for the subtask.
         *
         * @param subtaskIndex the subtask index
         * @return the only registered gateway of the subtask
         * @throws IllegalStateException if no gateway is registered for the subtask
         */
        /* JADX INFO: Access modifiers changed from: private */
        public OperatorCoordinator.SubtaskGateway getOnlyGatewayAndCheckReady(int subtaskIndex) {
            Preconditions.checkState(
                    this.gateways[subtaskIndex].size() > 0,
                    "Subtask %s is not ready yet to receive events.",
                    subtaskIndex);
            // Iterables.getOnlyElement is expected to reject more than one registered attempt.
            return Iterables.getOnlyElement(this.gateways[subtaskIndex].values());
        }

        /**
         * Returns the single gateway registered for the subtask, or {@code null} if there is
         * none.
         *
         * @param subtaskIndex the subtask index
         * @return the only registered gateway of the subtask, or {@code null}
         */
        /* JADX INFO: Access modifiers changed from: private */
        public OperatorCoordinator.SubtaskGateway getOnlyGatewayAndNotCheckReady(int subtaskIndex) {
            if (this.gateways[subtaskIndex].isEmpty()) {
                return null;
            }
            return Iterables.getOnlyElement(this.gateways[subtaskIndex].values());
        }

        /**
         * Returns the gateway of a specific attempt.
         *
         * @param subtaskIndex the subtask index
         * @param attemptNumber the attempt number
         * @return the gateway registered for that attempt
         * @throws IllegalStateException if no gateway is registered for that attempt
         */
        /* JADX INFO: Access modifiers changed from: private */
        public OperatorCoordinator.SubtaskGateway getGatewayAndCheckReady(int subtaskIndex, int attemptNumber) {
            final OperatorCoordinator.SubtaskGateway gateway = this.gateways[subtaskIndex].get(attemptNumber);
            if (gateway == null) {
                throw new IllegalStateException(
                        String.format(
                                "Subtask %d (#%d) is not ready yet to receive events.",
                                subtaskIndex, attemptNumber));
            }
            return gateway;
        }

        /**
         * Drops all gateways of the subtask.
         *
         * @param subtaskIndex the subtask index
         */
        /* JADX INFO: Access modifiers changed from: private */
        public void reset(int subtaskIndex) {
            this.gateways[subtaskIndex].clear();
        }
    }

    /**
     * Creates a context with a single-threaded coordinator executor built from the given thread
     * factory and a worker pool whose threads are named after the coordinator thread with a
     * {@code "-worker"} suffix.
     *
     * @param coordinatorThreadFactory thread factory of the coordinator executor
     * @param numWorkerThreads core pool size of the worker executor
     * @param operatorCoordinatorContext context of the enclosing operator coordinator
     * @param splitSerializer serializer for the splits
     * @param supportsConcurrentExecutionAttempts whether concurrent execution attempts of one
     *     subtask are supported
     */
    public SourceCoordinatorContext(
            SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
            int numWorkerThreads,
            OperatorCoordinator.Context operatorCoordinatorContext,
            SimpleVersionedSerializer<SplitT> splitSerializer,
            boolean supportsConcurrentExecutionAttempts) {
        this(
                Executors.newScheduledThreadPool(1, coordinatorThreadFactory),
                Executors.newScheduledThreadPool(
                        numWorkerThreads,
                        new ExecutorThreadFactory(coordinatorThreadFactory.getCoordinatorThreadName() + "-worker")),
                coordinatorThreadFactory,
                operatorCoordinatorContext,
                splitSerializer,
                new SplitAssignmentTracker<>(),
                supportsConcurrentExecutionAttempts);
    }

    /**
     * Creates a context from explicitly supplied executors and tracker.
     *
     * @param coordinatorExecutor executor on which coordinator-thread work is run
     * @param workerExecutor executor handed to the {@link ExecutorNotifier}
     * @param coordinatorThreadFactory thread factory identifying the coordinator thread
     * @param operatorCoordinatorContext context of the enclosing operator coordinator
     * @param splitSerializer serializer for the splits
     * @param splitAssignmentTracker tracker recording the split assignments
     * @param supportsConcurrentExecutionAttempts whether concurrent execution attempts of one
     *     subtask are supported
     */
    @VisibleForTesting
    SourceCoordinatorContext(
            ScheduledExecutorService coordinatorExecutor,
            ScheduledExecutorService workerExecutor,
            SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
            OperatorCoordinator.Context operatorCoordinatorContext,
            SimpleVersionedSerializer<SplitT> splitSerializer,
            SplitAssignmentTracker<SplitT> splitAssignmentTracker,
            boolean supportsConcurrentExecutionAttempts) {
        this.backlog = TernaryBoolean.UNDEFINED;
        this.workerExecutor = workerExecutor;
        this.coordinatorExecutor = coordinatorExecutor;
        this.coordinatorThreadFactory = coordinatorThreadFactory;
        this.operatorCoordinatorContext = operatorCoordinatorContext;
        this.splitSerializer = splitSerializer;
        this.registeredReaders = new ConcurrentHashMap<>();
        this.assignmentTracker = splitAssignmentTracker;
        this.coordinatorThreadName = coordinatorThreadFactory.getCoordinatorThreadName();
        this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
        // Runnables handed over by the notifier are executed on the coordinator executor;
        // anything they throw is routed to handleUncaughtExceptionFromAsyncCall.
        this.notifier =
                new ExecutorNotifier(
                        workerExecutor,
                        runnable ->
                                coordinatorExecutor.execute(
                                        new ThrowableCatchingRunnable(
                                                this::handleUncaughtExceptionFromAsyncCall, runnable)));
    }

    /**
     * Tells whether concurrent execution attempts are supported.
     *
     * @return the flag passed at construction time
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isConcurrentExecutionAttemptsSupported() {
        return this.supportsConcurrentExecutionAttempts;
    }

    /**
     * Returns a metric group wrapping the operator coordinator's metric group. A new wrapper
     * instance is created on every call.
     *
     * @return the split enumerator metric group
     */
    public SplitEnumeratorMetricGroup metricGroup() {
        return new InternalSplitEnumeratorMetricGroup(this.operatorCoordinatorContext.metricGroup());
    }

    /**
     * Sends a source event to the only registered attempt of the given subtask.
     *
     * @param subtaskId the index of the target subtask
     * @param event the event to send
     * @throws IllegalStateException if concurrent execution attempts are supported
     * @throws IllegalArgumentException if the subtask index is out of bounds
     * @throws FlinkRuntimeException if sending fails, e.g. because the subtask is not ready
     */
    public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
        checkAndLazyInitialize();
        Preconditions.checkState(!this.supportsConcurrentExecutionAttempts, "The split enumerator must invoke SplitEnumeratorContext#sendEventToSourceReader(int, int, SourceEvent) instead of SplitEnumeratorContext#sendEventToSourceReader(int, SourceEvent) to send custom source events in concurrent execution attempts scenario (e.g. if speculative execution is enabled).");
        checkSubtaskIndex(subtaskId);
        callInCoordinatorThread(
                () -> {
                    this.subtaskGateways.getOnlyGatewayAndCheckReady(subtaskId).sendEvent(new SourceEventWrapper(event));
                    return null;
                },
                String.format("Failed to send event %s to subtask %d", event, subtaskId));
    }

    /**
     * Sends a source event to a specific execution attempt of the given subtask.
     *
     * @param subtaskId the index of the target subtask
     * @param attemptNumber the attempt number of the target execution
     * @param event the event to send
     * @throws IllegalArgumentException if the subtask index is out of bounds
     * @throws FlinkRuntimeException if sending fails, e.g. because the attempt is not ready
     */
    public void sendEventToSourceReader(int subtaskId, int attemptNumber, SourceEvent event) {
        checkAndLazyInitialize();
        checkSubtaskIndex(subtaskId);
        callInCoordinatorThread(
                () -> {
                    this.subtaskGateways.getGatewayAndCheckReady(subtaskId, attemptNumber).sendEvent(new SourceEventWrapper(event));
                    return null;
                },
                String.format("Failed to send event %s to subtask %d (#%d)", event, subtaskId, attemptNumber));
    }

    /**
     * Sends an operator event to the only registered attempt of the given subtask.
     *
     * @param subtaskId the index of the target subtask
     * @param event the event to send
     * @throws IllegalArgumentException if the subtask index is out of bounds
     * @throws FlinkRuntimeException if sending fails, e.g. because the subtask is not ready
     */
    void sendEventToSourceOperator(int subtaskId, OperatorEvent event) {
        checkAndLazyInitialize();
        checkSubtaskIndex(subtaskId);
        callInCoordinatorThread(
                () -> {
                    this.subtaskGateways.getOnlyGatewayAndCheckReady(subtaskId).sendEvent(event);
                    return null;
                },
                String.format("Failed to send event %s to subtask %d", event, subtaskId));
    }

    /**
     * Sends an operator event to the only registered attempt of the given subtask, silently
     * doing nothing when no attempt is registered.
     *
     * @param subtaskId the index of the target subtask
     * @param event the event to send
     * @throws IllegalArgumentException if the subtask index is out of bounds
     * @throws FlinkRuntimeException if sending fails
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void sendEventToSourceOperatorIfTaskReady(int subtaskId, OperatorEvent event) {
        checkAndLazyInitialize();
        checkSubtaskIndex(subtaskId);
        callInCoordinatorThread(
                () -> {
                    final OperatorCoordinator.SubtaskGateway gateway = this.subtaskGateways.getOnlyGatewayAndNotCheckReady(subtaskId);
                    if (gateway != null) {
                        gateway.sendEvent(event);
                    }
                    return null;
                },
                String.format("Failed to send event %s to subtask %d", event, subtaskId));
    }

    /**
     * Returns the current parallelism as reported by the operator coordinator context.
     *
     * @return the current parallelism
     */
    public int currentParallelism() {
        return this.operatorCoordinatorContext.currentParallelism();
    }

    /**
     * Returns, for every subtask with at least one registered attempt, the {@link ReaderInfo}
     * of the attempt with the smallest attempt number.
     *
     * <p>A subtask whose attempt map is (momentarily) empty is left out of the result rather
     * than being mapped to {@code null}.
     *
     * @return an unmodifiable snapshot keyed by subtask index
     */
    public Map<Integer, ReaderInfo> registeredReaders() {
        final Map<Integer, ReaderInfo> readers = new HashMap<>();
        for (Map.Entry<Integer, ConcurrentMap<Integer, ReaderInfo>> subtaskEntry : this.registeredReaders.entrySet()) {
            // Pick key and value in one pass over the entries so that a concurrent removal
            // between "find smallest key" and "get" cannot yield a null reader.
            ReaderInfo earliestReader = null;
            int earliestAttempt = Integer.MAX_VALUE;
            for (Map.Entry<Integer, ReaderInfo> attemptEntry : subtaskEntry.getValue().entrySet()) {
                final int attemptNumber = attemptEntry.getKey();
                if (earliestReader == null || attemptNumber < earliestAttempt) {
                    earliestAttempt = attemptNumber;
                    earliestReader = attemptEntry.getValue();
                }
            }
            if (earliestReader != null) {
                readers.put(subtaskEntry.getKey(), earliestReader);
            }
        }
        return Collections.unmodifiableMap(readers);
    }

    /**
     * Returns an unmodifiable view of the registered readers keyed by subtask index and then by
     * attempt number. Only the outer map is wrapped; it is a live view, not a copy.
     *
     * @return the registered readers of all attempts
     */
    public Map<Integer, Map<Integer, ReaderInfo>> registeredReadersOfAttempts() {
        return Collections.unmodifiableMap(this.registeredReaders);
    }

    /**
     * Records the assignment in the tracker and sends the splits to all registered attempts of
     * the target subtasks. The work is done in the coordinator thread.
     *
     * @param assignment the splits to assign, keyed by subtask index
     * @throws FlinkRuntimeException if the assignment fails; in particular, the cause is an
     *     {@link IllegalArgumentException} when a target subtask has no registered reader
     */
    public void assignSplits(SplitsAssignment<SplitT> assignment) {
        callInCoordinatorThread(
                () -> {
                    // Validate every target first so nothing is recorded for a bad assignment.
                    assignment.assignment().forEach((subtaskId, splits) -> {
                        if (!this.registeredReaders.containsKey(subtaskId)) {
                            throw new IllegalArgumentException(String.format("Cannot assign splits %s to subtask %d because the subtask is not registered.", splits, subtaskId));
                        }
                    });
                    this.assignmentTracker.recordSplitAssignment(assignment);
                    assignSplitsToAttempts(assignment);
                    return null;
                },
                String.format("Failed to assign splits %s due to ", assignment));
    }

    /**
     * Marks the subtask as having no more splits and sends a {@link NoMoreSplitsEvent} to all
     * of its registered attempts.
     *
     * @param subtask the subtask index
     * @throws IllegalArgumentException if the subtask index is out of bounds
     * @throws FlinkRuntimeException if signalling fails
     */
    public void signalNoMoreSplits(int subtask) {
        checkSubtaskIndex(subtask);
        callInCoordinatorThread(
                () -> {
                    // The flag array is created lazily; make sure it exists before writing to it.
                    checkAndLazyInitialize();
                    this.subtaskHasNoMoreSplits[subtask] = true;
                    signalNoMoreSplitsToAttempts(subtask);
                    return null;
                },
                "Failed to send 'NoMoreSplits' to reader " + subtask);
    }

    /**
     * Sends a {@link NoMoreSplitsEvent} to all registered attempts of the subtask without
     * marking the subtask as finished in {@link #hasNoMoreSplits(int)}.
     *
     * @param subtask the subtask index
     * @throws IllegalArgumentException if the subtask index is out of bounds
     * @throws FlinkRuntimeException if signalling fails
     */
    public void signalIntermediateNoMoreSplits(int subtask) {
        checkSubtaskIndex(subtask);
        callInCoordinatorThread(
                () -> {
                    signalNoMoreSplitsToAttempts(subtask);
                    return null;
                },
                "Failed to send 'IntermediateNoMoreSplits' to reader " + subtask);
    }

    /**
     * Delegates to {@link ExecutorNotifier#notifyReadyAsync} with an initial delay and a
     * period.
     *
     * @param callable the call to run
     * @param handler receives the call's result or failure
     * @param initialDelayMillis forwarded to the notifier as the third argument (the initial
     *     delay in milliseconds according to the {@link SplitEnumeratorContext} contract)
     * @param periodMillis forwarded to the notifier as the fourth argument (the period in
     *     milliseconds according to the {@link SplitEnumeratorContext} contract)
     * @param <T> the result type of the call
     */
    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler, long initialDelayMillis, long periodMillis) {
        this.notifier.notifyReadyAsync(callable, handler, initialDelayMillis, periodMillis);
    }

    /**
     * Delegates to {@link ExecutorNotifier#notifyReadyAsync} for a one-time call.
     *
     * @param callable the call to run
     * @param handler receives the call's result or failure
     * @param <T> the result type of the call
     */
    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
        this.notifier.notifyReadyAsync(callable, handler);
    }

    /**
     * Schedules the runnable on the coordinator executor. Any throwable it raises is reported
     * to the coordinator thread factory's uncaught-exception handling.
     *
     * @param runnable the action to run
     */
    public void runInCoordinatorThread(Runnable runnable) {
        this.coordinatorExecutor.execute(
                new ThrowableCatchingRunnable(
                        throwable -> this.coordinatorThreadFactory.uncaughtException(Thread.currentThread(), throwable),
                        runnable));
    }

    /**
     * Marks the context as closed and shuts down the worker executor followed by the
     * coordinator executor.
     *
     * @throws InterruptedException declared by the original signature
     */
    @Override // java.lang.AutoCloseable
    public void close() throws InterruptedException {
        this.closed = true;
        final Duration unboundedTimeout = Duration.ofNanos(Long.MAX_VALUE);
        ComponentClosingUtils.shutdownExecutorForcefully(this.workerExecutor, unboundedTimeout);
        ComponentClosingUtils.shutdownExecutorForcefully(this.coordinatorExecutor, unboundedTimeout);
    }

    /**
     * Tells whether {@link #close()} has been invoked.
     *
     * @return {@code true} once closing has started
     */
    @VisibleForTesting
    boolean isClosed() {
        return this.closed;
    }

    /**
     * Records the backlog status, forwards it to the checkpoint coordinator (if there is one)
     * and sends an {@link IsProcessingBacklogEvent} to every subtask that has a registered
     * attempt.
     *
     * @param isProcessingBacklog the new backlog status
     * @throws FlinkRuntimeException if sending the event fails
     */
    public void setIsProcessingBacklog(boolean isProcessingBacklog) {
        final CheckpointCoordinator checkpointCoordinator = getCoordinatorContext().getCheckpointCoordinator();
        final OperatorID operatorId = getCoordinatorContext().getOperatorId();
        if (checkpointCoordinator != null) {
            checkpointCoordinator.setIsProcessingBacklog(operatorId, isProcessingBacklog);
        }
        this.backlog = TernaryBoolean.fromBoolean(isProcessingBacklog);
        callInCoordinatorThread(
                () -> {
                    final IsProcessingBacklogEvent backlogEvent = new IsProcessingBacklogEvent(isProcessingBacklog);
                    for (int subtaskIndex = 0; subtaskIndex < getCoordinatorContext().currentParallelism(); subtaskIndex++) {
                        sendEventToSourceOperatorIfTaskReady(subtaskIndex, backlogEvent);
                    }
                    return null;
                },
                "Failed to send BacklogEvent to reader.");
    }

    /**
     * Registers the gateway of an execution attempt that became ready. Must be called from the
     * coordinator thread.
     *
     * @param gateway the gateway of the ready attempt
     * @throws IllegalStateException if not called from the coordinator thread or if the
     *     attempt already has a gateway
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void attemptReady(OperatorCoordinator.SubtaskGateway gateway) {
        checkAndLazyInitialize();
        Preconditions.checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
        this.subtaskGateways.registerSubtaskGateway(gateway);
    }

    /**
     * Removes the gateway of a failed execution attempt. Must be called from the coordinator
     * thread.
     *
     * @param subtaskIndex the subtask index
     * @param attemptNumber the attempt number
     * @throws IllegalStateException if not called from the coordinator thread
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void attemptFailed(int subtaskIndex, int attemptNumber) {
        checkAndLazyInitialize();
        Preconditions.checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
        this.subtaskGateways.unregisterSubtaskGateway(subtaskIndex, attemptNumber);
    }

    /**
     * Clears all state kept for the subtask: its gateways, its registered readers and its
     * "no more splits" flag. Must be called from the coordinator thread.
     *
     * @param subtaskIndex the subtask index
     * @throws IllegalStateException if not called from the coordinator thread
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void subtaskReset(int subtaskIndex) {
        checkAndLazyInitialize();
        Preconditions.checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
        this.subtaskGateways.reset(subtaskIndex);
        this.registeredReaders.remove(subtaskIndex);
        this.subtaskHasNoMoreSplits[subtaskIndex] = false;
    }

    /**
     * Tells whether {@link #signalNoMoreSplits(int)} has been called for the subtask since its
     * last reset.
     *
     * @param subtaskIndex the subtask index
     * @return the subtask's "no more splits" flag
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean hasNoMoreSplits(int subtaskIndex) {
        checkAndLazyInitialize();
        return this.subtaskHasNoMoreSplits[subtaskIndex];
    }

    /**
     * Fails the job through the operator coordinator context.
     *
     * @param cause the failure cause
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void failJob(Throwable cause) {
        this.operatorCoordinatorContext.failJob(cause);
    }

    /**
     * Handles a throwable raised while processing an asynchronous call: ignored once the
     * context is closed; otherwise fatal errors and OOMs are rethrown and everything else is
     * logged and fails the job.
     *
     * @param throwable the throwable raised by the asynchronous call handling
     */
    void handleUncaughtExceptionFromAsyncCall(Throwable throwable) {
        if (this.closed) {
            // Failures during/after shutdown are expected and must not fail the job.
            return;
        }
        ExceptionUtils.rethrowIfFatalErrorOrOOM(throwable);
        LOG.error("Exception while handling result from async call in {}. Triggering job failover.", this.coordinatorThreadName, throwable);
        failJob(throwable);
    }

    /**
     * Forwards the checkpoint notification to the assignment tracker.
     *
     * @param checkpointId the id of the checkpoint
     * @throws Exception if the tracker fails
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void onCheckpoint(long checkpointId) throws Exception {
        this.assignmentTracker.onCheckpoint(checkpointId);
    }

    /**
     * Registers the reader of an execution attempt and, if the tracker holds uncheckpointed
     * splits for the subtask, sends them to the new reader.
     *
     * @param subtaskId the subtask index
     * @param attemptNumber the attempt number
     * @param location passed to {@link ReaderInfo} as its second constructor argument
     * @throws IllegalStateException if the attempt is already registered, or if cached splits
     *     exist although concurrent execution attempts are not supported
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void registerSourceReader(int subtaskId, int attemptNumber, String location) {
        final ConcurrentMap<Integer, ReaderInfo> attemptReaders =
                this.registeredReaders.computeIfAbsent(subtaskId, ignored -> new ConcurrentHashMap<>());
        Preconditions.checkState(
                !attemptReaders.containsKey(attemptNumber),
                "ReaderInfo of subtask %s (#%s) already exists.",
                subtaskId,
                attemptNumber);
        attemptReaders.put(attemptNumber, new ReaderInfo(subtaskId, location));
        sendCachedSplitsToNewlyRegisteredReader(subtaskId, attemptNumber);
    }

    /**
     * Removes the reader of an execution attempt; the subtask's entry is dropped once its last
     * attempt is gone. Unknown subtasks or attempts are ignored.
     *
     * @param subtaskId the subtask index
     * @param attemptNumber the attempt number
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void unregisterSourceReader(int subtaskId, int attemptNumber) {
        final ConcurrentMap<Integer, ReaderInfo> attemptReaders = this.registeredReaders.get(subtaskId);
        if (attemptReaders == null) {
            return;
        }
        attemptReaders.remove(attemptNumber);
        if (attemptReaders.isEmpty()) {
            this.registeredReaders.remove(subtaskId);
        }
    }

    /**
     * Delegates to {@link SplitAssignmentTracker#getAndRemoveUncheckpointedAssignment}.
     *
     * @param subtaskId the subtask index
     * @param checkpointId the checkpoint id forwarded to the tracker
     * @return the splits returned by the tracker
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public List<SplitT> getAndRemoveUncheckpointedAssignment(int subtaskId, long checkpointId) {
        return this.assignmentTracker.getAndRemoveUncheckpointedAssignment(subtaskId, checkpointId);
    }

    /**
     * Forwards the checkpoint-complete notification to the assignment tracker.
     *
     * @param checkpointId the id of the completed checkpoint
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void onCheckpointComplete(long checkpointId) {
        this.assignmentTracker.onCheckpointComplete(checkpointId);
    }

    /**
     * Returns the context of the enclosing operator coordinator.
     *
     * @return the operator coordinator context
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public OperatorCoordinator.Context getCoordinatorContext() {
        return this.operatorCoordinatorContext;
    }

    /**
     * Returns the split assignment tracker.
     *
     * @return the tracker used by this context
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public SplitAssignmentTracker<SplitT> getAssignmentTracker() {
        return this.assignmentTracker;
    }

    /**
     * Submits the task to the coordinator executor.
     *
     * @param task the task to run
     * @return the future returned by the executor
     */
    Future<?> submitTask(Runnable task) {
        return this.coordinatorExecutor.submit(task);
    }

    /**
     * Schedules a fixed-rate task on the coordinator executor. Throwables raised by the task
     * are passed to {@link #handleUncaughtExceptionFromAsyncCall(Throwable)} instead of
     * propagating to the executor.
     *
     * @param task the task to run periodically
     * @param initialDelay delay before the first run
     * @param period period between runs
     * @param unit unit of {@code initialDelay} and {@code period}
     * @return the scheduled future returned by the executor
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public ScheduledFuture<?> schedulePeriodTask(Runnable task, long initialDelay, long period, TimeUnit unit) {
        final Runnable guardedTask =
                () -> {
                    try {
                        task.run();
                    } catch (Throwable throwable) {
                        handleUncaughtExceptionFromAsyncCall(throwable);
                    }
                };
        return this.coordinatorExecutor.scheduleAtFixedRate(guardedTask, initialDelay, period, unit);
    }

    /**
     * Runs the supplier asynchronously on the coordinator executor.
     *
     * @param supplier the supplier to evaluate
     * @return a future completed with the supplier's result
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<?> supplyAsync(Supplier<?> supplier) {
        return CompletableFuture.supplyAsync(supplier, this.coordinatorExecutor);
    }

    /**
     * Verifies that the index lies in {@code [0, currentParallelism)}.
     *
     * @param subtaskIndex the index to check
     * @throws IllegalArgumentException if the index is out of bounds
     */
    private void checkSubtaskIndex(int subtaskIndex) {
        final int parallelism = getCoordinatorContext().currentParallelism();
        if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException(String.format("Subtask index %d is out of bounds [0, %s)", subtaskIndex, parallelism));
        }
    }

    /**
     * Creates the per-subtask gateway holder and the "no more splits" flags on first use, sized
     * by the current parallelism.
     *
     * @throws IllegalStateException if the current parallelism is {@code -1}
     */
    private void checkAndLazyInitialize() {
        if (this.subtaskGateways != null) {
            return;
        }
        final int parallelism = this.operatorCoordinatorContext.currentParallelism();
        Preconditions.checkState(parallelism != -1);
        this.subtaskGateways = new SubtaskGateways(parallelism);
        // A freshly allocated boolean[] is all false, so no explicit fill is required.
        this.subtaskHasNoMoreSplits = new boolean[parallelism];
    }

    /**
     * Runs the callable in the coordinator thread and returns its result.
     *
     * <p>If the current thread already is the coordinator thread, the callable is invoked
     * directly; otherwise it is submitted to the coordinator executor and the calling thread
     * blocks until it has completed.
     *
     * @param callable the action to run
     * @param errorMessage message of the {@link FlinkRuntimeException} thrown on failure
     * @param <V> the result type
     * @return the callable's result
     * @throws FlinkRuntimeException if the callable fails or the wait is interrupted; in the
     *     latter case the interrupt flag of the calling thread is restored
     */
    private <V> V callInCoordinatorThread(Callable<V> callable, String errorMessage) {
        if (this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
            try {
                return callable.call();
            } catch (Throwable throwable) {
                LOG.error("Uncaught Exception in Source Coordinator Executor", throwable);
                throw new FlinkRuntimeException(errorMessage, throwable);
            }
        }

        final Callable<V> guardedCallable =
                () -> {
                    try {
                        return callable.call();
                    } catch (Throwable throwable) {
                        LOG.error("Uncaught Exception in Source Coordinator Executor", throwable);
                        ExceptionUtils.rethrowException(throwable);
                        return null;
                    }
                };
        try {
            return this.coordinatorExecutor.submit(guardedCallable).get();
        } catch (InterruptedException e) {
            // Preserve the interruption for code further up the stack.
            Thread.currentThread().interrupt();
            throw new FlinkRuntimeException(errorMessage, e);
        } catch (ExecutionException e) {
            throw new FlinkRuntimeException(errorMessage, e);
        }
    }

    /**
     * Sends every subtask's splits of the assignment to all registered attempts of that
     * subtask.
     *
     * @param assignment the assignment to distribute
     */
    private void assignSplitsToAttempts(SplitsAssignment<SplitT> assignment) {
        assignment.assignment().forEach((subtaskIndex, splits) -> assignSplitsToAttempts(subtaskIndex.intValue(), splits));
    }

    /**
     * Sends the splits to all registered attempts of the subtask.
     *
     * @param subtaskIndex the subtask index
     * @param splits the splits to send
     */
    private void assignSplitsToAttempts(int subtaskIndex, List<SplitT> splits) {
        for (Integer attemptNumber : getRegisteredAttempts(subtaskIndex)) {
            assignSplitsToAttempt(subtaskIndex, attemptNumber.intValue(), splits);
        }
    }

    /**
     * Sends the splits to one execution attempt as an {@link AddSplitEvent}; does nothing for
     * an empty list.
     *
     * @param subtaskIndex the subtask index
     * @param attemptNumber the attempt number
     * @param splits the splits to send
     * @throws IllegalStateException if the attempt has no registered reader or gateway
     * @throws FlinkRuntimeException if the splits cannot be serialized
     */
    private void assignSplitsToAttempt(int subtaskIndex, int attemptNumber, List<SplitT> splits) {
        checkAndLazyInitialize();
        if (splits.isEmpty()) {
            return;
        }
        checkAttemptReaderReady(subtaskIndex, attemptNumber);
        final AddSplitEvent<SplitT> addSplitEvent;
        try {
            addSplitEvent = new AddSplitEvent<>(splits, this.splitSerializer);
        } catch (IOException e) {
            throw new FlinkRuntimeException("Failed to serialize splits.", e);
        }
        this.subtaskGateways.getGatewayAndCheckReady(subtaskIndex, attemptNumber).sendEvent(addSplitEvent);
    }

    /**
     * Sends a {@link NoMoreSplitsEvent} to all registered attempts of the subtask.
     *
     * @param subtaskIndex the subtask index
     */
    private void signalNoMoreSplitsToAttempts(int subtaskIndex) {
        for (Integer attemptNumber : getRegisteredAttempts(subtaskIndex)) {
            signalNoMoreSplitsToAttempt(subtaskIndex, attemptNumber.intValue());
        }
    }

    /**
     * Sends a {@link NoMoreSplitsEvent} to one execution attempt.
     *
     * @param subtaskIndex the subtask index
     * @param attemptNumber the attempt number
     * @throws IllegalStateException if the attempt has no registered reader or gateway
     */
    private void signalNoMoreSplitsToAttempt(int subtaskIndex, int attemptNumber) {
        checkAndLazyInitialize();
        checkAttemptReaderReady(subtaskIndex, attemptNumber);
        this.subtaskGateways.getGatewayAndCheckReady(subtaskIndex, attemptNumber).sendEvent(new NoMoreSplitsEvent());
    }

    /**
     * Verifies that a reader is registered for the given attempt.
     *
     * @param subtaskIndex the subtask index
     * @param attemptNumber the attempt number
     * @throws IllegalStateException if the subtask or the attempt is not registered
     */
    private void checkAttemptReaderReady(int subtaskIndex, int attemptNumber) {
        Preconditions.checkState(this.registeredReaders.containsKey(subtaskIndex));
        Preconditions.checkState(getRegisteredAttempts(subtaskIndex).contains(attemptNumber));
    }

    /**
     * Returns the attempt numbers registered for the subtask. The subtask must be registered;
     * otherwise a {@link NullPointerException} is raised.
     *
     * @param subtaskIndex the subtask index
     * @return the key set of the subtask's attempt map
     */
    private Set<Integer> getRegisteredAttempts(int subtaskIndex) {
        return this.registeredReaders.get(subtaskIndex).keySet();
    }

    /**
     * Re-sends the subtask's uncheckpointed splits, and the "no more splits" signal if it was
     * already given, to a newly registered attempt.
     *
     * @param subtaskIndex the subtask index
     * @param attemptNumber the attempt number of the new reader
     * @throws IllegalStateException if cached splits exist although concurrent execution
     *     attempts are not supported
     */
    private void sendCachedSplitsToNewlyRegisteredReader(int subtaskIndex, int attemptNumber) {
        final LinkedHashSet<SplitT> cachedSplits = this.assignmentTracker.uncheckpointedAssignments().get(subtaskIndex);
        if (cachedSplits == null) {
            return;
        }
        if (!this.supportsConcurrentExecutionAttempts) {
            throw new IllegalStateException("No cached split is expected.");
        }
        assignSplitsToAttempt(subtaskIndex, attemptNumber, new ArrayList<>(cachedSplits));
        if (hasNoMoreSplits(subtaskIndex)) {
            signalNoMoreSplitsToAttempt(subtaskIndex, attemptNumber);
        }
    }

    /**
     * Returns the backlog status last set through {@link #setIsProcessingBacklog(boolean)}.
     *
     * @return the backlog status, or {@link TernaryBoolean#UNDEFINED} if it was never set
     */
    public TernaryBoolean isBacklog() {
        return this.backlog;
    }
}
