package io.eventuate.tram.sagas.common;

import io.eventuate.common.jdbc.EventuateDuplicateKeyException;
import io.eventuate.common.jdbc.EventuateJdbcStatementExecutor;
import io.eventuate.common.jdbc.EventuateSchema;
import io.eventuate.common.json.mapper.JSonMapper;
import io.eventuate.tram.messaging.common.Message;
import io.eventuate.tram.messaging.producer.MessageBuilder;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/eventuate/tram/sagas/common/SagaLockManagerImpl.class */
/**
 * JDBC-backed {@link SagaLockManager}.
 *
 * <p>Locks are rows in {@code saga_lock_table} keyed by {@code target}; messages that arrive
 * for a target while it is locked by another saga are kept in {@code saga_stash_table}.
 * All SQL is built once in the constructor against the schema-qualified table names and is
 * executed through the supplied {@link EventuateJdbcStatementExecutor}.
 *
 * <p>NOTE(review): the lock-row select uses {@code FOR UPDATE}, so these methods presumably
 * have to run inside a caller-managed transaction — confirm against the callers.
 */
public class SagaLockManagerImpl implements SagaLockManager {
    private static final String SAGA_LOCK_TABLE = "saga_lock_table";
    private static final String SAGA_STASH_TABLE = "saga_stash_table";

    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final EventuateJdbcStatementExecutor eventuateJdbcStatementExecutor;
    private final String insertIntoSagaLockTableSql;
    private final String insertIntoSagaStashTableSql;
    private final String selectFromSagaLockTableSql;
    private final String selectFromSagaStashTableSql;
    private final String updateSagaLockTableSql;
    private final String deleteFromSagaLockTableSql;
    private final String deleteFromSagaStashTableSql;

    /**
     * Builds the SQL statements for the lock and stash tables.
     *
     * @param eventuateJdbcStatementExecutor executor used to run every statement
     * @param eventuateSchema schema used to qualify the two table names
     */
    public SagaLockManagerImpl(EventuateJdbcStatementExecutor eventuateJdbcStatementExecutor, EventuateSchema eventuateSchema) {
        this.eventuateJdbcStatementExecutor = eventuateJdbcStatementExecutor;

        String lockTable = eventuateSchema.qualifyTable(SAGA_LOCK_TABLE);
        String stashTable = eventuateSchema.qualifyTable(SAGA_STASH_TABLE);

        this.insertIntoSagaLockTableSql = String.format("INSERT INTO %s(target, saga_type, saga_id) VALUES(?, ?,?)", lockTable);
        this.insertIntoSagaStashTableSql = String.format("INSERT INTO %s(message_id, target, saga_type, saga_id, message_headers, message_payload) VALUES(?, ?,?, ?, ?, ?)", stashTable);
        this.selectFromSagaLockTableSql = String.format("select saga_id from %s WHERE target = ? FOR UPDATE", lockTable);
        this.selectFromSagaStashTableSql = String.format("select message_id, target, saga_type, saga_id, message_headers, message_payload from %s WHERE target = ? ORDER BY message_id LIMIT 1", stashTable);
        this.updateSagaLockTableSql = String.format("update %s set saga_type = ?, saga_id = ? where target = ?", lockTable);
        this.deleteFromSagaLockTableSql = String.format("delete from %s where target = ?", lockTable);
        this.deleteFromSagaStashTableSql = String.format("delete from %s where message_id = ?", stashTable);
    }

    /** @return the SQL that inserts a row into the lock table */
    public String getInsertIntoSagaLockTableSql() {
        return this.insertIntoSagaLockTableSql;
    }

    /** @return the SQL that inserts a row into the stash table */
    public String getInsertIntoSagaStashTableSql() {
        return this.insertIntoSagaStashTableSql;
    }

    /** @return the SQL that selects (FOR UPDATE) the owning saga id of a target */
    public String getSelectFromSagaLockTableSql() {
        return this.selectFromSagaLockTableSql;
    }

    /** @return the SQL that selects the first stashed message (by message_id) of a target */
    public String getSelectFromSagaStashTableSql() {
        return this.selectFromSagaStashTableSql;
    }

    /** @return the SQL that reassigns a target's lock row to another saga */
    public String getUpdateSagaLockTableSql() {
        return this.updateSagaLockTableSql;
    }

    /** @return the SQL that deletes a target's lock row */
    public String getDeleteFromSagaLockTableSql() {
        return this.deleteFromSagaLockTableSql;
    }

    /** @return the SQL that deletes a stashed message by its message id */
    public String getDeleteFromSagaStashTableSql() {
        return this.deleteFromSagaStashTableSql;
    }

    /**
     * Tries to lock a target on behalf of a saga.
     *
     * <p>Inserts a lock row; if the insert fails with a duplicate key, the current owner is
     * read. If no owner row is found at that point, the insert is attempted again.
     *
     * @param str the saga type (bound to {@code saga_type})
     * @param str2 the saga id (bound to {@code saga_id})
     * @param str3 the target to lock (bound to {@code target})
     * @return {@code true} if the lock row was inserted or is already owned by saga id
     *     {@code str2}; {@code false} if a different saga id owns it
     */
    @Override // io.eventuate.tram.sagas.common.SagaLockManager
    public boolean claimLock(String str, String str2, String str3) {
        while (true) {
            try {
                this.eventuateJdbcStatementExecutor.update(this.insertIntoSagaLockTableSql, str3, str, str2);
                this.logger.debug("Saga {} {} has locked {}", str, str2, str3);
                return true;
            } catch (EventuateDuplicateKeyException duplicateKey) {
                // A lock row for this target already exists (or existed a moment ago).
                Optional<String> owningSagaId = selectForUpdate(str3);
                if (owningSagaId.isPresent()) {
                    String owner = owningSagaId.get();
                    if (owner.equals(str2)) {
                        return true;
                    }
                    // Log the owner id itself rather than the Optional wrapper.
                    this.logger.debug("Saga {} {} is blocked by {} which has locked {}", str, str2, owner, str3);
                    return false;
                }
                // The row was not found by the select, so try the insert again.
                this.logger.debug("{}  is repeating attempt to lock {}", str2, str3);
            }
        }
    }

    /**
     * Reads the saga id stored in the lock row of the given target, using the
     * {@code FOR UPDATE} select.
     *
     * @param str the target
     * @return the owning saga id, or empty if the query returns no row
     */
    private Optional<String> selectForUpdate(String str) {
        return this.eventuateJdbcStatementExecutor
                .query(this.selectFromSagaLockTableSql, (resultSet, rowNum) -> resultSet.getString("saga_id"), str)
                .stream()
                .findFirst();
    }

    /**
     * Stores a message in the stash table, keyed by the message's required {@code ID} header.
     * Headers are stored as JSON; the payload is stored as-is.
     *
     * @param str the saga type (bound to {@code saga_type})
     * @param str2 the saga id (bound to {@code saga_id})
     * @param str3 the target (bound to {@code target})
     * @param message the message to stash
     */
    @Override // io.eventuate.tram.sagas.common.SagaLockManager
    public void stashMessage(String str, String str2, String str3, Message message) {
        this.logger.debug("Stashing message from {} for {} : {}", str2, str3, message);
        this.eventuateJdbcStatementExecutor.update(
                this.insertIntoSagaStashTableSql,
                message.getRequiredHeader("ID"),
                str3,
                str,
                str2,
                JSonMapper.toJson(message.getHeaders()),
                message.getPayload());
    }

    /**
     * Releases a target held by the given saga.
     *
     * <p>If a stashed message exists for the target, the lock row is reassigned to the saga
     * recorded with the first stashed message (ordered by {@code message_id}), that message is
     * removed from the stash and returned. Otherwise the lock row is deleted.
     *
     * @param str the saga id expected to own the lock
     * @param str2 the target to unlock
     * @return the unstashed message, or empty if nothing was stashed for the target
     * @throws RuntimeException if the target has no lock row, if the lock is owned by a
     *     different saga id, or if an update/delete does not affect exactly one row
     */
    @Override // io.eventuate.tram.sagas.common.SagaLockManager
    public Optional<Message> unlock(String str, String str2) {
        String owningSagaId = selectForUpdate(str2)
                .orElseThrow(() -> new RuntimeException("owningSagaId is not present"));
        if (!owningSagaId.equals(str)) {
            throw new RuntimeException(String.format("Expected owner to be %s but is %s", str, owningSagaId));
        }
        this.logger.debug("Saga {} has unlocked {}", str, str2);

        List<StashedMessage> stashedMessages = this.eventuateJdbcStatementExecutor.query(this.selectFromSagaStashTableSql, (resultSet, rowNum) -> {
            String sagaType = resultSet.getString("saga_type");
            String sagaId = resultSet.getString("saga_id");
            String payload = resultSet.getString("message_payload");
            // Headers were written by stashMessage as the JSON form of message.getHeaders().
            @SuppressWarnings("unchecked")
            Map<String, String> headers = (Map<String, String>) JSonMapper.fromJson(resultSet.getString("message_headers"), Map.class);
            return new StashedMessage(sagaType, sagaId, MessageBuilder.withPayload(payload).withExtraHeaders("", headers).build());
        }, str2);

        if (stashedMessages.isEmpty()) {
            // Nothing is waiting for this target: drop the lock row.
            assertEqualToOne(this.eventuateJdbcStatementExecutor.update(this.deleteFromSagaLockTableSql, str2));
            return Optional.empty();
        }

        StashedMessage stashedMessage = stashedMessages.get(0);
        Message message = stashedMessage.getMessage();
        this.logger.debug("unstashed from {}  for {} : {}", str, str2, message);

        // Reassign the lock row to the stashed message's saga, then remove the message from the stash.
        assertEqualToOne(this.eventuateJdbcStatementExecutor.update(this.updateSagaLockTableSql, stashedMessage.getSagaType(), stashedMessage.getSagaId(), str2));
        assertEqualToOne(this.eventuateJdbcStatementExecutor.update(this.deleteFromSagaStashTableSql, message.getId()));
        return Optional.of(message);
    }

    /**
     * Verifies that a statement affected exactly one row.
     *
     * @param i the affected-row count returned by the executor
     * @throws RuntimeException if {@code i} is not 1
     */
    private void assertEqualToOne(int i) {
        if (i != 1) {
            throw new RuntimeException("Expected to update one row but updated: " + i);
        }
    }
}
