/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.services.dynamodbv2.streamsadapter;

import com.amazonaws.services.dynamodbv2.streamsadapter.DynamoDBStreamsConsumerStates;
import com.amazonaws.services.dynamodbv2.streamsadapter.DynamoDBStreamsDataFetcher;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.AsynchronousGetRecordsRetrievalStrategy;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.GetRecordsCache;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.GetRecordsRetrievalStrategy;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.IDataFetcher;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.IShardConsumer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisConsumerStates;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.SequenceNumberValidator;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncStrategy;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownNotification;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.SynchronousGetRecordsRetrievalStrategy;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.TaskResult;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.google.common.annotations.VisibleForTesting;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class DynamoDBStreamsShardConsumer
implements IShardConsumer {
    private static final Log LOG = LogFactory.getLog(DynamoDBStreamsShardConsumer.class);
    private final StreamConfig streamConfig;
    private final IRecordProcessor recordProcessor;
    private final KinesisClientLibConfiguration config;
    private final RecordProcessorCheckpointer recordProcessorCheckpointer;
    private final ExecutorService executorService;
    private final ShardInfo shardInfo;
    private final DynamoDBStreamsDataFetcher dataFetcher;
    private final IMetricsFactory metricsFactory;
    private final KinesisClientLibLeaseCoordinator leaseCoordinator;
    private ICheckpoint checkpoint;
    private LeaseCleanupManager leaseCleanupManager;
    private final long parentShardPollIntervalMillis;
    private final boolean cleanupLeasesOfCompletedShards;
    private final long taskBackoffTimeMillis;
    private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
    private final ShardSyncer shardSyncer;
    private ITask currentTask;
    private long currentTaskSubmitTime;
    private Future<TaskResult> future;
    private ShardSyncStrategy shardSyncStrategy;
    private final GetRecordsCache getRecordsCache;
    private DynamoDBStreamsConsumerStates.ConsumerState currentState = DynamoDBStreamsConsumerStates.INITIAL_STATE;
    private volatile ShutdownReason shutdownReason;
    private volatile ShutdownNotification shutdownNotification;

    public ShardSyncer getShardSyncer() {
        return this.shardSyncer;
    }

    public GetRecordsCache getGetRecordsCache() {
        return this.getRecordsCache;
    }

    private static final GetRecordsRetrievalStrategy makeStrategy(IDataFetcher dataFetcher, Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, ShardInfo shardInfo) {
        Optional getRecordsRetrievalStrategy = retryGetRecordsInSeconds.flatMap(retry -> maxGetRecordsThreadPool.map(max -> new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, retry.intValue(), max.intValue(), shardInfo.getShardId())));
        return (GetRecordsRetrievalStrategy)getRecordsRetrievalStrategy.orElse(new SynchronousGetRecordsRetrievalStrategy(dataFetcher));
    }

    @Deprecated
    DynamoDBStreamsShardConsumer(ShardInfo shardInfo, StreamConfig streamConfig, ICheckpoint checkpoint, IRecordProcessor recordProcessor, KinesisClientLibLeaseCoordinator leaseCoordinator, long parentShardPollIntervalMillis, boolean cleanupLeasesOfCompletedShards, ExecutorService executorService, IMetricsFactory metricsFactory, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
        this(shardInfo, streamConfig, checkpoint, recordProcessor, leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty(), config, shardSyncer, shardSyncStrategy);
    }

    @Deprecated
    DynamoDBStreamsShardConsumer(ShardInfo shardInfo, StreamConfig streamConfig, ICheckpoint checkpoint, IRecordProcessor recordProcessor, KinesisClientLibLeaseCoordinator leaseCoordinator, long parentShardPollIntervalMillis, boolean cleanupLeasesOfCompletedShards, ExecutorService executorService, IMetricsFactory metricsFactory, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
        this(shardInfo, streamConfig, checkpoint, recordProcessor, new RecordProcessorCheckpointer(shardInfo, checkpoint, new SequenceNumberValidator(streamConfig.getStreamProxy(), shardInfo.getShardId(), streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()), metricsFactory), leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, new DynamoDBStreamsDataFetcher(streamConfig.getStreamProxy(), shardInfo), retryGetRecordsInSeconds, maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy);
    }

    @Deprecated
    DynamoDBStreamsShardConsumer(ShardInfo shardInfo, StreamConfig streamConfig, ICheckpoint checkpoint, IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisClientLibLeaseCoordinator leaseCoordinator, long parentShardPollIntervalMillis, boolean cleanupLeasesOfCompletedShards, ExecutorService executorService, IMetricsFactory metricsFactory, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, DynamoDBStreamsDataFetcher dynamoDBStreamsDataFetcher, Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
        this(shardInfo, streamConfig, checkpoint, recordProcessor, recordProcessorCheckpointer, leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, dynamoDBStreamsDataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy, LeaseCleanupManager.newInstance((IKinesisProxy)streamConfig.getStreamProxy(), (ILeaseManager)leaseCoordinator.getLeaseManager(), (ScheduledExecutorService)Executors.newSingleThreadScheduledExecutor(), (IMetricsFactory)metricsFactory, (boolean)config.shouldCleanupLeasesUponShardCompletion(), (long)config.leaseCleanupIntervalMillis(), (long)config.completedLeaseCleanupThresholdMillis(), (long)config.garbageLeaseCleanupThresholdMillis(), (int)config.getMaxRecords()));
    }

    DynamoDBStreamsShardConsumer(ShardInfo shardInfo, StreamConfig streamConfig, ICheckpoint checkpoint, IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisClientLibLeaseCoordinator leaseCoordinator, long parentShardPollIntervalMillis, boolean cleanupLeasesOfCompletedShards, ExecutorService executorService, IMetricsFactory metricsFactory, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, DynamoDBStreamsDataFetcher dynamoDBStreamsDataFetcher, Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy, LeaseCleanupManager leaseCleanupManager) {
        this.shardInfo = shardInfo;
        this.streamConfig = streamConfig;
        this.checkpoint = checkpoint;
        this.recordProcessor = recordProcessor;
        this.recordProcessorCheckpointer = recordProcessorCheckpointer;
        this.leaseCoordinator = leaseCoordinator;
        this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
        this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
        this.executorService = executorService;
        this.metricsFactory = metricsFactory;
        this.taskBackoffTimeMillis = backoffTimeMillis;
        this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
        this.config = config;
        this.dataFetcher = dynamoDBStreamsDataFetcher;
        this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher(DynamoDBStreamsShardConsumer.makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo), this.getShardInfo().getShardId(), this.metricsFactory, this.config.getMaxRecords());
        this.shardSyncer = shardSyncer;
        this.shardSyncStrategy = shardSyncStrategy;
        this.leaseCleanupManager = leaseCleanupManager;
    }

    public synchronized boolean consumeShard() {
        return this.checkAndSubmitNextTask();
    }

    private boolean readyForNextTask() {
        return this.future == null || this.future.isCancelled() || this.future.isDone();
    }

    private synchronized boolean checkAndSubmitNextTask() {
        boolean submittedNewTask = false;
        if (this.readyForNextTask()) {
            IShardConsumer.TaskOutcome taskOutcome = IShardConsumer.TaskOutcome.NOT_COMPLETE;
            if (this.future != null && this.future.isDone()) {
                taskOutcome = this.determineTaskOutcome();
            }
            this.updateState(taskOutcome);
            ITask nextTask = this.getNextTask();
            if (nextTask != null) {
                this.currentTask = nextTask;
                try {
                    this.future = this.executorService.submit(this.currentTask);
                    this.currentTaskSubmitTime = System.currentTimeMillis();
                    submittedNewTask = true;
                    LOG.debug((Object)("Submitted new " + this.currentTask.getTaskType() + " task for shard " + this.shardInfo.getShardId()));
                }
                catch (RejectedExecutionException e) {
                    LOG.info((Object)(this.currentTask.getTaskType() + " task was not accepted for execution."), (Throwable)e);
                }
                catch (RuntimeException e) {
                    LOG.info((Object)(this.currentTask.getTaskType() + " task encountered exception "), (Throwable)e);
                }
            } else if (LOG.isDebugEnabled()) {
                LOG.debug((Object)String.format("No new task to submit for shard %s, currentState %s", this.shardInfo.getShardId(), this.currentState.toString()));
            }
        } else {
            long timeElapsed = System.currentTimeMillis() - this.currentTaskSubmitTime;
            String commonMessage = String.format("Previous %s task still pending for shard %s since %d ms ago. ", this.currentTask.getTaskType(), this.shardInfo.getShardId(), timeElapsed);
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)(commonMessage + "Not submitting new task."));
            }
            this.config.getLogWarningForTaskAfterMillis().ifPresent(value -> {
                if (timeElapsed > value) {
                    LOG.warn((Object)commonMessage);
                }
            });
        }
        return submittedNewTask;
    }

    public boolean isSkipShardSyncAtWorkerInitializationIfLeasesExist() {
        return this.skipShardSyncAtWorkerInitializationIfLeasesExist;
    }

    private IShardConsumer.TaskOutcome determineTaskOutcome() {
        try {
            TaskResult result = this.future.get();
            if (result.getException() == null) {
                if (result.isShardEndReached()) {
                    IShardConsumer.TaskOutcome taskOutcome = IShardConsumer.TaskOutcome.END_OF_SHARD;
                    return taskOutcome;
                }
                IShardConsumer.TaskOutcome taskOutcome = IShardConsumer.TaskOutcome.SUCCESSFUL;
                return taskOutcome;
            }
            this.logTaskException(result);
            if (result.isLeaseNotFound()) {
                IShardConsumer.TaskOutcome taskOutcome = IShardConsumer.TaskOutcome.LEASE_NOT_FOUND;
                return taskOutcome;
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        finally {
            this.future = null;
        }
        return IShardConsumer.TaskOutcome.FAILURE;
    }

    private void logTaskException(TaskResult taskResult) {
        if (LOG.isDebugEnabled()) {
            Exception taskException = taskResult.getException();
            if (taskException instanceof BlockedOnParentShardException) {
                LOG.debug((Object)("Shard " + this.shardInfo.getShardId() + " is blocked on completion of parent shard."));
            } else {
                LOG.debug((Object)("Caught exception running " + this.currentTask.getTaskType() + " task: "), (Throwable)taskResult.getException());
            }
        }
    }

    public void notifyShutdownRequested(ShutdownNotification shutdownNotification) {
        this.shutdownNotification = shutdownNotification;
        this.markForShutdown(ShutdownReason.REQUESTED);
    }

    public synchronized boolean beginShutdown() {
        this.markForShutdown(ShutdownReason.ZOMBIE);
        this.checkAndSubmitNextTask();
        return this.isShutdown();
    }

    synchronized void markForShutdown(ShutdownReason reason) {
        if (this.shutdownReason == null || this.shutdownReason.canTransitionTo(reason)) {
            this.shutdownReason = reason;
        }
    }

    public boolean isShutdown() {
        return this.currentState.isTerminal();
    }

    public ShutdownReason getShutdownReason() {
        return this.shutdownReason;
    }

    private ITask getNextTask() {
        ITask nextTask = this.currentState.createTask(this);
        if (nextTask == null) {
            return null;
        }
        return new MetricsCollectingTaskDecorator(nextTask, this.metricsFactory);
    }

    void updateState(IShardConsumer.TaskOutcome taskOutcome) {
        if (taskOutcome == IShardConsumer.TaskOutcome.END_OF_SHARD) {
            this.markForShutdown(ShutdownReason.TERMINATE);
            LOG.info((Object)("Shard " + this.shardInfo.getShardId() + ": Mark for shutdown with reason TERMINATE"));
        }
        if (taskOutcome == IShardConsumer.TaskOutcome.LEASE_NOT_FOUND) {
            this.markForShutdown(ShutdownReason.ZOMBIE);
            LOG.info((Object)("Shard " + this.shardInfo.getShardId() + ": Mark for shutdown with reason ZOMBIE as lease was not found"));
        }
        if (this.isShutdownRequested() && taskOutcome != IShardConsumer.TaskOutcome.FAILURE) {
            this.currentState = this.currentState.shutdownTransition(this.shutdownReason);
        } else if (this.isShutdownRequested() && DynamoDBStreamsConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS.equals((Object)this.currentState.getState())) {
            this.currentState = this.currentState.shutdownTransition(this.shutdownReason);
        } else if (taskOutcome == IShardConsumer.TaskOutcome.SUCCESSFUL) {
            if (this.currentState.getTaskType() == this.currentTask.getTaskType()) {
                this.currentState = this.currentState.successTransition();
            } else {
                LOG.error((Object)("Current State task type of '" + this.currentState.getTaskType() + "' doesn't match the current tasks type of '" + this.currentTask.getTaskType() + "'.  This shouldn't happen, and indicates a programming error. Unable to safely transition to the next state."));
            }
        }
    }

    @VisibleForTesting
    public boolean isShutdownRequested() {
        return this.shutdownReason != null;
    }

    public KinesisConsumerStates.ShardConsumerState getCurrentState() {
        switch (this.currentState.getState()) {
            case WAITING_ON_PARENT_SHARDS: {
                return KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS;
            }
            case INITIALIZING: {
                return KinesisConsumerStates.ShardConsumerState.INITIALIZING;
            }
            case PROCESSING: {
                return KinesisConsumerStates.ShardConsumerState.PROCESSING;
            }
            case SHUTDOWN_REQUESTED: {
                return KinesisConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED;
            }
            case SHUTTING_DOWN: {
                return KinesisConsumerStates.ShardConsumerState.SHUTTING_DOWN;
            }
            case SHUTDOWN_COMPLETE: {
                return KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE;
            }
        }
        return null;
    }

    StreamConfig getStreamConfig() {
        return this.streamConfig;
    }

    IRecordProcessor getRecordProcessor() {
        return this.recordProcessor;
    }

    RecordProcessorCheckpointer getRecordProcessorCheckpointer() {
        return this.recordProcessorCheckpointer;
    }

    ExecutorService getExecutorService() {
        return this.executorService;
    }

    ShardInfo getShardInfo() {
        return this.shardInfo;
    }

    DynamoDBStreamsDataFetcher getDataFetcher() {
        return this.dataFetcher;
    }

    ILeaseManager<KinesisClientLease> getLeaseManager() {
        return this.leaseCoordinator.getLeaseManager();
    }

    KinesisClientLibLeaseCoordinator getLeaseCoordinator() {
        return this.leaseCoordinator;
    }

    ICheckpoint getCheckpoint() {
        return this.checkpoint;
    }

    long getParentShardPollIntervalMillis() {
        return this.parentShardPollIntervalMillis;
    }

    boolean isCleanupLeasesOfCompletedShards() {
        return this.cleanupLeasesOfCompletedShards;
    }

    boolean isIgnoreUnexpectedChildShards() {
        return this.config.shouldIgnoreUnexpectedChildShards();
    }

    long getTaskBackoffTimeMillis() {
        return this.taskBackoffTimeMillis;
    }

    Future<TaskResult> getFuture() {
        return this.future;
    }

    ShutdownNotification getShutdownNotification() {
        return this.shutdownNotification;
    }

    ShardSyncStrategy getShardSyncStrategy() {
        return this.shardSyncStrategy;
    }

    LeaseCleanupManager getLeaseCleanupManager() {
        return this.leaseCleanupManager;
    }
}

