package com.azure.core.amqp.implementation.handler;

import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.util.logging.ClientLogger;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/* loaded from: input_file:com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.class */
/**
 * Link handler for the receiving side of an AMQP link.
 *
 * <p>Complete (non-partial), unsettled deliveries observed in {@link #onDelivery(Event)} are published through
 * {@link #getDeliveredMessages()}. The first time the remote side opens the link with a remote source, or the
 * first time a delivery arrives (whichever happens first), {@link EndpointState#ACTIVE} is passed to the
 * inherited {@code onNext}; this is done at most once per handler instance.</p>
 */
public class ReceiveLinkHandler extends LinkHandler {
    /** Name of the link, as supplied to the constructor. */
    private final String linkName;

    /** {@code true} until {@link EndpointState#ACTIVE} has been signalled once. */
    private final AtomicBoolean isFirstResponse = new AtomicBoolean(true);

    /** Processor through which deliveries are exposed to subscribers. */
    private final DirectProcessor<Delivery> deliveries = DirectProcessor.create();

    /** Sink used to push deliveries into {@link #deliveries}; created with the BUFFER overflow strategy. */
    private final FluxSink<Delivery> deliverySink = this.deliveries.sink(FluxSink.OverflowStrategy.BUFFER);

    /**
     * Creates a handler for a receive link.
     *
     * @param str first argument forwarded to the {@code LinkHandler} constructor
     *     (NOTE(review): presumably the connection id - confirm against LinkHandler).
     * @param str2 second argument forwarded to the {@code LinkHandler} constructor
     *     (NOTE(review): presumably the host name - confirm against LinkHandler).
     * @param str3 name of the link; returned by {@link #getLinkName()}.
     * @param str4 third argument forwarded to the {@code LinkHandler} constructor
     *     (NOTE(review): presumably the entity path - confirm against LinkHandler).
     */
    public ReceiveLinkHandler(String str, String str2, String str3, String str4) {
        super(str, str2, str4, new ClientLogger(ReceiveLinkHandler.class));
        this.linkName = str3;
    }

    /**
     * Gets the link name this handler was created with.
     *
     * @return the link name passed to the constructor.
     */
    public String getLinkName() {
        return this.linkName;
    }

    /**
     * Gets the stream of deliveries emitted by {@link #onDelivery(Event)}.
     *
     * @return a {@link Flux} that emits each complete, unsettled delivery and completes when {@link #close()}
     *     is called.
     */
    public Flux<Delivery> getDeliveredMessages() {
        return this.deliveries;
    }

    /**
     * Completes the delivery stream and then closes the parent handler.
     */
    @Override // com.azure.core.amqp.implementation.handler.Handler, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.deliverySink.complete();
        super.close();
    }

    /**
     * Logs that the link was opened locally. Links that are not a {@link Receiver} are ignored.
     *
     * @param event event carrying the link that was opened.
     */
    public void onLinkLocalOpen(Event event) {
        final Link link = event.getLink();
        if (link instanceof Receiver) {
            this.logger.info("onLinkLocalOpen connectionId[{}], linkName[{}], localSource[{}]",
                getConnectionId(), link.getName(), link.getSource());
        }
    }

    /**
     * Handles the remote side opening the link. Links that are not a {@link Receiver} are ignored.
     *
     * <p>When the link has no remote source, only a "waitingForError" message is logged and the endpoint is not
     * marked active. Otherwise {@link EndpointState#ACTIVE} is signalled if it has not been signalled before.</p>
     *
     * @param event event carrying the link that was opened remotely.
     */
    public void onLinkRemoteOpen(Event event) {
        final Link link = event.getLink();
        if (!(link instanceof Receiver)) {
            return;
        }

        if (link.getRemoteSource() == null) {
            this.logger.info("onLinkRemoteOpen connectionId[{}], linkName[{}], action[waitingForError]",
                getConnectionId(), link.getName());
            return;
        }

        this.logger.info("onLinkRemoteOpen connectionId[{}], linkName[{}], remoteSource[{}]",
            getConnectionId(), link.getName(), link.getRemoteSource());
        signalActiveOnce();
    }

    /**
     * Handles an incoming delivery.
     *
     * <p>Signals {@link EndpointState#ACTIVE} if that has not happened yet. A delivery that is neither partial
     * nor settled is emitted to {@link #getDeliveredMessages()}. A non-partial delivery that is already settled
     * is only logged. Whenever the delivery has a link, the link's credit state is logged.</p>
     *
     * @param event event carrying the delivery.
     */
    public void onDelivery(Event event) {
        signalActiveOnce();

        final Delivery delivery = event.getDelivery();
        // Typed as Link: Delivery#getLink() returns Link, and only Link-level accessors are used below.
        final Link link = delivery.getLink();

        // Partial deliveries are never emitted; only a non-partial delivery is considered for publishing.
        if (!delivery.isPartial()) {
            if (delivery.isSettled()) {
                // Already-settled deliveries are not forwarded to subscribers, only logged.
                if (link != null) {
                    this.logger.verbose("onDelivery connectionId[{}], linkName[{}], updatedLinkCredit[{}], remoteCredit[{}], remoteCondition[{}], delivery.isSettled[{}]",
                        getConnectionId(), link.getName(), link.getCredit(), link.getRemoteCredit(),
                        link.getRemoteCondition(), delivery.isSettled());
                } else {
                    this.logger.warning("connectionId[{}], delivery.isSettled[{}]",
                        getConnectionId(), delivery.isSettled());
                }
            } else {
                this.deliverySink.next(delivery);
            }
        }

        if (link != null) {
            this.logger.verbose("onDelivery connectionId[{}], linkName[{}], updatedLinkCredit[{}], remoteCredit[{}], remoteCondition[{}], delivery.isPartial[{}]",
                getConnectionId(), link.getName(), link.getCredit(), link.getRemoteCredit(),
                link.getRemoteCondition(), delivery.isPartial());
        }
    }

    /**
     * Delegates to the {@code LinkHandler} implementation.
     * NOTE(review): flagged as a compiler-generated bridge in the decompiled output; likely redundant in
     * hand-written source - confirm before removing.
     *
     * @param link link to build the error context for.
     * @return the error context produced by the parent handler.
     */
    @Override // com.azure.core.amqp.implementation.handler.LinkHandler
    public AmqpErrorContext getErrorContext(Link link) {
        return super.getErrorContext(link);
    }

    /**
     * Delegates to the {@code LinkHandler} implementation.
     * NOTE(review): flagged as a compiler-generated bridge in the decompiled output - confirm before removing.
     *
     * @param event the link-final event.
     */
    @Override // com.azure.core.amqp.implementation.handler.LinkHandler
    public void onLinkFinal(Event event) {
        super.onLinkFinal(event);
    }

    /**
     * Delegates to the {@code LinkHandler} implementation.
     * NOTE(review): flagged as a compiler-generated bridge in the decompiled output - confirm before removing.
     *
     * @param event the remote-detach event.
     */
    @Override // com.azure.core.amqp.implementation.handler.LinkHandler
    public void onLinkRemoteDetach(Event event) {
        super.onLinkRemoteDetach(event);
    }

    /**
     * Delegates to the {@code LinkHandler} implementation.
     * NOTE(review): flagged as a compiler-generated bridge in the decompiled output - confirm before removing.
     *
     * @param event the remote-close event.
     */
    @Override // com.azure.core.amqp.implementation.handler.LinkHandler
    public void onLinkRemoteClose(Event event) {
        super.onLinkRemoteClose(event);
    }

    /**
     * Delegates to the {@code LinkHandler} implementation.
     * NOTE(review): flagged as a compiler-generated bridge in the decompiled output - confirm before removing.
     *
     * @param event the local-close event.
     */
    @Override // com.azure.core.amqp.implementation.handler.LinkHandler
    public void onLinkLocalClose(Event event) {
        super.onLinkLocalClose(event);
    }

    /**
     * Passes {@link EndpointState#ACTIVE} to the inherited {@code onNext} the first time it is called; later
     * calls do nothing. The atomic flag makes the check-and-clear a single operation.
     */
    private void signalActiveOnce() {
        if (this.isFirstResponse.getAndSet(false)) {
            onNext(EndpointState.ACTIVE);
        }
    }
}
