/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.asyncprocessing;

import java.util.LinkedList;
import javax.annotation.Nullable;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EpochManager {
    private static final Logger LOG = LoggerFactory.getLogger(EpochManager.class);
    final AsyncExecutionController<?, ?> asyncExecutionController;
    long epochNum = 0L;
    LinkedList<Epoch> outputQueue = new LinkedList();
    Epoch activeEpoch;
    @Nullable
    Epoch finishingEpoch;
    boolean recursiveFlag;

    public EpochManager(AsyncExecutionController<?, ?> aec) {
        this.asyncExecutionController = aec;
        this.activeEpoch = new Epoch(this.epochNum++);
        this.finishingEpoch = null;
        this.recursiveFlag = false;
    }

    public Epoch onRecord() {
        if (this.finishingEpoch != null) {
            ++this.finishingEpoch.ongoingRecordCount;
            return this.finishingEpoch;
        }
        ++this.activeEpoch.ongoingRecordCount;
        return this.activeEpoch;
    }

    public Epoch onEpoch(Epoch epoch) {
        ++epoch.ongoingRecordCount;
        return epoch;
    }

    public void onNonRecord(@Nullable Runnable triggerAction, @Nullable Runnable finalAction, ParallelMode parallelMode) {
        LOG.trace("on NonRecord, old epoch: {}, outputQueue size: {}", (Object)this.activeEpoch, (Object)this.outputQueue.size());
        this.switchActiveEpoch(triggerAction, finalAction);
        if (parallelMode == ParallelMode.SERIAL_BETWEEN_EPOCH) {
            this.asyncExecutionController.drainInflightRecords(0);
        }
    }

    public void completeOneRecord(Epoch epoch) {
        if (--epoch.ongoingRecordCount == 0 && epoch != this.activeEpoch) {
            this.tryFinishInQueue();
        }
    }

    private void tryFinishInQueue() {
        if (this.recursiveFlag) {
            return;
        }
        this.recursiveFlag = true;
        while (!this.outputQueue.isEmpty()) {
            Epoch epoch;
            this.finishingEpoch = epoch = this.outputQueue.peek();
            try {
                if (!epoch.tryFinish()) break;
                this.outputQueue.pop();
            }
            finally {
                this.finishingEpoch = null;
            }
        }
        this.recursiveFlag = false;
    }

    private void switchActiveEpoch(@Nullable Runnable triggerAction, @Nullable Runnable finalAction) {
        this.activeEpoch.close(triggerAction, finalAction);
        this.outputQueue.offer(this.activeEpoch);
        this.activeEpoch = new Epoch(this.epochNum++);
        this.tryFinishInQueue();
    }

    public static class Epoch {
        long id;
        int ongoingRecordCount;
        @Nullable
        Runnable triggerAction;
        @Nullable
        Runnable finalAction;
        EpochStatus status;

        public Epoch(long id) {
            this.id = id;
            this.ongoingRecordCount = 0;
            this.status = EpochStatus.OPEN;
            this.triggerAction = null;
            this.finalAction = null;
        }

        boolean tryFinish() {
            if (this.ongoingRecordCount == 0) {
                if (this.status == EpochStatus.CLOSED) {
                    this.transition(EpochStatus.FINISHING);
                    if (this.triggerAction != null) {
                        this.triggerAction.run();
                    }
                }
                if (this.ongoingRecordCount == 0 && this.status == EpochStatus.FINISHING) {
                    this.transition(EpochStatus.FINISHED);
                    if (this.finalAction != null) {
                        this.finalAction.run();
                    }
                }
                return this.status == EpochStatus.FINISHED;
            }
            return false;
        }

        void transition(EpochStatus newStatus) {
            if (this.status != newStatus) {
                LOG.trace("Epoch {} transit from {} to {}", new Object[]{this, this.status, newStatus});
                this.status = newStatus;
            }
        }

        void close(@Nullable Runnable triggerAction, @Nullable Runnable finalAction) {
            this.triggerAction = triggerAction;
            this.finalAction = finalAction;
            this.transition(EpochStatus.CLOSED);
        }

        public String toString() {
            return String.format("Epoch{id=%d, ongoingRecord=%d, status=%s}", new Object[]{this.id, this.ongoingRecordCount, this.status});
        }
    }

    static enum EpochStatus {
        OPEN,
        CLOSED,
        FINISHING,
        FINISHED;

    }

    public static enum ParallelMode {
        SERIAL_BETWEEN_EPOCH,
        PARALLEL_BETWEEN_EPOCH;

    }
}

