/*
 * Decompiled with CFR 0.152.
 */
package reactor.core.processor;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.processor.CancelException;
import reactor.core.processor.ExecutorPoweredProcessor;
import reactor.core.processor.MutableSignal;
import reactor.core.processor.ReactorProcessor;
import reactor.core.processor.util.RingBufferSubscriberUtils;
import reactor.core.support.SpecificationExceptions;
import reactor.jarjar.com.lmax.disruptor.AlertException;
import reactor.jarjar.com.lmax.disruptor.EventFactory;
import reactor.jarjar.com.lmax.disruptor.EventProcessor;
import reactor.jarjar.com.lmax.disruptor.LiteBlockingWaitStrategy;
import reactor.jarjar.com.lmax.disruptor.RingBuffer;
import reactor.jarjar.com.lmax.disruptor.Sequence;
import reactor.jarjar.com.lmax.disruptor.SequenceBarrier;
import reactor.jarjar.com.lmax.disruptor.TimeoutException;
import reactor.jarjar.com.lmax.disruptor.WaitStrategy;
import reactor.jarjar.com.lmax.disruptor.dsl.ProducerType;

public final class RingBufferWorkProcessor<E>
extends ExecutorPoweredProcessor<E, E> {
    private final Sequence workSequence = new Sequence(-1L);
    private final Queue<Sequence> cancelledSequences = new ConcurrentLinkedQueue<Sequence>();
    private final RingBuffer<MutableSignal<E>> ringBuffer;

    /**
     * Creates a single-producer work processor with every default applied: the class simple
     * name as processor name, a buffer size of 32, a {@link LiteBlockingWaitStrategy} and
     * auto-cancel enabled.
     *
     * @param <E> type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> create() {
        final String defaultName = RingBufferWorkProcessor.class.getSimpleName();
        final WaitStrategy defaultStrategy = new LiteBlockingWaitStrategy();
        return create(defaultName, 32, defaultStrategy, true);
    }

    /**
     * Creates a single-producer work processor named after the class simple name, with a
     * buffer size of 32 and a {@link LiteBlockingWaitStrategy}.
     *
     * @param autoCancel whether the upstream subscription is cancelled once the last
     *                   subscriber cancels
     * @param <E>        type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> create(boolean autoCancel) {
        final String defaultName = RingBufferWorkProcessor.class.getSimpleName();
        final WaitStrategy defaultStrategy = new LiteBlockingWaitStrategy();
        return create(defaultName, 32, defaultStrategy, autoCancel);
    }

    /**
     * Creates a single-producer work processor running on the given executor, with a buffer
     * size of 32, a {@link LiteBlockingWaitStrategy} and auto-cancel enabled.
     *
     * @param service executor on which subscriber loops are run
     * @param <E>     type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> create(ExecutorService service) {
        final WaitStrategy defaultStrategy = new LiteBlockingWaitStrategy();
        return create(service, 32, defaultStrategy, true);
    }

    /**
     * Creates a single-producer work processor running on the given executor, with a buffer
     * size of 32 and a {@link LiteBlockingWaitStrategy}.
     *
     * @param service    executor on which subscriber loops are run
     * @param autoCancel whether the upstream subscription is cancelled once the last
     *                   subscriber cancels
     * @param <E>        type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> create(ExecutorService service, boolean autoCancel) {
        final WaitStrategy defaultStrategy = new LiteBlockingWaitStrategy();
        return create(service, 32, defaultStrategy, autoCancel);
    }

    /**
     * Creates a named single-producer work processor with a {@link LiteBlockingWaitStrategy}
     * and auto-cancel enabled.
     *
     * @param name       name handed to the parent processor
     * @param bufferSize size of the backing ring buffer
     * @param <E>        type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> create(String name, int bufferSize) {
        final WaitStrategy defaultStrategy = new LiteBlockingWaitStrategy();
        return create(name, bufferSize, defaultStrategy, true);
    }

    /**
     * Creates a named single-producer work processor with a {@link LiteBlockingWaitStrategy}.
     *
     * @param name       name handed to the parent processor
     * @param bufferSize size of the backing ring buffer
     * @param autoCancel whether the upstream subscription is cancelled once the last
     *                   subscriber cancels
     * @param <E>        type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> create(String name, int bufferSize, boolean autoCancel) {
        final WaitStrategy defaultStrategy = new LiteBlockingWaitStrategy();
        return create(name, bufferSize, defaultStrategy, autoCancel);
    }

    /**
     * Creates a single-producer work processor running on the given executor, with a
     * {@link LiteBlockingWaitStrategy} and auto-cancel enabled.
     *
     * @param service    executor on which subscriber loops are run
     * @param bufferSize size of the backing ring buffer
     * @param <E>        type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> create(ExecutorService service, int bufferSize) {
        final WaitStrategy defaultStrategy = new LiteBlockingWaitStrategy();
        return create(service, bufferSize, defaultStrategy, true);
    }

    /**
     * Creates a single-producer work processor running on the given executor, with a
     * {@link LiteBlockingWaitStrategy}.
     *
     * @param service    executor on which subscriber loops are run
     * @param bufferSize size of the backing ring buffer
     * @param autoCancel whether the upstream subscription is cancelled once the last
     *                   subscriber cancels
     * @param <E>        type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> create(ExecutorService service, int bufferSize, boolean autoCancel) {
        final WaitStrategy defaultStrategy = new LiteBlockingWaitStrategy();
        return create(service, bufferSize, defaultStrategy, autoCancel);
    }

    /**
     * Creates a named single-producer work processor with auto-cancel enabled.
     *
     * @param name       name handed to the parent processor
     * @param bufferSize size of the backing ring buffer
     * @param strategy   wait strategy given to the ring buffer
     * @param <E>        type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> create(String name, int bufferSize, WaitStrategy strategy) {
        final boolean autoCancel = true;
        return create(name, bufferSize, strategy, autoCancel);
    }

    /**
     * Creates a named single-producer work processor. No executor is supplied
     * ({@code null} is passed to the constructor in its place).
     *
     * @param name       name handed to the parent processor
     * @param bufferSize size of the backing ring buffer
     * @param strategy   wait strategy given to the ring buffer
     * @param autoCancel whether the upstream subscription is cancelled once the last
     *                   subscriber cancels
     * @param <E>        type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> create(String name, int bufferSize, WaitStrategy strategy, boolean autoCancel) {
        final ExecutorService noExecutor = null;
        final boolean multiProducer = false;
        return new RingBufferWorkProcessor<E>(name, noExecutor, bufferSize, strategy, multiProducer, autoCancel);
    }

    /**
     * Creates a single-producer work processor running on the given executor, with
     * auto-cancel enabled.
     *
     * @param executor   executor on which subscriber loops are run
     * @param bufferSize size of the backing ring buffer
     * @param strategy   wait strategy given to the ring buffer
     * @param <E>        type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> create(ExecutorService executor, int bufferSize, WaitStrategy strategy) {
        final boolean autoCancel = true;
        return create(executor, bufferSize, strategy, autoCancel);
    }

    /**
     * Creates a single-producer work processor running on the given executor. No name is
     * supplied ({@code null} is passed to the constructor in its place).
     *
     * @param executor   executor on which subscriber loops are run
     * @param bufferSize size of the backing ring buffer
     * @param strategy   wait strategy given to the ring buffer
     * @param autoCancel whether the upstream subscription is cancelled once the last
     *                   subscriber cancels
     * @param <E>        type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> create(ExecutorService executor, int bufferSize, WaitStrategy strategy, boolean autoCancel) {
        final String noName = null;
        final boolean multiProducer = false;
        return new RingBufferWorkProcessor<E>(noName, executor, bufferSize, strategy, multiProducer, autoCancel);
    }

    /**
     * Creates a multi-producer work processor with every default applied: the class simple
     * name as processor name, a buffer size of 32, a {@link LiteBlockingWaitStrategy} and
     * auto-cancel enabled.
     *
     * @param <E> type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> share() {
        final String defaultName = RingBufferWorkProcessor.class.getSimpleName();
        final WaitStrategy defaultStrategy = new LiteBlockingWaitStrategy();
        return share(defaultName, 32, defaultStrategy, true);
    }

    /**
     * Creates a multi-producer work processor named after the class simple name, with a
     * buffer size of 32 and a {@link LiteBlockingWaitStrategy}.
     *
     * @param autoCancel whether the upstream subscription is cancelled once the last
     *                   subscriber cancels
     * @param <E>        type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> share(boolean autoCancel) {
        final String defaultName = RingBufferWorkProcessor.class.getSimpleName();
        final WaitStrategy defaultStrategy = new LiteBlockingWaitStrategy();
        return share(defaultName, 32, defaultStrategy, autoCancel);
    }

    /**
     * Creates a multi-producer work processor running on the given executor, with a buffer
     * size of 32, a {@link LiteBlockingWaitStrategy} and auto-cancel enabled.
     *
     * @param service executor on which subscriber loops are run
     * @param <E>     type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> share(ExecutorService service) {
        final WaitStrategy defaultStrategy = new LiteBlockingWaitStrategy();
        return share(service, 32, defaultStrategy, true);
    }

    /**
     * Creates a multi-producer work processor running on the given executor, with a buffer
     * size of 32 and a {@link LiteBlockingWaitStrategy}.
     *
     * @param service    executor on which subscriber loops are run
     * @param autoCancel whether the upstream subscription is cancelled once the last
     *                   subscriber cancels
     * @param <E>        type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> share(ExecutorService service, boolean autoCancel) {
        final WaitStrategy defaultStrategy = new LiteBlockingWaitStrategy();
        return share(service, 32, defaultStrategy, autoCancel);
    }

    /**
     * Creates a named multi-producer work processor with a {@link LiteBlockingWaitStrategy}
     * and auto-cancel enabled.
     *
     * @param name       name handed to the parent processor
     * @param bufferSize size of the backing ring buffer
     * @param <E>        type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> share(String name, int bufferSize) {
        final WaitStrategy defaultStrategy = new LiteBlockingWaitStrategy();
        return share(name, bufferSize, defaultStrategy, true);
    }

    /**
     * Creates a named multi-producer work processor with a {@link LiteBlockingWaitStrategy}.
     *
     * @param name       name handed to the parent processor
     * @param bufferSize size of the backing ring buffer
     * @param autoCancel whether the upstream subscription is cancelled once the last
     *                   subscriber cancels
     * @param <E>        type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> share(String name, int bufferSize, boolean autoCancel) {
        final WaitStrategy defaultStrategy = new LiteBlockingWaitStrategy();
        return share(name, bufferSize, defaultStrategy, autoCancel);
    }

    /**
     * Creates a multi-producer work processor running on the given executor, with a
     * {@link LiteBlockingWaitStrategy} and auto-cancel enabled.
     *
     * @param service    executor on which subscriber loops are run
     * @param bufferSize size of the backing ring buffer
     * @param <E>        type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> share(ExecutorService service, int bufferSize) {
        final WaitStrategy defaultStrategy = new LiteBlockingWaitStrategy();
        return share(service, bufferSize, defaultStrategy, true);
    }

    /**
     * Creates a multi-producer work processor running on the given executor, with a
     * {@link LiteBlockingWaitStrategy}.
     *
     * @param service    executor on which subscriber loops are run
     * @param bufferSize size of the backing ring buffer
     * @param autoCancel whether the upstream subscription is cancelled once the last
     *                   subscriber cancels
     * @param <E>        type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> share(ExecutorService service, int bufferSize, boolean autoCancel) {
        final WaitStrategy defaultStrategy = new LiteBlockingWaitStrategy();
        return share(service, bufferSize, defaultStrategy, autoCancel);
    }

    /**
     * Creates a named multi-producer work processor with auto-cancel enabled.
     *
     * @param name       name handed to the parent processor
     * @param bufferSize size of the backing ring buffer
     * @param strategy   wait strategy given to the ring buffer
     * @param <E>        type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> share(String name, int bufferSize, WaitStrategy strategy) {
        final boolean autoCancel = true;
        return share(name, bufferSize, strategy, autoCancel);
    }

    /**
     * Creates a named multi-producer work processor. No executor is supplied
     * ({@code null} is passed to the constructor in its place).
     *
     * @param name       name handed to the parent processor
     * @param bufferSize size of the backing ring buffer
     * @param strategy   wait strategy given to the ring buffer
     * @param autoCancel whether the upstream subscription is cancelled once the last
     *                   subscriber cancels
     * @param <E>        type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> share(String name, int bufferSize, WaitStrategy strategy, boolean autoCancel) {
        final ExecutorService noExecutor = null;
        final boolean multiProducer = true;
        return new RingBufferWorkProcessor<E>(name, noExecutor, bufferSize, strategy, multiProducer, autoCancel);
    }

    /**
     * Creates a multi-producer work processor running on the given executor, with
     * auto-cancel enabled.
     *
     * @param executor   executor on which subscriber loops are run
     * @param bufferSize size of the backing ring buffer
     * @param strategy   wait strategy given to the ring buffer
     * @param <E>        type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> share(ExecutorService executor, int bufferSize, WaitStrategy strategy) {
        final boolean autoCancel = true;
        return share(executor, bufferSize, strategy, autoCancel);
    }

    /**
     * Creates a multi-producer work processor running on the given executor. No name is
     * supplied ({@code null} is passed to the constructor in its place).
     *
     * @param executor   executor on which subscriber loops are run
     * @param bufferSize size of the backing ring buffer
     * @param strategy   wait strategy given to the ring buffer
     * @param autoCancel whether the upstream subscription is cancelled once the last
     *                   subscriber cancels
     * @param <E>        type of the signals carried by the processor
     * @return a new processor
     */
    public static <E> RingBufferWorkProcessor<E> share(ExecutorService executor, int bufferSize, WaitStrategy strategy, boolean autoCancel) {
        final String noName = null;
        final boolean multiProducer = true;
        return new RingBufferWorkProcessor<E>(noName, executor, bufferSize, strategy, multiProducer, autoCancel);
    }

    /**
     * Builds the processor and its backing ring buffer, and registers the shared work
     * sequence as a gating sequence of that buffer.
     *
     * @param name         forwarded to the superclass constructor
     * @param executor     forwarded to the superclass constructor
     * @param bufferSize   size of the ring buffer
     * @param waitStrategy wait strategy given to the ring buffer
     * @param share        {@code true} selects {@link ProducerType#MULTI}, {@code false}
     *                     selects {@link ProducerType#SINGLE}
     * @param autoCancel   forwarded to the superclass constructor
     */
    private RingBufferWorkProcessor(String name, ExecutorService executor, int bufferSize, WaitStrategy waitStrategy, boolean share, boolean autoCancel) {
        super(name, executor, autoCancel);
        final ProducerType producerType = share ? ProducerType.MULTI : ProducerType.SINGLE;
        // Each ring buffer slot is pre-filled with an empty, reusable signal holder.
        final EventFactory<MutableSignal<E>> signalFactory = new EventFactory<MutableSignal<E>>() {

            @Override
            public MutableSignal<E> newInstance() {
                return new MutableSignal<E>();
            }
        };
        this.ringBuffer = RingBuffer.create(producerType, signalFactory, bufferSize, waitStrategy);
        this.ringBuffer.addGatingSequences(this.workSequence);
    }

    /**
     * Registers a subscriber: creates its {@link WorkSignalProcessor}, positions that
     * processor's sequence at the current shared work sequence, adds the sequence as a
     * gating sequence of the ring buffer, attaches a {@link RingBufferSubscription} and
     * submits the processor to the executor.
     * <p>
     * Any failure during those steps is reported through {@code subscriber.onError}. If the
     * failure happens after the processor was created (for instance the executor rejects
     * the task), its sequence is removed from the ring buffer's gating sequences again;
     * otherwise a sequence that nobody advances would remain registered on the buffer.
     *
     * @param subscriber the subscriber to register
     * @throws NullPointerException if {@code subscriber} is {@code null}
     */
    @Override
    public void subscribe(Subscriber<? super E> subscriber) {
        if (null == subscriber) {
            throw new NullPointerException("Cannot subscribe NULL subscriber");
        }
        WorkSignalProcessor<E> signalProcessor = null;
        try {
            signalProcessor = new WorkSignalProcessor<E>(subscriber, this);
            signalProcessor.sequence.set(this.workSequence.get());
            this.ringBuffer.addGatingSequences(signalProcessor.sequence);
            signalProcessor.setSubscription(new RingBufferSubscription(subscriber, signalProcessor));
            this.executor.execute(signalProcessor);
        }
        catch (Throwable t) {
            if (signalProcessor != null) {
                // Undo the gating registration; removing a sequence that was never added is harmless.
                this.ringBuffer.removeGatingSequence(signalProcessor.sequence);
            }
            subscriber.onError(t);
        }
    }

    /**
     * Hands the value, together with this processor's ring buffer, to
     * {@code RingBufferSubscriberUtils.onNext}.
     *
     * @param o the value to publish
     */
    @Override
    public void onNext(E o) {
        final RingBuffer<MutableSignal<E>> target = this.ringBuffer;
        RingBufferSubscriberUtils.onNext(o, target);
    }

    /**
     * Publishes the error to the ring buffer once, then once more for every subscriber
     * beyond the first. The subscriber count is re-read before each additional publication.
     * <p>
     * NOTE(review): presumably one terminal signal per competing subscriber is intended;
     * {@code onComplete} starts its extra loop at 0 rather than 1 and also calls the
     * superclass, whereas this method does neither - confirm whether the asymmetry is
     * deliberate.
     *
     * @param t the error to publish
     */
    @Override
    public void onError(Throwable t) {
        RingBufferSubscriberUtils.onError(t, ringBuffer);
        int published = 1;
        while (published < SUBSCRIBER_COUNT.get(this)) {
            RingBufferSubscriberUtils.onError(t, ringBuffer);
            published++;
        }
    }

    /**
     * Publishes a completion signal to the ring buffer once, then once more per counted
     * subscriber (the count is re-read before each additional publication), and finally
     * delegates to the superclass {@code onComplete}.
     */
    @Override
    public void onComplete() {
        RingBufferSubscriberUtils.onComplete(ringBuffer);
        int extra = 0;
        while (extra < SUBSCRIBER_COUNT.get(this)) {
            RingBufferSubscriberUtils.onComplete(ringBuffer);
            extra++;
        }
        super.onComplete();
    }

    /**
     * Delegates to {@code RingBufferSubscriberUtils.writeWith} with the given source and
     * this processor's ring buffer.
     *
     * @param source publisher whose signals are to be written
     * @return the publisher returned by the utility call
     */
    public Publisher<Void> writeWith(Publisher<? extends E> source) {
        final Publisher<Void> completion = RingBufferSubscriberUtils.writeWith(source, ringBuffer);
        return completion;
    }

    /**
     * Renders the ring buffer, executor, shared work sequence and the queue of cancelled
     * sequences for diagnostics.
     *
     * @return a description of the form {@code RingBufferWorkProcessor{ringBuffer=..., ...}}
     */
    @Override
    public String toString() {
        // The first field is no longer preceded by a stray comma after the opening brace.
        return "RingBufferWorkProcessor{ringBuffer=" + this.ringBuffer
                + ", executor=" + this.executor
                + ", workSequence=" + this.workSequence
                + ", cancelledSequence=" + this.cancelledSequences
                + '}';
    }

    /**
     * @return the size of the backing ring buffer, as reported by {@code getBufferSize()}
     */
    @Override
    public long getCapacity() {
        final long capacity = ringBuffer.getBufferSize();
        return capacity;
    }

    /**
     * @return the ring buffer's remaining capacity, as reported by {@code remainingCapacity()}
     */
    @Override
    public long getAvailableCapacity() {
        final long remaining = ringBuffer.remainingCapacity();
        return remaining;
    }

    private static final class WorkSignalProcessor<T>
    implements EventProcessor {
        private final AtomicBoolean running = new AtomicBoolean(false);
        private final Sequence sequence = new Sequence(-1L);
        private final Sequence pendingRequest = new Sequence(0L);
        private final SequenceBarrier barrier;
        private final RingBufferWorkProcessor<T> processor;
        private final Subscriber<? super T> subscriber;
        private Subscription subscription;

        public WorkSignalProcessor(Subscriber<? super T> subscriber, RingBufferWorkProcessor<T> processor) {
            this.processor = processor;
            this.subscriber = subscriber;
            this.barrier = ((RingBufferWorkProcessor)processor).ringBuffer.newBarrier(new Sequence[0]);
        }

        /**
         * @return the subscription previously stored via {@link #setSubscription}, or
         *         {@code null} if none was set
         */
        public Subscription getSubscription() {
            return subscription;
        }

        /**
         * Stores the subscription that {@link #run()} later passes to the subscriber's
         * {@code onSubscribe}.
         *
         * @param subscription the subscription to hand to the subscriber
         */
        public void setSubscription(Subscription subscription) {
            this.subscription = subscription;
        }

        /**
         * @return this subscriber loop's own sequence
         */
        @Override
        public Sequence getSequence() {
            return sequence;
        }

        /**
         * Requests the loop to stop: clears the running flag first, then alerts the sequence
         * barrier. {@link #run()} treats an {@code AlertException} observed while the flag is
         * cleared as a stop request.
         */
        @Override
        public void halt() {
            running.set(false);
            barrier.alert();
        }

        /**
         * @return the current value of the running flag
         */
        @Override
        public boolean isRunning() {
            final boolean active = running.get();
            return active;
        }

        /**
         * Event loop executed for one subscriber.
         * <p>
         * Steps, in order:
         * <ol>
         * <li>Flip the running flag from false to true; if it was already true, signal an
         *     {@code IllegalStateException} to the subscriber and return.</li>
         * <li>Increment the owner's subscriber count and call {@code onSubscribe}.</li>
         * <li>Wait via {@code RingBufferSubscriberUtils.waitRequestOrTerminalEvent}; when it
         *     returns {@code false}, remove this loop's gating sequence and return.</li>
         * <li>Replay signals left behind by cancelled subscribers ({@link #replay}); stop if
         *     the replay was interrupted.</li>
         * <li>Loop: claim the next slot on the shared work sequence, wait until it is
         *     published, then read and route it to the subscriber.</li>
         * </ol>
         * The subscriber count is decremented and the running flag cleared in a
         * {@code finally} block covering steps 3 to 5.
         * <p>
         * WARNING (emitted by the CFR decompiler for this method): "Removed try catching
         * itself - possible behaviour change."
         */
        @Override
        public void run() {
            // Guard against the same processor being started twice.
            if (!this.running.compareAndSet(false, true)) {
                this.subscriber.onError(new IllegalStateException("Thread is already running"));
                this.processor.decrementSubscribers();
                return;
            }
            this.processor.incrementSubscribers();
            try {
                this.subscriber.onSubscribe(this.subscription);
            }
            catch (Throwable t) {
                // A failing onSubscribe is reported to the subscriber; the loop still proceeds.
                this.subscriber.onError(t);
            }
            // true means a new slot must be claimed from the shared work sequence.
            boolean processedSequence = true;
            // Highest sequence known to be published, as last returned by the barrier.
            long cachedAvailableSequence = Long.MIN_VALUE;
            long nextSequence = this.sequence.get();
            // Last signal fetched from the ring buffer; inspected by the CancelException handler.
            MutableSignal event = null;
            try {
                boolean unbounded;
                if (!RingBufferSubscriberUtils.waitRequestOrTerminalEvent(this.pendingRequest, ((RingBufferWorkProcessor)this.processor).ringBuffer, this.barrier, this.subscriber, this.running)) {
                    // No processing will happen: stop gating the ring buffer with our sequence.
                    ((RingBufferWorkProcessor)this.processor).ringBuffer.removeGatingSequence(this.sequence);
                    return;
                }
                // Demand of Long.MAX_VALUE disables per-signal demand accounting.
                boolean bl = unbounded = this.pendingRequest.get() == Long.MAX_VALUE;
                // Serve signals abandoned by cancelled subscribers before claiming new slots.
                if (this.replay(unbounded)) {
                    this.running.set(false);
                    return;
                }
                this.barrier.clearAlert();
                while (true) {
                    try {
                        while (true) {
                            if (processedSequence) {
                                processedSequence = false;
                                // Claim the next slot: publish our own sequence as (candidate - 1),
                                // then CAS the shared work sequence forward; retry on contention.
                                do {
                                    nextSequence = ((RingBufferWorkProcessor)this.processor).workSequence.get() + 1L;
                                    this.sequence.set(nextSequence - 1L);
                                } while (!((RingBufferWorkProcessor)this.processor).workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                            }
                            if (cachedAvailableSequence >= nextSequence) {
                                // The claimed slot is published: consume it.
                                event = (MutableSignal)((RingBufferWorkProcessor)this.processor).ringBuffer.get(nextSequence);
                                this.readNextEvent(event, unbounded);
                                RingBufferSubscriberUtils.routeOnce(event, this.subscriber);
                                processedSequence = true;
                                continue;
                            }
                            // Not yet published: wait on the barrier for the claimed slot.
                            cachedAvailableSequence = this.barrier.waitFor(nextSequence);
                        }
                    }
                    catch (CancelException ce) {
                        // Thrown by readNextEvent. If the last fetched signal is a NEXT with a
                        // value, step the sequence back by one (the replay path reads sequence + 1);
                        // otherwise mark the claimed slot as done. After this handler the outer
                        // loop exits through the trailing break.
                        if (event != null && event.type == MutableSignal.Type.NEXT && event.value != null) {
                            this.sequence.set(nextSequence - 1L);
                        } else {
                            this.sequence.set(nextSequence);
                        }
                        ((RingBufferWorkProcessor)this.processor).cancelledSequences.add(this.sequence);
                    }
                    catch (AlertException ex) {
                        if (!this.running.get()) {
                            // halt() was called: leave the claimed slot for replay and stop.
                            this.sequence.set(nextSequence - 1L);
                            ((RingBufferWorkProcessor)this.processor).cancelledSequences.add(this.sequence);
                            break;
                        }
                        // Alert while still running: reset the barrier and keep going.
                        this.barrier.clearAlert();
                        continue;
                    }
                    catch (Throwable ex) {
                        // Any other failure is reported to the subscriber; the slot is marked as
                        // consumed and a new one is claimed on the next iteration.
                        this.subscriber.onError(ex);
                        this.sequence.set(nextSequence);
                        processedSequence = true;
                        continue;
                    }
                    break;
                }
            }
            finally {
                this.processor.decrementSubscribers();
                this.running.set(false);
            }
        }

        /**
         * Drains the owner's queue of cancelled sequences, delivering to this subscriber the
         * signal stored one slot after each queued sequence.
         * <p>
         * For each polled sequence: if the slot's value is {@code null}, wait on the barrier
         * for that slot; then apply {@link #readNextEvent}, route the signal once to the
         * subscriber and remove the replayed sequence from the ring buffer's gating sequences.
         * <p>
         * If waiting or reading is interrupted, cancelled, alerted or times out, this loop's
         * own sequence is removed from the gating sequences, the replayed sequence is put
         * back on the queue and {@code true} is returned. When the cause is an
         * {@link InterruptedException}, the thread's interrupt status is restored so the
         * executor that owns the thread can still observe the interruption.
         *
         * @param unbounded whether the subscriber's demand is unbounded (forwarded to
         *                  {@link #readNextEvent})
         * @return {@code true} if the replay was aborted and the caller should stop,
         *         {@code false} once the queue has been drained
         */
        private boolean replay(boolean unbounded) {
            Sequence replayedSequence;
            while ((replayedSequence = this.processor.cancelledSequences.poll()) != null) {
                // Raw type kept on purpose: the utility calls below were compiled against it.
                MutableSignal signal = this.processor.ringBuffer.get(replayedSequence.get() + 1L);
                try {
                    if (signal.value == null) {
                        this.barrier.waitFor(replayedSequence.get() + 1L);
                    }
                    this.readNextEvent(signal, unbounded);
                    RingBufferSubscriberUtils.routeOnce(signal, this.subscriber);
                    this.processor.ringBuffer.removeGatingSequence(replayedSequence);
                }
                catch (InterruptedException | CancelException | AlertException | TimeoutException ce) {
                    this.processor.ringBuffer.removeGatingSequence(this.sequence);
                    this.processor.cancelledSequences.add(replayedSequence);
                    if (ce instanceof InterruptedException) {
                        // Do not swallow the interruption: re-assert the flag for the caller.
                        Thread.currentThread().interrupt();
                    }
                    return true;
                }
            }
            return false;
        }

        /**
         * Applies demand accounting or terminal handling to a signal before it is routed.
         * <ul>
         * <li>Signal with a {@code null} type: nothing happens.</li>
         * <li>Signal of any type other than {@code NEXT}: the running flag is cleared, the
         *     signal is routed to the subscriber, the upstream subscription (if any) is
         *     cancelled and {@code CancelException.INSTANCE} is thrown.</li>
         * <li>{@code NEXT} with a {@code null} value, or unbounded demand: nothing happens.</li>
         * <li>Otherwise one unit of pending demand is consumed. While no demand is available
         *     the decrement is rolled back and retried after a one-nanosecond park;
         *     {@code CancelException.INSTANCE} is thrown if the loop is found stopped while
         *     waiting.</li>
         * </ul>
         *
         * @param event     the signal about to be delivered
         * @param unbounded whether demand accounting is skipped
         * @throws AlertException declared by the signature
         */
        private void readNextEvent(MutableSignal<T> event, boolean unbounded) throws AlertException {
            if (event.type != MutableSignal.Type.NEXT) {
                if (event.type == null) {
                    return;
                }
                // Terminal signal: stop this loop, deliver it, and release the upstream.
                running.set(false);
                RingBufferSubscriberUtils.route(event, subscriber);
                final Subscription upstream = processor.upstreamSubscription;
                if (upstream != null) {
                    upstream.cancel();
                }
                throw CancelException.INSTANCE;
            }
            if (event.value == null || unbounded) {
                return;
            }
            // Fast path: demand was available.
            if (pendingRequest.addAndGet(-1L) >= 0L) {
                return;
            }
            // Undo the failed decrement, then keep retrying until demand shows up.
            pendingRequest.incrementAndGet();
            while (pendingRequest.addAndGet(-1L) < 0L) {
                pendingRequest.incrementAndGet();
                if (!running.get()) {
                    throw CancelException.INSTANCE;
                }
                LockSupport.parkNanos(1L);
            }
        }
    }

    private final class RingBufferSubscription
    implements Subscription {
        private final Subscriber<? super E> subscriber;
        private final WorkSignalProcessor eventProcessor;

        /**
         * Ties a subscriber to the event loop that serves it.
         *
         * @param subscriber     the downstream subscriber
         * @param eventProcessor the loop delivering signals to that subscriber
         */
        public RingBufferSubscription(Subscriber<? super E> subscriber, WorkSignalProcessor eventProcessor) {
            this.eventProcessor = eventProcessor;
            this.subscriber = subscriber;
        }

        /**
         * Adds {@code n} to the event loop's pending demand and forwards the request upstream.
         * <ul>
         * <li>A non-positive {@code n} is reported to the subscriber through {@code onError}
         *     with {@code SpecificationExceptions.spec_3_09_exception(n)}.</li>
         * <li>The request is ignored when the event loop is not running.</li>
         * <li>If the addition makes the pending demand negative, it is set to
         *     {@code Long.MAX_VALUE}.</li>
         * </ul>
         *
         * @param n number of additional signals requested
         */
        @Override
        public void request(long n) {
            if (n <= 0L) {
                subscriber.onError(SpecificationExceptions.spec_3_09_exception(n));
                return;
            }
            if (!eventProcessor.isRunning()) {
                return;
            }
            final long demand = eventProcessor.pendingRequest.addAndGet(n);
            if (demand < 0L) {
                // Overflow: saturate to "unbounded".
                eventProcessor.pendingRequest.set(Long.MAX_VALUE);
            }
            final Subscription upstream = RingBufferWorkProcessor.this.upstreamSubscription;
            if (upstream != null) {
                upstream.request(n);
            }
        }

        /**
         * Cancels this subscriber. When an upstream subscription exists, auto-cancel is
         * enabled and the subscriber count is exactly 1, the upstream reference is cleared and
         * the upstream subscription is cancelled. The event loop is halted in every case.
         */
        @Override
        public void cancel() {
            final RingBufferWorkProcessor<E> owner = RingBufferWorkProcessor.this;
            final Subscription upstream = owner.upstreamSubscription;
            final boolean lastSubscriberLeaving = upstream != null
                    && owner.autoCancel
                    && ReactorProcessor.SUBSCRIBER_COUNT.get(owner) == 1;
            if (lastSubscriberLeaving) {
                owner.upstreamSubscription = null;
                upstream.cancel();
            }
            eventProcessor.halt();
        }
    }
}

