/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.client.pool;

import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.FastThreadLocal;
import io.reactivex.netty.client.ClientConnectionToChannelBridge;
import io.reactivex.netty.client.pool.FIFOIdleConnectionsHolder;
import io.reactivex.netty.client.pool.IdleConnectionsHolder;
import io.reactivex.netty.client.pool.PooledConnection;
import io.reactivex.netty.threads.PreferCurrentEventLoopGroup;
import java.util.ArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Actions;
import rx.functions.Func0;

public class PreferCurrentEventLoopHolder<W, R>
extends IdleConnectionsHolder<W, R> {
    private static final Logger logger = LoggerFactory.getLogger(PreferCurrentEventLoopHolder.class);
    private final FastThreadLocal<IdleConnectionsHolder<W, R>> perElHolder = new FastThreadLocal();
    private final ArrayList<IdleConnectionsHolder<W, R>> allElHolders;
    private final Observable<PooledConnection<R, W>> pollObservable;
    private final Observable<PooledConnection<R, W>> peekObservable;

    PreferCurrentEventLoopHolder(PreferCurrentEventLoopGroup eventLoopGroup) {
        this(eventLoopGroup, new FIFOIdleConnectionsHolderFactory());
    }

    PreferCurrentEventLoopHolder(PreferCurrentEventLoopGroup eventLoopGroup, IdleConnectionsHolderFactory<W, R> holderFactory) {
        ArrayList _allElHolders = new ArrayList();
        this.allElHolders = _allElHolders;
        for (EventExecutor child : eventLoopGroup) {
            final IdleConnectionsHolder newHolder = (IdleConnectionsHolder)holderFactory.call();
            this.allElHolders.add(newHolder);
            child.submit(new Runnable(){

                @Override
                public void run() {
                    PreferCurrentEventLoopHolder.this.perElHolder.set(newHolder);
                }
            });
        }
        Observable<PooledConnection<R, W>> pollOverAllHolders = Observable.empty();
        Observable<PooledConnection<R, W>> peekOverAllHolders = Observable.empty();
        for (IdleConnectionsHolder<W, R> anElHolder : this.allElHolders) {
            pollOverAllHolders = pollOverAllHolders.concatWith(anElHolder.poll());
            peekOverAllHolders = peekOverAllHolders.concatWith(anElHolder.peek());
        }
        this.pollObservable = pollOverAllHolders;
        this.peekObservable = peekOverAllHolders;
    }

    @Override
    public Observable<PooledConnection<R, W>> poll() {
        return this.pollObservable;
    }

    @Override
    public Observable<PooledConnection<R, W>> pollThisEventLoopConnections() {
        return Observable.create(new Observable.OnSubscribe<PooledConnection<R, W>>(){

            @Override
            public void call(Subscriber<? super PooledConnection<R, W>> subscriber) {
                IdleConnectionsHolder holderForThisEL = (IdleConnectionsHolder)PreferCurrentEventLoopHolder.this.perElHolder.get();
                if (null == holderForThisEL) {
                    PreferCurrentEventLoopHolder.super.pollThisEventLoopConnections().unsafeSubscribe(subscriber);
                } else {
                    holderForThisEL.poll().unsafeSubscribe(subscriber);
                }
            }
        });
    }

    @Override
    public Observable<PooledConnection<R, W>> peek() {
        return this.peekObservable;
    }

    @Override
    public void add(final PooledConnection<R, W> toAdd) {
        IdleConnectionsHolder<W, R> holderForThisEL = this.perElHolder.get();
        if (null != holderForThisEL) {
            holderForThisEL.add(toAdd);
        } else {
            toAdd.unsafeNettyChannel().eventLoop().execute(new Runnable(){

                @Override
                public void run() {
                    IdleConnectionsHolder holderForThisEl = (IdleConnectionsHolder)PreferCurrentEventLoopHolder.this.perElHolder.get();
                    if (null == holderForThisEl) {
                        logger.error("Unrecognized eventloop: " + Thread.currentThread().getName() + ". Returned connection can not be added to the pool. Closing the connection.");
                        toAdd.unsafeNettyChannel().attr(ClientConnectionToChannelBridge.DISCARD_CONNECTION).set(true);
                        toAdd.close().subscribe(Actions.empty(), new Action1<Throwable>(){

                            @Override
                            public void call(Throwable throwable) {
                                logger.error("Failed to discard connection.", throwable);
                            }
                        });
                    } else {
                        holderForThisEl.add(toAdd);
                    }
                }
            });
        }
    }

    @Override
    public boolean remove(PooledConnection<R, W> toRemove) {
        for (IdleConnectionsHolder<W, R> anElHolder : this.allElHolders) {
            if (!anElHolder.remove(toRemove)) continue;
            return true;
        }
        return false;
    }

    private static class FIFOIdleConnectionsHolderFactory<W, R>
    implements IdleConnectionsHolderFactory<W, R> {
        private FIFOIdleConnectionsHolderFactory() {
        }

        @Override
        public IdleConnectionsHolder<W, R> call() {
            return new FIFOIdleConnectionsHolder();
        }
    }

    public static interface IdleConnectionsHolderFactory<W, R>
    extends Func0<IdleConnectionsHolder<W, R>> {
    }
}

