/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.InstructionHandler;
import io.axoniq.axonserver.connector.impl.FlowControlledStream;
import io.axoniq.axonserver.connector.impl.ForwardingReplyChannel;
import io.axoniq.axonserver.connector.impl.SynchronizedRequestStream;
import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.InstructionAck;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractIncomingInstructionStream<IN, OUT>
extends FlowControlledStream<IN, OUT> {
    private static final InstructionAck NO_HANDLER_FOR_INSTRUCTION = InstructionAck.newBuilder().setSuccess(false).setError(ErrorMessage.newBuilder().setErrorCode(ErrorCategory.UNSUPPORTED_INSTRUCTION.errorCode()).setMessage("No handler for instruction").build()).build();
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final Consumer<Throwable> disconnectHandler;
    private StreamObserver<OUT> instructionsForPlatform;

    public AbstractIncomingInstructionStream(String clientId, int permits, int permitsBatch, Consumer<Throwable> disconnectHandler) {
        super(clientId, permits, permitsBatch);
        this.disconnectHandler = disconnectHandler;
    }

    public void onNext(IN value) {
        InstructionHandler<IN, OUT> handler = this.getHandler(value);
        if (handler == null) {
            this.markConsumed();
            String instructionId = this.getInstructionId(value);
            if (instructionId != null && !instructionId.isEmpty()) {
                this.instructionsForPlatform.onNext(this.buildAckMessage(NO_HANDLER_FOR_INSTRUCTION));
            }
        } else {
            ForwardingReplyChannel<Object> replyChannel = new ForwardingReplyChannel<Object>(this.getInstructionId(value), this.clientId(), this.instructionsForPlatform, this::buildAckMessage, this::markConsumed);
            handler.handle(value, replyChannel);
        }
    }

    protected abstract OUT buildAckMessage(InstructionAck var1);

    protected abstract String getInstructionId(IN var1);

    protected abstract InstructionHandler<IN, OUT> getHandler(IN var1);

    public void onCompleted() {
        this.logger.debug("Stream completed from server side");
        if (this.unregisterOutboundStream(this.instructionsForPlatform)) {
            this.instructionsForPlatform.onCompleted();
        }
    }

    public void onError(Throwable t) {
        this.logger.debug("Error received", t);
        if (this.unregisterOutboundStream(this.instructionsForPlatform)) {
            this.logger.debug("Instruction stream disconnected. Scheduling reconnect");
            this.disconnectHandler.accept(t);
            this.instructionsForPlatform.onCompleted();
        }
    }

    @Override
    public void beforeStart(ClientCallStreamObserver<OUT> requestStream) {
        SynchronizedRequestStream<OUT> synchronizedRequestStream = new SynchronizedRequestStream<OUT>(requestStream);
        super.beforeStart(synchronizedRequestStream);
        this.instructionsForPlatform = synchronizedRequestStream;
    }

    public StreamObserver<OUT> getInstructionsForPlatform() {
        return this.outboundStream();
    }

    protected abstract boolean unregisterOutboundStream(StreamObserver<OUT> var1);
}

