/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.event;

import io.axoniq.axonserver.grpc.event.Confirmation;
import io.axoniq.axonserver.grpc.event.Event;
import io.axoniq.axonserver.grpc.event.EventStoreGrpc;
import io.axoniq.axonserver.grpc.event.EventWithToken;
import io.axoniq.axonserver.grpc.event.GetAggregateEventsRequest;
import io.axoniq.axonserver.grpc.event.GetAggregateSnapshotsRequest;
import io.axoniq.axonserver.grpc.event.GetEventsRequest;
import io.axoniq.axonserver.grpc.event.GetFirstTokenRequest;
import io.axoniq.axonserver.grpc.event.GetLastTokenRequest;
import io.axoniq.axonserver.grpc.event.GetTokenAtRequest;
import io.axoniq.axonserver.grpc.event.QueryEventsRequest;
import io.axoniq.axonserver.grpc.event.QueryEventsResponse;
import io.axoniq.axonserver.grpc.event.ReadHighestSequenceNrRequest;
import io.axoniq.axonserver.grpc.event.ReadHighestSequenceNrResponse;
import io.axoniq.axonserver.grpc.event.TrackingToken;
import io.grpc.Channel;
import io.grpc.ClientInterceptor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.axonserver.connector.AxonServerException;
import org.axonframework.axonserver.connector.event.AppendEventTransaction;
import org.axonframework.axonserver.connector.event.util.EventCipher;
import org.axonframework.axonserver.connector.event.util.GrpcExceptionParser;
import org.axonframework.axonserver.connector.util.ContextAddingInterceptor;
import org.axonframework.axonserver.connector.util.TokenAddingInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client for the event store gRPC service.
 * <p>
 * Every call is made through a fresh asynchronous stub built on the channel handed out by the
 * {@link AxonServerConnectionManager}, decorated with a token interceptor and a context interceptor.
 * Events are passed through the configured {@link EventCipher} on their way in and out.
 */
public class AxonServerEventStoreClient {
    private static final Logger logger = LoggerFactory.getLogger(AxonServerEventStoreClient.class);

    private final TokenAddingInterceptor tokenAddingInterceptor;
    private final ContextAddingInterceptor contextAddingInterceptor;
    private final EventCipher eventCipher;
    private final AxonServerConnectionManager axonServerConnectionManager;
    // volatile: written by shutdown() and read by whichever thread issues the next call.
    private volatile boolean shutdown;

    /**
     * Creates a client that takes its token, context and event cipher from the given configuration.
     *
     * @param eventStoreConfiguration     configuration supplying the token, context and event cipher
     * @param axonServerConnectionManager provider of the channel used for every call
     */
    public AxonServerEventStoreClient(AxonServerConfiguration eventStoreConfiguration, AxonServerConnectionManager axonServerConnectionManager) {
        this.tokenAddingInterceptor = new TokenAddingInterceptor(eventStoreConfiguration.getToken());
        this.eventCipher = eventStoreConfiguration.getEventCipher();
        this.axonServerConnectionManager = axonServerConnectionManager;
        this.contextAddingInterceptor = new ContextAddingInterceptor(eventStoreConfiguration.getContext());
    }

    /**
     * Marks this client as shut down. From then on no channel is requested from the connection manager.
     */
    public void shutdown() {
        this.shutdown = true;
    }

    /**
     * Builds a new asynchronous stub on the current channel with both interceptors attached.
     */
    private EventStoreGrpc.EventStoreStub eventStoreStub() {
        return EventStoreGrpc.newStub(getChannelToEventStore())
                             .withInterceptors(tokenAddingInterceptor)
                             .withInterceptors(contextAddingInterceptor);
    }

    /**
     * Returns the channel provided by the connection manager, or {@code null} once {@link #shutdown()}
     * has been called.
     */
    private Channel getChannelToEventStore() {
        // NOTE(review): a null channel is handed to the stub factory after shutdown; presumably that is
        // rejected there. Confirm whether a dedicated exception would serve callers better.
        if (shutdown) {
            return null;
        }
        return axonServerConnectionManager.getChannel();
    }

    /**
     * Reads the events of an aggregate and blocks until the server has completed the response stream.
     * Each received event is decrypted with the configured cipher.
     *
     * @param request the request identifying the aggregate
     * @return a stream over all events received for the request
     * @throws ExecutionException   if the call failed; the cause is the parsed gRPC error
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public Stream<Event> listAggregateEvents(GetAggregateEventsRequest request) throws ExecutionException, InterruptedException {
        AggregateEventCollector collector = new AggregateEventCollector(request.getAggregateId());
        eventStoreStub().listAggregateEvents(request, collector);
        return collector.result.get();
    }

    /**
     * Opens an event stream. Responses are decrypted before being handed to the given observer, and
     * errors are parsed before being forwarded.
     *
     * @param responseStreamObserver receiver of the decrypted events
     * @return the observer on which requests are sent to the server
     */
    public StreamObserver<GetEventsRequest> listEvents(final StreamObserver<EventWithToken> responseStreamObserver) {
        StreamObserver<EventWithToken> decryptingObserver = new StreamObserver<EventWithToken>() {
            @Override
            public void onNext(EventWithToken eventWithToken) {
                responseStreamObserver.onNext(eventCipher.decrypt(eventWithToken));
            }

            @Override
            public void onError(Throwable throwable) {
                checkConnectionException(throwable);
                responseStreamObserver.onError(GrpcExceptionParser.parse(throwable));
            }

            @Override
            public void onCompleted() {
                responseStreamObserver.onCompleted();
            }
        };
        return eventStoreStub().listEvents(decryptingObserver);
    }

    /**
     * Appends a snapshot after encrypting it with the configured cipher.
     *
     * @param snapshot the snapshot event to store
     * @return a future completed with the server's confirmation, or exceptionally on failure
     */
    public CompletableFuture<Confirmation> appendSnapshot(Event snapshot) {
        CompletableFuture<Confirmation> confirmationFuture = new CompletableFuture<>();
        eventStoreStub().appendSnapshot(eventCipher.encrypt(snapshot),
                                        new SingleResultStreamObserver<Confirmation>(confirmationFuture));
        return confirmationFuture;
    }

    /**
     * Requests the last token of the event store.
     *
     * @return a future completed with the token, or exceptionally on failure
     */
    public CompletableFuture<TrackingToken> getLastToken() {
        CompletableFuture<TrackingToken> trackingTokenFuture = new CompletableFuture<>();
        eventStoreStub().getLastToken(GetLastTokenRequest.getDefaultInstance(),
                                      new SingleResultStreamObserver<TrackingToken>(trackingTokenFuture));
        return trackingTokenFuture;
    }

    /**
     * Requests the first token of the event store.
     *
     * @return a future completed with the token, or exceptionally on failure
     */
    public CompletableFuture<TrackingToken> getFirstToken() {
        CompletableFuture<TrackingToken> trackingTokenFuture = new CompletableFuture<>();
        eventStoreStub().getFirstToken(GetFirstTokenRequest.getDefaultInstance(),
                                       new SingleResultStreamObserver<TrackingToken>(trackingTokenFuture));
        return trackingTokenFuture;
    }

    /**
     * Requests the token for the given point in time, which is sent as epoch milliseconds.
     *
     * @param instant the point in time to look up
     * @return a future completed with the token, or exceptionally on failure
     */
    public CompletableFuture<TrackingToken> getTokenAt(Instant instant) {
        CompletableFuture<TrackingToken> trackingTokenFuture = new CompletableFuture<>();
        GetTokenAtRequest request = GetTokenAtRequest.newBuilder()
                                                     .setInstant(instant.toEpochMilli())
                                                     .build();
        eventStoreStub().getTokenAt(request, new SingleResultStreamObserver<TrackingToken>(trackingTokenFuture));
        return trackingTokenFuture;
    }

    /**
     * Opens an append-event call and wraps it, together with the future holding its confirmation and
     * the event cipher, in an {@link AppendEventTransaction}.
     * <p>
     * The confirmation future fails if the server closes the call without sending a confirmation,
     * rather than staying incomplete.
     *
     * @return the transaction wrapping the newly opened call
     */
    public AppendEventTransaction createAppendEventConnection() {
        CompletableFuture<Confirmation> futureConfirmation = new CompletableFuture<>();
        StreamObserver<Event> requestStream =
                eventStoreStub().appendEvent(new SingleResultStreamObserver<Confirmation>(futureConfirmation));
        return new AppendEventTransaction(requestStream, futureConfirmation, eventCipher);
    }

    /**
     * Opens an ad-hoc query stream. Responses are forwarded unchanged; errors are parsed before being
     * forwarded.
     *
     * @param responseStreamObserver receiver of the query responses
     * @return the observer on which requests are sent to the server
     */
    public StreamObserver<QueryEventsRequest> query(final StreamObserver<QueryEventsResponse> responseStreamObserver) {
        StreamObserver<QueryEventsResponse> forwardingObserver = new StreamObserver<QueryEventsResponse>() {
            @Override
            public void onNext(QueryEventsResponse response) {
                responseStreamObserver.onNext(response);
            }

            @Override
            public void onError(Throwable throwable) {
                checkConnectionException(throwable);
                responseStreamObserver.onError(GrpcExceptionParser.parse(throwable));
            }

            @Override
            public void onCompleted() {
                responseStreamObserver.onCompleted();
            }
        };
        return eventStoreStub().queryEvents(forwardingObserver);
    }

    /**
     * Invokes {@link #stopChannelToEventStore()} when the error is a gRPC status exception with code
     * {@code UNAVAILABLE}; any other error is ignored here.
     */
    private void checkConnectionException(Throwable ex) {
        if (ex instanceof StatusRuntimeException
                && ((StatusRuntimeException) ex).getStatus().getCode() == Status.Code.UNAVAILABLE) {
            stopChannelToEventStore();
        }
    }

    /**
     * Hook called when the server is reported as unavailable.
     */
    private void stopChannelToEventStore() {
        // NOTE(review): the body is empty, so an UNAVAILABLE status currently triggers no action.
    }

    /**
     * Requests the highest sequence number stored for the given aggregate.
     *
     * @param aggregateIdentifier identifier of the aggregate
     * @return a future completed with the server's response, or exceptionally on failure
     */
    public CompletableFuture<ReadHighestSequenceNrResponse> lastSequenceNumberFor(String aggregateIdentifier) {
        CompletableFuture<ReadHighestSequenceNrResponse> completableFuture = new CompletableFuture<>();
        ReadHighestSequenceNrRequest request = ReadHighestSequenceNrRequest.newBuilder()
                                                                           .setAggregateId(aggregateIdentifier)
                                                                           .build();
        eventStoreStub().readHighestSequenceNr(
                request, new SingleResultStreamObserver<ReadHighestSequenceNrResponse>(completableFuture));
        return completableFuture;
    }

    /**
     * Reads the snapshots of an aggregate and blocks until the server has completed the response stream.
     * Each received snapshot is decrypted with the configured cipher.
     *
     * @param request the request identifying the aggregate
     * @return a stream over all snapshots received for the request
     * @throws ExecutionException   if the call failed; the cause is the parsed gRPC error
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public Stream<Event> listAggregateSnapshots(GetAggregateSnapshotsRequest request) throws ExecutionException, InterruptedException {
        AggregateEventCollector collector = new AggregateEventCollector(request.getAggregateId());
        eventStoreStub().listAggregateSnapshots(request, collector);
        return collector.result.get();
    }

    /**
     * Observer that decrypts and buffers every received event and publishes the buffered events as a
     * {@link Stream} through {@link #result} once the server completes the call.
     */
    private class AggregateEventCollector implements StreamObserver<Event> {
        private final CompletableFuture<Stream<Event>> result = new CompletableFuture<>();
        private final Stream.Builder<Event> events = Stream.builder();
        private final String aggregateId;
        // Taken at construction, i.e. before the call is issued, so the logged duration spans the call.
        private final long startedAt = System.currentTimeMillis();
        private int count;

        private AggregateEventCollector(String aggregateId) {
            this.aggregateId = aggregateId;
        }

        @Override
        public void onNext(Event event) {
            events.accept(eventCipher.decrypt(event));
            ++count;
        }

        @Override
        public void onError(Throwable throwable) {
            checkConnectionException(throwable);
            result.completeExceptionally(GrpcExceptionParser.parse(throwable));
        }

        @Override
        public void onCompleted() {
            result.complete(events.build());
            if (logger.isDebugEnabled()) {
                logger.debug("Done request for {}: {}ms, {} events",
                             aggregateId, System.currentTimeMillis() - startedAt, count);
            }
        }
    }

    /**
     * Observer for calls that are expected to yield a single response, which is used to complete the
     * wrapped future.
     *
     * @param <T> the response type
     */
    private class SingleResultStreamObserver<T> implements StreamObserver<T> {
        private final CompletableFuture<T> future;

        private SingleResultStreamObserver(CompletableFuture<T> future) {
            this.future = future;
        }

        @Override
        public void onNext(T t) {
            future.complete(t);
        }

        @Override
        public void onError(Throwable throwable) {
            checkConnectionException(throwable);
            future.completeExceptionally(GrpcExceptionParser.parse(throwable));
        }

        @Override
        public void onCompleted() {
            // The call ended without a response: fail the future so callers are not left waiting.
            if (!future.isDone()) {
                future.completeExceptionally(new AxonServerException("AXONIQ-0001", "Async call completed before answer"));
            }
        }
    }
}

