/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.sqs;

import com.linecorp.armeria.client.retry.Backoff;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.sqs.CodecBulkMessageFieldStrategy;
import org.opensearch.dataprepper.plugins.source.sqs.MessageFieldStrategy;
import org.opensearch.dataprepper.plugins.source.sqs.QueueConfig;
import org.opensearch.dataprepper.plugins.source.sqs.RawSqsMessageHandler;
import org.opensearch.dataprepper.plugins.source.sqs.SqsEventProcessor;
import org.opensearch.dataprepper.plugins.source.sqs.SqsSourceConfig;
import org.opensearch.dataprepper.plugins.source.sqs.SqsWorker;
import org.opensearch.dataprepper.plugins.source.sqs.StandardMessageFieldStrategy;
import org.opensearch.dataprepper.plugins.source.sqs.common.SqsBackoff;
import org.opensearch.dataprepper.plugins.source.sqs.common.SqsClientFactory;
import org.opensearch.dataprepper.plugins.source.sqs.common.SqsWorkerCommon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.utils.SdkAutoCloseable;

/**
 * Starts and stops the worker threads that poll the SQS queues listed in an
 * {@link SqsSourceConfig}.
 *
 * <p>For every configured queue, {@link #start()} creates a fixed-size thread pool and
 * submits one {@link SqsWorker} per configured worker. A single {@link SqsClient} is created
 * per region and reused by every queue in that region. {@link #stop()} shuts the pools down,
 * stops the workers and closes the clients.
 */
public class SqsService {
    private static final Logger LOG = LoggerFactory.getLogger(SqsService.class);
    /** Seconds to wait for each executor to terminate during {@link #stop()}. */
    static final long SHUTDOWN_TIMEOUT = 30L;
    private final SqsSourceConfig sqsSourceConfig;
    private final PluginMetrics pluginMetrics;
    private final PluginFactory pluginFactory;
    private final AcknowledgementSetManager acknowledgementSetManager;
    private final List<ExecutorService> allSqsUrlExecutorServices;
    private final List<SqsWorker> sqsWorkers;
    private final Buffer<Record<Event>> buffer;
    /** SQS clients keyed by the region string extracted from the queue URL. */
    private final Map<String, SqsClient> sqsClientMap = new HashMap<>();
    private final AwsCredentialsProvider credentialsProvider;

    /**
     * Creates the service. No threads or clients are created until {@link #start()} is called.
     *
     * @param buffer buffer handed to every worker
     * @param acknowledgementSetManager acknowledgement manager handed to the workers
     * @param sqsSourceConfig source configuration, including the list of queues
     * @param pluginMetrics metrics handed to the workers
     * @param pluginFactory factory used to load a queue's codec plugin, when one is configured
     * @param credentialsProvider credentials used when creating SQS clients
     */
    public SqsService(Buffer<Record<Event>> buffer, AcknowledgementSetManager acknowledgementSetManager, SqsSourceConfig sqsSourceConfig, PluginMetrics pluginMetrics, PluginFactory pluginFactory, AwsCredentialsProvider credentialsProvider) {
        this.sqsSourceConfig = sqsSourceConfig;
        this.pluginMetrics = pluginMetrics;
        this.pluginFactory = pluginFactory;
        this.credentialsProvider = credentialsProvider;
        this.acknowledgementSetManager = acknowledgementSetManager;
        this.allSqsUrlExecutorServices = new ArrayList<>();
        this.sqsWorkers = new ArrayList<>();
        this.buffer = buffer;
    }

    /**
     * Creates a thread pool and workers for every configured queue and submits the workers.
     *
     * @throws IllegalArgumentException if a queue URL does not contain a region segment
     */
    public void start() {
        LOG.info("Starting SqsService");
        this.sqsSourceConfig.getQueues().forEach(this::startWorkersForQueue);
    }

    /**
     * Shuts down all executors, stops all workers, waits up to {@link #SHUTDOWN_TIMEOUT}
     * seconds per executor for termination (forcing shutdown on timeout or interruption),
     * and finally closes every SQS client.
     */
    public void stop() {
        this.allSqsUrlExecutorServices.forEach(ExecutorService::shutdown);
        this.sqsWorkers.forEach(SqsWorker::stop);
        this.allSqsUrlExecutorServices.forEach(executorService -> {
            try {
                if (!executorService.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
                    LOG.warn("Failed to terminate SqsWorkers");
                    executorService.shutdownNow();
                }
            } catch (InterruptedException e) {
                LOG.error("Interrupted during shutdown, exiting uncleanly...", e);
                executorService.shutdownNow();
                // Restore the interrupt flag so callers further up the stack can observe it.
                Thread.currentThread().interrupt();
            }
        });
        this.sqsClientMap.values().forEach(SdkAutoCloseable::close);
        LOG.info("SqsService shutdown completed.");
    }

    /**
     * Creates the client, executor and workers for one queue and submits the workers.
     */
    private void startWorkersForQueue(QueueConfig queueConfig) {
        String queueUrl = queueConfig.getUrl();
        String region = this.extractRegionFromQueueUrl(queueUrl);
        // One client per region, shared by all queues that live in that region.
        SqsClient sqsClient = this.sqsClientMap.computeIfAbsent(
                region, r -> SqsClientFactory.createSqsClient(Region.of(r), this.credentialsProvider));
        // Everything after the last '/' of the URL; used only to name the worker threads.
        String queueName = queueUrl.substring(queueUrl.lastIndexOf('/') + 1);
        Backoff backoff = SqsBackoff.createExponentialBackoff();
        SqsWorkerCommon sqsWorkerCommon = new SqsWorkerCommon(backoff, this.pluginMetrics, this.acknowledgementSetManager);
        int numWorkers = queueConfig.getNumWorkers();
        MessageFieldStrategy strategy = this.createMessageFieldStrategy(queueConfig);
        // The event processor and the worker-common helper are shared by all workers of this queue.
        SqsEventProcessor sqsEventProcessor = new SqsEventProcessor(new RawSqsMessageHandler(strategy));
        ExecutorService executorService = Executors.newFixedThreadPool(
                numWorkers, BackgroundThreadFactory.defaultExecutorThreadFactory("sqs-source" + queueName));
        this.allSqsUrlExecutorServices.add(executorService);
        List<SqsWorker> workers = IntStream.range(0, numWorkers)
                .mapToObj(i -> new SqsWorker(this.buffer, this.acknowledgementSetManager, sqsClient, sqsWorkerCommon, this.sqsSourceConfig, queueConfig, this.pluginMetrics, sqsEventProcessor))
                .collect(Collectors.toList());
        this.sqsWorkers.addAll(workers);
        workers.forEach(executorService::submit);
        LOG.info("Started SQS workers for queue {} with {} workers", queueUrl, numWorkers);
    }

    /**
     * Picks the message field strategy for a queue: a codec-backed strategy when the queue
     * configures a codec, otherwise the standard strategy.
     */
    private MessageFieldStrategy createMessageFieldStrategy(QueueConfig queueConfig) {
        PluginModel codecConfiguration = queueConfig.getCodec();
        if (codecConfiguration == null) {
            return new StandardMessageFieldStrategy();
        }
        PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
        InputCodec codec = this.pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings);
        return new CodecBulkMessageFieldStrategy(codec);
    }

    /**
     * Returns the second dot-separated token of the queue URL, which is used as the region.
     *
     * <p>NOTE(review): this presumes URLs shaped like
     * {@code https://sqs.<region>.amazonaws.com/<account>/<queue>}; other endpoint shapes
     * would yield a different token - confirm against the supported URL formats.
     *
     * @param queueUrl the queue URL to inspect
     * @return the token between the first and second {@code '.'} of the URL
     * @throws IllegalArgumentException if the URL has no non-empty second dot-separated token
     */
    private String extractRegionFromQueueUrl(String queueUrl) {
        String[] split = queueUrl.split("\\.");
        if (split.length < 2 || split[1].isEmpty()) {
            throw new IllegalArgumentException("Unable to extract region from SQS queue URL: " + queueUrl);
        }
        return split[1];
    }
}

