/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventsourcing.eventstore.jpa;

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import jakarta.persistence.EntityManager;
import jakarta.persistence.TypedQuery;
import java.time.Instant;
import java.time.temporal.TemporalAccessor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.axonframework.common.Assert;
import org.axonframework.common.DateTimeUtils;
import org.axonframework.common.TypeReference;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.common.jdbc.PersistenceExceptionResolver;
import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.conversion.Converter;
import org.axonframework.eventsourcing.eventstore.AggregateBasedConsistencyMarker;
import org.axonframework.eventsourcing.eventstore.AggregateBasedEventStorageEngineUtils;
import org.axonframework.eventsourcing.eventstore.AggregateSequenceNumberPosition;
import org.axonframework.eventsourcing.eventstore.AppendCondition;
import org.axonframework.eventsourcing.eventstore.ConsistencyMarker;
import org.axonframework.eventsourcing.eventstore.ContinuousMessageStream;
import org.axonframework.eventsourcing.eventstore.EmptyAppendTransaction;
import org.axonframework.eventsourcing.eventstore.EventCoordinator;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.eventsourcing.eventstore.SourcingCondition;
import org.axonframework.eventsourcing.eventstore.StreamSpliterator;
import org.axonframework.eventsourcing.eventstore.TaggedEventMessage;
import org.axonframework.eventsourcing.eventstore.jpa.AggregateBasedJpaEventStorageEngineConfiguration;
import org.axonframework.eventsourcing.eventstore.jpa.AggregateEventEntry;
import org.axonframework.eventsourcing.eventstore.jpa.GapAwareTrackingTokenOperations;
import org.axonframework.messaging.core.Context;
import org.axonframework.messaging.core.LegacyResources;
import org.axonframework.messaging.core.Message;
import org.axonframework.messaging.core.MessageStream;
import org.axonframework.messaging.core.MessageType;
import org.axonframework.messaging.core.SimpleEntry;
import org.axonframework.messaging.core.unitofwork.ProcessingContext;
import org.axonframework.messaging.core.unitofwork.transaction.Transaction;
import org.axonframework.messaging.core.unitofwork.transaction.TransactionManager;
import org.axonframework.messaging.eventhandling.EventMessage;
import org.axonframework.messaging.eventhandling.GenericEventMessage;
import org.axonframework.messaging.eventhandling.TerminalEventMessage;
import org.axonframework.messaging.eventhandling.conversion.EventConverter;
import org.axonframework.messaging.eventhandling.processing.streaming.token.GapAwareTrackingToken;
import org.axonframework.messaging.eventhandling.processing.streaming.token.TrackingToken;
import org.axonframework.messaging.eventstreaming.EventCriterion;
import org.axonframework.messaging.eventstreaming.StreamingCondition;
import org.axonframework.messaging.eventstreaming.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AggregateBasedJpaEventStorageEngine
implements EventStorageEngine {
    private static final Logger logger = LoggerFactory.getLogger(AggregateBasedJpaEventStorageEngine.class);
    private static final TypeReference<Map<String, String>> METADATA_MAP_TYPE_REF = new TypeReference<Map<String, String>>(){};
    private static final String FIRST_TOKEN_QUERY = "SELECT COALESCE(MIN(e.globalIndex) - 1, -1) FROM AggregateEventEntry e";
    private static final String LATEST_TOKEN_QUERY = "SELECT COALESCE(MAX(e.globalIndex), -1) FROM AggregateEventEntry e";
    private static final String TOKEN_AT_QUERY = "SELECT COALESCE(MIN(e.globalIndex) - 1, -1) FROM AggregateEventEntry e WHERE e.timestamp >= :dateTime";
    private static final String EVENTS_BY_AGGREGATE_QUERY = "SELECT e FROM AggregateEventEntry e WHERE e.aggregateIdentifier = :id AND e.aggregateSequenceNumber >= :seq ORDER BY e.aggregateSequenceNumber ASC";
    private static final String EVENTS_BY_TOKEN_QUERY = "SELECT e FROM AggregateEventEntry e WHERE e.globalIndex > :token ORDER BY e.globalIndex ASC";
    private static final String EVENTS_BY_GAPPED_TOKEN = "SELECT e FROM AggregateEventEntry e WHERE e.globalIndex > :token OR e.globalIndex IN :gaps ORDER BY e.globalIndex ASC";
    private static final String INDEX_AND_TIMESTAMP_QUERY = "SELECT e.globalIndex, e.timestamp\nFROM AggregateEventEntry e WHERE e.globalIndex >= :firstGapOffset AND e.globalIndex <= :maxGlobalIndex";
    private final EntityManagerProvider entityManagerProvider;
    private final TransactionManager transactionManager;
    private final EventConverter converter;
    private final PersistenceExceptionResolver persistenceExceptionResolver;
    private final Predicate<List<? extends AggregateEventEntry>> finalBatchPredicate;
    private final int batchSize;
    private final int gapCleaningThreshold;
    private final int maxGapOffset;
    private final long lowestGlobalSequence;
    private final GapAwareTrackingTokenOperations tokenOperations;
    private final Map<Object, Runnable> streamCallbacks = new ConcurrentHashMap<Object, Runnable>();
    private EventCoordinator.Handle eventCoordinatorHandle;

    public AggregateBasedJpaEventStorageEngine(@Nonnull EntityManagerProvider entityManagerProvider, @Nonnull TransactionManager transactionManager, @Nonnull EventConverter converter, @Nonnull UnaryOperator<AggregateBasedJpaEventStorageEngineConfiguration> configurer) {
        this.entityManagerProvider = Objects.requireNonNull(entityManagerProvider, "The entityManagerProvider may not be null.");
        this.transactionManager = Objects.requireNonNull(transactionManager, "The transactionManager may not be null.");
        this.converter = Objects.requireNonNull(converter, "The converter may not be null.");
        AggregateBasedJpaEventStorageEngineConfiguration config = (AggregateBasedJpaEventStorageEngineConfiguration)Objects.requireNonNull(configurer, "The configurer may not be null.").apply(AggregateBasedJpaEventStorageEngineConfiguration.DEFAULT);
        this.persistenceExceptionResolver = config.persistenceExceptionResolver();
        this.finalBatchPredicate = config.finalBatchPredicate();
        this.batchSize = config.batchSize();
        this.gapCleaningThreshold = config.gapCleaningThreshold();
        this.lowestGlobalSequence = config.lowestGlobalSequence();
        this.maxGapOffset = config.maxGapOffset();
        this.tokenOperations = new GapAwareTrackingTokenOperations(config.gapTimeout(), logger);
        this.eventCoordinatorHandle = config.eventCoordinator().startCoordination(this::onAppendDetected);
    }

    private EntityManager entityManager() {
        return this.entityManagerProvider.getEntityManager();
    }

    @Override
    public CompletableFuture<EventStorageEngine.AppendTransaction<?>> appendEvents(final @Nonnull AppendCondition condition, @Nullable ProcessingContext processingContext, final @Nonnull List<TaggedEventMessage<?>> events) {
        try {
            AggregateBasedEventStorageEngineUtils.assertValidTags(events);
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
        if (events.isEmpty()) {
            return CompletableFuture.completedFuture(EmptyAppendTransaction.INSTANCE);
        }
        return CompletableFuture.completedFuture(new EventStorageEngine.AppendTransaction<AggregateBasedConsistencyMarker>(){
            private final AtomicBoolean txFinished = new AtomicBoolean(false);
            private final AggregateBasedConsistencyMarker preCommitConsistencyMarker = AggregateBasedConsistencyMarker.from(condition);

            @Override
            public CompletableFuture<AggregateBasedConsistencyMarker> commit(@Nullable ProcessingContext context) {
                if (this.txFinished.getAndSet(true)) {
                    return CompletableFuture.failedFuture(new IllegalStateException("Already committed or rolled back"));
                }
                AggregateBasedConsistencyMarker.AggregateSequencer aggregateSequencer = this.preCommitConsistencyMarker.createSequencer();
                CompletableFuture txResult = new CompletableFuture();
                Transaction tx = AggregateBasedJpaEventStorageEngine.this.transactionManager.startTransaction();
                try {
                    AggregateBasedJpaEventStorageEngine.this.entityManagerPersistEvents(aggregateSequencer, events);
                    tx.commit();
                    AggregateBasedJpaEventStorageEngine.this.eventCoordinatorHandle.onEventsAppended(events);
                    txResult.complete(null);
                }
                catch (Exception e2) {
                    tx.rollback();
                    txResult.completeExceptionally(e2);
                }
                return ((CompletableFuture)txResult.exceptionallyCompose(e -> CompletableFuture.failedFuture(this.translateConflictException((Throwable)e)))).thenApply(v -> aggregateSequencer.toMarker());
            }

            @Override
            public CompletableFuture<ConsistencyMarker> afterCommit(@Nonnull AggregateBasedConsistencyMarker marker, @Nullable ProcessingContext context) {
                return CompletableFuture.completedFuture(marker);
            }

            private Throwable translateConflictException(Throwable e) {
                Predicate<Throwable> isConflictException = t -> {
                    Exception ex;
                    return AggregateBasedJpaEventStorageEngine.this.persistenceExceptionResolver != null && t instanceof Exception && AggregateBasedJpaEventStorageEngine.this.persistenceExceptionResolver.isDuplicateKeyViolation(ex = (Exception)t);
                };
                return AggregateBasedEventStorageEngineUtils.translateConflictException(this.preCommitConsistencyMarker, e, isConflictException);
            }

            @Override
            public void rollback(@Nullable ProcessingContext context) {
                this.txFinished.set(true);
            }
        });
    }

    private void entityManagerPersistEvents(AggregateBasedConsistencyMarker.AggregateSequencer aggregateSequencer, List<TaggedEventMessage<?>> events) {
        try (EntityManager entityManager = this.entityManager();){
            events.stream().map(taggedEvent -> AggregateBasedJpaEventStorageEngine.mapToEntry(taggedEvent, aggregateSequencer, (Converter)this.converter)).forEach(arg_0 -> ((EntityManager)entityManager).persist(arg_0));
        }
    }

    private static AggregateEventEntry mapToEntry(TaggedEventMessage<?> taggedEvent, AggregateBasedConsistencyMarker.AggregateSequencer aggregateSequencer, Converter converter) {
        Object event = taggedEvent.event();
        Set<Tag> tags = taggedEvent.tags();
        String aggregateIdentifier = AggregateBasedEventStorageEngineUtils.resolveAggregateIdentifier(tags);
        return new AggregateEventEntry(event.identifier(), event.type().name(), event.type().version(), (byte[])event.payloadAs(byte[].class, converter), (byte[])converter.convert((Object)event.metadata(), byte[].class), event.timestamp(), AggregateBasedEventStorageEngineUtils.resolveAggregateType(tags), AggregateBasedEventStorageEngineUtils.resolveAggregateIdentifier(tags), aggregateIdentifier != null ? Long.valueOf(aggregateSequencer.incrementAndGetSequenceOf(aggregateIdentifier)) : null);
    }

    @Override
    public MessageStream<EventMessage> source(@Nonnull SourcingCondition condition, @Nullable ProcessingContext processingContext) {
        CompletableFuture endOfStreams = new CompletableFuture();
        List<AggregateSource> aggregateSources = condition.criteria().flatten().stream().map(criterion -> this.aggregateSourceForCriterion(condition, (EventCriterion)criterion)).toList();
        return aggregateSources.stream().map(AggregateSource::source).reduce((MessageStream)MessageStream.empty().cast(), MessageStream::concatWith).onComplete(() -> endOfStreams.complete(null)).concatWith((MessageStream)MessageStream.fromFuture((CompletableFuture)endOfStreams.thenApply(event -> TerminalEventMessage.INSTANCE), unused -> Context.with(ConsistencyMarker.RESOURCE_KEY, (Object)AggregateBasedJpaEventStorageEngine.combineAggregateMarkers(aggregateSources.stream()))));
    }

    private AggregateSource aggregateSourceForCriterion(SourcingCondition condition, EventCriterion criterion) {
        AtomicReference<AggregateBasedConsistencyMarker> markerReference = new AtomicReference<AggregateBasedConsistencyMarker>();
        String aggregateIdentifier = AggregateBasedEventStorageEngineUtils.resolveAggregateIdentifier(criterion.tags());
        long firstSequenceNumber = AggregateSequenceNumberPosition.toSequenceNumber(condition.start());
        StreamSpliterator entrySpliterator = new StreamSpliterator(lastEntry -> (List)this.transactionManager.fetchInTransaction(() -> this.queryEventsBy(aggregateIdentifier, lastEntry != null && lastEntry.aggregateSequenceNumber() != null ? lastEntry.aggregateSequenceNumber() + 1L : firstSequenceNumber)), this.finalBatchPredicate);
        MessageStream source = MessageStream.fromStream(StreamSupport.stream(entrySpliterator, false), this::convertToEventMessage, entry -> AggregateBasedJpaEventStorageEngine.setMarkerAndBuildContext(entry, markerReference)).onComplete(() -> markerReference.compareAndSet(null, new AggregateBasedConsistencyMarker(aggregateIdentifier, 0L))).cast();
        return new AggregateSource(markerReference, (MessageStream<EventMessage>)source);
    }

    List<AggregateEventEntry> queryEventsBy(String aggregateIdentifier, long firstSequenceNumber) {
        try (EntityManager entityManager = this.entityManager();){
            List list = entityManager.createQuery(EVENTS_BY_AGGREGATE_QUERY, AggregateEventEntry.class).setParameter("id", (Object)aggregateIdentifier).setParameter("seq", (Object)firstSequenceNumber).setMaxResults(this.batchSize).getResultList();
            return list;
        }
    }

    private static Context setMarkerAndBuildContext(AggregateEventEntry entry, AtomicReference<AggregateBasedConsistencyMarker> markerReference) {
        String aggregateId = Objects.requireNonNullElse(entry.aggregateIdentifier(), entry.aggregateIdentifier());
        String aggregateType = entry.aggregateType();
        Long aggregateSeqNo = Objects.requireNonNullElse(entry.aggregateSequenceNumber(), 0L);
        markerReference.set(new AggregateBasedConsistencyMarker(aggregateId, aggregateSeqNo));
        return AggregateBasedJpaEventStorageEngine.buildContext(aggregateId, aggregateSeqNo, aggregateType);
    }

    private static ConsistencyMarker combineAggregateMarkers(Stream<AggregateSource> resultStream) {
        return resultStream.map(AggregateSource::markerReference).map(AtomicReference::get).map(marker -> marker).reduce(ConsistencyMarker::upperBound).orElseThrow();
    }

    @Override
    public MessageStream<EventMessage> stream(@Nonnull StreamingCondition condition, @Nullable ProcessingContext processingContext) {
        GapAwareTrackingToken trackingToken = this.tokenOperations.assertGapAwareTrackingToken(condition.position());
        return new ContinuousMessageStream<TokenAndEvent>(last -> this.queryTokensAndEventsBy((TrackingToken)(last == null ? trackingToken : last.token)), tae -> new SimpleEntry((Message)this.convertToEventMessage(tae.event), AggregateBasedJpaEventStorageEngine.buildTrackedContext(tae)), (ms, r) -> {
            this.streamCallbacks.put(ms, (Runnable)r);
            return () -> this.streamCallbacks.remove(ms) != null;
        });
    }

    private List<TokenAndEvent> queryTokensAndEventsBy(TrackingToken start) {
        Assert.isTrue((start == null || start instanceof GapAwareTrackingToken ? 1 : 0) != 0, () -> String.format("Token [%s] is of the wrong type. Expected [%s]", start, GapAwareTrackingToken.class.getSimpleName()));
        ArrayList<TokenAndEvent> result = new ArrayList<TokenAndEvent>();
        GapAwareTrackingToken cleanedToken = this.cleanedToken((GapAwareTrackingToken)start);
        List events = (List)this.transactionManager.fetchInTransaction(() -> this.queryEventsBy(cleanedToken));
        GapAwareTrackingToken token = cleanedToken;
        Instant gapTimeoutThreshold = this.tokenOperations.gapTimeoutThreshold();
        for (AggregateEventEntry event : events) {
            token = this.calculateToken(token, event.globalIndex(), event.timestamp(), gapTimeoutThreshold);
            result.add(new TokenAndEvent(token, event));
        }
        return result;
    }

    private GapAwareTrackingToken cleanedToken(GapAwareTrackingToken lastToken) {
        return lastToken != null && lastToken.getGaps().size() > this.gapCleaningThreshold ? this.tokenOperations.withGapsCleaned(lastToken, this.indexAndTimestampBetweenGaps(lastToken)) : lastToken;
    }

    private List<Object[]> indexAndTimestampBetweenGaps(GapAwareTrackingToken token) {
        return (List)this.transactionManager.fetchInTransaction(() -> {
            try (EntityManager entityManager = this.entityManager();){
                List list = entityManager.createQuery(INDEX_AND_TIMESTAMP_QUERY, Object[].class).setParameter("firstGapOffset", token.getGaps().first()).setParameter("maxGlobalIndex", (Object)((Long)token.getGaps().last() + 1L)).getResultList();
                return list;
            }
        });
    }

    private List<AggregateEventEntry> queryEventsBy(GapAwareTrackingToken token) {
        try (EntityManager entityManager = this.entityManager();){
            TypedQuery eventsByTokenQuery = token == null || token.getGaps().isEmpty() ? entityManager.createQuery(EVENTS_BY_TOKEN_QUERY, AggregateEventEntry.class) : entityManager.createQuery(EVENTS_BY_GAPPED_TOKEN, AggregateEventEntry.class).setParameter("gaps", (Object)token.getGaps());
            List list = eventsByTokenQuery.setParameter("token", (Object)(token == null ? -1L : token.getIndex())).setMaxResults(this.batchSize).getResultList();
            return list;
        }
    }

    private GapAwareTrackingToken calculateToken(@Nullable GapAwareTrackingToken token, long globalIndex, @Nonnull Instant timestamp, @Nonnull Instant gapTimeoutThreshold) {
        boolean allowGaps = timestamp.isAfter(gapTimeoutThreshold);
        return token == null ? GapAwareTrackingToken.newInstance((long)globalIndex, this.calculateGaps(globalIndex, allowGaps)) : token.advanceTo(globalIndex, allowGaps ? this.maxGapOffset : 0);
    }

    @Nonnull
    private Collection<Long> calculateGaps(long globalIndex, boolean allowGaps) {
        return allowGaps ? (Collection)LongStream.range(Math.min(this.lowestGlobalSequence, globalIndex), globalIndex).boxed().collect(Collectors.toCollection(TreeSet::new)) : Collections.emptySortedSet();
    }

    private GenericEventMessage convertToEventMessage(AggregateEventEntry event) {
        return new GenericEventMessage(event.identifier(), new MessageType(event.type(), event.version()), (Object)event.payload(), (Map)this.converter.convert((Object)event.metadata(), METADATA_MAP_TYPE_REF.getType()), event.timestamp());
    }

    private static Context buildTrackedContext(@Nonnull TokenAndEvent tokenAndEvent) {
        AggregateEventEntry entry = tokenAndEvent.event();
        Context context = AggregateBasedJpaEventStorageEngine.buildContext(Objects.requireNonNullElse(entry.aggregateIdentifier(), entry.identifier()), Objects.requireNonNullElse(entry.aggregateSequenceNumber(), 0L), entry.aggregateType());
        return context.withResource(TrackingToken.RESOURCE_KEY, (Object)tokenAndEvent.token);
    }

    private static Context buildContext(@Nonnull String aggregateIdentifier, @Nonnull Long aggregateSequenceNumber, @Nullable String aggregateType) {
        Context context = Context.with((Context.ResourceKey)LegacyResources.AGGREGATE_IDENTIFIER_KEY, (Object)aggregateIdentifier).withResource(LegacyResources.AGGREGATE_SEQUENCE_NUMBER_KEY, (Object)aggregateSequenceNumber);
        return aggregateType != null ? context.withResource(LegacyResources.AGGREGATE_TYPE_KEY, (Object)aggregateType) : context;
    }

    @Override
    public CompletableFuture<TrackingToken> firstToken(@Nullable ProcessingContext processingContext) {
        return this.queryToken(FIRST_TOKEN_QUERY);
    }

    @Override
    public CompletableFuture<TrackingToken> latestToken(@Nullable ProcessingContext processingContext) {
        return this.queryToken(LATEST_TOKEN_QUERY);
    }

    @Nonnull
    private CompletableFuture<TrackingToken> queryToken(String firstTokenQuery) {
        try (EntityManager entityManager = this.entityManager();){
            long position = (Long)entityManager.createQuery(firstTokenQuery, Long.class).getSingleResult();
            CompletableFuture<GapAwareTrackingToken> completableFuture = CompletableFuture.completedFuture(new GapAwareTrackingToken(position, Set.of()));
            return completableFuture;
        }
    }

    @Override
    public CompletableFuture<TrackingToken> tokenAt(@Nonnull Instant at, @Nullable ProcessingContext processingContext) {
        try (EntityManager entityManager = this.entityManager();){
            long position = (Long)entityManager.createQuery(TOKEN_AT_QUERY, Long.class).setParameter("dateTime", (Object)DateTimeUtils.formatInstant((TemporalAccessor)at)).getSingleResult();
            CompletableFuture<GapAwareTrackingToken> completableFuture = CompletableFuture.completedFuture(new GapAwareTrackingToken(position, Set.of()));
            return completableFuture;
        }
    }

    public void describeTo(@Nonnull ComponentDescriptor descriptor) {
        descriptor.describeProperty("entityManagerProvider", (Object)this.entityManagerProvider);
        descriptor.describeProperty("transactionManager", (Object)this.transactionManager);
        descriptor.describeProperty("converter", (Object)this.converter);
        descriptor.describeProperty("persistenceExceptionResolver", (Object)this.persistenceExceptionResolver);
        descriptor.describeProperty("tokenOperations", (Object)this.tokenOperations);
    }

    public void close() {
        if (this.eventCoordinatorHandle != null) {
            this.eventCoordinatorHandle.terminate();
            this.eventCoordinatorHandle = null;
        }
    }

    private void onAppendDetected() {
        for (Runnable callback : this.streamCallbacks.values()) {
            callback.run();
        }
    }

    private record AggregateSource(AtomicReference<AggregateBasedConsistencyMarker> markerReference, MessageStream<EventMessage> source) {
    }

    private record TokenAndEvent(GapAwareTrackingToken token, AggregateEventEntry event) {
    }
}

