/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.dynamodb;

import java.util.Objects;
import java.util.function.Function;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.UsesEnhancedSourceCoordination;
import org.opensearch.dataprepper.plugins.source.dynamodb.ClientFactory;
import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBService;
import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.PartitionFactory;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.LeaderPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DataPrepperPlugin(name="dynamodb", pluginType=Source.class, pluginConfigurationType=DynamoDBSourceConfig.class)
public class DynamoDBSource
implements Source<Record<Event>>,
UsesEnhancedSourceCoordination {
    private static final Logger LOG = LoggerFactory.getLogger(DynamoDBSource.class);
    private final PluginMetrics pluginMetrics;
    private final DynamoDBSourceConfig sourceConfig;
    private final PluginFactory pluginFactory;
    private final ClientFactory clientFactory;
    private final AcknowledgementSetManager acknowledgementSetManager;
    private EnhancedSourceCoordinator coordinator;
    private DynamoDBService dynamoDBService;
    private final boolean acknowledgementsEnabled;

    @DataPrepperPluginConstructor
    public DynamoDBSource(PluginMetrics pluginMetrics, DynamoDBSourceConfig sourceConfig, PluginFactory pluginFactory, AwsCredentialsSupplier awsCredentialsSupplier, AcknowledgementSetManager acknowledgementSetManager) {
        LOG.info("Create DynamoDB Source");
        this.pluginMetrics = pluginMetrics;
        this.sourceConfig = sourceConfig;
        this.pluginFactory = pluginFactory;
        this.acknowledgementSetManager = acknowledgementSetManager;
        this.acknowledgementsEnabled = sourceConfig.isAcknowledgmentsEnabled();
        this.clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig.getAwsAuthenticationConfig(), sourceConfig.getTableConfigs().get(0).getExportConfig());
    }

    public boolean areAcknowledgementsEnabled() {
        return this.acknowledgementsEnabled;
    }

    public void start(Buffer<Record<Event>> buffer) {
        Objects.requireNonNull(this.coordinator);
        this.coordinator.createPartition((EnhancedSourcePartition)new LeaderPartition());
        this.dynamoDBService = new DynamoDBService(this.coordinator, this.clientFactory, this.sourceConfig, this.pluginMetrics, this.acknowledgementSetManager);
        LOG.info("Start DynamoDB service");
        this.dynamoDBService.start(buffer);
    }

    public void stop() {
        LOG.info("Stop DynamoDB Source");
        if (Objects.nonNull(this.dynamoDBService)) {
            this.dynamoDBService.shutdown();
        }
    }

    public void setEnhancedSourceCoordinator(EnhancedSourceCoordinator sourceCoordinator) {
        this.coordinator = sourceCoordinator;
        this.coordinator.initialize();
    }

    public Function<SourcePartitionStoreItem, EnhancedSourcePartition> getPartitionFactory() {
        return new PartitionFactory();
    }
}

