/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.http;

import com.linecorp.armeria.server.HttpService;
import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.encoding.DecodingService;
import com.linecorp.armeria.server.healthcheck.HealthCheckService;
import com.linecorp.armeria.server.throttling.ThrottlingRejectHandler;
import com.linecorp.armeria.server.throttling.ThrottlingService;
import com.linecorp.armeria.server.throttling.ThrottlingStrategy;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.opensearch.dataprepper.HttpRequestExceptionHandler;
import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider;
import org.opensearch.dataprepper.http.BaseHttpService;
import org.opensearch.dataprepper.http.HttpServerConfig;
import org.opensearch.dataprepper.http.LogThrottlingRejectHandler;
import org.opensearch.dataprepper.http.LogThrottlingStrategy;
import org.opensearch.dataprepper.http.certificate.CertificateProviderFactory;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.codec.JsonDecoder;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.model.Certificate;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.slf4j.Logger;

public abstract class BaseHttpSource<T extends Record<?>>
implements Source<T> {
    public static final String REGEX_HEALTH = "regex:^/(?!health$).*$";
    public static final String SERVER_CONNECTIONS = "serverConnections";
    private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}";
    private static final String HTTP_HEALTH_CHECK_PATH = "/health";
    private final HttpServerConfig sourceConfig;
    private final CertificateProviderFactory certificateProviderFactory;
    private final ArmeriaHttpAuthenticationProvider authenticationProvider;
    private final HttpRequestExceptionHandler httpRequestExceptionHandler;
    private final String pipelineName;
    private final String sourceName;
    private final Logger logger;
    private final PluginMetrics pluginMetrics;
    private Server server;
    private ByteDecoder byteDecoder;

    public BaseHttpSource(HttpServerConfig sourceConfig, PluginMetrics pluginMetrics, PluginFactory pluginFactory, PipelineDescription pipelineDescription, String sourceName, Logger logger) {
        this.sourceConfig = sourceConfig;
        this.pluginMetrics = pluginMetrics;
        this.pipelineName = pipelineDescription.getPipelineName();
        this.sourceName = sourceName;
        this.logger = logger;
        this.byteDecoder = new JsonDecoder();
        this.certificateProviderFactory = new CertificateProviderFactory(sourceConfig);
        PluginModel authenticationConfiguration = sourceConfig.getAuthentication();
        if (authenticationConfiguration == null || authenticationConfiguration.getPluginName().equals("unauthenticated")) {
            logger.warn("Creating {} source without authentication. This is not secure.", (Object)sourceName);
            logger.warn("In order to set up Http Basic authentication for the {} source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/http-source#authentication-configurations", (Object)sourceName);
        }
        PluginSetting authenticationPluginSetting = authenticationConfiguration != null ? new PluginSetting(authenticationConfiguration.getPluginName(), authenticationConfiguration.getPluginSettings()) : new PluginSetting("unauthenticated", Collections.emptyMap());
        authenticationPluginSetting.setPipelineName(this.pipelineName);
        this.authenticationProvider = (ArmeriaHttpAuthenticationProvider)pluginFactory.loadPlugin(ArmeriaHttpAuthenticationProvider.class, authenticationPluginSetting, new Object[0]);
        this.httpRequestExceptionHandler = new HttpRequestExceptionHandler(pluginMetrics);
    }

    public void start(Buffer<T> buffer) {
        if (buffer == null) {
            throw new IllegalStateException("Buffer provided is null");
        }
        if (this.server == null) {
            ServerBuilder sb = Server.builder();
            sb.disableServerHeader();
            if (this.sourceConfig.isSsl()) {
                this.logger.info("Creating {} source with SSL/TLS enabled.", (Object)this.sourceName);
                CertificateProvider certificateProvider = this.certificateProviderFactory.getCertificateProvider();
                Certificate certificate = certificateProvider.getCertificate();
                sb.https(this.sourceConfig.getPort()).tls((InputStream)new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)), (InputStream)new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8)));
            } else {
                this.logger.warn("Creating {} source without SSL/TLS. This is not secure.", (Object)this.sourceName);
                this.logger.warn("In order to set up TLS for the {} source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/http-source#ssl", (Object)this.sourceName);
                sb.http(this.sourceConfig.getPort());
            }
            if (this.sourceConfig.getAuthentication() != null) {
                Optional optionalAuthDecorator = this.authenticationProvider.getAuthenticationDecorator();
                if (this.sourceConfig.isUnauthenticatedHealthCheck()) {
                    optionalAuthDecorator.ifPresent(authDecorator -> sb.decorator(REGEX_HEALTH, authDecorator));
                } else {
                    optionalAuthDecorator.ifPresent(arg_0 -> ((ServerBuilder)sb).decorator(arg_0));
                }
            }
            sb.maxNumConnections(this.sourceConfig.getMaxConnectionCount());
            sb.requestTimeout(Duration.ofMillis(this.sourceConfig.getRequestTimeoutInMillis()));
            if (this.sourceConfig.getMaxRequestLength() != null) {
                sb.maxRequestLength(this.sourceConfig.getMaxRequestLength().getBytes());
            }
            int threads = this.sourceConfig.getThreadCount();
            ScheduledThreadPoolExecutor blockingTaskExecutor = new ScheduledThreadPoolExecutor(threads);
            sb.blockingTaskExecutor((ScheduledExecutorService)blockingTaskExecutor, true);
            int maxPendingRequests = this.sourceConfig.getMaxPendingRequests();
            LogThrottlingStrategy logThrottlingStrategy = new LogThrottlingStrategy(maxPendingRequests, blockingTaskExecutor.getQueue());
            LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests, this.pluginMetrics);
            String httpSourcePath = this.sourceConfig.getPath().replace(PIPELINE_NAME_PLACEHOLDER, this.pipelineName);
            sb.decorator(httpSourcePath, ThrottlingService.newDecorator((ThrottlingStrategy)logThrottlingStrategy, (ThrottlingRejectHandler)logThrottlingRejectHandler));
            BaseHttpService httpService = this.getHttpService(this.sourceConfig.getBufferTimeoutInMillis(), buffer, this.pluginMetrics);
            if (CompressionOption.NONE.equals((Object)this.sourceConfig.getCompression())) {
                sb.annotatedService(httpSourcePath, (Object)httpService, new Object[]{this.httpRequestExceptionHandler});
            } else {
                sb.annotatedService(httpSourcePath, (Object)httpService, DecodingService.newDecorator(), new Object[]{this.httpRequestExceptionHandler});
            }
            if (this.sourceConfig.hasHealthCheckService()) {
                this.logger.info("{} source health check is enabled", (Object)this.sourceName);
                sb.service(this.getHttpHealthCheckPath(), (HttpService)HealthCheckService.builder().longPolling(0L).build());
            }
            this.server = sb.build();
            this.pluginMetrics.gauge(SERVER_CONNECTIONS, (Object)this.server, Server::numConnections);
        }
        try {
            this.server.start().get();
        }
        catch (ExecutionException ex) {
            if (ex.getCause() != null && ex.getCause() instanceof RuntimeException) {
                throw (RuntimeException)ex.getCause();
            }
            throw new RuntimeException(ex);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
        this.logger.info("Started {} source on port {}", (Object)this.sourceName, (Object)this.sourceConfig.getPort());
    }

    public ByteDecoder getDecoder() {
        return this.byteDecoder;
    }

    public void stop() {
        if (this.server != null) {
            try {
                this.server.stop().get();
            }
            catch (ExecutionException ex) {
                if (ex.getCause() != null && ex.getCause() instanceof RuntimeException) {
                    throw (RuntimeException)ex.getCause();
                }
                throw new RuntimeException(ex);
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(ex);
            }
        }
        this.logger.info("Stopped {} source.", (Object)this.sourceName);
    }

    public abstract BaseHttpService getHttpService(int var1, Buffer<T> var2, PluginMetrics var3);

    public String getHttpHealthCheckPath() {
        return HTTP_HEALTH_CHECK_PATH;
    }
}

