/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.thrift;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.service.CassandraDaemon;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.CassandraServer;
import org.apache.cassandra.thrift.CustomTHsHaServer;
import org.apache.cassandra.thrift.CustomTNonBlockingServer;
import org.apache.cassandra.thrift.CustomTThreadPoolServer;
import org.apache.cassandra.thrift.TBinaryProtocol;
import org.apache.cassandra.thrift.TCustomNonblockingServerSocket;
import org.apache.cassandra.thrift.TCustomServerSocket;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThriftServer
implements CassandraDaemon.Server {
    private static final Logger logger = LoggerFactory.getLogger(ThriftServer.class);
    private static final String SYNC = "sync";
    private static final String ASYNC = "async";
    private static final String HSHA = "hsha";
    public static final List<String> rpc_server_types = Arrays.asList("sync", "async", "hsha");
    private final InetAddress address;
    private final int port;
    private volatile ThriftServerThread server;

    public ThriftServer(InetAddress address, int port) {
        this.address = address;
        this.port = port;
    }

    @Override
    public void start() {
        if (this.server == null) {
            this.server = new ThriftServerThread(this.address, this.port);
            this.server.start();
        }
    }

    @Override
    public void stop() {
        if (this.server != null) {
            this.server.stopServer();
            try {
                this.server.join();
            }
            catch (InterruptedException e) {
                logger.error("Interrupted while waiting thrift server to stop", (Throwable)e);
            }
            this.server = null;
        }
    }

    @Override
    public boolean isRunning() {
        return this.server != null;
    }

    private static class CleaningThreadPool
    extends ThreadPoolExecutor {
        private final ThreadLocal<ClientState> state;

        public CleaningThreadPool(ThreadLocal<ClientState> state, int minWorkerThread, int maxWorkerThreads) {
            super(minWorkerThread, maxWorkerThreads, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new NamedThreadFactory("Thrift"));
            this.state = state;
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
            this.state.get().logout();
        }
    }

    private static class ThriftServerThread
    extends Thread {
        private TServer serverEngine;

        public ThriftServerThread(InetAddress listenAddr, int listenPort) {
            CassandraServer cassandraServer = new CassandraServer();
            Cassandra.Processor processor = new Cassandra.Processor((Cassandra.Iface)cassandraServer);
            logger.info(String.format("Binding thrift service to %s:%s", listenAddr, listenPort));
            TBinaryProtocol.Factory tProtocolFactory = new TBinaryProtocol.Factory(true, true, DatabaseDescriptor.getThriftMaxMessageLength());
            int tFramedTransportSize = DatabaseDescriptor.getThriftFramedTransportSize();
            TFramedTransport.Factory inTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
            TFramedTransport.Factory outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
            logger.info("Using TFastFramedTransport with a max frame size of {} bytes.", (Object)tFramedTransportSize);
            if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(ThriftServer.SYNC)) {
                TCustomServerSocket serverTransport;
                try {
                    serverTransport = new TCustomServerSocket(new InetSocketAddress(listenAddr, listenPort), DatabaseDescriptor.getRpcKeepAlive(), DatabaseDescriptor.getRpcSendBufferSize(), DatabaseDescriptor.getRpcRecvBufferSize());
                }
                catch (TTransportException e) {
                    throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", listenAddr, listenPort), e);
                }
                TThreadPoolServer.Args serverArgs = (TThreadPoolServer.Args)((TThreadPoolServer.Args)((TThreadPoolServer.Args)((TThreadPoolServer.Args)((TThreadPoolServer.Args)new TThreadPoolServer.Args((TServerTransport)serverTransport).minWorkerThreads(DatabaseDescriptor.getRpcMinThreads().intValue()).maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads().intValue()).inputTransportFactory((TTransportFactory)inTransportFactory)).outputTransportFactory((TTransportFactory)outTransportFactory)).inputProtocolFactory((TProtocolFactory)tProtocolFactory)).outputProtocolFactory((TProtocolFactory)tProtocolFactory)).processor((TProcessor)processor);
                CleaningThreadPool executorService = new CleaningThreadPool(cassandraServer.clientState, serverArgs.minWorkerThreads, serverArgs.maxWorkerThreads);
                this.serverEngine = new CustomTThreadPoolServer(serverArgs, executorService);
                logger.info(String.format("Using synchronous/threadpool thrift server on %s : %s", listenAddr, listenPort));
            } else {
                TCustomNonblockingServerSocket serverTransport;
                try {
                    serverTransport = new TCustomNonblockingServerSocket(new InetSocketAddress(listenAddr, listenPort), DatabaseDescriptor.getRpcKeepAlive(), DatabaseDescriptor.getRpcSendBufferSize(), DatabaseDescriptor.getRpcRecvBufferSize());
                }
                catch (TTransportException e) {
                    throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", listenAddr, listenPort), e);
                }
                if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(ThriftServer.ASYNC)) {
                    TNonblockingServer.Args serverArgs = (TNonblockingServer.Args)((TNonblockingServer.Args)((TNonblockingServer.Args)((TNonblockingServer.Args)((TNonblockingServer.Args)new TNonblockingServer.Args((TNonblockingServerTransport)serverTransport).inputTransportFactory((TTransportFactory)inTransportFactory)).outputTransportFactory((TTransportFactory)outTransportFactory)).inputProtocolFactory((TProtocolFactory)tProtocolFactory)).outputProtocolFactory((TProtocolFactory)tProtocolFactory)).processor((TProcessor)processor);
                    logger.info(String.format("Using non-blocking/asynchronous thrift server on %s : %s", listenAddr, listenPort));
                    this.serverEngine = new CustomTNonBlockingServer(serverArgs);
                } else if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(ThriftServer.HSHA)) {
                    JMXEnabledThreadPoolExecutor executorService = new JMXEnabledThreadPoolExecutor((int)DatabaseDescriptor.getRpcMinThreads(), (int)DatabaseDescriptor.getRpcMaxThreads(), 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new NamedThreadFactory("RPC-Thread"), "RPC-THREAD-POOL");
                    TNonblockingServer.Args serverArgs = (TNonblockingServer.Args)((TNonblockingServer.Args)((TNonblockingServer.Args)((TNonblockingServer.Args)((TNonblockingServer.Args)new TNonblockingServer.Args((TNonblockingServerTransport)serverTransport).inputTransportFactory((TTransportFactory)inTransportFactory)).outputTransportFactory((TTransportFactory)outTransportFactory)).inputProtocolFactory((TProtocolFactory)tProtocolFactory)).outputProtocolFactory((TProtocolFactory)tProtocolFactory)).processor((TProcessor)processor);
                    logger.info(String.format("Using custom half-sync/half-async thrift server on %s : %s", listenAddr, listenPort));
                    this.serverEngine = new CustomTHsHaServer(serverArgs, executorService, Runtime.getRuntime().availableProcessors());
                }
            }
        }

        @Override
        public void run() {
            logger.info("Listening for thrift clients...");
            this.serverEngine.serve();
        }

        public void stopServer() {
            logger.info("Stop listening to thrift clients");
            this.serverEngine.stop();
        }
    }
}

