/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.multilang;

import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.multilang.MessageReader;
import software.amazon.kinesis.multilang.MessageWriter;
import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration;
import software.amazon.kinesis.multilang.messages.CheckpointMessage;
import software.amazon.kinesis.multilang.messages.Message;
import software.amazon.kinesis.multilang.messages.StatusMessage;
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;

/**
 * Drives the request/response exchange with the subprocess: each lifecycle
 * method writes one message through the {@link MessageWriter} and then reads
 * messages from the {@link MessageReader} until a {@link StatusMessage}
 * arrives. Any {@link CheckpointMessage} received while waiting is executed
 * against the supplied checkpointer and answered through the writer.
 */
class MultiLangProtocol {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(MultiLangProtocol.class);
    private final InitializationInput initializationInput;
    /** Per-read timeout; empty means reads wait without a time limit. */
    private final Optional<Integer> timeoutInSeconds;
    private final MessageReader messageReader;
    private final MessageWriter messageWriter;
    private final MultiLangDaemonConfiguration configuration;

    /**
     * @param messageReader       source of messages coming back from the subprocess
     * @param messageWriter       sink for messages sent to the subprocess
     * @param initializationInput supplies the shard id used in log messages and the
     *                            payload of the initialize message
     * @param configuration       supplies the optional read timeout via
     *                            {@code getTimeoutInSeconds()}; must not be null
     */
    MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter, InitializationInput initializationInput, MultiLangDaemonConfiguration configuration) {
        this.messageReader = messageReader;
        this.messageWriter = messageWriter;
        this.initializationInput = initializationInput;
        this.configuration = configuration;
        this.timeoutInSeconds = Optional.ofNullable(configuration.getTimeoutInSeconds());
    }

    /**
     * Writes the initialize message and waits for the matching status response.
     *
     * @return true if the write succeeded and a status message for "initialize" was received
     */
    boolean initialize() {
        Future<Boolean> writeFuture = this.messageWriter.writeInitializeMessage(this.initializationInput);
        return this.waitForStatusMessage("initialize", null, writeFuture);
    }

    /**
     * Writes the process-records message and waits for the matching status response.
     *
     * @param processRecordsInput the records to send; its checkpointer serves any
     *                            checkpoint requests received while waiting
     * @return true if the write succeeded and a status message for "processRecords" was received
     */
    boolean processRecords(ProcessRecordsInput processRecordsInput) {
        Future<Boolean> writeFuture = this.messageWriter.writeProcessRecordsMessage(processRecordsInput);
        return this.waitForStatusMessage("processRecords", processRecordsInput.checkpointer(), writeFuture);
    }

    /**
     * Writes the lease-lost message and waits for the matching status response.
     * No checkpointer is made available for this action.
     *
     * @param leaseLostInput the lease-lost event to send
     * @return true if the write succeeded and a status message for "leaseLost" was received
     */
    boolean leaseLost(LeaseLostInput leaseLostInput) {
        Future<Boolean> writeFuture = this.messageWriter.writeLeaseLossMessage(leaseLostInput);
        return this.waitForStatusMessage("leaseLost", null, writeFuture);
    }

    /**
     * Writes the shard-ended message and waits for the matching status response.
     *
     * @param shardEndedInput the shard-ended event to send; its checkpointer serves
     *                        any checkpoint requests received while waiting
     * @return true if the write succeeded and a status message for "shardEnded" was received
     */
    boolean shardEnded(ShardEndedInput shardEndedInput) {
        Future<Boolean> writeFuture = this.messageWriter.writeShardEndedMessage(shardEndedInput);
        return this.waitForStatusMessage("shardEnded", shardEndedInput.checkpointer(), writeFuture);
    }

    /**
     * Writes the shutdown-requested message and waits for the matching status response.
     *
     * @param checkpointer serves any checkpoint requests received while waiting
     * @return true if the write succeeded and a status message for "shutdownRequested" was received
     */
    boolean shutdownRequested(RecordProcessorCheckpointer checkpointer) {
        Future<Boolean> writeFuture = this.messageWriter.writeShutdownRequestedMessage();
        return this.waitForStatusMessage("shutdownRequested", checkpointer, writeFuture);
    }

    /**
     * Waits for the status response to {@code action} and then for the pending write to finish.
     *
     * @param action       the action name the status message must answer
     * @param checkpointer checkpointer for checkpoint requests; may be null
     * @param writeFuture  result of the write that started this exchange
     * @return true only if the status was valid and the write future yielded true
     */
    private boolean waitForStatusMessage(String action, RecordProcessorCheckpointer checkpointer, Future<Boolean> writeFuture) {
        // The response is awaited first; the write result is collected afterwards.
        boolean statusWasCorrect = this.waitForStatusMessage(action, checkpointer);
        try {
            boolean writerIsStillOpen = writeFuture.get();
            return statusWasCorrect && writerIsStillOpen;
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("Interrupted while writing {} message for shard {}", action, this.initializationInput.shardId(), e);
            return false;
        }
        catch (ExecutionException e) {
            log.error("Failed to write {} message for shard {}", action, this.initializationInput.shardId(), e);
            return false;
        }
    }

    /**
     * Reads messages until a {@link StatusMessage} arrives, executing every
     * {@link CheckpointMessage} seen on the way.
     *
     * @param action       the action name the status message must answer
     * @param checkpointer checkpointer for checkpoint requests; may be null, in which
     *                     case a checkpoint request is answered with an error
     * @return false if a message could not be read, if a checkpoint response could not
     *         be written, or if the status message does not answer {@code action};
     *         true otherwise
     */
    boolean waitForStatusMessage(String action, RecordProcessorCheckpointer checkpointer) {
        Optional<StatusMessage> statusMessage = Optional.empty();
        while (!statusMessage.isPresent()) {
            Future<Message> future = this.messageReader.getNextMessageFromSTDOUT();
            // orElseGet keeps the untimed read lazy: it must only run when no timeout is
            // configured. An eager fallback would block without limit after the timed
            // read had already failed.
            Optional<Message> message = this.timeoutInSeconds
                    .map(seconds -> this.futureMethod(() -> future.get(seconds, TimeUnit.SECONDS), action))
                    .orElseGet(() -> this.futureMethod(future::get, action));
            if (!message.isPresent()) {
                return false;
            }
            // Empty when the message is not a checkpoint request or when waiting for the
            // checkpoint response write failed; only an explicit false result aborts.
            boolean checkpointFailed = message
                    .filter(CheckpointMessage.class::isInstance)
                    .map(CheckpointMessage.class::cast)
                    .flatMap(m -> this.futureMethod(() -> this.checkpoint(m, checkpointer).get(), "Checkpoint"))
                    .map(checkpointSuccess -> !checkpointSuccess)
                    .orElse(false);
            if (checkpointFailed) {
                return false;
            }
            statusMessage = message
                    .filter(StatusMessage.class::isInstance)
                    .map(StatusMessage.class::cast);
        }
        return this.validateStatusMessage(statusMessage.get(), action);
    }

    /**
     * Runs a blocking future call and converts its failure modes into an empty result.
     * A timeout additionally invokes {@link #haltJvm(int)} with status 1.
     *
     * @param fm     the blocking call to run
     * @param action action name used in log messages
     * @return the call's result, or empty if it was interrupted, failed, or timed out
     */
    private <T> Optional<T> futureMethod(FutureMethod<T> fm, String action) {
        try {
            return Optional.of(fm.get());
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so later blocking calls and callers see it.
            Thread.currentThread().interrupt();
            log.error("Interrupted while waiting for {} message for shard {}", action, this.initializationInput.shardId(), e);
        }
        catch (ExecutionException e) {
            log.error("Failed to get status message for {} action for shard {}", action, this.initializationInput.shardId(), e);
        }
        catch (TimeoutException e) {
            log.error("Timedout to get status message for {} action for shard {}. Terminating...", action, this.initializationInput.shardId(), e);
            this.haltJvm(1);
        }
        return Optional.empty();
    }

    /**
     * Halts the JVM via {@link Runtime#halt(int)}. Protected so subclasses can override it.
     *
     * @param exitStatus the status passed to {@code Runtime.halt}
     */
    protected void haltJvm(int exitStatus) {
        Runtime.getRuntime().halt(exitStatus);
    }

    /**
     * Logs the received status message and checks that it answers {@code action}.
     *
     * @return true if the message and its {@code responseFor} are non-null and
     *         {@code responseFor} equals {@code action}
     */
    private boolean validateStatusMessage(StatusMessage statusMessage, String action) {
        log.info("Received response {} from subprocess while waiting for {} while processing shard {}", statusMessage, action, this.initializationInput.shardId());
        return statusMessage != null && statusMessage.getResponseFor() != null && statusMessage.getResponseFor().equals(action);
    }

    /**
     * Performs the checkpoint requested by the subprocess and writes the outcome back.
     * With a sequence number and sub-sequence number both are used; with only a
     * sequence number that is used; with neither the no-argument checkpoint is called.
     *
     * @param checkpointMessage the checkpoint request
     * @param checkpointer      the checkpointer to use; if null an
     *                          {@link InvalidStateException} is reported to the subprocess
     * @return the future of the checkpoint response write
     */
    private Future<Boolean> checkpoint(CheckpointMessage checkpointMessage, RecordProcessorCheckpointer checkpointer) {
        String sequenceNumber = checkpointMessage.getSequenceNumber();
        Long subSequenceNumber = checkpointMessage.getSubSequenceNumber();
        try {
            if (checkpointer == null) {
                String message = String.format("Was asked to checkpoint at %s but no checkpointer was provided for shard %s", sequenceNumber, this.initializationInput.shardId());
                log.error(message);
                return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber, new InvalidStateException(message));
            }
            log.debug(this.logCheckpointMessage(sequenceNumber, subSequenceNumber));
            if (sequenceNumber == null) {
                checkpointer.checkpoint();
            } else if (subSequenceNumber == null) {
                checkpointer.checkpoint(sequenceNumber);
            } else {
                checkpointer.checkpoint(sequenceNumber, subSequenceNumber.longValue());
            }
            return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber, null);
        }
        catch (Throwable t) {
            // Deliberately broad: every failure is forwarded to the subprocess in the
            // checkpoint response instead of propagating to the caller.
            return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber, t);
        }
    }

    /** Builds the debug message logged before a checkpoint attempt. */
    private String logCheckpointMessage(String sequenceNumber, Long subSequenceNumber) {
        return String.format("Attempting to checkpoint shard %s @ sequence number %s, and sub sequence number %s", this.initializationInput.shardId(), sequenceNumber, subSequenceNumber);
    }

    /** A blocking call on a future that may throw the checked exceptions of {@code Future.get}. */
    @FunctionalInterface
    private static interface FutureMethod<T> {
        public T get() throws InterruptedException, TimeoutException, ExecutionException;
    }
}

