/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling.deadletter.jdbc;

import java.lang.invoke.MethodHandles;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import javax.annotation.Nonnull;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.ObjectUtils;
import org.axonframework.common.jdbc.ConnectionProvider;
import org.axonframework.common.jdbc.JdbcException;
import org.axonframework.common.jdbc.JdbcUtils;
import org.axonframework.common.jdbc.PagingJdbcIterable;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.deadletter.jdbc.DeadLetterJdbcConverter;
import org.axonframework.eventhandling.deadletter.jdbc.DeadLetterSchema;
import org.axonframework.eventhandling.deadletter.jdbc.DeadLetterStatementFactory;
import org.axonframework.eventhandling.deadletter.jdbc.DeadLetterTableFactory;
import org.axonframework.eventhandling.deadletter.jdbc.DefaultDeadLetterJdbcConverter;
import org.axonframework.eventhandling.deadletter.jdbc.DefaultDeadLetterStatementFactory;
import org.axonframework.eventhandling.deadletter.jdbc.JdbcDeadLetter;
import org.axonframework.messaging.deadletter.Cause;
import org.axonframework.messaging.deadletter.DeadLetter;
import org.axonframework.messaging.deadletter.DeadLetterQueueOverflowException;
import org.axonframework.messaging.deadletter.EnqueueDecision;
import org.axonframework.messaging.deadletter.GenericDeadLetter;
import org.axonframework.messaging.deadletter.NoSuchDeadLetterException;
import org.axonframework.messaging.deadletter.SequencedDeadLetterQueue;
import org.axonframework.messaging.deadletter.WrongDeadLetterTypeException;
import org.axonframework.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcSequencedDeadLetterQueue<E extends EventMessage<?>>
implements SequencedDeadLetterQueue<E> {
    /** Logger for this class, resolved through {@link MethodHandles#lookup()}. */
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    // NOTE(review): presumably the "close the connection" flag handed to JdbcUtils.executeQuery; the call sites
    // in this listing pass a literal true instead of this constant - confirm.
    private static final boolean CLOSE_QUIETLY = true;
    /** Processing group handed to the statement factory to scope most statements of this queue. */
    private final String processingGroup;
    /** Source of the JDBC connections used by every operation (see {@code getConnection()}). */
    private final ConnectionProvider connectionProvider;
    /** Used to run several of the read and write operations inside a transaction. */
    private final TransactionManager transactionManager;
    /** Schema description passed to the table factory when creating the dead-letter table. */
    private final DeadLetterSchema schema;
    /** Factory producing every statement this queue executes. */
    private final DeadLetterStatementFactory<E> statementFactory;
    /** Converts result sets into {@link JdbcDeadLetter} instances via {@code convertToLetter}. */
    private final DeadLetterJdbcConverter<E, ? extends JdbcDeadLetter<E>> converter;
    /** Upper bound on the number of sequences, checked by {@code isFull} for new sequences. */
    private final int maxSequences;
    /** Upper bound on the number of letters per sequence, checked by {@code isFull} for existing sequences. */
    private final int maxSequenceSize;
    /** Page size used when paging through the letters of a single sequence. */
    private final int pageSize;
    /** Subtracted from the current clock instant to compute the limit used when finding and claiming letters. */
    private final Duration claimDuration;

    /**
     * Instantiates a {@link JdbcSequencedDeadLetterQueue} from the given {@code builder}.
     * <p>
     * The builder is validated first. The statement factory and converter are taken from the builder's
     * resolving accessors, all other settings are copied from the builder's fields as-is.
     *
     * @param builder the {@link Builder} holding the configuration for this queue
     */
    protected JdbcSequencedDeadLetterQueue(Builder<E> builder) {
        builder.validate();

        // Plain settings copied straight from the builder.
        processingGroup = builder.processingGroup;
        maxSequences = builder.maxSequences;
        maxSequenceSize = builder.maxSequenceSize;
        pageSize = builder.pageSize;
        claimDuration = builder.claimDuration;

        // Collaborators; the statement factory is resolved before the converter, as before.
        connectionProvider = builder.connectionProvider;
        transactionManager = builder.transactionManager;
        schema = builder.schema;
        statementFactory = builder.statementFactory();
        converter = builder.converter();
    }

    /**
     * Creates a new, empty {@link Builder} for a {@link JdbcSequencedDeadLetterQueue}.
     *
     * @param <E> the type of {@link EventMessage} the resulting queue will hold
     * @return a fresh {@link Builder} instance
     */
    public static <E extends EventMessage<?>> Builder<E> builder() {
        // Diamond instead of the raw type, so no unchecked conversion is needed.
        return new Builder<>();
    }

    /**
     * Executes the table-creation statement produced by the given {@code tableFactory}.
     * <p>
     * A connection is obtained, the factory is asked for a {@link Statement} for that connection and this
     * queue's {@link DeadLetterSchema}, and the statement is executed as a batch. The statement and the
     * connection are both closed quietly afterwards, regardless of the outcome.
     *
     * @param tableFactory the factory producing the table-creation {@link Statement}
     * @throws JdbcException when no connection can be obtained, or when a {@link SQLException} is raised while
     *                       creating or executing the statement
     */
    public void createSchema(DeadLetterTableFactory tableFactory) {
        Connection conn = getConnection();
        try {
            Statement createTables = tableFactory.createTableStatement(conn, schema);
            try {
                createTables.executeBatch();
            } finally {
                JdbcUtils.closeQuietly(createTables);
            }
        } catch (SQLException sqlFailure) {
            throw new JdbcException("Failed to create the dead-letter entry table or indices", sqlFailure);
        } finally {
            JdbcUtils.closeQuietly(conn);
        }
    }

    /**
     * Obtains a connection from the configured {@link ConnectionProvider}.
     *
     * @return a {@link Connection} supplied by the provider
     * @throws JdbcException wrapping the {@link SQLException} thrown by the provider, if any
     */
    private Connection getConnection() {
        try {
            return connectionProvider.getConnection();
        } catch (SQLException sqlFailure) {
            throw new JdbcException("Failed to obtain a database connection", sqlFailure);
        }
    }

    /**
     * Stores the given {@code letter} at the end of the sequence identified by {@code sequenceIdentifier}.
     * <p>
     * The identifier is first converted to its {@link String} form. If {@link #isFull(Object)} reports the
     * queue as full for that identifier, nothing is stored. Otherwise the letter is inserted with an index
     * one higher than the current maximum index of the sequence; that index is computed when the insert
     * statement is created. The connection used for the insert is closed quietly afterwards.
     *
     * @param sequenceIdentifier the identifier of the sequence to append to
     * @param letter             the dead letter to store
     * @throws DeadLetterQueueOverflowException when the queue is full for the given identifier
     * @throws JdbcException                    when the insert fails with a {@link java.sql.SQLException}
     */
    @Override
    public void enqueue(@Nonnull Object sequenceIdentifier, @Nonnull DeadLetter<? extends E> letter) throws DeadLetterQueueOverflowException {
        String sequenceId = toStringSequenceIdentifier(sequenceIdentifier);
        if (isFull(sequenceId)) {
            throw new DeadLetterQueueOverflowException("No room left to enqueue [" + letter.message() + "] for identifier [" + sequenceId + "] since the queue is full.");
        }

        // These messages are guarded by the debug flag, so they are emitted at debug level as well.
        if (logger.isDebugEnabled()) {
            Optional<Cause> optionalCause = letter.cause();
            if (optionalCause.isPresent()) {
                logger.debug("Adding dead letter with message id [{}] because [{}].", letter.message().getIdentifier(), optionalCause.get().type());
            } else {
                logger.debug("Adding dead letter with message id [{}] because the sequence identifier [{}] is already present.", letter.message().getIdentifier(), sequenceId);
            }
        }

        Connection connection = getConnection();
        try {
            JdbcUtils.executeUpdate(
                    connection,
                    c -> statementFactory.enqueueStatement(c, processingGroup, sequenceId, letter, nextIndexForSequence(sequenceId)),
                    e -> new JdbcException("Failed to enqueue dead letter with message id [" + letter.message().getIdentifier() + "]", e)
            );
        } finally {
            JdbcUtils.closeQuietly(connection);
        }
    }

    /**
     * Computes the index a newly enqueued letter should get within the given sequence.
     *
     * @param sequenceId the sequence to compute the next index for
     * @return the current maximum index of the sequence plus one
     */
    private long nextIndexForSequence(String sequenceId) {
        long currentMax = maxIndexForSequence(sequenceId);
        long following = currentMax + 1L;
        logger.debug("Next index for [{}] is [{}]", sequenceId, following);
        return following;
    }

    /**
     * Queries, inside a transaction, the maximum index stored for the given sequence.
     * <p>
     * The value is read from the first column of the result as a {@link Long}; {@code 0L} is passed to
     * {@code nextAndExtract} as its final argument (presumably the fallback when no value is found - confirm).
     *
     * @param sequenceId the sequence to look up
     * @return the maximum index found for the sequence
     * @throws JdbcException when the query fails with a {@link java.sql.SQLException}
     */
    private long maxIndexForSequence(String sequenceId) {
        return transactionManager.fetchInTransaction(() -> JdbcUtils.executeQuery(
                getConnection(),
                connection -> statementFactory.maxIndexStatement(connection, processingGroup, sequenceId),
                resultSet -> JdbcUtils.nextAndExtract(resultSet, 1, Long.class, 0L),
                e -> new JdbcException("Failed to uncover the maximum index for sequence [" + sequenceId + "]", e),
                // Explicit flag, matching every other executeQuery call in this class.
                true
        ));
    }

    /**
     * Removes the given {@code letter} from this queue.
     * <p>
     * The delete runs inside a transaction, on a connection that is closed quietly afterwards. When no row
     * was deleted, this is only logged.
     *
     * @param letter the letter to evict; must be a {@link JdbcDeadLetter}
     * @throws WrongDeadLetterTypeException when the letter is not a {@link JdbcDeadLetter}
     * @throws JdbcException                when the delete fails with a {@link java.sql.SQLException}
     */
    @Override
    public void evict(DeadLetter<? extends E> letter) {
        if (!(letter instanceof JdbcDeadLetter)) {
            String wrongType = letter.getClass().getName();
            throw new WrongDeadLetterTypeException(String.format("Invoke evict with a JdbcDeadLetter instance. Instead got: [%s]", wrongType));
        }
        @SuppressWarnings("unchecked")
        JdbcDeadLetter<E> toEvict = (JdbcDeadLetter<E>) letter;
        String letterId = toEvict.getIdentifier();
        String sequenceId = toEvict.getSequenceIdentifier();
        logger.info("Evicting dead letter with id [{}] for processing group [{}] and sequence [{}]", letterId, processingGroup, sequenceId);

        transactionManager.executeInTransaction(() -> {
            Connection conn = getConnection();
            try {
                int removed = JdbcUtils.executeUpdate(
                        conn,
                        c -> statementFactory.evictStatement(c, letterId),
                        e -> new JdbcException("Failed to evict letter with message id [" + letter.message().getIdentifier() + "]", e)
                );
                if (removed == 0) {
                    logger.info("Dead letter with identifier [{}] for processing group [{}] and sequence [{}] was already evicted", letterId, processingGroup, sequenceId);
                }
            } finally {
                JdbcUtils.closeQuietly(conn);
            }
        });
    }

    /**
     * Updates the stored entry of the given {@code letter} with the outcome of the {@code letterUpdater}.
     * <p>
     * The updater is applied to the letter and the result is marked as touched. Its cause (or {@code null}),
     * last-touched value and diagnostics are then written through the requeue statement, inside a
     * transaction. The connection is closed quietly afterwards.
     *
     * @param letter        the letter to requeue; must be a {@link JdbcDeadLetter}
     * @param letterUpdater the operator producing the updated letter
     * @throws WrongDeadLetterTypeException when the letter is not a {@link JdbcDeadLetter}
     * @throws NoSuchDeadLetterException    when the update did not touch any row
     * @throws JdbcException                when the update fails with a {@link java.sql.SQLException}
     */
    @Override
    public void requeue(@Nonnull DeadLetter<? extends E> letter, @Nonnull UnaryOperator<DeadLetter<? extends E>> letterUpdater) throws NoSuchDeadLetterException {
        if (!(letter instanceof JdbcDeadLetter)) {
            String wrongType = letter.getClass().getName();
            throw new WrongDeadLetterTypeException(String.format("Invoke requeue with a JdbcDeadLetter instance. Instead got: [%s]", wrongType));
        }
        @SuppressWarnings("unchecked")
        JdbcDeadLetter<E> stored = (JdbcDeadLetter<E>) letter;
        String letterId = stored.getIdentifier();
        logger.info("Requeueing dead letter with id [{}] for processing group [{}] and sequence [{}]", letterId, processingGroup, stored.getSequenceIdentifier());

        DeadLetter<? extends E> touched = letterUpdater.apply(stored).markTouched();
        transactionManager.executeInTransaction(() -> {
            Connection conn = getConnection();
            try {
                int changed = JdbcUtils.executeUpdate(
                        conn,
                        c -> statementFactory.requeueStatement(c, letterId, touched.cause().orElse(null), touched.lastTouched(), touched.diagnostics()),
                        e -> new JdbcException("Failed to requeue letter with message id [" + letter.message().getIdentifier() + "]", e)
                );
                if (changed == 0) {
                    throw new NoSuchDeadLetterException("Cannot requeue [" + letter.message().getIdentifier() + "] since there is not matching entry in this queue.");
                }
                if (logger.isTraceEnabled()) {
                    logger.trace("Requeued letter [{}] for sequence [{}].", letterId, stored.getSequenceIdentifier());
                }
            } finally {
                JdbcUtils.closeQuietly(conn);
            }
        });
    }

    /**
     * Checks whether any letter is stored for the given {@code sequenceIdentifier}.
     *
     * @param sequenceIdentifier the identifier of the sequence to look for
     * @return {@code true} when the value extracted from the contains-query is larger than zero
     * @throws JdbcException when the query fails with a {@link java.sql.SQLException}
     */
    @Override
    public boolean contains(@Nonnull Object sequenceIdentifier) {
        String sequenceId = toStringSequenceIdentifier(sequenceIdentifier);
        if (logger.isDebugEnabled()) {
            logger.debug("Validating existence of sequence identifier [{}].", sequenceId);
        }
        return JdbcUtils.executeQuery(
                getConnection(),
                conn -> statementFactory.containsStatement(conn, processingGroup, sequenceId),
                rs -> JdbcUtils.nextAndExtract(rs, 1, Long.class, 0L) > 0L,
                e -> new JdbcException("Failed to validate whether there are letters present for sequence [" + sequenceId + "]", e),
                true
        );
    }

    /**
     * Returns the letters stored for the given {@code sequenceIdentifier}, fetched page by page.
     * <p>
     * When {@link #contains(Object)} reports no letters for the identifier, an empty list is returned.
     * Otherwise a {@link PagingJdbcIterable} is returned that uses this queue's configured page size and
     * converts each row through the configured converter.
     *
     * @param sequenceIdentifier the identifier of the sequence to read
     * @return an {@link Iterable} over the letters of the sequence, possibly empty
     */
    @Override
    public Iterable<DeadLetter<? extends E>> deadLetterSequence(@Nonnull Object sequenceIdentifier) {
        String sequenceId = toStringSequenceIdentifier(sequenceIdentifier);
        if (!contains(sequenceId)) {
            return Collections.emptyList();
        }
        return new PagingJdbcIterable<DeadLetter<? extends E>>(
                transactionManager,
                this::getConnection,
                // Forward the limit requested by the paging iterable. A fixed limit larger than the page
                // size would make consecutive pages overlap and return letters more than once.
                (connection, firstResult, maxSize) -> statementFactory.letterSequenceStatement(connection, processingGroup, sequenceId, firstResult, maxSize),
                pageSize,
                converter::convertToLetter,
                e -> new JdbcException("Failed to retrieve dead letters for sequence [" + sequenceId + "]", e)
        );
    }

    /**
     * Returns all sequences of this queue, each as its own {@link Iterable} of letters.
     * <p>
     * The sequence identifiers are queried once, when this method is invoked. The letters of a sequence are
     * only looked up (through {@link #deadLetterSequence(Object)}) when the returned iterator reaches it.
     *
     * @return an {@link Iterable} of per-sequence {@link Iterable}s
     * @throws JdbcException when retrieving the sequence identifiers fails with a {@link java.sql.SQLException}
     */
    @Override
    public Iterable<Iterable<DeadLetter<? extends E>>> deadLetters() {
        List<String> sequenceIds = JdbcUtils.executeQuery(
                getConnection(),
                conn -> statementFactory.sequenceIdentifiersStatement(conn, processingGroup),
                JdbcUtils.listResults(rs -> rs.getString(1)),
                e -> new JdbcException("Failed to retrieve all sequence identifiers", e),
                true
        );
        return () -> {
            final Iterator<String> ids = sequenceIds.iterator();
            return new Iterator<Iterable<DeadLetter<? extends E>>>() {

                @Override
                public boolean hasNext() {
                    return ids.hasNext();
                }

                @Override
                public Iterable<DeadLetter<? extends E>> next() {
                    return deadLetterSequence(ids.next());
                }
            };
        };
    }

    /**
     * Tells whether another letter can be stored for the given {@code sequenceIdentifier}.
     * <p>
     * For a sequence that already holds letters, the sequence's size is compared with the maximum sequence
     * size. For a sequence without letters, the number of sequences is compared with the maximum number of
     * sequences.
     *
     * @param sequenceIdentifier the identifier of the sequence to check
     * @return {@code true} when the applicable limit has been reached, {@code false} otherwise
     */
    @Override
    public boolean isFull(@Nonnull Object sequenceIdentifier) {
        String sequenceId = toStringSequenceIdentifier(sequenceIdentifier);
        long lettersInSequence = sequenceSize(sequenceId);
        if (lettersInSequence > 0L) {
            return lettersInSequence >= (long) maxSequenceSize;
        }
        // No letters yet for this identifier: it would start a new sequence.
        return amountOfSequences() >= (long) maxSequences;
    }

    /**
     * Returns the total number of letters, as reported by the size statement for this processing group.
     *
     * @return the value extracted from the first column of the size query
     * @throws JdbcException when the query fails with a {@link java.sql.SQLException}
     */
    @Override
    public long size() {
        long total = JdbcUtils.executeQuery(
                getConnection(),
                conn -> statementFactory.sizeStatement(conn, processingGroup),
                rs -> JdbcUtils.nextAndExtract(rs, 1, Long.class, 0L),
                e -> new JdbcException("Failed to check the total number of dead letters", e),
                true
        );
        return total;
    }

    /**
     * Returns the number of letters stored for the given {@code sequenceIdentifier}.
     *
     * @param sequenceIdentifier the identifier of the sequence to count
     * @return the value extracted from the first column of the sequence-size query
     * @throws JdbcException when the query fails with a {@link java.sql.SQLException}
     */
    @Override
    public long sequenceSize(@Nonnull Object sequenceIdentifier) {
        String sequenceId = toStringSequenceIdentifier(sequenceIdentifier);
        long lettersInSequence = JdbcUtils.executeQuery(
                getConnection(),
                conn -> statementFactory.sequenceSizeStatement(conn, processingGroup, sequenceId),
                rs -> JdbcUtils.nextAndExtract(rs, 1, Long.class, 0L),
                e -> new JdbcException("Failed to check the number of dead letters in sequence [" + sequenceId + "]", e),
                true
        );
        return lettersInSequence;
    }

    /**
     * Returns the number of sequences, as reported by the amount-of-sequences statement for this
     * processing group.
     *
     * @return the value extracted from the first column of the query
     * @throws JdbcException when the query fails with a {@link java.sql.SQLException}
     */
    @Override
    public long amountOfSequences() {
        long sequences = JdbcUtils.executeQuery(
                getConnection(),
                conn -> statementFactory.amountOfSequencesStatement(conn, processingGroup),
                rs -> JdbcUtils.nextAndExtract(rs, 1, Long.class, 0L),
                e -> new JdbcException("Failed to check the number of dead letter sequences in this queue", e),
                true
        );
        return sequences;
    }

    /**
     * Processes the first claimable sequence whose head letter matches the given {@code sequenceFilter}.
     * <p>
     * Claimable letters are iterated in pages of ten. The first one that both passes the filter and is
     * successfully claimed is handed, together with the rest of its sequence, to the
     * {@code processingTask}.
     *
     * @param sequenceFilter the filter a candidate letter must pass
     * @param processingTask the task deciding per letter whether it should be enqueued again
     * @return the outcome of processing the claimed sequence, or {@code false} when no letter could be claimed
     */
    @Override
    public boolean process(@Nonnull Predicate<DeadLetter<? extends E>> sequenceFilter, @Nonnull Function<DeadLetter<? extends E>, EnqueueDecision<E>> processingTask) {
        logger.debug("Received a request to process matching dead letters.");
        Iterator<? extends JdbcDeadLetter<E>> candidates = findClaimableSequences(10);
        JdbcDeadLetter<E> claimed = null;
        while (candidates.hasNext() && claimed == null) {
            JdbcDeadLetter<E> candidate = candidates.next();
            if (sequenceFilter.test(candidate) && claimDeadLetter(candidate)) {
                claimed = candidate;
            }
        }
        if (claimed == null) {
            logger.debug("Received a request to process dead letters but there are no matching or claimable sequences.");
            return false;
        }
        return processInitialAndSubsequent(claimed, processingTask);
    }

    /**
     * Processes the first claimable sequence, without any filtering.
     * <p>
     * The first claimable letter is looked up and claimed. Only when the claim succeeds is the letter,
     * together with the rest of its sequence, handed to the {@code processingTask}. A failed claim means the
     * letter was no longer claimable when the claim statement ran, so it is left untouched.
     *
     * @param processingTask the task deciding per letter whether it should be enqueued again
     * @return the outcome of processing the claimed sequence, or {@code false} when there is no claimable
     * letter or the claim did not succeed
     */
    @Override
    public boolean process(@Nonnull Function<DeadLetter<? extends E>, EnqueueDecision<E>> processingTask) {
        logger.debug("Received a request to process any dead letters.");
        Iterator<? extends JdbcDeadLetter<E>> iterator = findClaimableSequences(1);
        if (!iterator.hasNext()) {
            logger.debug("Received a request to process dead letters but there are no claimable sequences.");
            return false;
        }
        JdbcDeadLetter<E> deadLetter = iterator.next();
        // Honour the claim result, as the filtering overload does, to avoid processing a letter twice.
        if (!claimDeadLetter(deadLetter)) {
            logger.debug("Received a request to process dead letters but the claimable sequence could not be claimed.");
            return false;
        }
        return processInitialAndSubsequent(deadLetter, processingTask);
    }

    /**
     * Creates a paging iterator over the letters returned by the claimable-sequences statement.
     * <p>
     * Each page is queried with the processing-started limit computed at the time the page is requested.
     *
     * @param pageSize the number of letters to fetch per page
     * @return an {@link Iterator} over the claimable letters
     */
    private Iterator<? extends JdbcDeadLetter<E>> findClaimableSequences(int pageSize) {
        PagingJdbcIterable<JdbcDeadLetter<E>> claimable = new PagingJdbcIterable<JdbcDeadLetter<E>>(
                transactionManager,
                this::getConnection,
                (conn, offset, limit) -> statementFactory.claimableSequencesStatement(conn, processingGroup, processingStartedLimit(), offset, limit),
                pageSize,
                converter::convertToLetter,
                e -> new JdbcException("Failed to find any claimable sequences for processing", e)
        );
        return claimable.iterator();
    }

    /**
     * Attempts to claim the given {@code letter} for processing.
     * <p>
     * Runs the claim statement inside a transaction, passing the letter's identifier, the current clock
     * instant and the processing-started limit computed on entry. The connection is closed quietly
     * afterwards.
     *
     * @param letter the letter to claim
     * @return {@code true} when the claim statement updated at least one row, {@code false} otherwise
     * @throws JdbcException when the update fails with a {@link java.sql.SQLException}
     */
    private boolean claimDeadLetter(JdbcDeadLetter<E> letter) {
        Instant limit = processingStartedLimit();
        return transactionManager.fetchInTransaction(() -> {
            Connection conn = getConnection();
            try {
                int changed = JdbcUtils.executeUpdate(
                        conn,
                        c -> statementFactory.claimStatement(c, letter.getIdentifier(), GenericDeadLetter.clock.instant(), limit),
                        e -> new JdbcException("Failed to claim JDBC dead letter [" + letter.getIdentifier() + "] for processing", e)
                );
                boolean claimed = changed > 0;
                if (claimed) {
                    logger.debug("Claimed dead letter with identifier [{}] to process.", letter.getIdentifier());
                } else {
                    logger.debug("Failed to claim dead letter with identifier [{}].", letter.getIdentifier());
                }
                return claimed;
            } finally {
                JdbcUtils.closeQuietly(conn);
            }
        });
    }

    /**
     * Computes the processing-started limit used when finding and claiming letters.
     *
     * @return the current instant of {@code GenericDeadLetter.clock} minus the configured claim duration
     */
    private Instant processingStartedLimit() {
        Instant now = GenericDeadLetter.clock.instant();
        return now.minus(claimDuration);
    }

    /**
     * Feeds the given letter, and then each following letter of its sequence, to the {@code processingTask}.
     * <p>
     * As long as the task decides a letter should not be enqueued again, the next letter of the sequence is
     * looked up and claimed, after which the processed letter is evicted. As soon as the task decides a
     * letter should be enqueued, that letter is requeued with the decision's diagnostics and cause, and
     * processing stops.
     *
     * @param initialLetter  the first letter to process
     * @param processingTask the task deciding per letter whether it should be enqueued again
     * @return {@code true} when every letter was processed and evicted, {@code false} when a letter was
     * requeued
     */
    private boolean processInitialAndSubsequent(JdbcDeadLetter<E> initialLetter, Function<DeadLetter<? extends E>, EnqueueDecision<E>> processingTask) {
        JdbcDeadLetter<E> current = initialLetter;
        while (current != null) {
            logger.info("Processing dead letter with identifier [{}] at index [{}]", current.getIdentifier(), current.getSequenceIndex());
            EnqueueDecision<E> decision = processingTask.apply(current);
            if (decision.shouldEnqueue()) {
                requeue(current, letter -> decision.withDiagnostics(letter).withCause(decision.enqueueCause().orElse(null)));
                return false;
            }

            // Look up and claim the successor before evicting the letter that was just handled.
            JdbcDeadLetter<E> handled = current;
            current = findNext(handled.getSequenceIdentifier(), handled.getSequenceIndex());
            if (current != null) {
                claimDeadLetter(current);
            }
            evict(handled);
        }
        return true;
    }

    /**
     * Looks up, inside a transaction, the letter returned by the next-letter-in-sequence statement for the
     * given sequence and index.
     *
     * @param sequenceIdentifier the identifier of the sequence to search in
     * @param sequenceIndex      the index handed to the statement factory
     * @return the converted letter, or {@code null} when the result set has no row
     * @throws JdbcException when the query fails with a {@link java.sql.SQLException}
     */
    private JdbcDeadLetter<E> findNext(String sequenceIdentifier, long sequenceIndex) {
        return transactionManager.fetchInTransaction(() -> JdbcUtils.executeQuery(
                getConnection(),
                conn -> statementFactory.nextLetterInSequenceStatement(conn, processingGroup, sequenceIdentifier, sequenceIndex),
                rs -> {
                    if (rs.next()) {
                        return converter.convertToLetter(rs);
                    }
                    return null;
                },
                e -> new JdbcException("Failed to find the next dead letter in sequence [" + sequenceIdentifier + "] for processing", e),
                true
        ));
    }

    /**
     * Executes the clear statement for this queue's processing group.
     * <p>
     * The connection used is closed quietly afterwards.
     *
     * @throws JdbcException when the update fails with a {@link java.sql.SQLException}
     */
    @Override
    public void clear() {
        Connection conn = getConnection();
        try {
            JdbcUtils.executeUpdate(
                    conn,
                    c -> statementFactory.clearStatement(c, processingGroup),
                    e -> new JdbcException("Failed to clear out all dead letters for processing group [" + processingGroup + "]", e)
            );
        } finally {
            JdbcUtils.closeQuietly(conn);
        }
    }

    /**
     * Converts the given {@code sequenceIdentifier} to the {@link String} form used in statements.
     *
     * @param sequenceIdentifier the identifier to convert
     * @return the identifier itself when it is a {@link String}, otherwise its {@code hashCode()} as a
     * decimal string
     */
    private String toStringSequenceIdentifier(Object sequenceIdentifier) {
        if (sequenceIdentifier instanceof String) {
            return (String) sequenceIdentifier;
        }
        return Integer.toString(sequenceIdentifier.hashCode());
    }

    /**
     * Builder for a {@link JdbcSequencedDeadLetterQueue}.
     * <p>
     * The processing group, {@link ConnectionProvider} and {@link TransactionManager} are hard requirements.
     * The generic and event {@link Serializer} are required whenever the {@link DeadLetterStatementFactory}
     * or the {@link DeadLetterJdbcConverter} is not provided. Defaults: schema
     * {@link DeadLetterSchema#defaultSchema()}, {@code maxSequences} 1024, {@code maxSequenceSize} 1024,
     * {@code pageSize} 100 and a claim duration of 30 seconds.
     *
     * @param <E> the type of {@link EventMessage} held by the queue being built
     */
    public static class Builder<E extends EventMessage<?>> {
        private String processingGroup;
        private ConnectionProvider connectionProvider;
        private TransactionManager transactionManager;
        private DeadLetterSchema schema = DeadLetterSchema.defaultSchema();
        private DeadLetterStatementFactory<E> statementFactory;
        private DeadLetterJdbcConverter<E, ? extends JdbcDeadLetter<E>> converter;
        private Serializer genericSerializer;
        private Serializer eventSerializer;
        private int maxSequences = 1024;
        private int maxSequenceSize = 1024;
        private int pageSize = 100;
        private Duration claimDuration = Duration.ofSeconds(30L);

        /**
         * Sets the processing group of the queue.
         *
         * @param processingGroup the non-empty processing group name
         * @return this builder, for fluent interfacing
         */
        public Builder<E> processingGroup(@Nonnull String processingGroup) {
            BuilderUtils.assertNonEmpty(processingGroup, "Can not set processingGroup to an empty String.");
            this.processingGroup = processingGroup;
            return this;
        }

        /**
         * Sets the {@link ConnectionProvider} the queue obtains its connections from.
         *
         * @param connectionProvider the non-null connection provider
         * @return this builder, for fluent interfacing
         */
        public Builder<E> connectionProvider(@Nonnull ConnectionProvider connectionProvider) {
            BuilderUtils.assertNonNull(connectionProvider, "ConnectionProvider may not be null");
            this.connectionProvider = connectionProvider;
            return this;
        }

        /**
         * Sets the {@link TransactionManager} the queue uses for its transactional operations.
         *
         * @param transactionManager the non-null transaction manager
         * @return this builder, for fluent interfacing
         */
        public Builder<E> transactionManager(@Nonnull TransactionManager transactionManager) {
            BuilderUtils.assertNonNull(transactionManager, "The TransactionManager may not be null");
            this.transactionManager = transactionManager;
            return this;
        }

        /**
         * Sets the {@link DeadLetterSchema}, replacing the default schema.
         *
         * @param schema the non-null schema
         * @return this builder, for fluent interfacing
         */
        public Builder<E> schema(@Nonnull DeadLetterSchema schema) {
            BuilderUtils.assertNonNull(schema, "DeadLetterSchema may not be null");
            this.schema = schema;
            return this;
        }

        /**
         * Sets the {@link DeadLetterStatementFactory}. When left unset, a
         * {@link DefaultDeadLetterStatementFactory} is built from the schema and serializers.
         *
         * @param statementFactory the non-null statement factory
         * @return this builder, for fluent interfacing
         */
        public Builder<E> statementFactory(@Nonnull DeadLetterStatementFactory<E> statementFactory) {
            BuilderUtils.assertNonNull(statementFactory, "The DeadLetterStatementFactory may not be null");
            this.statementFactory = statementFactory;
            return this;
        }

        /**
         * Sets the {@link DeadLetterJdbcConverter}. When left unset, a
         * {@link DefaultDeadLetterJdbcConverter} is built from the schema and serializers.
         *
         * @param converter the non-null converter
         * @return this builder, for fluent interfacing
         */
        public Builder<E> converter(@Nonnull DeadLetterJdbcConverter<E, ? extends JdbcDeadLetter<E>> converter) {
            BuilderUtils.assertNonNull(converter, "The DeadLetterJdbcConverter may not be null");
            this.converter = converter;
            return this;
        }

        /**
         * Sets the generic {@link Serializer} handed to the default statement factory and converter.
         *
         * @param genericSerializer the non-null generic serializer
         * @return this builder, for fluent interfacing
         */
        public Builder<E> genericSerializer(@Nonnull Serializer genericSerializer) {
            BuilderUtils.assertNonNull(genericSerializer, "The generic serializer may not be null");
            this.genericSerializer = genericSerializer;
            return this;
        }

        /**
         * Sets the event {@link Serializer} handed to the default statement factory and converter.
         *
         * @param eventSerializer the non-null event serializer
         * @return this builder, for fluent interfacing
         */
        public Builder<E> eventSerializer(@Nonnull Serializer eventSerializer) {
            BuilderUtils.assertNonNull(eventSerializer, "The event serializer may not be null");
            this.eventSerializer = eventSerializer;
            return this;
        }

        /**
         * Sets the maximum number of sequences the queue may hold.
         *
         * @param maxSequences a strictly positive number
         * @return this builder, for fluent interfacing
         */
        public Builder<E> maxSequences(int maxSequences) {
            BuilderUtils.assertStrictPositive(maxSequences, "The maximum number of sequences should be larger than 0");
            this.maxSequences = maxSequences;
            return this;
        }

        /**
         * Sets the maximum number of letters a single sequence may hold.
         *
         * @param maxSequenceSize a strictly positive number
         * @return this builder, for fluent interfacing
         */
        public Builder<E> maxSequenceSize(int maxSequenceSize) {
            BuilderUtils.assertStrictPositive(maxSequenceSize, "The maximum number of entries in a sequence should be larger than 0");
            this.maxSequenceSize = maxSequenceSize;
            return this;
        }

        /**
         * Sets the claim duration used to compute the processing-started limit.
         *
         * @param claimDuration the non-null claim duration
         * @return this builder, for fluent interfacing
         */
        public Builder<E> claimDuration(@Nonnull Duration claimDuration) {
            BuilderUtils.assertNonNull(claimDuration, "Claim duration can not be set to null.");
            this.claimDuration = claimDuration;
            return this;
        }

        /**
         * Sets the page size used when paging through the letters of a sequence.
         *
         * @param pageSize a strictly positive number
         * @return this builder, for fluent interfacing
         */
        public Builder<E> pageSize(int pageSize) {
            BuilderUtils.assertStrictPositive(pageSize, "The page size  should be larger than 0.");
            this.pageSize = pageSize;
            return this;
        }

        /**
         * Builds the {@link JdbcSequencedDeadLetterQueue} as configured through this builder.
         *
         * @return a new {@link JdbcSequencedDeadLetterQueue}
         */
        public JdbcSequencedDeadLetterQueue<E> build() {
            return new JdbcSequencedDeadLetterQueue<>(this);
        }

        /**
         * Resolves the statement factory to use: the configured one, or a default built on demand.
         * <p>
         * The default is supplied lazily so that it is only constructed (and its serializers only required)
         * when no factory was configured.
         *
         * @return the {@link DeadLetterStatementFactory} for the queue
         */
        private DeadLetterStatementFactory<E> statementFactory() {
            return ObjectUtils.getOrDefault(
                    this.statementFactory,
                    () -> DefaultDeadLetterStatementFactory.<E>builder()
                                                           .schema(this.schema)
                                                           .genericSerializer(this.genericSerializer)
                                                           .eventSerializer(this.eventSerializer)
                                                           .build()
            );
        }

        /**
         * Resolves the converter to use: the configured one, or a default built on demand.
         *
         * @return the {@link DeadLetterJdbcConverter} for the queue
         */
        private DeadLetterJdbcConverter<E, ? extends JdbcDeadLetter<E>> converter() {
            return ObjectUtils.getOrDefault(
                    this.converter,
                    () -> DefaultDeadLetterJdbcConverter.<E>builder()
                                                        .schema(this.schema)
                                                        .genericSerializer(this.genericSerializer)
                                                        .eventSerializer(this.eventSerializer)
                                                        .build()
            );
        }

        /**
         * Validates that all hard requirements are set. The serializers are only required when the
         * statement factory or the converter has to be defaulted.
         */
        protected void validate() {
            BuilderUtils.assertNonEmpty(this.processingGroup, "The processing group is a hard requirement and should be non-empty");
            BuilderUtils.assertNonNull(this.connectionProvider, "The ConnectionProvider is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.transactionManager, "The TransactionManager is a hard requirement and should be provided");
            if (this.statementFactory == null) {
                BuilderUtils.assertNonNull(this.genericSerializer, "The generic Serializer is a hard requirement when the DeadLetterStatementFactory is not provided");
                BuilderUtils.assertNonNull(this.eventSerializer, "The event Serializer is a hard requirement when the DeadLetterStatementFactory is not provided");
            }
            if (this.converter == null) {
                BuilderUtils.assertNonNull(this.genericSerializer, "The generic Serializer is a hard requirement when the DeadLetterJdbcConverter is not provided");
                BuilderUtils.assertNonNull(this.eventSerializer, "The event Serializer is a hard requirement when the DeadLetterJdbcConverter is not provided");
            }
        }
    }
}

