/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.core;

import io.atleon.core.Alo;
import io.atleon.core.AloFactory;
import io.atleon.core.AloFailureStrategy;
import io.atleon.core.ComposedSubscription;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

final class AcknowledgingPublisher<T>
implements Publisher<Alo<T>> {
    private final Alo<Publisher<T>> aloSource;
    private final AtomicBoolean subscribedOnce = new AtomicBoolean(false);

    private AcknowledgingPublisher(Alo<Publisher<T>> aloSource) {
        this.aloSource = aloSource;
    }

    public static <T> Publisher<Alo<T>> fromAloPublisher(Alo<Publisher<T>> aloPublisher) {
        return new AcknowledgingPublisher<T>(aloPublisher);
    }

    public void subscribe(Subscriber<? super Alo<T>> subscriber) {
        if (!this.subscribedOnce.compareAndSet(false, true)) {
            throw new IllegalStateException("AcknowledgingPublisher may only be subscribed to once");
        }
        AcknowledgingSubscriber<T> acknowledgingSubscriber = new AcknowledgingSubscriber<T>(this.aloSource, subscriber);
        this.aloSource.runInContext(() -> this.aloSource.get().subscribe(acknowledgingSubscriber));
    }

    private static final class AcknowledgingSubscriber<T>
    implements Subscriber<T> {
        private final AtomicReference<State> state = new AtomicReference<State>(State.ACTIVE);
        private final Collection<Reference<T>> unacknowledged = Collections.newSetFromMap(new IdentityHashMap());
        private final Alo<Publisher<T>> aloSource;
        private final AloFactory<T> factory;
        private final Subscriber<? super Alo<T>> subscriber;

        public AcknowledgingSubscriber(Alo<Publisher<T>> aloSource, Subscriber<? super Alo<T>> subscriber) {
            this.aloSource = aloSource;
            this.factory = aloSource.propagator();
            this.subscriber = subscriber;
        }

        public void onSubscribe(Subscription subscription) {
            this.subscriber.onSubscribe((Subscription)new ComposedSubscription(arg_0 -> ((Subscription)subscription).request(arg_0), this.decorateCancellation(subscription)));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onNext(T value) {
            WeakReference<T> valueReference = new WeakReference<T>(Objects.requireNonNull(value, "Empty value emitted - Adhering to ReactiveStreams rule 2.13"));
            Collection<Reference<T>> collection = this.unacknowledged;
            synchronized (collection) {
                if (this.state.get() == State.ACTIVE) {
                    this.unacknowledged.add(valueReference);
                }
            }
            this.subscriber.onNext(this.wrap(value, valueReference));
        }

        public void onError(Throwable error) {
            Throwable errorToEmit;
            AtomicReference<Object> errorToEmitReference = new AtomicReference<Object>(null);
            if (!AloFailureStrategy.choose(this.subscriber).process(this.aloSource, error, errorToEmitReference::set)) {
                this.maybeExecuteNacknowledger(error);
            }
            if ((errorToEmit = (Throwable)errorToEmitReference.get()) != null) {
                this.subscriber.onError(errorToEmit);
            } else {
                this.onComplete();
            }
        }

        public void onComplete() {
            if (this.state.compareAndSet(State.ACTIVE, State.IN_FLIGHT)) {
                this.maybeExecuteAcknowledger();
            }
            this.subscriber.onComplete();
        }

        private Runnable decorateCancellation(Subscription subscription) {
            return () -> {
                subscription.cancel();
                if (this.state.compareAndSet(State.ACTIVE, State.IN_FLIGHT)) {
                    this.maybeExecuteAcknowledger();
                }
            };
        }

        private Alo<T> wrap(T value, Reference<T> valueReference) {
            return this.factory.create(value, () -> {
                Collection<Reference<T>> collection = this.unacknowledged;
                synchronized (collection) {
                    if (this.unacknowledged.remove(valueReference)) {
                        this.maybeExecuteAcknowledger();
                    }
                }
            }, error -> {
                Collection<Reference<T>> collection = this.unacknowledged;
                synchronized (collection) {
                    if (this.unacknowledged.contains(valueReference)) {
                        this.maybeExecuteNacknowledger((Throwable)error);
                    }
                }
            });
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void maybeExecuteAcknowledger() {
            Collection<Reference<T>> collection = this.unacknowledged;
            synchronized (collection) {
                if (this.unacknowledged.isEmpty() && this.state.compareAndSet(State.IN_FLIGHT, State.EXECUTED)) {
                    Alo.acknowledge(this.aloSource);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void maybeExecuteNacknowledger(Throwable error) {
            Collection<Reference<T>> collection = this.unacknowledged;
            synchronized (collection) {
                if (this.state.compareAndSet(State.ACTIVE, State.EXECUTED) || this.state.compareAndSet(State.IN_FLIGHT, State.EXECUTED)) {
                    this.unacknowledged.clear();
                    Alo.nacknowledge(this.aloSource, error);
                }
            }
        }

        private static enum State {
            ACTIVE,
            IN_FLIGHT,
            EXECUTED;

        }
    }
}

