/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.reactive.calls;

import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.reactive.ClientTripleReactorPublisher;
import org.apache.dubbo.reactive.ClientTripleReactorSubscriber;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.StubMethodDescriptor;
import org.apache.dubbo.rpc.stub.StubInvocationUtil;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

public final class ReactorClientCalls {
    private ReactorClientCalls() {
    }

    public static <TRequest, TResponse, TInvoker> Mono<TResponse> oneToOne(Invoker<TInvoker> invoker, Mono<TRequest> monoRequest, StubMethodDescriptor methodDescriptor) {
        try {
            return Mono.create(emitter -> monoRequest.subscribe(request -> StubInvocationUtil.unaryCall((Invoker)invoker, (MethodDescriptor)methodDescriptor, (Object)request, (StreamObserver)new StreamObserver<TResponse>((MonoSink)emitter){
                final /* synthetic */ MonoSink val$emitter;
                {
                    this.val$emitter = monoSink;
                }

                public void onNext(TResponse tResponse) {
                    this.val$emitter.success(tResponse);
                }

                public void onError(Throwable throwable) {
                    this.val$emitter.error(throwable);
                }

                public void onCompleted() {
                }
            }), arg_0 -> ((MonoSink)emitter).error(arg_0)));
        }
        catch (Throwable throwable) {
            return Mono.error((Throwable)throwable);
        }
    }

    public static <TRequest, TResponse, TInvoker> Flux<TResponse> oneToMany(Invoker<TInvoker> invoker, Mono<TRequest> monoRequest, StubMethodDescriptor methodDescriptor) {
        try {
            return monoRequest.flatMapMany(request -> {
                ClientTripleReactorPublisher clientPublisher = new ClientTripleReactorPublisher();
                StubInvocationUtil.serverStreamCall((Invoker)invoker, (MethodDescriptor)methodDescriptor, (Object)request, clientPublisher);
                return clientPublisher;
            });
        }
        catch (Throwable throwable) {
            return Flux.error((Throwable)throwable);
        }
    }

    public static <TRequest, TResponse, TInvoker> Mono<TResponse> manyToOne(Invoker<TInvoker> invoker, Flux<TRequest> requestFlux, StubMethodDescriptor methodDescriptor) {
        try {
            ClientTripleReactorSubscriber clientSubscriber = (ClientTripleReactorSubscriber)requestFlux.subscribeWith(new ClientTripleReactorSubscriber());
            ClientTripleReactorPublisher clientPublisher = new ClientTripleReactorPublisher(s -> clientSubscriber.subscribe(s), clientSubscriber::cancel);
            return Mono.from(clientPublisher).doOnSubscribe(dummy -> StubInvocationUtil.biOrClientStreamCall((Invoker)invoker, (MethodDescriptor)methodDescriptor, (StreamObserver)clientPublisher));
        }
        catch (Throwable throwable) {
            return Mono.error((Throwable)throwable);
        }
    }

    public static <TRequest, TResponse, TInvoker> Flux<TResponse> manyToMany(Invoker<TInvoker> invoker, Flux<TRequest> requestFlux, StubMethodDescriptor methodDescriptor) {
        try {
            ClientTripleReactorSubscriber clientSubscriber = (ClientTripleReactorSubscriber)requestFlux.subscribeWith(new ClientTripleReactorSubscriber());
            ClientTripleReactorPublisher clientPublisher = new ClientTripleReactorPublisher(s -> clientSubscriber.subscribe(s), clientSubscriber::cancel);
            return Flux.from(clientPublisher).doOnSubscribe(dummy -> StubInvocationUtil.biOrClientStreamCall((Invoker)invoker, (MethodDescriptor)methodDescriptor, (StreamObserver)clientPublisher));
        }
        catch (Throwable throwable) {
            return Flux.error((Throwable)throwable);
        }
    }
}

