/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.azure.servicebus;

import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
import com.azure.messaging.servicebus.models.DeadLetterOptions;
import com.azure.messaging.servicebus.models.SubQueue;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.azure.servicebus.ServiceBusConfiguration;
import org.apache.camel.component.azure.servicebus.ServiceBusConsumerOperationDefinition;
import org.apache.camel.component.azure.servicebus.ServiceBusEndpoint;
import org.apache.camel.component.azure.servicebus.client.ServiceBusClientFactory;
import org.apache.camel.component.azure.servicebus.client.ServiceBusReceiverAsyncClientWrapper;
import org.apache.camel.component.azure.servicebus.operations.ServiceBusReceiverOperations;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.scheduler.Schedulers;

/**
 * Camel consumer that subscribes to an Azure Service Bus receiver and turns every
 * {@link ServiceBusReceivedMessage} into a Camel {@link Exchange}.
 *
 * <p>The operation to run (peek or receive) is taken from the endpoint configuration;
 * when none is configured, {@code receiveMessages} is used. When the subscription
 * reports an error, the consumer waits for the configured reconnect delay, closes the
 * current client and subscribes again, unless the consumer has been asked to stop in
 * the meantime.
 */
public class ServiceBusConsumer
extends DefaultConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(ServiceBusConsumer.class);
    private ServiceBusReceiverAsyncClientWrapper clientWrapper;
    private ServiceBusReceiverOperations operations;
    /** Dispatch table from the configured consumer operation to the method implementing it. */
    private final Map<ServiceBusConsumerOperationDefinition, Runnable> operationsToExecute = new EnumMap<>(ServiceBusConsumerOperationDefinition.class);
    /**
     * Set by {@link #doStop()} and cleared by {@link #doStart()}. Read from the error
     * callback, which may run on a different thread, hence {@code volatile}.
     */
    private volatile boolean stopRequested;

    /**
     * Creates the consumer and registers the supported consumer operations.
     *
     * @param endpoint  the Service Bus endpoint this consumer belongs to
     * @param processor the processor that receives the created exchanges
     */
    public ServiceBusConsumer(ServiceBusEndpoint endpoint, Processor processor) {
        super(endpoint, processor);
        this.bind(ServiceBusConsumerOperationDefinition.peekMessages, this::peekMessages);
        this.bind(ServiceBusConsumerOperationDefinition.receiveMessages, this::receiveMessages);
    }

    /**
     * Starts the consumer and opens the Service Bus subscription.
     *
     * @throws Exception if the parent start logic fails
     */
    protected void doStart() throws Exception {
        this.stopRequested = false;
        super.doStart();
        this.createAndConnectClient();
    }

    /**
     * Builds the receiver client (reusing the one supplied in the configuration when
     * present), wraps it, and invokes the configured consumer operation.
     */
    protected void createAndConnectClient() {
        LOG.debug("Creating connection to Azure ServiceBus");
        ServiceBusConfiguration configuration = this.getConfiguration();
        ServiceBusReceiverAsyncClient client = configuration.getReceiverAsyncClient();
        if (client == null) {
            client = ServiceBusClientFactory.createServiceBusReceiverAsyncClient(configuration);
        }
        this.clientWrapper = new ServiceBusReceiverAsyncClientWrapper(client);
        this.operations = new ServiceBusReceiverOperations(this.clientWrapper);
        this.invokeOperation(configuration.getConsumerOperation());
    }

    /**
     * Closes the receiver client (if one was created) and stops the consumer.
     *
     * @throws Exception if the parent stop logic fails
     */
    protected void doStop() throws Exception {
        // Flag first, so an error callback that is currently waiting out the reconnect
        // delay does not open a fresh client after this one has been closed.
        this.stopRequested = true;
        if (this.clientWrapper != null) {
            this.clientWrapper.close();
        }
        super.doStop();
    }

    /**
     * @return the configuration of the owning endpoint
     */
    public ServiceBusConfiguration getConfiguration() {
        return this.getEndpoint().getConfiguration();
    }

    /**
     * @return the owning endpoint, narrowed to {@link ServiceBusEndpoint}
     */
    public ServiceBusEndpoint getEndpoint() {
        return (ServiceBusEndpoint)super.getEndpoint();
    }

    /** Registers {@code fn} as the implementation of {@code operation}. */
    private void bind(ServiceBusConsumerOperationDefinition operation, Runnable fn) {
        this.operationsToExecute.put(operation, fn);
    }

    /**
     * Runs the implementation registered for {@code operation}, defaulting to
     * {@code receiveMessages} when {@code operation} is {@code null}.
     *
     * @throws RuntimeCamelException if no implementation is registered for the operation
     */
    private void invokeOperation(ServiceBusConsumerOperationDefinition operation) {
        ServiceBusConsumerOperationDefinition operationToInvoke = operation == null ? ServiceBusConsumerOperationDefinition.receiveMessages : operation;
        Runnable fnToInvoke = this.operationsToExecute.get(operationToInvoke);
        if (fnToInvoke == null) {
            throw new RuntimeCamelException("Operation not supported. Value: " + operationToInvoke);
        }
        fnToInvoke.run();
    }

    /** Subscribes to the receive stream, routing messages and errors to the listeners. */
    private void receiveMessages() {
        this.operations.receiveMessages().subscribe(this::onEventListener, this::onErrorListener, () -> {});
    }

    /** Subscribes to a peek of at most the configured number of messages. */
    private void peekMessages() {
        this.operations.peekMessages(this.getConfiguration().getPeekNumMaxMessages()).subscribe(this::onEventListener, this::onErrorListener, () -> {});
    }

    /**
     * Handles one received message: builds the exchange, attaches the completion
     * hook that settles the message, and hands the exchange to the async processor.
     */
    private void onEventListener(ServiceBusReceivedMessage message) {
        Exchange exchange = this.createServiceBusExchange(message);
        exchange.getExchangeExtension().addOnCompletion(new ConsumerOnCompletion(message));
        AsyncCallback callback = this.defaultConsumerCallback(exchange, true);
        this.getAsyncProcessor().process(exchange, callback);
    }

    /**
     * Creates an exchange whose body is the message body and whose headers carry the
     * Service Bus message metadata plus the (filtered) application properties.
     *
     * @param receivedMessage the message delivered by Service Bus
     * @return the populated exchange
     */
    private Exchange createServiceBusExchange(ServiceBusReceivedMessage receivedMessage) {
        Exchange exchange = this.createExchange(true);
        Message message = exchange.getIn();
        message.setBody(receivedMessage.getBody());
        message.setHeader("CamelAzureServiceBusApplicationProperties", receivedMessage.getApplicationProperties());
        message.setHeader("CamelAzureServiceBusContentType", receivedMessage.getContentType());
        message.setHeader("CamelAzureServiceBusMessageId", receivedMessage.getMessageId());
        message.setHeader("CamelAzureServiceBusCorrelationId", receivedMessage.getCorrelationId());
        message.setHeader("CamelAzureServiceBusDeadLetterErrorDescription", receivedMessage.getDeadLetterErrorDescription());
        message.setHeader("CamelAzureServiceBusDeadLetterReason", receivedMessage.getDeadLetterReason());
        message.setHeader("CamelAzureServiceBusDeadLetterSource", receivedMessage.getDeadLetterSource());
        message.setHeader("CamelAzureServiceBusDeliveryCount", receivedMessage.getDeliveryCount());
        message.setHeader("CamelAzureServiceBusScheduledEnqueueTime", receivedMessage.getScheduledEnqueueTime());
        message.setHeader("CamelAzureServiceBusEnqueuedSequenceNumber", receivedMessage.getEnqueuedSequenceNumber());
        message.setHeader("CamelAzureServiceBusEnqueuedTime", receivedMessage.getEnqueuedTime());
        message.setHeader("CamelAzureServiceBusExpiresAt", receivedMessage.getExpiresAt());
        message.setHeader("CamelAzureServiceBusLockToken", receivedMessage.getLockToken());
        message.setHeader("CamelAzureServiceBusLockedUntil", receivedMessage.getLockedUntil());
        message.setHeader("CamelAzureServiceBusPartitionKey", receivedMessage.getPartitionKey());
        message.setHeader("CamelAzureServiceBusRawAmqpMessage", receivedMessage.getRawAmqpMessage());
        message.setHeader("CamelAzureServiceBusReplyTo", receivedMessage.getReplyTo());
        message.setHeader("CamelAzureServiceBusReplyToSessionId", receivedMessage.getReplyToSessionId());
        message.setHeader("CamelAzureServiceBusSequenceNumber", receivedMessage.getSequenceNumber());
        message.setHeader("CamelAzureServiceBusSessionId", receivedMessage.getSessionId());
        message.setHeader("CamelAzureServiceBusSubject", receivedMessage.getSubject());
        message.setHeader("CamelAzureServiceBusTimeToLive", receivedMessage.getTimeToLive());
        message.setHeader("CamelAzureServiceBusTo", receivedMessage.getTo());
        this.propagateApplicationProperties(receivedMessage, message, exchange);
        return exchange;
    }

    /**
     * Copies every application property that the header filter strategy lets through
     * onto the message as an individual header.
     *
     * <p>Headers are added one at a time rather than via {@code Message.setHeaders},
     * because that call replaces the whole header map and would discard the metadata
     * headers set beforehand. A plain loop also tolerates {@code null} property values,
     * which {@code Collectors.toMap} rejects.
     */
    private void propagateApplicationProperties(ServiceBusReceivedMessage receivedMessage, Message message, Exchange exchange) {
        Map<String, Object> applicationProperties = receivedMessage.getApplicationProperties();
        if (applicationProperties == null) {
            return;
        }
        HeaderFilterStrategy headerFilterStrategy = this.getConfiguration().getHeaderFilterStrategy();
        for (Map.Entry<String, Object> property : applicationProperties.entrySet()) {
            // With no strategy configured nothing is filtered out.
            boolean filteredOut = headerFilterStrategy != null && headerFilterStrategy.applyFilterToExternalHeaders(property.getKey(), property.getValue(), exchange);
            if (!filteredOut) {
                message.setHeader(property.getKey(), property.getValue());
            }
        }
    }

    /**
     * Reacts to an error from the subscription: logs it, waits for the configured
     * reconnect delay, closes the current client and reconnects. Reconnection is
     * skipped when the consumer has been asked to stop.
     *
     * @param errorContext the error reported by the subscription
     */
    private void onErrorListener(Throwable errorContext) {
        // The same value is handed to Thread.sleep below, so it is a number of milliseconds.
        long reconnectDelay = this.getConfiguration().getReconnectDelay();
        LOG.warn("Error from Azure ServiceBus due to {} - Reconnecting in {} ms", errorContext.getMessage(), reconnectDelay);
        if (LOG.isDebugEnabled()) {
            // The stack trace is only emitted when debug logging is switched on.
            LOG.warn("Error from Azure ServiceBus (incl stacktrace)", errorContext);
        }
        if (reconnectDelay > 0) {
            try {
                Thread.sleep(reconnectDelay);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        if (this.stopRequested) {
            // doStop() owns closing the client; reconnecting now would leave a
            // subscribed client behind on a stopped consumer.
            LOG.debug("Consumer is stopping - not reconnecting to Azure ServiceBus");
            return;
        }
        try {
            this.clientWrapper.close();
        }
        catch (Exception e) {
            // Best effort: the client is being replaced anyway, so only record the failure.
            LOG.debug("Ignoring error while closing Azure ServiceBus client before reconnecting", e);
        }
        this.createAndConnectClient();
    }

    /**
     * Completion hook that settles the Service Bus message once the exchange is done:
     * completes it on success, and dead-letters or abandons it on failure. Nothing is
     * settled when auto-complete is disabled in the configuration.
     */
    private class ConsumerOnCompletion
    extends SynchronizationAdapter {
        private final ServiceBusReceivedMessage message;

        public ConsumerOnCompletion(ServiceBusReceivedMessage message) {
            this.message = message;
        }

        /** Completes the message after successful processing. */
        public void onComplete(Exchange exchange) {
            super.onComplete(exchange);
            if (!ServiceBusConsumer.this.getConfiguration().isDisableAutoComplete()) {
                ServiceBusConsumer.this.clientWrapper.complete(this.message).subscribeOn(Schedulers.boundedElastic()).subscribe();
            }
        }

        /**
         * Reports the exchange exception (if any) to the exception handler, then
         * dead-letters the message when dead-lettering applies, otherwise abandons it.
         */
        public void onFailure(Exchange exchange) {
            Exception cause = exchange.getException();
            if (cause != null) {
                ServiceBusConsumer.this.getExceptionHandler().handleException("Error during processing exchange.", exchange, cause);
            }
            ServiceBusConfiguration configuration = ServiceBusConsumer.this.getConfiguration();
            if (configuration.isDisableAutoComplete()) {
                return;
            }
            if (this.shouldDeadLetter(configuration)) {
                DeadLetterOptions deadLetterOptions = this.buildDeadLetterOptions(cause);
                ServiceBusConsumer.this.clientWrapper.deadLetter(this.message, deadLetterOptions).subscribeOn(Schedulers.boundedElastic()).subscribe();
            } else {
                ServiceBusConsumer.this.clientWrapper.abandon(this.message).subscribeOn(Schedulers.boundedElastic()).subscribe();
            }
        }

        /**
         * Dead-lettering applies when it is enabled and the consumer is not reading
         * from a sub-queue (sub-queue unset or {@link SubQueue#NONE}).
         */
        private boolean shouldDeadLetter(ServiceBusConfiguration configuration) {
            if (!configuration.isEnableDeadLettering()) {
                return false;
            }
            return ObjectHelper.isEmpty(configuration.getSubQueue()) || ObjectHelper.equal(configuration.getSubQueue(), SubQueue.NONE);
        }

        /**
         * Builds the dead-letter options; when a cause is available the reason is
         * {@code "<exception class>: <message>"} and the description is the stack trace.
         */
        private DeadLetterOptions buildDeadLetterOptions(Exception cause) {
            DeadLetterOptions deadLetterOptions = new DeadLetterOptions();
            if (cause != null) {
                deadLetterOptions.setDeadLetterReason(String.format("%s: %s", cause.getClass().getName(), cause.getMessage()));
                deadLetterOptions.setDeadLetterErrorDescription(Arrays.stream(cause.getStackTrace()).map(StackTraceElement::toString).collect(Collectors.joining("\n")));
            }
            return deadLetterOptions;
        }
    }
}

