/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.staging;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.neo4j.function.primitive.PrimitiveLongPredicate;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.unsafe.impl.batchimport.staging.AbstractStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

public abstract class ExecutorServiceStep<T>
extends AbstractStep<T> {
    private final ExecutorService executor;
    private final int workAheadSize;
    private final PrimitiveLongPredicate catchUp = new PrimitiveLongPredicate(){

        public boolean accept(long queueSizeThreshold) {
            return (long)ExecutorServiceStep.this.queuedBatches.get() <= queueSizeThreshold;
        }
    };
    private final AtomicLong lastBatchEndTime = new AtomicLong();

    protected ExecutorServiceStep(StageControl control, String name, int workAheadSize, int numberOfExecutors) {
        super(control, name);
        this.workAheadSize = workAheadSize;
        NamedThreadFactory threadFactory = new NamedThreadFactory(name);
        this.executor = numberOfExecutors == 1 ? Executors.newSingleThreadExecutor(threadFactory) : Executors.newFixedThreadPool(numberOfExecutors, threadFactory);
    }

    @Override
    public long receive(final long ticket, final T batch) {
        long idleTime = this.await(this.catchUp, this.workAheadSize);
        this.receivedBatch();
        this.executor.submit(new Runnable(){

            @Override
            public void run() {
                ExecutorServiceStep.this.assertHealthy();
                long startTime = System.currentTimeMillis();
                try {
                    Object result = ExecutorServiceStep.this.process(ticket, batch);
                    ExecutorServiceStep.this.totalProcessingTime.addAndGet(System.currentTimeMillis() - startTime);
                    ExecutorServiceStep.this.await(ExecutorServiceStep.this.rightTicket, ticket);
                    ExecutorServiceStep.this.sendDownstream(ticket, result);
                    long expectedTicket = ExecutorServiceStep.this.doneBatches.incrementAndGet();
                    assert (expectedTicket == ticket) : "Unexpected ticket " + ticket + ", expected " + expectedTicket;
                    int queueSizeAfterThisJobDone = ExecutorServiceStep.this.queuedBatches.decrementAndGet();
                    assert (queueSizeAfterThisJobDone >= 0) : "Negative queue size " + queueSizeAfterThisJobDone;
                    if (queueSizeAfterThisJobDone == 0) {
                        ExecutorServiceStep.this.lastBatchEndTime.set(System.currentTimeMillis());
                    }
                    ExecutorServiceStep.this.checkNotifyEndDownstream();
                }
                catch (Throwable e) {
                    ExecutorServiceStep.this.issuePanic(e);
                }
            }
        });
        return idleTime;
    }

    private void receivedBatch() {
        long lastBatchEnd;
        if (this.queuedBatches.getAndIncrement() == 0 && (lastBatchEnd = this.lastBatchEndTime.get()) != 0L) {
            this.upstreamIdleTime.addAndGet(System.currentTimeMillis() - lastBatchEnd);
        }
    }

    protected abstract Object process(long var1, T var3);

    @Override
    protected void done() {
        this.executor.shutdown();
        super.done();
    }
}

