/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.staging;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import org.neo4j.concurrent.AsyncApply;
import org.neo4j.concurrent.Work;
import org.neo4j.unsafe.impl.batchimport.Configuration;
import org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor;
import org.neo4j.unsafe.impl.batchimport.staging.AbstractStep;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.SendDownstream;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;

public abstract class ProcessorStep<T>
extends AbstractStep<T> {
    private TaskExecutor<Sender> executor;
    private final int maxProcessors;
    private final AtomicLong lastBatchEndTime = new AtomicLong();

    protected ProcessorStep(StageControl control, String name, Configuration config, int maxProcessors, StatsProvider ... additionalStatsProviders) {
        super(control, name, config, additionalStatsProviders);
        this.maxProcessors = maxProcessors;
    }

    @Override
    public void start(int orderingGuarantees) {
        super.start(orderingGuarantees);
        this.executor = new DynamicTaskExecutor<Sender>(1, this.maxProcessors, this.config.maxNumberOfProcessors(), PARK, this.name(), () -> new Sender());
    }

    @Override
    public long receive(long ticket, T batch) {
        this.incrementQueue();
        long nanoTime = System.nanoTime();
        this.executor.submit(sender -> {
            this.assertHealthy();
            sender.initialize(ticket);
            try {
                long startTime = System.nanoTime();
                this.process(batch, (BatchSender)sender);
                if (this.downstream == null) {
                    this.doneBatches.incrementAndGet();
                    this.control.recycle(batch);
                }
                this.totalProcessingTime.add(System.nanoTime() - startTime - ((Sender)sender).sendTime);
                this.decrementQueue();
                this.checkNotifyEndDownstream();
            }
            catch (Throwable e) {
                this.issuePanic(e);
            }
        });
        return System.nanoTime() - nanoTime;
    }

    private void decrementQueue() {
        int queueSizeAfterThisJobDone = this.queuedBatches.decrementAndGet();
        assert (queueSizeAfterThisJobDone >= 0) : "Negative queue size " + queueSizeAfterThisJobDone;
        if (queueSizeAfterThisJobDone == 0) {
            this.lastBatchEndTime.set(System.currentTimeMillis());
        }
    }

    private void incrementQueue() {
        long lastBatchEnd;
        if (this.queuedBatches.getAndIncrement() == 0 && (lastBatchEnd = this.lastBatchEndTime.get()) != 0L) {
            this.upstreamIdleTime.add(System.currentTimeMillis() - lastBatchEnd);
        }
    }

    protected abstract void process(T var1, BatchSender var2) throws Throwable;

    @Override
    public void close() throws Exception {
        super.close();
        this.executor.close();
    }

    @Override
    public int processors(int delta) {
        return this.executor.processors(delta);
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private AsyncApply sendDownstream(long ticket, Object batch, AsyncApply downstreamAsync) {
        if (this.guarantees(1)) {
            AsyncApply async = this.downstreamWorkSync.applyAsync((Work)new SendDownstream(ticket, batch, this.downstreamIdleTime));
            if (downstreamAsync == null) return async;
            try {
                downstreamAsync.await();
                async.await();
                return null;
            }
            catch (ExecutionException e) {
                this.issuePanic(e.getCause());
                return null;
            }
        } else {
            this.downstreamIdleTime.add(this.downstream.receive(ticket, batch));
            this.doneBatches.incrementAndGet();
        }
        return null;
    }

    @Override
    protected void done() {
        this.lastCallForEmittingOutstandingBatches(new Sender());
        if (this.downstreamWorkSync != null) {
            try {
                this.downstreamWorkSync.apply((Work)new SendDownstream(-1L, null, this.downstreamIdleTime));
            }
            catch (ExecutionException e) {
                this.issuePanic(e.getCause());
            }
        }
        super.done();
    }

    protected void lastCallForEmittingOutstandingBatches(BatchSender sender) {
    }

    private class Sender
    implements BatchSender {
        private long sendTime;
        private long ticket;
        private AsyncApply downstreamAsync;

        private Sender() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void send(Object batch) {
            long time = System.nanoTime();
            try {
                this.downstreamAsync = ProcessorStep.this.sendDownstream(this.ticket, batch, this.downstreamAsync);
            }
            finally {
                this.sendTime += System.nanoTime() - time;
            }
        }

        public void initialize(long ticket) {
            this.ticket = ticket;
            this.sendTime = 0L;
        }
    }
}

