/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventstreaming;

import jakarta.annotation.Nonnull;
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.axonframework.common.stream.BlockingStream;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventstreaming.AnyEvent;
import org.axonframework.eventstreaming.EventCriteria;
import org.axonframework.eventstreaming.StreamableEventSource;
import org.axonframework.eventstreaming.StreamingCondition;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.SimpleEntry;
import org.axonframework.messaging.StreamableMessageSource;

@Deprecated(since="5.0.0")
public class LegacyStreamableEventSource<E extends EventMessage>
implements StreamableEventSource<E> {
    private final StreamableMessageSource<E> delegate;

    public LegacyStreamableEventSource(@Nonnull StreamableMessageSource<E> delegate) {
        Objects.requireNonNull(delegate, "Delegate is required");
        this.delegate = delegate;
    }

    @Override
    public MessageStream<E> open(@Nonnull StreamingCondition condition) {
        TrackingToken position = condition.position();
        return new BlockingMessageStream<E>(this.delegate.openStream(position), condition.criteria());
    }

    @Override
    public CompletableFuture<TrackingToken> firstToken() {
        return this.delegate.firstToken();
    }

    @Override
    public CompletableFuture<TrackingToken> latestToken() {
        return this.delegate.latestToken();
    }

    @Override
    public CompletableFuture<TrackingToken> tokenAt(@Nonnull Instant at) {
        return this.delegate.tokenAt(at);
    }

    private static class BlockingMessageStream<E extends EventMessage>
    implements MessageStream<E> {
        private final BlockingStream<E> stream;

        BlockingMessageStream(BlockingStream<E> stream, EventCriteria criteria) {
            this.stream = stream;
            if (criteria != AnyEvent.INSTANCE) {
                throw new IllegalArgumentException("Only AnyEvent criteria is supported in this legacy adapter, but received: " + String.valueOf(criteria));
            }
        }

        @Override
        public Optional<MessageStream.Entry<E>> next() {
            try {
                if (!this.stream.hasNextAvailable()) {
                    return Optional.empty();
                }
                return Optional.ofNullable((EventMessage)this.stream.nextAvailable()).map(this::createEntryForMessage);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Optional.empty();
            }
        }

        @Override
        public Optional<MessageStream.Entry<E>> peek() {
            return this.stream.peek().map(this::createEntryForMessage);
        }

        @Override
        public void onAvailable(@Nonnull Runnable callback) {
            this.stream.setOnAvailableCallback(callback);
        }

        @Override
        public Optional<Throwable> error() {
            return Optional.empty();
        }

        @Override
        public boolean isCompleted() {
            return !this.stream.hasNextAvailable() && this.stream.peek().isEmpty();
        }

        @Override
        public boolean hasNextAvailable() {
            return this.stream.hasNextAvailable();
        }

        @Override
        public void close() {
            this.stream.close();
        }

        private MessageStream.Entry<E> createEntryForMessage(E message) {
            Context context = Context.empty();
            if (message instanceof TrackedEventMessage) {
                TrackedEventMessage trackedMessage = (TrackedEventMessage)message;
                context = TrackingToken.addToContext(context, trackedMessage.trackingToken());
            }
            context = context.withResource(Message.RESOURCE_KEY, message);
            return new SimpleEntry<E>(message, context);
        }
    }
}

