package io.eventuate.tram.sagas.orchestration;

import io.eventuate.tram.commands.common.CommandMessageHeaders;
import io.eventuate.tram.commands.common.CommandReplyOutcome;
import io.eventuate.tram.commands.common.Failure;
import io.eventuate.tram.commands.common.Success;
import io.eventuate.tram.commands.producer.CommandProducer;
import io.eventuate.tram.messaging.common.Message;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
import io.eventuate.tram.messaging.producer.MessageBuilder;
import io.eventuate.tram.sagas.common.LockTarget;
import io.eventuate.tram.sagas.common.SagaLockManager;
import io.eventuate.tram.sagas.common.SagaReplyHeaders;
import io.eventuate.tram.sagas.common.SagaUnlockCommand;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Optional;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/eventuate/tram/sagas/orchestration/SagaManagerImpl.class */
public class SagaManagerImpl<Data> implements SagaManager<Data> {
    private Logger logger = LoggerFactory.getLogger(getClass());
    private Saga<Data> saga;
    private SagaInstanceRepository sagaInstanceRepository;
    private CommandProducer commandProducer;
    private MessageConsumer messageConsumer;
    private SagaLockManager sagaLockManager;
    private SagaCommandProducer sagaCommandProducer;

    public SagaManagerImpl(Saga<Data> saga, SagaInstanceRepository sagaInstanceRepository, CommandProducer commandProducer, MessageConsumer messageConsumer, SagaLockManager sagaLockManager, SagaCommandProducer sagaCommandProducer) {
        this.saga = saga;
        this.sagaInstanceRepository = sagaInstanceRepository;
        this.commandProducer = commandProducer;
        this.messageConsumer = messageConsumer;
        this.sagaLockManager = sagaLockManager;
        this.sagaCommandProducer = sagaCommandProducer;
    }

    public void setSagaCommandProducer(SagaCommandProducer sagaCommandProducer) {
        this.sagaCommandProducer = sagaCommandProducer;
    }

    public void setSagaInstanceRepository(SagaInstanceRepository sagaInstanceRepository) {
        this.sagaInstanceRepository = sagaInstanceRepository;
    }

    public void setCommandProducer(CommandProducer commandProducer) {
        this.commandProducer = commandProducer;
    }

    public void setMessageConsumer(MessageConsumer messageConsumer) {
        this.messageConsumer = messageConsumer;
    }

    public void setSagaLockManager(SagaLockManager sagaLockManager) {
        this.sagaLockManager = sagaLockManager;
    }

    @Override // io.eventuate.tram.sagas.orchestration.SagaManager
    public SagaInstance create(Data data) {
        return create(data, Optional.empty());
    }

    @Override // io.eventuate.tram.sagas.orchestration.SagaManager
    public SagaInstance create(Data data, Class cls, Object obj) {
        return create(data, Optional.of(new LockTarget(cls, obj).getTarget()));
    }

    @Override // io.eventuate.tram.sagas.orchestration.SagaManager
    public SagaInstance create(Data data, Optional<String> optional) {
        SagaInstance sagaInstance = new SagaInstance(getSagaType(), null, "????", null, SagaDataSerde.serializeSagaData(data), new HashSet());
        this.sagaInstanceRepository.save(sagaInstance);
        String id = sagaInstance.getId();
        this.saga.onStarting(id, data);
        optional.ifPresent(str -> {
            if (!this.sagaLockManager.claimLock(getSagaType(), id, str)) {
                throw new RuntimeException("Cannot claim lock for resource");
            }
        });
        SagaActions<Data> start = getStateDefinition().start(data);
        start.getLocalException().ifPresent(runtimeException -> {
            throw runtimeException;
        });
        processActions(id, sagaInstance, data, start);
        return sagaInstance;
    }

    private void performEndStateActions(String str, SagaInstance sagaInstance, boolean z, Data data) {
        for (DestinationAndResource destinationAndResource : sagaInstance.getDestinationsAndResources()) {
            HashMap hashMap = new HashMap();
            hashMap.put("command_saga_id", str);
            hashMap.put("command_saga_type", getSagaType());
            this.commandProducer.send(destinationAndResource.getDestination(), destinationAndResource.getResource(), new SagaUnlockCommand(), makeSagaReplyChannel(), hashMap);
        }
        if (z) {
            this.saga.onSagaRolledBack(str, data);
        } else {
            this.saga.onSagaCompletedSuccessfully(str, data);
        }
    }

    private SagaDefinition<Data> getStateDefinition() {
        SagaDefinition<Data> sagaDefinition = this.saga.getSagaDefinition();
        if (sagaDefinition == null) {
            throw new RuntimeException("state machine cannot be null");
        }
        return sagaDefinition;
    }

    private String getSagaType() {
        return this.saga.getSagaType();
    }

    @Override // io.eventuate.tram.sagas.orchestration.SagaManager
    @PostConstruct
    public void subscribeToReplyChannel() {
        this.messageConsumer.subscribe(this.saga.getSagaType() + "-consumer", Collections.singleton(makeSagaReplyChannel()), this::handleMessage);
    }

    private String makeSagaReplyChannel() {
        return getSagaType() + "-reply";
    }

    public void handleMessage(Message message) {
        this.logger.debug("handle message invoked {}", message);
        if (message.hasHeader(SagaReplyHeaders.REPLY_SAGA_ID)) {
            handleReply(message);
        } else {
            this.logger.warn("Handle message doesn't know what to do with: {} ", message);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void handleReply(Message message) {
        if (isReplyForThisSagaType(message).booleanValue()) {
            this.logger.debug("Handle reply: {}", message);
            String requiredHeader = message.getRequiredHeader(SagaReplyHeaders.REPLY_SAGA_ID);
            SagaInstance find = this.sagaInstanceRepository.find(message.getRequiredHeader(SagaReplyHeaders.REPLY_SAGA_TYPE), requiredHeader);
            Object deserializeSagaData = SagaDataSerde.deserializeSagaData(find.getSerializedSagaData());
            message.getHeader("saga-locked-target").ifPresent(str -> {
                find.addDestinationsAndResources(Collections.singleton(new DestinationAndResource(message.getRequiredHeader(CommandMessageHeaders.inReply("command__destination")), str)));
            });
            String stateName = find.getStateName();
            this.logger.info("Current state={}", stateName);
            SagaActions handleReply = getStateDefinition().handleReply(stateName, deserializeSagaData, message);
            this.logger.info("Handled reply. Sending commands {}", handleReply.getCommands());
            processActions(requiredHeader, find, deserializeSagaData, handleReply);
        }
    }

    private void processActions(String str, SagaInstance sagaInstance, Data data, SagaActions<Data> sagaActions) {
        while (true) {
            if (sagaActions.getLocalException().isPresent()) {
                sagaActions = getStateDefinition().handleReply(sagaActions.getUpdatedState().get(), sagaActions.getUpdatedSagaData().get(), MessageBuilder.withPayload("{}").withHeader("reply_outcome-type", CommandReplyOutcome.FAILURE.name()).withHeader("reply_type", Failure.class.getName()).build());
            } else {
                sagaInstance.setLastRequestId(this.sagaCommandProducer.sendCommands(getSagaType(), str, sagaActions.getCommands(), makeSagaReplyChannel()));
                updateState(sagaInstance, sagaActions);
                sagaInstance.setSerializedSagaData(SagaDataSerde.serializeSagaData(sagaActions.getUpdatedSagaData().orElse(data)));
                if (sagaActions.isEndState()) {
                    performEndStateActions(str, sagaInstance, sagaActions.isCompensating(), data);
                }
                this.sagaInstanceRepository.update(sagaInstance);
                if (!sagaActions.isLocal()) {
                    return;
                } else {
                    sagaActions = getStateDefinition().handleReply(sagaActions.getUpdatedState().get(), sagaActions.getUpdatedSagaData().get(), MessageBuilder.withPayload("{}").withHeader("reply_outcome-type", CommandReplyOutcome.SUCCESS.name()).withHeader("reply_type", Success.class.getName()).build());
                }
            }
        }
    }

    private void updateState(SagaInstance sagaInstance, SagaActions<Data> sagaActions) {
        sagaActions.getUpdatedState().ifPresent(str -> {
            sagaInstance.setStateName(str);
            sagaInstance.setEndState(Boolean.valueOf(sagaActions.isEndState()));
            sagaInstance.setCompensating(Boolean.valueOf(sagaActions.isCompensating()));
        });
    }

    private Boolean isReplyForThisSagaType(Message message) {
        return (Boolean) message.getHeader(SagaReplyHeaders.REPLY_SAGA_TYPE).map(str -> {
            return Boolean.valueOf(str.equals(getSagaType()));
        }).orElse(false);
    }
}
