/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.asyncprocessing;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.asyncprocessing.AsyncRequest;
import org.apache.flink.runtime.asyncprocessing.AsyncRequestContainer;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

/**
 * Buffers {@link AsyncRequest}s in two places: an "active" FIFO queue whose requests can be
 * popped in batches via {@link #popActive(int, Supplier)}, and a per-key "blocking" queue whose
 * requests are released one at a time via {@link #unblockOneByKey(Object)}.
 *
 * <p>When constructed with a positive {@code bufferTimeout}, a periodic task on a shared
 * single-threaded scheduler invokes the timeout handler once the oldest buffered request of the
 * current batch (identified by a sequence number) has waited longer than the timeout.
 *
 * <p>The queues themselves are not synchronized. The only state read by the scheduler thread is
 * {@link #seqAndTimeout} (volatile) and {@link #currentSeq} (atomic).
 *
 * @param <K> the type of the key that requests are grouped by in the blocking queue
 */
@NotThreadSafe
public class AsyncRequestBuffer<K> implements Closeable {

    /** Scheduler shared by all buffers; runs the periodic timeout checks. */
    private static final ScheduledThreadPoolExecutor DELAYER =
            new ScheduledThreadPoolExecutor(
                    1, new ExecutorThreadFactory("asyncRequestBuffer-timeout-scheduler"));

    static {
        // Drop cancelled periodic checks from the work queue right away, and do not keep running
        // periodic or delayed tasks once the scheduler has been shut down.
        DELAYER.setRemoveOnCancelPolicy(true);
        DELAYER.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        DELAYER.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    }

    /** Requests that are ready to be handed out, in arrival order. */
    final LinkedList<AsyncRequest<K>> activeQueue = new LinkedList<>();

    /** Requests waiting per key; a key is only present while its list is non-empty. */
    final Map<K, LinkedList<AsyncRequest<K>>> blockingQueue = new HashMap<>();

    /** Total number of requests over all lists of {@link #blockingQueue}. */
    int blockingQueueSize = 0;

    /** Timeout in milliseconds; values {@code <= 0} disable the timeout mechanism. */
    final long bufferTimeout;

    /** Period in milliseconds between two timeout checks. */
    final long bufferTimeoutCheckInterval;

    /** Callback invoked with the sequence number of the batch that timed out. */
    final Consumer<Long> timeoutHandler;

    /** The scheduler used for timeout checks, or {@code null} when timeouts are disabled. */
    final ScheduledExecutorService scheduledExecutor;

    /** Handle of the periodic timeout check, or {@code null} if none is scheduled. */
    ScheduledFuture<?> currentScheduledFuture;

    /**
     * Pair of (sequence number, deadline in epoch milliseconds) for the current batch, or {@code
     * null} if no deadline is armed. Written by the owner of the buffer, read by the scheduler.
     */
    volatile Tuple2<Long, Long> seqAndTimeout = null;

    /** Sequence number of the current batch; bumped by {@link #advanceSeq()}. */
    final AtomicLong currentSeq;

    /**
     * Creates a buffer and, if {@code bufferTimeout} is positive, starts the periodic timeout
     * check.
     *
     * @param bufferTimeout timeout in milliseconds; {@code <= 0} disables timeouts
     * @param bufferTimeoutCheckInterval period in milliseconds between timeout checks
     * @param timeoutHandler callback receiving the sequence number of a timed-out batch
     */
    public AsyncRequestBuffer(
            long bufferTimeout, long bufferTimeoutCheckInterval, Consumer<Long> timeoutHandler) {
        this.bufferTimeout = bufferTimeout;
        this.bufferTimeoutCheckInterval = bufferTimeoutCheckInterval;
        this.timeoutHandler = timeoutHandler;
        this.currentSeq = new AtomicLong(0L);
        if (bufferTimeout > 0L) {
            this.scheduledExecutor = DELAYER;
            initPeriodicTimeoutCheck();
        } else {
            this.scheduledExecutor = null;
        }
    }

    /**
     * Schedules the periodic timeout check: first run after {@code bufferTimeout} ms, then every
     * {@code bufferTimeoutCheckInterval} ms.
     *
     * <p>NOTE: per the {@code scheduleAtFixedRate} contract, an exception thrown by the timeout
     * handler suppresses all subsequent executions of the check.
     */
    private void initPeriodicTimeoutCheck() {
        currentScheduledFuture =
                scheduledExecutor.scheduleAtFixedRate(
                        () -> {
                            // Read the volatile field once so all checks see the same pair.
                            final Tuple2<Long, Long> snapshot = seqAndTimeout;
                            if (snapshot == null) {
                                return;
                            }
                            final long armedSeq = snapshot.f0;
                            final long deadline = snapshot.f1;
                            // Fire only if the deadline still belongs to the current batch and
                            // has actually passed.
                            if (armedSeq == currentSeq.get()
                                    && deadline <= System.currentTimeMillis()) {
                                timeoutHandler.accept(snapshot.f0);
                            }
                        },
                        bufferTimeout,
                        bufferTimeoutCheckInterval,
                        TimeUnit.MILLISECONDS);
    }

    /** Disarms the current deadline and moves on to the next sequence number. */
    void advanceSeq() {
        seqAndTimeout = null;
        currentSeq.incrementAndGet();
    }

    /**
     * Tells whether {@code seq} is still the current sequence number.
     *
     * @param seq the sequence number to check
     * @return {@code true} if {@code seq} equals the current sequence number
     */
    boolean checkCurrentSeq(long seq) {
        return currentSeq.get() == seq;
    }

    /**
     * Appends a request to the active queue. If timeouts are enabled and no deadline is armed
     * yet, arms one at {@code now + bufferTimeout} for the current sequence number.
     *
     * @param request the request to enqueue
     */
    void enqueueToActive(AsyncRequest<K> request) {
        activeQueue.add(request);
        if (bufferTimeout > 0L && seqAndTimeout == null) {
            seqAndTimeout = Tuple2.of(currentSeq.get(), System.currentTimeMillis() + bufferTimeout);
        }
    }

    /**
     * Adds a request to the blocking queue of its key.
     *
     * <p>A request with priority {@code 0}, or one arriving at an empty list, is appended at the
     * end. Otherwise it is inserted in front of the first queued request whose priority is
     * strictly lower, so requests of equal priority keep their arrival order.
     *
     * @param request the request to enqueue
     */
    void enqueueToBlocking(AsyncRequest<K> request) {
        final LinkedList<AsyncRequest<K>> keyQueue =
                blockingQueue.computeIfAbsent(
                        request.getRecordContext().getKey(), k -> new LinkedList<>());
        if (keyQueue.isEmpty() || request.getRecordContext().getPriority() == 0) {
            keyQueue.addLast(request);
        } else {
            final int priority = request.getRecordContext().getPriority();
            final ListIterator<AsyncRequest<K>> cursor = keyQueue.listIterator();
            while (cursor.hasNext()) {
                if (cursor.next().getRecordContext().getPriority() < priority) {
                    // Step back so the insertion lands in front of the lower-priority request.
                    cursor.previous();
                    break;
                }
            }
            // If no lower-priority request was found the cursor is at the end: plain append.
            cursor.add(request);
        }
        ++blockingQueueSize;
    }

    /**
     * Removes and returns the first blocked request of the given key.
     *
     * @param key the key to release a request for
     * @return the first blocked request of {@code key}, or {@code null} if none is blocked
     */
    @Nullable
    AsyncRequest<K> unblockOneByKey(K key) {
        final LinkedList<AsyncRequest<K>> keyQueue = blockingQueue.get(key);
        if (keyQueue == null) {
            return null;
        }
        final AsyncRequest<K> released = keyQueue.removeFirst();
        if (keyQueue.isEmpty()) {
            // Keep the invariant that only keys with pending requests stay in the map.
            blockingQueue.remove(key);
        }
        --blockingQueueSize;
        return released;
    }

    /**
     * @return the total number of requests in the blocking queue
     */
    int blockingQueueSize() {
        return blockingQueueSize;
    }

    /**
     * @return the number of distinct keys that currently have blocked requests
     */
    int blockingKeyNum() {
        return blockingQueue.size();
    }

    /**
     * @return the number of requests in the active queue
     */
    int activeQueueSize() {
        return activeQueue.size();
    }

    /**
     * Moves up to {@code n} requests from the head of the active queue into a fresh container.
     *
     * @param n the maximum number of requests to pop
     * @param requestContainerInitializer supplies the container to fill; only invoked when at
     *     least one request will be popped
     * @param <REQUEST> the request type held by the container
     * @return the filled container, or an empty {@link Optional} if nothing could be popped
     */
    <REQUEST extends AsyncRequest<?>> Optional<AsyncRequestContainer<REQUEST>> popActive(
            int n, Supplier<AsyncRequestContainer<REQUEST>> requestContainerInitializer) {
        final int count = Math.min(n, activeQueue.size());
        if (count <= 0) {
            return Optional.empty();
        }
        final AsyncRequestContainer<REQUEST> container = requestContainerInitializer.get();
        for (int i = 0; i < count; ++i) {
            // The caller chooses REQUEST; the buffer cannot verify it against AsyncRequest<K>.
            @SuppressWarnings("unchecked")
            final REQUEST next = (REQUEST) activeQueue.pop();
            container.offer(next);
        }
        return Optional.of(container);
    }

    /**
     * Cancels the periodic timeout check, if one is scheduled. Calling this more than once is a
     * no-op after the first call. The shared scheduler itself is left running.
     */
    @Override
    public synchronized void close() throws IOException {
        if (currentScheduledFuture != null) {
            currentScheduledFuture.cancel(true);
            currentScheduledFuture = null;
        }
    }
}

