/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.protocol.tcp.server;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.reactivex.netty.channel.AbstractConnectionToChannelBridge;
import io.reactivex.netty.channel.ChannelSubscriberEvent;
import io.reactivex.netty.channel.ConnectionImpl;
import io.reactivex.netty.channel.EmitConnectionEvent;
import io.reactivex.netty.events.Clock;
import io.reactivex.netty.events.EventAttributeKeys;
import io.reactivex.netty.protocol.tcp.server.ConnectionHandler;
import io.reactivex.netty.protocol.tcp.server.events.TcpServerEventPublisher;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;

/**
 * Server-side channel handler that, for each channel it is registered on, creates a
 * {@link ConnectionImpl} and passes it to the configured {@link ConnectionHandler}.
 *
 * <p>Instances are created and installed through
 * {@link #addToPipeline(ChannelPipeline, ConnectionHandler, TcpServerEventPublisher, boolean)};
 * the constructor is private.
 *
 * @param <R> first type parameter of the connection handed to the handler
 *            (presumably the type of objects read from the connection - confirm against
 *            {@code Connection})
 * @param <W> second type parameter of the connection handed to the handler
 *            (presumably the type of objects written to the connection - confirm against
 *            {@code Connection})
 */
public class TcpServerConnectionToChannelBridge<R, W>
        extends AbstractConnectionToChannelBridge<R, W> {
    private static final Logger logger = LoggerFactory.getLogger(TcpServerConnectionToChannelBridge.class);
    /** Name used both for the superclass constructor and for the pipeline entry. */
    private static final String HANDLER_NAME = "server-conn-channel-bridge";
    private final ConnectionHandler<R, W> connectionHandler;
    private final TcpServerEventPublisher eventPublisher;
    private final boolean isSecure;
    /** Event carrying the {@link NewChannelSubscriber}; created once and re-used on registration. */
    private final ChannelSubscriberEvent<R, W> channelSubscriberEvent;

    private TcpServerConnectionToChannelBridge(ConnectionHandler<R, W> connectionHandler, TcpServerEventPublisher eventPublisher, boolean isSecure) {
        super(HANDLER_NAME, eventPublisher, eventPublisher);
        this.connectionHandler = connectionHandler;
        this.eventPublisher = eventPublisher;
        this.isSecure = isSecure;
        // Parameterized (not raw) so the event stays type-checked against this bridge's <R, W>.
        this.channelSubscriberEvent = new ChannelSubscriberEvent<R, W>(new NewChannelSubscriber());
    }

    /**
     * Feeds the channel-subscriber event into {@code userEventTriggered} and, for non-secure
     * bridges only, follows it with {@link EmitConnectionEvent#INSTANCE}; then delegates to the
     * superclass.
     *
     * <p>NOTE(review): for secure bridges no {@code EmitConnectionEvent} is raised here;
     * presumably it is raised elsewhere once the TLS handshake finishes - confirm.
     *
     * @param ctx context of the channel being registered
     * @throws Exception propagated from {@code userEventTriggered} or the superclass
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        this.userEventTriggered(ctx, this.channelSubscriberEvent);
        if (!this.isSecure) {
            this.userEventTriggered(ctx, EmitConnectionEvent.INSTANCE);
        }
        super.channelRegistered(ctx);
    }

    /**
     * Creates a new bridge and appends it to the end of {@code pipeline} under the name
     * {@code "server-conn-channel-bridge"}.
     *
     * @param pipeline          pipeline to append the bridge to
     * @param connectionHandler handler invoked for every new connection
     * @param eventPublisher    publisher used for connection events
     * @param isSecure          when {@code true}, the bridge does not trigger
     *                          {@link EmitConnectionEvent} itself on channel registration
     * @param <R>               first type parameter of the handled connection
     * @param <W>               second type parameter of the handled connection
     * @return the bridge instance that was added
     */
    public static <R, W> TcpServerConnectionToChannelBridge<R, W> addToPipeline(ChannelPipeline pipeline, ConnectionHandler<R, W> connectionHandler, TcpServerEventPublisher eventPublisher, boolean isSecure) {
        TcpServerConnectionToChannelBridge<R, W> toAdd = new TcpServerConnectionToChannelBridge<R, W>(connectionHandler, eventPublisher, isSecure);
        pipeline.addLast(HANDLER_NAME, toAdd);
        return toAdd;
    }

    /**
     * Subscriber that receives each new {@link Channel}, wraps it in a connection and runs the
     * {@link ConnectionHandler} against it.
     */
    private final class NewChannelSubscriber
            extends Subscriber<Channel> {
        private NewChannelSubscriber() {
        }

        @Override
        public void onCompleted() {
            // Nothing to do when the channel stream completes.
        }

        @Override
        public void onError(Throwable e) {
            logger.error("Error while listening for new client connections.", e);
        }

        /**
         * Handles one new channel: attaches the event publisher to the channel attributes,
         * builds the connection, invokes the handler and subscribes to the resulting stream so
         * that the connection is closed once handling terminates.
         *
         * @param channel the newly available channel
         */
        @Override
        public void onNext(Channel channel) {
            // Attributes are set before ConnectionImpl.fromChannel(channel) is called, matching
            // the original statement order.
            channel.attr(EventAttributeKeys.EVENT_PUBLISHER).set(eventPublisher);
            channel.attr(EventAttributeKeys.CONNECTION_EVENT_LISTENER).set(eventPublisher);
            final ConnectionImpl<R, W> connection = ConnectionImpl.fromChannel(channel);

            // -1 marks "no start time taken" when publishing is disabled.
            final long startTimeNanos = eventPublisher.publishingEnabled() ? Clock.newStartTimeNanos() : -1L;
            if (eventPublisher.publishingEnabled()) {
                eventPublisher.onNewClientConnected();
            }

            Observable<Void> handledObservable = invokeHandler(connection, startTimeNanos);
            if (null == handledObservable) {
                logger.error("Connection handler returned null.");
                handledObservable = Observable.empty();
            }

            handledObservable.onErrorResumeNext(new Func1<Throwable, Observable<? extends Void>>() {

                @Override
                public Observable<? extends Void> call(Throwable throwable) {
                    // A closed channel is not reported as a handling failure.
                    if (throwable instanceof ClosedChannelException) {
                        return Observable.empty();
                    }
                    if (eventPublisher.publishingEnabled()) {
                        eventPublisher.onConnectionHandlingFailed(Clock.onEndNanos(startTimeNanos), TimeUnit.NANOSECONDS, throwable);
                    }
                    logger.error("Error processing connection.", throwable);
                    // NOTE(review): the stream resumes with close(); if that completes, the
                    // doOnCompleted below still publishes onConnectionHandlingSuccess after the
                    // failure event above - confirm this is intended.
                    return connection.close();
                }
            }).ambWith(connection.closeListener()).concatWith(connection.close()).doOnCompleted(new Action0() {

                @Override
                public void call() {
                    if (eventPublisher.publishingEnabled()) {
                        eventPublisher.onConnectionHandlingSuccess(Clock.onEndNanos(startTimeNanos), TimeUnit.NANOSECONDS);
                    }
                }
            }).subscribe(new Subscriber<Void>() {

                @Override
                public void onCompleted() {
                    // Success is already published by the doOnCompleted above.
                }

                @Override
                public void onError(Throwable e) {
                    // Errors from closeListener()/close() are not covered by onErrorResumeNext.
                    // A no-arg subscribe() would rethrow them as OnErrorNotImplementedException;
                    // log them instead.
                    logger.error("Error while closing connection after handling.", e);
                }

                @Override
                public void onNext(Void ignored) {
                    // Observable<Void> never emits items.
                }
            });
        }

        /**
         * Publishes the handling-start event (when publishing is enabled) and invokes the
         * connection handler, converting anything it throws into an error observable.
         *
         * @param connection     connection to hand to the handler
         * @param startTimeNanos start time used to compute the duration reported with the event
         * @return the handler's result (possibly {@code null}), or an error observable when the
         *         event publication or the handler threw
         */
        private Observable<Void> invokeHandler(ConnectionImpl<R, W> connection, long startTimeNanos) {
            try {
                if (eventPublisher.publishingEnabled()) {
                    eventPublisher.onConnectionHandlingStart(Clock.onEndNanos(startTimeNanos), TimeUnit.NANOSECONDS);
                }
                return connectionHandler.handle(connection);
            }
            catch (Throwable throwable) {
                return Observable.error(throwable);
            }
        }
    }
}

