/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.grpc.FlowControl;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for a gRPC {@link ClientResponseObserver} that hands out permits in batches over its
 * outbound (request) stream.
 * <p>
 * An initial permit message is sent by {@link #enableFlowControl()}. After that, every
 * {@code permitsBatch}-th call to {@link #markConsumed()} sends a refill message asking for
 * {@code permitsBatch} more permits. Flow control is switched off entirely when
 * {@code permitsBatch} is zero or negative.
 * <p>
 * This class never calls {@link #markConsumed()} itself and does not implement the message
 * callbacks of the observer interface; both are left to subclasses.
 * <p>
 * NOTE(review): the outbound stream reference is held in a plain field. Whether it is written and
 * read on different threads cannot be determined from this class alone - confirm against callers.
 *
 * @param <IN>  the type of message received from the other side of the stream
 * @param <OUT> the type of message written to the outbound stream
 */
public abstract class FlowControlledStream<IN, OUT> implements ClientResponseObserver<OUT, IN> {

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    private final String clientId;
    private final int permits;
    private final int permitsBatch;

    /** Pre-built refill instruction ({@code permitsBatch} permits), reused for every refill. */
    private final FlowControl flowControl;

    /** Calls to {@link #markConsumed()} counted since the last refill (or since flow control was enabled). */
    private final AtomicInteger permitsConsumed = new AtomicInteger();

    /** Set by {@link #beforeStart(ClientCallStreamObserver)}; {@code null} until then. */
    private ClientCallStreamObserver<OUT> outboundStream;

    /**
     * Creates the stream and pre-builds the refill instruction used by {@link #markConsumed()}.
     *
     * @param clientId     identifier placed in every flow control instruction
     * @param permits      number of permits carried by the initial flow control message
     * @param permitsBatch number of consumed messages after which {@code permitsBatch} new permits
     *                     are requested; a value of zero or less disables flow control
     */
    public FlowControlledStream(String clientId, int permits, int permitsBatch) {
        this.clientId = clientId;
        this.permits = permits;
        this.permitsBatch = permitsBatch;
        this.flowControl = FlowControl.newBuilder()
                                      .setPermits(permitsBatch)
                                      .setClientId(clientId)
                                      .build();
    }

    /**
     * Resets the consumption counter and sends the initial flow control message, carrying the
     * {@code permits} given at construction, on the outbound stream.
     * <p>
     * Does nothing when {@code permitsBatch} is not positive. Nothing is sent when
     * {@link #buildInitialFlowControlMessage(FlowControl)} returns {@code null}.
     */
    public void enableFlowControl() {
        if (permitsBatch <= 0) {
            return;
        }
        permitsConsumed.set(0);

        FlowControl initialPermits = FlowControl.newBuilder()
                                                .setPermits(permits)
                                                .setClientId(clientId)
                                                .build();
        OUT initialMessage = buildInitialFlowControlMessage(initialPermits);
        if (initialMessage == null) {
            return;
        }
        outboundStream().onNext(initialMessage);
    }

    /**
     * Wraps a flow control instruction in a message suitable for the outbound stream.
     *
     * @param var1 the flow control instruction to wrap
     * @return the message to send, or {@code null} to send nothing
     */
    protected abstract OUT buildFlowControlMessage(FlowControl var1);

    /**
     * Builds the message sent by {@link #enableFlowControl()}. The default implementation delegates
     * to {@link #buildFlowControlMessage(FlowControl)}.
     *
     * @param flowControl the flow control instruction carrying the initial number of permits
     * @return the message to send, or {@code null} to send nothing
     */
    protected OUT buildInitialFlowControlMessage(FlowControl flowControl) {
        return buildFlowControlMessage(flowControl);
    }

    /**
     * @return the client identifier given at construction
     */
    protected String clientId() {
        return clientId;
    }

    /**
     * Records that one message has been consumed and, once {@code permitsBatch} messages have been
     * counted, requests another {@code permitsBatch} permits on the outbound stream.
     * <p>
     * Does nothing when {@code permitsBatch} is not positive. Nothing is logged or sent when
     * {@link #buildFlowControlMessage(FlowControl)} returns {@code null}.
     */
    protected void markConsumed() {
        if (permitsBatch <= 0) {
            return;
        }

        // Atomically count this message; the counter wraps back to 0 on the last one of a batch.
        int consumedInBatch = permitsConsumed.updateAndGet(
                current -> current == permitsBatch - 1 ? 0 : current + 1
        );
        if (consumedInBatch != 0) {
            return;
        }

        OUT refillMessage = buildFlowControlMessage(flowControl);
        if (refillMessage == null) {
            return;
        }
        logger.debug("Requesting additional {} permits", permitsBatch);
        outboundStream().onNext(refillMessage);
    }

    /**
     * Stores the request stream handed over before the call starts, so that flow control messages
     * can be written to it later.
     *
     * @param requestStream the outbound stream of the call
     */
    public void beforeStart(ClientCallStreamObserver<OUT> requestStream) {
        this.outboundStream = requestStream;
    }

    /**
     * @return the outbound stream stored by {@link #beforeStart(ClientCallStreamObserver)}, or
     *         {@code null} if that method has not been called yet
     */
    protected ClientCallStreamObserver<OUT> outboundStream() {
        return outboundStream;
    }
}

