/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.dynamodb.leader;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.opensearch.dataprepper.plugins.source.dynamodb.leader.ShardCache;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
import software.amazon.awssdk.services.dynamodb.model.Shard;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;

public class ShardManager {
    private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
    private static final int MAX_SHARD_COUNT = 100;
    private static final int DEFAULT_CLEAN_UP_CACHE_INTERVAL_MILLS = 600000;
    private final Map<String, StreamInfo> streamMap;
    private Map<String, String> endingSequenceNumberMap;
    private final DynamoDbStreamsClient streamsClient;
    private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics;

    public ShardManager(DynamoDbStreamsClient streamsClient, DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics) {
        this.streamsClient = streamsClient;
        this.dynamoDBSourceAggregateMetrics = dynamoDBSourceAggregateMetrics;
        this.streamMap = new HashMap<String, StreamInfo>();
        this.endingSequenceNumberMap = new HashMap<String, String>();
    }

    public List<Shard> runDiscovery(String streamArn) {
        StreamInfo streamInfo = this.streamMap.get(streamArn);
        if (streamInfo == null) {
            streamInfo = new StreamInfo();
            streamInfo.setLastCacheBuildTime(System.currentTimeMillis());
            streamInfo.setLastEvaluatedShardId(null);
            streamInfo.setShardCache(new ShardCache());
            this.streamMap.put(streamArn, streamInfo);
        }
        ShardCache shardCache = streamInfo.getShardCache();
        if (System.currentTimeMillis() - streamInfo.getLastCacheBuildTime() > 600000L) {
            LOG.debug("Perform regular rebuild of cache.");
            streamInfo.setLastEvaluatedShardId(null);
            streamInfo.setLastCacheBuildTime(System.currentTimeMillis());
            shardCache.clear();
            this.endingSequenceNumberMap.clear();
        }
        LOG.debug("Last evaluated shard ID is " + streamInfo.getLastEvaluatedShardId());
        List<Shard> shards = this.listShards(streamArn, streamInfo.getLastEvaluatedShardId());
        if (!shards.isEmpty()) {
            shards.forEach(shard -> shardCache.put(shard.shardId(), shard.parentShardId()));
            if (streamInfo.getLastEvaluatedShardId() == null) {
                this.endingSequenceNumberMap = shards.stream().filter(shard -> shard.sequenceNumberRange().endingSequenceNumber() != null).collect(Collectors.toMap(shard -> shard.shardId(), shard -> shard.sequenceNumberRange().endingSequenceNumber()));
            }
            LOG.debug("New last evaluated shard ID is " + shards.get(shards.size() - 1).shardId());
            streamInfo.setLastEvaluatedShardId(shards.get(shards.size() - 1).shardId());
        }
        return shards;
    }

    public String getEndingSequenceNumber(String shardId) {
        return this.endingSequenceNumberMap.get(shardId);
    }

    public List<String> findChildShardIds(String streamArn, String parentShardId) {
        StreamInfo streamInfo = this.streamMap.get(streamArn);
        if (streamInfo == null) {
            return Collections.emptyList();
        }
        ShardCache shardCache = streamInfo.getShardCache();
        List<String> childShardIds = shardCache.get(parentShardId);
        return childShardIds;
    }

    private List<Shard> listShards(String streamArn, String lastEvaluatedShardId) {
        LOG.debug("Start listing all shards for stream {}", (Object)streamArn);
        long startTime = System.currentTimeMillis();
        ArrayList<Shard> shards = new ArrayList<Shard>();
        try {
            DescribeStreamResponse describeStreamResult;
            do {
                DescribeStreamRequest req = (DescribeStreamRequest)DescribeStreamRequest.builder().streamArn(streamArn).limit(Integer.valueOf(100)).exclusiveStartShardId(lastEvaluatedShardId).build();
                this.dynamoDBSourceAggregateMetrics.getStreamApiInvocations().increment();
                describeStreamResult = this.streamsClient.describeStream(req);
                shards.addAll(describeStreamResult.streamDescription().shards());
            } while ((lastEvaluatedShardId = describeStreamResult.streamDescription().lastEvaluatedShardId()) != null);
        }
        catch (InternalServerErrorException e) {
            LOG.error("Received an internal server exception from DynamoDB while listing shards: {}", (Object)e.getMessage());
            this.dynamoDBSourceAggregateMetrics.getStream5xxErrors().increment();
            return shards;
        }
        catch (SdkException e) {
            LOG.error("Received an exception from DynamoDB while listing shards: {}", (Object)e.getMessage());
            this.dynamoDBSourceAggregateMetrics.getStream4xxErrors().increment();
            return shards;
        }
        long endTime = System.currentTimeMillis();
        LOG.info("Listing shards (DescribeStream call) took {} milliseconds with {} shards found", (Object)(endTime - startTime), (Object)shards.size());
        return shards;
    }

    class StreamInfo {
        private String lastEvaluatedShardId;
        private long lastCacheBuildTime;
        private ShardCache shardCache;

        StreamInfo() {
        }

        public String getLastEvaluatedShardId() {
            return this.lastEvaluatedShardId;
        }

        public void setLastEvaluatedShardId(String lastEvaluatedShardId) {
            this.lastEvaluatedShardId = lastEvaluatedShardId;
        }

        public long getLastCacheBuildTime() {
            return this.lastCacheBuildTime;
        }

        public void setLastCacheBuildTime(long lastCacheBuildTime) {
            this.lastCacheBuildTime = lastCacheBuildTime;
        }

        public ShardCache getShardCache() {
            return this.shardCache;
        }

        public void setShardCache(ShardCache shardCache) {
            this.shardCache = shardCache;
        }
    }
}

