/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.query;

import io.axoniq.axonserver.connector.FlowControl;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.query.QueryResponse;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.query.QueryConverter;
import org.axonframework.axonserver.connector.util.ExceptionConverter;
import org.axonframework.common.annotations.Internal;
import org.axonframework.messaging.MessageStream;
import org.axonframework.queryhandling.QueryResponseMessage;

@Internal
class FlowControlledResponseSender
implements FlowControl {
    private final String clientId;
    private final String queryIdentifier;
    private final MessageStream<QueryResponseMessage> upstream;
    private final ReplyChannel<QueryResponse> downstream;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicLong requests = new AtomicLong();
    private final AtomicBoolean sendingGate = new AtomicBoolean(false);

    public FlowControlledResponseSender(String clientId, String queryIdentifier, MessageStream<QueryResponseMessage> upstream, ReplyChannel<QueryResponse> downstream) {
        this.clientId = clientId;
        this.queryIdentifier = queryIdentifier;
        this.upstream = upstream;
        this.downstream = downstream;
    }

    public void request(long requested) {
        if (requested <= 0L) {
            return;
        }
        if (this.requests.getAndUpdate(current -> current > Long.MAX_VALUE - requested ? Long.MAX_VALUE : current + requested) == 0L) {
            this.upstream.setCallback(this::responseSendingLoop);
        }
        this.responseSendingLoop();
    }

    private void responseSendingLoop() {
        while (!this.sendingGate.get() && (this.requests.get() > 0L && this.upstream.hasNextAvailable() || this.upstream.isCompleted() && !this.closed.get())) {
            this.sendResponses();
        }
    }

    private void sendResponses() {
        if (this.sendingGate.getAndSet(true)) {
            return;
        }
        try {
            while (this.requests.get() > 0L && this.upstream.hasNextAvailable()) {
                Optional next = this.upstream.next();
                if (next.isPresent()) {
                    this.requests.decrementAndGet();
                }
                next.ifPresent(i -> this.downstream.send((Object)QueryConverter.convertQueryResponseMessage(this.queryIdentifier, (QueryResponseMessage)i.message())));
            }
            if (this.upstream.isCompleted()) {
                this.closed.set(true);
                this.upstream.error().ifPresentOrElse(error -> {
                    ErrorCode errorCode = ErrorCode.getQueryExecutionErrorCode(error);
                    ErrorMessage ex = ExceptionConverter.convertToErrorMessage(this.clientId, errorCode, error);
                    QueryResponse errorResponse = QueryResponse.newBuilder().setErrorCode(errorCode.errorCode()).setErrorMessage(ex).setRequestIdentifier(this.queryIdentifier).build();
                    this.downstream.sendLast((Object)errorResponse);
                }, () -> this.downstream.complete());
            }
        }
        finally {
            this.sendingGate.set(false);
        }
    }

    public void cancel() {
        this.upstream.close();
    }
}

