/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kinesis.provisioning;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.LimitExceededException;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.ScalingType;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.StreamDescription;
import com.amazonaws.services.kinesis.model.StreamStatus;
import com.amazonaws.services.kinesis.model.UpdateShardCountRequest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisConsumerProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisProducerProperties;
import org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisConsumerDestination;
import org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisProducerDestination;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.util.Assert;

public class KinesisStreamProvisioner
implements ProvisioningProvider<ExtendedConsumerProperties<KinesisConsumerProperties>, ExtendedProducerProperties<KinesisProducerProperties>> {
    private static final Log logger = LogFactory.getLog(KinesisStreamProvisioner.class);
    private final AmazonKinesis amazonKinesis;
    private final KinesisBinderConfigurationProperties configurationProperties;

    public KinesisStreamProvisioner(AmazonKinesis amazonKinesis, KinesisBinderConfigurationProperties kinesisBinderConfigurationProperties) {
        Assert.notNull((Object)amazonKinesis, (String)"'amazonKinesis' must not be null");
        Assert.notNull((Object)kinesisBinderConfigurationProperties, (String)"'kinesisBinderConfigurationProperties' must not be null");
        this.amazonKinesis = amazonKinesis;
        this.configurationProperties = kinesisBinderConfigurationProperties;
    }

    public ProducerDestination provisionProducerDestination(String name, ExtendedProducerProperties<KinesisProducerProperties> properties) throws ProvisioningException {
        if (logger.isInfoEnabled()) {
            logger.info((Object)("Using Kinesis stream for outbound: " + name));
        }
        if (properties.getHeaderMode() == null) {
            properties.setHeaderMode(HeaderMode.embeddedHeaders);
        }
        return new KinesisProducerDestination(name, this.createOrUpdate(name, properties.getPartitionCount()));
    }

    public ConsumerDestination provisionConsumerDestination(String name, String group, ExtendedConsumerProperties<KinesisConsumerProperties> properties) throws ProvisioningException {
        if (((KinesisConsumerProperties)properties.getExtension()).isDynamoDbStreams()) {
            if (logger.isInfoEnabled()) {
                logger.info((Object)("Using DynamoDB table in DynamoDB Streams support for inbound: " + name));
            }
            return new KinesisConsumerDestination(name, Collections.emptyList());
        }
        if (logger.isInfoEnabled()) {
            logger.info((Object)("Using Kinesis stream for inbound: " + name));
        }
        if (properties.getHeaderMode() == null) {
            properties.setHeaderMode(HeaderMode.embeddedHeaders);
        }
        int shardCount = properties.getInstanceCount() * properties.getConcurrency();
        return new KinesisConsumerDestination(name, this.createOrUpdate(name, shardCount));
    }

    private List<Shard> createOrUpdate(String stream, int shards) {
        ArrayList<Shard> shardList = new ArrayList<Shard>();
        int describeStreamRetries = 0;
        String exclusiveStartShardId = null;
        DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest().withStreamName(stream);
        while (true) {
            DescribeStreamResult describeStreamResult = null;
            try {
                describeStreamRequest.withExclusiveStartShardId(exclusiveStartShardId);
                describeStreamResult = this.amazonKinesis.describeStream(describeStreamRequest);
                StreamDescription streamDescription = describeStreamResult.getStreamDescription();
                if (StreamStatus.ACTIVE.toString().equals(streamDescription.getStreamStatus())) {
                    shardList.addAll(streamDescription.getShards());
                    if (!streamDescription.getHasMoreShards().booleanValue()) break;
                    exclusiveStartShardId = ((Shard)shardList.get(shardList.size() - 1)).getShardId();
                }
            }
            catch (ResourceNotFoundException ex) {
                if (!this.configurationProperties.isAutoCreateStream()) {
                    throw new ProvisioningException("The stream [" + stream + "] was not found and auto creation is disabled.", (Throwable)ex);
                }
                if (logger.isInfoEnabled()) {
                    logger.info((Object)("Stream '" + stream + "' not found. Create one..."));
                }
                this.amazonKinesis.createStream(stream, Integer.valueOf(Math.max(this.configurationProperties.getMinShardCount(), shards)));
                continue;
            }
            catch (LimitExceededException ex) {
                logger.info((Object)("Got LimitExceededException when describing stream [" + stream + "]. Backing off for [" + this.configurationProperties.getDescribeStreamBackoff() + "] millis."));
            }
            if (describeStreamResult != null && StreamStatus.ACTIVE.toString().equals(describeStreamResult.getStreamDescription().getStreamStatus())) continue;
            if (describeStreamRetries++ > this.configurationProperties.getDescribeStreamRetries()) {
                ResourceNotFoundException resourceNotFoundException = new ResourceNotFoundException("The stream [" + stream + "] isn't ACTIVE or doesn't exist.");
                resourceNotFoundException.setServiceName("Kinesis");
                throw new ProvisioningException("Kinesis org.springframework.cloud.stream.binder.kinesis.provisioning error", (Throwable)resourceNotFoundException);
            }
            try {
                Thread.sleep(this.configurationProperties.getDescribeStreamBackoff());
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new ProvisioningException("The [describeStream] thread for the stream [" + stream + "] has been interrupted.", (Throwable)ex);
            }
        }
        int effectiveShardCount = Math.max(this.configurationProperties.getMinShardCount(), shards);
        if (shardList.size() < effectiveShardCount && this.configurationProperties.isAutoAddShards()) {
            return this.updateShardCount(stream, shardList.size(), effectiveShardCount);
        }
        return shardList;
    }

    private List<Shard> updateShardCount(String streamName, int shardCount, int targetCount) {
        if (logger.isInfoEnabled()) {
            logger.info((Object)("Stream [" + streamName + "] has [" + shardCount + "] shards compared to a target configuration of [" + targetCount + "], creating shards..."));
        }
        UpdateShardCountRequest updateShardCountRequest = new UpdateShardCountRequest().withStreamName(streamName).withTargetShardCount(Integer.valueOf(targetCount)).withScalingType(ScalingType.UNIFORM_SCALING);
        this.amazonKinesis.updateShardCount(updateShardCountRequest);
        ArrayList<Shard> shardList = new ArrayList<Shard>();
        int describeStreamRetries = 0;
        String exclusiveStartShardId = null;
        DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest().withStreamName(streamName);
        while (true) {
            DescribeStreamResult describeStreamResult = null;
            try {
                describeStreamRequest.withExclusiveStartShardId(exclusiveStartShardId);
                describeStreamResult = this.amazonKinesis.describeStream(describeStreamRequest);
                StreamDescription streamDescription = describeStreamResult.getStreamDescription();
                if (StreamStatus.ACTIVE.toString().equals(streamDescription.getStreamStatus())) {
                    shardList.addAll(streamDescription.getShards());
                    if (!streamDescription.getHasMoreShards().booleanValue()) break;
                    exclusiveStartShardId = ((Shard)shardList.get(shardList.size() - 1)).getShardId();
                }
            }
            catch (LimitExceededException ex) {
                logger.info((Object)("Got LimitExceededException when describing stream [" + streamName + "]. Backing off for [" + this.configurationProperties.getDescribeStreamBackoff() + "] millis."));
            }
            if (describeStreamResult != null && StreamStatus.ACTIVE.toString().equals(describeStreamResult.getStreamDescription().getStreamStatus())) continue;
            if (describeStreamRetries++ > this.configurationProperties.getDescribeStreamRetries()) {
                ResourceNotFoundException resourceNotFoundException = new ResourceNotFoundException("The stream [" + streamName + "] isn't ACTIVE or doesn't exist.");
                resourceNotFoundException.setServiceName("Kinesis");
                throw new ProvisioningException("Kinesis org.springframework.cloud.stream.binder.kinesis.provisioning error", (Throwable)resourceNotFoundException);
            }
            try {
                Thread.sleep(this.configurationProperties.getDescribeStreamBackoff());
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new ProvisioningException("The [describeStream] thread for the stream [" + streamName + "] has been interrupted.", (Throwable)ex);
            }
        }
        return shardList;
    }
}

