/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.asyncprocessing.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.EpochManager;
import org.apache.flink.runtime.asyncprocessing.StateExecutionController;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncKeyOrderedStreamOperator;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncKeyOrderedProcessingOperator;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public abstract class AbstractAsyncStateStreamOperator<OUT>
extends AbstractAsyncKeyOrderedStreamOperator<OUT>
implements AsyncKeyOrderedProcessingOperator {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractAsyncStateStreamOperator.class);

    @Override
    protected KeySelector getKeySelectorForAsyncKeyedContext(int index) {
        switch (index) {
            case 1: {
                return this.stateKeySelector1;
            }
            case 2: {
                return this.stateKeySelector2;
            }
        }
        throw new ArrayIndexOutOfBoundsException("Try to get key selector for index " + index);
    }

    @Override
    protected AsyncExecutionController createAsyncExecutionController() {
        StreamTask<?, ?> containingTask = Preconditions.checkNotNull(this.getContainingTask());
        MailboxExecutor mailboxExecutor = containingTask.getMailboxExecutorFactory().createExecutor(this.getOperatorConfig().getChainIndex());
        int maxParallelism = this.environment.getTaskInfo().getMaxNumberOfParallelSubtasks();
        int inFlightRecordsLimit = this.environment.getExecutionConfig().getAsyncStateTotalBufferSize();
        int asyncBufferSize = this.environment.getExecutionConfig().getAsyncStateActiveBufferSize();
        long asyncBufferTimeout = this.environment.getExecutionConfig().getAsyncStateActiveBufferTimeout();
        if (this.isAsyncKeyOrderedProcessingEnabled()) {
            AsyncKeyedStateBackend asyncKeyedStateBackend = this.stateHandler.getAsyncKeyedStateBackend();
            if (asyncKeyedStateBackend != null) {
                StateExecutionController asyncExecutionController = new StateExecutionController(mailboxExecutor, this::handleAsyncException, asyncKeyedStateBackend.createStateExecutor(), this.declarationManager, EpochManager.ParallelMode.SERIAL_BETWEEN_EPOCH, maxParallelism, asyncBufferSize, asyncBufferTimeout, inFlightRecordsLimit, asyncKeyedStateBackend, this.getMetricGroup().addGroup("asyncStateProcessing"));
                asyncKeyedStateBackend.setup(asyncExecutionController);
                if (asyncKeyedStateBackend instanceof AsyncKeyedStateBackendAdaptor) {
                    LOG.warn("A normal KeyedStateBackend({}) is used when enabling the async state processing. Parallel asynchronous processing does not work. All state access will be processed synchronously.", this.stateHandler.getKeyedStateBackend());
                }
                return asyncExecutionController;
            }
            if (this.stateHandler.getKeyedStateBackend() != null) {
                throw new UnsupportedOperationException("Current State Backend doesn't support async access, AsyncExecutionController could not work");
            }
        }
        return null;
    }
}

