/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kinesis;

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import io.micrometer.observation.ObservationRegistry;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy;
import org.springframework.cloud.stream.binder.kinesis.LegacyEmbeddedHeadersSupportBytesMessageMapper;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisConsumerProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisProducerProperties;
import org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisConsumerDestination;
import org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisStreamProvisioner;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.expression.EvaluationContext;
import org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter;
import org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter;
import org.springframework.integration.aws.inbound.kinesis.KinesisMessageHeaderErrorMessageStrategy;
import org.springframework.integration.aws.inbound.kinesis.KinesisShardOffset;
import org.springframework.integration.aws.outbound.AbstractAwsMessageHandler;
import org.springframework.integration.aws.outbound.KinesisMessageHandler;
import org.springframework.integration.aws.outbound.KplMessageHandler;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.expression.FunctionExpression;
import org.springframework.integration.mapping.InboundMessageMapper;
import org.springframework.integration.mapping.OutboundMessageMapper;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.integration.support.management.IntegrationManagement;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.InterceptableChannel;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;

public class KinesisMessageChannelBinder
extends AbstractMessageChannelBinder<ExtendedConsumerProperties<KinesisConsumerProperties>, ExtendedProducerProperties<KinesisProducerProperties>, KinesisStreamProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, KinesisConsumerProperties, KinesisProducerProperties> {
    // Shared strategy used for every consumer adapter and returned by getErrorMessageStrategy().
    private static final ErrorMessageStrategy ERROR_MESSAGE_STRATEGY = new KinesisMessageHeaderErrorMessageStrategy();
    // Names of the destinations for which a producer handler or consumer endpoint has been created.
    private final List<String> streamsInUse = new ArrayList<String>();
    private final KinesisBinderConfigurationProperties configurationProperties;
    private final KinesisAsyncClient amazonKinesis;
    // Stored by the constructor; not read anywhere else in this class.
    private final AwsCredentialsProvider awsCredentialsProvider;
    // CloudWatch and DynamoDB clients are only handed to the KCL-based consumer adapter.
    private final CloudWatchAsyncClient cloudWatchClient;
    private final DynamoDbAsyncClient dynamoDBClient;
    // Used for both outbound and inbound embedded-headers mapping when 'embedHeaders' is enabled.
    private final LegacyEmbeddedHeadersSupportBytesMessageMapper embeddedHeadersMapper;
    private KinesisExtendedBindingProperties extendedBindingProperties = new KinesisExtendedBindingProperties();
    // Applied to the non-KCL consumer adapter only when set.
    @Nullable
    private ConcurrentMetadataStore checkpointStore;
    // Passed as-is (possibly null) to the non-KCL consumer adapter.
    @Nullable
    private LockRegistry lockRegistry;
    // Configuration for the KinesisProducer created in KPL mode.
    @Nullable
    private KinesisProducerConfiguration kinesisProducerConfiguration;
    // Created in onInit(); used to evaluate the partition key expression.
    private EvaluationContext evaluationContext;
    private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;

    /**
     * Creates the binder around the given AWS clients and binder-level configuration.
     *
     * @param configurationProperties binder-wide settings; must not be {@code null}
     * @param provisioningProvider the stream provisioner handed to the superclass
     * @param amazonKinesis the Kinesis client; must not be {@code null}
     * @param awsCredentialsProvider the AWS credentials provider; must not be {@code null}
     * @param dynamoDBClient optional DynamoDB client, passed to the KCL consumer adapter
     * @param cloudWatchClient optional CloudWatch client, passed to the KCL consumer adapter
     * @throws IllegalArgumentException if a required argument is {@code null}
     */
    public KinesisMessageChannelBinder(KinesisBinderConfigurationProperties configurationProperties, KinesisStreamProvisioner provisioningProvider, KinesisAsyncClient amazonKinesis, AwsCredentialsProvider awsCredentialsProvider, @Nullable DynamoDbAsyncClient dynamoDBClient, @Nullable CloudWatchAsyncClient cloudWatchClient) {
        super(new String[0], provisioningProvider);
        // Validate up front: the properties are dereferenced below before headersToMap()
        // gets a chance to run its own null check, which would otherwise surface as a bare NPE.
        Assert.notNull(configurationProperties, "'configurationProperties' must not be null");
        Assert.notNull(amazonKinesis, "'amazonKinesis' must not be null");
        Assert.notNull(awsCredentialsProvider, "'awsCredentialsProvider' must not be null");
        this.configurationProperties = configurationProperties;
        this.amazonKinesis = amazonKinesis;
        this.cloudWatchClient = cloudWatchClient;
        this.dynamoDBClient = dynamoDBClient;
        this.awsCredentialsProvider = awsCredentialsProvider;
        this.embeddedHeadersMapper = new LegacyEmbeddedHeadersSupportBytesMessageMapper(configurationProperties.isLegacyEmbeddedHeadersFormat(), KinesisMessageChannelBinder.headersToMap(configurationProperties));
    }

    /**
     * Replaces the extended binding properties that the property accessors of this binder delegate to.
     *
     * @param extendedBindingProperties the binding properties to use
     */
    public void setExtendedBindingProperties(KinesisExtendedBindingProperties extendedBindingProperties) {
        this.extendedBindingProperties = extendedBindingProperties;
    }

    /**
     * Sets the metadata store that is applied to the non-KCL consumer adapter when not {@code null}.
     *
     * @param checkpointStore the checkpoint store
     */
    public void setCheckpointStore(ConcurrentMetadataStore checkpointStore) {
        this.checkpointStore = checkpointStore;
    }

    /**
     * Sets the lock registry that is passed to the non-KCL consumer adapter.
     *
     * @param lockRegistry the lock registry
     */
    public void setLockRegistry(LockRegistry lockRegistry) {
        this.lockRegistry = lockRegistry;
    }

    /**
     * Sets the configuration used to build the {@link KinesisProducer} for KPL-based producer handlers.
     *
     * @param kinesisProducerConfiguration the KPL configuration
     */
    public void setKinesisProducerConfiguration(KinesisProducerConfiguration kinesisProducerConfiguration) {
        this.kinesisProducerConfiguration = kinesisProducerConfiguration;
    }

    /**
     * Sets the registry that is registered on output channels and consumer adapters.
     *
     * @param observationRegistry the registry to use; {@code null} falls back to
     * {@link ObservationRegistry#NOOP} so the field never holds {@code null}
     */
    public void setObservationRegistry(@Nullable ObservationRegistry observationRegistry) {
        // The field is handed to registerObservationRegistry(...) without a null check,
        // so normalize a null argument to the same no-op default the field starts with.
        this.observationRegistry = observationRegistry != null ? observationRegistry : ObservationRegistry.NOOP;
    }

    /**
     * Looks up the Kinesis-specific consumer properties for a binding by delegating to the
     * configured extended binding properties.
     *
     * @param channelName the binding (channel) name
     * @return the consumer properties returned by the delegate
     */
    public KinesisConsumerProperties getExtendedConsumerProperties(String channelName) {
        return this.extendedBindingProperties.getExtendedConsumerProperties(channelName);
    }

    /**
     * Looks up the Kinesis-specific producer properties for a binding by delegating to the
     * configured extended binding properties.
     *
     * @param channelName the binding (channel) name
     * @return the producer properties returned by the delegate
     */
    public KinesisProducerProperties getExtendedProducerProperties(String channelName) {
        return this.extendedBindingProperties.getExtendedProducerProperties(channelName);
    }

    /**
     * Returns the defaults prefix reported by the configured extended binding properties.
     *
     * @return the defaults prefix
     */
    public String getDefaultsPrefix() {
        return this.extendedBindingProperties.getDefaultsPrefix();
    }

    /**
     * Returns the properties entry class reported by the configured extended binding properties.
     *
     * @return the extended properties entry class
     */
    public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
        return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
    }

    /**
     * Returns the Kinesis client this binder was constructed with.
     *
     * @return the Kinesis async client
     */
    public KinesisAsyncClient getAmazonKinesis() {
        return this.amazonKinesis;
    }

    /**
     * Returns the names of the destinations bound so far.
     * This is the binder's own list, not a copy, so later bindings show up in it.
     *
     * @return the internal list of stream names
     */
    public List<String> getStreamsInUse() {
        return this.streamsInUse;
    }

    /**
     * Runs the superclass initialization and then prepares the evaluation context
     * used later for partition key expressions.
     *
     * @throws Exception if the superclass initialization fails
     */
    protected void onInit() throws Exception {
        super.onInit();
        BeanFactory owningBeanFactory = this.getBeanFactory();
        this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(owningBeanFactory);
    }

    /**
     * Returns the superclass identity prefixed with {@code "kinesis-"}.
     *
     * @return the binder identity
     */
    public String getBinderIdentity() {
        String inheritedIdentity = super.getBinderIdentity();
        return "kinesis-" + inheritedIdentity;
    }

    /**
     * Builds the outbound handler for a producer destination, using the KPL handler when
     * KPL/KCL mode is enabled and the plain Kinesis handler otherwise.
     *
     * <p>The partition key is the {@code scst_partition} header when present, otherwise the
     * hash code of the payload. When an error channel is supplied, an interceptor on the
     * output channel stamps it on every outgoing message as the error channel header.
     *
     * @param destination the provisioned producer destination
     * @param producerProperties the producer properties, including the Kinesis extension
     * @param channel the output channel; cast to {@link InterceptableChannel} when an error channel is given
     * @param errorChannel optional channel for send failures
     * @return the configured message handler
     */
    protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<KinesisProducerProperties> producerProperties, MessageChannel channel, final @Nullable MessageChannel errorChannel) {
        KinesisProducerProperties kinesisProducerProperties = producerProperties.getExtension();
        FunctionExpression<Message<?>> partitionKeyExpression = new FunctionExpression<Message<?>>(message -> {
            if (message.getHeaders().containsKey("scst_partition")) {
                return message.getHeaders().get("scst_partition");
            }
            return message.getPayload().hashCode();
        });

        AbstractAwsMessageHandler<?> handler;
        if (this.configurationProperties.isKplKclEnabled()) {
            handler = this.createKplMessageHandler(destination, partitionKeyExpression, kinesisProducerProperties.isEmbedHeaders());
        } else {
            handler = this.createKinesisMessageHandler(destination, partitionKeyExpression, kinesisProducerProperties.isEmbedHeaders());
        }
        handler.setAsync(!kinesisProducerProperties.isSync());
        handler.setSendTimeout(kinesisProducerProperties.getSendTimeout());
        handler.setBeanFactory(this.getBeanFactory());
        handler.setOutputChannel(new NullChannel());

        if (errorChannel != null) {
            InterceptableChannel interceptableChannel = (InterceptableChannel) channel;
            interceptableChannel.addInterceptor(new ChannelInterceptor() {

                public Message<?> preSend(Message<?> message, MessageChannel sendingChannel) {
                    return MessageBuilder.fromMessage(message).setErrorChannel(errorChannel).build();
                }
            });
        }
        this.streamsInUse.add(destination.getName());
        return handler;
    }

    /**
     * Three-argument variant that creates nothing and always returns {@code null}.
     * NOTE(review): presumably present only to satisfy the superclass contract, with the
     * four-argument overload in this class doing the real work — confirm against the superclass.
     *
     * @param destination the producer destination (unused)
     * @param producerProperties the producer properties (unused)
     * @param errorChannel the error channel (unused)
     * @return always {@code null}
     */
    protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<KinesisProducerProperties> producerProperties, MessageChannel errorChannel) {
        return null;
    }

    /**
     * Creates a handler that publishes through the binder's Kinesis async client.
     *
     * @param destination the destination whose name is used as the stream
     * @param partitionKeyExpression expression resolving the partition key per message
     * @param embedHeaders whether to install the embedded-headers mapper
     * @return the configured handler
     */
    private AbstractAwsMessageHandler<?> createKinesisMessageHandler(ProducerDestination destination, FunctionExpression<Message<?>> partitionKeyExpression, boolean embedHeaders) {
        String streamName = destination.getName();
        KinesisMessageHandler kinesisHandler = new KinesisMessageHandler(this.amazonKinesis);
        kinesisHandler.setStream(streamName);
        kinesisHandler.setPartitionKeyExpression(partitionKeyExpression);
        if (!embedHeaders) {
            return kinesisHandler;
        }
        kinesisHandler.setEmbeddedHeadersMapper(this.embeddedHeadersMapper);
        return kinesisHandler;
    }

    /**
     * Creates a handler that publishes through a new {@link KinesisProducer} (KPL) built from
     * the configured {@link KinesisProducerConfiguration}.
     *
     * @param destination the destination whose name is used as the stream
     * @param partitionKeyExpression expression resolving the partition key per message
     * @param embedHeaders whether to install the embedded-headers mapper
     * @return the configured handler
     * @throws IllegalStateException if no {@link KinesisProducerConfiguration} has been set
     */
    private AbstractAwsMessageHandler<?> createKplMessageHandler(ProducerDestination destination, FunctionExpression<Message<?>> partitionKeyExpression, boolean embedHeaders) {
        // The configuration field is optional on the binder, so fail with a clear message here
        // rather than passing null into the KinesisProducer constructor.
        Assert.state(this.kinesisProducerConfiguration != null, "'kinesisProducerConfiguration' must be set when KPL/KCL mode is enabled");
        KplMessageHandler messageHandler = new KplMessageHandler(new KinesisProducer(this.kinesisProducerConfiguration));
        messageHandler.setStream(destination.getName());
        messageHandler.setPartitionKeyExpression(partitionKeyExpression);
        if (embedHeaders) {
            messageHandler.setEmbeddedHeadersMapper(this.embeddedHeadersMapper);
        }
        return messageHandler;
    }

    /**
     * Registers the observation registry on the output channel and, for partitioned producers
     * on an interceptable channel, installs a first-position interceptor that writes the
     * extracted partition key into the {@code scst_partition} header.
     *
     * <p>The key comes from the named {@link PartitionKeyExtractorStrategy} bean when an
     * extractor name is configured, otherwise from the partition key expression.
     *
     * @param outputChannel the producer's output channel
     * @param producerProperties the producer properties
     */
    protected void postProcessOutputChannel(MessageChannel outputChannel, final ExtendedProducerProperties<KinesisProducerProperties> producerProperties) {
        ((IntegrationManagement) outputChannel).registerObservationRegistry(this.observationRegistry);
        if (!(outputChannel instanceof InterceptableChannel) || !producerProperties.isPartitioned()) {
            return;
        }

        final PartitionKeyExtractorStrategy keyExtractor;
        if (StringUtils.hasText(producerProperties.getPartitionKeyExtractorName())) {
            keyExtractor = this.getBeanFactory().getBean(producerProperties.getPartitionKeyExtractorName(), PartitionKeyExtractorStrategy.class);
        } else {
            // The evaluation context is read at send time, not when the interceptor is created.
            keyExtractor = message -> producerProperties.getPartitionKeyExpression().getValue(this.evaluationContext, message);
        }

        ((InterceptableChannel) outputChannel).addInterceptor(0, new ChannelInterceptor() {

            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                Object extractedKey = keyExtractor.extractKey(message);
                return MessageBuilder.fromMessage(message).setHeader("scst_partition", extractedKey).build();
            }
        });
    }

    /**
     * Creates the inbound adapter for a consumer destination, choosing the KCL adapter when
     * KPL/KCL mode is enabled and the plain Kinesis adapter otherwise, and records the
     * destination name as in use.
     *
     * @param destination the provisioned consumer destination
     * @param group the consumer group, possibly empty for anonymous consumers
     * @param properties the consumer properties, including the Kinesis extension
     * @return the configured message producer
     */
    protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<KinesisConsumerProperties> properties) {
        String streamName = destination.getName();
        this.streamsInUse.add(streamName);

        MessageProducerSupport endpoint;
        if (this.configurationProperties.isKplKclEnabled()) {
            endpoint = this.createKclConsumerEndpoint(destination, group, properties);
        } else {
            endpoint = this.createKinesisConsumerEndpoint(destination, group, properties);
        }
        endpoint.registerObservationRegistry(this.observationRegistry);
        endpoint.setComponentName(String.format("Consumer for [%s]", destination.getName()));
        return endpoint;
    }

    /**
     * Creates a KCL-based inbound adapter for the destination.
     *
     * <p>An empty group yields an anonymous consumer group with a random suffix. A configured
     * {@code shardId} is ignored with a warning, and a random worker id is generated when none
     * is configured.
     *
     * @param destination the consumer destination whose name is the stream to consume
     * @param group the consumer group; blank means anonymous
     * @param properties the consumer properties, including the Kinesis extension
     * @return the configured adapter
     * @throws IllegalArgumentException if the configured shard iterator type cannot be mapped
     * to a KCL initial position
     */
    private MessageProducerSupport createKclConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<KinesisConsumerProperties> properties) {
        KinesisConsumerProperties kinesisConsumerProperties = properties.getExtension();
        if (kinesisConsumerProperties.getShardId() != null) {
            this.logger.warn("Kinesis Client Library does not support explicit shard configuration. Ignoring 'shardId' property");
        }
        KclMessageDrivenChannelAdapter adapter = new KclMessageDrivenChannelAdapter(this.amazonKinesis, this.cloudWatchClient, this.dynamoDBClient, new String[]{destination.getName()});
        boolean anonymous = !StringUtils.hasText(group);
        String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID() : group;
        String workerId = kinesisConsumerProperties.getWorkerId() != null ? kinesisConsumerProperties.getWorkerId() : UUID.randomUUID().toString();
        InitialPositionInStreamExtended initialPosition = resolveKclInitialPosition(kinesisConsumerProperties.getShardIteratorType(), anonymous);
        adapter.setConsumerGroup(consumerGroup);
        adapter.setWorkerId(workerId);
        adapter.setStreamInitialSequence(initialPosition);
        adapter.setCheckpointMode(kinesisConsumerProperties.getCheckpointMode());
        adapter.setCheckpointsInterval(kinesisConsumerProperties.getCheckpointInterval().longValue());
        adapter.setConsumerBackoff(kinesisConsumerProperties.getConsumerBackoff());
        adapter.setListenerMode(kinesisConsumerProperties.getListenerMode());
        if (kinesisConsumerProperties.isEmbedHeaders()) {
            adapter.setEmbeddedHeadersMapper(this.embeddedHeadersMapper);
        }
        if (properties.isUseNativeDecoding()) {
            adapter.setConverter(null);
        } else {
            // Pass the raw bytes through unchanged.
            adapter.setConverter(bytes -> bytes);
        }
        AbstractMessageChannelBinder.ErrorInfrastructure errorInfrastructure = this.registerErrorInfrastructure(destination, consumerGroup, properties);
        adapter.setErrorMessageStrategy(ERROR_MESSAGE_STRATEGY);
        adapter.setErrorChannel(errorInfrastructure.getErrorChannel());
        adapter.setBindSourceRecord(true);
        return adapter;
    }

    /**
     * Translates the {@code TYPE[:value]} shard iterator setting into a KCL initial position.
     *
     * <p>Without a setting, anonymous consumers start at {@code LATEST} and named groups at
     * {@code TRIM_HORIZON}. A value part is only accepted for {@code AT_TIMESTAMP} and is
     * parsed as epoch milliseconds.
     *
     * @param shardIteratorType the configured iterator type, possibly blank
     * @param anonymous whether the consumer group is anonymous
     * @return the initial position to start from
     * @throws IllegalArgumentException if the type is unknown, is a sequence-number based
     * type, or carries a value while not being {@code AT_TIMESTAMP}
     */
    private static InitialPositionInStreamExtended resolveKclInitialPosition(@Nullable String shardIteratorType, boolean anonymous) {
        if (!StringUtils.hasText(shardIteratorType)) {
            return InitialPositionInStreamExtended.newInitialPosition(anonymous ? InitialPositionInStream.LATEST : InitialPositionInStream.TRIM_HORIZON);
        }
        String[] typeValue = shardIteratorType.split(":", 2);
        ShardIteratorType iteratorType = ShardIteratorType.valueOf(typeValue[0]);
        if (typeValue.length > 1) {
            if (!ShardIteratorType.AT_TIMESTAMP.equals(iteratorType)) {
                throw new IllegalArgumentException("The KCL does not support 'AT_SEQUENCE_NUMBER' or 'AFTER_SEQUENCE_NUMBER' initial position in stream.");
            }
            return InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(Long.parseLong(typeValue[1])));
        }
        // Sequence-number types have no InitialPositionInStream counterpart; report that
        // explicitly instead of letting the enum lookup below fail with "No enum constant".
        if (ShardIteratorType.AT_SEQUENCE_NUMBER.equals(iteratorType) || ShardIteratorType.AFTER_SEQUENCE_NUMBER.equals(iteratorType)) {
            throw new IllegalArgumentException("The KCL does not support 'AT_SEQUENCE_NUMBER' or 'AFTER_SEQUENCE_NUMBER' initial position in stream.");
        }
        return InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.valueOf(iteratorType.name()));
    }

    /**
     * Creates the non-KCL inbound adapter for the destination.
     *
     * <p>The adapter targets, in order of precedence: the explicitly configured shard; the
     * shards whose index modulo {@code instanceCount} equals {@code instanceIndex} when more
     * than one instance is configured; or the whole stream.
     *
     * <p>The shard iterator setting has the form {@code TYPE[:value]}, where the value is epoch
     * milliseconds for {@code AT_TIMESTAMP} and a sequence number otherwise.
     *
     * @param destination the consumer destination
     * @param group the consumer group; blank means anonymous
     * @param properties the consumer properties, including the Kinesis extension
     * @return the configured adapter
     * @throws InvalidArgumentException if {@code shardId} is combined with an instance count above one
     */
    private MessageProducerSupport createKinesisConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<KinesisConsumerProperties> properties) {
        KinesisConsumerProperties extension = properties.getExtension();
        int instanceCount = properties.getInstanceCount();
        if (instanceCount > 1 && extension.getShardId() != null) {
            throw (InvalidArgumentException) InvalidArgumentException.builder().message("'instanceCount' more than 1 and 'shardId' cannot be provided together.").build();
        }

        // Base offset: LATEST unless an iterator type is configured.
        String shardIteratorType = extension.getShardIteratorType();
        boolean iteratorConfigured = StringUtils.hasText(shardIteratorType);
        KinesisShardOffset baseOffset;
        if (iteratorConfigured) {
            String[] parts = shardIteratorType.split(":", 2);
            ShardIteratorType iteratorType = ShardIteratorType.valueOf(parts[0]);
            baseOffset = new KinesisShardOffset(iteratorType);
            if (parts.length > 1) {
                if (ShardIteratorType.AT_TIMESTAMP.equals(iteratorType)) {
                    baseOffset.setTimestamp(Instant.ofEpochMilli(Long.parseLong(parts[1])));
                } else {
                    baseOffset.setSequenceNumber(parts[1]);
                }
            }
        } else {
            baseOffset = KinesisShardOffset.latest();
        }

        // With several instances, pick this instance's share of the shards.
        HashSet<KinesisShardOffset> assignedOffsets = null;
        if (instanceCount > 1) {
            assignedOffsets = new HashSet<KinesisShardOffset>();
            List<Shard> shards = ((KinesisConsumerDestination) destination).getShards();
            int instanceIndex = properties.getInstanceIndex();
            for (int position = 0; position < shards.size(); position++) {
                if (position % instanceCount == instanceIndex) {
                    KinesisShardOffset assigned = new KinesisShardOffset(baseOffset);
                    assigned.setStream(destination.getName());
                    assigned.setShard(shards.get(position).shardId());
                    assignedOffsets.add(assigned);
                }
            }
        }

        KinesisMessageDrivenChannelAdapter adapter;
        String explicitShardId = extension.getShardId();
        if (explicitShardId != null) {
            KinesisShardOffset singleShard = new KinesisShardOffset(baseOffset);
            singleShard.setStream(destination.getName());
            singleShard.setShard(explicitShardId);
            adapter = new KinesisMessageDrivenChannelAdapter(this.amazonKinesis, new KinesisShardOffset[]{singleShard});
        } else if (CollectionUtils.isEmpty(assignedOffsets)) {
            // NOTE(review): also reached when several instances are configured but none of the
            // shards fall to this instance, in which case the whole stream is consumed — confirm intended.
            adapter = new KinesisMessageDrivenChannelAdapter(this.amazonKinesis, new String[]{destination.getName()});
        } else {
            adapter = new KinesisMessageDrivenChannelAdapter(this.amazonKinesis, assignedOffsets.toArray(new KinesisShardOffset[0]));
        }

        boolean anonymous = !StringUtils.hasText(group);
        String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID() : group;
        adapter.setConsumerGroup(consumerGroup);
        // Named groups without an explicit iterator type start from TRIM_HORIZON.
        KinesisShardOffset initialSequence = (anonymous || iteratorConfigured) ? baseOffset : KinesisShardOffset.trimHorizon();
        adapter.setStreamInitialSequence(initialSequence);
        adapter.setListenerMode(extension.getListenerMode());
        if (extension.isEmbedHeaders()) {
            adapter.setEmbeddedHeadersMapper(this.embeddedHeadersMapper);
        }
        if (properties.isUseNativeDecoding()) {
            adapter.setConverter(null);
        } else {
            // Pass the raw bytes through unchanged.
            adapter.setConverter(bytes -> bytes);
        }
        adapter.setCheckpointMode(extension.getCheckpointMode());
        adapter.setRecordsLimit(extension.getRecordsLimit());
        adapter.setIdleBetweenPolls(extension.getIdleBetweenPolls());
        adapter.setConsumerBackoff(extension.getConsumerBackoff());
        adapter.setCheckpointsInterval(extension.getCheckpointInterval().longValue());
        if (this.checkpointStore != null) {
            adapter.setCheckpointStore(this.checkpointStore);
        }
        adapter.setLockRegistry(this.lockRegistry);
        adapter.setConcurrency(properties.getConcurrency());
        adapter.setStartTimeout(extension.getStartTimeout());
        adapter.setDescribeStreamBackoff(this.configurationProperties.getDescribeStreamBackoff());
        adapter.setDescribeStreamRetries(this.configurationProperties.getDescribeStreamRetries());

        AbstractMessageChannelBinder.ErrorInfrastructure errorInfrastructure = this.registerErrorInfrastructure(destination, consumerGroup, properties);
        adapter.setErrorMessageStrategy(ERROR_MESSAGE_STRATEGY);
        adapter.setErrorChannel(errorInfrastructure.getErrorChannel());
        adapter.setBindSourceRecord(true);
        return adapter;
    }

    /**
     * Returns the shared error message strategy that is also set on every consumer adapter.
     *
     * @return the Kinesis error message strategy
     */
    protected ErrorMessageStrategy getErrorMessageStrategy() {
        return ERROR_MESSAGE_STRATEGY;
    }

    /**
     * Assembles the header names handed to the embedded-headers mapper: the standard binder
     * headers, then the {@code traceparent} and {@code X-B3*} entries when observation is
     * enabled, then any headers configured on the binder.
     *
     * @param configurationProperties the binder configuration; must not be {@code null}
     * @return the header names, in the order described above
     */
    private static String[] headersToMap(KinesisBinderConfigurationProperties configurationProperties) {
        Assert.notNull(configurationProperties, "'configurationProperties' must not be null");
        List<String> mappedHeaders = new ArrayList<String>();
        for (String standardHeader : BinderHeaders.STANDARD_HEADERS) {
            mappedHeaders.add(standardHeader);
        }
        if (configurationProperties.isEnableObservation()) {
            mappedHeaders.add("traceparent");
            mappedHeaders.add("X-B3*");
        }
        String[] configuredHeaders = configurationProperties.getHeaders();
        if (!ObjectUtils.isEmpty(configuredHeaders)) {
            for (String configuredHeader : configuredHeaders) {
                mappedHeaders.add(configuredHeader);
            }
        }
        return mappedHeaders.toArray(new String[0]);
    }
}

