/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.event;

import io.axoniq.axonserver.grpc.event.Confirmation;
import io.axoniq.axonserver.grpc.event.Event;
import io.axoniq.axonserver.grpc.event.EventStoreGrpc;
import io.axoniq.axonserver.grpc.event.EventWithToken;
import io.axoniq.axonserver.grpc.event.GetAggregateEventsRequest;
import io.axoniq.axonserver.grpc.event.GetAggregateSnapshotsRequest;
import io.axoniq.axonserver.grpc.event.GetEventsRequest;
import io.axoniq.axonserver.grpc.event.GetFirstTokenRequest;
import io.axoniq.axonserver.grpc.event.GetLastTokenRequest;
import io.axoniq.axonserver.grpc.event.GetTokenAtRequest;
import io.axoniq.axonserver.grpc.event.QueryEventsRequest;
import io.axoniq.axonserver.grpc.event.QueryEventsResponse;
import io.axoniq.axonserver.grpc.event.ReadHighestSequenceNrRequest;
import io.axoniq.axonserver.grpc.event.ReadHighestSequenceNrResponse;
import io.axoniq.axonserver.grpc.event.TrackingToken;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.axonserver.connector.AxonServerException;
import org.axonframework.axonserver.connector.event.AppendEventTransaction;
import org.axonframework.axonserver.connector.event.util.EventCipher;
import org.axonframework.axonserver.connector.event.util.GrpcExceptionParser;
import org.axonframework.axonserver.connector.util.BufferingSpliterator;
import org.axonframework.axonserver.connector.util.UpstreamAwareStreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Thin client around the AxonServer event store gRPC stub.
 * <p>
 * Every operation obtains a channel for the requested context from the
 * {@link AxonServerConnectionManager}, creates an asynchronous stub on it and adapts the
 * callback-style gRPC API to {@link Stream}s, {@link CompletableFuture}s or wrapped
 * {@link StreamObserver}s. Events and snapshots pass through the configured {@link EventCipher}
 * on the way in and out. When a call fails with gRPC status {@code UNAVAILABLE}, the connection
 * manager is asked to disconnect the affected context.
 * <p>
 * The single-argument (context-less) overloads are deprecated; they delegate to the context-aware
 * variants using the context taken from the {@link AxonServerConfiguration}.
 */
public class AxonServerEventStoreClient {

    private static final Logger logger = LoggerFactory.getLogger(AxonServerEventStoreClient.class);

    private final EventCipher eventCipher;
    private final AxonServerConnectionManager axonServerConnectionManager;
    private final int timeout;
    private final String defaultContext;
    private final int bufferCapacity;

    /**
     * Creates a client using the given configuration and connection manager.
     *
     * @param eventStoreConfiguration     supplies the event cipher, the commit timeout, the initial number of
     *                                    permits (used here as snapshot buffer capacity) and the default context
     * @param axonServerConnectionManager provides (and disconnects) the channels used to reach the event store
     */
    public AxonServerEventStoreClient(AxonServerConfiguration eventStoreConfiguration,
                                      AxonServerConnectionManager axonServerConnectionManager) {
        this.eventCipher = eventStoreConfiguration.getEventCipher();
        this.axonServerConnectionManager = axonServerConnectionManager;
        this.timeout = eventStoreConfiguration.getCommitTimeout();
        this.bufferCapacity = eventStoreConfiguration.getInitialNrOfPermits();
        this.defaultContext = eventStoreConfiguration.getContext();
    }

    /**
     * Does nothing.
     *
     * @deprecated this method has no effect; this class holds no resources of its own to release
     */
    @Deprecated
    public void shutdown() {
        // intentionally empty
    }

    /**
     * Creates a new asynchronous event store stub on the channel of the given context.
     */
    private EventStoreGrpc.EventStoreStub eventStoreStub(String context) {
        return EventStoreGrpc.newStub(getChannelToEventStore(context));
    }

    /**
     * Asks the connection manager for the channel belonging to the given context.
     */
    private Channel getChannelToEventStore(String context) {
        return axonServerConnectionManager.getChannel(context);
    }

    /**
     * Lists the events of an aggregate in the default context.
     *
     * @param request the request describing which aggregate events to read
     * @return a stream of (decrypted) events
     * @deprecated use {@link #listAggregateEvents(String, GetAggregateEventsRequest)} and pass the context explicitly
     */
    @Deprecated
    public Stream<Event> listAggregateEvents(GetAggregateEventsRequest request) {
        return listAggregateEvents(defaultContext, request);
    }

    /**
     * Lists the events of an aggregate in the given context.
     * <p>
     * The gRPC call is started immediately; received events are decrypted and buffered in a
     * {@link BufferingSpliterator} that backs the returned stream. Closing the stream cancels the buffer.
     *
     * @param context the context to read from
     * @param request the request describing which aggregate events to read
     * @return a sequential stream of (decrypted) events
     */
    public Stream<Event> listAggregateEvents(String context, GetAggregateEventsRequest request) {
        // NOTE(review): unlike listAggregateSnapshots, no buffer capacity is passed here - confirm this is intended.
        BufferingSpliterator<Event> queue = new BufferingSpliterator<>();
        StreamingEventStreamObserver responseObserver =
                new StreamingEventStreamObserver(queue, context, request.getAggregateId());
        eventStoreStub(context).listAggregateEvents(request, responseObserver);
        return streamBackedBy(queue);
    }

    /**
     * Opens an event stream in the default context.
     *
     * @param responseStreamObserver receives the (decrypted) events and the stream outcome
     * @return the observer on which requests are sent to the server
     * @deprecated use {@link #listEvents(String, StreamObserver)} and pass the context explicitly
     */
    @Deprecated
    public StreamObserver<GetEventsRequest> listEvents(StreamObserver<EventWithToken> responseStreamObserver) {
        return listEvents(defaultContext, responseStreamObserver);
    }

    /**
     * Opens an event stream in the given context.
     * <p>
     * The given observer is wrapped so that each event is decrypted before being forwarded, and so that errors
     * are first checked for a lost connection and then translated by {@link GrpcExceptionParser}.
     *
     * @param context                the context to read from
     * @param responseStreamObserver receives the (decrypted) events and the stream outcome
     * @return the observer on which requests are sent to the server
     */
    public StreamObserver<GetEventsRequest> listEvents(final String context,
                                                       final StreamObserver<EventWithToken> responseStreamObserver) {
        StreamObserver<EventWithToken> wrappedStreamObserver = new StreamObserver<EventWithToken>() {

            @Override
            public void onNext(EventWithToken eventWithToken) {
                responseStreamObserver.onNext(eventCipher.decrypt(eventWithToken));
            }

            @Override
            public void onError(Throwable throwable) {
                checkConnectionException(throwable, context);
                responseStreamObserver.onError(GrpcExceptionParser.parse(throwable));
            }

            @Override
            public void onCompleted() {
                responseStreamObserver.onCompleted();
            }
        };
        return eventStoreStub(context).listEvents(wrappedStreamObserver);
    }

    /**
     * Appends a snapshot in the default context.
     *
     * @param snapshot the snapshot event to store
     * @return a future completed with the server's confirmation, or exceptionally on failure
     * @deprecated use {@link #appendSnapshot(String, Event)} and pass the context explicitly
     */
    @Deprecated
    public CompletableFuture<Confirmation> appendSnapshot(Event snapshot) {
        return appendSnapshot(defaultContext, snapshot);
    }

    /**
     * Appends a snapshot in the given context. The snapshot is encrypted with the configured cipher before sending.
     *
     * @param context  the context to store the snapshot in
     * @param snapshot the snapshot event to store
     * @return a future completed with the server's confirmation, or exceptionally on failure
     */
    public CompletableFuture<Confirmation> appendSnapshot(String context, Event snapshot) {
        CompletableFuture<Confirmation> confirmationFuture = new CompletableFuture<>();
        eventStoreStub(context).appendSnapshot(eventCipher.encrypt(snapshot),
                                               new SingleResultStreamObserver<>(context, confirmationFuture));
        return confirmationFuture;
    }

    /**
     * Requests the last token in the default context.
     *
     * @return a future completed with the token, or exceptionally on failure
     * @deprecated use {@link #getLastToken(String)} and pass the context explicitly
     */
    @Deprecated
    public CompletableFuture<TrackingToken> getLastToken() {
        return getLastToken(defaultContext);
    }

    /**
     * Requests the last token in the given context.
     *
     * @param context the context to query
     * @return a future completed with the token, or exceptionally on failure
     */
    public CompletableFuture<TrackingToken> getLastToken(String context) {
        CompletableFuture<TrackingToken> trackingTokenFuture = new CompletableFuture<>();
        eventStoreStub(context).getLastToken(GetLastTokenRequest.getDefaultInstance(),
                                             new SingleResultStreamObserver<>(context, trackingTokenFuture));
        return trackingTokenFuture;
    }

    /**
     * Requests the first token in the default context.
     *
     * @return a future completed with the token, or exceptionally on failure
     * @deprecated use {@link #getFirstToken(String)} and pass the context explicitly
     */
    @Deprecated
    public CompletableFuture<TrackingToken> getFirstToken() {
        return getFirstToken(defaultContext);
    }

    /**
     * Requests the first token in the given context.
     *
     * @param context the context to query
     * @return a future completed with the token, or exceptionally on failure
     */
    public CompletableFuture<TrackingToken> getFirstToken(String context) {
        CompletableFuture<TrackingToken> trackingTokenFuture = new CompletableFuture<>();
        eventStoreStub(context).getFirstToken(GetFirstTokenRequest.getDefaultInstance(),
                                              new SingleResultStreamObserver<>(context, trackingTokenFuture));
        return trackingTokenFuture;
    }

    /**
     * Requests the token at the given instant in the default context.
     *
     * @param instant the point in time, sent to the server as epoch milliseconds
     * @return a future completed with the token, or exceptionally on failure
     * @deprecated use {@link #getTokenAt(String, Instant)} and pass the context explicitly
     */
    @Deprecated
    public CompletableFuture<TrackingToken> getTokenAt(Instant instant) {
        return getTokenAt(defaultContext, instant);
    }

    /**
     * Requests the token at the given instant in the given context.
     *
     * @param context the context to query
     * @param instant the point in time, sent to the server as epoch milliseconds
     * @return a future completed with the token, or exceptionally on failure
     */
    public CompletableFuture<TrackingToken> getTokenAt(String context, Instant instant) {
        CompletableFuture<TrackingToken> trackingTokenFuture = new CompletableFuture<>();
        GetTokenAtRequest request = GetTokenAtRequest.newBuilder()
                                                     .setInstant(instant.toEpochMilli())
                                                     .build();
        eventStoreStub(context).getTokenAt(request, new SingleResultStreamObserver<>(context, trackingTokenFuture));
        return trackingTokenFuture;
    }

    /**
     * Opens an append-event transaction in the default context.
     *
     * @return the transaction through which events can be appended
     * @deprecated use {@link #createAppendEventConnection(String)} and pass the context explicitly
     */
    @Deprecated
    public AppendEventTransaction createAppendEventConnection() {
        return createAppendEventConnection(defaultContext);
    }

    /**
     * Opens an append-event transaction in the given context.
     * <p>
     * The transaction is handed the configured commit timeout, the request stream of the {@code appendEvent}
     * call, a future that receives the server's confirmation (or the translated error) and the event cipher.
     *
     * @param context the context to append events to
     * @return the transaction through which events can be appended
     */
    public AppendEventTransaction createAppendEventConnection(final String context) {
        final CompletableFuture<Confirmation> futureConfirmation = new CompletableFuture<>();
        StreamObserver<Confirmation> confirmationObserver = new StreamObserver<Confirmation>() {

            @Override
            public void onNext(Confirmation confirmation) {
                futureConfirmation.complete(confirmation);
            }

            @Override
            public void onError(Throwable throwable) {
                checkConnectionException(throwable, context);
                futureConfirmation.completeExceptionally(GrpcExceptionParser.parse(throwable));
            }

            @Override
            public void onCompleted() {
                // Nothing to do: the outcome is reported through onNext/onError.
                // NOTE(review): if the server completes without a confirmation the future stays incomplete;
                // presumably the transaction's timeout covers that case - confirm.
            }
        };
        return new AppendEventTransaction(timeout,
                                          eventStoreStub(context).appendEvent(confirmationObserver),
                                          futureConfirmation,
                                          eventCipher);
    }

    /**
     * Opens an ad-hoc event query in the default context.
     *
     * @param responseStreamObserver receives the query responses and the stream outcome
     * @return the observer on which query requests are sent to the server
     * @deprecated use {@link #query(String, StreamObserver)} and pass the context explicitly
     */
    @Deprecated
    public StreamObserver<QueryEventsRequest> query(StreamObserver<QueryEventsResponse> responseStreamObserver) {
        return query(defaultContext, responseStreamObserver);
    }

    /**
     * Opens an ad-hoc event query in the given context.
     * <p>
     * Responses are forwarded unchanged; errors are first checked for a lost connection and then translated by
     * {@link GrpcExceptionParser} before being forwarded.
     *
     * @param context                the context to query
     * @param responseStreamObserver receives the query responses and the stream outcome
     * @return the observer on which query requests are sent to the server
     */
    public StreamObserver<QueryEventsRequest> query(final String context,
                                                    final StreamObserver<QueryEventsResponse> responseStreamObserver) {
        StreamObserver<QueryEventsResponse> wrappedStreamObserver = new StreamObserver<QueryEventsResponse>() {

            @Override
            public void onNext(QueryEventsResponse response) {
                responseStreamObserver.onNext(response);
            }

            @Override
            public void onError(Throwable throwable) {
                checkConnectionException(throwable, context);
                responseStreamObserver.onError(GrpcExceptionParser.parse(throwable));
            }

            @Override
            public void onCompleted() {
                responseStreamObserver.onCompleted();
            }
        };
        return eventStoreStub(context).queryEvents(wrappedStreamObserver);
    }

    /**
     * Disconnects the given context when the failure is a gRPC {@code UNAVAILABLE} status; any other
     * throwable is ignored here.
     */
    private void checkConnectionException(Throwable ex, String context) {
        if (ex instanceof StatusRuntimeException
                && ((StatusRuntimeException) ex).getStatus().getCode() == Status.Code.UNAVAILABLE) {
            stopChannelToEventStore(context);
        }
    }

    /**
     * Asks the connection manager to disconnect the given context.
     */
    private void stopChannelToEventStore(String context) {
        axonServerConnectionManager.disconnect(context);
    }

    /**
     * Reads the highest sequence number of an aggregate in the default context.
     *
     * @param aggregateIdentifier the identifier of the aggregate
     * @return a future completed with the server's response, or exceptionally on failure
     * @deprecated use {@link #lastSequenceNumberFor(String, String)} and pass the context explicitly
     */
    @Deprecated
    public CompletableFuture<ReadHighestSequenceNrResponse> lastSequenceNumberFor(String aggregateIdentifier) {
        return lastSequenceNumberFor(defaultContext, aggregateIdentifier);
    }

    /**
     * Reads the highest sequence number of an aggregate in the given context.
     *
     * @param context             the context to query
     * @param aggregateIdentifier the identifier of the aggregate
     * @return a future completed with the server's response, or exceptionally on failure
     */
    public CompletableFuture<ReadHighestSequenceNrResponse> lastSequenceNumberFor(String context,
                                                                                  String aggregateIdentifier) {
        CompletableFuture<ReadHighestSequenceNrResponse> completableFuture = new CompletableFuture<>();
        ReadHighestSequenceNrRequest request = ReadHighestSequenceNrRequest.newBuilder()
                                                                           .setAggregateId(aggregateIdentifier)
                                                                           .build();
        eventStoreStub(context).readHighestSequenceNr(request,
                                                      new SingleResultStreamObserver<>(context, completableFuture));
        return completableFuture;
    }

    /**
     * Lists the snapshots of an aggregate in the default context.
     *
     * @param request the request describing which aggregate snapshots to read
     * @return a stream of (decrypted) snapshot events
     * @deprecated use {@link #listAggregateSnapshots(String, GetAggregateSnapshotsRequest)} and pass the context
     * explicitly
     */
    @Deprecated
    public Stream<Event> listAggregateSnapshots(GetAggregateSnapshotsRequest request) {
        return listAggregateSnapshots(defaultContext, request);
    }

    /**
     * Lists the snapshots of an aggregate in the given context.
     * <p>
     * Works like {@link #listAggregateEvents(String, GetAggregateEventsRequest)}, except that the backing
     * {@link BufferingSpliterator} is created with the configured buffer capacity.
     *
     * @param context the context to read from
     * @param request the request describing which aggregate snapshots to read
     * @return a sequential stream of (decrypted) snapshot events
     */
    public Stream<Event> listAggregateSnapshots(String context, GetAggregateSnapshotsRequest request) {
        BufferingSpliterator<Event> queue = new BufferingSpliterator<>(bufferCapacity);
        StreamingEventStreamObserver responseObserver =
                new StreamingEventStreamObserver(queue, context, request.getAggregateId());
        eventStoreStub(context).listAggregateSnapshots(request, responseObserver);
        return streamBackedBy(queue);
    }

    /**
     * Exposes the given buffer as a sequential stream whose close handler cancels the buffer (without a cause).
     */
    private Stream<Event> streamBackedBy(BufferingSpliterator<Event> queue) {
        return StreamSupport.stream(queue, false).onClose(() -> queue.cancel(null));
    }

    /**
     * Response observer that decrypts incoming events and feeds them into a {@link BufferingSpliterator}.
     * It also counts the received events and logs the elapsed time at debug level on completion.
     */
    private class StreamingEventStreamObserver extends UpstreamAwareStreamObserver<Event> {

        private final long before;
        private final String aggregateId;
        private final BufferingSpliterator<Event> events;
        private final String context;
        private int count;

        private StreamingEventStreamObserver(BufferingSpliterator<Event> queue, String context, String aggregateId) {
            this.context = context;
            // start time, used only for the debug log in onCompleted
            this.before = System.currentTimeMillis();
            this.events = queue;
            this.aggregateId = aggregateId;
        }

        @Override
        public void onNext(Event event) {
            // A rejected put is treated as "the consumer no longer wants events": cancel the call upstream.
            if (!events.put(eventCipher.decrypt(event))) {
                getRequestStream().cancel("Client requested cancellation", null);
            }
            count++;
        }

        @Override
        public void onError(Throwable throwable) {
            checkConnectionException(throwable, context);
            // hand the raw failure to the buffer so the consumer side can observe it
            events.cancel(throwable);
        }

        @Override
        public void onCompleted() {
            // cancelling without a cause - presumably how the buffer is told the stream ended normally
            events.cancel(null);
            if (logger.isDebugEnabled()) {
                logger.debug("Done request for {}: {}ms, {} events",
                             aggregateId,
                             System.currentTimeMillis() - before,
                             count);
            }
        }
    }

    /**
     * Response observer for calls that are expected to yield a single result: it completes the given future
     * with the first value received, with the translated error on failure, or with an
     * {@link AxonServerException} when the call completes without the future having been completed.
     *
     * @param <T> the type of the expected result
     */
    private class SingleResultStreamObserver<T> implements StreamObserver<T> {

        private final String context;
        private final CompletableFuture<T> future;

        private SingleResultStreamObserver(String context, CompletableFuture<T> future) {
            this.context = context;
            this.future = future;
        }

        @Override
        public void onNext(T t) {
            future.complete(t);
        }

        @Override
        public void onError(Throwable throwable) {
            checkConnectionException(throwable, context);
            future.completeExceptionally(GrpcExceptionParser.parse(throwable));
        }

        @Override
        public void onCompleted() {
            // Only reached with an incomplete future when the server closed the call without sending a value.
            if (!future.isDone()) {
                future.completeExceptionally(
                        new AxonServerException("AXONIQ-0001", "Async call completed before answer"));
            }
        }
    }
}

