package org.apache.flink.runtime.asyncprocessing;

import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.core.state.InternalStateFuture;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.class */
public class AsyncExecutionController<K> {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class);
    public static final int DEFAULT_BATCH_SIZE = 1000;
    public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
    private final int batchSize;
    private final int maxInFlightRecordNum;
    private final MailboxExecutor mailboxExecutor;
    final KeyAccountingUnit<K> keyAccountingUnit;
    private final StateFutureFactory<K> stateFutureFactory;
    final StateExecutor stateExecutor;
    RecordContext<K> currentContext;
    StateRequestBuffer<K> stateRequestsBuffer;
    final AtomicInteger inFlightRecordNum;

    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
        this(mailboxExecutor, stateExecutor, 1000, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
    }

    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int i, int i2) {
        this.keyAccountingUnit = new KeyAccountingUnit<>(i2);
        this.mailboxExecutor = mailboxExecutor;
        this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
        this.stateExecutor = stateExecutor;
        this.batchSize = i;
        this.maxInFlightRecordNum = i2;
        this.stateRequestsBuffer = new StateRequestBuffer<>();
        this.inFlightRecordNum = new AtomicInteger(0);
        LOG.info("Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}", Integer.valueOf(i), Integer.valueOf(i2));
    }

    public RecordContext<K> buildContext(Object obj, K k) {
        return new RecordContext<>(obj, k, this::disposeContext);
    }

    public void setCurrentContext(RecordContext<K> recordContext) {
        this.currentContext = recordContext;
    }

    public void disposeContext(RecordContext<K> recordContext) {
        this.keyAccountingUnit.release(recordContext.getRecord(), recordContext.getKey());
        this.inFlightRecordNum.decrementAndGet();
        RecordContext<K> tryActivateOneByKey = this.stateRequestsBuffer.tryActivateOneByKey(recordContext.getKey());
        if (tryActivateOneByKey != null) {
            Preconditions.checkState(tryOccupyKey(tryActivateOneByKey), String.format("key(%s) is already occupied.", tryActivateOneByKey.getKey()));
        }
    }

    boolean tryOccupyKey(RecordContext<K> recordContext) {
        boolean isKeyOccupied = recordContext.isKeyOccupied();
        if (!isKeyOccupied && this.keyAccountingUnit.occupy(recordContext.getRecord(), recordContext.getKey())) {
            recordContext.setKeyOccupied();
            isKeyOccupied = true;
        }
        return isKeyOccupied;
    }

    public <IN, OUT> InternalStateFuture<OUT> handleRequest(@Nullable State state, StateRequestType stateRequestType, @Nullable IN in) {
        InternalStateFuture<OUT> create = this.stateFutureFactory.create(this.currentContext);
        StateRequest<K, IN, OUT> stateRequest = new StateRequest<>(state, stateRequestType, in, create, this.currentContext);
        seizeCapacity();
        if (tryOccupyKey(this.currentContext)) {
            insertActiveBuffer(stateRequest);
        } else {
            insertBlockingBuffer(stateRequest);
        }
        triggerIfNeeded(false);
        return create;
    }

    <IN, OUT> void insertActiveBuffer(StateRequest<K, IN, OUT> stateRequest) {
        this.stateRequestsBuffer.enqueueToActive(stateRequest);
    }

    <IN, OUT> void insertBlockingBuffer(StateRequest<K, IN, OUT> stateRequest) {
        this.stateRequestsBuffer.enqueueToBlocking(stateRequest);
    }

    void triggerIfNeeded(boolean z) {
        if (z || this.stateRequestsBuffer.activeQueueSize() >= this.batchSize) {
            this.stateExecutor.executeBatchRequests(this.stateRequestsBuffer.popActive(this.batchSize));
        }
    }

    private void seizeCapacity() {
        if (this.currentContext.isKeyOccupied()) {
            return;
        }
        RecordContext<K> recordContext = this.currentContext;
        while (this.inFlightRecordNum.get() > this.maxInFlightRecordNum) {
            try {
                if (!this.mailboxExecutor.tryYield()) {
                    triggerIfNeeded(true);
                    Thread.sleep(1L);
                }
            } catch (InterruptedException e) {
            }
        }
        setCurrentContext(recordContext);
        this.inFlightRecordNum.incrementAndGet();
    }

    public void syncPointRequestWithCallback(ThrowingRunnable<Exception> throwingRunnable) {
        handleRequest(null, StateRequestType.SYNC_POINT, null).thenAccept(obj -> {
            try {
                throwingRunnable.run();
            } catch (Exception e) {
                throw new FlinkRuntimeException("Unexpected runtime exception", e);
            }
        });
    }
}
