/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.transport;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.Version;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.auth.AuthenticatedUser;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.ResourceLimits;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaChangeListener;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.CassandraDaemon;
import org.apache.cassandra.service.IEndpointLifecycleSubscriber;
import org.apache.cassandra.service.NativeTransportService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.transport.CBUtil;
import org.apache.cassandra.transport.ClientStat;
import org.apache.cassandra.transport.ConnectedClient;
import org.apache.cassandra.transport.Connection;
import org.apache.cassandra.transport.ConnectionLimitHandler;
import org.apache.cassandra.transport.Event;
import org.apache.cassandra.transport.Frame;
import org.apache.cassandra.transport.Message;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.transport.ProtocolVersionTracker;
import org.apache.cassandra.transport.ServerConnection;
import org.apache.cassandra.transport.messages.EventMessage;
import org.apache.cassandra.utils.FBUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Server
implements CassandraDaemon.Server {
    /** Logger for this class; assigned in the static initializer. */
    private static final Logger logger;
    /** Selects the epoll transport classes over the NIO ones; assigned from NativeTransportService.useEpoll() in the static initializer. */
    private static final boolean useEpoll;
    /** Holds the group of all channels, the per-event-type channel groups and the protocol version tracker of this server. */
    private final ConnectionTracker connectionTracker = new ConnectionTracker();
    /** Factory that creates a ServerConnection wired to this server's connection tracker. */
    private final Connection.Factory connectionFactory = new Connection.Factory(){

        @Override
        public Connection newConnection(Channel channel, ProtocolVersion version) {
            return new ServerConnection(channel, version, Server.this.connectionTracker);
        }
    };
    /** Address the listening socket is bound to in start(). */
    public final InetSocketAddress socket;
    /** Whether start() installs one of the SSL-aware channel initializers; overwritten from the builder in the constructor. */
    public boolean useSSL = false;
    /** Set to true at the end of a successful start(); flipped back to false by stop(). */
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    /** Event loop group passed to the bootstrap in start(). */
    private EventLoopGroup workerGroup;
    /** Limit object sized from DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes(); assigned in the static initializer. */
    private static final ResourceLimits.Concurrent globalRequestPayloadInFlight;

    /**
     * Creates a server from the given builder and registers a fresh {@link EventNotifier}
     * with both the storage service and the schema.
     *
     * @param builder source of the bind address, the SSL flag and the optional event loop group
     * @throws IllegalStateException if the builder has no host or no port
     */
    private Server(Builder builder) {
        this.socket = builder.getSocket();
        this.useSSL = builder.useSSL;

        // Use the caller's event loop group when one was supplied; otherwise create one
        // matching the transport selected by useEpoll.
        if (builder.workerGroup != null) {
            this.workerGroup = builder.workerGroup;
        } else if (useEpoll) {
            this.workerGroup = new EpollEventLoopGroup();
        } else {
            this.workerGroup = new NioEventLoopGroup();
        }

        EventNotifier eventNotifier = new EventNotifier(this);
        StorageService.instance.register(eventNotifier);
        Schema.instance.registerListener(eventNotifier);
    }

    /**
     * Stops the server if it is currently running. The running flag is flipped atomically,
     * so only the caller that wins the flip closes the channels.
     */
    @Override
    public void stop() {
        boolean wasRunning = isRunning.compareAndSet(true, false);
        if (!wasRunning) {
            return;
        }
        close();
    }

    /**
     * @return the current value of the running flag
     */
    @Override
    public boolean isRunning() {
        return isRunning.get();
    }

    /**
     * Binds the listening socket and starts accepting CQL client connections.
     * Does nothing when the server is already running.
     *
     * <p>The channel initializer installed on accepted connections depends on {@link #useSSL}
     * and, when SSL is enabled, on whether the native protocol encryption options are
     * marked optional.
     *
     * @throws IllegalStateException if the socket cannot be bound; the bind failure is the cause
     */
    @Override
    public synchronized void start() {
        if (isRunning()) {
            return;
        }

        // Option values are passed with their natural types: childOption(ChannelOption<T>, T)
        // does not accept an Object-typed value.
        ServerBootstrap bootstrap = new ServerBootstrap()
                .channel(useEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_LINGER, 0)
                .childOption(ChannelOption.SO_KEEPALIVE, DatabaseDescriptor.getRpcKeepAlive())
                .childOption(ChannelOption.ALLOCATOR, CBUtil.allocator)
                .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024)
                .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
        if (workerGroup != null) {
            bootstrap = bootstrap.group(workerGroup);
        }

        if (useSSL) {
            EncryptionOptions clientEnc = DatabaseDescriptor.getNativeProtocolEncryptionOptions();
            if (clientEnc.optional) {
                logger.info("Enabling optionally encrypted CQL connections between client and server");
                bootstrap.childHandler(new OptionalSecureInitializer(this, clientEnc));
            } else {
                logger.info("Enabling encrypted CQL connections between client and server");
                bootstrap.childHandler(new SecureInitializer(this, clientEnc));
            }
        } else {
            bootstrap.childHandler(new Initializer(this));
        }

        logger.info("Using Netty Version: {}", Version.identify().entrySet());
        logger.info("Starting listening for CQL clients on {} ({})...", socket, useSSL ? "encrypted" : "unencrypted");

        ChannelFuture bindFuture = bootstrap.bind(socket);
        if (!bindFuture.awaitUninterruptibly().isSuccess()) {
            throw new IllegalStateException(String.format("Failed to bind port %d on %s.", socket.getPort(), socket.getAddress().getHostAddress()), bindFuture.cause());
        }

        // The listening channel is tracked with the client channels so that closeAll() closes it too.
        connectionTracker.allChannels.add(bindFuture.channel());
        isRunning.set(true);
    }

    /**
     * @return the number of connected clients as reported by the connection tracker
     */
    public int countConnectedClients() {
        return connectionTracker.countConnectedClients();
    }

    /**
     * @return the per-user connection counts as reported by the connection tracker
     */
    public Map<String, Integer> countConnectedClientsByUser() {
        return connectionTracker.countConnectedClientsByUser();
    }

    /**
     * Builds a list describing the tracked client connections.
     *
     * <p>Channels whose connection attribute is not a {@link ServerConnection} (including
     * channels with no connection attribute set) are left out.
     *
     * @return a new list with one {@link ConnectedClient} per tracked server connection
     */
    public List<ConnectedClient> getConnectedClients() {
        List<ConnectedClient> clients = new ArrayList<>();
        for (Channel channel : connectionTracker.allChannels) {
            Connection connection = (Connection)channel.attr(Connection.attributeKey).get();
            if (connection instanceof ServerConnection) {
                clients.add(new ConnectedClient((ServerConnection)connection));
            }
        }
        return clients;
    }

    /**
     * @return every entry currently held by the connection tracker's protocol version tracker
     */
    public List<ClientStat> recentClientStats() {
        return connectionTracker.protocolVersionTracker.getAll();
    }

    /**
     * Clears the connection tracker's protocol version tracker.
     */
    @Override
    public void clearConnectionHistory() {
        connectionTracker.protocolVersionTracker.clear();
    }

    /**
     * Closes every channel held by the connection tracker, waiting for the close to finish,
     * and then logs that the server stopped listening.
     */
    private void close() {
        connectionTracker.closeAll();
        logger.info("Stop listening for CQL clients");
    }

    static {
        // Install the SLF4J-backed factory as Netty's default internal logger factory
        // before the class logger below is created.
        InternalLoggerFactory.setDefaultFactory((InternalLoggerFactory)new Slf4JLoggerFactory());
        logger = LoggerFactory.getLogger(Server.class);
        // The epoll-vs-NIO decision is read once, when the class is initialized.
        useEpoll = NativeTransportService.useEpoll();
        // Limit object sized from the configured maximum bytes of concurrent native transport requests.
        globalRequestPayloadInFlight = new ResourceLimits.Concurrent(DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes());
    }

    private static class EventNotifier
    extends SchemaChangeListener
    implements IEndpointLifecycleSubscriber {
        private final Server server;
        private final Map<InetAddressAndPort, LatestEvent> latestEvents = new ConcurrentHashMap<InetAddressAndPort, LatestEvent>();
        private final Set<InetAddressAndPort> endpointsPendingJoinedNotification = ConcurrentHashMap.newKeySet();

        private EventNotifier(Server server) {
            this.server = server;
        }

        private InetAddressAndPort getNativeAddress(InetAddressAndPort endpoint) {
            try {
                return InetAddressAndPort.getByName(StorageService.instance.getNativeaddress(endpoint, true));
            }
            catch (UnknownHostException e) {
                logger.error("Problem retrieving RPC address for {}", (Object)endpoint, (Object)e);
                return InetAddressAndPort.getByAddressOverrideDefaults(endpoint.address, DatabaseDescriptor.getNativeTransportPort());
            }
        }

        private void send(InetAddressAndPort endpoint, Event.NodeEvent event) {
            if (logger.isTraceEnabled()) {
                logger.trace("Sending event for endpoint {}, rpc address {}", (Object)endpoint, (Object)event.nodeAddress());
            }
            if (!endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) && event.nodeAddress().equals(FBUtilities.getJustBroadcastNativeAddress())) {
                return;
            }
            this.send(event);
        }

        private void send(Event event) {
            this.server.connectionTracker.send(event);
        }

        @Override
        public void onJoinCluster(InetAddressAndPort endpoint) {
            if (!StorageService.instance.isRpcReady(endpoint)) {
                this.endpointsPendingJoinedNotification.add(endpoint);
            } else {
                this.onTopologyChange(endpoint, Event.TopologyChange.newNode(this.getNativeAddress(endpoint)));
            }
        }

        @Override
        public void onLeaveCluster(InetAddressAndPort endpoint) {
            this.onTopologyChange(endpoint, Event.TopologyChange.removedNode(this.getNativeAddress(endpoint)));
        }

        @Override
        public void onMove(InetAddressAndPort endpoint) {
            this.onTopologyChange(endpoint, Event.TopologyChange.movedNode(this.getNativeAddress(endpoint)));
        }

        @Override
        public void onUp(InetAddressAndPort endpoint) {
            if (this.endpointsPendingJoinedNotification.remove(endpoint)) {
                this.onJoinCluster(endpoint);
            }
            this.onStatusChange(endpoint, Event.StatusChange.nodeUp(this.getNativeAddress(endpoint)));
        }

        @Override
        public void onDown(InetAddressAndPort endpoint) {
            this.onStatusChange(endpoint, Event.StatusChange.nodeDown(this.getNativeAddress(endpoint)));
        }

        private void onTopologyChange(InetAddressAndPort endpoint, Event.TopologyChange event) {
            LatestEvent ret;
            LatestEvent prev;
            if (logger.isTraceEnabled()) {
                logger.trace("Topology changed event : {}, {}", (Object)endpoint, (Object)event.change);
            }
            if (((prev = this.latestEvents.get(endpoint)) == null || prev.topology != event.change) && (ret = this.latestEvents.put(endpoint, LatestEvent.forTopologyChange(event.change, prev))) == prev) {
                this.send(endpoint, event);
            }
        }

        private void onStatusChange(InetAddressAndPort endpoint, Event.StatusChange event) {
            LatestEvent ret;
            LatestEvent prev;
            if (logger.isTraceEnabled()) {
                logger.trace("Status changed event : {}, {}", (Object)endpoint, (Object)event.status);
            }
            if (((prev = this.latestEvents.get(endpoint)) == null || prev.status != event.status) && (ret = this.latestEvents.put(endpoint, LatestEvent.forStatusChange(event.status, null))) == prev) {
                this.send(endpoint, event);
            }
        }

        @Override
        public void onCreateKeyspace(String ksName) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, ksName));
        }

        @Override
        public void onCreateTable(String ksName, String cfName) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
        }

        @Override
        public void onCreateType(String ksName, String typeName) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
        }

        @Override
        public void onCreateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.FUNCTION, ksName, functionName, AbstractType.asCQLTypeStringList(argTypes)));
        }

        @Override
        public void onCreateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.AGGREGATE, ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
        }

        @Override
        public void onAlterKeyspace(String ksName) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName));
        }

        @Override
        public void onAlterTable(String ksName, String cfName, boolean affectsStatements) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
        }

        @Override
        public void onAlterType(String ksName, String typeName) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
        }

        @Override
        public void onAlterFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.FUNCTION, ksName, functionName, AbstractType.asCQLTypeStringList(argTypes)));
        }

        @Override
        public void onAlterAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.AGGREGATE, ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
        }

        @Override
        public void onDropKeyspace(String ksName) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName));
        }

        @Override
        public void onDropTable(String ksName, String cfName) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TABLE, ksName, cfName));
        }

        @Override
        public void onDropType(String ksName, String typeName) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, ksName, typeName));
        }

        @Override
        public void onDropFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.FUNCTION, ksName, functionName, AbstractType.asCQLTypeStringList(argTypes)));
        }

        @Override
        public void onDropAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) {
            this.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.AGGREGATE, ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
        }
    }

    /**
     * Immutable pair of the last status and the last topology change recorded for an endpoint.
     * Either component may be null.
     */
    private static class LatestEvent {
        public final Event.StatusChange.Status status;
        public final Event.TopologyChange.Change topology;

        private LatestEvent(Event.StatusChange.Status status, Event.TopologyChange.Change topology) {
            this.status = status;
            this.topology = topology;
        }

        public String toString() {
            return String.format("Status %s, Topology %s", status, topology);
        }

        /**
         * @return a new entry with the given status and the topology of {@code prev},
         *         or a null topology when {@code prev} is null
         */
        public static LatestEvent forStatusChange(Event.StatusChange.Status status, LatestEvent prev) {
            Event.TopologyChange.Change carriedTopology = null;
            if (prev != null) {
                carriedTopology = prev.topology;
            }
            return new LatestEvent(status, carriedTopology);
        }

        /**
         * @return a new entry with the given topology change and the status of {@code prev},
         *         or a null status when {@code prev} is null
         */
        public static LatestEvent forTopologyChange(Event.TopologyChange.Change change, LatestEvent prev) {
            Event.StatusChange.Status carriedStatus = null;
            if (prev != null) {
                carriedStatus = prev.status;
            }
            return new LatestEvent(carriedStatus, change);
        }
    }

    /**
     * Channel initializer that puts an SSL handler at the head of every accepted channel's pipeline.
     */
    private static class SecureInitializer
    extends AbstractSecureIntializer {
        public SecureInitializer(Server server, EncryptionOptions encryptionOptions) {
            super(server, encryptionOptions);
        }

        @Override
        protected void initChannel(Channel channel) throws Exception {
            // Create the handler first: if that fails, the regular pipeline is never set up.
            SslHandler ssl = createSslHandler(channel.alloc());
            super.initChannel(channel);
            ChannelPipeline pipeline = channel.pipeline();
            pipeline.addFirst("ssl", ssl);
        }
    }

    /**
     * Channel initializer for optionally encrypted connections. A detection handler at the head
     * of the pipeline looks at the first bytes received and either replaces itself with an SSL
     * handler or removes itself, leaving the connection unencrypted.
     */
    private static class OptionalSecureInitializer
    extends AbstractSecureIntializer {
        public OptionalSecureInitializer(Server server, EncryptionOptions encryptionOptions) {
            super(server, encryptionOptions);
        }

        @Override
        protected void initChannel(final Channel channel) throws Exception {
            super.initChannel(channel);
            channel.pipeline().addFirst("sslDetectionHandler", new ByteToMessageDecoder(){

                @Override
                protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
                    // Wait until at least 5 bytes are readable before deciding.
                    if (byteBuf.readableBytes() < 5) {
                        return;
                    }
                    if (SslHandler.isEncrypted(byteBuf)) {
                        // createSslHandler belongs to the enclosing initializer, not to this
                        // decoder, so it has to be reached through the qualified outer "this".
                        SslHandler sslHandler = OptionalSecureInitializer.this.createSslHandler(channel.alloc());
                        channelHandlerContext.pipeline().replace(this, "ssl", sslHandler);
                    } else {
                        // Not encrypted: drop the detection handler and continue without SSL.
                        channelHandlerContext.pipeline().remove(this);
                    }
                }
            });
        }
    }

    /**
     * Base class for the SSL-aware channel initializers; keeps the encryption options and
     * creates SSL handlers from them.
     */
    protected static abstract class AbstractSecureIntializer
    extends Initializer {
        private final EncryptionOptions encryptionOptions;

        protected AbstractSecureIntializer(Server server, EncryptionOptions encryptionOptions) {
            super(server);
            this.encryptionOptions = encryptionOptions;
        }

        /**
         * Creates a server-side SSL handler from the stored encryption options.
         *
         * @param allocator buffer allocator passed to the SSL context when creating the handler
         * @return a new SSL handler
         * @throws IOException declared for failures while obtaining the SSL context
         */
        protected final SslHandler createSslHandler(ByteBufAllocator allocator) throws IOException {
            SslContext context = SSLFactory.getOrCreateSslContext(
                    encryptionOptions,
                    encryptionOptions.require_client_auth,
                    SSLFactory.SocketType.SERVER);
            return context.newHandler(allocator);
        }
    }

    private static class Initializer
    extends ChannelInitializer<Channel> {
        private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder();
        private static final Message.ProtocolEncoder messageEncoder = new Message.ProtocolEncoder();
        private static final Frame.InboundBodyTransformer inboundFrameTransformer = new Frame.InboundBodyTransformer();
        private static final Frame.OutboundBodyTransformer outboundFrameTransformer = new Frame.OutboundBodyTransformer();
        private static final Frame.Encoder frameEncoder = new Frame.Encoder();
        private static final Message.ExceptionHandler exceptionHandler = new Message.ExceptionHandler();
        private static final ConnectionLimitHandler connectionLimitHandler = new ConnectionLimitHandler();
        private final Server server;

        public Initializer(Server server) {
            this.server = server;
        }

        protected void initChannel(final Channel channel) throws Exception {
            long idleTimeout;
            ChannelPipeline pipeline = channel.pipeline();
            if (DatabaseDescriptor.getNativeTransportMaxConcurrentConnections() > 0L || DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp() > 0L) {
                pipeline.addFirst("connectionLimitHandler", (ChannelHandler)connectionLimitHandler);
            }
            if ((idleTimeout = DatabaseDescriptor.nativeTransportIdleTimeout()) > 0L) {
                pipeline.addLast("idleStateHandler", (ChannelHandler)new IdleStateHandler(false, 0L, 0L, idleTimeout, TimeUnit.MILLISECONDS){

                    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
                        logger.info("Closing client connection {} after timeout of {}ms", (Object)channel.remoteAddress(), (Object)idleTimeout);
                        ctx.close();
                    }
                });
            }
            pipeline.addLast("frameDecoder", (ChannelHandler)new Frame.Decoder(this.server.connectionFactory));
            pipeline.addLast("frameEncoder", (ChannelHandler)frameEncoder);
            pipeline.addLast("inboundFrameTransformer", (ChannelHandler)inboundFrameTransformer);
            pipeline.addLast("outboundFrameTransformer", (ChannelHandler)outboundFrameTransformer);
            pipeline.addLast("messageDecoder", (ChannelHandler)messageDecoder);
            pipeline.addLast("messageEncoder", (ChannelHandler)messageEncoder);
            pipeline.addLast("executor", (ChannelHandler)new Message.Dispatcher(DatabaseDescriptor.useNativeTransportLegacyFlusher(), EndpointPayloadTracker.get(((InetSocketAddress)channel.remoteAddress()).getAddress())));
            pipeline.addLast("exceptionHandler", (ChannelHandler)exceptionHandler);
        }
    }

    /**
     * Reference-counted holder of the in-flight request payload limits for one client address.
     *
     * <p>Trackers live in a static map keyed by address. {@link #get(InetAddress)} hands out the
     * tracker for an address and takes a reference; {@link #release()} drops one. Releasing the
     * last reference retires the tracker and removes it from the map.
     */
    public static class EndpointPayloadTracker {
        private static final ConcurrentMap<InetAddress, EndpointPayloadTracker> requestPayloadInFlightPerEndpoint = new ConcurrentHashMap<>();
        // Positive: number of references held. Negative: retired, must no longer be acquired.
        private final AtomicInteger refCount = new AtomicInteger(0);
        private final InetAddress endpoint;
        // Pairs a fresh per-endpoint limit with the server's global limit. The global limit is
        // read directly from the enclosing class's static field.
        final ResourceLimits.EndpointAndGlobal endpointAndGlobalPayloadsInFlight = new ResourceLimits.EndpointAndGlobal(new ResourceLimits.Concurrent(DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp()), globalRequestPayloadInFlight);

        private EndpointPayloadTracker(InetAddress endpoint) {
            this.endpoint = endpoint;
        }

        /**
         * Returns the tracker for the given address with one more reference taken, creating it
         * if none is registered.
         *
         * @param endpoint client address
         * @return the acquired tracker; callers are expected to pair this with {@link #release()}
         */
        public static EndpointPayloadTracker get(InetAddress endpoint) {
            while (true) {
                EndpointPayloadTracker result = requestPayloadInFlightPerEndpoint.computeIfAbsent(endpoint, EndpointPayloadTracker::new);
                if (result.acquire()) {
                    return result;
                }
                // The tracker was retired between lookup and acquire: drop it and retry.
                requestPayloadInFlightPerEndpoint.remove(endpoint, result);
            }
        }

        /** @return the configured global limit, in bytes */
        public static long getGlobalLimit() {
            return DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes();
        }

        /**
         * Stores the new global limit in the configuration and applies it to the global limit object.
         *
         * @param newLimit new limit in bytes
         */
        public static void setGlobalLimit(long newLimit) {
            DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(newLimit);
            long existingLimit = globalRequestPayloadInFlight.setLimit(DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes());
            logger.info("Changed native_max_transport_requests_in_bytes from {} to {}", existingLimit, newLimit);
        }

        /** @return the configured per-address limit, in bytes */
        public static long getEndpointLimit() {
            return DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp();
        }

        /**
         * Stores the new per-address limit in the configuration and applies it to every
         * registered tracker.
         *
         * @param newLimit new limit in bytes
         */
        public static void setEndpointLimit(long newLimit) {
            long existingLimit = DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp();
            DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(newLimit);
            // The logged "from" value is the previous limit of the last tracker visited, or the
            // configured value when no tracker is registered.
            for (EndpointPayloadTracker tracker : requestPayloadInFlightPerEndpoint.values()) {
                existingLimit = tracker.endpointAndGlobalPayloadsInFlight.endpoint().setLimit(newLimit);
            }
            logger.info("Changed native_max_transport_requests_in_bytes_per_ip from {} to {}", existingLimit, newLimit);
        }

        /**
         * Takes a reference unless the tracker is retired.
         *
         * @return true if a reference was taken
         */
        private boolean acquire() {
            return 0 < refCount.updateAndGet(i -> i < 0 ? i : i + 1);
        }

        /**
         * Drops a reference. When the count was 1 the tracker is marked retired (-1) and
         * removed from the map.
         */
        public void release() {
            if (-1 == refCount.updateAndGet(i -> i == 1 ? -1 : i - 1)) {
                requestPayloadInFlightPerEndpoint.remove(endpoint, this);
            }
        }
    }

    /**
     * Keeps track of the server's channels: one group holding every channel, and one group per
     * event type holding the channels registered for that type of event.
     */
    public static class ConnectionTracker
    implements Connection.Tracker {
        public final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        private final EnumMap<Event.Type, ChannelGroup> groups = new EnumMap<>(Event.Type.class);
        private final ProtocolVersionTracker protocolVersionTracker = new ProtocolVersionTracker();

        /** Creates one (initially empty) channel group per event type. */
        public ConnectionTracker() {
            for (Event.Type type : Event.Type.values()) {
                groups.put(type, new DefaultChannelGroup(type.toString(), GlobalEventExecutor.INSTANCE));
            }
        }

        /**
         * Starts tracking a channel and, when its remote address is an
         * {@link InetSocketAddress}, records the connection's protocol version for that address.
         */
        @Override
        public void addConnection(Channel ch, Connection connection) {
            allChannels.add(ch);
            if (ch.remoteAddress() instanceof InetSocketAddress) {
                protocolVersionTracker.addConnection(((InetSocketAddress)ch.remoteAddress()).getAddress(), connection.getVersion());
            }
        }

        /** Adds the channel to the group of the given event type. */
        public void register(Event.Type type, Channel ch) {
            groups.get(type).add(ch);
        }

        /** Writes and flushes the event, wrapped in an {@link EventMessage}, to the group of its type. */
        public void send(Event event) {
            groups.get(event.type).writeAndFlush(new EventMessage(event));
        }

        /** Closes every tracked channel and waits for the close to complete. */
        void closeAll() {
            allChannels.close().awaitUninterruptibly();
        }

        /**
         * @return the number of tracked channels minus one, or 0 when no channel is tracked.
         *         Server.start() also adds the listening channel to the group, which the
         *         subtraction appears to account for.
         */
        int countConnectedClients() {
            // Read the size once: channels can be removed concurrently, and a second read
            // returning 0 would make the result -1.
            int size = allChannels.size();
            return size != 0 ? size - 1 : 0;
        }

        /**
         * Counts tracked server connections per authenticated user name. Connections without
         * a user are counted under the null key.
         *
         * @return a new map from user name to number of connections
         */
        Map<String, Integer> countConnectedClientsByUser() {
            Map<String, Integer> result = new HashMap<>();
            for (Channel c : allChannels) {
                Connection connection = (Connection)c.attr(Connection.attributeKey).get();
                if (!(connection instanceof ServerConnection)) {
                    continue;
                }
                AuthenticatedUser user = ((ServerConnection)connection).getClientState().getUser();
                String name = null != user ? user.getName() : null;
                result.merge(name, 1, Integer::sum);
            }
            return result;
        }
    }

    /**
     * Fluent builder for {@link Server}. A host and a port are required; the SSL flag and the
     * event loop group are optional.
     */
    public static class Builder {
        private EventLoopGroup workerGroup;
        private EventExecutor eventExecutorGroup;
        private boolean useSSL = false;
        private InetAddress hostAddr;
        private int port = -1;
        // Built on first use from hostAddr and port; reset whenever either of them changes.
        private InetSocketAddress socket;

        /** Sets whether the server uses SSL. */
        public Builder withSSL(boolean useSSL) {
            this.useSSL = useSSL;
            return this;
        }

        /** Sets the event loop group the server should use instead of creating its own. */
        public Builder withEventLoopGroup(EventLoopGroup eventLoopGroup) {
            workerGroup = eventLoopGroup;
            return this;
        }

        /** Sets the host to bind to and discards any previously built socket address. */
        public Builder withHost(InetAddress host) {
            hostAddr = host;
            socket = null;
            return this;
        }

        /** Sets the port to bind to and discards any previously built socket address. */
        public Builder withPort(int port) {
            this.port = port;
            socket = null;
            return this;
        }

        /**
         * @return a new server configured from this builder
         * @throws IllegalStateException if the port or the host is missing
         */
        public Server build() {
            return new Server(this);
        }

        /**
         * Returns the socket address for the configured host and port, building it on first use.
         *
         * @throws IllegalStateException if the port or the host is missing
         */
        private InetSocketAddress getSocket() {
            if (socket == null) {
                if (port == -1) {
                    throw new IllegalStateException("Missing port number");
                }
                if (hostAddr == null) {
                    throw new IllegalStateException("Missing host");
                }
                socket = new InetSocketAddress(hostAddr, port);
            }
            return socket;
        }
    }
}

