/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.event.axon;

import com.google.protobuf.ByteString;
import io.axoniq.axonserver.grpc.event.Event;
import io.axoniq.axonserver.grpc.event.EventWithToken;
import io.axoniq.axonserver.grpc.event.GetAggregateEventsRequest;
import io.axoniq.axonserver.grpc.event.GetEventsRequest;
import io.axoniq.axonserver.grpc.event.QueryEventsRequest;
import io.axoniq.axonserver.grpc.event.QueryEventsResponse;
import io.axoniq.axonserver.grpc.event.ReadHighestSequenceNrResponse;
import io.axoniq.axonserver.grpc.event.TrackingToken;
import io.grpc.stub.StreamObserver;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.PlatformConnectionManager;
import org.axonframework.axonserver.connector.event.AppendEventTransaction;
import org.axonframework.axonserver.connector.event.AxonDBClient;
import org.axonframework.axonserver.connector.event.axon.EventBuffer;
import org.axonframework.axonserver.connector.event.axon.GrpcBackedDomainEventData;
import org.axonframework.axonserver.connector.event.axon.GrpcMetaDataAwareSerializer;
import org.axonframework.axonserver.connector.event.axon.QueryResultBuffer;
import org.axonframework.axonserver.connector.event.axon.QueryResultStream;
import org.axonframework.axonserver.connector.util.FlowControllingStreamObserver;
import org.axonframework.axonserver.connector.util.GrpcMetaDataConverter;
import org.axonframework.common.Assert;
import org.axonframework.common.ObjectUtils;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.eventsourcing.GenericDomainEventMessage;
import org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine;
import org.axonframework.eventsourcing.eventstore.AbstractEventStore;
import org.axonframework.eventsourcing.eventstore.DomainEventData;
import org.axonframework.eventsourcing.eventstore.DomainEventStream;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.eventsourcing.eventstore.EventStoreException;
import org.axonframework.eventsourcing.eventstore.EventUtils;
import org.axonframework.eventsourcing.eventstore.GlobalSequenceTrackingToken;
import org.axonframework.eventsourcing.eventstore.TrackedEventData;
import org.axonframework.eventsourcing.eventstore.TrackingEventStream;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcaster;
import org.axonframework.serialization.upcasting.event.NoOpEventUpcaster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AxonServerEventStore
extends AbstractEventStore {
    private static final Logger logger = LoggerFactory.getLogger(AxonServerEventStore.class);

    public AxonServerEventStore(AxonServerConfiguration configuration, PlatformConnectionManager platformConnectionManager, Serializer serializer) {
        this(configuration, platformConnectionManager, serializer, (EventUpcaster)NoOpEventUpcaster.INSTANCE);
    }

    public AxonServerEventStore(AxonServerConfiguration configuration, PlatformConnectionManager platformConnectionManager, Serializer serializer, EventUpcaster upcasterChain) {
        super((EventStorageEngine)new AxonIQEventStorageEngine(serializer, upcasterChain, configuration, new AxonDBClient(configuration, platformConnectionManager)));
    }

    public AxonServerEventStore(AxonServerConfiguration configuration, PlatformConnectionManager platformConnectionManager, Serializer snapshotSerializer, Serializer eventSerializer, EventUpcaster upcasterChain) {
        super((EventStorageEngine)new AxonIQEventStorageEngine(snapshotSerializer, eventSerializer, upcasterChain, configuration, new AxonDBClient(configuration, platformConnectionManager)));
    }

    public TrackingEventStream openStream(org.axonframework.eventsourcing.eventstore.TrackingToken trackingToken) {
        return this.storageEngine().openStream(trackingToken);
    }

    public QueryResultStream query(String query, boolean liveUpdates) {
        return this.storageEngine().query(query, liveUpdates);
    }

    protected AxonIQEventStorageEngine storageEngine() {
        return (AxonIQEventStorageEngine)super.storageEngine();
    }

    private static class AxonIQEventStorageEngine
    extends AbstractEventStorageEngine {
        private static final int ALLOW_SNAPSHOTS_MAGIC_VALUE = -42;
        private final String APPEND_EVENT_TRANSACTION = (Object)((Object)this) + "/APPEND_EVENT_TRANSACTION";
        private final EventUpcaster upcasterChain;
        private final AxonServerConfiguration configuration;
        private final AxonDBClient eventStoreClient;
        private final GrpcMetaDataConverter converter;

        private AxonIQEventStorageEngine(Serializer serializer, EventUpcaster upcasterChain, AxonServerConfiguration configuration, AxonDBClient eventStoreClient) {
            super(serializer, upcasterChain, null, serializer);
            this.upcasterChain = (EventUpcaster)ObjectUtils.getOrDefault((Object)upcasterChain, (Object)NoOpEventUpcaster.INSTANCE);
            this.configuration = configuration;
            this.eventStoreClient = eventStoreClient;
            this.converter = new GrpcMetaDataConverter(serializer);
        }

        private AxonIQEventStorageEngine(Serializer snapshotSerializer, Serializer serializer, EventUpcaster upcasterChain, AxonServerConfiguration configuration, AxonDBClient eventStoreClient) {
            super(snapshotSerializer, upcasterChain, null, serializer, null);
            this.upcasterChain = (EventUpcaster)ObjectUtils.getOrDefault((Object)upcasterChain, (Object)NoOpEventUpcaster.INSTANCE);
            this.configuration = configuration;
            this.eventStoreClient = eventStoreClient;
            this.converter = new GrpcMetaDataConverter(serializer);
        }

        protected void appendEvents(List<? extends EventMessage<?>> events, Serializer serializer) {
            AppendEventTransaction sender = CurrentUnitOfWork.isStarted() ? (AppendEventTransaction)CurrentUnitOfWork.get().root().getOrComputeResource(this.APPEND_EVENT_TRANSACTION, k -> {
                AppendEventTransaction appendEventTransaction = this.eventStoreClient.createAppendEventConnection();
                CurrentUnitOfWork.get().root().onRollback(u -> appendEventTransaction.rollback(u.getExecutionResult().getExceptionResult()));
                CurrentUnitOfWork.get().root().onCommit(u -> this.commit(appendEventTransaction));
                return appendEventTransaction;
            }) : this.eventStoreClient.createAppendEventConnection();
            for (EventMessage<?> eventMessage : events) {
                sender.append(this.map(eventMessage, serializer));
            }
            if (!CurrentUnitOfWork.isStarted()) {
                this.commit(sender);
            }
        }

        private void commit(AppendEventTransaction appendEventTransaction) {
            try {
                appendEventTransaction.commit();
            }
            catch (ExecutionException e) {
                throw ErrorCode.convert(e.getCause());
            }
            catch (InterruptedException | TimeoutException e) {
                throw ErrorCode.convert(e);
            }
        }

        public Event map(EventMessage<?> eventMessage, Serializer serializer) {
            Event.Builder builder = Event.newBuilder();
            if (eventMessage instanceof GenericDomainEventMessage) {
                builder.setAggregateIdentifier(((GenericDomainEventMessage)eventMessage).getAggregateIdentifier()).setAggregateSequenceNumber(((GenericDomainEventMessage)eventMessage).getSequenceNumber()).setAggregateType(((GenericDomainEventMessage)eventMessage).getType());
            }
            SerializedObject serializedPayload = eventMessage.serializePayload(serializer, byte[].class);
            builder.setMessageIdentifier(eventMessage.getIdentifier()).setPayload(io.axoniq.axonserver.grpc.SerializedObject.newBuilder().setType(serializedPayload.getType().getName()).setRevision((String)ObjectUtils.getOrDefault((Object)serializedPayload.getType().getRevision(), (Object)"")).setData(ByteString.copyFrom((byte[])((byte[])serializedPayload.getData())))).setTimestamp(eventMessage.getTimestamp().toEpochMilli());
            eventMessage.getMetaData().forEach((k, v) -> builder.putMetaData((String)k, this.converter.convertToMetaDataValue(v)));
            return builder.build();
        }

        protected void storeSnapshot(DomainEventMessage<?> snapshot, Serializer serializer) {
            try {
                this.eventStoreClient.appendSnapshot(this.map((EventMessage<?>)snapshot, serializer)).whenComplete((c, e) -> {
                    if (e != null) {
                        logger.warn("Error occurred while creating a snapshot", e);
                    } else if (c != null) {
                        if (c.getSuccess()) {
                            logger.info("Snapshot created");
                        } else {
                            logger.warn("Snapshot creation failed for unknown reason. Check server logs for details.");
                        }
                    }
                });
            }
            catch (Throwable e2) {
                throw ErrorCode.convert(e2);
            }
        }

        protected Stream<? extends DomainEventData<?>> readEventData(String aggregateIdentifier, long firstSequenceNumber) {
            logger.debug("Reading events for aggregate id {}", (Object)aggregateIdentifier);
            GetAggregateEventsRequest.Builder request = GetAggregateEventsRequest.newBuilder().setAggregateId(aggregateIdentifier);
            if (firstSequenceNumber > 0L) {
                request.setInitialSequence(firstSequenceNumber);
            } else if (firstSequenceNumber == -42L) {
                request.setAllowSnapshots(true);
            }
            try {
                return this.eventStoreClient.listAggregateEvents(request.build()).map(GrpcBackedDomainEventData::new);
            }
            catch (Exception e) {
                throw ErrorCode.convert(e);
            }
        }

        public TrackingEventStream openStream(org.axonframework.eventsourcing.eventstore.TrackingToken trackingToken) {
            Assert.isTrue((trackingToken == null || trackingToken instanceof GlobalSequenceTrackingToken ? 1 : 0) != 0, () -> "Invalid tracking token type. Must be GlobalSequenceTrackingToken.");
            long nextToken = trackingToken == null ? 0L : ((GlobalSequenceTrackingToken)trackingToken).getGlobalIndex() + 1L;
            final EventBuffer consumer = new EventBuffer(this.upcasterChain, this.getEventSerializer());
            logger.info("open stream: {}", (Object)nextToken);
            StreamObserver<GetEventsRequest> requestStream = this.eventStoreClient.listEvents(new StreamObserver<EventWithToken>(){

                public void onNext(EventWithToken eventWithToken) {
                    logger.debug("Received event with token: {}", (Object)eventWithToken.getToken());
                    consumer.push(eventWithToken);
                }

                public void onError(Throwable throwable) {
                    consumer.fail((RuntimeException)new EventStoreException("Error while reading events from the server", throwable));
                }

                public void onCompleted() {
                    consumer.fail((RuntimeException)new EventStoreException("Error while reading events from the server", (Throwable)new RuntimeException("Connection closed by server")));
                }
            });
            FlowControllingStreamObserver<GetEventsRequest> observer = new FlowControllingStreamObserver<GetEventsRequest>(requestStream, this.configuration, t -> GetEventsRequest.newBuilder().setNumberOfPermits(t.getPermits()).build(), t -> false);
            GetEventsRequest request = GetEventsRequest.newBuilder().setTrackingToken(nextToken).setClient(this.configuration.getClientName()).setComponent(this.configuration.getComponentName()).setNumberOfPermits(this.configuration.getInitialNrOfPermits().intValue()).build();
            observer.onNext(request);
            consumer.registerCloseListener(eventConsumer -> observer.onCompleted());
            consumer.registerConsumeListener(observer::markConsumed);
            return consumer;
        }

        public QueryResultStream query(String query, boolean liveUpdates) {
            final QueryResultBuffer consumer = new QueryResultBuffer();
            logger.debug("query: {}", (Object)query);
            StreamObserver<QueryEventsRequest> requestStream = this.eventStoreClient.query(new StreamObserver<QueryEventsResponse>(){

                public void onNext(QueryEventsResponse eventWithToken) {
                    consumer.push(eventWithToken);
                }

                public void onError(Throwable throwable) {
                    logger.info("Failed to receive events - {}", (Object)throwable.getMessage());
                    consumer.fail(new EventStoreException("Error while reading query results from the server", throwable));
                }

                public void onCompleted() {
                    consumer.close();
                }
            });
            FlowControllingStreamObserver<QueryEventsRequest> observer = new FlowControllingStreamObserver<QueryEventsRequest>(requestStream, this.configuration, t -> QueryEventsRequest.newBuilder().setNumberOfPermits(t.getPermits()).build(), t -> false);
            observer.onNext(QueryEventsRequest.newBuilder().setQuery(query).setNumberOfPermits(this.configuration.getInitialNrOfPermits().intValue()).setLiveEvents(liveUpdates).build());
            consumer.registerCloseListener(eventConsumer -> observer.onCompleted());
            consumer.registerConsumeListener(observer::markConsumed);
            return consumer;
        }

        public DomainEventStream readEvents(String aggregateIdentifier) {
            Stream<DomainEventData<?>> input = this.readEventData(aggregateIdentifier, -42L);
            return DomainEventStream.of(input.map(this::upcastAndDeserializeDomainEvent).filter(Objects::nonNull));
        }

        private DomainEventMessage<?> upcastAndDeserializeDomainEvent(DomainEventData<?> domainEventData) {
            DomainEventStream upcastedStream = EventUtils.upcastAndDeserializeDomainEvents(Stream.of(domainEventData), (Serializer)new GrpcMetaDataAwareSerializer(this.isSnapshot(domainEventData) ? this.getSerializer() : this.getEventSerializer()), (EventUpcaster)this.upcasterChain, (boolean)false);
            return upcastedStream.hasNext() ? upcastedStream.next() : null;
        }

        private boolean isSnapshot(DomainEventData<?> domainEventData) {
            if (domainEventData instanceof GrpcBackedDomainEventData) {
                GrpcBackedDomainEventData grpcBackedDomainEventData = (GrpcBackedDomainEventData)domainEventData;
                return grpcBackedDomainEventData.isSnapshot();
            }
            return false;
        }

        public Optional<Long> lastSequenceNumberFor(String aggregateIdentifier) {
            try {
                ReadHighestSequenceNrResponse lastSequenceNumber = this.eventStoreClient.lastSequenceNumberFor(aggregateIdentifier).get();
                return lastSequenceNumber.getToSequenceNr() < 0L ? Optional.empty() : Optional.of(lastSequenceNumber.getToSequenceNr());
            }
            catch (Throwable e) {
                throw ErrorCode.convert(e);
            }
        }

        public org.axonframework.eventsourcing.eventstore.TrackingToken createTailToken() {
            try {
                TrackingToken token = this.eventStoreClient.getFirstToken().get();
                if (token.getToken() < 0L) {
                    return null;
                }
                return new GlobalSequenceTrackingToken(token.getToken() - 1L);
            }
            catch (Throwable e) {
                throw ErrorCode.convert(e);
            }
        }

        public org.axonframework.eventsourcing.eventstore.TrackingToken createHeadToken() {
            try {
                TrackingToken token = this.eventStoreClient.getLastToken().get();
                return new GlobalSequenceTrackingToken(token.getToken());
            }
            catch (Throwable e) {
                throw ErrorCode.convert(e);
            }
        }

        public org.axonframework.eventsourcing.eventstore.TrackingToken createTokenAt(Instant instant) {
            try {
                TrackingToken token = this.eventStoreClient.getTokenAt(instant).get();
                if (token.getToken() < 0L) {
                    return null;
                }
                return new GlobalSequenceTrackingToken(token.getToken() - 1L);
            }
            catch (Throwable e) {
                throw ErrorCode.convert(e);
            }
        }

        protected Stream<? extends TrackedEventData<?>> readEventData(org.axonframework.eventsourcing.eventstore.TrackingToken trackingToken, boolean mayBlock) {
            throw new UnsupportedOperationException("This method is not optimized for the AxonIQ Event Store and should not be used");
        }

        protected Stream<? extends DomainEventData<?>> readSnapshotData(String aggregateIdentifier) {
            return Stream.empty();
        }
    }
}

