/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.services.dynamodbv2.streamsadapter;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.dynamodbv2.model.SequenceNumberRange;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.ShardAdapter;
import com.amazonaws.services.dynamodbv2.streamsadapter.utils.Sleeper;
import com.amazonaws.services.dynamodbv2.streamsadapter.utils.ThreadSleeper;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxyExtended;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.InvalidArgumentException;
import com.amazonaws.services.kinesis.model.LimitExceededException;
import com.amazonaws.services.kinesis.model.PutRecordResult;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamStatus;
import com.google.common.annotations.VisibleForTesting;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class DynamoDBStreamsProxy
implements IKinesisProxyExtended {
    /** Logger shared by the proxy and its nested helper classes. */
    private static final Log LOG = LogFactory.getLog(DynamoDBStreamsProxy.class);
    /** Iterator types the three-argument {@code getIterator} overload is meant for; others only trigger a log line. */
    private static final Set<ShardIteratorType> EXPECTED_ITERATOR_TYPES = EnumSet.of(ShardIteratorType.AT_SEQUENCE_NUMBER, ShardIteratorType.AFTER_SEQUENCE_NUMBER);
    /** Default back-off after a throttled DescribeStream call (same value the Builder starts with). */
    private static final long DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS = 1000L;
    /** Default number of DescribeStream attempts (same value the Builder starts with). */
    private static final int DEFAULT_DESCRIBE_STREAM_RETRY_TIMES = 50;
    /** Synthetic ending sequence number assigned to a parent shard that is still reported open although a child exists. */
    static final String END_SEQUENCE_NUMBER_TO_CLOSE_OPEN_PARENT = String.valueOf(Long.MAX_VALUE);
    /** Default for whether the inconsistency-resolution back-off is randomised. */
    private static final boolean DEFAULT_INCONSISTENCY_RESOLUTION_RETRY_BACKOFF_JITTER_ENABLED = true;
    /** Default cap on inconsistency-resolution retries per {@code getShardList} call. */
    private static final int DEFAULT_MAX_RETRIES_TO_RESOLVE_INCONSISTENCIES = 8;
    /** Default multiplier (millis) applied to the exponential part of the inconsistency back-off. */
    private static final long DEFAULT_INCONSISTENCY_RESOLUTION_RETRY_BACKOFF_MULTIPLIER_MILLIS = 200L;
    /** Default base (millis) of the inconsistency back-off; scaled by a random factor when jitter is enabled. */
    private static final long DEFAULT_INCONSISTENCY_RESOLUTION_RETRY_BACKOFF_BASE_MILLIS = 1200L;
    /** Shard-count threshold (1500); {@code getShardList} only runs inconsistency-resolution retries below this size. */
    private static final long MAX_SHARD_COUNT_TO_TRIGGER_RETRIES = 1500L;
    private final AmazonKinesis client;
    private final AWSCredentialsProvider credentialsProvider;
    /** Shard list produced by the most recent successful {@code getShardList}; {@code null} until then. */
    private final AtomicReference<List<Shard>> listOfShardsSinceLastGet = new AtomicReference<>();
    private final String streamName;
    /** Source of the jitter factor used by {@code getInconsistencyBackoffTimeInMillis}. */
    private final Random random;
    private final boolean isInconsistencyResolutionRetryBackoffJitterEnabled;
    private final long describeStreamBackoffTimeInMillis;
    private final int maxDescribeStreamRetryAttempts;
    private final int maxRetriesToResolveInconsistencies;
    private final long inconsistencyResolutionRetryBackoffMultiplierInMillis;
    private final long inconsistencyResolutionRetryBackoffBaseInMillis;
    /** Abstraction over sleeping, used for every back-off in this class. */
    private final Sleeper sleeper;
    /** Graph under construction; created lazily and replaced after each successful {@code getShardList}. */
    private ShardGraph shardGraph;

    /**
     * Creates a proxy from fully resolved settings; instances are obtained through the nested Builder.
     *
     * @param streamName name of the stream every request is issued against
     * @param credentialProvider supplies the credentials attached to each request
     * @param kinesisClient client used for all service calls
     * @param describeStreamBackoffTimeInMillis sleep after a throttled DescribeStream call
     * @param maxDescribeStreamRetryAttempts number of DescribeStream attempts
     * @param maxRetriesToResolveInconsistencies cap on shard-graph inconsistency retries
     * @param inconsistencyResolutionRetryBackoffBaseInMillis base part of the inconsistency back-off
     * @param inconsistencyResolutionRetryBackoffMultiplierInMillis multiplier of the exponential part of that back-off
     * @param isDefaultInconsistencyResolutionRetryBackoffJitterEnabled whether the base part is randomised
     * @param sleeper performs the actual sleeping
     * @param random random source for the jitter
     */
    private DynamoDBStreamsProxy(String streamName, AWSCredentialsProvider credentialProvider, AmazonKinesis kinesisClient, long describeStreamBackoffTimeInMillis, int maxDescribeStreamRetryAttempts, int maxRetriesToResolveInconsistencies, long inconsistencyResolutionRetryBackoffBaseInMillis, long inconsistencyResolutionRetryBackoffMultiplierInMillis, boolean isDefaultInconsistencyResolutionRetryBackoffJitterEnabled, Sleeper sleeper, Random random) {
        // Collaborators.
        this.client = kinesisClient;
        this.credentialsProvider = credentialProvider;
        this.sleeper = sleeper;
        this.random = random;

        // Stream identity.
        this.streamName = streamName;

        // DescribeStream throttling policy.
        this.maxDescribeStreamRetryAttempts = maxDescribeStreamRetryAttempts;
        this.describeStreamBackoffTimeInMillis = describeStreamBackoffTimeInMillis;

        // Shard-graph inconsistency retry policy.
        this.maxRetriesToResolveInconsistencies = maxRetriesToResolveInconsistencies;
        this.inconsistencyResolutionRetryBackoffMultiplierInMillis = inconsistencyResolutionRetryBackoffMultiplierInMillis;
        this.inconsistencyResolutionRetryBackoffBaseInMillis = inconsistencyResolutionRetryBackoffBaseInMillis;
        this.isInconsistencyResolutionRetryBackoffJitterEnabled = isDefaultInconsistencyResolutionRetryBackoffJitterEnabled;

        LOG.debug("DynamoDBStreamsProxy( " + streamName + ")");
    }

    /**
     * Fetches records from the position denoted by a shard iterator.
     *
     * @param shardIterator iterator identifying where to read from
     * @param maxRecords limit passed to the service for this call
     * @return the raw result of the client's {@code getRecords} call
     */
    public GetRecordsResult get(String shardIterator, int maxRecords) throws ResourceNotFoundException, InvalidArgumentException, ExpiredIteratorException {
        final GetRecordsRequest request = new GetRecordsRequest();
        request.setShardIterator(shardIterator);
        request.setLimit(maxRecords);
        request.setRequestCredentials(this.credentialsProvider.getCredentials());
        return this.client.getRecords(request);
    }

    /**
     * Describes the stream, retrying with a fixed back-off while the service throttles the call.
     *
     * @param startShardId exclusive start shard id for paging; {@code null} starts from the first page
     * @return the DescribeStream result when the stream status is ACTIVE or UPDATING, otherwise {@code null}
     * @throws LimitExceededException the last throttling error once all attempts are used up
     * @throws IllegalStateException if the client returned {@code null} and no attempts remain
     */
    public DescribeStreamResult getStreamInfo(String startShardId) throws ResourceNotFoundException, LimitExceededException {
        DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
        describeStreamRequest.setRequestCredentials(this.credentialsProvider.getCredentials());
        describeStreamRequest.setStreamName(this.streamName);
        describeStreamRequest.setExclusiveStartShardId(startShardId);
        DescribeStreamResult response = null;
        LimitExceededException lastException = null;
        int remainingRetryTimes = this.maxDescribeStreamRetryAttempts;
        while (response == null) {
            try {
                response = this.client.describeStream(describeStreamRequest);
            }
            catch (LimitExceededException le) {
                LOG.info("Got LimitExceededException when describing stream " + this.streamName + ". Backing off for " + this.describeStreamBackoffTimeInMillis + " millis.");
                this.sleeper.sleep(this.describeStreamBackoffTimeInMillis);
                lastException = le;
            }
            --remainingRetryTimes;
            // "<= 0" rather than "== 0": with a configured attempt count of zero or less the counter
            // starts at or below zero and would otherwise never hit exactly zero, retrying practically forever.
            if (response == null && remainingRetryTimes <= 0) {
                if (lastException != null) {
                    throw lastException;
                }
                throw new IllegalStateException("Received null from DescribeStream call.");
            }
        }
        String streamStatus = response.getStreamDescription().getStreamStatus();
        if (StreamStatus.ACTIVE.toString().equals(streamStatus) || StreamStatus.UPDATING.toString().equals(streamStatus)) {
            return response;
        }
        LOG.info("Stream is in status " + streamStatus + ", DescribeStream returning null (wait until stream is Active or Updating");
        return null;
    }

    /**
     * Looks up a shard by id in the cached shard list, fetching the list first if none is cached yet.
     *
     * @param shardId id of the shard to find
     * @return the matching shard, or {@code null} when it is not in the list or no list could be obtained
     */
    public Shard getShard(String shardId) {
        List<Shard> shards = this.listOfShardsSinceLastGet.get();
        if (shards == null) {
            // getShardList() returns the freshly cached list, or null when the stream is disabled.
            shards = this.getShardList();
        }
        // Guard against the null list: iterating it directly would throw a NullPointerException.
        if (shards != null) {
            for (Shard shard : shards) {
                if (shard.getShardId().equals(shardId)) {
                    return shard;
                }
            }
        }
        LOG.warn("Cannot find the shard given the shardId " + shardId);
        return null;
    }

    /**
     * Builds a snapshot of the stream's shards and caches it.
     *
     * <p>After all pages are fetched, the graph may contain closed shards without any child. While the graph
     * is smaller than {@code MAX_SHARD_COUNT_TO_TRIGGER_RETRIES}, the method backs off and re-queries up to
     * {@code maxRetriesToResolveInconsistencies} times to pick up the missing children. For larger graphs it
     * only logs the number of closed leaves.
     *
     * <p>The working graph is replaced only after a successful run. If this method returns {@code null} or
     * throws, the partially built graph stays in the field and is reused by the next call.
     *
     * @return the shard list, or {@code null} if the stream was reported as disabled
     */
    public synchronized List<Shard> getShardList() {
        if (this.shardGraph == null) {
            this.shardGraph = new ShardGraph();
        }
        if (this.buildShardGraphSnapshot() == ShardGraphProcessingResult.STREAM_DISABLED) {
            LOG.info("Stream was disabled during getShardList operation.");
            return null;
        }
        if ((long) this.shardGraph.size() < MAX_SHARD_COUNT_TO_TRIGGER_RETRIES) {
            int retryAttempt;
            for (retryAttempt = 0; this.shardGraph.closedLeafNodeCount() > 0 && retryAttempt < this.maxRetriesToResolveInconsistencies; ++retryAttempt) {
                long backOffTime = this.getInconsistencyBackoffTimeInMillis(retryAttempt);
                LOG.info(String.format("Inconsistency resolution retry attempt: %d. Backing off for %d millis.", retryAttempt, backOffTime));
                this.sleeper.sleep(backOffTime);
                ShardGraphProcessingResult result = this.resolveInconsistenciesInShardGraph();
                if (result == ShardGraphProcessingResult.STREAM_DISABLED) {
                    LOG.info("Stream was disabled during getShardList operation.");
                    return null;
                }
                if (result == ShardGraphProcessingResult.RESOLVED_INCONSISTENCIES_AND_ABORTED) {
                    LOG.info(String.format("An intermediate page in DescribeStream response resolved inconsistencies. Total retry attempts taken to resolve inconsistencies: %d", retryAttempt + 1));
                    // Leaving via break skips the increment, so the exhaustion check below cannot misfire.
                    break;
                }
            }
            if (retryAttempt == this.maxRetriesToResolveInconsistencies && this.shardGraph.closedLeafNodeCount() > 0) {
                LOG.warn("Inconsistencies in the shard graph were not resolved after exhausting all retries.");
            }
        } else if (this.shardGraph.closedLeafNodeCount() > 0) {
            LOG.debug(String.format("Returning shard list with %s closed leaf node shards.", this.shardGraph.closedLeafNodeCount()));
        }
        this.listOfShardsSinceLastGet.set(this.shardGraph.getShards());
        // Start the next call from an empty graph.
        this.shardGraph = new ShardGraph();
        return this.listOfShardsSinceLastGet.get();
    }

    /**
     * Pages through DescribeStream, starting after the graph's last fetched shard id, and adds every
     * returned shard to the graph.
     *
     * @return {@code STREAM_DISABLED} if a page could not be obtained because {@code getStreamInfo}
     *         returned {@code null}; {@code FETCHED_ALL_AVAILABLE_SHARDS} once no further pages are reported
     */
    private ShardGraphProcessingResult buildShardGraphSnapshot() {
        for (;;) {
            final DescribeStreamResult page = this.getStreamInfo(this.shardGraph.getLastFetchedShardId());
            if (page == null) {
                return ShardGraphProcessingResult.STREAM_DISABLED;
            }
            this.shardGraph.addNodes(page.getStreamDescription().getShards());
            LOG.debug(String.format("Building shard graph snapshot; total shard count: %d", this.shardGraph.size()));
            if (!page.getStreamDescription().isHasMoreShards()) {
                return ShardGraphProcessingResult.FETCHED_ALL_AVAILABLE_SHARDS;
            }
        }
    }

    /**
     * Re-queries DescribeStream, starting after the first closed leaf id, looking for children of the
     * closed leaf shards currently in the graph.
     *
     * @return {@code STREAM_DISABLED} if {@code getStreamInfo} returned {@code null};
     *         {@code RESOLVED_INCONSISTENCIES_AND_ABORTED} as soon as no closed leaf remains (remaining pages
     *         are not fetched); {@code FETCHED_ALL_AVAILABLE_SHARDS} when paging ends with closed leaves left
     */
    private ShardGraphProcessingResult resolveInconsistenciesInShardGraph() {
        LOG.warn(String.format("Inconsistent shard graph state detected. Fetched: %d shards. Closed leaves: %d shards", this.shardGraph.size(), this.shardGraph.closedLeafNodeCount()));
        if (LOG.isDebugEnabled()) {
            final String closedLeafIds = String.join(", ", this.shardGraph.getAllClosedLeafNodeIds());
            LOG.debug(String.format("Following leaf node shards are closed: %s", closedLeafIds));
        }
        String nextStartShardId = this.shardGraph.getEarliestClosedLeafNodeId();
        for (;;) {
            final DescribeStreamResult page = this.getStreamInfo(nextStartShardId);
            if (page == null) {
                return ShardGraphProcessingResult.STREAM_DISABLED;
            }
            this.shardGraph.addToClosedLeafNodes(page.getStreamDescription().getShards());
            LOG.debug(String.format("Resolving inconsistencies in shard graph; total shard count: %d", this.shardGraph.size()));
            if (this.shardGraph.closedLeafNodeCount() == 0) {
                return ShardGraphProcessingResult.RESOLVED_INCONSISTENCIES_AND_ABORTED;
            }
            // Continue paging from the last shard seen on this page.
            nextStartShardId = this.shardGraph.getLastFetchedShardId();
            if (!page.getStreamDescription().isHasMoreShards()) {
                return ShardGraphProcessingResult.FETCHED_ALL_AVAILABLE_SHARDS;
            }
        }
    }

    /**
     * Computes the sleep time before an inconsistency-resolution retry.
     *
     * <p>The result is {@code factor * base + 2^retryAttempt * multiplier}, where {@code factor} is a random
     * double from {@code random.nextDouble()} when jitter is enabled and {@code 1.0} otherwise.
     *
     * @param retryAttempt zero-based retry attempt number
     * @return back-off time in milliseconds
     */
    @VisibleForTesting
    long getInconsistencyBackoffTimeInMillis(int retryAttempt) {
        final double jitterFactor;
        if (this.isInconsistencyResolutionRetryBackoffJitterEnabled) {
            jitterFactor = this.random.nextDouble();
        } else {
            jitterFactor = 1.0;
        }
        final long basePart = (long) (jitterFactor * (double) this.inconsistencyResolutionRetryBackoffBaseInMillis);
        final long exponentialPart = (long) Math.pow(2.0, retryAttempt) * this.inconsistencyResolutionRetryBackoffMultiplierInMillis;
        return basePart + exponentialPart;
    }

    /**
     * Returns the ids of all shards reported by {@code getShardList()}.
     *
     * @return a new set of shard ids, or {@code null} when {@code getShardList()} returned {@code null}
     */
    public Set<String> getAllShardIds() throws ResourceNotFoundException {
        final List<Shard> currentShards = this.getShardList();
        return currentShards == null
                ? null
                : currentShards.stream().map(Shard::getShardId).collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * Requests a shard iterator positioned relative to a sequence number.
     *
     * <p>The iterator type is parsed only to decide whether to log; an unparseable or unexpected type does
     * not stop the request, which is sent with the original {@code iteratorType} string.
     *
     * @param shardId shard to read from
     * @param iteratorType iterator type name, expected to be AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER
     * @param sequenceNumber starting sequence number
     * @return the shard iterator returned by the service
     */
    public String getIterator(String shardId, String iteratorType, String sequenceNumber) {
        ShardIteratorType parsedType = null;
        try {
            parsedType = ShardIteratorType.fromValue(iteratorType);
        }
        catch (IllegalArgumentException iae) {
            LOG.error("Caught illegal argument exception while parsing iteratorType: " + iteratorType, iae);
        }
        final boolean isExpectedType = EXPECTED_ITERATOR_TYPES.contains(parsedType);
        if (!isExpectedType) {
            LOG.info("This method should only be used for AT_SEQUENCE_NUMBER and AFTER_SEQUENCE_NUMBER ShardIteratorTypes. For methods to use with other ShardIteratorTypes, see IKinesisProxy.java");
        }

        final GetShardIteratorRequest request = new GetShardIteratorRequest();
        request.setRequestCredentials(this.credentialsProvider.getCredentials());
        request.setStreamName(this.streamName);
        request.setShardId(shardId);
        request.setShardIteratorType(iteratorType);
        request.setStartingSequenceNumber(sequenceNumber);
        request.setTimestamp(null);
        return this.client.getShardIterator(request).getShardIterator();
    }

    /**
     * Requests a shard iterator for iterator types that need neither a sequence number nor a timestamp.
     *
     * @param shardId shard to read from
     * @param iteratorType iterator type name, passed to the service unchanged
     * @return the shard iterator returned by the service
     */
    public String getIterator(String shardId, String iteratorType) {
        final GetShardIteratorRequest request = new GetShardIteratorRequest();
        request.setRequestCredentials(this.credentialsProvider.getCredentials());
        request.setStreamName(this.streamName);
        request.setShardId(shardId);
        request.setShardIteratorType(iteratorType);
        // No positional arguments for this overload.
        request.setStartingSequenceNumber(null);
        request.setTimestamp(null);
        return this.client.getShardIterator(request).getShardIterator();
    }

    /**
     * Timestamp-based iterators are not available through this proxy.
     *
     * @param shardId ignored
     * @param timestamp ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    public String getIterator(String shardId, Date timestamp) {
        final String reason = "DynamoDB Streams does not support shard iterator of type AT_TIMESTAMP";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Writing records is not available through this proxy.
     *
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    public PutRecordResult put(String exclusiveMinimumSequenceNumber, String explicitHashKey, String partitionKey, ByteBuffer data) throws ResourceNotFoundException, InvalidArgumentException {
        final String reason = "DynamoDB Streams does not support Put operations.";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Fluent builder for {@code DynamoDBStreamsProxy}. Stream name, credentials provider and client are
     * mandatory; every other setting starts from the enclosing class's {@code DEFAULT_*} constants.
     */
    public static class Builder {
        private int maxDescribeStreamRetryAttempts = DEFAULT_DESCRIBE_STREAM_RETRY_TIMES;
        private int maxRetriesToResolveInconsistencies = DEFAULT_MAX_RETRIES_TO_RESOLVE_INCONSISTENCIES;
        private long describeStreamBackoffTimeInMillis = DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS;
        private long inconsistencyResolutionRetryBackoffMultiplierInMillis = DEFAULT_INCONSISTENCY_RESOLUTION_RETRY_BACKOFF_MULTIPLIER_MILLIS;
        private long inconsistencyResolutionRetryBackoffBaseInMillis = DEFAULT_INCONSISTENCY_RESOLUTION_RETRY_BACKOFF_BASE_MILLIS;
        private boolean isInconsistencyResolutionRetryBackoffJitterEnabled = DEFAULT_INCONSISTENCY_RESOLUTION_RETRY_BACKOFF_JITTER_ENABLED;
        private final String streamName;
        private final AmazonKinesis kinesisClient;
        private final AWSCredentialsProvider credentialsProvider;
        private Sleeper sleeper;
        private Random random;

        /**
         * @param streamName name of the stream the proxy will talk to
         * @param credentialsProvider supplies credentials for each request
         * @param kinesisClient client used for all service calls
         */
        public Builder(String streamName, AWSCredentialsProvider credentialsProvider, AmazonKinesis kinesisClient) {
            this.kinesisClient = kinesisClient;
            this.streamName = streamName;
            this.credentialsProvider = credentialsProvider;
        }

        /** Sets the number of DescribeStream attempts made before giving up on throttling. */
        public Builder withMaxDescribeStreamRetryAttempts(int maxDescribeStreamRetryAttempts) {
            this.maxDescribeStreamRetryAttempts = maxDescribeStreamRetryAttempts;
            return this;
        }

        /** Sets the cap on shard-graph inconsistency-resolution retries. */
        public Builder withMaxRetriesToResolveInconsistencies(int maxRetriesToResolveInconsistencies) {
            this.maxRetriesToResolveInconsistencies = maxRetriesToResolveInconsistencies;
            return this;
        }

        /** Sets the sleep time after a throttled DescribeStream call. */
        public Builder withDescribeStreamBackoffTimeInMillis(long describeStreamBackoffTimeInMillis) {
            this.describeStreamBackoffTimeInMillis = describeStreamBackoffTimeInMillis;
            return this;
        }

        /** Sets the multiplier of the exponential part of the inconsistency back-off. */
        public Builder withInconsistencyResolutionRetryBackoffMultiplierInMillis(long inconsistencyResolutionRetryBackoffMultiplierInMillis) {
            this.inconsistencyResolutionRetryBackoffMultiplierInMillis = inconsistencyResolutionRetryBackoffMultiplierInMillis;
            return this;
        }

        /** Sets the base part of the inconsistency back-off. */
        public Builder withInconsistencyResolutionRetryBackoffBaseInMillis(long inconsistencyResolutionRetryBackoffBaseInMillis) {
            this.inconsistencyResolutionRetryBackoffBaseInMillis = inconsistencyResolutionRetryBackoffBaseInMillis;
            return this;
        }

        /** Enables or disables randomisation of the base part of the inconsistency back-off. */
        public Builder withInconsistencyResolutionRetryBackoffJitterEnabled(boolean inconsistencyResolutionRetryBackoffJitterEnabled) {
            this.isInconsistencyResolutionRetryBackoffJitterEnabled = inconsistencyResolutionRetryBackoffJitterEnabled;
            return this;
        }

        /** Overrides the sleeper used for back-offs; a {@code ThreadSleeper} is used when none is set. */
        public Builder withSleeper(Sleeper sleeper) {
            this.sleeper = sleeper;
            return this;
        }

        /** Overrides the random source used for jitter; a fresh {@code java.util.Random} is used when none is set. */
        public Builder withRandomNumberGeneratorForJitter(Random randomNumberGeneratorForJitter) {
            this.random = randomNumberGeneratorForJitter;
            return this;
        }

        /**
         * Creates the proxy, filling in the default sleeper and random source when they were not supplied.
         *
         * @return a new proxy configured from this builder
         */
        public DynamoDBStreamsProxy build() {
            if (null == this.sleeper) {
                this.sleeper = new ThreadSleeper();
            }
            if (null == this.random) {
                // Not ThreadLocalRandom.current(): the generator is stored in the proxy and may be used from a
                // thread other than the one calling build(), and ThreadLocalRandom instances must not be
                // shared across threads. java.util.Random is safe for that.
                this.random = new Random();
            }
            return new DynamoDBStreamsProxy(this.streamName, this.credentialsProvider, this.kinesisClient, this.describeStreamBackoffTimeInMillis, this.maxDescribeStreamRetryAttempts, this.maxRetriesToResolveInconsistencies, this.inconsistencyResolutionRetryBackoffBaseInMillis, this.inconsistencyResolutionRetryBackoffMultiplierInMillis, this.isInconsistencyResolutionRetryBackoffJitterEnabled, this.sleeper, this.random);
        }
    }

    /**
     * Working model of the stream's shards while a shard list is being assembled.
     *
     * <p>Tracks every shard seen so far by id, plus the ids of "closed leaves": shards that have an ending
     * sequence number but for which no child shard has been added yet.
     */
    private static class ShardGraph {
        private final Map<String, ShardNode> nodes = new HashMap<String, ShardNode>();
        /** Sorted so that {@code first()} yields the lowest id in natural String order. */
        private final TreeSet<String> closedLeafNodeIds = new TreeSet<>();
        /** Id of the last shard on the most recently processed non-empty page; {@code null} before that. */
        private String lastFetchedShardId;

        String getLastFetchedShardId() {
            return this.lastFetchedShardId;
        }

        /**
         * @return the lowest closed-leaf id in String order, or {@code null} if there is none
         *         (presumably shard ids sort chronologically, hence "earliest" — verify)
         */
        String getEarliestClosedLeafNodeId() {
            if (this.closedLeafNodeIds.isEmpty()) {
                return null;
            }
            return this.closedLeafNodeIds.first();
        }

        /** Adds every shard of a page to the graph and records the page's last shard id. No-op for {@code null}. */
        private void addNodes(List<Shard> shards) {
            if (null == shards) {
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Updating the graph with the following shards: \n %s", String.join(", ", shards.stream().map(Shard::getShardId).collect(Collectors.toList()))));
            }
            for (Shard shard : shards) {
                this.addNode(shard);
            }
            this.updateLastFetchedShardId(shards);
        }

        /**
         * If the parent shard has a sequence number range without an ending sequence number, replaces its
         * node with a copy whose range ends at {@code END_SEQUENCE_NUMBER_TO_CLOSE_OPEN_PARENT}.
         *
         * @return the replacement node, or {@code parentNode} itself when no change was needed
         */
        private ShardNode setShardEndSequenceNumberForOpenParent(ShardNode parentNode, ShardNode childNode) {
            Shard innerShard = parentNode.getShard();
            com.amazonaws.services.kinesis.model.SequenceNumberRange innerSequenceNumberRange = parentNode.getShard().getSequenceNumberRange();
            if (innerSequenceNumberRange != null && innerSequenceNumberRange.getEndingSequenceNumber() == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("Marked open parent shard %s of shard %s as closed", parentNode.getShard().getShardId(), childNode.getShard().getShardId()));
                }
                SequenceNumberRange modifiedSequenceNumberRange = new SequenceNumberRange().withStartingSequenceNumber(innerSequenceNumberRange.getStartingSequenceNumber()).withEndingSequenceNumber(END_SEQUENCE_NUMBER_TO_CLOSE_OPEN_PARENT);
                com.amazonaws.services.dynamodbv2.model.Shard shard = new com.amazonaws.services.dynamodbv2.model.Shard().withShardId(innerShard.getShardId()).withParentShardId(innerShard.getParentShardId()).withSequenceNumberRange(modifiedSequenceNumberRange);
                ShardAdapter shardAdapter = new ShardAdapter(shard);
                // The replacement shares the original node's descendant set.
                ShardNode newParentNode = new ShardNode(shardAdapter, parentNode.getDescendants());
                this.nodes.put(newParentNode.getShardId(), newParentNode);
                return newParentNode;
            }
            return parentNode;
        }

        /**
         * Adds only those shards whose parent is currently a closed leaf, updating the closed-leaf set, and
         * records the page's last shard id. No-op for {@code null}.
         */
        private void addToClosedLeafNodes(List<Shard> shards) {
            if (null == shards) {
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Attempting to resolve inconsistencies in the graph with the following shards: \n %s", String.join(", ", shards.stream().map(Shard::getShardId).collect(Collectors.toList()))));
            }
            for (Shard shard : shards) {
                String parentShardId = shard.getParentShardId();
                if (null == parentShardId || !this.closedLeafNodeIds.contains(parentShardId)) {
                    continue;
                }
                ShardNode shardNode = this.addNode(shard);
                this.closedLeafNodeIds.remove(parentShardId);
                if (shardNode.isShardClosed()) {
                    // The child is itself closed, so it becomes the new closed leaf.
                    this.closedLeafNodeIds.add(shardNode.getShardId());
                }
            }
            this.updateLastFetchedShardId(shards);
        }

        /** Remembers the id of the last shard in a non-empty page; leaves the value untouched for an empty page. */
        private void updateLastFetchedShardId(List<Shard> shards) {
            if (!shards.isEmpty()) {
                Shard lastShard = shards.get(shards.size() - 1);
                this.lastFetchedShardId = lastShard.getShardId();
            }
        }

        /**
         * Inserts (or replaces) the node for a shard. A closed shard is recorded as a closed leaf; if the
         * shard's parent is in the graph, the parent gains this shard as a descendant, is marked closed if it
         * was still open, and is removed from the closed-leaf set.
         *
         * @return the node created for {@code shard}
         */
        private ShardNode addNode(Shard shard) {
            ShardNode shardNode = new ShardNode(shard);
            this.nodes.put(shardNode.getShardId(), shardNode);
            if (shardNode.isShardClosed()) {
                this.closedLeafNodeIds.add(shardNode.getShardId());
            }
            String parentShardId = shard.getParentShardId();
            // The parent may be absent from the graph, hence the containsKey check.
            if (null != parentShardId && this.nodes.containsKey(parentShardId)) {
                ShardNode parentNode = this.nodes.get(parentShardId);
                parentNode = this.setShardEndSequenceNumberForOpenParent(parentNode, shardNode);
                parentNode.addDescendant(shard.getShardId());
                this.closedLeafNodeIds.remove(parentShardId);
            }
            return shardNode;
        }

        /** @return number of shards in the graph */
        private int size() {
            return this.nodes.size();
        }

        /** @return number of closed shards that currently have no child in the graph */
        private int closedLeafNodeCount() {
            return this.closedLeafNodeIds.size();
        }

        /** @return the live closed-leaf id set (not a copy) */
        Set<String> getAllClosedLeafNodeIds() {
            return this.closedLeafNodeIds;
        }

        /** @return a new list with the shard of every node, in the map's iteration order */
        List<Shard> getShards() {
            return this.nodes.values().stream().map(ShardNode::getShard).collect(Collectors.toList());
        }
    }

    /** Graph node pairing a shard with the ids of the child shards attached to it. */
    private static class ShardNode {
        private final Shard shard;
        private final Set<String> descendants;

        /** Creates a node with an empty descendant set. */
        ShardNode(Shard shard) {
            this(shard, new HashSet<String>());
        }

        /** Creates a node that uses the given descendant set as-is (no copy is made). */
        ShardNode(Shard shard, Set<String> descendants) {
            this.shard = shard;
            this.descendants = descendants;
        }

        public String getShardId() {
            return this.shard.getShardId();
        }

        public Shard getShard() {
            return this.shard;
        }

        Set<String> getDescendants() {
            return this.descendants;
        }

        /**
         * Records a child shard id.
         *
         * @return {@code true} if the id was not already present
         */
        boolean addDescendant(String shardId) {
            return this.descendants.add(shardId);
        }

        /** @return {@code true} when the shard has a sequence number range with an ending sequence number */
        boolean isShardClosed() {
            if (this.shard.getSequenceNumberRange() == null) {
                return false;
            }
            return this.shard.getSequenceNumberRange().getEndingSequenceNumber() != null;
        }
    }

    /** Outcome of one pass over DescribeStream pages while building or repairing the shard graph. */
    private static enum ShardGraphProcessingResult {
        // getStreamInfo returned null, i.e. the stream was neither ACTIVE nor UPDATING.
        STREAM_DISABLED,
        // Paging ran until the service reported no more shards.
        FETCHED_ALL_AVAILABLE_SHARDS,
        // No closed leaf remained after a page, so the remaining pages were skipped.
        RESOLVED_INCONSISTENCIES_AND_ABORTED;

    }
}

