/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.amqp.rabbit;

import com.rabbitmq.client.Channel;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpIllegalStateException;
import org.springframework.amqp.core.AmqpMessageReturnedException;
import org.springframework.amqp.core.AmqpReplyTimeoutException;
import org.springframework.amqp.core.AsyncAmqpTemplate;
import org.springframework.amqp.core.Correlation;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.connection.PublisherCallbackChannel;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SmartMessageConverter;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.expression.Expression;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;

public class AsyncRabbitTemplate
implements AsyncAmqpTemplate,
ChannelAwareMessageListener,
RabbitTemplate.ReturnCallback,
RabbitTemplate.ConfirmCallback,
BeanNameAware,
SmartLifecycle {
    public static final int DEFAULT_RECEIVE_TIMEOUT = 30000;
    private final Log logger = LogFactory.getLog(this.getClass());
    private final RabbitTemplate template;
    private final AbstractMessageListenerContainer container;
    private final DirectReplyToMessageListenerContainer directReplyToContainer;
    private final String replyAddress;
    private final ConcurrentMap<String, RabbitFuture<?>> pending = new ConcurrentHashMap();
    private final CorrelationMessagePostProcessor<?> messagePostProcessor = new CorrelationMessagePostProcessor();
    private volatile boolean running;
    private volatile boolean enableConfirms;
    private volatile long receiveTimeout = 30000L;
    private int phase;
    private boolean autoStartup = true;
    private String beanName;
    private TaskScheduler taskScheduler;
    private boolean internalTaskScheduler = true;

    public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey, String replyQueue) {
        this(connectionFactory, exchange, routingKey, replyQueue, null);
    }

    public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey, String replyQueue, String replyAddress) {
        Assert.notNull((Object)connectionFactory, (String)"'connectionFactory' cannot be null");
        Assert.notNull((Object)routingKey, (String)"'routingKey' cannot be null");
        Assert.notNull((Object)replyQueue, (String)"'replyQueue' cannot be null");
        this.template = new RabbitTemplate(connectionFactory);
        this.template.setExchange(exchange == null ? "" : exchange);
        this.template.setRoutingKey(routingKey);
        this.container = new SimpleMessageListenerContainer(connectionFactory);
        this.container.setQueueNames(replyQueue);
        this.container.setMessageListener(this);
        this.container.afterPropertiesSet();
        this.directReplyToContainer = null;
        this.replyAddress = replyAddress == null ? replyQueue : replyAddress;
    }

    public AsyncRabbitTemplate(RabbitTemplate template, AbstractMessageListenerContainer container) {
        this(template, container, null);
    }

    public AsyncRabbitTemplate(RabbitTemplate template, AbstractMessageListenerContainer container, String replyAddress) {
        Assert.notNull((Object)template, (String)"'template' cannot be null");
        Assert.notNull((Object)container, (String)"'container' cannot be null");
        this.template = template;
        this.container = container;
        this.container.setMessageListener(this);
        this.directReplyToContainer = null;
        this.replyAddress = replyAddress == null ? container.getQueueNames()[0] : replyAddress;
    }

    public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey) {
        Assert.notNull((Object)connectionFactory, (String)"'connectionFactory' cannot be null");
        Assert.notNull((Object)routingKey, (String)"'routingKey' cannot be null");
        this.template = new RabbitTemplate(connectionFactory);
        this.template.setExchange(exchange == null ? "" : exchange);
        this.template.setRoutingKey(routingKey);
        this.container = null;
        this.replyAddress = null;
        this.directReplyToContainer = new DirectReplyToMessageListenerContainer(this.template.getConnectionFactory());
        this.directReplyToContainer.setMessageListener(this);
    }

    public AsyncRabbitTemplate(RabbitTemplate template) {
        Assert.notNull((Object)template, (String)"'template' cannot be null");
        this.template = template;
        this.container = null;
        this.replyAddress = null;
        this.directReplyToContainer = new DirectReplyToMessageListenerContainer(this.template.getConnectionFactory());
        this.directReplyToContainer.setMessageListener(this);
    }

    public void setAutoStartup(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    public void setPhase(int phase) {
        this.phase = phase;
    }

    public void setMandatory(boolean mandatory) {
        this.template.setReturnCallback(this);
        this.template.setMandatory(mandatory);
    }

    public void setMandatoryExpression(Expression mandatoryExpression) {
        this.template.setReturnCallback(this);
        this.template.setMandatoryExpression(mandatoryExpression);
    }

    public void setMandatoryExpressionString(String mandatoryExpression) {
        this.template.setReturnCallback(this);
        this.template.setMandatoryExpressionString(mandatoryExpression);
    }

    public void setEnableConfirms(boolean enableConfirms) {
        this.enableConfirms = enableConfirms;
        if (enableConfirms) {
            this.template.setConfirmCallback(this);
        }
    }

    public String getBeanName() {
        return this.beanName;
    }

    public void setBeanName(String beanName) {
        this.beanName = beanName;
    }

    public ConnectionFactory getConnectionFactory() {
        return this.template.getConnectionFactory();
    }

    public void setReceiveTimeout(long receiveTimeout) {
        this.receiveTimeout = receiveTimeout;
    }

    public synchronized void setTaskScheduler(TaskScheduler taskScheduler) {
        Assert.notNull((Object)taskScheduler, (String)"'taskScheduler' cannot be null");
        this.internalTaskScheduler = false;
        this.taskScheduler = taskScheduler;
    }

    public MessageConverter getMessageConverter() {
        return this.template.getMessageConverter();
    }

    public RabbitMessageFuture sendAndReceive(Message message) {
        return this.sendAndReceive(this.template.getExchange(), this.template.getRoutingKey(), message);
    }

    public RabbitMessageFuture sendAndReceive(String routingKey, Message message) {
        return this.sendAndReceive(this.template.getExchange(), routingKey, message);
    }

    public RabbitMessageFuture sendAndReceive(String exchange, String routingKey, Message message) {
        String correlationId = this.getOrSetCorrelationIdAndSetReplyTo(message);
        RabbitMessageFuture future = new RabbitMessageFuture(correlationId, message);
        CorrelationData correlationData = null;
        if (this.enableConfirms) {
            correlationData = new CorrelationData(correlationId);
            future.setConfirm((ListenableFuture<Boolean>)new SettableListenableFuture());
        }
        this.pending.put(correlationId, future);
        if (this.container != null) {
            this.template.send(exchange, routingKey, message, correlationData);
        } else {
            DirectReplyToMessageListenerContainer.ChannelHolder channelHolder = this.directReplyToContainer.getChannelHolder();
            future.setChannelHolder(channelHolder);
            this.sendDirect(channelHolder.getChannel(), exchange, routingKey, message, correlationData);
        }
        future.startTimer();
        return future;
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceive(Object object) {
        return this.convertSendAndReceive(this.template.getExchange(), this.template.getRoutingKey(), object, null);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceive(String routingKey, Object object) {
        return this.convertSendAndReceive(this.template.getExchange(), routingKey, object, null);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceive(String exchange, String routingKey, Object object) {
        return this.convertSendAndReceive(exchange, routingKey, object, null);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceive(Object object, MessagePostProcessor messagePostProcessor) {
        return this.convertSendAndReceive(this.template.getExchange(), this.template.getRoutingKey(), object, messagePostProcessor);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceive(String routingKey, Object object, MessagePostProcessor messagePostProcessor) {
        return this.convertSendAndReceive(this.template.getExchange(), routingKey, object, messagePostProcessor);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceive(String exchange, String routingKey, Object object, MessagePostProcessor messagePostProcessor) {
        return this.convertSendAndReceive(exchange, routingKey, object, messagePostProcessor, null);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceiveAsType(Object object, ParameterizedTypeReference<C> responseType) {
        return this.convertSendAndReceiveAsType(this.template.getExchange(), this.template.getRoutingKey(), object, (MessagePostProcessor)null, (ParameterizedTypeReference)responseType);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceiveAsType(String routingKey, Object object, ParameterizedTypeReference<C> responseType) {
        return this.convertSendAndReceiveAsType(this.template.getExchange(), routingKey, object, (MessagePostProcessor)null, (ParameterizedTypeReference)responseType);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceiveAsType(String exchange, String routingKey, Object object, ParameterizedTypeReference<C> responseType) {
        return this.convertSendAndReceiveAsType(exchange, routingKey, object, (MessagePostProcessor)null, (ParameterizedTypeReference)responseType);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceiveAsType(Object object, MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<C> responseType) {
        return this.convertSendAndReceiveAsType(this.template.getExchange(), this.template.getRoutingKey(), object, messagePostProcessor, (ParameterizedTypeReference)responseType);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceiveAsType(String routingKey, Object object, MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<C> responseType) {
        return this.convertSendAndReceiveAsType(this.template.getExchange(), routingKey, object, messagePostProcessor, (ParameterizedTypeReference)responseType);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceiveAsType(String exchange, String routingKey, Object object, MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<C> responseType) {
        Assert.state((boolean)(this.template.getMessageConverter() instanceof SmartMessageConverter), (String)"template's message converter must be a SmartMessageConverter");
        return this.convertSendAndReceive(exchange, routingKey, object, messagePostProcessor, responseType);
    }

    private <C> RabbitConverterFuture<C> convertSendAndReceive(String exchange, String routingKey, Object object, MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<C> responseType) {
        AsyncCorrelationData<C> correlationData = new AsyncCorrelationData<C>(messagePostProcessor, responseType, this.enableConfirms);
        if (this.container != null) {
            this.template.convertAndSend(exchange, routingKey, object, this.messagePostProcessor, correlationData);
        } else {
            MessageConverter converter = this.template.getMessageConverter();
            if (converter == null) {
                throw new AmqpIllegalStateException("No 'messageConverter' specified. Check configuration of RabbitTemplate.");
            }
            Message message = converter.toMessage(object, new MessageProperties());
            this.messagePostProcessor.postProcessMessage(message, correlationData);
            DirectReplyToMessageListenerContainer.ChannelHolder channelHolder = this.directReplyToContainer.getChannelHolder();
            ((AsyncCorrelationData)correlationData).future.setChannelHolder(channelHolder);
            this.sendDirect(channelHolder.getChannel(), exchange, routingKey, message, correlationData);
        }
        RabbitConverterFuture future = ((AsyncCorrelationData)correlationData).future;
        future.startTimer();
        return future;
    }

    private void sendDirect(Channel channel, String exchange, String routingKey, Message message, CorrelationData correlationData) {
        message.getMessageProperties().setReplyTo("amq.rabbitmq.reply-to");
        try {
            if (channel instanceof PublisherCallbackChannel) {
                this.template.addListener(channel);
            }
            this.template.doSend(channel, exchange, routingKey, message, this.template.isMandatoryFor(message), correlationData);
        }
        catch (Exception e) {
            throw new AmqpException("Failed to send request", (Throwable)e);
        }
    }

    public synchronized void start() {
        if (!this.running) {
            if (this.internalTaskScheduler) {
                ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
                scheduler.setThreadNamePrefix(this.getBeanName() == null ? "asyncTemplate-" : this.getBeanName() + "-");
                scheduler.afterPropertiesSet();
                this.taskScheduler = scheduler;
            }
            if (this.container != null) {
                this.container.start();
            }
            if (this.directReplyToContainer != null) {
                this.directReplyToContainer.setTaskScheduler(this.taskScheduler);
                this.directReplyToContainer.start();
            }
        }
        this.running = true;
    }

    public synchronized void stop() {
        if (this.running) {
            if (this.container != null) {
                this.container.stop();
            }
            if (this.directReplyToContainer != null) {
                this.directReplyToContainer.stop();
            }
            for (RabbitFuture future : this.pending.values()) {
                future.setNackCause("AsyncRabbitTemplate was stopped while waiting for reply");
                future.cancel(true);
            }
            if (this.internalTaskScheduler) {
                ((ThreadPoolTaskScheduler)this.taskScheduler).destroy();
                this.taskScheduler = null;
            }
        }
        this.running = false;
    }

    public boolean isRunning() {
        return this.running;
    }

    public int getPhase() {
        return this.phase;
    }

    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    public void stop(Runnable callback) {
        this.stop();
        callback.run();
    }

    @Override
    public void onMessage(Message message, Channel channel) {
        String correlationId;
        MessageProperties messageProperties = message.getMessageProperties();
        if (messageProperties != null && StringUtils.hasText((String)(correlationId = messageProperties.getCorrelationId()))) {
            RabbitFuture future;
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("onMessage: " + message));
            }
            if ((future = (RabbitFuture)((Object)this.pending.remove(correlationId))) != null) {
                if (future instanceof RabbitConverterFuture) {
                    MessageConverter messageConverter = this.template.getMessageConverter();
                    RabbitConverterFuture rabbitFuture = (RabbitConverterFuture)future;
                    Object converted = rabbitFuture.getReturnType() != null && messageConverter instanceof SmartMessageConverter ? ((SmartMessageConverter)messageConverter).fromMessage(message, rabbitFuture.getReturnType()) : messageConverter.fromMessage(message);
                    rabbitFuture.set(converted);
                } else {
                    ((RabbitMessageFuture)future).set(message);
                }
            } else if (this.logger.isWarnEnabled()) {
                this.logger.warn((Object)("No pending reply - perhaps timed out: " + message));
            }
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        MessageProperties messageProperties = message.getMessageProperties();
        String correlationId = messageProperties.getCorrelationId();
        if (StringUtils.hasText((String)correlationId)) {
            RabbitFuture future = (RabbitFuture)((Object)this.pending.remove(correlationId));
            if (future != null) {
                future.setException((Throwable)new AmqpMessageReturnedException("Message returned", message, replyCode, replyText, exchange, routingKey));
            } else if (this.logger.isWarnEnabled()) {
                this.logger.warn((Object)("No pending reply - perhaps timed out? Message returned: " + message));
            }
        }
    }

    @Override
    public void confirm(@NonNull CorrelationData correlationData, boolean ack, @Nullable String cause) {
        String correlationId;
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)("Confirm: " + correlationData + ", ack=" + ack + (cause == null ? "" : ", cause: " + cause)));
        }
        if ((correlationId = correlationData.getId()) != null) {
            RabbitFuture future = (RabbitFuture)((Object)this.pending.get(correlationId));
            if (future != null) {
                future.setNackCause(cause);
                ((SettableListenableFuture)future.getConfirm()).set((Object)ack);
            } else if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Confirm: " + correlationData + ", ack=" + ack + (cause == null ? "" : ", cause: " + cause) + " no pending future - either canceled or the reply is already received"));
            }
        }
    }

    private String getOrSetCorrelationIdAndSetReplyTo(Message message) {
        String correlationId;
        MessageProperties messageProperties = message.getMessageProperties();
        Assert.notNull((Object)messageProperties, (String)"the message properties cannot be null");
        String currentCorrelationId = messageProperties.getCorrelationId();
        if (!StringUtils.hasText((String)currentCorrelationId)) {
            correlationId = UUID.randomUUID().toString();
            messageProperties.setCorrelationId(correlationId);
            Assert.isNull((Object)messageProperties.getReplyTo(), (String)"'replyTo' property must be null");
        } else {
            correlationId = currentCorrelationId;
        }
        messageProperties.setReplyTo(this.replyAddress);
        return correlationId;
    }

    public String toString() {
        return this.beanName == null ? super.toString() : this.getClass().getSimpleName() + ": " + this.beanName;
    }

    private static class AsyncCorrelationData<C>
    extends CorrelationData {
        private final MessagePostProcessor userPostProcessor;
        private final ParameterizedTypeReference<C> returnType;
        private final boolean enableConfirms;
        private volatile RabbitConverterFuture<C> future;

        AsyncCorrelationData(MessagePostProcessor userPostProcessor, ParameterizedTypeReference<C> returnType, boolean enableConfirms) {
            this.userPostProcessor = userPostProcessor;
            this.returnType = returnType;
            this.enableConfirms = enableConfirms;
        }
    }

    private final class CorrelationMessagePostProcessor<C>
    implements MessagePostProcessor {
        CorrelationMessagePostProcessor() {
        }

        public Message postProcessMessage(Message message) throws AmqpException {
            throw new UnsupportedOperationException();
        }

        public Message postProcessMessage(Message message, Correlation correlation) throws AmqpException {
            Message messageToSend = message;
            AsyncCorrelationData correlationData = (AsyncCorrelationData)correlation;
            if (correlationData.userPostProcessor != null) {
                messageToSend = correlationData.userPostProcessor.postProcessMessage(message);
            }
            String correlationId = AsyncRabbitTemplate.this.getOrSetCorrelationIdAndSetReplyTo(messageToSend);
            correlationData.future = new RabbitConverterFuture(correlationId, message);
            if (correlationData.enableConfirms && correlationData.getId() == null) {
                correlationData.setId(correlationId);
                correlationData.future.setConfirm((ListenableFuture<Boolean>)new SettableListenableFuture());
            }
            correlationData.future.setReturnType(correlationData.returnType);
            AsyncRabbitTemplate.this.pending.put(correlationId, correlationData.future);
            return messageToSend;
        }
    }

    public class RabbitConverterFuture<C>
    extends RabbitFuture<C>
    implements ListenableFuture<C> {
        private volatile ParameterizedTypeReference<C> returnType;

        public RabbitConverterFuture(String correlationId, Message requestMessage) {
            super(correlationId, requestMessage);
        }

        public ParameterizedTypeReference<C> getReturnType() {
            return this.returnType;
        }

        public void setReturnType(ParameterizedTypeReference<C> returnType) {
            this.returnType = returnType;
        }
    }

    public class RabbitMessageFuture
    extends RabbitFuture<Message>
    implements ListenableFuture<Message> {
        public RabbitMessageFuture(String correlationId, Message requestMessage) {
            super(correlationId, requestMessage);
        }
    }

    public abstract class RabbitFuture<T>
    extends SettableListenableFuture<T> {
        private final String correlationId;
        private final Message requestMessage;
        private ScheduledFuture<?> timeoutTask;
        private volatile ListenableFuture<Boolean> confirm;
        private String nackCause;
        private DirectReplyToMessageListenerContainer.ChannelHolder channelHolder;

        public RabbitFuture(String correlationId, Message requestMessage) {
            this.correlationId = correlationId;
            this.requestMessage = requestMessage;
        }

        void setChannelHolder(DirectReplyToMessageListenerContainer.ChannelHolder channel) {
            this.channelHolder = channel;
        }

        public boolean cancel(boolean mayInterruptIfRunning) {
            if (this.timeoutTask != null) {
                this.timeoutTask.cancel(true);
            }
            AsyncRabbitTemplate.this.pending.remove(this.correlationId);
            if (this.channelHolder != null && AsyncRabbitTemplate.this.directReplyToContainer != null) {
                AsyncRabbitTemplate.this.directReplyToContainer.releaseConsumerFor(this.channelHolder, false, null);
            }
            return super.cancel(mayInterruptIfRunning);
        }

        public ListenableFuture<Boolean> getConfirm() {
            return this.confirm;
        }

        void setConfirm(ListenableFuture<Boolean> confirm) {
            this.confirm = confirm;
        }

        public String getNackCause() {
            return this.nackCause;
        }

        void setNackCause(String nackCause) {
            this.nackCause = nackCause;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void startTimer() {
            if (AsyncRabbitTemplate.this.receiveTimeout > 0L) {
                AsyncRabbitTemplate asyncRabbitTemplate = AsyncRabbitTemplate.this;
                synchronized (asyncRabbitTemplate) {
                    if (!AsyncRabbitTemplate.this.running) {
                        AsyncRabbitTemplate.this.pending.remove(this.correlationId);
                        throw new IllegalStateException("'AsyncRabbitTemplate' must be started.");
                    }
                    this.timeoutTask = AsyncRabbitTemplate.this.taskScheduler.schedule((Runnable)new TimeoutTask(), new Date(System.currentTimeMillis() + AsyncRabbitTemplate.this.receiveTimeout));
                }
            } else {
                this.timeoutTask = null;
            }
        }

        private class TimeoutTask
        implements Runnable {
            private TimeoutTask() {
            }

            @Override
            public void run() {
                AsyncRabbitTemplate.this.pending.remove(RabbitFuture.this.correlationId);
                if (RabbitFuture.this.channelHolder != null && AsyncRabbitTemplate.this.directReplyToContainer != null) {
                    AsyncRabbitTemplate.this.directReplyToContainer.releaseConsumerFor(RabbitFuture.this.channelHolder, false, null);
                }
                RabbitFuture.this.setException((Throwable)new AmqpReplyTimeoutException("Reply timed out", RabbitFuture.this.requestMessage));
            }
        }
    }
}

