/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.asyncprocessing;

import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.core.asyncprocessing.AsyncFuture;
import org.apache.flink.core.asyncprocessing.AsyncFutureImpl;
import org.apache.flink.core.asyncprocessing.InternalAsyncFuture;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutor;
import org.apache.flink.runtime.asyncprocessing.AsyncFutureFactory;
import org.apache.flink.runtime.asyncprocessing.AsyncRequest;
import org.apache.flink.runtime.asyncprocessing.AsyncRequestBuffer;
import org.apache.flink.runtime.asyncprocessing.CallbackRunnerWrapper;
import org.apache.flink.runtime.asyncprocessing.EpochManager;
import org.apache.flink.runtime.asyncprocessing.KeyAccountingUnit;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.asyncprocessing.SyncPointRequest;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Coordinates the submission of {@link AsyncRequest}s to an {@link AsyncExecutor}.
 *
 * <p>What this class does, as far as this file shows:
 *
 * <ul>
 *   <li>builds a {@link RecordContext} per record/key and tracks which one is "current";
 *   <li>counts in-flight records and drains them down to a limit before admitting new ones;
 *   <li>lets a key be occupied by one record at a time through the {@link KeyAccountingUnit};
 *       requests whose key cannot be occupied go to the blocking part of the {@link
 *       AsyncRequestBuffer} and are moved to the active part when the occupying context is
 *       disposed;
 *   <li>hands buffered active requests to the {@link AsyncExecutor} in batches of at most
 *       {@code batchSize}.
 * </ul>
 *
 * <p>NOTE(review): apart from the mail-notification handshake ({@link #notifyLock} / {@link
 * #waitingMail}) and the atomic in-flight counter, the fields are accessed without
 * synchronization. The class presumably expects to be driven from the single thread that owns the
 * {@link MailboxExecutor} - confirm against callers.
 *
 * @param <K> the type of the key
 * @param <REQUEST> the type of request accepted by the {@link AsyncExecutor}
 */
public class AsyncExecutionController<K, REQUEST extends AsyncRequest<?>> implements Closeable {

    private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class);

    /** Check interval handed to the {@link AsyncRequestBuffer} together with the buffer timeout. */
    private static final long DEFAULT_BUFFER_TIMEOUT_CHECK_INTERVAL = 100L;

    /**
     * When the caller does not insist on waiting, draining is skipped once more than this many
     * invocations of {@link #drainInflightRecords(int, boolean)} are already active.
     */
    private static final int MAX_NON_FORCED_DRAIN_DEPTH = 5;

    /** A non-forced trigger fires at this many active requests; also the maximum batch size. */
    private final int batchSize;

    /** Wraps the mailbox executor; invokes {@link #notifyNewMail()} as its callback. */
    final CallbackRunnerWrapper callbackRunner;

    /** Timeout handed to the {@link AsyncRequestBuffer}. */
    private final long bufferTimeout;

    /** Check interval handed to the {@link AsyncRequestBuffer}. */
    private final long bufferTimeoutCheckInterval = DEFAULT_BUFFER_TIMEOUT_CHECK_INTERVAL;

    /** In-flight record count that {@link #seizeCapacity(boolean)} drains down to. */
    private final int maxInFlightRecordNum;

    final MailboxExecutor mailboxExecutor;

    /** Receives failures raised while running non-record actions. */
    final AsyncFutureImpl.AsyncFrameworkExceptionHandler exceptionHandler;

    /** Tracks which keys are currently occupied by a record. */
    final KeyAccountingUnit<K> keyAccountingUnit;

    /** Creates the futures attached to requests issued through this controller. */
    final AsyncFutureFactory<K> asyncFutureFactory;

    /** Executes the requests, either synchronously or as a batch. */
    final AsyncExecutor<REQUEST> asyncExecutor;

    final DeclarationManager declarationManager;

    /** The context requests are currently attributed to; may be {@code null}. */
    RecordContext<K> currentContext;

    /** Holds the active and the blocking requests. */
    AsyncRequestBuffer<K> asyncRequestsBuffer;

    /** Number of records that have seized capacity and whose context is not yet disposed. */
    final AtomicInteger inFlightRecordNum;

    /** Maximum parallelism, used to compute the key group of each key. */
    final int maxParallelism;

    final EpochManager epochManager;

    /** Optional listener informed about every change of the current context. */
    final SwitchContextListener<K> switchContextListener;

    private final EpochManager.ParallelMode epochParallelMode;

    /** Monitor used to wait for, and to signal, newly arrived callback mails. */
    private final Object notifyLock = new Object();

    /** Whether {@link #waitForNewMails()} is currently blocked on {@link #notifyLock}. */
    private volatile boolean waitingMail = false;

    /** Number of currently active invocations of {@link #drainInflightRecords(int, boolean)}. */
    private int drainDepth = 0;

    /**
     * Creates the controller and, if a metric group is given, registers its gauges.
     *
     * @param mailboxExecutor executor used for yielding and for scheduling buffer-timeout triggers
     * @param exceptionHandler handler for failures of non-record actions; also handed to the
     *     future factory
     * @param asyncExecutor executor that actually runs the requests
     * @param declarationManager manager informed about context switches
     * @param epochParallelMode parallel mode passed to the epoch manager for non-record events
     * @param maxParallelism maximum parallelism used for key-group assignment
     * @param batchSize size threshold of a non-forced trigger and maximum size of one batch
     * @param bufferTimeout timeout handed to the request buffer
     * @param maxInFlightRecords in-flight record limit enforced before admitting a new record
     * @param switchContextListener listener for context switches, or {@code null}
     * @param metricGroup group to register gauges on, or {@code null} to register none
     */
    public AsyncExecutionController(
            MailboxExecutor mailboxExecutor,
            AsyncFutureImpl.AsyncFrameworkExceptionHandler exceptionHandler,
            AsyncExecutor<REQUEST> asyncExecutor,
            DeclarationManager declarationManager,
            EpochManager.ParallelMode epochParallelMode,
            int maxParallelism,
            int batchSize,
            long bufferTimeout,
            int maxInFlightRecords,
            @Nullable SwitchContextListener<K> switchContextListener,
            @Nullable MetricGroup metricGroup) {
        this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
        this.mailboxExecutor = mailboxExecutor;
        this.exceptionHandler = exceptionHandler;
        this.callbackRunner = new CallbackRunnerWrapper(mailboxExecutor, this::notifyNewMail);
        this.asyncFutureFactory =
                new AsyncFutureFactory<>(this, this.callbackRunner, exceptionHandler);
        this.asyncExecutor = asyncExecutor;
        this.declarationManager = declarationManager;
        this.epochParallelMode = epochParallelMode;
        this.batchSize = batchSize;
        this.bufferTimeout = bufferTimeout;
        this.maxInFlightRecordNum = maxInFlightRecords;
        this.inFlightRecordNum = new AtomicInteger(0);
        this.maxParallelism = maxParallelism;
        // The buffer reports a sequence number; a forced trigger is scheduled through the mailbox
        // and only runs if that sequence number is still the current one when the mail executes.
        this.asyncRequestsBuffer =
                new AsyncRequestBuffer<>(
                        bufferTimeout,
                        this.bufferTimeoutCheckInterval,
                        scheduledSeq ->
                                mailboxExecutor.execute(
                                        () -> {
                                            if (this.asyncRequestsBuffer.checkCurrentSeq(
                                                    (long) scheduledSeq)) {
                                                triggerIfNeeded(true);
                                            }
                                        },
                                        "AEC-buffer-timeout"));
        this.epochManager = new EpochManager(this);
        this.switchContextListener = switchContextListener;
        if (metricGroup != null) {
            metricGroup.gauge("numInFlightRecords", this::getInFlightRecordNum);
            metricGroup.gauge("activeBufferSize", this.asyncRequestsBuffer::activeQueueSize);
            metricGroup.gauge("blockingBufferSize", this.asyncRequestsBuffer::blockingQueueSize);
            metricGroup.gauge("numBlockingKeys", this.asyncRequestsBuffer::blockingKeyNum);
        }
        LOG.info(
                "Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordNum {}, epochParallelMode {}",
                this.batchSize,
                this.bufferTimeout,
                this.maxInFlightRecordNum,
                this.epochParallelMode);
    }

    /**
     * Builds a fresh context for the given record and key without inheriting from the current
     * context.
     *
     * @param record the record, or {@code null} to use {@link RecordContext#EMPTY_RECORD}
     * @param key the key of the record
     * @return the new context
     */
    public RecordContext<K> buildContext(Object record, K key) {
        return buildContext(record, key, false);
    }

    /**
     * Builds a context for the given record and key.
     *
     * <p>If {@code inherit} is set and there is a current context, the new context joins the
     * current context's epoch, shares its variables reference and gets the current priority plus
     * one. Otherwise it is registered with the epoch manager as a new record and sized by the
     * declaration manager's variable count.
     *
     * @param record the record, or {@code null} to use {@link RecordContext#EMPTY_RECORD}
     * @param key the key of the record
     * @param inherit whether to inherit epoch, variables and priority from the current context
     * @return the new context
     */
    public RecordContext<K> buildContext(Object record, K key, boolean inherit) {
        Object effectiveRecord = record == null ? RecordContext.EMPTY_RECORD : record;
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
        if (inherit && currentContext != null) {
            return new RecordContext<K>(
                    effectiveRecord,
                    key,
                    this::disposeContext,
                    keyGroup,
                    epochManager.onEpoch(currentContext.getEpoch()),
                    currentContext.getVariablesReference(),
                    currentContext.getPriority() + 1);
        }
        return new RecordContext<K>(
                effectiveRecord,
                key,
                this::disposeContext,
                keyGroup,
                epochManager.onRecord(),
                declarationManager.variableCount());
    }

    /**
     * Makes the given context the current one. Does nothing if it already is (compared by
     * identity); otherwise the declaration manager and, if present, the switch-context listener
     * are informed.
     *
     * @param switchingContext the context to switch to; may be {@code null}
     */
    public void setCurrentContext(RecordContext<K> switchingContext) {
        if (currentContext == switchingContext) {
            return;
        }
        currentContext = switchingContext;
        declarationManager.setCurrentContext(switchingContext);
        if (switchContextListener != null) {
            switchContextListener.switchContext(switchingContext);
        }
    }

    /** Returns the current context, which may be {@code null}. */
    public RecordContext<K> getCurrentContext() {
        return currentContext;
    }

    /**
     * Disposes a context: completes its record in the epoch manager, releases its key, decrements
     * the in-flight counter and, if a request was blocked on the same key, occupies the key for
     * that request and moves it to the active buffer.
     *
     * @param toDispose the context to dispose
     */
    void disposeContext(RecordContext<K> toDispose) {
        epochManager.completeOneRecord(toDispose.getEpoch());
        keyAccountingUnit.release(toDispose.getRecord(), toDispose.getKey());
        inFlightRecordNum.decrementAndGet();
        AsyncRequest<K> nextRequest = asyncRequestsBuffer.unblockOneByKey(toDispose.getKey());
        if (nextRequest != null) {
            // The key has just been released above, so occupying it again must succeed.
            Preconditions.checkState(tryOccupyKey(nextRequest.getRecordContext()));
            insertActiveBuffer(nextRequest);
        }
    }

    /**
     * Tries to occupy the key of the given context.
     *
     * @param recordContext the context whose key should be occupied
     * @return {@code true} if the context already held its key or just acquired it
     */
    boolean tryOccupyKey(RecordContext<K> recordContext) {
        if (recordContext.isKeyOccupied()) {
            return true;
        }
        if (keyAccountingUnit.occupy(recordContext.getRecord(), recordContext.getKey())) {
            recordContext.setKeyOccupied();
            return true;
        }
        return false;
    }

    /**
     * Submits a request: seizes capacity, then places the request into the active buffer if the
     * key can be occupied and into the blocking buffer otherwise, and finally performs a
     * non-forced trigger.
     *
     * <p>NOTE(review): the key is occupied through the <em>current</em> context, not through the
     * request's own context; callers presumably always pass a request created for the current
     * context - confirm.
     *
     * @param request the request to handle
     * @param allowOverdraft whether the in-flight limit may be exceeded instead of waiting
     */
    public void handleRequest(AsyncRequest<K> request, boolean allowOverdraft) {
        seizeCapacity(allowOverdraft);
        if (tryOccupyKey(currentContext)) {
            insertActiveBuffer(request);
        } else {
            insertBlockingBuffer(request);
        }
        triggerIfNeeded(false);
    }

    /**
     * Runs mails and triggers buffered requests until the given condition holds.
     *
     * <p>If the thread is interrupted while waiting, the method returns early <em>without</em> the
     * condition being met; the interrupt status is restored so the caller can observe it.
     *
     * @param checker condition to wait for
     */
    public void waitUntil(Supplier<Boolean> checker) {
        if (checker.get()) {
            return;
        }
        triggerIfNeeded(true);
        try {
            while (!checker.get()) {
                if (mailboxExecutor.tryYield()) {
                    continue;
                }
                // Nothing to run in the mailbox: push buffered requests if the executor has room.
                if (!asyncExecutor.fullyLoaded()) {
                    triggerIfNeeded(true);
                }
                waitForNewMails();
            }
        } catch (InterruptedException e) {
            // Stop waiting, but keep the interruption visible instead of dropping it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Routes a request whose key is occupied: a {@link SyncPointRequest} completes its future
     * immediately, any other synchronous request is executed synchronously, and asynchronous
     * requests are enqueued into the active buffer.
     *
     * @param request the request to insert
     */
    void insertActiveBuffer(AsyncRequest<K> request) {
        if (!request.isSync()) {
            asyncRequestsBuffer.enqueueToActive(request);
        } else if (request instanceof SyncPointRequest) {
            request.getFuture().complete(null);
        } else {
            asyncExecutor.executeRequestSync(request);
        }
    }

    /**
     * Enqueues a request into the blocking buffer.
     *
     * @param request the request to insert
     */
    void insertBlockingBuffer(AsyncRequest<K> request) {
        asyncRequestsBuffer.enqueueToBlocking(request);
    }

    /**
     * Pops up to {@code batchSize} active requests and hands them to the executor as one batch.
     *
     * @param force if {@code false}, nothing happens unless at least {@code batchSize} requests
     *     are active
     * @return {@code true} if a non-empty batch was submitted
     */
    public boolean triggerIfNeeded(boolean force) {
        if (!force && asyncRequestsBuffer.activeQueueSize() < batchSize) {
            return false;
        }
        // The container type is inferred rather than declared as a raw Optional, so the calls on
        // the contained value stay type-checked.
        var toRun = asyncRequestsBuffer.popActive(batchSize, asyncExecutor::createRequestContainer);
        if (toRun.isEmpty() || toRun.get().isEmpty()) {
            return false;
        }
        asyncExecutor.executeBatchRequests(toRun.get());
        asyncRequestsBuffer.advanceSeq();
        return true;
    }

    /**
     * Accounts the current record as in flight, draining down to the in-flight limit first. A
     * context that already occupies its key is not counted again.
     *
     * @param allowOverdraft whether draining may give up instead of waiting
     */
    private void seizeCapacity(boolean allowOverdraft) {
        if (currentContext.isKeyOccupied()) {
            return;
        }
        drainInflightRecords(maxInFlightRecordNum, !allowOverdraft);
        inFlightRecordNum.incrementAndGet();
    }

    /**
     * Issues a {@link SyncPointRequest} for the current context and runs the callback once the
     * request's future completes.
     *
     * @param callback the action to run after the sync point
     * @param allowOverdraft whether the in-flight limit may be exceeded instead of waiting
     * @return the future obtained by chaining the callback onto the sync-point future
     */
    public AsyncFuture<Void> syncPointRequestWithCallback(
            ThrowingRunnable<Exception> callback, boolean allowOverdraft) {
        InternalAsyncFuture<Void> asyncFuture = asyncFutureFactory.create(currentContext);
        handleRequest(new SyncPointRequest<K>(currentContext, asyncFuture), allowOverdraft);
        return asyncFuture.thenAccept(v -> callback.run());
    }

    /**
     * Waits until at most {@code targetNum} records are in flight.
     *
     * <p>Returns early if the thread is interrupted; the interrupt status is restored in that
     * case.
     *
     * @param targetNum the in-flight record count to drain down to
     */
    public void drainInflightRecords(int targetNum) {
        drainInflightRecords(targetNum, true);
    }

    /**
     * Runs mails and triggers buffered requests until at most {@code targetNum} records are in
     * flight. The context that was current on entry is restored on exit.
     *
     * @param targetNum the in-flight record count to drain down to
     * @param forceToWait if {@code false}, the method returns immediately when too many drains are
     *     already active, and stops as soon as there is nothing to wait for (no batch triggered,
     *     executor not fully loaded, no pending callback mail)
     */
    private void drainInflightRecords(int targetNum, boolean forceToWait) {
        if (!forceToWait && drainDepth > MAX_NON_FORCED_DRAIN_DEPTH) {
            return;
        }
        RecordContext<K> storedContext = currentContext;
        ++drainDepth;
        try {
            while (inFlightRecordNum.get() > targetNum) {
                if (mailboxExecutor.tryYield()) {
                    continue;
                }
                boolean triggered = false;
                // A full drain always pushes; otherwise push only if the executor has room.
                if (targetNum == 0 || !asyncExecutor.fullyLoaded()) {
                    triggered = triggerIfNeeded(true);
                }
                boolean nothingToWaitFor =
                        !forceToWait
                                && !triggered
                                && !asyncExecutor.fullyLoaded()
                                && !callbackRunner.isHasMail();
                if (nothingToWaitFor) {
                    break;
                }
                waitForNewMails();
            }
        } catch (InterruptedException e) {
            // Stop draining, but keep the interruption visible instead of dropping it.
            Thread.currentThread().interrupt();
        } finally {
            --drainDepth;
            setCurrentContext(storedContext);
        }
    }

    /**
     * Blocks for at most one millisecond, unless the callback runner already reports mail. The
     * mail check is repeated under {@link #notifyLock} before waiting.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void waitForNewMails() throws InterruptedException {
        if (callbackRunner.isHasMail()) {
            return;
        }
        synchronized (notifyLock) {
            if (!callbackRunner.isHasMail()) {
                waitingMail = true;
                try {
                    notifyLock.wait(1L);
                } finally {
                    // Reset even on interruption so notifyNewMail() does not keep taking the lock.
                    waitingMail = false;
                }
            }
        }
    }

    /**
     * Wakes up a thread blocked in {@link #waitForNewMails()}, if there is one. Registered as the
     * callback of the {@link CallbackRunnerWrapper}.
     */
    private void notifyNewMail() {
        if (waitingMail) {
            synchronized (notifyLock) {
                if (waitingMail) {
                    notifyLock.notify();
                }
            }
        }
    }

    /**
     * Registers a non-record event with the epoch manager. Each non-null action runs with the
     * current context cleared; failures are passed to the exception handler.
     *
     * @param triggerAction first action handed to the epoch manager, or {@code null}
     * @param finalAction second action handed to the epoch manager, or {@code null}
     */
    public void processNonRecord(
            @Nullable ThrowingRunnable<? extends Exception> triggerAction,
            @Nullable ThrowingRunnable<? extends Exception> finalAction) {
        epochManager.onNonRecord(
                triggerAction == null ? null : () -> runWithoutRecordContext(triggerAction),
                finalAction == null ? null : () -> runWithoutRecordContext(finalAction),
                epochParallelMode);
    }

    /**
     * Runs the action with the current context set to {@code null} and restores the previous
     * context afterwards, also when the action fails. Failures go to the exception handler.
     */
    private void runWithoutRecordContext(ThrowingRunnable<? extends Exception> action) {
        RecordContext<K> previousContext = currentContext;
        try {
            try {
                setCurrentContext(null);
                action.run();
            } finally {
                setCurrentContext(previousContext);
            }
        } catch (Exception e) {
            exceptionHandler.handleException("Failed to process non-record.", e);
        }
    }

    /** Returns the executor that runs the requests. */
    @VisibleForTesting
    public AsyncExecutor<REQUEST> getAsyncExecutor() {
        return asyncExecutor;
    }

    /** Returns the current number of in-flight records. */
    @VisibleForTesting
    public int getInFlightRecordNum() {
        return inFlightRecordNum.get();
    }

    /** Returns the mailbox executor this controller was created with. */
    @VisibleForTesting
    public MailboxExecutor getMailboxExecutor() {
        return mailboxExecutor;
    }

    /**
     * Closes the request buffer.
     *
     * @throws IOException declared by {@link Closeable#close()}
     */
    @Override
    public void close() throws IOException {
        asyncRequestsBuffer.close();
    }

    /**
     * Listener informed whenever the controller switches its current context.
     *
     * @param <K> the type of the key
     */
    public interface SwitchContextListener<K> {
        /**
         * Called after the current context has changed.
         *
         * @param var1 the new current context; may be {@code null}
         */
        void switchContext(@Nullable RecordContext<K> var1);
    }
}

