/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.endpoint;

import com.couchbase.client.core.ResponseEvent;
import com.couchbase.client.core.ResponseHandler;
import com.couchbase.client.core.endpoint.BootstrapAdapter;
import com.couchbase.client.core.endpoint.Endpoint;
import com.couchbase.client.core.endpoint.SSLEngineFactory;
import com.couchbase.client.core.endpoint.kv.AuthenticationException;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.CouchbaseRequest;
import com.couchbase.client.core.message.internal.SignalConfigReload;
import com.couchbase.client.core.message.internal.SignalFlush;
import com.couchbase.client.core.state.AbstractStateMachine;
import com.couchbase.client.core.state.LifecycleState;
import com.couchbase.client.core.state.NotConnectedException;
import com.couchbase.client.deps.com.lmax.disruptor.RingBuffer;
import com.couchbase.client.deps.io.netty.bootstrap.Bootstrap;
import com.couchbase.client.deps.io.netty.buffer.AbstractByteBufAllocator;
import com.couchbase.client.deps.io.netty.buffer.PooledByteBufAllocator;
import com.couchbase.client.deps.io.netty.buffer.UnpooledByteBufAllocator;
import com.couchbase.client.deps.io.netty.channel.Channel;
import com.couchbase.client.deps.io.netty.channel.ChannelFuture;
import com.couchbase.client.deps.io.netty.channel.ChannelFutureListener;
import com.couchbase.client.deps.io.netty.channel.ChannelHandler;
import com.couchbase.client.deps.io.netty.channel.ChannelInitializer;
import com.couchbase.client.deps.io.netty.channel.ChannelOption;
import com.couchbase.client.deps.io.netty.channel.ChannelPipeline;
import com.couchbase.client.deps.io.netty.channel.ConnectTimeoutException;
import com.couchbase.client.deps.io.netty.channel.epoll.EpollEventLoopGroup;
import com.couchbase.client.deps.io.netty.channel.epoll.EpollSocketChannel;
import com.couchbase.client.deps.io.netty.channel.oio.OioEventLoopGroup;
import com.couchbase.client.deps.io.netty.channel.socket.nio.NioSocketChannel;
import com.couchbase.client.deps.io.netty.channel.socket.oio.OioSocketChannel;
import com.couchbase.client.deps.io.netty.handler.logging.LogLevel;
import com.couchbase.client.deps.io.netty.handler.logging.LoggingHandler;
import com.couchbase.client.deps.io.netty.handler.ssl.SslHandler;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLHandshakeException;
import rx.Observable;
import rx.Subscriber;
import rx.subjects.AsyncSubject;
import rx.subjects.Subject;

public abstract class AbstractEndpoint
extends AbstractStateMachine<LifecycleState>
implements Endpoint {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(Endpoint.class);
    private static final ChannelHandler LOGGING_HANDLER_INSTANCE = new LoggingHandler(LogLevel.TRACE);
    private static final NotConnectedException NOT_CONNECTED_EXCEPTION = new NotConnectedException();
    private final BootstrapAdapter bootstrap;
    private final String bucket;
    private final String password;
    private final RingBuffer<ResponseEvent> responseBuffer;
    private final CoreEnvironment env;
    private final boolean isTransient;
    private SSLEngineFactory sslEngineFactory;
    private volatile Channel channel;
    private volatile boolean hasWritten;
    private volatile long reconnectAttempt = 1L;
    private volatile boolean disconnected;

    protected AbstractEndpoint(String bucket, String password, BootstrapAdapter adapter, boolean isTransient) {
        super(LifecycleState.DISCONNECTED);
        this.bootstrap = adapter;
        this.bucket = bucket;
        this.password = password;
        this.responseBuffer = null;
        this.env = null;
        this.isTransient = isTransient;
        this.disconnected = false;
    }

    protected AbstractEndpoint(String hostname, String bucket, String password, int port, final CoreEnvironment environment, RingBuffer<ResponseEvent> responseBuffer, boolean isTransient) {
        super(LifecycleState.DISCONNECTED);
        this.bucket = bucket;
        this.password = password;
        this.responseBuffer = responseBuffer;
        this.env = environment;
        this.isTransient = isTransient;
        if (environment.sslEnabled()) {
            this.sslEngineFactory = new SSLEngineFactory(environment);
        }
        Class channelClass = NioSocketChannel.class;
        if (environment.ioPool() instanceof EpollEventLoopGroup) {
            channelClass = EpollSocketChannel.class;
        } else if (environment.ioPool() instanceof OioEventLoopGroup) {
            channelClass = OioSocketChannel.class;
        }
        AbstractByteBufAllocator allocator = this.env.bufferPoolingEnabled() ? PooledByteBufAllocator.DEFAULT : UnpooledByteBufAllocator.DEFAULT;
        boolean tcpNodelay = this.environment().tcpNodelayEnabled();
        this.bootstrap = new BootstrapAdapter((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)new Bootstrap().remoteAddress(hostname, port).group(environment.ioPool())).channel(channelClass)).option(ChannelOption.ALLOCATOR, allocator)).option(ChannelOption.TCP_NODELAY, tcpNodelay)).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.env.socketConnectTimeout())).handler(new ChannelInitializer<Channel>(){

            @Override
            protected void initChannel(Channel channel) throws Exception {
                ChannelPipeline pipeline = channel.pipeline();
                if (environment.sslEnabled()) {
                    pipeline.addLast(new SslHandler(AbstractEndpoint.this.sslEngineFactory.get()));
                }
                if (LOGGER.isTraceEnabled()) {
                    pipeline.addLast(LOGGING_HANDLER_INSTANCE);
                }
                AbstractEndpoint.this.customEndpointHandlers(pipeline);
            }
        }));
    }

    protected abstract void customEndpointHandlers(ChannelPipeline var1);

    @Override
    public Observable<LifecycleState> connect() {
        return this.connect(true);
    }

    protected Observable<LifecycleState> connect(boolean bootstrapping) {
        if (this.state() != LifecycleState.DISCONNECTED) {
            return Observable.just(this.state());
        }
        AsyncSubject observable = AsyncSubject.create();
        this.transitionState(LifecycleState.CONNECTING);
        this.hasWritten = false;
        this.doConnect((Subject<LifecycleState, LifecycleState>)observable, bootstrapping);
        return observable;
    }

    protected void doConnect(final Subject<LifecycleState, LifecycleState> observable, final boolean bootstrapping) {
        this.bootstrap.connect().addListener(new ChannelFutureListener(){

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (AbstractEndpoint.this.state() == LifecycleState.DISCONNECTING || AbstractEndpoint.this.state() == LifecycleState.DISCONNECTED) {
                    LOGGER.debug(AbstractEndpoint.logIdent(AbstractEndpoint.this.channel, AbstractEndpoint.this) + "Endpoint connect completed, but got instructed to disconnect in the meantime.");
                    AbstractEndpoint.this.transitionState(LifecycleState.DISCONNECTED);
                    AbstractEndpoint.this.channel = null;
                } else if (future.isSuccess()) {
                    AbstractEndpoint.this.channel = future.channel();
                    LOGGER.debug(AbstractEndpoint.logIdent(AbstractEndpoint.this.channel, AbstractEndpoint.this) + "Connected Endpoint.");
                    AbstractEndpoint.this.transitionState(LifecycleState.CONNECTED);
                } else {
                    if (future.cause() instanceof AuthenticationException) {
                        LOGGER.warn(AbstractEndpoint.logIdent(AbstractEndpoint.this.channel, AbstractEndpoint.this) + "Authentication Failure.");
                        AbstractEndpoint.this.transitionState(LifecycleState.DISCONNECTED);
                        observable.onError(future.cause());
                    } else if (future.cause() instanceof SSLHandshakeException) {
                        LOGGER.warn(AbstractEndpoint.logIdent(AbstractEndpoint.this.channel, AbstractEndpoint.this) + "SSL Handshake Failure during connect.");
                        AbstractEndpoint.this.transitionState(LifecycleState.DISCONNECTED);
                        observable.onError(future.cause());
                    } else if (future.cause() instanceof ClosedChannelException) {
                        LOGGER.warn(AbstractEndpoint.logIdent(AbstractEndpoint.this.channel, AbstractEndpoint.this) + "Generic Failure.");
                        AbstractEndpoint.this.transitionState(LifecycleState.DISCONNECTED);
                        LOGGER.warn(future.cause().getMessage());
                        observable.onError(future.cause());
                    } else if (future.cause() instanceof ConnectTimeoutException) {
                        LOGGER.warn(AbstractEndpoint.logIdent(AbstractEndpoint.this.channel, AbstractEndpoint.this) + "Socket connect took longer than specified timeout.");
                        AbstractEndpoint.this.transitionState(LifecycleState.DISCONNECTED);
                        observable.onError(future.cause());
                    } else if (future.cause() instanceof ConnectException) {
                        LOGGER.warn(AbstractEndpoint.logIdent(AbstractEndpoint.this.channel, AbstractEndpoint.this) + "Could not connect to remote socket.");
                        AbstractEndpoint.this.transitionState(LifecycleState.DISCONNECTED);
                        observable.onError(future.cause());
                    } else if (AbstractEndpoint.this.isTransient) {
                        AbstractEndpoint.this.transitionState(LifecycleState.DISCONNECTED);
                        LOGGER.warn(future.cause().getMessage());
                        observable.onError(future.cause());
                    } else {
                        LOGGER.debug("Unhandled exception during channel connect, ignoring.", future.cause());
                    }
                    if (bootstrapping) {
                        AbstractEndpoint.this.connect(false).subscribe((Subscriber)new Subscriber<LifecycleState>(){

                            public void onCompleted() {
                            }

                            public void onNext(LifecycleState lifecycleState) {
                            }

                            public void onError(Throwable e) {
                                LOGGER.warn("Error during reconnect: ", e);
                            }
                        });
                    } else if (!AbstractEndpoint.this.disconnected && !AbstractEndpoint.this.isTransient) {
                        long delay = AbstractEndpoint.this.env.reconnectDelay().calculate(AbstractEndpoint.this.reconnectAttempt++);
                        TimeUnit delayUnit = AbstractEndpoint.this.env.reconnectDelay().unit();
                        LOGGER.warn(AbstractEndpoint.logIdent(AbstractEndpoint.this.channel, AbstractEndpoint.this) + "Could not connect to endpoint, retrying with delay " + delay + " " + (Object)((Object)delayUnit) + ": ", future.cause());
                        if (AbstractEndpoint.this.responseBuffer != null) {
                            AbstractEndpoint.this.responseBuffer.publishEvent(ResponseHandler.RESPONSE_TRANSLATOR, SignalConfigReload.INSTANCE, null);
                        }
                        AbstractEndpoint.this.transitionState(LifecycleState.CONNECTING);
                        future.channel().eventLoop().schedule(new Runnable(){

                            @Override
                            public void run() {
                                if (!AbstractEndpoint.this.disconnected) {
                                    AbstractEndpoint.this.doConnect((Subject<LifecycleState, LifecycleState>)observable, bootstrapping);
                                } else {
                                    LOGGER.debug("{}Explicitly breaking retry loop because already disconnected.", (Object)AbstractEndpoint.logIdent(AbstractEndpoint.this.channel, AbstractEndpoint.this));
                                    AbstractEndpoint.this.disconnect();
                                }
                            }
                        }, delay, delayUnit);
                    } else {
                        LOGGER.debug("{}Not retrying because already disconnected or transient.", (Object)AbstractEndpoint.logIdent(AbstractEndpoint.this.channel, AbstractEndpoint.this));
                    }
                }
                observable.onNext(AbstractEndpoint.this.state());
                observable.onCompleted();
            }
        });
    }

    @Override
    public Observable<LifecycleState> disconnect() {
        this.disconnected = true;
        if (this.state() == LifecycleState.DISCONNECTED || this.state() == LifecycleState.DISCONNECTING) {
            return Observable.just(this.state());
        }
        if (this.state() == LifecycleState.CONNECTING) {
            this.transitionState(LifecycleState.DISCONNECTED);
            return Observable.just(this.state());
        }
        this.transitionState(LifecycleState.DISCONNECTING);
        final AsyncSubject observable = AsyncSubject.create();
        this.channel.disconnect().addListener(new ChannelFutureListener(){

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    LOGGER.debug(AbstractEndpoint.logIdent(AbstractEndpoint.this.channel, AbstractEndpoint.this) + "Disconnected Endpoint.");
                } else {
                    LOGGER.warn(AbstractEndpoint.logIdent(AbstractEndpoint.this.channel, AbstractEndpoint.this) + "Received an error during disconnect.", future.cause());
                }
                AbstractEndpoint.this.transitionState(LifecycleState.DISCONNECTED);
                observable.onNext(AbstractEndpoint.this.state());
                observable.onCompleted();
                AbstractEndpoint.this.channel = null;
            }
        });
        return observable;
    }

    @Override
    public void send(CouchbaseRequest request) {
        if (this.state() == LifecycleState.CONNECTED) {
            if (request instanceof SignalFlush) {
                if (this.hasWritten && this.channel.isActive()) {
                    this.channel.flush();
                    this.hasWritten = false;
                }
            } else if (this.channel.isActive() && this.channel.isWritable()) {
                this.channel.write(request, this.channel.voidPromise());
                this.hasWritten = true;
            } else {
                this.responseBuffer.publishEvent(ResponseHandler.RESPONSE_TRANSLATOR, request, request.observable());
            }
        } else {
            if (request instanceof SignalFlush) {
                return;
            }
            request.observable().onError((Throwable)NOT_CONNECTED_EXCEPTION);
        }
    }

    public void notifyChannelInactive() {
        LOGGER.debug(AbstractEndpoint.logIdent(this.channel, this) + "Got notified from Channel as inactive.");
        if (this.isTransient) {
            return;
        }
        if (this.state() != LifecycleState.DISCONNECTED && this.state() != LifecycleState.DISCONNECTING) {
            this.signalConfigReload();
        }
        if (this.state() == LifecycleState.CONNECTED || this.state() == LifecycleState.CONNECTING) {
            this.transitionState(LifecycleState.DISCONNECTED);
            this.connect(false).subscribe((Subscriber)new Subscriber<LifecycleState>(){

                public void onCompleted() {
                }

                public void onNext(LifecycleState lifecycleState) {
                }

                public void onError(Throwable e) {
                    LOGGER.warn("Error during reconnect: ", e);
                }
            });
        }
    }

    public void signalConfigReload() {
        this.responseBuffer.publishEvent(ResponseHandler.RESPONSE_TRANSLATOR, SignalConfigReload.INSTANCE, null);
    }

    protected String bucket() {
        return this.bucket;
    }

    protected String password() {
        return this.password;
    }

    public CoreEnvironment environment() {
        return this.env;
    }

    public RingBuffer<ResponseEvent> responseBuffer() {
        return this.responseBuffer;
    }

    protected static String logIdent(Channel chan, Endpoint endpoint) {
        SocketAddress addr = chan != null ? chan.remoteAddress() : null;
        return "[" + addr + "][" + endpoint.getClass().getSimpleName() + "]: ";
    }

    static {
        NOT_CONNECTED_EXCEPTION.setStackTrace(new StackTraceElement[0]);
    }
}

