/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.staging;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongPredicate;
import org.neo4j.graphdb.Resource;
import org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor;
import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor;
import org.neo4j.unsafe.impl.batchimport.staging.AbstractStep;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.Configuration;
import org.neo4j.unsafe.impl.batchimport.staging.Processing;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;

/**
 * A step that hands every received batch to a {@link TaskExecutor} and lets the
 * executor's workers call {@link #process(Object, BatchSender)} on it.
 *
 * <p>Each worker owns a {@code Sender} which subclasses use, through the
 * {@link BatchSender} interface, to forward results to the downstream step.
 *
 * @param <T> type of batch received from upstream.
 */
public abstract class ProcessorStep<T> extends AbstractStep<T> {
    // Created in start(); remains null until then.
    private TaskExecutor<Sender> executor;
    // Zero means "not set": theoreticalMaxProcessors() then falls back to the configuration.
    private final int maxProcessors;
    private final Configuration config;
    // True once the number of queued batches is at or below the given threshold.
    private final LongPredicate catchUp = queueSizeThreshold -> this.queuedBatches.get() <= queueSizeThreshold;
    // Number of batches for which processing has begun (a permit has been acquired).
    protected final AtomicLong begunBatches = new AtomicLong();
    // True when the given ticket equals the number of batches begun so far.
    private final LongPredicate rightBeginTicket = ticket -> this.begunBatches.get() == ticket;
    // Wall-clock time (ms) at which the queue last drained to zero; 0 until that first happens.
    private final AtomicLong lastBatchEndTime = new AtomicLong();
    private final ParkStrategy park = new ParkStrategy.Park(1L, TimeUnit.MILLISECONDS);

    /**
     * @param control passed on to {@link AbstractStep}.
     * @param name passed on to {@link AbstractStep}.
     * @param config passed on to {@link AbstractStep} and consulted for the processor
     * limit when {@code maxProcessors} is zero.
     * @param maxProcessors upper bound on processors for this step, or zero to use
     * {@code config.maxNumberOfProcessors()}.
     * @param additionalStatsProviders passed on to {@link AbstractStep}.
     */
    protected ProcessorStep(StageControl control, String name, Configuration config, int maxProcessors, StatsProvider ... additionalStatsProviders) {
        super(control, name, config, additionalStatsProviders);
        this.config = config;
        this.maxProcessors = maxProcessors;
    }

    /**
     * Starts the step and creates the executor that will run the processing tasks.
     *
     * @param orderingGuarantees passed on to {@link AbstractStep#start(int)}.
     */
    @Override
    public void start(int orderingGuarantees) {
        super.start(orderingGuarantees);
        this.executor = new DynamicTaskExecutor<>(1, this.maxProcessors, this.theoreticalMaxProcessors(), DynamicTaskExecutor.DEFAULT_PARK_STRATEGY, this.name(), Sender::new);
    }

    /**
     * @return {@code maxProcessors} if non-zero, otherwise the configured maximum.
     */
    private int theoreticalMaxProcessors() {
        return this.maxProcessors == 0 ? this.config.maxNumberOfProcessors() : this.maxProcessors;
    }

    /**
     * Queues {@code batch} for processing by the executor.
     *
     * <p>Before submitting, the caller is held back until the number of queued batches
     * is no greater than the executor's current processor count.
     *
     * @param ticket ticket of the batch; handed to the worker's sender and used for ordering.
     * @param batch the batch to process.
     * @return the value returned by {@code Processing.await} for the initial wait.
     */
    @Override
    public long receive(long ticket, T batch) {
        // Hold the caller back so it does not run too far ahead of the workers.
        long idleTime = Processing.await(this.catchUp, this.executor.processors(0), this.healthChecker, this.park);
        this.incrementQueue();
        this.executor.submit(sender -> {
            this.assertHealthy();
            sender.initialize(ticket);
            try {
                // NOTE(review): flag 2 is presumably the "process in ticket order" guarantee;
                // confirm against the constants declared on Step.
                if (this.guarantees(2)) {
                    Processing.await(this.rightBeginTicket, ticket, this.healthChecker, this.park);
                }
                try (Resource precondition = this.permit(batch)) {
                    this.begunBatches.incrementAndGet();
                    long startTime = System.nanoTime();
                    this.process(batch, sender);
                    if (this.downstream == null) {
                        // With no downstream, sendDownstream never runs, so count the batch as done here.
                        this.doneBatches.incrementAndGet();
                    }
                    // Time spent sending downstream is excluded from the processing time.
                    this.totalProcessingTime.add(System.nanoTime() - startTime - sender.sendTime);
                }
                this.decrementQueue();
                this.checkNotifyEndDownstream();
            }
            catch (Throwable e) {
                this.issuePanic(e);
            }
        });
        return idleTime;
    }

    /**
     * Acquires whatever must be held while {@code batch} is processed. The returned
     * resource is closed once processing of the batch has finished.
     *
     * @param batch the batch about to be processed.
     * @return the resource to close after processing; {@link Resource#EMPTY} by default.
     * @throws Throwable on any error; the caller turns it into a panic.
     */
    protected Resource permit(T batch) throws Throwable {
        return Resource.EMPTY;
    }

    /**
     * Decrements the queued-batches counter and, when it reaches zero, records the
     * current time as the moment this step ran out of work.
     */
    private void decrementQueue() {
        int queueSizeAfterThisJobDone = this.queuedBatches.decrementAndGet();
        assert (queueSizeAfterThisJobDone >= 0) : "Negative queue size " + queueSizeAfterThisJobDone;
        if (queueSizeAfterThisJobDone == 0) {
            this.lastBatchEndTime.set(System.currentTimeMillis());
        }
    }

    /**
     * Increments the queued-batches counter. If the queue was empty and had been drained
     * before, the time elapsed since it drained is added to the upstream idle time.
     */
    private void incrementQueue() {
        long lastBatchEnd;
        if (this.queuedBatches.getAndIncrement() == 0 && (lastBatchEnd = this.lastBatchEndTime.get()) != 0L) {
            this.upstreamIdleTime.addAndGet(System.currentTimeMillis() - lastBatchEnd);
        }
    }

    /**
     * Processes one batch. Called on an executor worker.
     *
     * @param var1 the batch to process.
     * @param var2 sender for forwarding results to the downstream step.
     * @throws Throwable on any error; the caller turns it into a panic.
     */
    protected abstract void process(T var1, BatchSender var2) throws Throwable;

    /**
     * Closes the step and shuts down the executor.
     *
     * <p>The executor is shut down even if {@code super.close()} throws, so that its
     * workers are not leaked. Nothing is shut down if the step was never started.
     *
     * @throws Exception if closing the superclass or shutting down the executor fails.
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        }
        finally {
            if (this.executor != null) {
                // NOTE(review): 1 and 2 are presumably TaskExecutor shutdown flags
                // (normal vs. after-panic shutdown); confirm against TaskExecutor.
                this.executor.shutdown(this.panic == null ? 1 : 2);
            }
        }
    }

    /**
     * Delegates to the executor's {@code processors(delta)}.
     *
     * @param delta passed on to the executor.
     * @return the value returned by the executor.
     */
    @Override
    public int processors(int delta) {
        return this.executor.processors(delta);
    }

    /**
     * Forwards {@code batch} to the downstream step and counts it as done.
     *
     * @param ticket ticket to send the batch under.
     * @param batch the batch to forward.
     */
    @SuppressWarnings("unchecked") // batch is an Object; downstream's batch type is not known here
    private void sendDownstream(long ticket, Object batch) {
        // NOTE(review): flag 1 is presumably the "send downstream in ticket order"
        // guarantee; confirm against the constants declared on Step.
        if (this.guarantees(1)) {
            Processing.await(this.rightDoneTicket, ticket, this.healthChecker, this.park);
        }
        this.downstreamIdleTime.addAndGet(this.downstream.receive(ticket, batch));
        this.doneBatches.incrementAndGet();
    }

    /**
     * Gives subclasses a last chance to emit batches, then completes the step.
     */
    @Override
    protected void done() {
        // NOTE(review): this sender is never initialized, so its ticket is 0. If ordering
        // guarantee 1 is enabled, a send() here waits on rightDoneTicket with ticket 0;
        // confirm that is the intended behaviour.
        this.lastCallForEmittingOutstandingBatches(new Sender());
        super.done();
    }

    /**
     * Hook called from {@link #done()} before the step completes. Does nothing by default.
     *
     * @param sender sender for forwarding any remaining batches downstream.
     */
    protected void lastCallForEmittingOutstandingBatches(BatchSender sender) {
    }

    /**
     * {@link BatchSender} bound to one ticket at a time. Accumulates the time spent in
     * {@link #send(Object)} so it can be subtracted from the processing time.
     */
    private class Sender implements BatchSender {
        // Nanoseconds spent in send() since the last initialize().
        private long sendTime;
        private long ticket;

        private Sender() {
        }

        /**
         * Sends {@code batch} downstream under the current ticket. The elapsed time is
         * added to {@code sendTime} whether or not the send succeeds.
         */
        @Override
        public void send(Object batch) {
            long time = System.nanoTime();
            try {
                ProcessorStep.this.sendDownstream(this.ticket, batch);
            }
            finally {
                this.sendTime += System.nanoTime() - time;
            }
        }

        /**
         * Binds this sender to {@code ticket} and resets the accumulated send time.
         */
        public void initialize(long ticket) {
            this.ticket = ticket;
            this.sendTime = 0L;
        }
    }
}

