/*
 * Decompiled with CFR 0.152.
 */
package rx.internal.operators;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import rx.Observable;
import rx.Producer;
import rx.Subscriber;
import rx.exceptions.MissingBackpressureException;
import rx.functions.Action0;
import rx.internal.operators.NotificationLite;

/**
 * Operator that buffers every upstream signal in a queue and replays it to the
 * child subscriber as the child requests items.
 *
 * <p>The upstream is requested {@code Long.MAX_VALUE} items in {@code onStart};
 * each value, completion and error is converted through {@link NotificationLite},
 * placed in a {@link ConcurrentLinkedQueue} and handed to the child by
 * {@link #pollQueue}. Terminal notifications travel through the same queue as
 * values and therefore consume one unit of child demand when delivered.
 *
 * <p>When constructed with a capacity, at most that many undelivered values are
 * held. A value arriving while the buffer is full unsubscribes the upstream,
 * signals a {@link MissingBackpressureException} directly to the child and then
 * runs the optional overflow action.
 *
 * @param <T> the value type
 */
public class OperatorOnBackpressureBuffer<T>
implements Observable.Operator<T, T> {
    private final NotificationLite<T> on = NotificationLite.instance();
    /** Maximum number of buffered values, or {@code null} for an unbounded buffer. */
    private final Long capacity;
    /** Action run once after an overflow has been signalled; may be {@code null}. */
    private final Action0 onOverflow;

    /**
     * Creates an operator with an unbounded buffer and no overflow action.
     */
    public OperatorOnBackpressureBuffer() {
        this.capacity = null;
        this.onOverflow = null;
    }

    /**
     * Creates an operator with a bounded buffer and no overflow action.
     *
     * @param capacity the maximum number of buffered values, must be positive
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public OperatorOnBackpressureBuffer(long capacity) {
        this(capacity, null);
    }

    /**
     * Creates an operator with a bounded buffer.
     *
     * @param capacity the maximum number of buffered values, must be positive
     * @param onOverflow action to run after the overflow error has been sent to
     *        the child, or {@code null} for none
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public OperatorOnBackpressureBuffer(long capacity, Action0 onOverflow) {
        if (capacity <= 0L) {
            throw new IllegalArgumentException("Buffer capacity must be > 0");
        }
        this.capacity = capacity;
        this.onOverflow = onOverflow;
    }

    /**
     * Installs a producer on {@code child} that tracks the child's demand and
     * returns the subscriber that buffers upstream signals for it.
     *
     * @param child the downstream subscriber
     * @return the subscriber to be subscribed to the upstream
     */
    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> child) {
        final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();
        // Remaining free slots; null means the buffer is unbounded.
        final AtomicLong capacity = this.capacity == null ? null : new AtomicLong(this.capacity);
        final AtomicLong wip = new AtomicLong();
        final AtomicLong requested = new AtomicLong();
        child.setProducer(new Producer(){

            @Override
            public void request(long n) {
                if (n < 0L) {
                    // A negative amount would corrupt the demand counter.
                    throw new IllegalArgumentException("n >= 0 required but it was " + n);
                }
                if (n == 0L) {
                    return;
                }
                OperatorOnBackpressureBuffer.addRequest(requested, n);
                // Always attempt a drain: pollQueue() itself guarantees that only
                // one caller drains and that this signal is not lost.
                OperatorOnBackpressureBuffer.this.pollQueue(wip, requested, capacity, queue, child);
            }
        });
        Subscriber<T> parent = new Subscriber<T>(){
            /** Set once the bounded buffer has overflowed and the child has been told. */
            private final AtomicBoolean saturated = new AtomicBoolean(false);

            @Override
            public void onStart() {
                this.request(Long.MAX_VALUE);
            }

            @Override
            public void onCompleted() {
                // After an overflow the child has already received its terminal error.
                if (!this.saturated.get()) {
                    queue.offer(OperatorOnBackpressureBuffer.this.on.completed());
                    OperatorOnBackpressureBuffer.this.pollQueue(wip, requested, capacity, queue, child);
                }
            }

            @Override
            public void onError(Throwable e) {
                // After an overflow the child has already received its terminal error.
                if (!this.saturated.get()) {
                    queue.offer(OperatorOnBackpressureBuffer.this.on.error(e));
                    OperatorOnBackpressureBuffer.this.pollQueue(wip, requested, capacity, queue, child);
                }
            }

            @Override
            public void onNext(T t) {
                if (!this.assertCapacity()) {
                    return;
                }
                queue.offer(OperatorOnBackpressureBuffer.this.on.next(t));
                OperatorOnBackpressureBuffer.this.pollQueue(wip, requested, capacity, queue, child);
            }

            /**
             * Reserves one buffer slot for an incoming value.
             *
             * @return {@code true} if a slot was reserved (or the buffer is
             *         unbounded); {@code false} if the buffer is full, in which
             *         case the first caller to observe it also unsubscribes,
             *         signals the overflow error and runs the overflow action
             */
            private boolean assertCapacity() {
                if (capacity == null) {
                    return true;
                }
                long currCapacity;
                do {
                    currCapacity = capacity.get();
                    if (currCapacity <= 0L) {
                        if (this.saturated.compareAndSet(false, true)) {
                            this.unsubscribe();
                            child.onError(new MissingBackpressureException("Overflowed buffer of " + OperatorOnBackpressureBuffer.this.capacity));
                            if (OperatorOnBackpressureBuffer.this.onOverflow != null) {
                                OperatorOnBackpressureBuffer.this.onOverflow.call();
                            }
                        }
                        return false;
                    }
                } while (!capacity.compareAndSet(currCapacity, currCapacity - 1L));
                return true;
            }
        };
        child.add(parent);
        return parent;
    }

    /**
     * Atomically adds {@code n} to {@code requested}, capping the sum at
     * {@code Long.MAX_VALUE} so that repeated large requests cannot wrap the
     * counter to a negative value.
     *
     * @param requested the demand counter
     * @param n the positive amount to add
     * @return the value of the counter before the addition
     */
    private static long addRequest(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            long next = current + n;
            if (next < 0L) {
                next = Long.MAX_VALUE;
            }
            if (requested.compareAndSet(current, next)) {
                return current;
            }
        }
    }

    /**
     * Delivers queued notifications to {@code child} while there is outstanding
     * demand.
     *
     * <p>{@code wip} serializes draining: the caller that moves it from 0 drains,
     * every other caller only bumps the counter and returns. Before leaving, the
     * drainer subtracts the signals it has accounted for and runs another pass if
     * more arrived in the meantime, so a signal that raced with the end of a pass
     * is never left stranded in the queue.
     *
     * @param wip counter of pending drain signals
     * @param requested outstanding child demand; only decremented here
     * @param capacity remaining free buffer slots, or {@code null} if unbounded
     * @param queue the buffered notifications
     * @param child the subscriber receiving the notifications
     */
    private void pollQueue(AtomicLong wip, AtomicLong requested, AtomicLong capacity, Queue<Object> queue, Subscriber<? super T> child) {
        if (wip.getAndIncrement() != 0L) {
            // Someone else is draining and will observe the incremented counter.
            return;
        }
        boolean finished = false;
        try {
            long missed = 1L;
            do {
                // Demand is checked before it is consumed, so the counter never
                // dips below zero while a concurrent request() is adding to it.
                while (requested.get() > 0L) {
                    Object o = queue.poll();
                    if (o == null) {
                        break;
                    }
                    requested.decrementAndGet();
                    if (capacity != null) {
                        // The slot is free again as soon as the item leaves the queue.
                        capacity.incrementAndGet();
                    }
                    this.on.accept(child, o);
                }
                missed = wip.addAndGet(-missed);
            } while (missed != 0L);
            finished = true;
        }
        finally {
            if (!finished) {
                // The child threw: release the drain so later signals can still be delivered.
                wip.set(0L);
            }
        }
    }
}

