/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.event.axon;

import io.axoniq.axonserver.connector.event.EventStream;
import io.axoniq.axonserver.grpc.event.EventWithToken;
import java.lang.invoke.MethodHandles;
import java.util.Iterator;
import java.util.Optional;
import java.util.Spliterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
import org.axonframework.axonserver.connector.AxonServerException;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.event.axon.GrpcBackedDomainEventData;
import org.axonframework.eventhandling.DomainEventData;
import org.axonframework.eventhandling.EventUtils;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.TrackedDomainEventData;
import org.axonframework.eventhandling.TrackedEventData;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingEventStream;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.serialization.SerializedType;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.UnknownSerializedType;
import org.axonframework.serialization.upcasting.event.EventUpcaster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventBuffer
implements TrackingEventStream {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final int MAX_AWAIT_AVAILABLE_DATA = 500;
    private final EventStream delegate;
    private final Iterator<TrackedEventMessage<?>> eventStream;
    private final Serializer serializer;
    private final boolean disableIgnoredEventFiltering;
    private TrackedEventMessage<?> peekEvent;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition dataAvailable = this.lock.newCondition();

    public EventBuffer(EventStream delegate, EventUpcaster upcasterChain, Serializer serializer, boolean disableIgnoredEventFiltering) {
        this.delegate = delegate;
        this.serializer = serializer;
        this.disableIgnoredEventFiltering = disableIgnoredEventFiltering;
        this.eventStream = EventUtils.upcastAndDeserializeTrackedEvents(StreamSupport.stream(new SimpleSpliterator<TrackedEventData>(this::poll), false), (Serializer)serializer, (EventUpcaster)upcasterChain).iterator();
        delegate.onAvailable(() -> {
            this.lock.lock();
            try {
                this.dataAvailable.signalAll();
            }
            finally {
                this.lock.unlock();
            }
        });
    }

    private TrackedEventData<byte[]> poll() {
        EventWithToken eventWithToken = (EventWithToken)this.delegate.nextIfAvailable();
        return eventWithToken == null ? null : this.convert(eventWithToken);
    }

    private TrackedEventData<byte[]> convert(EventWithToken eventWithToken) {
        GlobalSequenceTrackingToken trackingToken = new GlobalSequenceTrackingToken(eventWithToken.getToken());
        return new TrackedDomainEventData((TrackingToken)trackingToken, (DomainEventData)new GrpcBackedDomainEventData(eventWithToken.getEvent()));
    }

    public void skipMessagesWithPayloadTypeOf(TrackedEventMessage<?> ignoredMessage) {
        if (!this.disableIgnoredEventFiltering) {
            SerializedType serializedType;
            if (UnknownSerializedType.class.equals((Object)ignoredMessage.getPayloadType())) {
                UnknownSerializedType unknownSerializedType = (UnknownSerializedType)ignoredMessage.getPayload();
                serializedType = unknownSerializedType.serializedType();
            } else {
                serializedType = this.serializer.typeForClass(ignoredMessage.getPayloadType());
            }
            this.delegate.excludePayloadType(serializedType.getName(), serializedType.getRevision());
        }
    }

    public Optional<TrackedEventMessage<?>> peek() {
        return Optional.ofNullable(this.peekNullable());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean hasNextAvailable(int timeout, TimeUnit timeUnit) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeUnit.toMillis(timeout);
        try {
            while (this.peekNullable() == null && System.currentTimeMillis() < deadline) {
                this.lock.lock();
                try {
                    long waitTime = deadline - System.currentTimeMillis();
                    if (this.peekNullable() != null || waitTime <= 0L) continue;
                    boolean await = this.dataAvailable.await(Math.min(waitTime, 500L), TimeUnit.MILLISECONDS);
                    logger.trace(await ? "Signaled new events are available" : "No signal received for new events, exiting await");
                }
                finally {
                    this.lock.unlock();
                }
            }
            return this.peekNullable() != null;
        }
        catch (InterruptedException e) {
            logger.warn("Event consumer thread was interrupted. Returning thread to event processor.", (Throwable)e);
            Thread.currentThread().interrupt();
            throw e;
        }
    }

    private TrackedEventMessage<?> peekNullable() {
        if (this.peekEvent == null && this.eventStream.hasNext()) {
            this.peekEvent = this.eventStream.next();
        }
        if (this.peekEvent == null && this.delegate.isClosed()) {
            throw new AxonServerException(ErrorCode.OTHER.errorCode(), "The Event Stream has been closed, so no further events can be retrieved", this.delegate.getError().orElse(null));
        }
        return this.peekEvent;
    }

    public TrackedEventMessage<?> nextAvailable() throws InterruptedException {
        try {
            this.hasNextAvailable(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            TrackedEventMessage<?> trackedEventMessage = this.peekEvent == null ? this.eventStream.next() : this.peekEvent;
            return trackedEventMessage;
        }
        finally {
            this.peekEvent = null;
        }
    }

    public void close() {
        this.delegate.close();
    }

    public boolean setOnAvailableCallback(Runnable callback) {
        this.delegate.onAvailable(callback);
        return true;
    }

    private static class SimpleSpliterator<T>
    implements Spliterator<T> {
        private final Supplier<T> supplier;

        protected SimpleSpliterator(Supplier<T> supplier) {
            this.supplier = supplier;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            T nextValue = this.supplier.get();
            if (nextValue != null) {
                action.accept(nextValue);
            }
            return nextValue != null;
        }

        @Override
        public Spliterator<T> trySplit() {
            return null;
        }

        @Override
        public long estimateSize() {
            return Long.MAX_VALUE;
        }

        @Override
        public int characteristics() {
            return 1296;
        }
    }
}

