/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import com.hazelcast.internal.metrics.ProbeLevel;
import com.hazelcast.internal.metrics.ProbeUnit;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.accumulator.LongLongAccumulator;
import com.hazelcast.jet.impl.execution.InboundEdgeStream;
import com.hazelcast.jet.impl.execution.SnapshotBarrier;
import com.hazelcast.jet.impl.execution.SnapshotContext;
import com.hazelcast.jet.impl.execution.Tasklet;
import com.hazelcast.jet.impl.util.AsyncSnapshotWriter;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.logging.ILogger;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.Nonnull;

/**
 * A {@link Tasklet} that drains an {@link InboundEdgeStream} into an
 * {@link AsyncSnapshotWriter} and, each time a {@link SnapshotBarrier} is
 * seen, flushes the writer and reports the written totals to the
 * {@link SnapshotContext}.
 *
 * <p>The work is organised as a small state machine (see {@link State}):
 * <ul>
 *   <li>{@code DRAIN} - re-offer a previously rejected entry, then drain the
 *       input; move to {@code FLUSH} once a barrier was seen, or to
 *       {@code DONE} once the input reports it is done;</li>
 *   <li>{@code FLUSH} - call {@code flushAndResetMap()} until it succeeds;</li>
 *   <li>{@code REACHED_BARRIER} - wait until the writer has no pending async
 *       operations, report error/statistics, then return to {@code DRAIN};</li>
 *   <li>{@code DONE} - has no handler; calling {@link #call()} in this state
 *       throws a {@link JetException}.</li>
 * </ul>
 */
public class StoreSnapshotTasklet
implements Tasklet {
    /**
     * Id of the snapshot whose barrier is expected next. Initialised to
     * {@code activeSnapshotIdPhase1() + 1} and incremented each time a
     * barrier has been fully processed.
     */
    long pendingSnapshotId;
    private final SnapshotContext snapshotContext;
    private final InboundEdgeStream inboundEdgeStream;
    private final ILogger logger;
    private final String vertexName;
    private final boolean isHigherPrioritySource;
    private final AsyncSnapshotWriter ssWriter;
    private final ProgressTracker progTracker = new ProgressTracker();
    /** Last reported (payload bytes, keys) pair, published via {@link #provideDynamicMetrics}. */
    private final AtomicReference<LongLongAccumulator> metrics = new AtomicReference<>(new LongLongAccumulator());
    private State state = State.DRAIN;
    /** Set by {@link #addToInbox} when a barrier is drained; cleared after the barrier is processed. */
    private boolean hasReachedBarrier;
    /** Entry the writer refused to accept; re-offered at the start of the next {@code DRAIN} step. */
    private Map.Entry<Data, Data> pendingEntry;
    // NOTE(review): addToInbox returns false when the writer rejects an entry,
    // but a Consumer discards that result, so draining may continue and a later
    // rejected entry would overwrite pendingEntry. Confirm the parameter type of
    // InboundEdgeStream.drainTo; if it accepts a Predicate<Object>, this field
    // should be declared as one.
    private final Consumer<Object> addToInboxFunction;

    /**
     * Creates the tasklet.
     *
     * @param snapshotContext        context that receives completion and error reports
     * @param inboundEdgeStream      input to drain entries and barriers from
     * @param ssWriter               writer the drained entries are offered to
     * @param logger2                logger used to report snapshot write errors
     * @param vertexName             value of the {@code vertex} metric tag and part of {@link #toString()}
     * @param isHigherPrioritySource passed through to {@code storeSnapshotTaskletDone}
     */
    public StoreSnapshotTasklet(SnapshotContext snapshotContext, InboundEdgeStream inboundEdgeStream, AsyncSnapshotWriter ssWriter, ILogger logger2, String vertexName, boolean isHigherPrioritySource) {
        this.snapshotContext = snapshotContext;
        this.inboundEdgeStream = inboundEdgeStream;
        this.logger = logger2;
        this.vertexName = vertexName;
        this.isHigherPrioritySource = isHigherPrioritySource;
        this.ssWriter = ssWriter;
        this.pendingSnapshotId = snapshotContext.activeSnapshotIdPhase1() + 1L;
        this.addToInboxFunction = this::addToInbox;
    }

    /**
     * Runs one step of the state machine.
     *
     * @return the progress recorded by the step
     * @throws JetException if invoked in a state that has no handler (e.g. {@code DONE})
     */
    @Override
    @Nonnull
    public ProgressState call() {
        progTracker.reset();
        stateMachineStep();
        return progTracker.toProgressState();
    }

    /** Executes the handler for the current {@link #state}. */
    private void stateMachineStep() {
        switch (state) {
            case DRAIN: {
                progTracker.notDone();
                if (pendingEntry != null) {
                    // The writer refused this entry earlier; nothing new may be
                    // drained until it is accepted.
                    if (!ssWriter.offer(pendingEntry)) {
                        return;
                    }
                    progTracker.madeProgress();
                }
                pendingEntry = null;
                ProgressState result = inboundEdgeStream.drainTo(addToInboxFunction);
                if (result.isDone()) {
                    assert ssWriter.isEmpty() : "input is done, but we had some entries and not the barrier";
                    // pendingSnapshotId is the next expected snapshot, so the
                    // one before it is reported here.
                    snapshotContext.storeSnapshotTaskletDone(pendingSnapshotId - 1L, isHigherPrioritySource);
                    state = State.DONE;
                    progTracker.reset();
                }
                progTracker.madeProgress(result.isMadeProgress());
                if (hasReachedBarrier) {
                    // Attempt the flush right away instead of waiting for the next call().
                    state = State.FLUSH;
                    stateMachineStep();
                }
                break;
            }
            case FLUSH: {
                progTracker.notDone();
                if (ssWriter.flushAndResetMap()) {
                    progTracker.madeProgress();
                    state = State.REACHED_BARRIER;
                }
                break;
            }
            case REACHED_BARRIER: {
                if (ssWriter.hasPendingAsyncOps()) {
                    progTracker.notDone();
                    return;
                }
                // A write error is logged and reported, but the statistics are
                // still published and the tasklet still moves on.
                Throwable error = ssWriter.getError();
                if (error != null) {
                    logger.severe("Error writing to snapshot map", error);
                    snapshotContext.reportError(error);
                }
                progTracker.madeProgress();
                long bytes = ssWriter.getTotalPayloadBytes();
                long keys = ssWriter.getTotalKeys();
                long chunks = ssWriter.getTotalChunks();
                snapshotContext.phase1DoneForTasklet(bytes, keys, chunks);
                metrics.set(new LongLongAccumulator(bytes, keys));
                ssWriter.resetStats();
                ++pendingSnapshotId;
                hasReachedBarrier = false;
                state = State.DRAIN;
                progTracker.notDone();
                break;
            }
            default: {
                // State.DONE has no case of its own and ends up here as well.
                throw new JetException("Unexpected state: " + state);
            }
        }
    }

    /**
     * Handles a single drained item: a {@link SnapshotBarrier} sets
     * {@link #hasReachedBarrier}; anything else is cast to a map entry and
     * offered to the writer.
     *
     * @param o the drained item
     * @return {@code false} if the writer rejected the entry (it is then kept in
     *         {@link #pendingEntry}), {@code true} otherwise
     */
    private boolean addToInbox(Object o) {
        if (o instanceof SnapshotBarrier) {
            SnapshotBarrier barrier = (SnapshotBarrier) o;
            assert pendingSnapshotId == barrier.snapshotId() : "Unexpected barrier, expected was " + pendingSnapshotId + ", but barrier was " + barrier.snapshotId() + ", this=" + this;
            hasReachedBarrier = true;
            return true;
        }
        // Unchecked by necessity: the input is typed as Object. A non-entry item
        // still fails with ClassCastException, exactly as with the raw cast.
        @SuppressWarnings("unchecked")
        Map.Entry<Data, Data> entry = (Map.Entry<Data, Data>) o;
        if (!ssWriter.offer(entry)) {
            pendingEntry = entry;
            return false;
        }
        return true;
    }

    /**
     * Publishes the byte and key totals captured at the last processed barrier,
     * tagged with this tasklet's vertex name.
     */
    @Override
    public void provideDynamicMetrics(MetricDescriptor descriptor, MetricsCollectionContext context) {
        descriptor = descriptor.withTag("vertex", vertexName);
        // Read the pair once so both values come from the same accumulator instance.
        LongLongAccumulator metricValues = metrics.get();
        context.collect(descriptor, "snapshotBytes", ProbeLevel.INFO, ProbeUnit.COUNT, metricValues.get1());
        context.collect(descriptor, "snapshotKeys", ProbeLevel.INFO, ProbeUnit.COUNT, metricValues.get2());
    }

    @Override
    public String toString() {
        return StoreSnapshotTasklet.class.getSimpleName() + '{' + vertexName + '}';
    }

    /** States of the tasklet's state machine. */
    enum State {
        DRAIN,
        FLUSH,
        REACHED_BARRIER,
        DONE;

    }
}

