/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.services.dynamodbv2.streamsadapter.tasks;

import com.amazonaws.services.dynamodbv2.streamsadapter.util.KinesisMapperUtil;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCleanupManager;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.UpdateField;
import software.amazon.kinesis.leases.exceptions.CustomerApplicationException;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.ConsumerTask;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

/**
 * {@link ConsumerTask} that shuts down the record processor of a single shard.
 *
 * <p>When the shutdown reason is {@link ShutdownReason#SHARD_END} the task creates leases for
 * the child shards (if any were supplied), records the child shard ids on the current lease and
 * asks the application to checkpoint at shard end. For any other reason, or when shard-end
 * handling fails with an {@link InvalidStateException}, the record processor is told that its
 * lease was lost. In every successful path the records publisher is shut down afterwards.
 */
public class DynamoDBStreamsShutdownTask implements ConsumerTask {
    private static final Logger log = LoggerFactory.getLogger(DynamoDBStreamsShutdownTask.class);
    private static final String SHUTDOWN_TASK_OPERATION = "DynamoDBStreamsShutdownTask";
    private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown";
    private static final LeaseLostInput LEASE_LOST_INPUT = LeaseLostInput.builder().build();
    // Not referenced by any method of this class; retained as-is.
    private static final Random RANDOM = new Random();
    // Not referenced by any method of this class; package-visible for tests.
    @VisibleForTesting
    static final int RETRY_RANDOM_MAX_RANGE = 30;
    @NonNull
    private final ShardInfo shardInfo;
    @NonNull
    private final ShardDetector dynamoDBStreamsShardDetector;
    @NonNull
    private final ShardRecordProcessor shardRecordProcessor;
    @NonNull
    private final ShardRecordProcessorCheckpointer recordProcessorCheckpointer;
    @NonNull
    private final ShutdownReason reason;
    @NonNull
    private final InitialPositionInStreamExtended initialPositionInStream;
    private final boolean cleanupLeasesOfCompletedShards;
    private final boolean ignoreUnexpectedChildShards;
    @NonNull
    private final LeaseCoordinator leaseCoordinator;
    /** Time slept in {@link #call()} after a failure, before the failed result is returned. */
    private final long backoffTimeMillis;
    @NonNull
    private final RecordsPublisher recordsPublisher;
    @NonNull
    private final HierarchicalShardSyncer dynamoDBStreamsShardSyncer;
    @NonNull
    private final MetricsFactory metricsFactory;
    private final TaskType taskType = TaskType.SHUTDOWN;
    /** Child shards of the shard being shut down; may be {@code null} or empty. */
    private final List<ChildShard> childShards;
    @NonNull
    private final StreamIdentifier streamIdentifier;
    @NonNull
    private final LeaseCleanupManager leaseCleanupManager;
    /** Stream ARN derived once from {@code streamIdentifier.streamName()}; used in log output. */
    private final String streamArn;

    /**
     * Creates the shutdown task.
     *
     * @param childShards child shards of the shard being shut down; the only reference argument
     *     that may be {@code null}
     * @throws NullPointerException if any argument marked {@code @NonNull} is {@code null}; the
     *     message names the offending parameter
     */
    public DynamoDBStreamsShutdownTask(@NonNull ShardInfo shardInfo, @NonNull ShardDetector dynamoDBStreamsShardDetector, @NonNull ShardRecordProcessor shardRecordProcessor, @NonNull ShardRecordProcessorCheckpointer recordProcessorCheckpointer, @NonNull ShutdownReason reason, @NonNull InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, @NonNull LeaseCoordinator leaseCoordinator, long backoffTimeMillis, @NonNull RecordsPublisher recordsPublisher, @NonNull HierarchicalShardSyncer dynamoDBStreamsShardSyncer, @NonNull MetricsFactory metricsFactory, List<ChildShard> childShards, @NonNull StreamIdentifier streamIdentifier, @NonNull LeaseCleanupManager leaseCleanupManager) {
        // Null checks run in parameter order, so the first null argument is the one reported.
        this.shardInfo = Objects.requireNonNull(shardInfo, "shardInfo is marked non-null but is null");
        this.dynamoDBStreamsShardDetector = Objects.requireNonNull(dynamoDBStreamsShardDetector, "dynamoDBStreamsShardDetector is marked non-null but is null");
        this.shardRecordProcessor = Objects.requireNonNull(shardRecordProcessor, "shardRecordProcessor is marked non-null but is null");
        this.recordProcessorCheckpointer = Objects.requireNonNull(recordProcessorCheckpointer, "recordProcessorCheckpointer is marked non-null but is null");
        this.reason = Objects.requireNonNull(reason, "reason is marked non-null but is null");
        this.initialPositionInStream = Objects.requireNonNull(initialPositionInStream, "initialPositionInStream is marked non-null but is null");
        this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
        this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
        this.leaseCoordinator = Objects.requireNonNull(leaseCoordinator, "leaseCoordinator is marked non-null but is null");
        this.backoffTimeMillis = backoffTimeMillis;
        this.recordsPublisher = Objects.requireNonNull(recordsPublisher, "recordsPublisher is marked non-null but is null");
        this.dynamoDBStreamsShardSyncer = Objects.requireNonNull(dynamoDBStreamsShardSyncer, "dynamoDBStreamsShardSyncer is marked non-null but is null");
        this.metricsFactory = Objects.requireNonNull(metricsFactory, "metricsFactory is marked non-null but is null");
        this.childShards = childShards;
        this.streamIdentifier = Objects.requireNonNull(streamIdentifier, "streamIdentifier is marked non-null but is null");
        this.leaseCleanupManager = Objects.requireNonNull(leaseCleanupManager, "leaseCleanupManager is marked non-null but is null");
        this.streamArn = KinesisMapperUtil.createDynamoDBStreamsArnFromKinesisStreamName(this.streamIdentifier.streamName());
    }

    /**
     * Runs the shutdown sequence for the shard.
     *
     * <p>Any exception raised along the way is logged, followed by a sleep of
     * {@code backoffTimeMillis}, and is then returned wrapped in the {@link TaskResult} rather
     * than thrown. The metrics scope opened at the start is always closed.
     *
     * @return a result carrying {@code null} on success, or the caught exception on failure
     */
    public TaskResult call() {
        this.recordProcessorCheckpointer.checkpointer().operation(SHUTDOWN_TASK_OPERATION);
        final MetricsScope scope = MetricsUtil.createMetricsWithOperation(this.metricsFactory, SHUTDOWN_TASK_OPERATION);
        final String leaseKey = ShardInfo.getLeaseKey(this.shardInfo);
        try {
            log.debug("Invoking shutdown() for shard {} with childShards {}, concurrencyToken {}. Shutdown reason: {}", leaseKey, this.childShards, this.shardInfo.concurrencyToken(), this.reason);
            final long startTime = System.currentTimeMillis();
            final Lease currentShardLease = this.leaseCoordinator.getCurrentlyHeldLease(leaseKey);
            final Runnable leaseLostAction = () -> this.shardRecordProcessor.leaseLost(LEASE_LOST_INPUT);
            if (this.reason == ShutdownReason.SHARD_END) {
                try {
                    this.takeShardEndAction(currentShardLease, leaseKey, scope, startTime);
                } catch (InvalidStateException e) {
                    // Shard-end handling is not possible in the current lease state:
                    // give the lease up and fall back to the lease-lost notification.
                    log.warn("Lease {}: Invalid state encountered while shutting down shardConsumer with SHARD_END reason. Dropping the lease and shutting down shardConsumer using LEASE_LOST reason.", leaseKey, e);
                    this.dropLease(currentShardLease, leaseKey);
                    this.throwOnApplicationException(leaseKey, leaseLostAction, scope, startTime);
                }
            } else {
                this.throwOnApplicationException(leaseKey, leaseLostAction, scope, startTime);
            }
            log.debug("Shutting down retrieval strategy for shard {}.", leaseKey);
            this.recordsPublisher.shutdown();
            log.debug("Record processor completed shutdown() for shard {}", leaseKey);
            return new TaskResult(null);
        } catch (Exception e) {
            if (e instanceof CustomerApplicationException) {
                log.error("Shard {}: Application exception.", leaseKey, e);
            } else {
                log.error("Shard {}: Caught exception:", leaseKey, e);
            }
            try {
                Thread.sleep(this.backoffTimeMillis);
            } catch (InterruptedException ie) {
                log.debug("Shard {}: Interrupted sleep", leaseKey, ie);
                // Catching InterruptedException clears the flag; restore it so the
                // code running this task can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            return new TaskResult(e);
        } finally {
            MetricsUtil.endScope(scope);
        }
    }

    /**
     * Performs the SHARD_END work: creates child-shard leases and records the child shard ids on
     * the current lease (both only when child shards were supplied), then attempts the shard-end
     * checkpoint.
     *
     * @throws InvalidStateException if {@code currentShardLease} is {@code null}, i.e. the lease
     *     is not currently held according to the lease coordinator
     */
    private void takeShardEndAction(Lease currentShardLease, String leaseKey, MetricsScope scope, long startTime) throws InvalidStateException, ProvisionedThroughputException, DependencyException, CustomerApplicationException {
        if (currentShardLease == null) {
            throw new InvalidStateException(leaseKey + " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.");
        }
        if (!CollectionUtils.isNullOrEmpty(this.childShards)) {
            this.createLeasesForChildShardsIfNotExist(scope);
            this.updateLeaseWithChildShards(currentShardLease);
        }
        this.attemptShardEndCheckpointing(leaseKey, scope, startTime);
    }

    /**
     * Re-reads the lease through the lease refresher and, unless its checkpoint already equals
     * {@code SHARD_END}, drives the application through the shard-end checkpoint.
     *
     * @throws InvalidStateException if the lease refresher returns no lease for {@code leaseKey}
     */
    private void attemptShardEndCheckpointing(String leaseKey, MetricsScope scope, long startTime) throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException {
        final Lease leaseFromDdb = Optional.ofNullable(this.leaseCoordinator.leaseRefresher().getLease(leaseKey))
                .orElseThrow(() -> new InvalidStateException("Lease for shard " + leaseKey + " does not exist."));
        if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
            this.throwOnApplicationException(leaseKey, () -> this.applicationCheckpointAndVerification(leaseKey), scope, startTime);
        }
    }

    /**
     * Invokes {@code shardEnded} on the record processor and verifies that it checkpointed.
     *
     * <p>Before the callback, the checkpointer's current largest permitted checkpoint is stored
     * as the sequence number at shard end and the largest permitted checkpoint is raised to
     * {@code SHARD_END}.
     *
     * @throws IllegalArgumentException if the last checkpoint value is not {@code SHARD_END}
     *     once the callback returns
     */
    private void applicationCheckpointAndVerification(String leaseKey) {
        this.recordProcessorCheckpointer.sequenceNumberAtShardEnd(this.recordProcessorCheckpointer.largestPermittedCheckpointValue());
        this.recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
        this.shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(this.recordProcessorCheckpointer).build());
        final ExtendedSequenceNumber lastCheckpointValue = this.recordProcessorCheckpointer.lastCheckpointValue();
        if (!ExtendedSequenceNumber.SHARD_END.equals(lastCheckpointValue)) {
            throw new IllegalArgumentException("Application didn't checkpoint at end of shard " + leaseKey + ". Application must checkpoint upon shard end. See ShardRecordProcessor.shardEnded javadocs for more information.");
        }
    }

    /**
     * Runs {@code action} and converts any exception it throws into a
     * {@link CustomerApplicationException} that keeps the original as its cause. The record
     * processor shutdown latency is recorded whether or not the action succeeds.
     */
    private void throwOnApplicationException(String leaseKey, Runnable action, MetricsScope metricsScope, long startTime) throws CustomerApplicationException {
        try {
            action.run();
        } catch (Exception e) {
            throw new CustomerApplicationException("Customer application throws exception for shard " + leaseKey + ": ", e);
        } finally {
            MetricsUtil.addLatency(metricsScope, RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY);
        }
    }

    /**
     * Creates a lease for every child shard for which the lease refresher returns no lease.
     *
     * <p>Callers must ensure {@code childShards} is non-null. For each creation attempt the
     * {@code CreateLease} success/latency metric is emitted whether the attempt succeeds or
     * throws, plus a {@code CreateLease_<checkpoint>} metric when the new lease has a checkpoint.
     */
    private void createLeasesForChildShardsIfNotExist(MetricsScope scope) throws ProvisionedThroughputException, InvalidStateException, DependencyException {
        final LeaseRefresher leaseRefresher = this.leaseCoordinator.leaseRefresher();
        for (ChildShard childShard : this.childShards) {
            final String leaseKey = ShardInfo.getLeaseKey(this.shardInfo, childShard.shardId());
            if (leaseRefresher.getLease(leaseKey) != null) {
                continue;
            }
            log.debug("{} - Shard {} - Attempting to create lease for child shard {}", this.dynamoDBStreamsShardDetector.streamIdentifier(), this.shardInfo.shardId(), leaseKey);
            final Lease leaseToCreate = this.dynamoDBStreamsShardSyncer.createLeaseForChildShard(childShard, this.streamIdentifier);
            final long startTime = System.currentTimeMillis();
            boolean success = false;
            try {
                leaseRefresher.createLeaseIfNotExists(leaseToCreate);
                success = true;
            } finally {
                MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED);
                if (leaseToCreate.checkpoint() != null) {
                    // Sentinel checkpoints are reported under their own name; all
                    // other checkpoints share the generic SEQUENCE_NUMBER bucket.
                    final String metricName = leaseToCreate.checkpoint().isSentinelCheckpoint() ? leaseToCreate.checkpoint().sequenceNumber() : "SEQUENCE_NUMBER";
                    MetricsUtil.addSuccess(scope, "CreateLease_" + metricName, true, MetricsLevel.DETAILED);
                }
            }
            log.info("{} - Shard {}: Created child shard lease: {}", this.streamArn, this.shardInfo.shardId(), leaseToCreate);
        }
    }

    /**
     * Writes the ids of {@code childShards} onto a copy of {@code currentLease} and passes that
     * copy to the lease refresher with {@link UpdateField#CHILD_SHARDS}. The lease passed in is
     * not modified. Callers must ensure {@code childShards} is non-null.
     */
    private void updateLeaseWithChildShards(Lease currentLease) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
        final Set<String> childShardIds = this.childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet());
        final Lease updatedLease = currentLease.copy();
        updatedLease.childShardIds(childShardIds);
        this.leaseCoordinator.leaseRefresher().updateLeaseWithMetaInfo(updatedLease, UpdateField.CHILD_SHARDS);
        log.info("Shard {} of Stream {}: Updated current lease {} with child shard information: {}", this.shardInfo.shardId(), this.streamArn, currentLease.leaseKey(), childShardIds);
    }

    /** Returns the type of this task, always {@link TaskType#SHUTDOWN}. */
    public TaskType taskType() {
        return this.taskType;
    }

    /** Returns the shutdown reason this task was created with. */
    @VisibleForTesting
    public ShutdownReason getReason() {
        return this.reason;
    }

    /**
     * Drops {@code currentLease} through the lease coordinator; when it is {@code null} only a
     * warning is logged.
     */
    private void dropLease(Lease currentLease, String leaseKey) {
        if (currentLease == null) {
            log.warn("Shard {}: Unable to find the lease for shard. Will shutdown the shardConsumer directly.", leaseKey);
            return;
        }
        this.leaseCoordinator.dropLease(currentLease);
        log.info("Dropped lease for shutting down ShardConsumer: {} of stream: {}", currentLease.leaseKey(), this.streamArn);
    }
}

