/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.dynamodb.stream;

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumer;
import org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamCheckpointer;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.TableUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;

public class ShardConsumerFactory {
    private static final Logger LOG = LoggerFactory.getLogger(ShardConsumerFactory.class);
    private final DynamoDbStreamsClient streamsClient;
    private final EnhancedSourceCoordinator enhancedSourceCoordinator;
    private final PluginMetrics pluginMetrics;
    private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics;
    private final Buffer<Record<Event>> buffer;
    private final StreamConfig streamConfig;

    public ShardConsumerFactory(EnhancedSourceCoordinator enhancedSourceCoordinator, DynamoDbStreamsClient streamsClient, PluginMetrics pluginMetrics, DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics, Buffer<Record<Event>> buffer, StreamConfig streamConfig) {
        this.streamsClient = streamsClient;
        this.enhancedSourceCoordinator = enhancedSourceCoordinator;
        this.pluginMetrics = pluginMetrics;
        this.dynamoDBSourceAggregateMetrics = dynamoDBSourceAggregateMetrics;
        this.buffer = buffer;
        this.streamConfig = streamConfig;
    }

    public Runnable createConsumer(StreamPartition streamPartition, AcknowledgementSet acknowledgementSet, Duration shardAcknowledgmentTimeout) {
        String shardIterator;
        LOG.info("Starting to consume shard " + streamPartition.getShardId());
        Optional<StreamProgressState> progressState = streamPartition.getProgressState();
        String sequenceNumber = null;
        String lastShardIterator = null;
        Instant startTime = null;
        boolean waitForExport = false;
        if (progressState.isPresent()) {
            String endingSequenceNumber;
            sequenceNumber = acknowledgementSet == null ? null : progressState.get().getSequenceNumber();
            waitForExport = progressState.get().shouldWaitForExport();
            if (progressState.get().getStartTime() != 0L) {
                startTime = Instant.ofEpochMilli(progressState.get().getStartTime());
            }
            if ((endingSequenceNumber = progressState.get().getEndingSequenceNumber()) != null && !endingSequenceNumber.isEmpty()) {
                lastShardIterator = this.getShardIterator(streamPartition.getStreamArn(), streamPartition.getShardId(), endingSequenceNumber);
            }
        }
        if ((shardIterator = this.getShardIterator(streamPartition.getStreamArn(), streamPartition.getShardId(), sequenceNumber)) == null) {
            LOG.error("Failed to start consuming shard '{}'. Unable to get a shard iterator for this shard, this shard may have expired", (Object)streamPartition.getShardId());
            return null;
        }
        StreamCheckpointer checkpointer = new StreamCheckpointer(this.enhancedSourceCoordinator, streamPartition);
        String tableArn = TableUtil.getTableArnFromStreamArn(streamPartition.getStreamArn());
        TableInfo tableInfo = this.getTableInfo(tableArn);
        LOG.debug("Create shard consumer for {} with shardIter {}", (Object)streamPartition.getShardId(), (Object)shardIterator);
        LOG.debug("Create shard consumer for {} with lastShardIter {}", (Object)streamPartition.getShardId(), (Object)lastShardIterator);
        ShardConsumer shardConsumer = ShardConsumer.builder(this.streamsClient, this.pluginMetrics, this.dynamoDBSourceAggregateMetrics, this.buffer, this.streamConfig).tableInfo(tableInfo).checkpointer(checkpointer).shardIterator(shardIterator).shardId(streamPartition.getShardId()).lastShardIterator(lastShardIterator).startTime(startTime).waitForExport(waitForExport).acknowledgmentSet(acknowledgementSet).acknowledgmentSetTimeout(shardAcknowledgmentTimeout).build();
        return shardConsumer;
    }

    private TableInfo getTableInfo(String tableArn) {
        GlobalState tableState = (GlobalState)((Object)this.enhancedSourceCoordinator.getPartition(tableArn).get());
        TableInfo tableInfo = new TableInfo(tableArn, TableMetadata.fromMap(tableState.getProgressState().get()));
        return tableInfo;
    }

    public String getShardIterator(String streamArn, String shardId, String sequenceNumber) {
        GetShardIteratorRequest getShardIteratorRequest;
        LOG.debug("Get Initial Shard Iter for {}", (Object)shardId);
        if (sequenceNumber != null && !sequenceNumber.isEmpty()) {
            LOG.debug("Get Shard Iterator at {}", (Object)sequenceNumber);
            getShardIteratorRequest = (GetShardIteratorRequest)GetShardIteratorRequest.builder().shardId(shardId).streamArn(streamArn).shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER).sequenceNumber(sequenceNumber).build();
        } else {
            LOG.debug("Get Shard Iterator from beginning (TRIM_HORIZON) for shard {}", (Object)shardId);
            getShardIteratorRequest = (GetShardIteratorRequest)GetShardIteratorRequest.builder().shardId(shardId).streamArn(streamArn).shardIteratorType(ShardIteratorType.TRIM_HORIZON).build();
        }
        try {
            this.dynamoDBSourceAggregateMetrics.getStreamApiInvocations().increment();
            GetShardIteratorResponse getShardIteratorResult = this.streamsClient.getShardIterator(getShardIteratorRequest);
            return getShardIteratorResult.shardIterator();
        }
        catch (InternalServerErrorException e) {
            this.dynamoDBSourceAggregateMetrics.getStream5xxErrors().increment();
            LOG.error("Received an internal server error from DynamoDB while getting a shard iterator: {}", (Object)e.getMessage());
            return null;
        }
        catch (SdkException e) {
            this.dynamoDBSourceAggregateMetrics.getStream4xxErrors().increment();
            LOG.error("Exception when trying to get the shard iterator due to {}", (Object)e.getMessage());
            return null;
        }
    }
}

