/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventsourcing.eventstore.jpa;

import jakarta.annotation.Nonnull;
import jakarta.persistence.EntityManager;
import java.time.Instant;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.axonframework.common.Assert;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.ObjectUtils;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.common.jdbc.PersistenceExceptionResolver;
import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.common.transaction.Transaction;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.DomainEventData;
import org.axonframework.eventhandling.DomainEventMessage;
import org.axonframework.eventhandling.EventData;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GapAwareTrackingToken;
import org.axonframework.eventhandling.GenericDomainEventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.GenericTrackedDomainEventMessage;
import org.axonframework.eventhandling.GenericTrackedEventMessage;
import org.axonframework.eventhandling.TerminalEventMessage;
import org.axonframework.eventhandling.TrackedDomainEventData;
import org.axonframework.eventhandling.TrackedEventData;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.eventstore.AggregateBasedConsistencyMarker;
import org.axonframework.eventsourcing.eventstore.AppendCondition;
import org.axonframework.eventsourcing.eventstore.ConsistencyMarker;
import org.axonframework.eventsourcing.eventstore.EmptyAppendTransaction;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.eventsourcing.eventstore.LegacyAggregateBasedEventStorageEngineUtils;
import org.axonframework.eventsourcing.eventstore.LegacyResources;
import org.axonframework.eventsourcing.eventstore.SourcingCondition;
import org.axonframework.eventsourcing.eventstore.TaggedEventMessage;
import org.axonframework.eventsourcing.eventstore.jpa.DomainEventEntry;
import org.axonframework.eventsourcing.eventstore.jpa.GapAwareTrackingTokenOperations;
import org.axonframework.eventsourcing.eventstore.jpa.LegacyJpaEventStorageOperations;
import org.axonframework.eventstreaming.EventCriterion;
import org.axonframework.eventstreaming.StreamingCondition;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.MessageType;
import org.axonframework.messaging.MetaData;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AggregateBasedJpaEventStorageEngine
implements EventStorageEngine {
    private static final Logger logger = LoggerFactory.getLogger(AggregateBasedJpaEventStorageEngine.class);
    private static final String DOMAIN_EVENT_ENTRY_ENTITY_NAME = DomainEventEntry.class.getSimpleName();
    private final EntityManagerProvider entityManagerProvider;
    private final TransactionManager transactionManager;
    private final Serializer eventSerializer;
    private final PersistenceExceptionResolver persistenceExceptionResolver;
    private final LegacyJpaEventStorageOperations legacyJpaOperations;
    private final BatchingEventStorageOperations batchingOperations;
    private final GapAwareTrackingTokenOperations tokenOperations;

    public AggregateBasedJpaEventStorageEngine(@Nonnull EntityManagerProvider entityManagerProvider, @Nonnull TransactionManager transactionManager, @Nonnull Serializer eventSerializer, @Nonnull UnaryOperator<Customization> configurationOverride) {
        this.entityManagerProvider = Objects.requireNonNull(entityManagerProvider, "entityManagerProvider may not be null");
        this.transactionManager = Objects.requireNonNull(transactionManager, "transactionManager may not be null");
        this.eventSerializer = Objects.requireNonNull(eventSerializer, "eventSerializer may not be null");
        Customization customization = (Customization)Objects.requireNonNull(configurationOverride, "configurationOverride may not be null").apply(Customization.withDefaultValues());
        this.legacyJpaOperations = new LegacyJpaEventStorageOperations(transactionManager, entityManagerProvider, DOMAIN_EVENT_ENTRY_ENTITY_NAME, "unused");
        this.tokenOperations = new GapAwareTrackingTokenOperations(customization.tokenGapsHandling().timeout(), logger);
        this.batchingOperations = new BatchingEventStorageOperations(transactionManager, this.legacyJpaOperations, this.tokenOperations, customization.batchSize(), customization.finalAggregateBatchPredicate(), true, customization.tokenGapsHandling().cleaningThreshold(), customization.lowestGlobalSequence(), customization.tokenGapsHandling().maxOffset());
        this.persistenceExceptionResolver = customization.persistenceExceptionResolver();
    }

    @Override
    public CompletableFuture<EventStorageEngine.AppendTransaction> appendEvents(final @Nonnull AppendCondition condition, final @Nonnull List<TaggedEventMessage<?>> events) {
        try {
            LegacyAggregateBasedEventStorageEngineUtils.assertValidTags(events);
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
        if (events.isEmpty()) {
            return CompletableFuture.completedFuture(EmptyAppendTransaction.INSTANCE);
        }
        return CompletableFuture.completedFuture(new EventStorageEngine.AppendTransaction(){
            private final AtomicBoolean txFinished = new AtomicBoolean(false);
            private final AggregateBasedConsistencyMarker preCommitConsistencyMarker = AggregateBasedConsistencyMarker.from(condition);

            @Override
            public CompletableFuture<ConsistencyMarker> commit() {
                if (this.txFinished.getAndSet(true)) {
                    return CompletableFuture.failedFuture(new IllegalStateException("Already committed or rolled back"));
                }
                LegacyAggregateBasedEventStorageEngineUtils.AggregateSequencer aggregateSequencer = LegacyAggregateBasedEventStorageEngineUtils.AggregateSequencer.with(this.preCommitConsistencyMarker);
                CompletableFuture txResult = new CompletableFuture();
                Transaction tx = AggregateBasedJpaEventStorageEngine.this.transactionManager.startTransaction();
                try {
                    AggregateBasedJpaEventStorageEngine.this.entityManagerPersistEvents(aggregateSequencer, events);
                    tx.commit();
                    txResult.complete(null);
                }
                catch (Exception e2) {
                    tx.rollback();
                    txResult.completeExceptionally(e2);
                }
                AggregateBasedConsistencyMarker afterCommitConsistencyMarker = aggregateSequencer.forwarded();
                return ((CompletableFuture)txResult.exceptionallyCompose(e -> CompletableFuture.failedFuture(this.translateConflictException((Throwable)e)))).thenApply(r -> afterCommitConsistencyMarker);
            }

            private Throwable translateConflictException(Throwable e) {
                Predicate<Throwable> isConflictException = t -> {
                    Exception ex;
                    return AggregateBasedJpaEventStorageEngine.this.persistenceExceptionResolver != null && t instanceof Exception && AggregateBasedJpaEventStorageEngine.this.persistenceExceptionResolver.isDuplicateKeyViolation(ex = (Exception)t);
                };
                return LegacyAggregateBasedEventStorageEngineUtils.translateConflictException(this.preCommitConsistencyMarker, e, isConflictException);
            }

            @Override
            public void rollback() {
                this.txFinished.set(true);
            }
        });
    }

    private void entityManagerPersistEvents(LegacyAggregateBasedEventStorageEngineUtils.AggregateSequencer aggregateSequencer, List<TaggedEventMessage<?>> events) {
        EntityManager entityManager = this.entityManagerProvider.getEntityManager();
        events.stream().map(taggedEvent -> AggregateBasedJpaEventStorageEngine.toDomainEventMessage(taggedEvent, aggregateSequencer)).map(domainEventMessage -> new DomainEventEntry((DomainEventMessage<?>)domainEventMessage, this.eventSerializer)).forEach(arg_0 -> ((EntityManager)entityManager).persist(arg_0));
    }

    private static DomainEventMessage<?> toDomainEventMessage(TaggedEventMessage<?> taggedEvent, LegacyAggregateBasedEventStorageEngineUtils.AggregateSequencer aggregateSequencer) {
        boolean isAggregateEvent;
        String aggregateIdentifier = LegacyAggregateBasedEventStorageEngineUtils.resolveAggregateIdentifier(taggedEvent.tags());
        String aggregateType = LegacyAggregateBasedEventStorageEngineUtils.resolveAggregateType(taggedEvent.tags());
        Object event = taggedEvent.event();
        boolean bl = isAggregateEvent = aggregateIdentifier != null && aggregateType != null && !taggedEvent.tags().isEmpty();
        if (isAggregateEvent) {
            long nextSequence = aggregateSequencer.incrementAndGetSequenceOf(aggregateIdentifier);
            return new GenericDomainEventMessage(aggregateType, aggregateIdentifier, nextSequence, event.getIdentifier(), event.type(), event.getPayload(), (Map)event.getMetaData(), event.getTimestamp());
        }
        return new GenericDomainEventMessage(null, event.getIdentifier(), 0L, event, () -> event.getTimestamp());
    }

    private DomainEventMessage<?> convertToDomainEventMessage(DomainEventData<?> event) {
        return new GenericDomainEventMessage(event.getType(), event.getAggregateIdentifier(), event.getSequenceNumber(), this.convertToEventMessage((EventData<?>)event), event.getTimestamp());
    }

    private GenericEventMessage<?> convertToEventMessage(EventData<?> event) {
        SerializedObject payload = event.getPayload();
        String revision = payload.getType().getRevision();
        Class payloadClass = this.eventSerializer.classForType(payload.getType());
        MessageType messageType = revision == null ? new MessageType(payloadClass) : new MessageType(payloadClass, revision);
        SerializedObject metadata = event.getMetaData();
        MetaData metaData = (MetaData)this.eventSerializer.convert(metadata.getData(), MetaData.class);
        return new GenericEventMessage(event.getEventIdentifier(), messageType, payload.getData(), (Map)metaData, event.getTimestamp());
    }

    @Override
    public MessageStream<EventMessage<?>> source(@Nonnull SourcingCondition condition) {
        CompletableFuture endOfStreams = new CompletableFuture();
        List<AggregateSource> aggregateSources = condition.criteria().flatten().stream().map(criterion -> this.aggregateSourceForCriterion(condition, (EventCriterion)criterion)).toList();
        return aggregateSources.stream().map(AggregateSource::source).reduce((MessageStream)MessageStream.empty().cast(), MessageStream::concatWith).whenComplete(() -> endOfStreams.complete(null)).concatWith((MessageStream)MessageStream.fromFuture((CompletableFuture)endOfStreams.thenApply(event -> TerminalEventMessage.INSTANCE), unused -> Context.with(ConsistencyMarker.RESOURCE_KEY, (Object)AggregateBasedJpaEventStorageEngine.combineAggregateMarkers(aggregateSources.stream()))));
    }

    private AggregateSource aggregateSourceForCriterion(SourcingCondition condition, EventCriterion criterion) {
        AtomicReference<AggregateBasedConsistencyMarker> markerReference = new AtomicReference<AggregateBasedConsistencyMarker>();
        String aggregateIdentifier = LegacyAggregateBasedEventStorageEngineUtils.resolveAggregateIdentifier(criterion.tags());
        Stream<? extends DomainEventData<?>> events = this.batchingOperations.readEventData(aggregateIdentifier, condition.start());
        MessageStream source = MessageStream.fromStream(events, this::convertToDomainEventMessage, event -> AggregateBasedJpaEventStorageEngine.setMarkerAndBuildContext(event.getAggregateIdentifier(), event.getSequenceNumber(), event.getType(), markerReference)).whenComplete(() -> markerReference.compareAndSet(null, new AggregateBasedConsistencyMarker(aggregateIdentifier, 0L))).cast();
        return new AggregateSource(markerReference, source);
    }

    private static Context setMarkerAndBuildContext(String aggregateIdentifier, long sequenceNumber, String aggregateType, AtomicReference<AggregateBasedConsistencyMarker> markerReference) {
        markerReference.set(new AggregateBasedConsistencyMarker(aggregateIdentifier, sequenceNumber));
        return AggregateBasedJpaEventStorageEngine.buildContext(aggregateIdentifier, sequenceNumber, aggregateType);
    }

    private static ConsistencyMarker combineAggregateMarkers(Stream<AggregateSource> resultStream) {
        return resultStream.map(AggregateSource::markerReference).map(AtomicReference::get).map(marker -> marker).reduce(ConsistencyMarker::upperBound).orElseThrow();
    }

    @Override
    public MessageStream<EventMessage<?>> stream(@Nonnull StreamingCondition condition) {
        GapAwareTrackingToken trackingToken = this.tokenOperations.assertGapAwareTrackingToken(condition.position());
        Stream<? extends TrackedEventData<?>> events = this.batchingOperations.readEventData((TrackingToken)trackingToken);
        return MessageStream.fromStream(events, this::convertToTrackedEventMessage, AggregateBasedJpaEventStorageEngine::trackedEventContext);
    }

    private TrackedEventMessage<?> convertToTrackedEventMessage(TrackedEventData<?> event) {
        TrackingToken trackingToken = event.trackingToken();
        if (event instanceof TrackedDomainEventData) {
            TrackedDomainEventData trackedDomainEventData = (TrackedDomainEventData)event;
            DomainEventMessage<?> domainEventMessage = this.convertToDomainEventMessage((DomainEventData<?>)trackedDomainEventData);
            return new GenericTrackedDomainEventMessage(trackingToken, domainEventMessage);
        }
        return new GenericTrackedEventMessage(trackingToken, this.convertToEventMessage((EventData<?>)event));
    }

    private static Context trackedEventContext(TrackedEventData<?> trackedEventData) {
        TrackedDomainEventData trackedDomainEventData;
        Context context = Context.empty();
        if (trackedEventData instanceof TrackedDomainEventData && (trackedDomainEventData = (TrackedDomainEventData)trackedEventData).getAggregateIdentifier() != null && trackedDomainEventData.getType() != null) {
            context = AggregateBasedJpaEventStorageEngine.buildContext(trackedDomainEventData.getAggregateIdentifier(), trackedDomainEventData.getSequenceNumber(), trackedDomainEventData.getType());
        }
        TrackingToken trackingToken = trackedEventData.trackingToken();
        return context.withResource(TrackingToken.RESOURCE_KEY, (Object)trackingToken);
    }

    private static Context buildContext(String aggregateIdentifier, long sequenceNumber, String aggregateType) {
        return Context.with(LegacyResources.AGGREGATE_IDENTIFIER_KEY, (Object)aggregateIdentifier).withResource(LegacyResources.AGGREGATE_SEQUENCE_NUMBER_KEY, (Object)sequenceNumber).withResource(LegacyResources.AGGREGATE_TYPE_KEY, (Object)aggregateType);
    }

    @Override
    public CompletableFuture<TrackingToken> firstToken() {
        TrackingToken first = this.legacyJpaOperations.minGlobalIndex().flatMap(this::gapAwareTrackingTokenOn).orElse(null);
        return CompletableFuture.completedFuture(first);
    }

    @Override
    public CompletableFuture<TrackingToken> latestToken() {
        TrackingToken latest = this.legacyJpaOperations.maxGlobalIndex().flatMap(this::gapAwareTrackingTokenOn).orElse(null);
        return CompletableFuture.completedFuture(latest);
    }

    @Override
    public CompletableFuture<TrackingToken> tokenAt(@Nonnull Instant at) {
        TrackingToken token = this.legacyJpaOperations.globalIndexAt(at).flatMap(this::gapAwareTrackingTokenOn).or(() -> this.legacyJpaOperations.maxGlobalIndex().flatMap(this::gapAwareTrackingTokenOn)).orElse(null);
        return CompletableFuture.completedFuture(token);
    }

    private Optional<TrackingToken> gapAwareTrackingTokenOn(Long globalIndex) {
        return globalIndex == null ? Optional.empty() : Optional.of(GapAwareTrackingToken.newInstance((long)globalIndex, Collections.emptySet()));
    }

    public void describeTo(@Nonnull ComponentDescriptor descriptor) {
        descriptor.describeProperty("entityManagerProvider", (Object)this.entityManagerProvider);
        descriptor.describeProperty("transactionManager", (Object)this.transactionManager);
        descriptor.describeProperty("eventSerializer", (Object)this.eventSerializer);
        descriptor.describeProperty("persistenceExceptionResolver", (Object)this.persistenceExceptionResolver);
        descriptor.describeProperty("legacyJpaOperations", (Object)this.legacyJpaOperations);
        descriptor.describeProperty("tokenOperations", (Object)this.tokenOperations);
        descriptor.describeProperty("batchingOperations", (Object)this.batchingOperations);
    }

    public record Customization(PersistenceExceptionResolver persistenceExceptionResolver, int batchSize, Predicate<List<? extends DomainEventData<?>>> finalAggregateBatchPredicate, long lowestGlobalSequence, TokenGapsHandlingConfig tokenGapsHandling) {
        private static final int DEFAULT_BATCH_SIZE = 100;
        private static final long DEFAULT_LOWEST_GLOBAL_SEQUENCE = 1L;

        public Customization {
            BuilderUtils.assertThat((Object)batchSize, size -> size > 0, (String)"The batchSize must be a positive number");
            BuilderUtils.assertThat((Object)lowestGlobalSequence, number -> number > 0L, (String)"The lowestGlobalSequence must be a positive number");
        }

        public static Customization withDefaultValues() {
            return new Customization(null, 100, null, 1L, TokenGapsHandlingConfig.withDefaultValues());
        }

        public Customization persistenceExceptionResolver(PersistenceExceptionResolver persistenceExceptionResolver) {
            return new Customization(persistenceExceptionResolver, this.batchSize, this.finalAggregateBatchPredicate, this.lowestGlobalSequence, this.tokenGapsHandling);
        }

        public Customization batchSize(int batchSize) {
            return new Customization(this.persistenceExceptionResolver, batchSize, this.finalAggregateBatchPredicate, this.lowestGlobalSequence, this.tokenGapsHandling);
        }

        public Customization finalAggregateBatchPredicate(Predicate<List<? extends DomainEventData<?>>> finalAggregateBatchPredicate) {
            return new Customization(this.persistenceExceptionResolver, this.batchSize, finalAggregateBatchPredicate, this.lowestGlobalSequence, this.tokenGapsHandling);
        }

        public Customization lowestGlobalSequence(long lowestGlobalSequence) {
            return new Customization(this.persistenceExceptionResolver, this.batchSize, this.finalAggregateBatchPredicate, lowestGlobalSequence, this.tokenGapsHandling);
        }

        public Customization tokenGapsHandling(UnaryOperator<TokenGapsHandlingConfig> configurationOverride) {
            return new Customization(this.persistenceExceptionResolver, this.batchSize, this.finalAggregateBatchPredicate, this.lowestGlobalSequence, this.tokenGapsHandling);
        }

        public record TokenGapsHandlingConfig(int maxOffset, int timeout, int cleaningThreshold) {
            private static final int DEFAULT_MAX_GAP_OFFSET = 10000;
            private static final int DEFAULT_GAP_TIMEOUT = 60000;
            private static final int DEFAULT_GAP_CLEANING_THRESHOLD = 250;

            public TokenGapsHandlingConfig {
                BuilderUtils.assertPositive((int)maxOffset, (String)"maxOffset");
                BuilderUtils.assertPositive((int)timeout, (String)"timeout");
                BuilderUtils.assertPositive((int)cleaningThreshold, (String)"cleaningThreshold");
            }

            static TokenGapsHandlingConfig withDefaultValues() {
                return new TokenGapsHandlingConfig(10000, 60000, 250);
            }
        }
    }

    private record BatchingEventStorageOperations(TransactionManager transactionManager, LegacyJpaEventStorageOperations legacyJpaOperations, GapAwareTrackingTokenOperations tokenOperations, int batchSize, Predicate<List<? extends DomainEventData<?>>> finalAggregateBatchPredicate, boolean fetchForAggregateUntilEmpty, int gapCleaningThreshold, long lowestGlobalSequence, int maxGapOffset) {
        private static final boolean BATCH_OPTIMIZATION_DISABLED = false;

        private BatchingEventStorageOperations(TransactionManager transactionManager, LegacyJpaEventStorageOperations legacyJpaOperations, GapAwareTrackingTokenOperations tokenOperations, int batchSize, Predicate<List<? extends DomainEventData<?>>> finalAggregateBatchPredicate, boolean fetchForAggregateUntilEmpty, int gapCleaningThreshold, long lowestGlobalSequence, int maxGapOffset) {
            this.transactionManager = transactionManager;
            this.legacyJpaOperations = legacyJpaOperations;
            this.tokenOperations = tokenOperations;
            this.batchSize = batchSize;
            this.finalAggregateBatchPredicate = (Predicate)ObjectUtils.getOrDefault(finalAggregateBatchPredicate, this::defaultFinalAggregateBatchPredicate);
            this.fetchForAggregateUntilEmpty = fetchForAggregateUntilEmpty;
            this.gapCleaningThreshold = gapCleaningThreshold;
            this.lowestGlobalSequence = lowestGlobalSequence;
            this.maxGapOffset = maxGapOffset;
        }

        Stream<? extends DomainEventData<?>> readEventData(String identifier, long firstSequenceNumber) {
            EventStreamSpliterator spliterator = new EventStreamSpliterator(lastItem -> (List)this.transactionManager.fetchInTransaction(() -> this.legacyJpaOperations.fetchDomainEvents(identifier, lastItem == null ? firstSequenceNumber : lastItem.getSequenceNumber() + 1L, this.batchSize)), this.finalAggregateBatchPredicate);
            return StreamSupport.stream(spliterator, false);
        }

        Stream<? extends TrackedEventData<?>> readEventData(TrackingToken trackingToken) {
            EventStreamSpliterator spliterator = new EventStreamSpliterator(lastItem -> this.fetchTrackedEvents(lastItem == null ? trackingToken : lastItem.trackingToken(), this.batchSize), batch -> false);
            return StreamSupport.stream(spliterator, false);
        }

        private List<? extends TrackedEventData<?>> fetchTrackedEvents(TrackingToken lastToken, int batchSize) {
            Assert.isTrue((lastToken == null || lastToken instanceof GapAwareTrackingToken ? 1 : 0) != 0, () -> String.format("Token [%s] is of the wrong type. Expected [%s]", lastToken, GapAwareTrackingToken.class.getSimpleName()));
            GapAwareTrackingToken previousToken = this.cleanedToken((GapAwareTrackingToken)lastToken);
            List entries = (List)this.transactionManager.fetchInTransaction(() -> this.legacyJpaOperations.fetchEvents(previousToken, batchSize));
            return this.legacyJpaOperations.entriesToEvents(previousToken, entries, this.tokenOperations.gapTimeoutThreshold(), this.lowestGlobalSequence, this.maxGapOffset);
        }

        private GapAwareTrackingToken cleanedToken(GapAwareTrackingToken lastToken) {
            if (lastToken != null && lastToken.getGaps().size() > this.gapCleaningThreshold) {
                return this.tokenOperations.withGapsCleaned(lastToken, this.indexAndTimestampBetweenGaps(lastToken));
            }
            return lastToken;
        }

        private List<Object[]> indexAndTimestampBetweenGaps(GapAwareTrackingToken lastToken) {
            return (List)this.transactionManager.fetchInTransaction(() -> this.legacyJpaOperations.indexAndTimestampBetweenGaps(lastToken));
        }

        private boolean defaultFinalAggregateBatchPredicate(List<? extends DomainEventData<?>> recentBatch) {
            return this.fetchForAggregateUntilEmpty() ? recentBatch.isEmpty() : recentBatch.size() < this.batchSize;
        }

        private static class EventStreamSpliterator<T>
        extends Spliterators.AbstractSpliterator<T> {
            private final Function<T, List<? extends T>> fetchFunction;
            private final Predicate<List<? extends T>> finalBatchPredicate;
            private Iterator<? extends T> iterator;
            private T lastItem;
            private boolean lastBatchFound;

            private EventStreamSpliterator(Function<T, List<? extends T>> fetchFunction, Predicate<List<? extends T>> finalBatchPredicate) {
                super(Long.MAX_VALUE, 4369);
                this.fetchFunction = fetchFunction;
                this.finalBatchPredicate = finalBatchPredicate;
            }

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                Objects.requireNonNull(action);
                if (this.iterator == null || !this.iterator.hasNext()) {
                    if (this.lastBatchFound) {
                        return false;
                    }
                    List<T> items = this.fetchFunction.apply(this.lastItem);
                    this.lastBatchFound = this.finalBatchPredicate.test(items);
                    this.iterator = items.iterator();
                }
                if (!this.iterator.hasNext()) {
                    return false;
                }
                this.lastItem = this.iterator.next();
                action.accept(this.lastItem);
                return true;
            }
        }
    }

    private record AggregateSource(AtomicReference<AggregateBasedConsistencyMarker> markerReference, MessageStream<EventMessage<?>> source) {
    }
}

