package io.moquette.server.netty;

import io.moquette.BrokerConstants;
import io.moquette.server.ServerAcceptor;
import io.moquette.server.config.IConfig;
import io.moquette.server.netty.metrics.BytesMetrics;
import io.moquette.server.netty.metrics.BytesMetricsCollector;
import io.moquette.server.netty.metrics.BytesMetricsHandler;
import io.moquette.server.netty.metrics.DropWizardMetricsHandler;
import io.moquette.server.netty.metrics.MQTTMessageLogger;
import io.moquette.server.netty.metrics.MessageMetrics;
import io.moquette.server.netty.metrics.MessageMetricsCollector;
import io.moquette.server.netty.metrics.MessageMetricsHandler;
import io.moquette.spi.impl.ProtocolProcessor;
import io.moquette.spi.security.ISslContextCreator;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/moquette/server/netty/NettyAcceptor.class */
public class NettyAcceptor implements ServerAcceptor {
    private static final String MQTT_SUBPROTOCOL_CSV_LIST = "mqtt, mqttv3.1, mqttv3.1.1";
    private static final Logger LOG = LoggerFactory.getLogger(NettyAcceptor.class);
    EventLoopGroup m_bossGroup;
    EventLoopGroup m_workerGroup;
    BytesMetricsCollector m_bytesMetricsCollector = new BytesMetricsCollector();
    MessageMetricsCollector m_metricsCollector = new MessageMetricsCollector();
    private Optional<? extends ChannelInboundHandler> metrics;
    private Optional<? extends ChannelInboundHandler> errorsCather;
    private int nettySoBacklog;
    private boolean nettySoReuseaddr;
    private boolean nettyTcpNodelay;
    private boolean nettySoKeepalive;
    private int nettyChannelTimeoutSeconds;
    private int maxBytesInMessage;
    private Class<? extends ServerSocketChannel> channelClass;

    /* loaded from: input_file:io/moquette/server/netty/NettyAcceptor$ByteBufToWebSocketFrameEncoder.class */
    static class ByteBufToWebSocketFrameEncoder extends MessageToMessageEncoder<ByteBuf> {
        ByteBufToWebSocketFrameEncoder() {
        }

        protected void encode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
            BinaryWebSocketFrame binaryWebSocketFrame = new BinaryWebSocketFrame();
            binaryWebSocketFrame.content().writeBytes(byteBuf);
            list.add(binaryWebSocketFrame);
        }

        protected /* bridge */ /* synthetic */ void encode(ChannelHandlerContext channelHandlerContext, Object obj, List list) throws Exception {
            encode(channelHandlerContext, (ByteBuf) obj, (List<Object>) list);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/moquette/server/netty/NettyAcceptor$PipelineInitializer.class */
    public static abstract class PipelineInitializer {
        PipelineInitializer() {
        }

        abstract void init(ChannelPipeline channelPipeline) throws Exception;
    }

    /* loaded from: input_file:io/moquette/server/netty/NettyAcceptor$WebSocketFrameToByteBufDecoder.class */
    static class WebSocketFrameToByteBufDecoder extends MessageToMessageDecoder<BinaryWebSocketFrame> {
        WebSocketFrameToByteBufDecoder() {
        }

        protected void decode(ChannelHandlerContext channelHandlerContext, BinaryWebSocketFrame binaryWebSocketFrame, List<Object> list) throws Exception {
            ByteBuf content = binaryWebSocketFrame.content();
            content.retain();
            list.add(content);
        }

        protected /* bridge */ /* synthetic */ void decode(ChannelHandlerContext channelHandlerContext, Object obj, List list) throws Exception {
            decode(channelHandlerContext, (BinaryWebSocketFrame) obj, (List<Object>) list);
        }
    }

    @Override // io.moquette.server.ServerAcceptor
    public void initialize(ProtocolProcessor protocolProcessor, IConfig iConfig, ISslContextCreator iSslContextCreator) {
        LOG.debug("Initializing Netty acceptor");
        this.nettySoBacklog = iConfig.intProp(BrokerConstants.NETTY_SO_BACKLOG_PROPERTY_NAME, 128);
        this.nettySoReuseaddr = iConfig.boolProp(BrokerConstants.NETTY_SO_REUSEADDR_PROPERTY_NAME, true);
        this.nettyTcpNodelay = iConfig.boolProp(BrokerConstants.NETTY_TCP_NODELAY_PROPERTY_NAME, true);
        this.nettySoKeepalive = iConfig.boolProp(BrokerConstants.NETTY_SO_KEEPALIVE_PROPERTY_NAME, true);
        this.nettyChannelTimeoutSeconds = iConfig.intProp(BrokerConstants.NETTY_CHANNEL_TIMEOUT_SECONDS_PROPERTY_NAME, 10);
        this.maxBytesInMessage = iConfig.intProp(BrokerConstants.NETTY_MAX_BYTES_PROPERTY_NAME, BrokerConstants.DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE);
        if (iConfig.boolProp(BrokerConstants.NETTY_EPOLL_PROPERTY_NAME, false)) {
            LOG.info("Netty is using Epoll");
            this.m_bossGroup = new EpollEventLoopGroup();
            this.m_workerGroup = new EpollEventLoopGroup();
            this.channelClass = EpollServerSocketChannel.class;
        } else {
            LOG.info("Netty is using NIO");
            this.m_bossGroup = new NioEventLoopGroup();
            this.m_workerGroup = new NioEventLoopGroup();
            this.channelClass = NioServerSocketChannel.class;
        }
        NettyMQTTHandler nettyMQTTHandler = new NettyMQTTHandler(protocolProcessor);
        if (iConfig.boolProp(BrokerConstants.METRICS_ENABLE_PROPERTY_NAME, false)) {
            DropWizardMetricsHandler dropWizardMetricsHandler = new DropWizardMetricsHandler();
            dropWizardMetricsHandler.init(iConfig);
            this.metrics = Optional.of(dropWizardMetricsHandler);
        } else {
            this.metrics = Optional.empty();
        }
        if (iConfig.boolProp(BrokerConstants.BUGSNAG_ENABLE_PROPERTY_NAME, false)) {
            BugSnagErrorsHandler bugSnagErrorsHandler = new BugSnagErrorsHandler();
            bugSnagErrorsHandler.init(iConfig);
            this.errorsCather = Optional.of(bugSnagErrorsHandler);
        } else {
            this.errorsCather = Optional.empty();
        }
        initializePlainTCPTransport(nettyMQTTHandler, iConfig);
        initializeWebSocketTransport(nettyMQTTHandler, iConfig);
        String property = iConfig.getProperty(BrokerConstants.SSL_PORT_PROPERTY_NAME);
        String property2 = iConfig.getProperty(BrokerConstants.WSS_PORT_PROPERTY_NAME);
        if (property == null && property2 == null) {
            return;
        }
        SSLContext initSSLContext = iSslContextCreator.initSSLContext();
        if (initSSLContext == null) {
            LOG.error("Can't initialize SSLHandler layer! Exiting, check your configuration of jks");
        } else {
            initializeSSLTCPTransport(nettyMQTTHandler, iConfig, initSSLContext);
            initializeWSSTransport(nettyMQTTHandler, iConfig, initSSLContext);
        }
    }

    private void initFactory(String str, int i, String str2, final PipelineInitializer pipelineInitializer) {
        LOG.debug("Initializing server. Protocol={}", str2);
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(this.m_bossGroup, this.m_workerGroup).channel(this.channelClass).childHandler(new ChannelInitializer<SocketChannel>() { // from class: io.moquette.server.netty.NettyAcceptor.1
            public void initChannel(SocketChannel socketChannel) throws Exception {
                try {
                    pipelineInitializer.init(socketChannel.pipeline());
                } catch (Throwable th) {
                    NettyAcceptor.LOG.error("Severe error during pipeline creation", th);
                    throw th;
                }
            }
        }).option(ChannelOption.SO_BACKLOG, Integer.valueOf(this.nettySoBacklog)).option(ChannelOption.SO_REUSEADDR, Boolean.valueOf(this.nettySoReuseaddr)).childOption(ChannelOption.TCP_NODELAY, Boolean.valueOf(this.nettyTcpNodelay)).childOption(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(this.nettySoKeepalive));
        try {
            LOG.debug("Binding server. host={}, port={}", str, Integer.valueOf(i));
            ChannelFuture bind = serverBootstrap.bind(str, i);
            LOG.info("Server bound to host={}, port={}, protocol={}", new Object[]{str, Integer.valueOf(i), str2});
            bind.sync().addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
        } catch (InterruptedException e) {
            LOG.error("An interruptedException was caught while initializing server. Protocol={}", str2, e);
        }
    }

    private void initializePlainTCPTransport(final NettyMQTTHandler nettyMQTTHandler, IConfig iConfig) {
        LOG.debug("Configuring TCP MQTT transport");
        final MoquetteIdleTimeoutHandler moquetteIdleTimeoutHandler = new MoquetteIdleTimeoutHandler();
        String property = iConfig.getProperty(BrokerConstants.HOST_PROPERTY_NAME);
        String property2 = iConfig.getProperty(BrokerConstants.PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
        if (BrokerConstants.DISABLED_PORT_BIND.equals(property2)) {
            LOG.info("Property {} has been set to {}. TCP MQTT will be disabled", BrokerConstants.PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
        } else {
            initFactory(property, Integer.parseInt(property2), "TCP MQTT", new PipelineInitializer() { // from class: io.moquette.server.netty.NettyAcceptor.2
                @Override // io.moquette.server.netty.NettyAcceptor.PipelineInitializer
                void init(ChannelPipeline channelPipeline) {
                    channelPipeline.addFirst("idleStateHandler", new IdleStateHandler(NettyAcceptor.this.nettyChannelTimeoutSeconds, 0, 0));
                    channelPipeline.addAfter("idleStateHandler", "idleEventHandler", moquetteIdleTimeoutHandler);
                    if (NettyAcceptor.this.errorsCather.isPresent()) {
                        channelPipeline.addLast("bugsnagCatcher", (ChannelHandler) NettyAcceptor.this.errorsCather.get());
                    }
                    channelPipeline.addFirst("bytemetrics", new BytesMetricsHandler(NettyAcceptor.this.m_bytesMetricsCollector));
                    channelPipeline.addLast("decoder", new MqttDecoder(NettyAcceptor.this.maxBytesInMessage));
                    channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
                    channelPipeline.addLast("metrics", new MessageMetricsHandler(NettyAcceptor.this.m_metricsCollector));
                    channelPipeline.addLast("messageLogger", new MQTTMessageLogger());
                    if (NettyAcceptor.this.metrics.isPresent()) {
                        channelPipeline.addLast("wizardMetrics", (ChannelHandler) NettyAcceptor.this.metrics.get());
                    }
                    channelPipeline.addLast("handler", nettyMQTTHandler);
                }
            });
        }
    }

    private void initializeWebSocketTransport(final NettyMQTTHandler nettyMQTTHandler, IConfig iConfig) {
        LOG.debug("Configuring Websocket MQTT transport");
        String property = iConfig.getProperty(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
        if (BrokerConstants.DISABLED_PORT_BIND.equals(property)) {
            LOG.info("Property {} has been setted to {}. Websocket MQTT will be disabled", BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
            return;
        }
        int parseInt = Integer.parseInt(property);
        final MoquetteIdleTimeoutHandler moquetteIdleTimeoutHandler = new MoquetteIdleTimeoutHandler();
        initFactory(iConfig.getProperty(BrokerConstants.HOST_PROPERTY_NAME), parseInt, "Websocket MQTT", new PipelineInitializer() { // from class: io.moquette.server.netty.NettyAcceptor.3
            @Override // io.moquette.server.netty.NettyAcceptor.PipelineInitializer
            void init(ChannelPipeline channelPipeline) {
                channelPipeline.addLast(new ChannelHandler[]{new HttpServerCodec()});
                channelPipeline.addLast("aggregator", new HttpObjectAggregator(65536));
                channelPipeline.addLast("webSocketHandler", new WebSocketServerProtocolHandler("/mqtt", NettyAcceptor.MQTT_SUBPROTOCOL_CSV_LIST));
                channelPipeline.addLast("ws2bytebufDecoder", new WebSocketFrameToByteBufDecoder());
                channelPipeline.addLast("bytebuf2wsEncoder", new ByteBufToWebSocketFrameEncoder());
                channelPipeline.addFirst("idleStateHandler", new IdleStateHandler(NettyAcceptor.this.nettyChannelTimeoutSeconds, 0, 0));
                channelPipeline.addAfter("idleStateHandler", "idleEventHandler", moquetteIdleTimeoutHandler);
                channelPipeline.addFirst("bytemetrics", new BytesMetricsHandler(NettyAcceptor.this.m_bytesMetricsCollector));
                channelPipeline.addLast("decoder", new MqttDecoder(NettyAcceptor.this.maxBytesInMessage));
                channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
                channelPipeline.addLast("metrics", new MessageMetricsHandler(NettyAcceptor.this.m_metricsCollector));
                channelPipeline.addLast("messageLogger", new MQTTMessageLogger());
                channelPipeline.addLast("handler", nettyMQTTHandler);
            }
        });
    }

    private void initializeSSLTCPTransport(final NettyMQTTHandler nettyMQTTHandler, IConfig iConfig, final SSLContext sSLContext) {
        LOG.debug("Configuring SSL MQTT transport");
        String property = iConfig.getProperty(BrokerConstants.SSL_PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
        if (BrokerConstants.DISABLED_PORT_BIND.equals(property)) {
            LOG.info("Property {} has been set to {}. SSL MQTT will be disabled", BrokerConstants.SSL_PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
            return;
        }
        int parseInt = Integer.parseInt(property);
        LOG.debug("Starting SSL on port {}", Integer.valueOf(parseInt));
        final MoquetteIdleTimeoutHandler moquetteIdleTimeoutHandler = new MoquetteIdleTimeoutHandler();
        String property2 = iConfig.getProperty(BrokerConstants.HOST_PROPERTY_NAME);
        final boolean booleanValue = Boolean.valueOf(iConfig.getProperty(BrokerConstants.NEED_CLIENT_AUTH, "false")).booleanValue();
        initFactory(property2, parseInt, "SSL MQTT", new PipelineInitializer() { // from class: io.moquette.server.netty.NettyAcceptor.4
            @Override // io.moquette.server.netty.NettyAcceptor.PipelineInitializer
            void init(ChannelPipeline channelPipeline) throws Exception {
                channelPipeline.addLast("ssl", NettyAcceptor.this.createSslHandler(sSLContext, booleanValue));
                channelPipeline.addFirst("idleStateHandler", new IdleStateHandler(NettyAcceptor.this.nettyChannelTimeoutSeconds, 0, 0));
                channelPipeline.addAfter("idleStateHandler", "idleEventHandler", moquetteIdleTimeoutHandler);
                channelPipeline.addFirst("bytemetrics", new BytesMetricsHandler(NettyAcceptor.this.m_bytesMetricsCollector));
                channelPipeline.addLast("decoder", new MqttDecoder(NettyAcceptor.this.maxBytesInMessage));
                channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
                channelPipeline.addLast("metrics", new MessageMetricsHandler(NettyAcceptor.this.m_metricsCollector));
                channelPipeline.addLast("messageLogger", new MQTTMessageLogger());
                channelPipeline.addLast("handler", nettyMQTTHandler);
            }
        });
    }

    private void initializeWSSTransport(final NettyMQTTHandler nettyMQTTHandler, IConfig iConfig, final SSLContext sSLContext) {
        LOG.debug("Configuring secure websocket MQTT transport");
        String property = iConfig.getProperty(BrokerConstants.WSS_PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
        if (BrokerConstants.DISABLED_PORT_BIND.equals(property)) {
            LOG.info("Property {} has been set to {}. Secure websocket MQTT will be disabled", BrokerConstants.WSS_PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
            return;
        }
        int parseInt = Integer.parseInt(property);
        final MoquetteIdleTimeoutHandler moquetteIdleTimeoutHandler = new MoquetteIdleTimeoutHandler();
        String property2 = iConfig.getProperty(BrokerConstants.HOST_PROPERTY_NAME);
        final boolean booleanValue = Boolean.valueOf(iConfig.getProperty(BrokerConstants.NEED_CLIENT_AUTH, "false")).booleanValue();
        initFactory(property2, parseInt, "Secure websocket", new PipelineInitializer() { // from class: io.moquette.server.netty.NettyAcceptor.5
            @Override // io.moquette.server.netty.NettyAcceptor.PipelineInitializer
            void init(ChannelPipeline channelPipeline) throws Exception {
                channelPipeline.addLast("ssl", NettyAcceptor.this.createSslHandler(sSLContext, booleanValue));
                channelPipeline.addLast("httpEncoder", new HttpResponseEncoder());
                channelPipeline.addLast("httpDecoder", new HttpRequestDecoder());
                channelPipeline.addLast("aggregator", new HttpObjectAggregator(65536));
                channelPipeline.addLast("webSocketHandler", new WebSocketServerProtocolHandler("/mqtt", NettyAcceptor.MQTT_SUBPROTOCOL_CSV_LIST));
                channelPipeline.addLast("ws2bytebufDecoder", new WebSocketFrameToByteBufDecoder());
                channelPipeline.addLast("bytebuf2wsEncoder", new ByteBufToWebSocketFrameEncoder());
                channelPipeline.addFirst("idleStateHandler", new IdleStateHandler(NettyAcceptor.this.nettyChannelTimeoutSeconds, 0, 0));
                channelPipeline.addAfter("idleStateHandler", "idleEventHandler", moquetteIdleTimeoutHandler);
                channelPipeline.addFirst("bytemetrics", new BytesMetricsHandler(NettyAcceptor.this.m_bytesMetricsCollector));
                channelPipeline.addLast("decoder", new MqttDecoder(NettyAcceptor.this.maxBytesInMessage));
                channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
                channelPipeline.addLast("metrics", new MessageMetricsHandler(NettyAcceptor.this.m_metricsCollector));
                channelPipeline.addLast("messageLogger", new MQTTMessageLogger());
                channelPipeline.addLast("handler", nettyMQTTHandler);
            }
        });
    }

    @Override // io.moquette.server.ServerAcceptor
    public void close() {
        LOG.debug("Closing Netty acceptor...");
        if (this.m_workerGroup == null || this.m_bossGroup == null) {
            LOG.error("Netty acceptor is not initialized");
            throw new IllegalStateException("Invoked close on an Acceptor that wasn't initialized");
        }
        Future shutdownGracefully = this.m_workerGroup.shutdownGracefully();
        Future shutdownGracefully2 = this.m_bossGroup.shutdownGracefully();
        LOG.info("Waiting for worker and boss event loop groups to terminate...");
        try {
            shutdownGracefully.await(10L, TimeUnit.SECONDS);
            shutdownGracefully2.await(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOG.warn("An InterruptedException was caught while waiting for event loops to terminate...");
        }
        if (!this.m_workerGroup.isTerminated()) {
            LOG.warn("Forcing shutdown of worker event loop...");
            this.m_workerGroup.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS);
        }
        if (!this.m_bossGroup.isTerminated()) {
            LOG.warn("Forcing shutdown of boss event loop...");
            this.m_bossGroup.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS);
        }
        MessageMetrics computeMetrics = this.m_metricsCollector.computeMetrics();
        BytesMetrics computeMetrics2 = this.m_bytesMetricsCollector.computeMetrics();
        LOG.info("Metrics messages[read={}, write={}] bytes[read={}, write={}]", new Object[]{Long.valueOf(computeMetrics.messagesRead()), Long.valueOf(computeMetrics.messagesWrote()), Long.valueOf(computeMetrics2.readBytes()), Long.valueOf(computeMetrics2.wroteBytes())});
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ChannelHandler createSslHandler(SSLContext sSLContext, boolean z) {
        SSLEngine createSSLEngine = sSLContext.createSSLEngine();
        createSSLEngine.setUseClientMode(false);
        if (z) {
            createSSLEngine.setNeedClientAuth(true);
        }
        return new SslHandler(createSSLEngine);
    }
}
