/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.query;

import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.grpc.query.QueryResponse;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.axonframework.axonserver.connector.query.QuerySerializer;
import org.axonframework.axonserver.connector.query.StreamableResponse;
import org.axonframework.messaging.GenericMessage;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageType;
import org.axonframework.messaging.MetaData;
import org.axonframework.queryhandling.GenericQueryResponseMessage;
import org.axonframework.queryhandling.QueryResponseMessage;

/**
 * Streams the elements of a list-valued query result to a {@link ReplyChannel}, one serialized response per
 * element, sending only as many elements as have been requested through {@link #request(long)}.
 * <p>
 * The first element that is sent reuses the identifier and metadata of the original result message; every
 * following element is wrapped in a fresh message with empty metadata. Once all elements have been sent, or
 * {@link #cancel()} is invoked, the reply channel is completed. Completion is signalled at most once.
 *
 * @param <T> the type of the individual elements contained in the result list
 */
class StreamableMultiInstanceResponse<T> implements StreamableResponse {

    private final QueryResponseMessage resultMessage;
    private final Class<T> responseType;
    private final Iterator<T> result;
    private final ReplyChannel<QueryResponse> responseHandler;
    private final QuerySerializer serializer;
    private final String requestId;
    /** Outstanding demand: number of elements requested but not yet sent. */
    private final AtomicLong requestedRef = new AtomicLong();
    /** Gate that lets only a single caller at a time drain the iterator. */
    private final AtomicBoolean flowGate = new AtomicBoolean();
    private final AtomicBoolean firstResponseToBeSent = new AtomicBoolean(true);
    /** Flipped by whoever completes the reply channel, so that it is completed only once. */
    private final AtomicBoolean completed = new AtomicBoolean();
    private volatile boolean cancelled = false;

    /**
     * Creates a streamable response over the elements of the given result message.
     *
     * @param resultMessage   the query result; its payload is cast to a {@link List} (a {@code null} payload is
     *                        treated as an empty result)
     * @param responseType    the type of a single element, used to build the per-element messages
     * @param responseHandler the channel the serialized elements and the completion signal are written to
     * @param serializer      serializes each per-element response message
     * @param requestId       passed to the serializer together with every response
     * @throws ClassCastException if the payload of {@code resultMessage} is neither {@code null} nor a {@link List}
     */
    public StreamableMultiInstanceResponse(QueryResponseMessage resultMessage, Class<T> responseType, ReplyChannel<QueryResponse> responseHandler, QuerySerializer serializer, String requestId) {
        this.resultMessage = resultMessage;
        this.responseType = responseType;
        this.responseHandler = responseHandler;
        this.serializer = serializer;
        this.requestId = requestId;
        // The element type cannot be verified at runtime; the caller vouches for it through responseType.
        @SuppressWarnings("unchecked")
        List<T> payload = (List<T>) resultMessage.payload();
        this.result = payload != null ? payload.iterator() : Collections.<T>emptyIterator();
    }

    /**
     * Adds {@code requested} to the outstanding demand and sends as many elements as the demand allows.
     * The demand saturates at {@link Long#MAX_VALUE} instead of overflowing.
     *
     * @param requested the number of additional elements that may be sent
     */
    public void request(long requested) {
        this.requestedRef.getAndUpdate(current -> {
            try {
                return Math.addExact(requested, current);
            } catch (ArithmeticException e) {
                // Overflow: cap the demand rather than wrapping around.
                return Long.MAX_VALUE;
            }
        });
        this.stream();
    }

    /**
     * Stops the streaming of further elements and completes the reply channel.
     */
    public void cancel() {
        // Raise the flag BEFORE completing, so a stream() loop running on another thread stops sending
        // instead of writing to a channel that has just been completed.
        this.cancelled = true;
        this.completeOnce();
    }

    /**
     * Sends elements while there is demand, elements are left and the response has not been cancelled, and
     * completes the reply channel once the iterator is exhausted.
     */
    private void stream() {
        do {
            // Only one caller drains at a time; anyone else leaves and relies on the re-check below.
            if (!this.flowGate.compareAndSet(false, true)) {
                return;
            }
            try {
                while (this.requestedRef.get() > 0L && this.result.hasNext() && !this.cancelled) {
                    this.send();
                    this.requestedRef.decrementAndGet();
                }
                if (!this.result.hasNext()) {
                    this.completeOnce();
                }
            } finally {
                this.flowGate.set(false);
            }
            // Re-check after releasing the gate: demand registered by a caller that was turned away at the
            // gate while this thread was finishing would otherwise never be served.
        } while (this.requestedRef.get() > 0L && this.result.hasNext() && !this.cancelled);
    }

    /**
     * Completes the reply channel unless it has been completed before (by exhaustion or cancellation).
     */
    private void completeOnce() {
        if (this.completed.compareAndSet(false, true)) {
            this.responseHandler.complete();
        }
    }

    /**
     * Wraps the next element in a query response message, serializes it and writes it to the reply channel.
     */
    private void send() {
        Message delegate;
        if (this.firstResponseToBeSent.compareAndSet(true, false)) {
            // The first element carries the identity and metadata of the original result message.
            delegate = new GenericMessage(this.resultMessage.identifier(), new MessageType(this.responseType), this.result.next(), this.responseType, this.resultMessage.metaData());
        } else {
            delegate = new GenericMessage(new MessageType(this.responseType), this.result.next(), this.responseType, MetaData.emptyInstance());
        }
        QueryResponseMessage message = new GenericQueryResponseMessage(delegate);
        this.responseHandler.send(this.serializer.serializeResponse(message, this.requestId));
    }
}

