/*
 * Decompiled with CFR 0.152.
 */
package com.salesforce.reactorgrpc.stub;

import com.google.common.base.Preconditions;
import com.salesforce.reactorgrpc.stub.ReactorCallOptions;
import com.salesforce.reactorgrpc.stub.ReactorServerStreamObserverAndPublisher;
import com.salesforce.reactorgrpc.stub.ReactorSubscriberAndServerProducer;
import io.grpc.CallOptions;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public final class ServerCalls {
    private ServerCalls() {
    }

    public static <TRequest, TResponse> void oneToOne(TRequest request, StreamObserver<TResponse> responseObserver, Function<Mono<TRequest>, Mono<TResponse>> delegate) {
        try {
            Mono<TRequest> rxRequest = Mono.just(request);
            Mono rxResponse = (Mono)Preconditions.checkNotNull(delegate.apply(rxRequest));
            rxResponse.subscribe(value -> {
                if (responseObserver instanceof ServerCallStreamObserver && ((ServerCallStreamObserver)responseObserver).isCancelled()) {
                    return;
                }
                responseObserver.onNext(value);
            }, throwable -> responseObserver.onError(ServerCalls.prepareError(throwable)), () -> responseObserver.onCompleted());
        }
        catch (Throwable throwable2) {
            responseObserver.onError(ServerCalls.prepareError(throwable2));
        }
    }

    public static <TRequest, TResponse> void oneToMany(TRequest request, StreamObserver<TResponse> responseObserver, Function<Mono<TRequest>, Flux<TResponse>> delegate) {
        try {
            Mono<TRequest> rxRequest = Mono.just(request);
            Flux rxResponse = (Flux)Preconditions.checkNotNull(delegate.apply(rxRequest));
            ReactorSubscriberAndServerProducer server = rxResponse.subscribeWith(new ReactorSubscriberAndServerProducer());
            server.subscribe((ServerCallStreamObserver)responseObserver);
        }
        catch (Throwable throwable) {
            responseObserver.onError(ServerCalls.prepareError(throwable));
        }
    }

    public static <TRequest, TResponse> StreamObserver<TRequest> manyToOne(StreamObserver<TResponse> responseObserver, Function<Flux<TRequest>, Mono<TResponse>> delegate, CallOptions options) {
        int prefetch = ReactorCallOptions.getPrefetch(options);
        int lowTide = ReactorCallOptions.getLowTide(options);
        ReactorServerStreamObserverAndPublisher streamObserverPublisher = new ReactorServerStreamObserverAndPublisher((ServerCallStreamObserver)responseObserver, null, prefetch, lowTide);
        try {
            Mono rxResponse = (Mono)Preconditions.checkNotNull(delegate.apply(Flux.from(streamObserverPublisher)));
            rxResponse.subscribe(value -> {
                if (!streamObserverPublisher.isCancelled()) {
                    responseObserver.onNext(value);
                }
            }, throwable -> {
                if (!streamObserverPublisher.isCancelled()) {
                    streamObserverPublisher.abortPendingCancel();
                    responseObserver.onError(ServerCalls.prepareError(throwable));
                }
            }, () -> responseObserver.onCompleted());
        }
        catch (Throwable throwable2) {
            responseObserver.onError(ServerCalls.prepareError(throwable2));
        }
        return streamObserverPublisher;
    }

    public static <TRequest, TResponse> StreamObserver<TRequest> manyToMany(StreamObserver<TResponse> responseObserver, Function<Flux<TRequest>, Flux<TResponse>> delegate, CallOptions options) {
        int prefetch = ReactorCallOptions.getPrefetch(options);
        int lowTide = ReactorCallOptions.getLowTide(options);
        ReactorServerStreamObserverAndPublisher streamObserverPublisher = new ReactorServerStreamObserverAndPublisher((ServerCallStreamObserver)responseObserver, null, prefetch, lowTide);
        try {
            Flux rxResponse = (Flux)Preconditions.checkNotNull(delegate.apply(Flux.from(streamObserverPublisher)));
            ReactorSubscriberAndServerProducer subscriber = new ReactorSubscriberAndServerProducer();
            subscriber.subscribe((ServerCallStreamObserver)responseObserver);
            rxResponse.subscribe(subscriber);
        }
        catch (Throwable throwable) {
            responseObserver.onError(ServerCalls.prepareError(throwable));
        }
        return streamObserverPublisher;
    }

    private static Throwable prepareError(Throwable throwable) {
        if (throwable instanceof StatusException || throwable instanceof StatusRuntimeException) {
            return throwable;
        }
        return Status.fromThrowable((Throwable)throwable).asException();
    }
}

