/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.query.impl;

import io.axoniq.axonserver.connector.AxonServerException;
import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.ResultStream;
import io.axoniq.axonserver.connector.impl.AbstractBufferedStream;
import io.axoniq.axonserver.connector.impl.FlowControlledStream;
import io.axoniq.axonserver.connector.impl.SynchronizedRequestStream;
import io.axoniq.axonserver.connector.query.impl.SubscriptionQueryUpdateBuffer;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.query.QueryResponse;
import io.axoniq.axonserver.grpc.query.QueryUpdate;
import io.axoniq.axonserver.grpc.query.SubscriptionQuery;
import io.axoniq.axonserver.grpc.query.SubscriptionQueryRequest;
import io.axoniq.axonserver.grpc.query.SubscriptionQueryResponse;
import io.grpc.stub.ClientCallStreamObserver;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubscriptionQueryStream
extends FlowControlledStream<SubscriptionQueryResponse, SubscriptionQueryRequest> {
    private static final Logger logger = LoggerFactory.getLogger(SubscriptionQueryStream.class);
    private final String subscriptionQueryId;
    private final CompletableFuture<QueryResponse> initialResultFuture;
    private final AbstractBufferedStream<QueryUpdate, SubscriptionQueryRequest> updateBuffer;
    private final String clientId;
    private final Supplier<ResultStream<QueryResponse>> initialResultSupplier;
    private final AtomicReference<ResultStream<QueryResponse>> initialResult = new AtomicReference();

    public SubscriptionQueryStream(String subscriptionQueryId, CompletableFuture<QueryResponse> initialResultFuture, String clientId, Supplier<ResultStream<QueryResponse>> initialResultSupplier, int bufferSize, int fetchSize) {
        super(clientId, bufferSize, fetchSize);
        this.clientId = clientId;
        this.initialResultSupplier = initialResultSupplier;
        this.subscriptionQueryId = subscriptionQueryId;
        this.initialResultFuture = initialResultFuture;
        this.updateBuffer = new SubscriptionQueryUpdateBuffer(clientId, subscriptionQueryId, bufferSize, fetchSize);
    }

    public ResultStream<QueryUpdate> updatesBuffer() {
        return this.updateBuffer;
    }

    public ResultStream<QueryResponse> initialStream() {
        return this.initialResult.updateAndGet(i -> i == null ? this.buildInitialResultStream() : i);
    }

    private ResultStream<QueryResponse> buildInitialResultStream() {
        final ResultStream<QueryResponse> delegate = this.initialResultSupplier.get();
        delegate.onAvailable(() -> {
            if (delegate.getError().isPresent()) {
                this.updatesBuffer().close();
            }
        });
        return new ResultStream<QueryResponse>(){

            @Override
            public QueryResponse peek() {
                return (QueryResponse)delegate.peek();
            }

            @Override
            public QueryResponse nextIfAvailable() {
                return (QueryResponse)delegate.nextIfAvailable();
            }

            @Override
            public QueryResponse nextIfAvailable(long timeout, TimeUnit unit) throws InterruptedException {
                return (QueryResponse)delegate.nextIfAvailable(timeout, unit);
            }

            @Override
            public QueryResponse next() throws InterruptedException {
                return (QueryResponse)delegate.next();
            }

            @Override
            public void onAvailable(Runnable callback) {
                delegate.onAvailable(() -> {
                    if (delegate.getError().isPresent()) {
                        SubscriptionQueryStream.this.updatesBuffer().close();
                    }
                    callback.run();
                });
            }

            @Override
            public void close() {
                delegate.close();
                SubscriptionQueryStream.this.updatesBuffer().close();
            }

            @Override
            public boolean isClosed() {
                return delegate.isClosed();
            }

            @Override
            public Optional<Throwable> getError() {
                return delegate.getError();
            }
        };
    }

    public void onNext(SubscriptionQueryResponse value) {
        switch (value.getResponseCase()) {
            case UPDATE: {
                logger.debug("Received subscription query update. Subscription Id: {}. Message Id: {}.", (Object)value.getSubscriptionIdentifier(), (Object)value.getMessageIdentifier());
                this.updateBuffer.onNext(value.getUpdate());
                break;
            }
            case COMPLETE: {
                logger.debug("Received subscription query complete. Subscription Id: {}.", (Object)value.getSubscriptionIdentifier());
                this.updateBuffer.onCompleted();
                break;
            }
            case COMPLETE_EXCEPTIONALLY: {
                logger.debug("Received subscription query complete exceptionally. Subscription Id: {}.", (Object)value.getSubscriptionIdentifier());
                AxonServerException exception = new AxonServerException(ErrorCategory.getFromCode(value.getCompleteExceptionally().getErrorCode()), value.getCompleteExceptionally().getErrorMessage().getMessage(), value.getCompleteExceptionally().getClientId());
                this.updateBuffer.onError(exception);
                if (this.initialResultFuture.isDone()) break;
                this.initialResultFuture.completeExceptionally(exception);
                break;
            }
            case INITIAL_RESULT: {
                logger.debug("Received subscription query initial result. Subscription Id: {}. Message Id: {}.", (Object)value.getSubscriptionIdentifier(), (Object)value.getMessageIdentifier());
                this.initialResultFuture.complete(value.getInitialResult());
                break;
            }
            default: {
                logger.info("Received unsupported message from SubscriptionQuery. It doesn't declare one of the expected types: {}", (Object)value.getResponseCase());
            }
        }
    }

    public void onError(Throwable t) {
        this.initialResultFuture.completeExceptionally(t);
        ResultStream<QueryResponse> initialResult = this.initialResult.get();
        if (initialResult != null && !initialResult.isClosed()) {
            initialResult.close();
        }
        this.updateBuffer.onError(t);
        try {
            SubscriptionQuery subscriptionQueryToUnsubscribe = SubscriptionQuery.newBuilder().setSubscriptionIdentifier(this.subscriptionQueryId).build();
            this.outboundStream().onNext((Object)SubscriptionQueryRequest.newBuilder().setUnsubscribe(subscriptionQueryToUnsubscribe).build());
            this.outboundStream().onCompleted();
        }
        catch (Exception e) {
            logger.debug("Cannot complete stream. Already completed.", (Throwable)e);
        }
    }

    public void onCompleted() {
        this.updateBuffer.onCompleted();
        if (!this.initialResultFuture.isDone()) {
            this.initialResultFuture.completeExceptionally(new AxonServerException(ErrorCategory.QUERY_DISPATCH_ERROR, "Subscription query has already been completed", this.clientId));
        }
    }

    @Override
    public void beforeStart(ClientCallStreamObserver<SubscriptionQueryRequest> requestStream) {
        SynchronizedRequestStream<SubscriptionQueryRequest> synchronizedRequestStream = new SynchronizedRequestStream<SubscriptionQueryRequest>(requestStream);
        super.beforeStart(synchronizedRequestStream);
        this.updateBuffer.beforeStart((ClientCallStreamObserver<SubscriptionQueryRequest>)synchronizedRequestStream);
    }

    @Override
    public void enableFlowControl() {
        this.updateBuffer.enableFlowControl();
    }

    @Override
    protected SubscriptionQueryRequest buildFlowControlMessage(FlowControl flowControl) {
        return null;
    }
}

