package com.azure.messaging.servicebus.implementation;

import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.ExceptionUtil;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.ReactorReceiver;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.class */
public class ServiceBusReactorReceiver extends ReactorReceiver implements ServiceBusReceiveLink {
    // Sentinel returned by decodeDelivery(...) when an incoming delivery was handled as a disposition
    // acknowledgement instead of a new message; receive() drops it by reference comparison.
    private static final Message EMPTY_MESSAGE = Proton.message();
    private final ClientLogger logger;
    // Deliveries stored by decodeDelivery(...) when the link is not pre-settled, keyed by lock token string.
    private final ConcurrentHashMap<String, Delivery> unsettledDeliveries;
    // Disposition updates that were dispatched and are awaiting a remote outcome, keyed by lock token string.
    private final ConcurrentHashMap<String, UpdateDispositionWorkItem> pendingUpdates;
    // Flipped to true by the first closeAsync() call; updateDisposition(...) rejects calls afterwards.
    private final AtomicBoolean isDisposed;
    // Handle of the Flux.interval(timeout) subscription that periodically runs cleanupWorkItems().
    private final Disposable subscription;
    private final Receiver receiver;
    // True when the receiver's sender settle mode is SETTLED; such deliveries are accepted and settled on receipt.
    private final boolean isSettled;
    // Used both as the cleanup interval and as the per-work-item disposition timeout.
    private final Duration timeout;
    // Consulted in updateOutcome(...) to decide whether a rejected disposition is retried.
    private final AmqpRetryPolicy retryPolicy;
    private final ReceiveLinkHandler handler;
    private final ReactorProvider provider;
    // Resolves the session id from the remote source filter once the endpoint is ACTIVE.
    private final Mono<String> sessionIdMono;
    // Resolves the session lock expiry from the remote link properties once the endpoint is ACTIVE.
    private final Mono<OffsetDateTime> sessionLockedUntil;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.azure.messaging.servicebus.implementation.ServiceBusReactorReceiver$1, reason: invalid class name */
    /* loaded from: input_file:com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver$1.class */
    /**
     * Compiler-generated lookup table for switching over {@link DeliveryState.DeliveryStateType}.
     *
     * <p>The array is indexed by enum ordinal and holds the case label assigned to that constant:
     * {@code Rejected -> 1}, {@code Released -> 2}. Every other slot keeps the default value 0.</p>
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$qpid$proton$amqp$transport$DeliveryState$DeliveryStateType;

        static {
            final int[] caseLabels = new int[DeliveryState.DeliveryStateType.values().length];
            try {
                caseLabels[DeliveryState.DeliveryStateType.Rejected.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant absent in the enum version present at runtime; its slot stays 0.
            }
            try {
                caseLabels[DeliveryState.DeliveryStateType.Released.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant absent in the enum version present at runtime; its slot stays 0.
            }
            $SwitchMap$org$apache$qpid$proton$amqp$transport$DeliveryState$DeliveryStateType = caseLabels;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver$UpdateDispositionWorkItem.class */
    /**
     * Bookkeeping for one in-flight disposition update: the lock token and desired delivery state, the sink that
     * completes the caller's mono, a retry counter, the last error seen and the deadline after which the update
     * counts as timed out.
     */
    public static final class UpdateDispositionWorkItem {
        private final String lockToken;
        private final DeliveryState state;
        private final Duration timeout;
        private final AtomicInteger retryAttempts = new AtomicInteger();
        private final AtomicBoolean isDisposed = new AtomicBoolean();
        private Mono<Void> mono;
        private Instant expirationTime;
        private MonoSink<Void> sink;
        private Throwable throwable;

        private UpdateDispositionWorkItem(String str, DeliveryState deliveryState, Duration duration) {
            this.lockToken = str;
            this.state = deliveryState;
            this.timeout = duration;
        }

        /* synthetic */ UpdateDispositionWorkItem(String str, DeliveryState deliveryState, Duration duration, AnonymousClass1 anonymousClass1) {
            this(str, deliveryState, duration);
        }

        /**
         * @return the lock token this work item was created for.
         */
        public String getLockToken() {
            return lockToken;
        }

        /**
         * @return the delivery state that was requested for the delivery.
         */
        public DeliveryState getDeliveryState() {
            return state;
        }

        /**
         * Binds the work item to the sink of the subscriber and starts the timeout clock.
         *
         * @param monoSink sink completed when the update finishes.
         * @throws NullPointerException if {@code monoSink} is null.
         */
        public void start(MonoSink<Void> monoSink) {
            this.sink = Objects.requireNonNull(monoSink, "'sink' cannot be null.");
            // Both disposal and cancellation of the sink flag this work item as disposed.
            final Disposable markDisposed = () -> isDisposed.set(true);
            this.sink.onDispose(markDisposed);
            this.sink.onCancel(markDisposed);
            resetStartTime();
        }

        /**
         * @return {@code true} when the current deadline lies strictly in the past.
         */
        public boolean hasTimedout() {
            return Instant.now().isAfter(expirationTime);
        }

        /**
         * Moves the deadline to "now + timeout".
         */
        public void resetStartTime() {
            expirationTime = Instant.now().plus(timeout);
        }

        /**
         * @return the retry count after incrementing it by one.
         */
        public int incrementRetry() {
            return retryAttempts.incrementAndGet();
        }

        /**
         * @return the last recorded error, or {@code null} if none was recorded.
         */
        public Throwable getLastException() {
            return throwable;
        }

        /**
         * @param th error to remember as the most recent failure.
         */
        public void setLastException(Throwable th) {
            throwable = th;
        }

        /**
         * @return the mono associated with this work item via {@link #setMono(Mono)}.
         */
        public Mono<Void> getMono() {
            return mono;
        }

        /**
         * @param mono mono to associate with this work item.
         */
        public void setMono(Mono<Void> mono) {
            this.mono = mono;
        }

        /**
         * @return the sink passed to {@link #start(MonoSink)}, or {@code null} before it was called.
         */
        public MonoSink<Void> getSink() {
            return sink;
        }
    }

    public ServiceBusReactorReceiver(AmqpConnection amqpConnection, String str, Receiver receiver, ReceiveLinkHandler receiveLinkHandler, TokenManager tokenManager, ReactorProvider reactorProvider, Duration duration, AmqpRetryPolicy amqpRetryPolicy) {
        super(amqpConnection, str, receiver, receiveLinkHandler, tokenManager, reactorProvider.getReactorDispatcher(), amqpRetryPolicy.getRetryOptions());
        this.logger = new ClientLogger(ServiceBusReactorReceiver.class);
        this.unsettledDeliveries = new ConcurrentHashMap<>();
        this.pendingUpdates = new ConcurrentHashMap<>();
        this.isDisposed = new AtomicBoolean();
        this.receiver = receiver;
        this.handler = receiveLinkHandler;
        this.provider = reactorProvider;
        this.isSettled = receiver.getSenderSettleMode() == SenderSettleMode.SETTLED;
        this.timeout = duration;
        this.retryPolicy = amqpRetryPolicy;
        this.subscription = Flux.interval(duration).subscribe(l -> {
            cleanupWorkItems();
        });
        this.sessionIdMono = getEndpointStates().filter(amqpEndpointState -> {
            return amqpEndpointState == AmqpEndpointState.ACTIVE;
        }).next().flatMap(amqpEndpointState2 -> {
            Object obj = receiver.getRemoteSource().getFilter().get(ServiceBusReactorSession.SESSION_FILTER);
            if (obj != null) {
                return Mono.just(String.valueOf(obj));
            }
            this.logger.info("entityPath[{}], linkName[{}]. There is no session id.", new Object[]{str, getLinkName()});
            return Mono.empty();
        }).cache(str2 -> {
            return Duration.ofMillis(Long.MAX_VALUE);
        }, th -> {
            return Duration.ZERO;
        }, () -> {
            return Duration.ZERO;
        });
        this.sessionLockedUntil = getEndpointStates().filter(amqpEndpointState3 -> {
            return amqpEndpointState3 == AmqpEndpointState.ACTIVE;
        }).next().map(amqpEndpointState4 -> {
            if (receiver.getRemoteProperties() != null && receiver.getRemoteProperties().containsKey(ServiceBusReactorSession.LOCKED_UNTIL_UTC)) {
                return MessageUtils.convertDotNetTicksToOffsetDateTime(((Long) receiver.getRemoteProperties().get(ServiceBusReactorSession.LOCKED_UNTIL_UTC)).longValue());
            }
            this.logger.info("entityPath[{}], linkName[{}]. Locked until not set.", new Object[]{str, getLinkName()});
            return Instant.EPOCH.atOffset(ZoneOffset.UTC);
        }).cache(offsetDateTime -> {
            return Duration.ofMillis(Long.MAX_VALUE);
        }, th2 -> {
            return Duration.ZERO;
        }, () -> {
            return Duration.ZERO;
        });
    }

    /**
     * Requests that the delivery identified by the lock token be moved to the given state.
     *
     * @param str lock token of the delivery.
     * @param deliveryState state to apply to the delivery.
     * @return a mono for the update; it is an {@link IllegalStateException} error mono when this receiver has
     *     already been closed.
     */
    @Override // com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink
    public Mono<Void> updateDisposition(String str, DeliveryState deliveryState) {
        if (!this.isDisposed.get()) {
            return updateDispositionInternal(str, deliveryState);
        }
        return FluxUtil.monoError(this.logger, new IllegalStateException("Cannot perform operations on a disposed receiver."));
    }

    /**
     * Returns the message stream of the base receiver without the {@code EMPTY_MESSAGE} sentinel, published on
     * the bounded-elastic scheduler.
     *
     * @return the stream of received messages.
     */
    public Flux<Message> receive() {
        final Flux<Message> decoded = super.receive();
        // Reference comparison is intentional: only the shared sentinel instance is dropped.
        final Flux<Message> withoutSentinel = decoded.filter(candidate -> candidate != EMPTY_MESSAGE);
        return withoutSentinel.publishOn(Schedulers.boundedElastic());
    }

    /**
     * Gets the mono, built in the constructor, that reads the session id from the remote source filter.
     *
     * @return the session id mono; it completes empty when the remote filter carries no session id.
     */
    @Override // com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink
    public Mono<String> getSessionId() {
        return sessionIdMono;
    }

    /**
     * Gets the mono, built in the constructor, that reads the session lock expiry from the remote link properties.
     *
     * @return the lock-expiry mono; it emits the epoch (UTC) when the remote properties carry no expiry.
     */
    @Override // com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink
    public Mono<OffsetDateTime> getSessionLockedUntil() {
        return sessionLockedUntil;
    }

    /**
     * Closes the link after giving pending disposition updates a chance to finish.
     *
     * <p>The first call marks the receiver as disposed, fails work items that already timed out, and collects the
     * remaining pending updates: updates whose requested state is a {@link TransactionalState} are replaced by a
     * new {@code Released} disposition, all others are awaited as-is. Errors while waiting are logged and
     * swallowed. The cleanup interval subscription is disposed when the waiting stage terminates. Subsequent
     * calls delegate directly to the base implementation.</p>
     *
     * @return a mono that waits for the pending updates and then continues with the base receiver's close mono.
     */
    public Mono<Void> closeAsync() {
        if (this.isDisposed.getAndSet(true)) {
            return super.closeAsync();
        }
        cleanupWorkItems();

        final Mono<Void> pendingCompletion;
        if (this.pendingUpdates.isEmpty()) {
            pendingCompletion = Mono.empty();
        } else {
            final ArrayList<Mono<Void>> pending = new ArrayList<>();
            final StringJoiner lockTokens = new StringJoiner(", ");
            for (UpdateDispositionWorkItem workItem : this.pendingUpdates.values()) {
                if (workItem.hasTimedout()) {
                    continue;
                }
                if (workItem.getDeliveryState() instanceof TransactionalState) {
                    pending.add(updateDispositionInternal(workItem.getLockToken(), Released.getInstance()));
                } else {
                    pending.add(workItem.getMono());
                }
                lockTokens.add(workItem.getLockToken());
            }
            this.logger.info("Waiting for pending updates to complete. Locks: {}", new Object[]{lockTokens.toString()});
            pendingCompletion = Mono.when(pending);
        }

        // NOTE(review): super.closeAsync() is invoked while assembling the chain, not when the pending updates
        // finish; whether that matters depends on how lazy the base implementation is - confirm.
        return pendingCompletion
            .onErrorResume(error -> {
                this.logger.info("There was an exception while disposing of all links.", new Object[]{error});
                return Mono.empty();
            })
            .doFinally(signalType -> this.subscription.dispose())
            .then(super.closeAsync());
    }

    /**
     * Turns an incoming delivery into a message, or routes it to {@code updateOutcome} when it belongs to a
     * delivery that is already tracked as unsettled.
     *
     * <p>The lock token is derived from the delivery tag when the tag is exactly 16 bytes long; otherwise the
     * zero lock token is used. A delivery whose non-zero lock token is already present in the unsettled map is
     * treated as a disposition acknowledgement and yields the {@code EMPTY_MESSAGE} sentinel. Any other delivery
     * is decoded; on a pre-settled link it is accepted and settled immediately, otherwise it is remembered as
     * unsettled and the receiver is advanced.</p>
     *
     * @param delivery delivery to decode.
     * @return the decoded message with its lock token, or the {@code EMPTY_MESSAGE} sentinel.
     */
    protected Message decodeDelivery(Delivery delivery) {
        final byte[] deliveryTag = delivery.getTag();
        final UUID lockToken = (deliveryTag != null && deliveryTag.length == 16)
            ? MessageUtils.convertDotNetBytesToUUID(deliveryTag)
            : MessageUtils.ZERO_LOCK_TOKEN;
        final String lockTokenString = lockToken.toString();

        // Compare by value: a 16-byte all-zero tag converts to a UUID that equals the zero lock token without
        // necessarily being the same instance.
        final boolean hasLockToken = !lockToken.equals(MessageUtils.ZERO_LOCK_TOKEN);
        if (hasLockToken && this.unsettledDeliveries.containsKey(lockTokenString)) {
            updateOutcome(lockTokenString, delivery);
            return EMPTY_MESSAGE;
        }

        final int pendingBytes = delivery.pending();
        final byte[] buffer = new byte[pendingBytes];
        final int bytesRead = this.receiver.recv(buffer, 0, pendingBytes);
        final Message message = Proton.message();
        message.decode(buffer, 0, bytesRead);

        if (this.isSettled) {
            delivery.disposition(Accepted.getInstance());
            delivery.settle();
        } else {
            this.unsettledDeliveries.putIfAbsent(lockTokenString, delivery);
            this.receiver.advance();
        }
        return new MessageWithLockToken(message, lockToken);
    }

    /**
     * Builds the mono that applies {@code deliveryState} to the unsettled delivery with the given lock token.
     *
     * <p>On first subscription the work item is started and the disposition is scheduled on the reactor
     * dispatcher, where the work item is also registered in the pending-updates map. The sink is completed
     * elsewhere (when the remote outcome arrives or the item times out). The mono is cached, so repeated
     * subscriptions share one attempt.</p>
     *
     * @param str lock token of the delivery.
     * @param deliveryState state to apply.
     * @return the cached update mono, or an error mono when no unsettled delivery matches the lock token.
     */
    private Mono<Void> updateDispositionInternal(String str, DeliveryState deliveryState) {
        final Delivery unsettled = this.unsettledDeliveries.get(str);
        if (unsettled == null) {
            this.logger.warning("entityPath[{}], linkName[{}], deliveryTag[{}]. Delivery not found to update disposition.", new Object[]{getEntityPath(), getLinkName(), str});
            return FluxUtil.monoError(this.logger, Exceptions.propagate(new IllegalArgumentException("Delivery not on receive link.")));
        }

        final UpdateDispositionWorkItem workItem = new UpdateDispositionWorkItem(str, deliveryState, this.timeout, null);
        // The explicit <Void> witness is required: in a chained call the sink type cannot be inferred from the
        // assignment target and would otherwise default to MonoSink<Object>.
        final Mono<Void> result = Mono.<Void>create(sink -> {
            workItem.start(sink);
            try {
                this.provider.getReactorDispatcher().invoke(() -> {
                    unsettled.disposition(deliveryState);
                    this.pendingUpdates.put(str, workItem);
                });
            } catch (IOException e) {
                sink.error(new AmqpException(false, "updateDisposition failed while dispatching to Reactor.", e, this.handler.getErrorContext(this.receiver)));
            }
        }).cache();
        workItem.setMono(result);
        return result;
    }

    /**
     * Handles a remote state change for a delivery that has a disposition update in flight.
     *
     * <p>The pending work item completes successfully when the remote state type matches the requested one.
     * Otherwise:</p>
     * <ul>
     *   <li>{@code Rejected}: the retry policy is consulted; the disposition is re-sent when a retry delay is
     *       returned, and the work item fails with the translated error when it is not.</li>
     *   <li>{@code Released}: the work item fails with an {@code OPERATION_CANCELLED} error.</li>
     *   <li>anything else: the work item fails with an error describing the outcome.</li>
     * </ul>
     * <p>Nothing happens (besides a warning) when the delivery carries no outcome or no update is pending.</p>
     *
     * @param str lock token of the delivery.
     * @param delivery delivery whose remote state changed.
     */
    private void updateOutcome(String str, Delivery delivery) {
        final DeliveryState remoteState = delivery.getRemoteState();
        this.logger.verbose("entityPath[{}], linkName[{}], deliveryTag[{}], state[{}] Received update disposition delivery.", new Object[]{getEntityPath(), getLinkName(), str, remoteState});

        // A transactional state wraps the actual outcome; unwrap it so both shapes are handled alike.
        final Outcome outcome;
        if (remoteState instanceof Outcome) {
            outcome = (Outcome) remoteState;
        } else if (remoteState instanceof TransactionalState) {
            outcome = ((TransactionalState) remoteState).getOutcome();
        } else {
            outcome = null;
        }
        if (outcome == null) {
            this.logger.warning("linkName[{}], deliveryTag[{}]. No outcome associated with delivery. Delivery: {}", new Object[]{getLinkName(), str, delivery});
            return;
        }

        final UpdateDispositionWorkItem workItem = this.pendingUpdates.get(str);
        if (workItem == null) {
            this.logger.warning("linkName[{}], deliveryTag[{}]. No pending update for delivery. Delivery: {}", new Object[]{getLinkName(), str, delivery});
            return;
        }

        if (remoteState.getType() == workItem.getDeliveryState().getType()) {
            completeWorkItem(str, delivery, workItem.getSink(), null);
            return;
        }

        this.logger.info("Received delivery '{}' state '{}' doesn't match expected state '{}'", new Object[]{str, remoteState, workItem.getDeliveryState()});
        switch (remoteState.getType()) {
            case Rejected: {
                final ErrorCondition error = ((Rejected) outcome).getError();
                final Exception exception = ExceptionUtil.toException(error.getCondition().toString(), error.getDescription(), this.handler.getErrorContext(this.receiver));
                // A null delay from the retry policy means no further attempt is allowed.
                if (this.retryPolicy.calculateRetryDelay(exception, workItem.incrementRetry()) == null) {
                    this.logger.info("deliveryTag[{}], state[{}]. Retry attempts exhausted.", new Object[]{str, remoteState.getType(), exception});
                    completeWorkItem(str, delivery, workItem.getSink(), exception);
                    return;
                }
                workItem.setLastException(exception);
                workItem.resetStartTime();
                try {
                    this.provider.getReactorDispatcher().invoke(() -> delivery.disposition(workItem.getDeliveryState()));
                } catch (IOException e) {
                    final String message = String.format("linkName[%s], deliveryTag[%s]. Retrying updateDisposition failed to dispatch to Reactor.", getLinkName(), str);
                    final Throwable dispatchFailure = this.logger.logExceptionAsError(new AmqpException(false, message, e, this.handler.getErrorContext(this.receiver)));
                    completeWorkItem(str, delivery, workItem.getSink(), dispatchFailure);
                }
                return;
            }
            case Released: {
                final AmqpException cancelled = new AmqpException(false, AmqpErrorCondition.OPERATION_CANCELLED, "AMQP layer unexpectedly aborted or disconnected.", this.handler.getErrorContext(this.receiver));
                this.logger.info("deliveryTag[{}], state[{}]. Completing pending updateState operation with exception.", new Object[]{str, remoteState.getType(), cancelled});
                completeWorkItem(str, delivery, workItem.getSink(), cancelled);
                return;
            }
            default: {
                final AmqpException unexpected = new AmqpException(false, outcome.toString(), this.handler.getErrorContext(this.receiver));
                this.logger.info("deliveryTag[{}], state[{}] Completing pending updateState operation with exception.", new Object[]{str, remoteState.getType(), unexpected});
                completeWorkItem(str, delivery, workItem.getSink(), unexpected);
                return;
            }
        }
    }

    /**
     * Removes every pending disposition update whose deadline has passed and fails its sink, using the last
     * recorded error when there is one and a retriable {@code TIMEOUT_ERROR} otherwise.
     */
    private void cleanupWorkItems() {
        this.logger.verbose("linkName[{}]: Cleaning timed out update work tasks.", new Object[]{getLinkName()});
        this.pendingUpdates.forEach((lockToken, workItem) -> {
            if (workItem != null && workItem.hasTimedout()) {
                this.pendingUpdates.remove(lockToken);
                final MonoSink<Void> sink = workItem.getSink();
                final Throwable failure;
                if (workItem.getLastException() != null) {
                    failure = workItem.getLastException();
                } else {
                    failure = new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR, "Update disposition request timed out.", this.handler.getErrorContext(this.receiver));
                }
                // No delivery is passed, so nothing is settled here.
                completeWorkItem(lockToken, null, sink, failure);
            }
        });
    }

    /**
     * Completes the sink of a disposition update and, when the delivery was settled remotely, settles it locally
     * and stops tracking it.
     *
     * @param str lock token of the delivery.
     * @param delivery delivery the update refers to; may be {@code null}, in which case nothing is settled.
     * @param monoSink sink to complete.
     * @param th error to fail the sink with, or {@code null} to complete it successfully.
     */
    private void completeWorkItem(String str, Delivery delivery, MonoSink<Void> monoSink, Throwable th) {
        final boolean settledRemotely = delivery != null && delivery.remotelySettled();
        if (settledRemotely) {
            delivery.settle();
        }

        if (th == null) {
            monoSink.success();
        } else if (th instanceof RuntimeException) {
            // Runtime exceptions go through the logger before being handed to the sink.
            monoSink.error(this.logger.logExceptionAsError((RuntimeException) th));
        } else {
            monoSink.error(th);
        }

        if (settledRemotely) {
            this.pendingUpdates.remove(str);
            this.unsettledDeliveries.remove(str);
        }
    }
}
