/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.services.dynamodbv2.streamsadapter;

import com.amazonaws.services.dynamodbv2.streamsadapter.util.DescribeStreamResult;
import com.amazonaws.services.dynamodbv2.streamsadapter.util.KinesisMapperUtil;
import com.amazonaws.services.dynamodbv2.streamsadapter.util.ShardGraphTracker;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.core.ApiName;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.retrieval.AWSExceptionManager;

public class DynamoDBStreamsShardDetector
implements ShardDetector {
    private static final Logger log = LoggerFactory.getLogger(DynamoDBStreamsShardDetector.class);
    private final Object $lock = new Object[0];
    @NonNull
    private final KinesisAsyncClient kinesisAsyncClient;
    @NonNull
    private final StreamIdentifier streamIdentifier;
    private final String streamArn;
    private final long listShardsCacheAllowedAgeInSeconds;
    private final int maxCacheMissesBeforeReload;
    private final int cacheMissWarningModulus;
    private final Duration kinesisRequestTimeout;
    private volatile Map<String, Shard> cachedShardMap = null;
    private volatile Instant lastCacheUpdateTime;
    private final AtomicInteger cacheMisses = new AtomicInteger(0);
    private static final AWSExceptionManager AWS_EXCEPTION_MANAGER = new AWSExceptionManager();

    public DynamoDBStreamsShardDetector(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull StreamIdentifier streamIdentifier, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload, int cacheMissWarningModulus, Duration kinesisRequestTimeout) {
        if (kinesisAsyncClient == null) {
            throw new NullPointerException("kinesisAsyncClient is marked non-null but is null");
        }
        if (streamIdentifier == null) {
            throw new NullPointerException("streamIdentifier is marked non-null but is null");
        }
        this.kinesisAsyncClient = kinesisAsyncClient;
        this.streamIdentifier = streamIdentifier;
        this.listShardsCacheAllowedAgeInSeconds = listShardsCacheAllowedAgeInSeconds;
        this.maxCacheMissesBeforeReload = maxCacheMissesBeforeReload;
        this.cacheMissWarningModulus = cacheMissWarningModulus;
        this.kinesisRequestTimeout = kinesisRequestTimeout;
        this.streamArn = KinesisMapperUtil.createDynamoDBStreamsArnFromKinesisStreamName(this.streamIdentifier.streamName());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Shard shard(String shardId) {
        Shard shard;
        if (CollectionUtils.isNullOrEmpty(this.cachedShardMap)) {
            DynamoDBStreamsShardDetector dynamoDBStreamsShardDetector = this;
            synchronized (dynamoDBStreamsShardDetector) {
                if (CollectionUtils.isNullOrEmpty(this.cachedShardMap)) {
                    this.listShards();
                }
            }
        }
        if ((shard = this.cachedShardMap.get(shardId)) == null && (this.cacheMisses.incrementAndGet() > this.maxCacheMissesBeforeReload || this.shouldRefreshCache())) {
            DynamoDBStreamsShardDetector dynamoDBStreamsShardDetector = this;
            synchronized (dynamoDBStreamsShardDetector) {
                shard = this.cachedShardMap.get(shardId);
                if (shard == null) {
                    log.info("Too many shard map cache misses for stream: {} or cache is out of date -- forcing a refresh", (Object)this.streamArn);
                    this.describeStream(null, "");
                    shard = this.cachedShardMap.get(shardId);
                    if (shard == null) {
                        log.warn("Even after cache refresh shard '{}' wasn't found. This could indicate a bigger problem.", (Object)shardId);
                    }
                }
                this.cacheMisses.set(0);
            }
        }
        if (shard == null) {
            String message = String.format("Cannot find the shard given the shardId %s. Cache misses: %s", shardId, this.cacheMisses);
            if (this.cacheMisses.get() % this.cacheMissWarningModulus == 0) {
                log.warn(message);
            } else {
                log.debug(message);
            }
        }
        return shard;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<Shard> listShards() {
        Object object = this.$lock;
        synchronized (object) {
            DescribeStreamResult describeStreamResult = this.describeStream(null, "");
            return describeStreamResult.getShards();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<Shard> listShards(String consumerId) {
        Object object = this.$lock;
        synchronized (object) {
            DescribeStreamResult describeStreamResult = this.describeStream(null, consumerId);
            return describeStreamResult.getShards();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public DescribeStreamResult describeStream(String lastSeenShardId, String consumerId) {
        Object object = this.$lock;
        synchronized (object) {
            DescribeStreamResponse describeStreamResponse;
            ShardGraphTracker shardTracker = new ShardGraphTracker();
            String exclusiveStartShardId = lastSeenShardId;
            DescribeStreamResult describeStreamResult = new DescribeStreamResult();
            do {
                describeStreamResponse = this.describeStreamResponse(exclusiveStartShardId, consumerId);
                shardTracker.collectShards(describeStreamResponse.streamDescription().shards());
                List shards = describeStreamResponse.streamDescription().shards();
                describeStreamResult.addStatus(describeStreamResponse.streamDescription().streamStatusAsString());
                if (shards.isEmpty()) continue;
                exclusiveStartShardId = ((Shard)shards.get(shards.size() - 1)).shardId();
            } while (describeStreamResponse.streamDescription().hasMoreShards().booleanValue());
            if (Objects.equals(describeStreamResult.getStreamStatus(), "ENABLING")) {
                log.warn("Stream: {} is in ENABLING state, new shards will not be discovered until stream gets enabled.", (Object)this.streamArn);
            }
            shardTracker.closeOpenParents();
            if (Objects.equals(describeStreamResult.getStreamStatus(), "DISABLED")) {
                shardTracker.markLeafShardsActive();
            }
            List<Shard> processedShards = shardTracker.getShards();
            describeStreamResult.addShards(processedShards);
            this.cachedShardMap(processedShards);
            return describeStreamResult;
        }
    }

    private DescribeStreamResponse describeStreamResponse(String exclusiveStartShardId, String consumerId) {
        DescribeStreamResponse describeStreamResponse;
        DescribeStreamRequest describeStreamRequest = (DescribeStreamRequest)DescribeStreamRequest.builder().streamName(this.streamArn).exclusiveStartShardId(exclusiveStartShardId).overrideConfiguration(((AwsRequestOverrideConfiguration.Builder)AwsRequestOverrideConfiguration.builder().addApiName(ApiName.builder().name(consumerId).version("3.3.0").build())).build()).build();
        try {
            describeStreamResponse = (DescribeStreamResponse)this.kinesisAsyncClient.describeStream(describeStreamRequest).get();
        }
        catch (ExecutionException e) {
            throw AWS_EXCEPTION_MANAGER.apply(e.getCause());
        }
        catch (InterruptedException e) {
            log.debug("Interrupted exception caught, shutdown initiated, returning null");
            return null;
        }
        if (describeStreamResponse == null) {
            throw new IllegalStateException("Received null from DescribeStream call.");
        }
        return describeStreamResponse;
    }

    private boolean shouldRefreshCache() {
        Duration secondsSinceLastUpdate = Duration.between(this.lastCacheUpdateTime, Instant.now());
        String message = String.format("Shard map cache for stream: %s is %d seconds old", this.streamArn, secondsSinceLastUpdate.getSeconds());
        if (secondsSinceLastUpdate.compareTo(Duration.of(this.listShardsCacheAllowedAgeInSeconds, ChronoUnit.SECONDS)) > 0) {
            log.info("{}. Age exceeds limit of {} seconds -- Refreshing.", (Object)message, (Object)this.listShardsCacheAllowedAgeInSeconds);
            return true;
        }
        log.debug("{}. Age doesn't exceed limit of {} seconds.", (Object)message, (Object)this.listShardsCacheAllowedAgeInSeconds);
        return false;
    }

    private void cachedShardMap(List<Shard> shards) {
        this.cachedShardMap = shards.stream().collect(Collectors.toMap(Shard::shardId, Function.identity()));
        this.lastCacheUpdateTime = Instant.now();
    }

    @NonNull
    public StreamIdentifier streamIdentifier() {
        return this.streamIdentifier;
    }

    AtomicInteger cacheMisses() {
        return this.cacheMisses;
    }

    static {
        AWS_EXCEPTION_MANAGER.add(KinesisException.class, t -> t);
        AWS_EXCEPTION_MANAGER.add(LimitExceededException.class, t -> t);
        AWS_EXCEPTION_MANAGER.add(ResourceInUseException.class, t -> t);
        AWS_EXCEPTION_MANAGER.add(ResourceNotFoundException.class, t -> t);
    }
}

