/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventsourcing.eventstore.inmemory;

import jakarta.annotation.Nonnull;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.eventstore.AppendCondition;
import org.axonframework.eventsourcing.eventstore.AppendEventsTransactionRejectedException;
import org.axonframework.eventsourcing.eventstore.ConsistencyMarker;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.eventsourcing.eventstore.EventsCondition;
import org.axonframework.eventsourcing.eventstore.GlobalIndexConsistencyMarker;
import org.axonframework.eventsourcing.eventstore.SourcingCondition;
import org.axonframework.eventsourcing.eventstore.StreamingCondition;
import org.axonframework.eventsourcing.eventstore.Tag;
import org.axonframework.eventsourcing.eventstore.TaggedEventMessage;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.QualifiedName;
import org.axonframework.messaging.SimpleEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InMemoryEventStorageEngine
implements EventStorageEngine {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final NavigableMap<Long, TaggedEventMessage<? extends EventMessage<?>>> eventStorage = new ConcurrentSkipListMap();
    private final ReentrantLock appendLock = new ReentrantLock();
    private final Set<MapBackedMessageStream> openStreams = new CopyOnWriteArraySet<MapBackedMessageStream>();
    private final long offset;

    public InMemoryEventStorageEngine() {
        this(0L);
    }

    public InMemoryEventStorageEngine(long offset) {
        this.offset = offset;
    }

    @Override
    public CompletableFuture<EventStorageEngine.AppendTransaction> appendEvents(final @Nonnull AppendCondition condition, final @Nonnull List<TaggedEventMessage<?>> events) {
        if (this.containsConflicts(condition)) {
            return CompletableFuture.failedFuture((Throwable)((Object)AppendEventsTransactionRejectedException.conflictingEventsDetected(condition.consistencyMarker())));
        }
        return CompletableFuture.completedFuture(new EventStorageEngine.AppendTransaction(){
            private final AtomicBoolean finished = new AtomicBoolean(false);

            @Override
            public CompletableFuture<ConsistencyMarker> commit() {
                if (this.finished.getAndSet(true)) {
                    return CompletableFuture.failedFuture(new IllegalStateException("Already committed or rolled back"));
                }
                InMemoryEventStorageEngine.this.appendLock.lock();
                try {
                    if (InMemoryEventStorageEngine.this.containsConflicts(condition)) {
                        CompletableFuture<ConsistencyMarker> completableFuture = CompletableFuture.failedFuture((Throwable)((Object)AppendEventsTransactionRejectedException.conflictingEventsDetected(condition.consistencyMarker())));
                        return completableFuture;
                    }
                    Optional<ConsistencyMarker> newHead = events.stream().map(event -> {
                        long next = InMemoryEventStorageEngine.this.nextIndex();
                        InMemoryEventStorageEngine.this.eventStorage.put(next, (TaggedEventMessage<EventMessage<?>>)event);
                        if (logger.isDebugEnabled()) {
                            logger.debug("Appended event [{}] with position [{}] and timestamp [{}].", new Object[]{event.event().getIdentifier(), next, event.event().getTimestamp()});
                        }
                        return new GlobalIndexConsistencyMarker(next);
                    }).reduce(ConsistencyMarker::upperBound);
                    InMemoryEventStorageEngine.this.openStreams.forEach(m -> m.callback.get().run());
                    CompletableFuture<ConsistencyMarker> completableFuture = CompletableFuture.completedFuture(newHead.orElse(ConsistencyMarker.ORIGIN));
                    return completableFuture;
                }
                finally {
                    InMemoryEventStorageEngine.this.appendLock.unlock();
                }
            }

            @Override
            public void rollback() {
                this.finished.set(true);
            }
        });
    }

    private long nextIndex() {
        return this.eventStorage.isEmpty() ? 0L : (Long)this.eventStorage.lastKey() + 1L;
    }

    private boolean containsConflicts(AppendCondition condition) {
        if (Objects.equals(condition.consistencyMarker(), ConsistencyMarker.INFINITY)) {
            return false;
        }
        return this.eventStorage.tailMap(GlobalIndexConsistencyMarker.position(condition.consistencyMarker()) + 1L).values().stream().map(event -> event).anyMatch(taggedEvent -> condition.matches(taggedEvent.event().type().qualifiedName(), taggedEvent.tags()));
    }

    @Override
    public MessageStream<EventMessage<?>> source(@Nonnull SourcingCondition condition) {
        if (logger.isDebugEnabled()) {
            logger.debug("Start sourcing events with condition [{}].", (Object)condition);
        }
        return this.eventsToMessageStream(condition.start(), this.eventStorage.isEmpty() ? -1L : Math.min(condition.end(), (Long)this.eventStorage.lastKey()), condition);
    }

    @Override
    public MessageStream<EventMessage<?>> stream(@Nonnull StreamingCondition condition) {
        if (logger.isDebugEnabled()) {
            logger.debug("Start streaming events with condition [{}].", (Object)condition);
        }
        return this.eventsToMessageStream(condition.position().position().orElse(-1L) + 1L, Long.MAX_VALUE, condition);
    }

    private MessageStream<EventMessage<?>> eventsToMessageStream(long start, long end, EventsCondition condition) {
        MapBackedMessageStream mapBackedMessageStream = new MapBackedMessageStream(start, end, condition);
        this.openStreams.add(mapBackedMessageStream);
        return mapBackedMessageStream;
    }

    private static boolean match(TaggedEventMessage<?> taggedEvent, EventsCondition condition) {
        QualifiedName qualifiedName = taggedEvent.event().type().qualifiedName();
        return condition.matches(qualifiedName, taggedEvent.tags());
    }

    @Override
    public CompletableFuture<TrackingToken> tailToken() {
        if (logger.isDebugEnabled()) {
            logger.debug("Operation tailToken() is invoked.");
        }
        return CompletableFuture.completedFuture(this.eventStorage.isEmpty() ? new GlobalSequenceTrackingToken(this.offset - 1L) : new GlobalSequenceTrackingToken((Long)this.eventStorage.firstKey() - 1L));
    }

    @Override
    public CompletableFuture<TrackingToken> headToken() {
        if (logger.isDebugEnabled()) {
            logger.debug("Operation headToken() is invoked.");
        }
        return CompletableFuture.completedFuture(this.eventStorage.isEmpty() ? new GlobalSequenceTrackingToken(this.offset - 1L) : new GlobalSequenceTrackingToken(((Long)this.eventStorage.lastKey()).longValue()));
    }

    @Override
    public CompletableFuture<TrackingToken> tokenAt(@Nonnull Instant at) {
        if (logger.isDebugEnabled()) {
            logger.debug("Operation tokenAt() is invoked with Instant [{}].", (Object)at);
        }
        return this.eventStorage.entrySet().stream().filter(positionToEventEntry -> {
            Object event = ((TaggedEventMessage)positionToEventEntry.getValue()).event();
            Instant eventTimestamp = event.getTimestamp();
            logger.debug("instant [{}]", (Object)eventTimestamp);
            return eventTimestamp.equals(at) || eventTimestamp.isAfter(at);
        }).map(Map.Entry::getKey).min(Comparator.comparingLong(Long::longValue)).map(position -> position - 1L).map(GlobalSequenceTrackingToken::new).map(tt -> tt).map(CompletableFuture::completedFuture).orElseGet(this::headToken);
    }

    public void describeTo(@Nonnull ComponentDescriptor descriptor) {
        descriptor.describeProperty("offset", Long.valueOf(this.offset));
    }

    private class MapBackedMessageStream
    implements MessageStream<EventMessage<?>> {
        private final AtomicLong position;
        private final AtomicReference<Runnable> callback;
        private final long end;
        private final EventsCondition condition;

        public MapBackedMessageStream(long start, long end, EventsCondition condition) {
            this.end = end;
            this.condition = condition;
            this.position = new AtomicLong(start);
            this.callback = new AtomicReference<Runnable>(() -> {});
        }

        public Optional<MessageStream.Entry<EventMessage<?>>> next() {
            long currentPosition = this.position.get();
            while (currentPosition <= this.end && InMemoryEventStorageEngine.this.eventStorage.containsKey(currentPosition) && this.position.compareAndSet(currentPosition, currentPosition + 1L)) {
                TaggedEventMessage nextEvent = (TaggedEventMessage)InMemoryEventStorageEngine.this.eventStorage.get(currentPosition);
                if (InMemoryEventStorageEngine.match(nextEvent, this.condition)) {
                    Context context = Context.empty();
                    context = TrackingToken.addToContext((Context)context, (TrackingToken)new GlobalSequenceTrackingToken(currentPosition));
                    context = Tag.addToContext(context, nextEvent.tags());
                    context = ConsistencyMarker.addToContext(context, new GlobalIndexConsistencyMarker(this.end));
                    return Optional.of(new SimpleEntry(nextEvent.event(), context));
                }
                currentPosition = this.position.get();
            }
            return Optional.empty();
        }

        public void onAvailable(@Nonnull Runnable callback) {
            this.callback.set(callback);
            if (InMemoryEventStorageEngine.this.eventStorage.isEmpty() || InMemoryEventStorageEngine.this.eventStorage.containsKey(this.position.get())) {
                callback.run();
            }
        }

        public Optional<Throwable> error() {
            return Optional.empty();
        }

        public boolean isCompleted() {
            long currentPosition = this.position.get();
            return currentPosition > this.end;
        }

        public boolean hasNextAvailable() {
            long currentPosition = this.position.get();
            return currentPosition <= this.end && InMemoryEventStorageEngine.this.eventStorage.containsKey(currentPosition);
        }

        public void close() {
            this.position.set(this.end + 1L);
        }
    }
}

