/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventsourcing.eventstore;

import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.io.IOUtils;
import org.axonframework.configuration.LifecycleRegistry;
import org.axonframework.eventhandling.EventBusSpanFactory;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingEventStream;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.eventstore.AbstractLegacyEventStore;
import org.axonframework.eventsourcing.eventstore.LegacyEventStorageEngine;
import org.axonframework.lifecycle.Lifecycle;
import org.axonframework.monitoring.MessageMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Deprecated(since="5.0.0")
public class LegacyEmbeddedEventStore
extends AbstractLegacyEventStore
implements Lifecycle {
    // SLF4J logger shared by the store and its nested producer/cleaner classes.
    private static final Logger logger = LoggerFactory.getLogger(LegacyEmbeddedEventStore.class);
    // Thread group named after this class; the Builder's default AxonThreadFactory is created with it.
    private static final ThreadGroup THREAD_GROUP = new ThreadGroup(LegacyEmbeddedEventStore.class.getSimpleName());
    // Name of the system property the Builder reads to choose the default of optimizeEventConsumption.
    private static final String OPTIMIZE_EVENT_CONSUMPTION_SYSTEM_PROPERTY = "optimize-event-consumption";
    // Guards consumableEventsCondition.
    private final Lock consumerLock = new ReentrantLock();
    // Signalled by the producer each time it appends a node to the cache; idle consumers await it.
    private final Condition consumableEventsCondition = this.consumerLock.newCondition();
    // Consumers that currently read from the shared node cache instead of a private stream.
    private final Set<EventConsumer> tailingConsumers = new CopyOnWriteArraySet<EventConsumer>();
    // Fetches events from the storage engine into the shared cache; started lazily by ensureProducerStarted().
    private final EventProducer producer;
    // Used as both the initial delay and the interval (in milliseconds) of the scheduled Cleaner.
    private final long cleanupDelayMillis;
    // Creates the producer thread as well as the single worker thread of cleanupService.
    private final ThreadFactory threadFactory;
    // When false, openStream() never attaches a consumer to the shared cache and consumers never switch to it.
    private final boolean optimizeEventConsumption;
    // Single-threaded scheduler on which the Cleaner runs.
    private final ScheduledExecutorService cleanupService;
    // Flipped from false to true exactly once, by the first caller of ensureProducerStarted().
    private final AtomicBoolean producerStarted = new AtomicBoolean();
    // Oldest node still held in the shared cache (head of the linked list); null until a first event was cached.
    private volatile Node oldest;

    /**
     * Instantiates the event store from the given {@code builder}.
     * <p>
     * The fetch delay and clean-up delay configured on the builder are expressed in the builder's
     * {@link TimeUnit}; they are converted here to nanoseconds and milliseconds respectively. The clean-up
     * executor is created eagerly with a single thread, whereas the producer thread is only started once a
     * consumer begins tailing the shared cache.
     *
     * @param builder the builder carrying the configuration of this store
     */
    protected LegacyEmbeddedEventStore(Builder builder) {
        super(builder);
        threadFactory = builder.threadFactory;
        optimizeEventConsumption = builder.optimizeEventConsumption;
        cleanupService = Executors.newScheduledThreadPool(1, threadFactory);
        producer = new EventProducer(builder.timeUnit.toNanos(builder.fetchDelay), builder.cachedEvents);
        cleanupDelayMillis = builder.timeUnit.toMillis(builder.cleanupDelay);
    }

    /**
     * Creates a new {@link Builder} for a {@link LegacyEmbeddedEventStore}.
     * <p>
     * The builder starts out with 10000 cached events, a fetch delay of 1000 and a clean-up delay of 10000
     * (both in {@link TimeUnit#MILLISECONDS}), and an {@code AxonThreadFactory} as thread factory.
     *
     * @return a fresh builder instance
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Shuts this event store down: closes every consumer that is currently tailing the shared cache, closes
     * the producer and stops the clean-up executor. Failures while closing are swallowed by
     * {@code IOUtils.closeQuietly}.
     */
    public void shutDown() {
        // The set is copy-on-write, so consumers may remove themselves from it while we iterate.
        for (EventConsumer consumer : tailingConsumers) {
            IOUtils.closeQuietly(consumer);
        }
        IOUtils.closeQuietly(producer);
        cleanupService.shutdownNow();
    }

    /**
     * Starts the producer thread and schedules the {@link Cleaner}, but only on the very first invocation;
     * every later call is a no-op.
     */
    private void ensureProducerStarted() {
        if (!producerStarted.compareAndSet(false, true)) {
            // Somebody else already started (or is starting) the producer.
            return;
        }
        Runnable producerLoop = () -> {
            try {
                producer.run();
            } catch (InterruptedException interrupted) {
                logger.warn("Producer thread was interrupted. Shutting down event store.", interrupted);
                // Restore the interrupt flag for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
        };
        threadFactory.newThread(producerLoop).start();
        cleanupService.scheduleWithFixedDelay(
                new Cleaner(), cleanupDelayMillis, cleanupDelayMillis, TimeUnit.MILLISECONDS
        );
    }

    /**
     * Tells the producer that new events may be available, so that it fetches again instead of sitting out
     * the remainder of its fetch delay.
     *
     * @param events the events that were committed; not inspected by this implementation
     */
    protected void afterCommit(List<? extends EventMessage<?>> events) {
        this.producer.fetchIfWaiting();
    }

    /**
     * Opens a stream of events that follow the given {@code trackingToken}.
     * <p>
     * When the token belongs to an event that is still present in the shared cache, and event consumption
     * optimization is enabled, the returned stream immediately tails that cache. Otherwise the stream starts
     * out without a cache position and reads through its own private stream on first use.
     *
     * @param trackingToken the token of the last event the caller has already seen
     * @return a stream positioned right after {@code trackingToken}
     */
    public TrackingEventStream openStream(TrackingToken trackingToken) {
        Node cachedNode = findNode(trackingToken);
        if (cachedNode == null || !optimizeEventConsumption) {
            return new EventConsumer(trackingToken);
        }
        EventConsumer tailingConsumer = new EventConsumer(cachedNode);
        tailingConsumers.add(tailingConsumer);
        return tailingConsumer;
    }

    /**
     * Looks up the cached node whose event carries the given {@code trackingToken}.
     *
     * @param trackingToken the token to search for
     * @return the matching node, or {@code null} when the cache is empty or holds no event with that token
     */
    private Node findNode(TrackingToken trackingToken) {
        for (Node candidate = oldest; candidate != null; candidate = candidate.next) {
            if (candidate.event.trackingToken().equals(trackingToken)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Registers {@code shutDown()} as a shutdown handler with the given lifecycle registry.
     *
     * @param handle the registry to register the shutdown handler with
     */
    public void registerLifecycleHandlers(@Nonnull LifecycleRegistry handle) {
        // 0x3FFFFFF5 equals Integer.MAX_VALUE / 2 - 10. NOTE(review): presumably a lifecycle phase constant
        // that the compiler inlined - confirm which named constant this corresponds to.
        handle.onShutdown(0x3FFFFFF5, this::shutDown);
    }

    /**
     * Builder for a {@link LegacyEmbeddedEventStore}.
     * <p>
     * Defaults: 10000 cached events, a fetch delay of 1000 and a clean-up delay of 10000 expressed in
     * {@link TimeUnit#MILLISECONDS}, and an {@code AxonThreadFactory} created with this store's thread group.
     * Event consumption optimization defaults to {@code true}, unless the system property
     * {@code optimize-event-consumption} is set to a value other than {@code "true"} (ignoring case).
     */
    public static class Builder extends AbstractLegacyEventStore.Builder {

        private int cachedEvents = 10000;
        private long fetchDelay = 1000L;
        private long cleanupDelay = 10000L;
        private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
        private ThreadFactory threadFactory = new AxonThreadFactory(THREAD_GROUP);
        private boolean optimizeEventConsumption = fetchEventConsumptionSystemPropertyOrDefault();

        /**
         * Resolves the default for {@code optimizeEventConsumption} from the JVM system properties.
         *
         * @return {@code true} when the property is absent or equals {@code "true"} ignoring case,
         *         {@code false} for any other value
         */
        private static boolean fetchEventConsumptionSystemPropertyOrDefault() {
            String configuredValue = System.getProperty(OPTIMIZE_EVENT_CONSUMPTION_SYSTEM_PROPERTY);
            if (configuredValue == null) {
                return true;
            }
            return Boolean.parseBoolean(configuredValue);
        }

        /**
         * Sets the storage engine by delegating to the parent builder.
         *
         * @param storageEngine the engine used to store and read events
         * @return this builder, for fluent interfacing
         */
        @Override
        public Builder storageEngine(LegacyEventStorageEngine storageEngine) {
            super.storageEngine(storageEngine);
            return this;
        }

        /**
         * Sets the message monitor by delegating to the parent builder.
         *
         * @param messageMonitor the monitor to configure
         * @return this builder, for fluent interfacing
         */
        @Override
        public Builder messageMonitor(@Nonnull MessageMonitor<? super EventMessage<?>> messageMonitor) {
            super.messageMonitor(messageMonitor);
            return this;
        }

        /**
         * Sets the span factory by delegating to the parent builder.
         *
         * @param spanFactory the span factory to configure
         * @return this builder, for fluent interfacing
         */
        public Builder spanFactory(@Nonnull EventBusSpanFactory spanFactory) {
            super.spanFactory(spanFactory);
            return this;
        }

        /**
         * Sets the maximum number of events kept in the shared cache. Defaults to 10000.
         *
         * @param cachedEvents the cache size; validated through {@code BuilderUtils.assertPositive}
         * @return this builder, for fluent interfacing
         */
        public Builder cachedEvents(int cachedEvents) {
            BuilderUtils.assertPositive(cachedEvents, "The cached events count should be a positive number");
            this.cachedEvents = cachedEvents;
            return this;
        }

        /**
         * Sets how long the producer waits before fetching again when no events were found. The value is
         * interpreted in the configured {@link #timeUnit(TimeUnit)}. Defaults to 1000.
         *
         * @param fetchDelay the delay; validated through {@code BuilderUtils.assertPositive}
         * @return this builder, for fluent interfacing
         */
        public Builder fetchDelay(long fetchDelay) {
            BuilderUtils.assertPositive(fetchDelay, "The fetch delay should be a positive number");
            this.fetchDelay = fetchDelay;
            return this;
        }

        /**
         * Sets the delay between two runs of the clean-up task that detaches lagging consumers from the
         * shared cache. The value is interpreted in the configured {@link #timeUnit(TimeUnit)}.
         * Defaults to 10000.
         *
         * @param cleanupDelay the delay; validated through {@code BuilderUtils.assertPositive}
         * @return this builder, for fluent interfacing
         */
        public Builder cleanupDelay(long cleanupDelay) {
            BuilderUtils.assertPositive(cleanupDelay, "The clean-up delay should be a positive number");
            this.cleanupDelay = cleanupDelay;
            return this;
        }

        /**
         * Sets the unit in which the fetch delay and clean-up delay are expressed.
         * Defaults to {@link TimeUnit#MILLISECONDS}.
         *
         * @param timeUnit the unit; validated through {@code BuilderUtils.assertNonNull}
         * @return this builder, for fluent interfacing
         */
        public Builder timeUnit(TimeUnit timeUnit) {
            BuilderUtils.assertNonNull(timeUnit, "TimeUnit may not be null");
            this.timeUnit = timeUnit;
            return this;
        }

        /**
         * Sets the factory used to create the producer thread and the clean-up executor's thread.
         *
         * @param threadFactory the factory; validated through {@code BuilderUtils.assertNonNull}
         * @return this builder, for fluent interfacing
         */
        public Builder threadFactory(ThreadFactory threadFactory) {
            BuilderUtils.assertNonNull(threadFactory, "ThreadFactory may not be null");
            this.threadFactory = threadFactory;
            return this;
        }

        /**
         * Sets whether consumers are allowed to read from the shared cache of recent events. When disabled,
         * every consumer keeps reading through its own private stream.
         *
         * @param optimizeEventConsumption {@code true} to let consumers share the cache
         * @return this builder, for fluent interfacing
         */
        public Builder optimizeEventConsumption(boolean optimizeEventConsumption) {
            this.optimizeEventConsumption = optimizeEventConsumption;
            return this;
        }

        /**
         * Builds a {@link LegacyEmbeddedEventStore} from the current state of this builder.
         *
         * @return the newly created event store
         */
        public LegacyEmbeddedEventStore build() {
            return new LegacyEmbeddedEventStore(this);
        }

        /**
         * Validates this builder by delegating to the parent builder's validation.
         *
         * @throws AxonConfigurationException as thrown by the parent builder's validation
         */
        @Override
        protected void validate() throws AxonConfigurationException {
            super.validate();
        }
    }

    private class EventProducer
    implements AutoCloseable {
        private final Lock lock = new ReentrantLock();
        private final Condition dataAvailableCondition = this.lock.newCondition();
        private final long fetchDelayNanos;
        private final int cachedEvents;
        private volatile boolean shouldFetch;
        private volatile boolean closed;
        private Stream<? extends TrackedEventMessage<?>> eventStream;
        private Node newest;

        private EventProducer(long fetchDelayNanos, int cachedEvents) {
            this.fetchDelayNanos = fetchDelayNanos;
            this.cachedEvents = cachedEvents;
        }

        private void run() throws InterruptedException {
            boolean dataFound = false;
            while (!this.closed) {
                this.shouldFetch = true;
                while (this.shouldFetch) {
                    this.shouldFetch = false;
                    dataFound = this.fetchData();
                }
                if (dataFound) continue;
                this.waitForData();
            }
        }

        private void waitForData() throws InterruptedException {
            this.lock.lock();
            try {
                if (!this.shouldFetch) {
                    this.dataAvailableCondition.awaitNanos(this.fetchDelayNanos);
                }
            }
            finally {
                this.lock.unlock();
            }
        }

        private void fetchIfWaiting() {
            this.shouldFetch = true;
            this.lock.lock();
            try {
                this.dataAvailableCondition.signalAll();
            }
            finally {
                this.lock.unlock();
            }
        }

        private boolean fetchData() {
            Node currentNewest = this.newest;
            if (!LegacyEmbeddedEventStore.this.tailingConsumers.isEmpty()) {
                try {
                    this.eventStream = LegacyEmbeddedEventStore.this.storageEngine().readEvents(this.lastToken(), true);
                    this.eventStream.forEach(event -> {
                        Node node = new Node(this.nextIndex(), this.lastToken(), (TrackedEventMessage<?>)event);
                        if (this.newest != null) {
                            this.newest.next = node;
                        }
                        this.newest = node;
                        if (LegacyEmbeddedEventStore.this.oldest == null) {
                            LegacyEmbeddedEventStore.this.oldest = node;
                        }
                        this.notifyConsumers();
                        this.trimCache();
                    });
                }
                catch (Exception e) {
                    logger.error("Failed to read events from the underlying event storage", (Throwable)e);
                }
            }
            return !Objects.equals(this.newest, currentNewest);
        }

        private TrackingToken lastToken() {
            if (this.newest == null) {
                List tokens = LegacyEmbeddedEventStore.this.tailingConsumers.stream().map(EventConsumer::lastToken).collect(Collectors.toList());
                return tokens.isEmpty() || tokens.contains(null) ? null : (TrackingToken)tokens.get(0);
            }
            return this.newest.event.trackingToken();
        }

        private long nextIndex() {
            return this.newest == null ? 0L : this.newest.index + 1L;
        }

        private void notifyConsumers() {
            LegacyEmbeddedEventStore.this.consumerLock.lock();
            try {
                LegacyEmbeddedEventStore.this.consumableEventsCondition.signalAll();
            }
            finally {
                LegacyEmbeddedEventStore.this.consumerLock.unlock();
            }
        }

        private void trimCache() {
            Node last = LegacyEmbeddedEventStore.this.oldest;
            while (this.newest != null && last != null && this.newest.index - last.index >= (long)this.cachedEvents) {
                last = last.next;
            }
            LegacyEmbeddedEventStore.this.oldest = last;
        }

        @Override
        public void close() {
            this.closed = true;
            if (this.eventStream != null) {
                this.eventStream.close();
            }
        }
    }

    /**
     * Periodic task that detaches tailing consumers from the shared cache once they can no longer be served
     * from it.
     */
    private class Cleaner
    implements Runnable {

        /**
         * Stops every tailing consumer that reports itself as being behind the cache from tailing it.
         * Nothing is done while the cache is empty or while the oldest cached node has no previous token.
         */
        @Override
        public void run() {
            Node head = oldest;
            // NOTE(review): a head without previous token presumably marks the very start of the event
            // stream, in which case no consumer can be behind the cache - confirm.
            if (head == null || head.previousToken == null) {
                return;
            }
            // Copy-on-write set: removing a consumer while iterating is safe.
            for (EventConsumer consumer : tailingConsumers) {
                if (!consumer.behindGlobalCache()) {
                    continue;
                }
                logger.debug("An event stream cannot read from the local cache. It either runs behind, or its current token cannot be found in the cache. Opening a dedicated stream.");
                consumer.stopTailingGlobalStream();
            }
        }
    }

    /**
     * Element of the singly linked list that forms the shared cache of recently fetched events.
     */
    private static class Node {
        // Position of this node in the order of caching: 0 for the first node, incremented by one per node.
        private final long index;
        // Token the producer had reached right before this node's event was read; consumers match their
        // own last token against it to find their position in the cache.
        private final TrackingToken previousToken;
        // The cached event itself.
        private final TrackedEventMessage<?> event;
        // Successor in the list, or null while this is the newest node. Volatile since it is written by
        // the producer and read by consumers.
        private volatile Node next;

        /**
         * @param index         the position of this node in the order of caching
         * @param previousToken the token preceding {@code event}; may be {@code null}
         * @param event         the event held by this node
         */
        private Node(long index, TrackingToken previousToken, TrackedEventMessage<?> event) {
            this.index = index;
            this.previousToken = previousToken;
            this.event = event;
        }
    }

    private class EventConsumer
    implements TrackingEventStream {
        private Stream<? extends TrackedEventMessage<?>> privateStream;
        private Iterator<? extends TrackedEventMessage<?>> privateIterator;
        private volatile TrackingToken lastToken;
        private volatile Node lastNode;
        private TrackedEventMessage<?> peekedEvent;

        private EventConsumer(Node lastNode) {
            this(lastNode.event.trackingToken());
            this.lastNode = lastNode;
        }

        private EventConsumer(TrackingToken startToken) {
            this.lastToken = startToken;
        }

        public Optional<TrackedEventMessage<?>> peek() {
            return Optional.ofNullable(this.peekedEvent == null && !this.hasNextAvailable() ? null : this.peekedEvent);
        }

        public boolean hasNextAvailable(int timeout, TimeUnit unit) throws InterruptedException {
            return this.peekedEvent != null || (this.peekedEvent = this.peek(timeout, unit)) != null;
        }

        public TrackedEventMessage<?> nextAvailable() throws InterruptedException {
            while (this.peekedEvent == null) {
                this.peekedEvent = this.peek(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            }
            TrackedEventMessage<?> result = this.peekedEvent;
            this.peekedEvent = null;
            return result;
        }

        private TrackedEventMessage<?> peek(int timeout, TimeUnit timeUnit) throws InterruptedException {
            boolean allowSwitchToTailingConsumer = LegacyEmbeddedEventStore.this.optimizeEventConsumption;
            if (LegacyEmbeddedEventStore.this.tailingConsumers.contains(this)) {
                if (!this.behindGlobalCache()) {
                    return this.peekGlobalStream(timeout, timeUnit);
                }
                this.stopTailingGlobalStream();
                allowSwitchToTailingConsumer = false;
            }
            return this.peekPrivateStream(allowSwitchToTailingConsumer, timeout, timeUnit);
        }

        private boolean behindGlobalCache() {
            return LegacyEmbeddedEventStore.this.oldest != null && (this.lastNode != null ? this.lastNode.index < LegacyEmbeddedEventStore.this.oldest.index : this.nextNode() == null);
        }

        private void stopTailingGlobalStream() {
            LegacyEmbeddedEventStore.this.tailingConsumers.remove(this);
            this.lastNode = null;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private TrackedEventMessage<?> peekGlobalStream(int timeout, TimeUnit timeUnit) throws InterruptedException {
            Node nextNode = this.nextNode();
            if (nextNode == null && timeout > 0) {
                LegacyEmbeddedEventStore.this.consumerLock.lock();
                try {
                    if (LegacyEmbeddedEventStore.this.consumableEventsCondition.await(timeout, timeUnit)) {
                        nextNode = this.nextNode();
                    }
                }
                finally {
                    LegacyEmbeddedEventStore.this.consumerLock.unlock();
                }
            }
            if (nextNode != null) {
                if (LegacyEmbeddedEventStore.this.tailingConsumers.contains(this)) {
                    this.lastNode = nextNode;
                }
                this.lastToken = nextNode.event.trackingToken();
                return nextNode.event;
            }
            return null;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private TrackedEventMessage<?> peekPrivateStream(boolean allowSwitchToTailingConsumer, int timeout, TimeUnit timeUnit) throws InterruptedException {
            if (this.privateIterator == null) {
                this.privateStream = LegacyEmbeddedEventStore.this.storageEngine().readEvents(this.lastToken, false);
                this.privateIterator = this.privateStream.iterator();
            }
            if (this.privateIterator.hasNext()) {
                TrackedEventMessage<?> nextEvent = this.privateIterator.next();
                this.lastToken = nextEvent.trackingToken();
                return nextEvent;
            }
            if (allowSwitchToTailingConsumer) {
                this.closePrivateStream();
                this.lastNode = LegacyEmbeddedEventStore.this.findNode(this.lastToken);
                LegacyEmbeddedEventStore.this.tailingConsumers.add(this);
                LegacyEmbeddedEventStore.this.ensureProducerStarted();
                return timeout > 0 ? this.peek(timeout, timeUnit) : null;
            }
            LegacyEmbeddedEventStore.this.consumerLock.lock();
            try {
                if (LegacyEmbeddedEventStore.this.consumableEventsCondition.await(timeout, timeUnit) && this.privateIterator.hasNext()) {
                    TrackedEventMessage<?> nextEvent = this.privateIterator.next();
                    this.lastToken = nextEvent.trackingToken();
                    TrackedEventMessage<?> trackedEventMessage = nextEvent;
                    return trackedEventMessage;
                }
                TrackedEventMessage<?> trackedEventMessage = null;
                return trackedEventMessage;
            }
            finally {
                LegacyEmbeddedEventStore.this.consumerLock.unlock();
            }
        }

        private Node nextNode() {
            Node node = this.lastNode;
            if (node != null) {
                return node.next;
            }
            node = LegacyEmbeddedEventStore.this.oldest;
            while (node != null && !Objects.equals(node.previousToken, this.lastToken)) {
                node = node.next;
            }
            return node;
        }

        private TrackingToken lastToken() {
            return this.lastToken;
        }

        public void close() {
            this.closePrivateStream();
            this.stopTailingGlobalStream();
        }

        private void closePrivateStream() {
            Optional.ofNullable(this.privateStream).ifPresent(stream -> {
                this.privateStream = null;
                this.privateIterator = null;
                stream.close();
            });
        }
    }
}

