/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.grpc.stubs;

import io.grpc.Status;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.quarkus.arc.Arc;
import io.quarkus.grpc.ExceptionHandlerProvider;
import io.quarkus.grpc.stubs.StreamCollector;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import io.smallrye.mutiny.subscription.Cancellable;
import java.lang.annotation.Annotation;
import java.util.function.Function;
import org.jboss.logging.Logger;

/**
 * Static helpers that bridge gRPC {@link StreamObserver}-based server calls to
 * Mutiny {@link Uni}/{@link Multi}-based service implementations.
 *
 * <p>Each entry point registers the response observer with the current
 * {@link StreamCollector} and removes it again when the call completes or fails.
 * Failures are passed through the {@link ExceptionHandlerProvider} before being
 * sent to the response observer.
 */
public class ServerCalls {
    private static final Logger log = Logger.getLogger(ServerCalls.class);

    private static final String NULL_UNI_MESSAGE = "gRPC service method returned null instead of Uni. Please change the implementation to return a Uni object, either carrying a value or a failure, or throw StatusRuntimeException";
    private static final String NULL_MULTI_MESSAGE = "gRPC service method returned null instead of Multi. Please change the implementation to return a Multi object or throw StatusRuntimeException";

    private static StreamCollector streamCollector = StreamCollector.NO_OP;
    // Looked up lazily on first failure; see getEhp().
    private static ExceptionHandlerProvider ehp;

    private ServerCalls() {
    }

    /**
     * Handles a unary call: one request, one response.
     *
     * @param request the incoming request message
     * @param response the observer receiving the single response item or the failure
     * @param compression compression name to apply to the response, or {@code null} for none
     * @param implementation the service method; must not return {@code null}
     */
    public static <I, O> void oneToOne(I request, StreamObserver<O> response, String compression, Function<I, Uni<O>> implementation) {
        try {
            // Kept inside the try (as in oneToMany) so that a failure while configuring
            // the response is reported through onError like any other failure.
            ServerCalls.trySetCompression(response, compression);
            streamCollector.add(response);
            Uni<O> uni = implementation.apply(request);
            if (uni == null) {
                log.error(NULL_UNI_MESSAGE);
                ServerCalls.onError(response, Status.fromCode(Status.Code.INTERNAL).asException());
                return;
            }
            ServerCalls.subscribeUni(uni, response);
        }
        catch (Throwable t) {
            ServerCalls.onError(response, t);
        }
    }

    /**
     * Handles a server-streaming call: one request, a stream of responses.
     *
     * @param request the incoming request message
     * @param response the observer receiving the response items, completion or failure
     * @param compression compression name to apply to the response, or {@code null} for none
     * @param implementation the service method; must not return {@code null}
     */
    public static <I, O> void oneToMany(I request, StreamObserver<O> response, String compression, Function<I, Multi<O>> implementation) {
        try {
            ServerCalls.trySetCompression(response, compression);
            streamCollector.add(response);
            Multi<O> multi = implementation.apply(request);
            if (multi == null) {
                log.error(NULL_MULTI_MESSAGE);
                ServerCalls.onError(response, Status.fromCode(Status.Code.INTERNAL).asException());
                return;
            }
            Cancellable subscription = multi.subscribe().with(
                    response::onNext,
                    failure -> ServerCalls.onError(response, failure),
                    () -> ServerCalls.onCompleted(response));
            ServerCalls.handleSubscription(subscription, response);
        }
        catch (Throwable t) {
            ServerCalls.onError(response, t);
        }
    }

    /**
     * Handles a client-streaming call: a stream of requests, one response.
     *
     * @param response the observer receiving the single response item or the failure
     * @param implementation the service method, given the request stream; must not return {@code null}
     * @return the observer into which incoming requests must be pushed, or {@code null}
     *         if the implementation threw or returned {@code null} (in which case the
     *         failure has already been sent to {@code response})
     */
    public static <I, O> StreamObserver<I> manyToOne(StreamObserver<O> response, Function<Multi<I>, Uni<O>> implementation) {
        StreamObserver<I> pump = null;
        try {
            UnicastProcessor<I> input = UnicastProcessor.create();
            pump = ServerCalls.getStreamObserverFeedingProcessor(input);
            streamCollector.add(response);
            Uni<O> uni = implementation.apply(input);
            if (uni == null) {
                log.error(NULL_UNI_MESSAGE);
                // The pump is never handed out on this path, so nothing would ever
                // terminate it; unregister it here.
                ServerCalls.discardPump(pump);
                ServerCalls.onError(response, Status.fromCode(Status.Code.INTERNAL).asException());
                return null;
            }
            ServerCalls.subscribeUni(uni, response);
            return pump;
        }
        catch (Throwable t) {
            ServerCalls.discardPump(pump);
            ServerCalls.onError(response, t);
            return null;
        }
    }

    /**
     * Wires cancellation of the Mutiny subscription to the close and cancel
     * handlers of the response, when the response is a {@link ServerCallStreamObserver}.
     * Does nothing for other observer types.
     */
    private static <O> void handleSubscription(Cancellable cancellable, StreamObserver<O> response) {
        if (response instanceof ServerCallStreamObserver) {
            ServerCallStreamObserver<O> serverCallResponse = (ServerCallStreamObserver<O>) response;
            Runnable cancel = cancellable::cancel;
            serverCallResponse.setOnCloseHandler(cancel);
            serverCallResponse.setOnCancelHandler(cancel);
        }
    }

    /**
     * Handles a bidirectional-streaming call: a stream of requests, a stream of responses.
     *
     * @param response the observer receiving the response items, completion or failure
     * @param implementation the service method, given the request stream; must not return {@code null}
     * @return the observer into which incoming requests must be pushed, or {@code null}
     *         if the implementation threw or returned {@code null} (in which case the
     *         failure has already been sent to {@code response})
     */
    public static <I, O> StreamObserver<I> manyToMany(StreamObserver<O> response, Function<Multi<I>, Multi<O>> implementation) {
        StreamObserver<I> pump = null;
        try {
            streamCollector.add(response);
            UnicastProcessor<I> input = UnicastProcessor.create();
            pump = ServerCalls.getStreamObserverFeedingProcessor(input);
            Multi<O> multi = implementation.apply(input);
            if (multi == null) {
                log.error(NULL_MULTI_MESSAGE);
                // See manyToOne: the pump is not returned, so unregister it here.
                ServerCalls.discardPump(pump);
                ServerCalls.onError(response, Status.fromCode(Status.Code.INTERNAL).asException());
                return null;
            }
            Cancellable subscription = multi.subscribe().with(
                    response::onNext,
                    failure -> ServerCalls.onError(response, failure),
                    () -> ServerCalls.onCompleted(response));
            ServerCalls.handleSubscription(subscription, response);
            return pump;
        }
        catch (Throwable t) {
            ServerCalls.discardPump(pump);
            ServerCalls.onError(response, t);
            return null;
        }
    }

    /**
     * Subscribes to {@code uni}, forwarding its item followed by completion, or its
     * failure, to {@code response}.
     */
    private static <O> void subscribeUni(Uni<O> uni, StreamObserver<O> response) {
        uni.subscribe().with(item -> {
            response.onNext(item);
            ServerCalls.onCompleted(response);
        }, failure -> ServerCalls.onError(response, failure));
    }

    /** Unregisters a request pump that will not be returned to the caller; {@code null} is ignored. */
    private static <I> void discardPump(StreamObserver<I> pump) {
        if (pump != null) {
            streamCollector.remove(pump);
        }
    }

    /** Completes the response and unregisters it from the stream collector, even if completion throws. */
    private static <O> void onCompleted(StreamObserver<O> response) {
        try {
            response.onCompleted();
        }
        finally {
            streamCollector.remove(response);
        }
    }

    /**
     * Returns the {@link ExceptionHandlerProvider}, resolving it from the ArC container
     * on first use and caching it afterwards. The lookup is not synchronized.
     */
    private static ExceptionHandlerProvider getEhp() {
        if (ehp == null) {
            ehp = Arc.container().select(ExceptionHandlerProvider.class).get();
        }
        return ehp;
    }

    /**
     * Sends the failure, transformed by the {@link ExceptionHandlerProvider}, to the
     * response and unregisters the response from the stream collector, even if
     * sending throws.
     */
    private static <O> void onError(StreamObserver<O> response, Throwable error) {
        try {
            response.onError(ServerCalls.getEhp().transform(error));
        }
        finally {
            streamCollector.remove(response);
        }
    }

    /**
     * Creates an observer that forwards every signal it receives to {@code input},
     * registers it with the stream collector, and unregisters it once it receives
     * a terminal signal.
     */
    private static <I> StreamObserver<I> getStreamObserverFeedingProcessor(final UnicastProcessor<I> input) {
        StreamObserver<I> result = new StreamObserver<I>() {

            @Override
            public void onNext(I i) {
                input.onNext(i);
            }

            @Override
            public void onError(Throwable throwable) {
                try {
                    input.onError(throwable);
                }
                finally {
                    streamCollector.remove(this);
                }
            }

            @Override
            public void onCompleted() {
                try {
                    input.onComplete();
                }
                finally {
                    streamCollector.remove(this);
                }
            }
        };
        streamCollector.add(result);
        return result;
    }

    /**
     * Applies {@code compression} to the response when a compression name is given and
     * the response is a {@link ServerCallStreamObserver}; otherwise does nothing.
     */
    private static void trySetCompression(StreamObserver<?> response, String compression) {
        if (compression != null && response instanceof ServerCallStreamObserver) {
            ServerCallStreamObserver<?> serverResponse = (ServerCallStreamObserver<?>) response;
            serverResponse.setCompression(compression);
        }
    }

    /** Replaces the collector with which stream observers are registered. */
    public static void setStreamCollector(StreamCollector collector) {
        streamCollector = collector;
    }

    /** Returns the collector with which stream observers are currently registered. */
    public static StreamCollector getStreamCollector() {
        return streamCollector;
    }
}

