/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequestContainer;
import org.apache.flink.state.forst.ForStDBGetRequest;
import org.apache.flink.state.forst.ForStDBPutRequest;
import org.apache.flink.state.forst.ForStGeneralMultiGetOperation;
import org.apache.flink.state.forst.ForStStateRequestClassifier;
import org.apache.flink.state.forst.ForStWriteBatchOperation;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.rocksdb.RocksDB;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ForStStateExecutor
implements StateExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(ForStStateExecutor.class);
    private final ExecutorService coordinatorThread = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new ExecutorThreadFactory("ForSt-StateExecutor-Coordinator"));
    private final ExecutorService workerThreads;
    private final RocksDB db;
    private final WriteOptions writeOptions;
    private Throwable executionError;

    public ForStStateExecutor(int ioParallelism, RocksDB db, WriteOptions writeOptions) {
        this.workerThreads = Executors.newFixedThreadPool(ioParallelism, (ThreadFactory)new ExecutorThreadFactory("ForSt-StateExecutor-IO"));
        this.db = db;
        this.writeOptions = writeOptions;
    }

    public CompletableFuture<Void> executeBatchRequests(StateRequestContainer stateRequestContainer) {
        this.checkState();
        Preconditions.checkArgument((boolean)(stateRequestContainer instanceof ForStStateRequestClassifier));
        ForStStateRequestClassifier stateRequestClassifier = (ForStStateRequestClassifier)stateRequestContainer;
        CompletableFuture<Void> resultFuture = new CompletableFuture<Void>();
        this.coordinatorThread.execute(() -> {
            List<ForStDBGetRequest<?, ?>> getRequests;
            long startTime = System.currentTimeMillis();
            ArrayList<CompletableFuture<Void>> futures = new ArrayList<CompletableFuture<Void>>(2);
            List<ForStDBPutRequest<?, ?>> putRequests = stateRequestClassifier.pollDbPutRequests();
            if (!putRequests.isEmpty()) {
                ForStWriteBatchOperation writeOperations = new ForStWriteBatchOperation(this.db, putRequests, this.writeOptions, this.workerThreads);
                futures.add(writeOperations.process());
            }
            if (!(getRequests = stateRequestClassifier.pollDbGetRequests()).isEmpty()) {
                ForStGeneralMultiGetOperation getOperations = new ForStGeneralMultiGetOperation(this.db, getRequests, this.workerThreads);
                futures.add(getOperations.process());
            }
            FutureUtils.combineAll(futures).thenAcceptAsync(e -> {
                long duration = System.currentTimeMillis() - startTime;
                LOG.debug("Complete executing a batch of state requests, putRequest size {}, getRequest size {}, duration {} ms", new Object[]{putRequests.size(), getRequests.size(), duration});
                resultFuture.complete(null);
            }, (Executor)this.coordinatorThread).exceptionally(e -> {
                this.executionError = e;
                resultFuture.completeExceptionally((Throwable)e);
                return null;
            });
        });
        return resultFuture;
    }

    public StateRequestContainer createStateRequestContainer() {
        this.checkState();
        return new ForStStateRequestClassifier();
    }

    private void checkState() {
        if (this.executionError != null) {
            throw new IllegalStateException("previous state request already failed : ", this.executionError);
        }
    }

    public void shutdown() {
        this.workerThreads.shutdown();
        this.coordinatorThread.shutdown();
        LOG.info("Shutting down the ForStStateExecutor.");
    }
}

