/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.kinesis.source;

import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfigSupplier;
import org.opensearch.dataprepper.plugins.kinesis.source.HostNameWorkerIdentifierGenerator;
import org.opensearch.dataprepper.plugins.kinesis.source.KinesisClientFactory;
import org.opensearch.dataprepper.plugins.kinesis.source.KinesisService;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Data Prepper source plugin registered under the name {@code kinesis}
 * (alternate names {@code kinesis-data-streams} and {@code kinesis_data_streams}).
 *
 * <p>The class itself holds no stream-processing logic: it wires up a
 * {@link KinesisService} at construction time and forwards the start/stop
 * lifecycle calls to it.
 */
@DataPrepperPlugin(
        name="kinesis",
        alternateNames={"kinesis-data-streams", "kinesis_data_streams"},
        pluginType=Source.class,
        pluginConfigurationType=KinesisSourceConfig.class)
public class KinesisSource
implements Source<Record<Event>> {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class);
    private final KinesisSourceConfig kinesisSourceConfig;
    private final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier;
    // Deliberately non-final: it can be swapped through setKinesisService.
    private KinesisService kinesisService;

    /**
     * Builds the source and the {@link KinesisService} it delegates to.
     *
     * <p>A {@link KinesisClientFactory} is created from the credentials supplier and the
     * AWS authentication section of the source configuration; the service is then created
     * with that factory, the remaining collaborators, and a
     * {@link HostNameWorkerIdentifierGenerator}.
     *
     * @param kinesisSourceConfig        configuration for this source
     * @param pluginMetrics              metrics handle passed through to the service
     * @param pluginFactory              plugin factory passed through to the service
     * @param pipelineDescription        pipeline description passed through to the service
     * @param awsCredentialsSupplier     credentials supplier used by the client factory
     * @param acknowledgementSetManager  acknowledgement manager passed through to the service
     * @param kinesisLeaseConfigSupplier lease configuration supplier passed through to the service
     */
    @DataPrepperPluginConstructor
    public KinesisSource(
            KinesisSourceConfig kinesisSourceConfig,
            PluginMetrics pluginMetrics,
            PluginFactory pluginFactory,
            PipelineDescription pipelineDescription,
            AwsCredentialsSupplier awsCredentialsSupplier,
            AcknowledgementSetManager acknowledgementSetManager,
            KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier) {
        this.kinesisSourceConfig = kinesisSourceConfig;
        this.kinesisLeaseConfigSupplier = kinesisLeaseConfigSupplier;

        KinesisClientFactory clientFactory = new KinesisClientFactory(
                awsCredentialsSupplier,
                kinesisSourceConfig.getAwsAuthenticationConfig());
        HostNameWorkerIdentifierGenerator workerIdGenerator = new HostNameWorkerIdentifierGenerator();

        this.kinesisService = new KinesisService(
                kinesisSourceConfig,
                clientFactory,
                pluginMetrics,
                pluginFactory,
                pipelineDescription,
                acknowledgementSetManager,
                kinesisLeaseConfigSupplier,
                workerIdGenerator);
    }

    /**
     * Starts the underlying {@link KinesisService} with the given buffer.
     *
     * @param buffer buffer handed to the service; must not be {@code null}
     * @throws IllegalStateException if {@code buffer} is {@code null}
     */
    public void start(Buffer<Record<Event>> buffer) {
        requireBuffer(buffer);
        kinesisService.start(buffer);
    }

    /**
     * Shuts down the underlying {@link KinesisService}.
     */
    public void stop() {
        kinesisService.shutDown();
    }

    /**
     * Reports the acknowledgments flag from the source configuration.
     *
     * @return the value of {@link KinesisSourceConfig#isAcknowledgments()}
     */
    public boolean areAcknowledgementsEnabled() {
        return kinesisSourceConfig.isAcknowledgments();
    }

    /**
     * Replaces the service this source delegates to.
     * NOTE(review): presumably a hook for tests to inject a mock - confirm against callers.
     *
     * @param kinesisService the service to use from now on
     */
    public void setKinesisService(KinesisService kinesisService) {
        this.kinesisService = kinesisService;
    }

    /** Rejects a missing buffer before the service is started. */
    private static void requireBuffer(Buffer<Record<Event>> candidate) {
        if (candidate == null) {
            throw new IllegalStateException("Buffer provided is null");
        }
    }
}

