package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.LockContainer;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.temporal.TemporalAmount;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

/* loaded from: input_file:com/azure/messaging/servicebus/FluxAutoLockRenew.class */
/**
 * Flux operator that registers a {@link LockRenewalOperation} for every message flowing through it before the
 * message is handed to the downstream subscriber.
 *
 * <p>The operator itself only validates its configuration; the per-message work is done by
 * {@link LockRenewSubscriber}, one of which is created for each downstream subscription.</p>
 */
final class FluxAutoLockRenew extends FluxOperator<ServiceBusMessageContext, ServiceBusMessageContext> {
    private final ClientLogger logger = new ClientLogger(FluxAutoLockRenew.class);
    private final Function<String, Mono<OffsetDateTime>> onRenewLock;
    private final LockContainer<LockRenewalOperation> messageLockContainer;
    private final ReceiverOptions receivingOptions;

    /**
     * Subscriber that sits between the upstream source and the downstream subscriber. For each message it creates
     * a {@link LockRenewalOperation}, records it in the shared {@link LockContainer}, forwards the message
     * downstream and, when auto-complete is enabled, closes the renewal operation once the downstream
     * {@code onNext} call has returned (normally or exceptionally).
     */
    static final class LockRenewSubscriber extends BaseSubscriber<ServiceBusMessageContext> {
        /** Cleanup used for contexts that carry no message; there is no renewal operation to close. */
        private static final Consumer<ServiceBusMessageContext> LOCK_RENEW_NO_OP = serviceBusMessageContext -> {
        };
        private final ClientLogger logger = new ClientLogger(LockRenewSubscriber.class);
        private final Function<String, Mono<OffsetDateTime>> onRenewLock;
        private final Duration maxAutoLockRenewal;
        private final LockContainer<LockRenewalOperation> messageLockContainer;
        private final CoreSubscriber<? super ServiceBusMessageContext> actual;
        private final boolean isAutoCompleteEnabled;

        /**
         * Creates the subscriber.
         *
         * @param coreSubscriber downstream subscriber that receives the messages.
         * @param duration maximum duration used for the lock renewal operation and its container entry.
         * @param lockContainer container in which renewal operations are registered by lock token.
         * @param function function that renews the lock for a given lock token.
         * @param z whether the renewal operation is cleaned up right after the downstream {@code onNext} returns.
         * @throws NullPointerException if {@code coreSubscriber}, {@code duration}, {@code lockContainer} or
         *     {@code function} is null.
         */
        LockRenewSubscriber(CoreSubscriber<? super ServiceBusMessageContext> coreSubscriber, Duration duration, LockContainer<LockRenewalOperation> lockContainer, Function<String, Mono<OffsetDateTime>> function, boolean z) {
            this.onRenewLock = Objects.requireNonNull(function, "'onRenewLock' cannot be null.");
            this.actual = Objects.requireNonNull(coreSubscriber, "'downstream' cannot be null.");
            this.messageLockContainer = Objects.requireNonNull(lockContainer, "'messageLockContainer' cannot be null.");
            this.maxAutoLockRenewal = Objects.requireNonNull(duration, "'maxAutoLockRenewDuration' cannot be null.");
            this.isAutoCompleteEnabled = z;
        }

        /**
         * Hands the upstream subscription directly to the downstream subscriber, so demand is controlled by
         * downstream.
         *
         * @param subscription the upstream subscription.
         * @throws NullPointerException if {@code subscription} is null.
         */
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            Objects.requireNonNull(subscription, "'subscription' cannot be null.");
            this.actual.onSubscribe(subscription);
        }

        /** Propagates upstream completion to the downstream subscriber. */
        @Override
        public void hookOnComplete() {
            this.logger.verbose("Upstream has completed.");
            this.actual.onComplete();
        }

        /**
         * Logs the upstream error and propagates it to the downstream subscriber.
         *
         * @param th the error signalled by upstream.
         */
        @Override
        protected void hookOnError(Throwable th) {
            this.logger.error("Errors occurred upstream.", th);
            this.actual.onError(th);
        }

        /**
         * Starts lock renewal for the message (if any) and forwards the context downstream.
         *
         * <p>A context whose message has no lock token or no {@code lockedUntil} value is logged and dropped: it
         * is not forwarded downstream. A context without a message is forwarded as-is with no renewal.</p>
         *
         * <p>Exceptions thrown by the downstream {@code onNext} are logged and swallowed. When auto-complete is
         * enabled the renewal operation is closed and removed from the container exactly once, after the
         * downstream call finishes.</p>
         *
         * @param serviceBusMessageContext the context emitted by upstream.
         */
        @Override
        public void hookOnNext(ServiceBusMessageContext serviceBusMessageContext) {
            final ServiceBusReceivedMessage message = serviceBusMessageContext.getMessage();
            final Consumer<ServiceBusMessageContext> lockCleanup;

            if (message != null) {
                final String lockToken = message.getLockToken();
                final OffsetDateTime lockedUntil = message.getLockedUntil();
                if (Objects.isNull(lockToken)) {
                    this.logger.atWarning().addKeyValue(ServiceBusConstants.SEQUENCE_NUMBER_KEY, message.getSequenceNumber()).log("Unexpected, LockToken is not present in message.");
                    return;
                }
                if (Objects.isNull(lockedUntil)) {
                    this.logger.atWarning().addKeyValue(ServiceBusConstants.SEQUENCE_NUMBER_KEY, message.getSequenceNumber()).log("Unexpected, lockedUntil is not present in message.");
                    return;
                }

                // Wrap the renew function so each successful renewal also refreshes the message's lockedUntil.
                final Function<String, Mono<OffsetDateTime>> renewAndUpdateMessage = this.onRenewLock.andThen(
                    renewal -> renewal.map(renewedUntil -> {
                        message.setLockedUntil(renewedUntil);
                        return renewedUntil;
                    }));
                // NOTE(review): the 'false' argument is presumably the "is session" flag - confirm against
                // LockRenewalOperation's constructor.
                final LockRenewalOperation renewOperation = new LockRenewalOperation(lockToken,
                    this.maxAutoLockRenewal, false, renewAndUpdateMessage, lockedUntil);

                try {
                    this.messageLockContainer.addOrUpdate(lockToken,
                        OffsetDateTime.now().plus(this.maxAutoLockRenewal), renewOperation);
                } catch (Exception e) {
                    // Best effort: a container failure must not prevent the message from being delivered.
                    this.logger.atInfo().addKeyValue(ServiceBusConstants.LOCK_TOKEN_KEY, lockToken).log("Exception occurred while updating lockContainer.", e);
                }

                lockCleanup = context -> {
                    renewOperation.close();
                    this.messageLockContainer.remove(context.getMessage().getLockToken());
                };
            } else {
                lockCleanup = LOCK_RENEW_NO_OP;
            }

            try {
                this.actual.onNext(serviceBusMessageContext);
            } catch (Exception e) {
                this.logger.info("Exception occurred while handling downstream onNext operation.", e);
            } finally {
                // Runs exactly once on every exit path; a failure inside the cleanup propagates as-is instead of
                // being reported as a downstream onNext failure and retried.
                if (this.isAutoCompleteEnabled) {
                    lockCleanup.accept(serviceBusMessageContext);
                }
            }
        }

        /**
         * Exposes the downstream subscriber's Reactor context.
         *
         * @return the context of the downstream subscriber.
         */
        @Override
        public Context currentContext() {
            return this.actual.currentContext();
        }
    }

    /**
     * Creates the operator.
     *
     * @param flux upstream source of message contexts.
     * @param receiverOptions receiver options supplying the maximum lock renew duration and the auto-complete flag.
     * @param lockContainer container in which renewal operations are registered by lock token.
     * @param function function that renews the lock for a given lock token.
     * @throws NullPointerException if {@code receiverOptions}, {@code function}, {@code lockContainer} or
     *     {@code receiverOptions.getMaxLockRenewDuration()} is null.
     * @throws IllegalArgumentException if the maximum lock renew duration is zero or negative.
     */
    public FluxAutoLockRenew(Flux<? extends ServiceBusMessageContext> flux, ReceiverOptions receiverOptions, LockContainer<LockRenewalOperation> lockContainer, Function<String, Mono<OffsetDateTime>> function) {
        super(flux);
        this.receivingOptions = Objects.requireNonNull(receiverOptions, "'receiverOptions' cannot be null.");
        this.onRenewLock = Objects.requireNonNull(function, "'onRenewLock' cannot be null.");
        this.messageLockContainer = Objects.requireNonNull(lockContainer, "'messageLockContainer' cannot be null.");
        final Duration maxLockRenewDuration = receiverOptions.getMaxLockRenewDuration();
        Objects.requireNonNull(maxLockRenewDuration, "'receivingOptions.maxAutoLockRenewDuration' cannot be null.");
        if (maxLockRenewDuration.isNegative() || maxLockRenewDuration.isZero()) {
            throw this.logger.logExceptionAsError(new IllegalArgumentException("'receivingOptions.maxLockRenewalDuration' should not be zero or negative."));
        }
    }

    /**
     * Subscribes to the upstream source through a new {@link LockRenewSubscriber} wrapping the given subscriber.
     *
     * @param coreSubscriber the downstream subscriber.
     * @throws NullPointerException if {@code coreSubscriber} is null.
     */
    @Override
    public void subscribe(CoreSubscriber<? super ServiceBusMessageContext> coreSubscriber) {
        Objects.requireNonNull(coreSubscriber, "'coreSubscriber' cannot be null.");
        this.source.subscribe(new LockRenewSubscriber(coreSubscriber, this.receivingOptions.getMaxLockRenewDuration(), this.messageLockContainer, this.onRenewLock, this.receivingOptions.isEnableAutoComplete()));
    }
}
