/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.mutiny.calls;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.mutiny.ServerTripleMutinyPublisher;
import org.apache.dubbo.mutiny.ServerTripleMutinySubscriber;
import org.apache.dubbo.rpc.StatusRpcException;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;

/**
 * Static adapters that connect Mutiny-style service implementations
 * ({@link Uni} / {@link Multi}) to the {@link StreamObserver} based server call model.
 *
 * <p>One entry point exists per call shape: unary/unary, unary/stream, stream/unary and
 * stream/stream. Every failure reported to the response observer by this class goes through
 * {@link #doOnResponseHasException(Throwable, StreamObserver)}, so all four shapes report
 * errors as a {@link StatusRpcException}.
 */
public class MutinyServerCalls {
    /** Utility class; not instantiable. */
    private MutinyServerCalls() {
    }

    /**
     * Handles a unary request with a unary response.
     *
     * <p>The request is wrapped in a {@link Uni} and handed to {@code func}. A {@code null}
     * result item is turned into a {@code NOT_FOUND} failure. On success the item is emitted
     * and the observer is completed; on failure the error is reported as a status exception.
     *
     * @param request          the single request message
     * @param responseObserver observer that receives the response or the error
     * @param func             the service implementation
     * @param <T>              request type
     * @param <R>              response type
     */
    public static <T, R> void oneToOne(T request, StreamObserver<R> responseObserver, Function<Uni<T>, Uni<R>> func) {
        try {
            Uni<R> response = func.apply(Uni.createFrom().item(request))
                    .onItem().ifNull().failWith(TriRpcStatus.NOT_FOUND.asException());
            response.subscribe().with(
                    item -> {
                        responseObserver.onNext(item);
                        responseObserver.onCompleted();
                    },
                    failure -> doOnResponseHasException(failure, responseObserver));
        } catch (Throwable t) {
            // func itself (or building the pipeline) failed synchronously.
            doOnResponseHasException(t, responseObserver);
        }
    }

    /**
     * Handles a unary request with a streaming response.
     *
     * <p>The {@link Multi} returned by {@code func} is consumed by a
     * {@link ServerTripleMutinySubscriber} bound to the response observer.
     *
     * @param request          the single request message
     * @param responseObserver observer that receives the response stream; it is cast to
     *                         {@link CallStreamObserver}, and a failing cast is reported
     *                         like any other error
     * @param func             the service implementation
     * @param <T>              request type
     * @param <R>              response type
     * @return the subscriber's execution future, or an exceptionally completed future when
     *         setting up the call failed
     */
    public static <T, R> CompletableFuture<List<R>> oneToMany(T request, StreamObserver<R> responseObserver, Function<Uni<T>, Multi<R>> func) {
        try {
            CallStreamObserver<R> callStreamObserver = (CallStreamObserver<R>) responseObserver;
            Multi<R> response = func.apply(Uni.createFrom().item(request));
            ServerTripleMutinySubscriber<R> mutinySubscriber = new ServerTripleMutinySubscriber<>(callStreamObserver);
            response.subscribe().withSubscriber(mutinySubscriber).subscribe(callStreamObserver);
            return mutinySubscriber.getExecutionFuture();
        } catch (Throwable t) {
            doOnResponseHasException(t, responseObserver);
            // The future carries the original throwable, not the converted status exception.
            CompletableFuture<List<R>> failed = new CompletableFuture<>();
            failed.completeExceptionally(t);
            return failed;
        }
    }

    /**
     * Handles a streaming request with a unary response.
     *
     * <p>Incoming messages are published through a {@link ServerTripleMutinyPublisher}, which
     * is exposed to {@code func} as a {@link Multi}. A {@code null} result item is turned into
     * a {@code NOT_FOUND} failure. Nothing is written to the response observer from the
     * asynchronous callbacks once the publisher reports it has been cancelled.
     *
     * @param responseObserver observer that receives the response or the error; it is cast to
     *                         {@link CallStreamObserver} before any error handling is in
     *                         place, so a failing cast propagates to the caller
     * @param func             the service implementation
     * @param <T>              request type
     * @param <R>              response type
     * @return the observer to which request messages must be delivered
     */
    public static <T, R> StreamObserver<T> manyToOne(StreamObserver<R> responseObserver, Function<Multi<T>, Uni<R>> func) {
        CallStreamObserver<R> callStreamObserver = (CallStreamObserver<R>) responseObserver;
        ServerTripleMutinyPublisher<T> serverPublisher = new ServerTripleMutinyPublisher<>(callStreamObserver);
        try {
            Uni<R> responseUni = func.apply(Multi.createFrom().publisher(serverPublisher))
                    .onItem().ifNull().failWith(TriRpcStatus.NOT_FOUND.asException());
            responseUni.subscribe().with(
                    value -> {
                        if (!serverPublisher.isCancelled()) {
                            callStreamObserver.onNext(value);
                            callStreamObserver.onCompleted();
                        }
                    },
                    failure -> {
                        if (!serverPublisher.isCancelled()) {
                            // Same status conversion as the unary entry points.
                            doOnResponseHasException(failure, callStreamObserver);
                        }
                    });
            // Request inbound messages only after the response pipeline is subscribed.
            serverPublisher.startRequest();
        } catch (Throwable t) {
            doOnResponseHasException(t, responseObserver);
        }
        return serverPublisher;
    }

    /**
     * Handles a streaming request with a streaming response.
     *
     * <p>Incoming messages are published through a {@link ServerTripleMutinyPublisher}; the
     * {@link Multi} returned by {@code func} is consumed by a
     * {@link ServerTripleMutinySubscriber} that is then bound to the response observer.
     *
     * @param responseObserver observer that receives the response stream; it is cast to
     *                         {@link CallStreamObserver} before any error handling is in
     *                         place, so a failing cast propagates to the caller
     * @param func             the service implementation
     * @param <T>              request type
     * @param <R>              response type
     * @return the observer to which request messages must be delivered
     */
    public static <T, R> StreamObserver<T> manyToMany(StreamObserver<R> responseObserver, Function<Multi<T>, Multi<R>> func) {
        CallStreamObserver<R> callStreamObserver = (CallStreamObserver<R>) responseObserver;
        ServerTripleMutinyPublisher<T> serverPublisher = new ServerTripleMutinyPublisher<>(callStreamObserver);
        try {
            Multi<R> responseMulti = func.apply(Multi.createFrom().publisher(serverPublisher));
            ServerTripleMutinySubscriber<R> serverSubscriber =
                    responseMulti.subscribe().withSubscriber(new ServerTripleMutinySubscriber<R>());
            serverSubscriber.subscribe(callStreamObserver);
            // Request inbound messages only after the response pipeline is subscribed.
            serverPublisher.startRequest();
        } catch (Throwable t) {
            doOnResponseHasException(t, responseObserver);
        }
        return serverPublisher;
    }

    /**
     * Converts {@code throwable} to a {@link StatusRpcException} via
     * {@link TriRpcStatus#getStatus(Throwable)} and reports it to the observer.
     *
     * @param throwable        the failure to report
     * @param responseObserver the observer to notify
     */
    private static void doOnResponseHasException(Throwable throwable, StreamObserver<?> responseObserver) {
        StatusRpcException statusRpcException = TriRpcStatus.getStatus(throwable).asException();
        responseObserver.onError(statusRpcException);
    }
}

