/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.reactivex.netty.channel.ConnectionHandler;
import io.reactivex.netty.channel.ObservableConnection;
import io.reactivex.netty.channel.ObservableConnectionFactory;
import io.reactivex.netty.metrics.Clock;
import io.reactivex.netty.metrics.MetricEventsSubject;
import io.reactivex.netty.server.DefaultErrorHandler;
import io.reactivex.netty.server.ErrorHandler;
import io.reactivex.netty.server.ServerMetricsEvent;
import rx.Observable;
import rx.Subscriber;

public class ConnectionLifecycleHandler<I, O>
extends ChannelInboundHandlerAdapter {
    private final ConnectionHandler<I, O> connectionHandler;
    private final ErrorHandler errorHandler;
    private final ObservableConnectionFactory<I, O> connectionFactory;
    private final MetricEventsSubject<ServerMetricsEvent<?>> eventsSubject;
    private ObservableConnection<I, O> connection;

    public ConnectionLifecycleHandler(ConnectionHandler<I, O> connectionHandler, ObservableConnectionFactory<I, O> connectionFactory, ErrorHandler errorHandler, MetricEventsSubject<ServerMetricsEvent<?>> eventsSubject) {
        this.connectionHandler = connectionHandler;
        this.connectionFactory = connectionFactory;
        this.eventsSubject = eventsSubject;
        this.errorHandler = null == errorHandler ? new DefaultErrorHandler() : errorHandler;
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        if (null != this.connection) {
            this.connection.close();
        }
        super.channelUnregistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (null == ctx.channel().pipeline().get(SslHandler.class)) {
            long startTimeMillis = Clock.newStartTimeMillis();
            this.connection = this.connectionFactory.newConnection(ctx.channel());
            this.eventsSubject.onEvent(ServerMetricsEvent.NEW_CLIENT_CONNECTED);
            super.channelActive(ctx);
            this.handleConnection(startTimeMillis);
        } else {
            super.channelActive(ctx);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
        if (evt instanceof SslHandshakeCompletionEvent) {
            long startTimeMillis = Clock.newStartTimeMillis();
            this.connection = this.connectionFactory.newConnection(ctx.channel());
            this.handleConnection(startTimeMillis);
        }
    }

    private void handleConnection(final long startTimeMillis) {
        Observable<Void> handledObservable;
        try {
            this.eventsSubject.onEvent(ServerMetricsEvent.CONNECTION_HANDLING_START, Clock.onEndMillis(startTimeMillis));
            handledObservable = this.connectionHandler.handle(this.connection);
        }
        catch (Throwable throwable) {
            handledObservable = Observable.error(throwable);
        }
        if (null == handledObservable) {
            handledObservable = Observable.empty();
        }
        handledObservable.subscribe(new Subscriber<Void>(){

            @Override
            public void onCompleted() {
                ConnectionLifecycleHandler.this.eventsSubject.onEvent(ServerMetricsEvent.CONNECTION_HANDLING_SUCCESS, Clock.onEndMillis(startTimeMillis));
                ConnectionLifecycleHandler.this.connection.close();
            }

            @Override
            public void onError(Throwable e) {
                ConnectionLifecycleHandler.this.invokeErrorHandler(e);
                ConnectionLifecycleHandler.this.eventsSubject.onEvent(ServerMetricsEvent.CONNECTION_HANDLING_FAILED, Clock.onEndMillis(startTimeMillis), e);
                ConnectionLifecycleHandler.this.connection.close();
            }

            @Override
            public void onNext(Void aVoid) {
            }
        });
    }

    private void invokeErrorHandler(Throwable throwable) {
        try {
            this.errorHandler.handleError(throwable);
        }
        catch (Exception e) {
            System.err.println("Error while invoking error handler. Error: " + e.getMessage());
            e.printStackTrace(System.err);
        }
    }
}

