/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.dynamodb;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.dynamodb.ClientFactory;
import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.TableConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.export.DataFileLoaderFactory;
import org.opensearch.dataprepper.plugins.source.dynamodb.export.DataFileScheduler;
import org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler;
import org.opensearch.dataprepper.plugins.source.dynamodb.export.ManifestFileReader;
import org.opensearch.dataprepper.plugins.source.dynamodb.export.S3ObjectReader;
import org.opensearch.dataprepper.plugins.source.dynamodb.leader.LeaderScheduler;
import org.opensearch.dataprepper.plugins.source.dynamodb.leader.ShardManager;
import org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumerFactory;
import org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamScheduler;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.BackoffCalculator;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import software.amazon.awssdk.services.s3.S3Client;

/**
 * Creates the AWS clients used by the DynamoDB source and runs the source's
 * scheduler tasks on a dedicated fixed-size thread pool.
 *
 * <p>Clients and the {@link ShardManager} are built once in the constructor.
 * {@link #start(Buffer)} builds the scheduler objects and submits them to the
 * pool; {@link #shutdown()} stops the pool.
 */
public class DynamoDBService {
    private static final Logger LOG = LoggerFactory.getLogger(DynamoDBService.class);

    /** Pool size; {@link #start(Buffer)} submits at most this many tasks. */
    private static final int SCHEDULER_THREAD_COUNT = 4;

    private final EnhancedSourceCoordinator coordinator;
    private final PluginMetrics pluginMetrics;
    private final AcknowledgementSetManager acknowledgementSetManager;
    private final DynamoDBSourceConfig dynamoDBSourceConfig;
    private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics;
    private final DynamoDbClient dynamoDbClient;
    private final DynamoDbStreamsClient dynamoDbStreamsClient;
    private final S3Client s3Client;
    private final ShardManager shardManager;
    private final List<TableConfig> tableConfigs;
    private final ExecutorService executor;

    /**
     * Stores the collaborators, builds the DynamoDB, DynamoDB Streams and S3
     * clients from the given factory, and allocates the scheduler thread pool.
     *
     * @param coordinator               source coordinator handed to every scheduler
     * @param clientFactory             factory used to build the three AWS clients
     * @param sourceConfig              source configuration; its table configs are read here
     * @param pluginMetrics             metrics handle handed to the schedulers and factories
     * @param acknowledgementSetManager acknowledgement manager handed to the file and stream schedulers
     */
    public DynamoDBService(EnhancedSourceCoordinator coordinator, ClientFactory clientFactory, DynamoDBSourceConfig sourceConfig, PluginMetrics pluginMetrics, AcknowledgementSetManager acknowledgementSetManager) {
        this.coordinator = coordinator;
        this.pluginMetrics = pluginMetrics;
        this.acknowledgementSetManager = acknowledgementSetManager;
        this.dynamoDBSourceConfig = sourceConfig;

        dynamoDBSourceAggregateMetrics = new DynamoDBSourceAggregateMetrics();

        dynamoDbClient = clientFactory.buildDynamoDBClient();
        dynamoDbStreamsClient = clientFactory.buildDynamoDbStreamClient();
        s3Client = clientFactory.buildS3Client();

        shardManager = new ShardManager(dynamoDbStreamsClient, dynamoDBSourceAggregateMetrics);
        tableConfigs = sourceConfig.getTableConfigs();
        executor = Executors.newFixedThreadPool(SCHEDULER_THREAD_COUNT);
    }

    /**
     * Builds the leader, export, data-file and stream schedulers and submits
     * them to the thread pool.
     *
     * <p>The stream scheduler is always constructed, but it is only submitted
     * when the first table configuration has a stream configuration. Stream and
     * export settings are taken from the first table configuration only.
     *
     * @param buffer buffer handed to the data-file loader factory and the shard consumer factory
     */
    public void start(Buffer<Record<Event>> buffer) {
        LOG.info("Start running DynamoDB service");

        // Export path: manifest reader on top of S3, then the export scheduler.
        final S3ObjectReader objectReader = new S3ObjectReader(s3Client);
        final ManifestFileReader manifestReader = new ManifestFileReader(objectReader);
        final ExportScheduler exportTask = new ExportScheduler(
                coordinator, dynamoDbClient, manifestReader, pluginMetrics, dynamoDBSourceAggregateMetrics);

        // Data-file path.
        final DataFileLoaderFactory dataFileLoaders =
                new DataFileLoaderFactory(coordinator, s3Client, pluginMetrics, buffer);
        final DataFileScheduler dataFileTask = new DataFileScheduler(
                coordinator, dataFileLoaders, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig);

        // Stream path: the backoff calculator is told whether the first table has an export configured.
        final ShardConsumerFactory shardConsumers = new ShardConsumerFactory(
                coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer,
                dynamoDBSourceConfig.getTableConfigs().get(0).getStreamConfig());
        final boolean exportConfigured =
                dynamoDBSourceConfig.getTableConfigs().get(0).getExportConfig() != null;
        final BackoffCalculator backoff = new BackoffCalculator(exportConfigured);
        final StreamScheduler streamTask = new StreamScheduler(
                coordinator, shardConsumers, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, backoff);

        final LeaderScheduler leaderTask =
                new LeaderScheduler(coordinator, dynamoDbClient, shardManager, tableConfigs);

        executor.submit(leaderTask);
        executor.submit(exportTask);
        executor.submit(dataFileTask);

        // Skip the stream scheduler entirely when no stream is configured for the first table.
        final boolean streamConfigured = tableConfigs.get(0).getStreamConfig() != null;
        if (streamConfigured) {
            executor.submit(streamTask);
        }
    }

    /**
     * Stops the scheduler thread pool via {@link ExecutorService#shutdownNow()}.
     */
    public void shutdown() {
        LOG.info("shutdown DynamoDB schedulers");
        executor.shutdownNow();
    }
}

