/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.loadbalance;

import io.netty.util.ReferenceCountUtil;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.frame.FrameType;
import io.rsocket.loadbalance.FluxDeferredResolution;
import io.rsocket.loadbalance.LoadbalanceStrategy;
import io.rsocket.loadbalance.LoadbalanceTarget;
import io.rsocket.loadbalance.MonoDeferredResolution;
import io.rsocket.loadbalance.PooledWeightedRSocket;
import io.rsocket.loadbalance.ResolvingOperator;
import io.rsocket.loadbalance.Stats;
import io.rsocket.loadbalance.WeightedLoadbalanceStrategy;
import io.rsocket.loadbalance.WeightedRSocket;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.annotation.Nullable;

class RSocketPool
extends ResolvingOperator<Void>
implements CoreSubscriber<List<LoadbalanceTarget>>,
List<RSocket> {
    final DeferredResolutionRSocket deferredResolutionRSocket = new DeferredResolutionRSocket(this);
    final RSocketConnector connector;
    final LoadbalanceStrategy loadbalanceStrategy;
    final Supplier<Stats> statsSupplier;
    volatile PooledWeightedRSocket[] activeSockets;
    static final AtomicReferenceFieldUpdater<RSocketPool, PooledWeightedRSocket[]> ACTIVE_SOCKETS = AtomicReferenceFieldUpdater.newUpdater(RSocketPool.class, PooledWeightedRSocket[].class, "activeSockets");
    static final PooledWeightedRSocket[] EMPTY = new PooledWeightedRSocket[0];
    static final PooledWeightedRSocket[] TERMINATED = new PooledWeightedRSocket[0];
    volatile Subscription s;
    static final AtomicReferenceFieldUpdater<RSocketPool, Subscription> S = AtomicReferenceFieldUpdater.newUpdater(RSocketPool.class, Subscription.class, "s");

    public RSocketPool(RSocketConnector connector, Publisher<List<LoadbalanceTarget>> targetPublisher, LoadbalanceStrategy loadbalanceStrategy) {
        this.connector = connector;
        this.loadbalanceStrategy = loadbalanceStrategy;
        this.statsSupplier = loadbalanceStrategy instanceof WeightedLoadbalanceStrategy ? Stats::create : Stats::noOps;
        ACTIVE_SOCKETS.lazySet(this, EMPTY);
        targetPublisher.subscribe((Subscriber)this);
    }

    @Override
    protected void doOnDispose() {
        RSocket[] activeSockets;
        Operators.terminate(S, (Object)this);
        for (RSocket rSocket : activeSockets = (RSocket[])ACTIVE_SOCKETS.getAndSet(this, TERMINATED)) {
            rSocket.dispose();
        }
    }

    public void onSubscribe(Subscription s) {
        if (Operators.setOnce(S, (Object)this, (Subscription)s)) {
            s.request(Long.MAX_VALUE);
        }
    }

    public void onNext(List<LoadbalanceTarget> targets) {
        PooledWeightedRSocket[] nextActiveSockets;
        int position;
        PooledWeightedRSocket[] activeSockets;
        PooledWeightedRSocket[] previouslyActiveSockets;
        if (this.isDisposed()) {
            return;
        }
        do {
            HashMap<LoadbalanceTarget, Integer> rSocketSuppliersCopy = new HashMap<LoadbalanceTarget, Integer>();
            int j = 0;
            for (LoadbalanceTarget target : targets) {
                rSocketSuppliersCopy.put(target, j++);
            }
            previouslyActiveSockets = this.activeSockets;
            nextActiveSockets = new PooledWeightedRSocket[previouslyActiveSockets.length + rSocketSuppliersCopy.size()];
            position = 0;
            for (int i = 0; i < previouslyActiveSockets.length; ++i) {
                PooledWeightedRSocket rSocket = previouslyActiveSockets[i];
                Integer index = (Integer)rSocketSuppliersCopy.remove(rSocket.target());
                if (index == null) {
                    if (rSocket.isDisposed()) continue;
                    rSocket.dispose();
                    continue;
                }
                if (!rSocket.isDisposed()) {
                    nextActiveSockets[position++] = rSocket;
                    continue;
                }
                LoadbalanceTarget target = targets.get(index);
                nextActiveSockets[position++] = new PooledWeightedRSocket(this, this.connector.connect(target.getTransport()), target, this.statsSupplier.get());
            }
            for (LoadbalanceTarget target : rSocketSuppliersCopy.keySet()) {
                nextActiveSockets[position++] = new PooledWeightedRSocket(this, this.connector.connect(target.getTransport()), target, this.statsSupplier.get());
            }
        } while (!ACTIVE_SOCKETS.compareAndSet(this, previouslyActiveSockets, activeSockets = position == 0 ? EMPTY : Arrays.copyOf(nextActiveSockets, position)));
        if (this.isPending() && activeSockets != EMPTY) {
            this.complete(null);
        }
    }

    public void onError(Throwable t) {
        S.set(this, Operators.cancelledSubscription());
        this.terminate(t);
    }

    public void onComplete() {
        S.set(this, Operators.cancelledSubscription());
    }

    RSocket select() {
        if (this.isDisposed()) {
            return this.deferredResolutionRSocket;
        }
        RSocket selected = this.doSelect();
        if (selected == null) {
            if (this.s == Operators.cancelledSubscription()) {
                this.terminate(new CancellationException("Pool is exhausted"));
            } else {
                this.invalidate();
            }
            return this.deferredResolutionRSocket;
        }
        return selected;
    }

    @Nullable
    RSocket doSelect() {
        PooledWeightedRSocket[] sockets = this.activeSockets;
        if (sockets == EMPTY) {
            return null;
        }
        return this.loadbalanceStrategy.select(this);
    }

    @Override
    public WeightedRSocket get(int index) {
        return this.activeSockets[index];
    }

    @Override
    public int size() {
        return this.activeSockets.length;
    }

    @Override
    public boolean isEmpty() {
        return this.activeSockets.length == 0;
    }

    @Override
    public Object[] toArray() {
        return this.activeSockets;
    }

    @Override
    public <T> T[] toArray(T[] a) {
        return this.activeSockets;
    }

    @Override
    public boolean contains(Object o) {
        throw new UnsupportedOperationException();
    }

    @Override
    public Iterator<RSocket> iterator() {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean add(RSocket weightedRSocket) {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean remove(Object o) {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean addAll(Collection<? extends RSocket> c) {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean addAll(int index, Collection<? extends RSocket> c) {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean retainAll(Collection<?> c) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void clear() {
        throw new UnsupportedOperationException();
    }

    @Override
    public WeightedRSocket set(int index, RSocket element) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void add(int index, RSocket element) {
        throw new UnsupportedOperationException();
    }

    @Override
    public WeightedRSocket remove(int index) {
        throw new UnsupportedOperationException();
    }

    @Override
    public int indexOf(Object o) {
        throw new UnsupportedOperationException();
    }

    @Override
    public int lastIndexOf(Object o) {
        throw new UnsupportedOperationException();
    }

    @Override
    public ListIterator<RSocket> listIterator() {
        throw new UnsupportedOperationException();
    }

    @Override
    public ListIterator<RSocket> listIterator(int index) {
        throw new UnsupportedOperationException();
    }

    @Override
    public List<RSocket> subList(int fromIndex, int toIndex) {
        throw new UnsupportedOperationException();
    }

    static final class FluxInner<INPUT>
    extends FluxDeferredResolution<INPUT, Void> {
        FluxInner(RSocketPool parent, INPUT fluxOrPayload, FrameType requestType) {
            super(parent, fluxOrPayload, requestType);
        }

        @Override
        public void accept(Void aVoid, Throwable t) {
            if (this.isTerminated()) {
                return;
            }
            if (t != null) {
                if (this.requestType == FrameType.REQUEST_STREAM) {
                    ReferenceCountUtil.safeRelease((Object)this.fluxOrPayload);
                }
                this.onError(t);
                return;
            }
            RSocketPool parent = (RSocketPool)this.parent;
            RSocket rSocket = parent.doSelect();
            if (rSocket != null) {
                Flux<Payload> source;
                switch (this.requestType) {
                    case REQUEST_STREAM: {
                        source = rSocket.requestStream((Payload)this.fluxOrPayload);
                        break;
                    }
                    case REQUEST_CHANNEL: {
                        source = rSocket.requestChannel((Publisher<Payload>)((Flux)this.fluxOrPayload));
                        break;
                    }
                    default: {
                        Operators.error((Subscriber)this.actual, (Throwable)new IllegalStateException("Should never happen"));
                        return;
                    }
                }
                source.subscribe((CoreSubscriber)this);
            } else {
                parent.add(this);
            }
        }
    }

    static final class MonoInner<T>
    extends MonoDeferredResolution<T, Void> {
        MonoInner(RSocketPool parent, Payload payload, FrameType requestType) {
            super(parent, payload, requestType);
        }

        @Override
        public void accept(Void aVoid, Throwable t) {
            if (this.isTerminated()) {
                return;
            }
            if (t != null) {
                ReferenceCountUtil.safeRelease((Object)this.payload);
                this.onError(t);
                return;
            }
            RSocketPool parent = (RSocketPool)this.parent;
            RSocket rSocket = parent.doSelect();
            if (rSocket != null) {
                Object source;
                switch (this.requestType) {
                    case REQUEST_FNF: {
                        source = rSocket.fireAndForget(this.payload);
                        break;
                    }
                    case REQUEST_RESPONSE: {
                        source = rSocket.requestResponse(this.payload);
                        break;
                    }
                    case METADATA_PUSH: {
                        source = rSocket.metadataPush(this.payload);
                        break;
                    }
                    default: {
                        Operators.error((Subscriber)this.actual, (Throwable)new IllegalStateException("Should never happen"));
                        return;
                    }
                }
                source.subscribe((CoreSubscriber)this);
            } else {
                parent.add(this);
            }
        }
    }

    static class DeferredResolutionRSocket
    implements RSocket {
        final RSocketPool parent;

        DeferredResolutionRSocket(RSocketPool parent) {
            this.parent = parent;
        }

        @Override
        public Mono<Void> fireAndForget(Payload payload) {
            return new MonoInner<Void>(this.parent, payload, FrameType.REQUEST_FNF);
        }

        @Override
        public Mono<Payload> requestResponse(Payload payload) {
            return new MonoInner<Payload>(this.parent, payload, FrameType.REQUEST_RESPONSE);
        }

        @Override
        public Flux<Payload> requestStream(Payload payload) {
            return new FluxInner<Payload>(this.parent, payload, FrameType.REQUEST_STREAM);
        }

        @Override
        public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
            return new FluxInner<Publisher<Payload>>(this.parent, payloads, FrameType.REQUEST_STREAM);
        }

        @Override
        public Mono<Void> metadataPush(Payload payload) {
            return new MonoInner<Void>(this.parent, payload, FrameType.METADATA_PUSH);
        }
    }
}

