/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.event;

import io.axoniq.axonserver.connector.ResultStream;
import io.axoniq.axonserver.grpc.event.dcb.SequencedEvent;
import io.axoniq.axonserver.grpc.event.dcb.StreamEventsResponse;
import jakarta.annotation.Nonnull;
import java.lang.invoke.MethodHandles;
import java.util.Objects;
import java.util.Optional;
import org.axonframework.axonserver.connector.event.TaggedEventConverter;
import org.axonframework.common.annotations.Internal;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.processors.streaming.token.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.processors.streaming.token.TrackingToken;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.SimpleEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class StreamingEventMessageStream
implements MessageStream<EventMessage> {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final ResultStream<StreamEventsResponse> stream;
    private final TaggedEventConverter converter;

    public StreamingEventMessageStream(@Nonnull ResultStream<StreamEventsResponse> stream, @Nonnull TaggedEventConverter converter) {
        this.stream = Objects.requireNonNull(stream, "The result stream cannot be null.");
        this.converter = Objects.requireNonNull(converter, "The converter cannot be null.");
    }

    public Optional<MessageStream.Entry<EventMessage>> next() {
        StreamEventsResponse response = (StreamEventsResponse)this.stream.nextIfAvailable();
        if (response == null) {
            logger.debug("There are no more events to stream at this moment in time.");
            return Optional.empty();
        }
        return Optional.of(this.convertToEntry(response.getEvent()));
    }

    private SimpleEntry<EventMessage> convertToEntry(SequencedEvent event) {
        EventMessage eventMessage = this.converter.convertEvent(event.getEvent());
        GlobalSequenceTrackingToken token = new GlobalSequenceTrackingToken(event.getSequence() + 1L);
        Context context = Context.with((Context.ResourceKey)TrackingToken.RESOURCE_KEY, (Object)token);
        return new SimpleEntry((Message)eventMessage, context);
    }

    public Optional<MessageStream.Entry<EventMessage>> peek() {
        StreamEventsResponse response = (StreamEventsResponse)this.stream.peek();
        if (response == null) {
            logger.debug("There are no more events to peek at this moment in time.");
            return Optional.empty();
        }
        return Optional.of(this.convertToEntry(response.getEvent()));
    }

    public void setCallback(@Nonnull Runnable callback) {
        this.stream.onAvailable(callback);
    }

    public Optional<Throwable> error() {
        return this.stream.getError();
    }

    public boolean isCompleted() {
        return this.stream.isClosed();
    }

    public boolean hasNextAvailable() {
        return this.stream.peek() != null;
    }

    public void close() {
        this.stream.close();
    }
}

