/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.event.impl;

import io.axoniq.axonserver.connector.event.AggregateEventStream;
import io.axoniq.axonserver.connector.impl.FlowControlledBuffer;
import io.axoniq.axonserver.connector.impl.StreamClosedException;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.event.Event;
import io.axoniq.axonserver.grpc.event.GetAggregateEventsRequest;
import io.grpc.stub.ClientCallStreamObserver;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BufferedAggregateEventStream
extends FlowControlledBuffer<Event, GetAggregateEventsRequest>
implements AggregateEventStream {
    private static final Logger logger = LoggerFactory.getLogger(BufferedAggregateEventStream.class);
    private static final Event TERMINAL_MESSAGE = Event.newBuilder().setAggregateSequenceNumber(-1729L).build();
    private final AtomicReference<Long> lastReceivedEventSequence = new AtomicReference();
    private Event peeked;

    public BufferedAggregateEventStream() {
        this(Integer.MAX_VALUE);
    }

    public BufferedAggregateEventStream(int bufferSize) {
        super("unused", bufferSize, 0);
    }

    @Override
    public Event next() throws InterruptedException {
        Event taken;
        if (this.peeked != null) {
            taken = this.peeked;
            this.peeked = null;
        } else {
            taken = (Event)this.take();
        }
        return taken;
    }

    @Override
    public boolean hasNext() {
        Throwable errorResult;
        if (this.peeked != null) {
            return true;
        }
        try {
            this.peeked = (Event)this.tryTake();
        }
        catch (InterruptedException e) {
            this.cancel();
            Thread.currentThread().interrupt();
            return false;
        }
        if (this.peeked == null && (errorResult = this.getErrorResult()) != null) {
            throw new StreamClosedException(errorResult);
        }
        return this.peeked != null;
    }

    @Override
    public void cancel() {
        this.outboundStream().cancel("Request cancelled by client", null);
    }

    @Override
    protected GetAggregateEventsRequest buildFlowControlMessage(FlowControl flowControl) {
        return null;
    }

    @Override
    protected Event terminalMessage() {
        return TERMINAL_MESSAGE;
    }

    @Override
    public void onNext(Event event) {
        Long prevSequence = this.lastReceivedEventSequence.get();
        if (prevSequence == null || prevSequence + 1L == event.getAggregateSequenceNumber()) {
            super.onNext(event);
            this.lastReceivedEventSequence.set(event.getAggregateSequenceNumber());
        } else {
            String message = String.format("Invalid sequence number for aggregate with identifier [%s]. Received seqNo: %d, expected seqNo: %d", event.getAggregateIdentifier(), event.getAggregateSequenceNumber(), prevSequence + 1L);
            logger.error(message);
            RuntimeException invalidAggregateEventStreamException = new RuntimeException(message);
            ClientCallStreamObserver outboundStream = this.outboundStream();
            if (outboundStream != null) {
                outboundStream.onError((Throwable)invalidAggregateEventStreamException);
            }
            this.onError(invalidAggregateEventStreamException);
        }
    }
}

