/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.query.subscription;

import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.query.QueryResponse;
import io.axoniq.axonserver.grpc.query.QueryUpdate;
import io.axoniq.axonserver.grpc.query.QueryUpdateCompleteExceptionally;
import io.axoniq.axonserver.grpc.query.SubscriptionQuery;
import io.axoniq.axonserver.grpc.query.SubscriptionQueryRequest;
import io.axoniq.axonserver.grpc.query.SubscriptionQueryResponse;
import io.grpc.stub.StreamObserver;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.Publisher;
import org.axonframework.axonserver.connector.util.FlowControllingStreamObserver;
import org.axonframework.common.Registration;
import org.axonframework.queryhandling.DefaultSubscriptionQueryResult;
import org.axonframework.queryhandling.SubscriptionQueryBackpressure;
import org.axonframework.queryhandling.SubscriptionQueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

public class AxonServerSubscriptionQueryResult
implements Supplier<SubscriptionQueryResult<QueryResponse, QueryUpdate>>,
StreamObserver<SubscriptionQueryResponse> {
    private final Logger logger = LoggerFactory.getLogger(AxonServerSubscriptionQueryResult.class);
    private final SubscriptionQuery subscriptionQuery;
    private final FlowControllingStreamObserver<SubscriptionQueryRequest> requestObserver;
    private final SubscriptionQueryResult<QueryResponse, QueryUpdate> result;
    private final FluxSink<QueryUpdate> updateMessageFluxSink;
    private final Runnable onDispose;
    private MonoSink<QueryResponse> initialResultSink;

    public AxonServerSubscriptionQueryResult(SubscriptionQuery subscriptionQuery, Function<StreamObserver<SubscriptionQueryResponse>, StreamObserver<SubscriptionQueryRequest>> openStreamFn, AxonServerConfiguration configuration, SubscriptionQueryBackpressure backPressure, int bufferSize, Runnable onDispose) {
        this.subscriptionQuery = subscriptionQuery;
        StreamObserver<SubscriptionQueryRequest> subscriptionStreamObserver = openStreamFn.apply(this);
        Function<FlowControl, SubscriptionQueryRequest> requestMapping = flowControl -> SubscriptionQueryRequest.newBuilder().setFlowControl(SubscriptionQuery.newBuilder(this.subscriptionQuery).setNumberOfPermits(flowControl.getPermits())).build();
        this.requestObserver = new FlowControllingStreamObserver<SubscriptionQueryRequest>(subscriptionStreamObserver, configuration.getClientId(), configuration.getQueryFlowControl(), requestMapping, t -> false);
        this.requestObserver.sendInitialPermits();
        this.requestObserver.onNext(SubscriptionQueryRequest.newBuilder().setSubscribe(this.subscriptionQuery).build());
        EmitterProcessor processor = EmitterProcessor.create((int)bufferSize);
        this.updateMessageFluxSink = processor.sink(backPressure.getOverflowStrategy());
        this.updateMessageFluxSink.onDispose(this.requestObserver::onCompleted);
        Registration registration = () -> {
            this.updateMessageFluxSink.complete();
            return true;
        };
        Mono mono = Mono.create(sink -> this.initialResult((MonoSink<QueryResponse>)sink, this.requestObserver::onNext));
        this.result = new DefaultSubscriptionQueryResult(mono, processor.replay().autoConnect(), registration);
        this.onDispose = onDispose;
    }

    private void initialResult(MonoSink<QueryResponse> sink, Publisher<SubscriptionQueryRequest> publisher) {
        this.initialResultSink = sink;
        publisher.publish(SubscriptionQueryRequest.newBuilder().setGetInitialResult(this.subscriptionQuery).build());
    }

    public void onNext(SubscriptionQueryResponse response) {
        this.requestObserver.markConsumed(1);
        switch (response.getResponseCase()) {
            case INITIAL_RESULT: {
                Optional.ofNullable(this.initialResultSink).ifPresent(sink -> sink.success((Object)response.getInitialResult()));
                break;
            }
            case UPDATE: {
                this.updateMessageFluxSink.next((Object)response.getUpdate());
                break;
            }
            case COMPLETE: {
                this.requestObserver.onCompleted();
                this.complete();
                break;
            }
            case COMPLETE_EXCEPTIONALLY: {
                this.requestObserver.onCompleted();
                QueryUpdateCompleteExceptionally exceptionally = response.getCompleteExceptionally();
                this.completeExceptionally((Throwable)ErrorCode.getFromCode(exceptionally.getErrorCode()).convert(exceptionally.getErrorMessage()));
            }
        }
    }

    public void onError(Throwable t) {
        this.completeExceptionally(t);
    }

    private void completeExceptionally(Throwable t) {
        this.onDispose.run();
        this.updateError(t);
        this.initialResultError(t);
    }

    private void updateError(Throwable t) {
        try {
            this.updateMessageFluxSink.error(t);
        }
        catch (Exception e) {
            this.updateMessageFluxSink.complete();
            this.logger.warn("Problem signaling updates error.", (Throwable)e);
        }
    }

    public void onCompleted() {
        this.complete();
    }

    private void complete() {
        this.onDispose.run();
        this.updateMessageFluxSink.complete();
        this.initialResultError(new IllegalStateException("Subscription Completed"));
    }

    private void initialResultError(Throwable t) {
        try {
            Optional.ofNullable(this.initialResultSink).ifPresent(sink -> sink.error(t));
        }
        catch (Exception e) {
            this.logger.warn("Problem signaling initial result error.", (Throwable)e);
        }
    }

    @Override
    public SubscriptionQueryResult<QueryResponse, QueryUpdate> get() {
        return this.result;
    }
}

