/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.ProcessorSignalsConsumer;
import io.servicetalk.concurrent.api.PublisherProcessorSignalsHolder;
import io.servicetalk.concurrent.api.SubscribableSources;
import io.servicetalk.concurrent.internal.ConcurrentSubscription;
import io.servicetalk.concurrent.internal.ConcurrentUtils;
import io.servicetalk.concurrent.internal.DelayedSubscription;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class PublisherProcessor<T>
extends SubscribableSources.SubscribablePublisher<T>
implements PublisherSource.Processor<T, T>,
PublisherSource.Subscription {
    private static final Logger LOGGER = LoggerFactory.getLogger(PublisherProcessor.class);
    private static final ProcessorSignalsConsumer CANCELLED = new NoopProcessorSignalsConsumer();
    private static final AtomicReferenceFieldUpdater<PublisherProcessor, ProcessorSignalsConsumer> consumerUpdater = AtomicReferenceFieldUpdater.newUpdater(PublisherProcessor.class, ProcessorSignalsConsumer.class, "consumer");
    private static final AtomicIntegerFieldUpdater<PublisherProcessor> emittingUpdater = AtomicIntegerFieldUpdater.newUpdater(PublisherProcessor.class, "emitting");
    private static final AtomicLongFieldUpdater<PublisherProcessor> pendingUpdater = AtomicLongFieldUpdater.newUpdater(PublisherProcessor.class, "pending");
    private final DelayedSubscription delayedSubscription;
    private final PublisherProcessorSignalsHolder<T> buffer;
    @Nullable
    private Throwable fatalError;
    @Nullable
    private volatile ProcessorSignalsConsumer<T> consumer;
    private volatile int emitting;
    private volatile long pending;

    PublisherProcessor(PublisherProcessorSignalsHolder<T> buffer) {
        this.buffer = Objects.requireNonNull(buffer);
        this.delayedSubscription = new DelayedSubscription();
    }

    public void onSubscribe(PublisherSource.Subscription subscription) {
        this.delayedSubscription.delayedSubscription((PublisherSource.Subscription)ConcurrentSubscription.wrap((PublisherSource.Subscription)subscription));
    }

    public void onNext(@Nullable T t) {
        this.buffer.add(t);
        this.tryEmitSignals();
    }

    public void onError(Throwable t) {
        this.buffer.terminate(t);
        this.tryEmitSignals();
    }

    public void onComplete() {
        this.buffer.terminate();
        this.tryEmitSignals();
    }

    @Override
    protected void handleSubscribe(PublisherSource.Subscriber<? super T> subscriber) {
        DelayedSubscription delayedSubscription = new DelayedSubscription();
        try {
            subscriber.onSubscribe((PublisherSource.Subscription)delayedSubscription);
        }
        catch (Throwable t) {
            SubscriberUtils.handleExceptionFromOnSubscribe(subscriber, (Throwable)t);
            return;
        }
        if (consumerUpdater.compareAndSet(this, null, new SubscriberProcessorSignalsConsumer<T>(subscriber))) {
            try {
                delayedSubscription.delayedSubscription((PublisherSource.Subscription)this);
                this.tryEmitSignals();
            }
            catch (Throwable t) {
                LOGGER.error("Unexpected error while delivering signals to the subscriber {}", subscriber, (Object)t);
            }
        } else {
            ProcessorSignalsConsumer<T> existingConsumer = this.consumer;
            assert (existingConsumer != null);
            PublisherSource.Subscriber existingSubscriber = existingConsumer instanceof SubscriberProcessorSignalsConsumer ? ((SubscriberProcessorSignalsConsumer)existingConsumer).subscriber : null;
            SubscriberUtils.safeOnError(subscriber, (Throwable)new DuplicateSubscribeException((Object)existingSubscriber, subscriber));
        }
    }

    public void request(long n) {
        if (!SubscriberUtils.isRequestNValid((long)n)) {
            this.fatalError = SubscriberUtils.newExceptionForInvalidRequestN((long)n);
        } else {
            pendingUpdater.accumulateAndGet(this, n, FlowControlUtils::addWithOverflowProtectionIfNotNegative);
            this.delayedSubscription.request(n);
        }
        this.tryEmitSignals();
    }

    public void cancel() {
        if (pendingUpdater.getAndSet(this, Long.MIN_VALUE) >= 0L) {
            ProcessorSignalsConsumer cancelled;
            this.consumer = cancelled = CANCELLED;
            this.delayedSubscription.cancel();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void tryEmitSignals() {
        boolean tryAcquire = true;
        while (tryAcquire && ConcurrentUtils.tryAcquireLock(emittingUpdater, (Object)this)) {
            block4: {
                SubscriberProcessorSignalsConsumer target;
                block5: {
                    ProcessorSignalsConsumer<T> consumer = this.consumer;
                    try {
                        if (!(consumer instanceof SubscriberProcessorSignalsConsumer)) break block4;
                        target = (SubscriberProcessorSignalsConsumer)consumer;
                        if (this.fatalError == null) break block5;
                        this.earlyTerminateConsumerHoldingLock(target, this.fatalError);
                    }
                    catch (Throwable throwable) {
                        tryAcquire = !ConcurrentUtils.releaseLock(emittingUpdater, (Object)this);
                        throw throwable;
                    }
                    tryAcquire = !ConcurrentUtils.releaseLock(emittingUpdater, (Object)this);
                    return;
                }
                this.emitSignalsHoldingLock(target);
            }
            tryAcquire = !ConcurrentUtils.releaseLock(emittingUpdater, (Object)this);
        }
    }

    private void emitSignalsHoldingLock(SubscriberProcessorSignalsConsumer<T> target) {
        while (true) {
            long cPending;
            if ((cPending = this.pending) > 0L && pendingUpdater.compareAndSet(this, cPending, cPending - 1L)) {
                boolean consumed;
                try {
                    consumed = this.buffer.tryConsume(target);
                }
                catch (Throwable t) {
                    this.earlyTerminateConsumerHoldingLock(target, t);
                    return;
                }
                if (target.isTerminated()) {
                    this.pending = Long.MIN_VALUE;
                    continue;
                }
                if (consumed) continue;
                pendingUpdater.accumulateAndGet(this, 1L, FlowControlUtils::addWithOverflowProtectionIfNotNegative);
                return;
            }
            if (cPending < 0L) {
                return;
            }
            if (cPending == 0L) break;
        }
        try {
            if (this.buffer.tryConsumeTerminal(target)) {
                this.pending = Long.MIN_VALUE;
            }
        }
        catch (Throwable t) {
            this.earlyTerminateConsumerHoldingLock(target, t);
        }
    }

    private void earlyTerminateConsumerHoldingLock(SubscriberProcessorSignalsConsumer<T> consumer, Throwable cause) {
        this.pending = Long.MIN_VALUE;
        try {
            this.delayedSubscription.cancel();
        }
        finally {
            consumer.consumeTerminal(cause);
        }
    }

    private static final class SubscriberProcessorSignalsConsumer<T>
    implements ProcessorSignalsConsumer<T> {
        private final PublisherSource.Subscriber<? super T> subscriber;
        private boolean terminated;

        SubscriberProcessorSignalsConsumer(PublisherSource.Subscriber<? super T> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void consumeItem(@Nullable T item) {
            this.subscriber.onNext(item);
        }

        @Override
        public void consumeTerminal(Throwable cause) {
            this.terminated = true;
            SubscriberUtils.safeOnError(this.subscriber, (Throwable)cause);
        }

        @Override
        public void consumeTerminal() {
            this.terminated = true;
            SubscriberUtils.safeOnComplete(this.subscriber);
        }

        boolean isTerminated() {
            return this.terminated;
        }
    }

    private static final class NoopProcessorSignalsConsumer
    implements ProcessorSignalsConsumer {
        private NoopProcessorSignalsConsumer() {
        }

        public void consumeItem(@Nullable Object item) {
        }

        @Override
        public void consumeTerminal(Throwable cause) {
        }

        @Override
        public void consumeTerminal() {
        }
    }
}

