/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cassandra;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.health.HealthCheck;
import com.codahale.metrics.health.HealthCheckRegistry;
import com.codahale.metrics.jmx.JmxReporter;
import com.codahale.metrics.servlets.HealthCheckServlet;
import com.codahale.metrics.servlets.MetricsServlet;
import com.codahale.metrics.servlets.PingServlet;
import io.debezium.config.Configuration;
import io.debezium.connector.cassandra.AbstractProcessor;
import io.debezium.connector.cassandra.CassandraConnectorConfig;
import io.debezium.connector.cassandra.CassandraConnectorContext;
import io.debezium.connector.cassandra.CassandraConnectorTaskHealthCheck;
import io.debezium.connector.cassandra.CommitLogPostProcessor;
import io.debezium.connector.cassandra.CommitLogProcessor;
import io.debezium.connector.cassandra.QueueProcessor;
import io.debezium.connector.cassandra.SchemaProcessor;
import io.debezium.connector.cassandra.SnapshotProcessor;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorConfigException;
import io.debezium.connector.cassandra.network.BuildInfoServlet;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.servlet.Servlet;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Standalone entry point of the Cassandra connector.
 *
 * <p>A task builds a {@link CassandraConnectorContext} from the supplied configuration, starts a
 * {@link ProcessorGroup} containing the connector's processors, exposes ping / build-info /
 * metrics / health servlets over an embedded HTTP server, publishes the shared metric registry
 * through JMX, and then blocks until the processor group reports that it is no longer running.
 * Everything that was started is released again when {@link #run()} exits, whether normally or
 * because of a failure.
 */
public class CassandraConnectorTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraConnectorTask.class);

    /** Metric registry served by the {@code /metrics} servlet and exported by the JMX reporter. */
    public static final MetricRegistry METRIC_REGISTRY_INSTANCE = new MetricRegistry();

    private final CassandraConnectorConfig config;
    private CassandraConnectorContext taskContext;
    private ProcessorGroup processorGroup;
    private Server httpServer;
    private JmxReporter jmxReporter;

    /**
     * Loads the connector configuration from the file named by the first argument and runs a task
     * with it until the task stops.
     *
     * @param args command-line arguments; {@code args[0]} is the path of the configuration file
     * @throws CassandraConnectorConfigException if no argument is given
     * @throws Exception if the configuration cannot be read or the task fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            throw new CassandraConnectorConfigException("CDC config file is required");
        }
        String configPath = args[0];
        try (InputStream configStream = new FileInputStream(configPath)) {
            CassandraConnectorConfig config = new CassandraConnectorConfig(Configuration.load(configStream));
            CassandraConnectorTask task = new CassandraConnectorTask(config);
            task.run();
        }
    }

    /**
     * Creates a task for the given configuration. Nothing is started until {@link #run()} is called.
     *
     * @param config the connector configuration used by every component of the task
     */
    public CassandraConnectorTask(CassandraConnectorConfig config) {
        this.config = config;
    }

    /**
     * Starts the task context, processor group, HTTP server and JMX reporter (in that order), then
     * polls the processor group once per second until it stops running. All started components
     * are shut down before this method returns or throws.
     *
     * <p>If start-up or the wait loop fails, that failure is what the caller sees; any exception
     * raised while shutting down afterwards is attached to it as a suppressed exception rather
     * than replacing it.
     *
     * @throws Exception if start-up, the wait loop, or shutdown fails
     */
    void run() throws Exception {
        try {
            LOGGER.info("Initializing Cassandra connector task context ...");
            this.taskContext = new CassandraConnectorContext(this.config);
            LOGGER.info("Starting processor group ...");
            this.initProcessorGroup();
            this.processorGroup.start();
            LOGGER.info("Starting HTTP server ...");
            this.initHttpServer();
            this.httpServer.start();
            LOGGER.info("Starting JMX reporter ...");
            this.initJmxReporter(this.config.connectorName());
            this.jmxReporter.start();
            while (this.processorGroup.isRunning()) {
                Thread.sleep(1000L);
            }
        }
        catch (Throwable failure) {
            // Clean up, but keep the original failure as the primary exception.
            try {
                this.stopAll();
            }
            catch (Exception shutdownFailure) {
                if (shutdownFailure != failure) {
                    failure.addSuppressed(shutdownFailure);
                }
            }
            throw failure;
        }
        this.stopAll();
    }

    /**
     * Shuts down every component that has been created so far: processor group, HTTP server, JMX
     * reporter and task context. Each step is attempted even if an earlier one fails, so a single
     * failing component cannot prevent the others from being released.
     *
     * @throws Exception the first failure encountered, with any later failures attached as
     *         suppressed exceptions
     */
    private void stopAll() throws Exception {
        Exception failure = null;
        if (this.processorGroup != null) {
            try {
                this.processorGroup.terminate();
                LOGGER.info("Stopped processor group");
            }
            catch (Exception e) {
                failure = accumulate(failure, e);
            }
        }
        if (this.httpServer != null) {
            try {
                this.httpServer.stop();
                LOGGER.info("Stopped HTTP server");
            }
            catch (Exception e) {
                failure = accumulate(failure, e);
            }
        }
        if (this.jmxReporter != null) {
            try {
                this.jmxReporter.stop();
                LOGGER.info("Stopped JMX reporter");
            }
            catch (Exception e) {
                failure = accumulate(failure, e);
            }
        }
        if (this.taskContext != null) {
            try {
                this.taskContext.cleanUp();
                LOGGER.info("Cleaned up Cassandra connector task context");
            }
            catch (Exception e) {
                failure = accumulate(failure, e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Combines two failures into one: the first failure stays primary and the later one is
     * recorded on it as a suppressed exception.
     *
     * @param primary the failure collected so far, or {@code null} if there is none yet
     * @param secondary the failure that has just occurred
     * @return {@code secondary} if there was no earlier failure, otherwise {@code primary}
     */
    private static Exception accumulate(Exception primary, Exception secondary) {
        if (primary == null) {
            return secondary;
        }
        // Throwable.addSuppressed rejects self-suppression.
        if (primary != secondary) {
            primary.addSuppressed(secondary);
        }
        return primary;
    }

    /**
     * Creates (but does not start) the embedded HTTP server on the configured port and registers
     * the {@code /ping}, {@code /buildinfo}, {@code /metrics} and {@code /health} servlets.
     *
     * <p>Must be called after {@link #initProcessorGroup()}: the health check registered here is
     * constructed from the current processor group and the task context's Cassandra client.
     */
    private void initHttpServer() {
        int httpPort = this.config.httpPort();
        LOGGER.info("HTTP port is {}", httpPort);
        this.httpServer = new Server(httpPort);
        ServletContextHandler contextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS);
        contextHandler.setContextPath("/");
        this.httpServer.setHandler(contextHandler);
        contextHandler.addServlet(new ServletHolder(new PingServlet()), "/ping");
        contextHandler.addServlet(new ServletHolder(new BuildInfoServlet(CassandraConnectorTask.getBuildInfoMap(this.getClass()))), "/buildinfo");
        contextHandler.addServlet(new ServletHolder(new MetricsServlet(METRIC_REGISTRY_INSTANCE)), "/metrics");
        contextHandler.addServlet(new ServletHolder(new HealthCheckServlet(this.registerHealthCheck())), "/health");
    }

    /**
     * Creates the processor group and populates it with the schema, commit log, snapshot and
     * queue processors, plus the commit log post-processor when post-processing is enabled in the
     * configuration. The group is not started here.
     *
     * @throws IOException declared for failures while constructing the processors
     */
    private void initProcessorGroup() throws IOException {
        this.processorGroup = new ProcessorGroup("Cassandra Connector Task");
        this.processorGroup.addProcessor(new SchemaProcessor(this.taskContext));
        this.processorGroup.addProcessor(new CommitLogProcessor(this.taskContext));
        this.processorGroup.addProcessor(new SnapshotProcessor(this.taskContext));
        this.processorGroup.addProcessor(new QueueProcessor(this.taskContext));
        if (this.taskContext.getCassandraConnectorConfig().postProcessEnabled()) {
            this.processorGroup.addProcessor(new CommitLogPostProcessor(this.taskContext));
        }
    }

    /**
     * Builds (but does not start) a JMX reporter for the shared metric registry.
     *
     * @param domain the JMX domain under which the metrics are published
     */
    private void initJmxReporter(String domain) {
        this.jmxReporter = JmxReporter.forRegistry(METRIC_REGISTRY_INSTANCE).inDomain(domain).build();
    }

    /**
     * Creates a health check registry containing the single connector health check.
     *
     * @return a new registry with the check registered as {@code cassandra-cdc-health-check}
     */
    private HealthCheckRegistry registerHealthCheck() {
        CassandraConnectorTaskHealthCheck healthCheck = new CassandraConnectorTaskHealthCheck(this.processorGroup, this.taskContext.getCassandraClient());
        HealthCheckRegistry healthCheckRegistry = new HealthCheckRegistry();
        healthCheckRegistry.register("cassandra-cdc-health-check", healthCheck);
        return healthCheckRegistry;
    }

    /**
     * Collects the implementation version and title recorded for the package of the given class.
     *
     * @param clazz the class whose package metadata is read
     * @return a map with the keys {@code version} and {@code service_name}; a value is
     *         {@code null} when the corresponding metadata is not available
     */
    private static Map<String, String> getBuildInfoMap(Class<?> clazz) {
        Map<String, String> buildInfo = new HashMap<String, String>();
        // getPackage() may return null when no package information is available.
        Package pkg = clazz.getPackage();
        buildInfo.put("version", pkg == null ? null : pkg.getImplementationVersion());
        buildInfo.put("service_name", pkg == null ? null : pkg.getImplementationTitle());
        return buildInfo;
    }

    /**
     * A named set of processors that are started together, each on its own thread of a fixed-size
     * pool, and stopped together. If any processor fails with an exception while initializing or
     * running, every processor in the group is stopped.
     *
     * <p>NOTE(review): {@link #stopProcessors()} can be invoked from a failing worker thread and
     * from the thread calling {@link #terminate()}; this relies on the processors tolerating
     * repeated or concurrent {@code stop()} / {@code destroy()} calls -- confirm against
     * {@code AbstractProcessor}.
     */
    public static class ProcessorGroup {
        /** How long {@link #terminate()} waits for worker threads before interrupting them. */
        private static final long SHUTDOWN_TIMEOUT_SECONDS = 1L;

        private final String name;
        private final Set<AbstractProcessor> processors;
        private ExecutorService executorService;

        /**
         * Creates an empty group.
         *
         * @param name the name used for this group in log messages
         */
        ProcessorGroup(String name) {
            this.name = name;
            this.processors = new HashSet<AbstractProcessor>();
        }

        /**
         * @return {@code true} only if every processor in the group reports that it is running
         *         (an empty group therefore also yields {@code true})
         */
        public boolean isRunning() {
            for (AbstractProcessor processor : this.processors) {
                if (!processor.isRunning()) {
                    return false;
                }
            }
            return true;
        }

        /**
         * @return the name given to this group at construction
         */
        public String getName() {
            return this.name;
        }

        /**
         * Adds a processor to the group.
         *
         * @param processor the processor to add
         */
        void addProcessor(AbstractProcessor processor) {
            this.processors.add(processor);
        }

        /**
         * Creates a thread pool with one thread per processor and submits each processor to it.
         * A worker initializes and then starts its processor; if either step throws, the error is
         * logged and all processors in the group are stopped.
         */
        void start() {
            this.executorService = Executors.newFixedThreadPool(this.processors.size());
            LOGGER.info("Starting processor group {}", this.getName());
            for (AbstractProcessor processor : this.processors) {
                Runnable runnable = () -> {
                    try {
                        processor.initialize();
                        processor.start();
                    }
                    catch (Exception e) {
                        LOGGER.error("Encountered exception while running {}; stopping all processors in {}", processor.getName(), this.getName(), e);
                        try {
                            this.stopProcessors();
                        }
                        catch (Exception e2) {
                            LOGGER.error("Encountered exceptions while stopping all processors in {}", this.getName(), e2);
                        }
                    }
                };
                this.executorService.submit(runnable);
            }
        }

        /**
         * Stops all processors and shuts down the thread pool, waiting briefly for the worker
         * threads to finish before interrupting them. The pool is shut down even if stopping a
         * processor fails, and the call is safe when {@link #start()} was never invoked.
         *
         * @throws Exception the first failure encountered while stopping processors or shutting
         *         down the pool, with any later failures attached as suppressed exceptions
         */
        void terminate() throws Exception {
            Exception failure = null;
            try {
                this.stopProcessors();
            }
            catch (Exception e) {
                failure = accumulate(failure, e);
            }
            LOGGER.info("Terminating processor group {}", this.getName());
            // The executor only exists once start() has run.
            if (this.executorService != null && !this.executorService.isShutdown()) {
                try {
                    this.executorService.shutdown();
                    if (!this.executorService.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                        this.executorService.shutdownNow();
                    }
                }
                catch (Exception e) {
                    failure = accumulate(failure, e);
                }
            }
            if (failure != null) {
                throw failure;
            }
        }

        /**
         * Stops and then destroys every processor in the group. A failure in one processor does
         * not prevent the remaining processors from being stopped.
         *
         * @throws Exception the first failure encountered, with any later failures attached as
         *         suppressed exceptions
         */
        private void stopProcessors() throws Exception {
            Exception failure = null;
            for (AbstractProcessor processor : this.processors) {
                try {
                    processor.stop();
                    processor.destroy();
                }
                catch (Exception e) {
                    failure = accumulate(failure, e);
                }
            }
            if (failure != null) {
                throw failure;
            }
        }
    }
}

