/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.event.transformation.impl.grpc;

import com.google.protobuf.Empty;
import io.axoniq.axonserver.connector.AxonServerException;
import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.event.transformation.EventTransformation;
import io.axoniq.axonserver.connector.event.transformation.impl.EventTransformationService;
import io.axoniq.axonserver.connector.event.transformation.impl.grpc.GrpcEventTransformation;
import io.axoniq.axonserver.connector.event.transformation.impl.grpc.GrpcTransformationStream;
import io.axoniq.axonserver.connector.event.transformation.impl.grpc.NonTransientException;
import io.axoniq.axonserver.connector.impl.AxonServerManagedChannel;
import io.axoniq.axonserver.connector.impl.FutureListStreamObserver;
import io.axoniq.axonserver.connector.impl.FutureStreamObserver;
import io.axoniq.axonserver.grpc.event.ApplyTransformationRequest;
import io.axoniq.axonserver.grpc.event.CompactionRequest;
import io.axoniq.axonserver.grpc.event.EventTransformationServiceGrpc;
import io.axoniq.axonserver.grpc.event.StartTransformationRequest;
import io.axoniq.axonserver.grpc.event.TransformRequestAck;
import io.axoniq.axonserver.grpc.event.Transformation;
import io.axoniq.axonserver.grpc.event.TransformationId;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GrpcEventTransformationService
implements EventTransformationService {
    private final Logger logger = LoggerFactory.getLogger(GrpcEventTransformationService.class);
    private final EventTransformationServiceGrpc.EventTransformationServiceStub stub;

    public GrpcEventTransformationService(AxonServerManagedChannel axonServerManagedChannel) {
        this(EventTransformationServiceGrpc.newStub((Channel)axonServerManagedChannel));
    }

    public GrpcEventTransformationService(EventTransformationServiceGrpc.EventTransformationServiceStub stub) {
        this.stub = stub;
    }

    @Nonnull
    private static ApplyTransformationRequest applyTransformationRequest(String transformationId, Long sequence) {
        return ApplyTransformationRequest.newBuilder().setTransformationId(TransformationId.newBuilder().setId(transformationId)).setLastSequence(sequence).build();
    }

    @Override
    public CompletableFuture<Iterable<EventTransformation>> transformations() {
        FutureListStreamObserver<Transformation> responseObserver = new FutureListStreamObserver<Transformation>();
        this.stub.transformations(Empty.newBuilder().build(), responseObserver);
        return responseObserver.thenApply(n -> n.stream().map(GrpcEventTransformation::new).collect(Collectors.toList()));
    }

    @Override
    public CompletableFuture<EventTransformation> transformationById(String id) {
        return this.transformations().thenApply(list -> StreamSupport.stream(list.spliterator(), false).filter(t -> id.equals(t.id())).findFirst().orElseThrow(IllegalStateException::new));
    }

    @Override
    public CompletableFuture<String> newTransformation(String description) {
        FutureStreamObserver<TransformationId> responseObserver = new FutureStreamObserver<TransformationId>(new AxonServerException(ErrorCategory.OTHER, "An unknown error occurred while starting transformation. No response received from Server.", ""));
        this.stub.startTransformation(StartTransformationRequest.newBuilder().setDescription(description).build(), responseObserver);
        return responseObserver.thenApply(TransformationId::getId);
    }

    @Override
    public EventTransformationService.TransformationStream transformationStream(String transformationId) {
        AtomicReference<Consumer<Long>> ackListener = new AtomicReference<Consumer<Long>>(seq -> {});
        AtomicReference<Consumer<Throwable>> onCompletedByServer = new AtomicReference<Consumer<Throwable>>(error -> {});
        StreamObserver<TransformRequestAck> responseObserver = this.responseObserver(ackSequence -> ((Consumer)ackListener.get()).accept(ackSequence), error -> ((Consumer)onCompletedByServer.get()).accept(error));
        return new GrpcTransformationStream(transformationId, this.stub.transformEvents(responseObserver), ackListener::set, onCompletedByServer::set);
    }

    private StreamObserver<TransformRequestAck> responseObserver(final Consumer<Long> ackListenerSupplier, final Consumer<Throwable> onError) {
        return new StreamObserver<TransformRequestAck>(){

            public void onNext(TransformRequestAck value) {
                ackListenerSupplier.accept(value.getSequence());
            }

            public void onError(Throwable t) {
                GrpcEventTransformationService.this.logger.warn("Transformation failed by server", t);
                onError.accept(t);
            }

            public void onCompleted() {
                String message = "The server unexpectedly completed the transformation stream";
                NonTransientException e = new NonTransientException(message);
                GrpcEventTransformationService.this.logger.warn(message, (Throwable)e);
                onError.accept(e);
            }
        };
    }

    @Override
    public CompletableFuture<Void> startApplying(String transformationId, Long sequence) {
        FutureStreamObserver<Empty> responseObserver = new FutureStreamObserver<Empty>(Empty.getDefaultInstance());
        this.stub.applyTransformation(GrpcEventTransformationService.applyTransformationRequest(transformationId, sequence), responseObserver);
        return responseObserver.thenAccept(empty -> {});
    }

    @Override
    public CompletableFuture<Void> cancel(String transformationId) {
        FutureStreamObserver<Empty> responseObserver = new FutureStreamObserver<Empty>(Empty.getDefaultInstance());
        this.stub.cancelTransformation(TransformationId.newBuilder().setId(transformationId).build(), responseObserver);
        return responseObserver.thenAccept(empty -> {});
    }

    @Override
    public CompletableFuture<Void> startCompacting() {
        FutureStreamObserver<Empty> responseObserver = new FutureStreamObserver<Empty>(Empty.getDefaultInstance());
        this.stub.compact(CompactionRequest.newBuilder().build(), responseObserver);
        return responseObserver.thenAccept(empty -> {});
    }
}

