/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.InstructionAck;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

public class ForwardingReplyChannel<T>
implements ReplyChannel<T> {
    private final AtomicBoolean ackSent = new AtomicBoolean(false);
    private final String instructionId;
    private final String clientId;
    private final StreamObserver<T> stream;
    private final Function<InstructionAck, T> ackBuilder;
    private final Runnable onConsumed;
    private final AtomicBoolean completed = new AtomicBoolean();

    public ForwardingReplyChannel(String instructionId, String clientId, StreamObserver<T> stream, Function<InstructionAck, T> ackBuilder, Runnable onComplete) {
        this.instructionId = instructionId;
        this.clientId = clientId;
        this.stream = stream;
        this.ackBuilder = ackBuilder;
        this.onConsumed = onComplete;
    }

    @Override
    public void send(T outboundMessage) {
        this.stream.onNext(outboundMessage);
    }

    @Override
    public void sendAck() {
        if (this.instructionId != null && !this.instructionId.isEmpty() && this.ackSent.compareAndSet(false, true)) {
            this.stream.onNext(this.ackBuilder.apply(InstructionAck.newBuilder().setInstructionId(this.instructionId).setSuccess(true).build()));
        }
    }

    @Override
    public void complete() {
        this.sendAck();
        this.markConsumed();
    }

    @Override
    public void completeWithError(ErrorMessage errorMessage) {
        this.sendNack(errorMessage);
        this.markConsumed();
    }

    @Override
    public void sendNack(ErrorMessage errorMessage) {
        if (this.instructionId != null && !this.instructionId.isEmpty() && this.ackSent.compareAndSet(false, true)) {
            InstructionAck nack = InstructionAck.newBuilder().setInstructionId(this.instructionId).setError(errorMessage == null ? ErrorMessage.getDefaultInstance() : errorMessage).setSuccess(false).build();
            this.stream.onNext(this.ackBuilder.apply(nack));
        }
    }

    @Override
    public void completeWithError(ErrorCategory errorCategory, String message) {
        this.completeWithError(ErrorMessage.newBuilder().setErrorCode(errorCategory.errorCode()).setLocation(this.clientId).setMessage(message).build());
    }

    private void markConsumed() {
        if (this.completed.compareAndSet(false, true)) {
            this.onConsumed.run();
        }
    }
}

