/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.xd.dirt.integration.bus;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.Lifecycle;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.codec.Codec;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.AlternativeJdkIdGenerator;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.IdGenerator;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.StringUtils;
import org.springframework.xd.dirt.integration.bus.AbstractBusPropertiesAccessor;
import org.springframework.xd.dirt.integration.bus.Binding;
import org.springframework.xd.dirt.integration.bus.MessageBus;
import org.springframework.xd.dirt.integration.bus.MessageBusException;
import org.springframework.xd.dirt.integration.bus.MessageValues;
import org.springframework.xd.dirt.integration.bus.PartitionKeyExtractorStrategy;
import org.springframework.xd.dirt.integration.bus.PartitionSelectorStrategy;
import org.springframework.xd.dirt.integration.bus.SerializationException;
import org.springframework.xd.dirt.integration.bus.StringConvertingContentTypeResolver;

public abstract class MessageBusSupport
implements MessageBus,
ApplicationContextAware,
InitializingBean {
    // Channel-name prefixes. isNamedChannel(String) treats the queue, topic and job prefixes as "named" channels.
    protected static final String P2P_NAMED_CHANNEL_TYPE_PREFIX = "queue:";
    protected static final String TAP_TYPE_PREFIX = "tap:";
    protected static final String PUBSUB_NAMED_CHANNEL_TYPE_PREFIX = "topic:";
    protected static final String JOB_CHANNEL_TYPE_PREFIX = "job:";
    // Header name spliced into the expression built by buildPartitionRoutingExpression(String).
    protected static final String PARTITION_HEADER = "partition";
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    private volatile AbstractApplicationContext applicationContext;
    private volatile Codec codec;
    private final StringConvertingContentTypeResolver contentTypeResolver = new StringConvertingContentTypeResolver();
    // Per-thread marker set by revertDirectBindingIfNecessary(Binding) and consumed by
    // bindNewProducerDirectlyIfPossible(...) so that a reverted producer is not bound directly again.
    private final ThreadLocal<Boolean> revertingDirectBinding = new ThreadLocal<Boolean>();
    protected static final List<MimeType> MEDIATYPES_MEDIATYPE_ALL = Collections.singletonList(MimeTypeUtils.ALL);
    // Initial values of the corresponding default* instance fields below.
    private static final int DEFAULT_BACKOFF_INITIAL_INTERVAL = 1000;
    private static final int DEFAULT_BACKOFF_MAX_INTERVAL = 10000;
    private static final double DEFAULT_BACKOFF_MULTIPLIER = 2.0;
    private static final int DEFAULT_CONCURRENCY = 1;
    private static final int DEFAULT_MAX_ATTEMPTS = 3;
    private static final int DEFAULT_BATCH_SIZE = 50;
    private static final int DEFAULT_BATCH_BUFFER_LIMIT = 10000;
    private static final int DEFAULT_BATCH_TIMEOUT = 0;
    // Property-name sets. Each is a mutable Set<Object>; the element type must be Object because
    // generics are invariant (a HashSet<String> is not assignable to Set<Object>).
    protected static final Set<Object> CONSUMER_STANDARD_PROPERTIES = new SetBuilder().add("count").add("sequence").build();
    protected static final Set<Object> PRODUCER_STANDARD_PROPERTIES = new HashSet<Object>(Arrays.asList("next.module.count", "next.module.concurrency"));
    protected static final Set<Object> CONSUMER_RETRY_PROPERTIES = new HashSet<Object>(Arrays.asList("backOffInitialInterval", "backOffMaxInterval", "backOffMultiplier", "maxAttempts"));
    protected static final Set<Object> PRODUCER_PARTITIONING_PROPERTIES = new HashSet<Object>(Arrays.asList("partitionKeyExpression", "partitionKeyExtractorClass", "partitionSelectorClass", "partitionSelectorExpression"));
    protected static final Set<Object> PRODUCER_BATCHING_BASIC_PROPERTIES = new HashSet<Object>(Arrays.asList("batchingEnabled", "batchSize", "batchTimeout"));
    protected static final Set<Object> PRODUCER_BATCHING_ADVANCED_PROPERTIES = new HashSet<Object>(Arrays.asList("batchBufferLimit"));
    // Synchronized wrapper: individual calls are thread-safe, but traversals must hold the list's monitor.
    private final List<Binding> bindings = Collections.synchronizedList(new ArrayList<Binding>());
    private final IdGenerator idGenerator = new AlternativeJdkIdGenerator();
    protected volatile EvaluationContext evaluationContext;
    // Fallback selector used by determinePartition(...) when neither a selector class nor expression is configured.
    private volatile PartitionSelectorStrategy partitionSelector = new DefaultPartitionSelector();
    protected final SharedChannelProvider<DirectChannel> directChannelProvider = new SharedChannelProvider<DirectChannel>(DirectChannel.class){

        @Override
        protected DirectChannel createSharedChannel(String name) {
            return new DirectChannel();
        }
    };
    // Bus-wide defaults; each can be replaced through the matching setDefault* setter.
    protected volatile long defaultBackOffInitialInterval = DEFAULT_BACKOFF_INITIAL_INTERVAL;
    protected volatile long defaultBackOffMaxInterval = DEFAULT_BACKOFF_MAX_INTERVAL;
    protected volatile double defaultBackOffMultiplier = DEFAULT_BACKOFF_MULTIPLIER;
    protected volatile int defaultConcurrency = DEFAULT_CONCURRENCY;
    protected volatile int defaultMaxAttempts = DEFAULT_MAX_ATTEMPTS;
    protected volatile boolean defaultBatchingEnabled = false;
    protected volatile int defaultBatchSize = DEFAULT_BATCH_SIZE;
    protected volatile int defaultBatchBufferLimit = DEFAULT_BATCH_BUFFER_LIMIT;
    protected volatile long defaultBatchTimeout = DEFAULT_BATCH_TIMEOUT;
    protected volatile boolean defaultCompress = false;
    protected volatile boolean defaultDurableSubscription = false;
    // Cache of payload classes keyed by class name; filled lazily by deserializePayload(byte[], MimeType).
    private volatile Map<String, Class<?>> payloadTypeCache = new ConcurrentHashMap<String, Class<?>>();

    /**
     * Concatenates {@code prefix} and {@code name}, in that order.
     *
     * @param prefix text placed first
     * @param name text placed second
     * @return {@code prefix + name}
     */
    public static String applyPrefix(String prefix, String name) {
        return prefix + name;
    }

    /**
     * Prepends the literal {@code "topic."} to the given name.
     *
     * @param name the base name
     * @return {@code "topic." + name}
     */
    public static String applyPubSub(String name) {
        return "topic." + name;
    }

    /**
     * Appends the literal {@code ".requests"} to the given name.
     *
     * @param name the base name
     * @return {@code name + ".requests"}
     */
    public static String applyRequests(String name) {
        return name + ".requests";
    }

    /**
     * Appends the literal {@code ".dlq"} to the given name.
     *
     * @param name the base name
     * @return {@code name + ".dlq"}
     */
    public static String constructDLQName(String name) {
        return name + ".dlq";
    }

    /**
     * Stores the application context after checking that it is an
     * {@link AbstractApplicationContext}; the assertion fails for any other type.
     *
     * @param applicationContext the context this bus runs in
     * @throws BeansException declared by the signature; not thrown directly by this body
     */
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Assert.isInstanceOf(AbstractApplicationContext.class, (Object)applicationContext);
        this.applicationContext = (AbstractApplicationContext)applicationContext;
    }

    /**
     * @return the context stored by {@link #setApplicationContext(ApplicationContext)},
     *         or {@code null} if none has been set
     */
    protected AbstractApplicationContext getApplicationContext() {
        return this.applicationContext;
    }

    /**
     * Returns the bean factory of the stored application context. Dereferences the
     * context unconditionally, so the context must have been set first.
     *
     * @return the application context's bean factory
     */
    protected ConfigurableListableBeanFactory getBeanFactory() {
        return this.applicationContext.getBeanFactory();
    }

    /**
     * Sets the codec used by this class to encode payloads that are neither {@code byte[]}
     * nor {@code String}, and to decode payload bytes back into objects.
     *
     * @param codec the codec to use
     */
    public void setCodec(Codec codec) {
        this.codec = codec;
    }

    /**
     * @return the {@link AlternativeJdkIdGenerator} instance created with this bus
     */
    protected IdGenerator getIdGenerator() {
        return this.idGenerator;
    }

    /**
     * Sets the evaluation context used when evaluating partition expressions. If left
     * unset, {@link #afterPropertiesSet()} obtains one from the bean factory.
     *
     * @param evaluationContext the evaluation context to use
     */
    public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) {
        this.evaluationContext = evaluationContext;
    }

    /**
     * Replaces the fallback partition selector that {@code determinePartition} uses when
     * the partitioning metadata names neither a selector class nor a selector expression.
     *
     * @param partitionSelector the selector to use
     */
    public void setPartitionSelector(PartitionSelectorStrategy partitionSelector) {
        this.partitionSelector = partitionSelector;
    }

    /**
     * Sets the fallback initial back-off interval that
     * {@code buildRetryTemplateIfRetryEnabled} passes to the properties accessor.
     *
     * @param defaultBackOffInitialInterval the new default
     */
    public void setDefaultBackOffInitialInterval(long defaultBackOffInitialInterval) {
        this.defaultBackOffInitialInterval = defaultBackOffInitialInterval;
    }

    /**
     * Sets the fallback back-off multiplier that
     * {@code buildRetryTemplateIfRetryEnabled} passes to the properties accessor.
     *
     * @param defaultBackOffMultiplier the new default
     */
    public void setDefaultBackOffMultiplier(double defaultBackOffMultiplier) {
        this.defaultBackOffMultiplier = defaultBackOffMultiplier;
    }

    /**
     * Sets the fallback maximum back-off interval that
     * {@code buildRetryTemplateIfRetryEnabled} passes to the properties accessor.
     *
     * @param defaultBackOffMaxInterval the new default
     */
    public void setDefaultBackOffMaxInterval(long defaultBackOffMaxInterval) {
        this.defaultBackOffMaxInterval = defaultBackOffMaxInterval;
    }

    /**
     * Sets the {@code defaultConcurrency} field. The field is protected and is not read
     * in the visible part of this class; presumably subclasses consume it.
     *
     * @param defaultConcurrency the new default
     */
    public void setDefaultConcurrency(int defaultConcurrency) {
        this.defaultConcurrency = defaultConcurrency;
    }

    /**
     * Sets the fallback maximum number of attempts. {@code buildRetryTemplateIfRetryEnabled}
     * only builds a retry template when the effective value is greater than 1.
     *
     * @param defaultMaxAttempts the new default
     */
    public void setDefaultMaxAttempts(int defaultMaxAttempts) {
        this.defaultMaxAttempts = defaultMaxAttempts;
    }

    /**
     * Sets the {@code defaultBatchingEnabled} field. The field is protected and is not read
     * in the visible part of this class; presumably subclasses consume it.
     *
     * @param defaultBatchingEnabled the new default
     */
    public void setDefaultBatchingEnabled(boolean defaultBatchingEnabled) {
        this.defaultBatchingEnabled = defaultBatchingEnabled;
    }

    /**
     * Sets the {@code defaultBatchSize} field. The field is protected and is not read
     * in the visible part of this class; presumably subclasses consume it.
     *
     * @param defaultBatchSize the new default
     */
    public void setDefaultBatchSize(int defaultBatchSize) {
        this.defaultBatchSize = defaultBatchSize;
    }

    /**
     * Sets the {@code defaultBatchBufferLimit} field. The field is protected and is not read
     * in the visible part of this class; presumably subclasses consume it.
     *
     * @param defaultBatchBufferLimit the new default
     */
    public void setDefaultBatchBufferLimit(int defaultBatchBufferLimit) {
        this.defaultBatchBufferLimit = defaultBatchBufferLimit;
    }

    /**
     * Sets the {@code defaultBatchTimeout} field. The field is protected and is not read
     * in the visible part of this class; presumably subclasses consume it.
     *
     * @param defaultBatchTimeout the new default
     */
    public void setDefaultBatchTimeout(long defaultBatchTimeout) {
        this.defaultBatchTimeout = defaultBatchTimeout;
    }

    /**
     * Sets the {@code defaultCompress} field. The field is protected and is not read
     * in the visible part of this class; presumably subclasses consume it.
     *
     * @param defaultCompress the new default
     */
    public void setDefaultCompress(boolean defaultCompress) {
        this.defaultCompress = defaultCompress;
    }

    /**
     * Sets the {@code defaultDurableSubscription} field. The field is protected and is not
     * read in the visible part of this class; presumably subclasses consume it.
     *
     * @param defaultDurableSubscription the new default
     */
    public void setDefaultDurableSubscription(boolean defaultDurableSubscription) {
        this.defaultDurableSubscription = defaultDurableSubscription;
    }

    /**
     * Completes initialization: asserts that an application context has been set, obtains
     * an evaluation context from the bean factory when none was injected, and finally
     * invokes the {@link #onInit()} hook.
     *
     * @throws Exception declared by the signature
     */
    public void afterPropertiesSet() throws Exception {
        Assert.notNull((Object)this.applicationContext, (String)"The 'applicationContext' property cannot be null");
        if (this.evaluationContext == null) {
            // No context injected through setIntegrationEvaluationContext: look one up.
            this.evaluationContext = IntegrationContextUtils.getEvaluationContext((BeanFactory)this.getBeanFactory());
        }
        this.onInit();
    }

    /**
     * Hook called at the end of {@link #afterPropertiesSet()}. Empty in this class.
     */
    protected void onInit() {
    }

    /**
     * Binds a dynamic producer, using {@code name} both as the binding name and as the
     * shared channel name.
     *
     * @param name the binding and channel name
     * @param properties producer properties passed through to the binding
     * @return the shared channel registered under {@code name}
     */
    @Override
    public MessageChannel bindDynamicProducer(String name, Properties properties) {
        return this.doBindDynamicProducer(name, name, properties);
    }

    /**
     * Returns the shared {@link DirectChannel} registered under {@code channelName}. When no
     * such channel exists yet, one is created and registered, then bound as a producer
     * under {@code name}.
     *
     * @param name the producer binding name
     * @param channelName the name under which the shared channel is looked up and registered
     * @param properties producer properties passed to {@code bindProducer}
     * @return the existing or newly created channel
     * @throws MessageBusException if creating or binding the channel throws a
     *         {@link RuntimeException}; the channel bean is destroyed first
     */
    protected synchronized MessageChannel doBindDynamicProducer(String name, String channelName, Properties properties) {
        DirectChannel existing = this.directChannelProvider.lookupSharedChannel(channelName);
        if (existing != null) {
            return existing;
        }
        DirectChannel created = null;
        try {
            created = this.directChannelProvider.createAndRegisterChannel(channelName);
            this.bindProducer(name, (MessageChannel)created, properties);
        }
        catch (RuntimeException e) {
            // Undo the registration so a later attempt starts from a clean slate.
            this.destroyCreatedChannel(channelName, (MessageChannel)created);
            throw new MessageBusException("Failed to bind dynamic channel '" + name + "' with properties " + properties, e);
        }
        return created;
    }

    /**
     * Binds a dynamic publish-subscribe producer, using {@code name} both as the binding
     * name and as the shared channel name.
     *
     * @param name the binding and channel name
     * @param properties producer properties passed through to the binding
     * @return the shared channel registered under {@code name}
     */
    @Override
    public MessageChannel bindDynamicPubSubProducer(String name, Properties properties) {
        return this.doBindDynamicPubSubProducer(name, name, properties);
    }

    /**
     * Returns the shared {@link DirectChannel} registered under {@code channelName}. When no
     * such channel exists yet, one is created and registered, then bound as a
     * publish-subscribe producer under {@code name}.
     *
     * @param name the producer binding name
     * @param channelName the name under which the shared channel is looked up and registered
     * @param properties producer properties passed to {@code bindPubSubProducer}
     * @return the existing or newly created channel
     * @throws MessageBusException if creating or binding the channel throws a
     *         {@link RuntimeException}; the channel bean is destroyed first
     */
    protected synchronized MessageChannel doBindDynamicPubSubProducer(String name, String channelName, Properties properties) {
        DirectChannel existing = this.directChannelProvider.lookupSharedChannel(channelName);
        if (existing != null) {
            return existing;
        }
        DirectChannel created = null;
        try {
            created = this.directChannelProvider.createAndRegisterChannel(channelName);
            this.bindPubSubProducer(name, (MessageChannel)created, properties);
        }
        catch (RuntimeException e) {
            // Undo the registration so a later attempt starts from a clean slate.
            this.destroyCreatedChannel(channelName, (MessageChannel)created);
            throw new MessageBusException("Failed to bind dynamic channel '" + name + "' with properties " + properties, e);
        }
        return created;
    }

    /**
     * Destroys the singleton bean registered under {@code name}, provided the bean factory
     * contains it and is a {@link DefaultListableBeanFactory}. Otherwise does nothing.
     *
     * @param name the bean name of the channel to destroy
     * @param channel the channel instance; not used by this method
     */
    private void destroyCreatedChannel(String name, MessageChannel channel) {
        ConfigurableListableBeanFactory factory = this.applicationContext.getBeanFactory();
        if (!factory.containsBean(name)) {
            return;
        }
        if (factory instanceof DefaultListableBeanFactory) {
            DefaultListableBeanFactory registry = (DefaultListableBeanFactory)factory;
            registry.destroySingleton(name);
        }
    }

    /**
     * Removes every binding whose endpoint component name is {@code "inbound." + name}.
     *
     * @param name the consumer name
     */
    @Override
    public void unbindConsumers(String name) {
        this.deleteBindings("inbound." + name);
    }

    /**
     * Removes every binding whose endpoint component name is {@code "outbound." + name}.
     *
     * @param name the producer name
     */
    @Override
    public void unbindProducers(String name) {
        this.deleteBindings("outbound." + name);
    }

    /**
     * Removes the first binding that is attached to {@code channel} and whose endpoint
     * component name is {@code "inbound." + name}.
     *
     * @param name the consumer name
     * @param channel the channel the binding is attached to
     */
    @Override
    public void unbindConsumer(String name, MessageChannel channel) {
        this.deleteBinding("inbound." + name, channel);
    }

    /**
     * Removes the first binding that is attached to {@code channel} and whose endpoint
     * component name is {@code "outbound." + name}.
     *
     * @param name the producer name
     * @param channel the channel the binding is attached to
     */
    @Override
    public void unbindProducer(String name, MessageChannel channel) {
        this.deleteBinding("outbound." + name, channel);
    }

    /**
     * Reports whether this bus has the given capability. This implementation answers
     * {@code false} for every capability.
     *
     * @param capability the capability being queried
     * @return always {@code false} here
     */
    @Override
    public boolean isCapable(MessageBus.Capability capability) {
        return false;
    }

    /**
     * Appends the binding to this bus's list of bindings.
     *
     * @param binding the binding to track
     */
    protected void addBinding(Binding binding) {
        this.bindings.add(binding);
    }

    /**
     * Deletes every binding whose endpoint component name equals {@code name}. Matching
     * bindings are collected first and deleted afterwards, because deletion modifies the
     * list being scanned. Both phases run while holding the bindings lock.
     *
     * @param name the endpoint component name to match; must contain text
     */
    protected void deleteBindings(String name) {
        Assert.hasText(name, "a valid name is required to remove bindings");
        synchronized (this.bindings) {
            List<Binding> matches = new ArrayList<Binding>();
            for (int i = 0; i < this.bindings.size(); i++) {
                Binding candidate = this.bindings.get(i);
                if (candidate.getEndpoint().getComponentName().equals(name)) {
                    matches.add(candidate);
                }
            }
            for (Binding match : matches) {
                this.doDeleteBinding(match);
            }
        }
    }

    /**
     * Deletes the first binding that is attached to {@code channel} and whose endpoint
     * component name equals {@code name}. Does nothing when no binding matches. Runs while
     * holding the bindings lock.
     *
     * @param name the endpoint component name to match; must contain text
     * @param channel the channel to match; must not be {@code null}
     */
    protected void deleteBinding(String name, MessageChannel channel) {
        Assert.hasText(name, "a valid name is required to remove a binding");
        Assert.notNull((Object)channel, "a valid channel is required to remove a binding");
        synchronized (this.bindings) {
            Binding match = null;
            for (Binding candidate : this.bindings) {
                boolean sameChannel = candidate.getChannel().equals(channel);
                if (sameChannel && candidate.getEndpoint().getComponentName().equals(name)) {
                    match = candidate;
                    break;
                }
            }
            if (match != null) {
                this.doDeleteBinding(match);
            }
        }
    }

    /**
     * Stops the binding and removes it from the bindings list. For a binding of type
     * {@code "consumer"}, any matching direct producer binding is reverted first.
     *
     * @param binding the binding to delete
     */
    private void doDeleteBinding(Binding binding) {
        boolean isConsumer = "consumer".equals(binding.getType());
        if (isConsumer) {
            this.revertDirectBindingIfNecessary(binding);
        }
        binding.stop();
        this.bindings.remove(binding);
    }

    /**
     * Stops every binding currently tracked by this bus. A failure to stop one binding is
     * logged at WARN level and does not prevent the remaining bindings from being stopped.
     *
     * <p>The bindings list is a {@code Collections.synchronizedList}, which must not be
     * traversed without holding its monitor. A snapshot is therefore taken under the lock
     * and the (potentially slow) {@code stop()} calls are made on the snapshot, outside
     * the lock.
     */
    protected void stopBindings() {
        List<Binding> snapshot;
        synchronized (this.bindings) {
            snapshot = new ArrayList<Binding>(this.bindings);
        }
        for (Lifecycle lifecycle : snapshot) {
            try {
                lifecycle.stop();
            }
            catch (Exception e) {
                // Best effort: keep stopping the remaining bindings.
                if (this.logger.isWarnEnabled()) {
                    this.logger.warn("failed to stop adapter", (Throwable)e);
                }
            }
        }
    }

    /**
     * Builds a {@link MessageValues} copy of the message whose payload is a byte array. The
     * {@code contentType} header is replaced by the MIME type derived from the payload
     * object, and a non-null previous {@code contentType} value is kept under
     * {@code originalContentType}.
     *
     * @param message the message to convert
     * @return the converted message values
     */
    protected final MessageValues serializePayloadIfNecessary(Message<?> message) {
        Object payload = message.getPayload();
        Object previousContentType = message.getHeaders().get((Object)"contentType");
        String derivedContentType = JavaClassMimeTypeConversion.mimeTypeFromObject(payload).toString();
        byte[] encoded = this.serializePayloadIfNecessary(payload);

        MessageValues result = new MessageValues(message);
        result.setPayload(encoded);
        result.put("contentType", (Object)derivedContentType);
        if (previousContentType != null) {
            result.put("originalContentType", previousContentType);
        }
        return result;
    }

    /**
     * Converts a payload object to bytes: a {@code byte[]} is returned as is, a
     * {@code String} is encoded as UTF-8, and anything else is encoded with the codec.
     *
     * @param originalPayload the payload to convert
     * @return the payload bytes
     * @throws SerializationException if encoding fails with an {@link IOException}
     */
    private byte[] serializePayloadIfNecessary(Object originalPayload) {
        if (originalPayload instanceof byte[]) {
            return (byte[])originalPayload;
        }
        try {
            if (originalPayload instanceof String) {
                String text = (String)originalPayload;
                return text.getBytes("UTF-8");
            }
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            this.codec.encode(originalPayload, (OutputStream)buffer);
            return buffer.toByteArray();
        }
        catch (IOException e) {
            throw new SerializationException("unable to serialize payload [" + originalPayload.getClass().getName() + "]", e);
        }
    }

    /**
     * Wraps the message in a new {@link MessageValues} and delegates to
     * {@link #deserializePayloadIfNecessary(MessageValues)}.
     *
     * @param message the message to convert
     * @return the converted message values
     */
    protected final MessageValues deserializePayloadIfNecessary(Message<?> message) {
        return this.deserializePayloadIfNecessary(new MessageValues(message));
    }

    /**
     * Deserializes the payload in place, based on the content type resolved for the
     * message. When the result is non-null, it becomes the payload, the
     * {@code contentType} entry is set to the value of {@code originalContentType}, and
     * {@code originalContentType} is set to {@code null}.
     *
     * @param message the message values to convert; modified and returned
     * @return the same {@code message} instance
     */
    protected final MessageValues deserializePayloadIfNecessary(MessageValues message) {
        Object rawPayload = message.getPayload();
        MimeType resolvedType = this.contentTypeResolver.resolve(message);
        Object decoded = this.deserializePayload(rawPayload, resolvedType);
        if (decoded == null) {
            return message;
        }
        message.setPayload(decoded);
        Object restoredContentType = message.get("originalContentType");
        message.put("contentType", restoredContentType);
        message.put("originalContentType", (Object)null);
        return message;
    }

    /**
     * Deserializes the payload when it is a byte array with a content type other than
     * {@code application/octet-stream}. Every other payload is returned unchanged.
     *
     * @param payload the payload to inspect
     * @param contentType the resolved content type; may be {@code null}
     * @return the deserialized object, or {@code payload} itself when no conversion applies
     */
    private Object deserializePayload(Object payload, MimeType contentType) {
        if (!(payload instanceof byte[])) {
            return payload;
        }
        boolean opaqueBytes = contentType == null || MimeTypeUtils.APPLICATION_OCTET_STREAM.equals((Object)contentType);
        if (opaqueBytes) {
            return payload;
        }
        return this.deserializePayload((byte[])payload, contentType);
    }

    /**
     * Turns payload bytes back into an object. {@code text/plain} content is decoded as a
     * UTF-8 string. For any other content type the class name is taken from the MIME
     * type, the class is loaded (and cached by name), and the codec decodes the bytes.
     *
     * @param bytes the serialized payload
     * @param contentType the content type describing the payload
     * @return the deserialized object
     * @throws SerializationException if the encoding is unsupported, the class cannot be
     *         found, or decoding fails with an {@link IOException}
     */
    private Object deserializePayload(byte[] bytes, MimeType contentType) {
        boolean plainText = MimeTypeUtils.TEXT_PLAIN.equals((Object)contentType);
        if (plainText) {
            try {
                return new String(bytes, "UTF-8");
            }
            catch (UnsupportedEncodingException e) {
                throw new SerializationException("unable to deserialize [java.lang.String]. Encoding not supported.", e);
            }
        }
        final String className = JavaClassMimeTypeConversion.classNameFromMimeType(contentType);
        try {
            Class<?> payloadType = this.payloadTypeCache.get(className);
            if (payloadType == null) {
                // First time this class name is seen: load it and remember it.
                payloadType = ClassUtils.forName(className, null);
                this.payloadTypeCache.put(className, payloadType);
            }
            return this.codec.decode(bytes, payloadType);
        }
        catch (ClassNotFoundException e) {
            throw new SerializationException("unable to deserialize [" + className + "]. Class not found.", e);
        }
        catch (IOException e) {
            throw new SerializationException("unable to deserialize [" + className + "]", e);
        }
    }

    /**
     * Computes the partition for a message.
     *
     * <p>The key comes from the configured extractor class if one is named, otherwise from
     * the key expression. The partition then comes from the configured selector class,
     * otherwise the selector expression, otherwise this bus's default selector. The result
     * is reduced modulo {@code meta.partitionCount} and made non-negative.
     *
     * @param message the message to partition
     * @param meta the partitioning configuration
     * @return the partition index
     */
    protected int determinePartition(Message<?> message, PartitioningMetadata meta) {
        Object key = null;
        if (StringUtils.hasText((String)meta.partitionKeyExtractorClass)) {
            key = this.invokeExtractor(meta.partitionKeyExtractorClass, message);
        }
        else if (meta.partitionKeyExpression != null) {
            key = meta.partitionKeyExpression.getValue(this.evaluationContext, message);
        }
        Assert.notNull((Object)key, "Partition key cannot be null");

        int partition;
        if (StringUtils.hasText((String)meta.partitionSelectorClass)) {
            partition = this.invokePartitionSelector(meta.partitionSelectorClass, key, meta.partitionCount);
        }
        else if (meta.partitionSelectorExpression != null) {
            Integer selected = (Integer)meta.partitionSelectorExpression.getValue(this.evaluationContext, key, Integer.class);
            partition = selected.intValue();
        }
        else {
            partition = this.partitionSelector.selectPartition(key, meta.partitionCount);
        }

        partition = partition % meta.partitionCount;
        // A selector may return a negative number; fold the remainder back into range.
        return partition < 0 ? Math.abs(partition) : partition;
    }

    /**
     * Extracts the partition key using the named extractor. If the application context
     * already contains a bean with that name, the bean is used. Otherwise the name is
     * loaded as a class, instantiated, registered as a singleton under the class name,
     * initialized by the bean factory, and then used.
     *
     * @param partitionKeyExtractorClassName bean name or class name of the extractor
     * @param message the message to extract the key from
     * @return the extracted key
     * @throws MessageBusException if the class cannot be loaded or instantiated
     */
    private Object invokeExtractor(String partitionKeyExtractorClassName, Message<?> message) {
        if (this.applicationContext.containsBean(partitionKeyExtractorClassName)) {
            PartitionKeyExtractorStrategy registered = this.applicationContext.getBean(partitionKeyExtractorClassName, PartitionKeyExtractorStrategy.class);
            return registered.extractKey(message);
        }
        Class<?> extractorType;
        try {
            extractorType = ClassUtils.forName(partitionKeyExtractorClassName, this.applicationContext.getClassLoader());
        }
        catch (Exception e) {
            this.logger.error("Failed to load key extractor", (Throwable)e);
            throw new MessageBusException("Failed to load key extractor: " + partitionKeyExtractorClassName, e);
        }
        try {
            Object instance = extractorType.newInstance();
            Assert.isInstanceOf(PartitionKeyExtractorStrategy.class, instance);
            ConfigurableListableBeanFactory factory = this.applicationContext.getBeanFactory();
            factory.registerSingleton(partitionKeyExtractorClassName, instance);
            this.applicationContext.getBeanFactory().initializeBean(instance, partitionKeyExtractorClassName);
            return ((PartitionKeyExtractorStrategy)instance).extractKey(message);
        }
        catch (Exception e) {
            this.logger.error("Failed to instantiate key extractor", (Throwable)e);
            throw new MessageBusException("Failed to instantiate key extractor: " + partitionKeyExtractorClassName, e);
        }
    }

    /**
     * Selects a partition using the named selector. If the application context already
     * contains a bean with that name, the bean is used. Otherwise the name is loaded as a
     * class, instantiated, registered as a singleton under the class name, initialized by
     * the bean factory, and then used.
     *
     * <p>The new instance is checked against {@link PartitionSelectorStrategy}, the type it
     * is cast to afterwards. Checking against the key-extractor interface instead would
     * reject every valid selector class.
     *
     * @param partitionSelectorClassName bean name or class name of the selector
     * @param key the partition key
     * @param partitionCount the number of partitions
     * @return the partition chosen by the selector
     * @throws MessageBusException if the class cannot be loaded, cannot be instantiated,
     *         or is not a {@link PartitionSelectorStrategy}
     */
    private int invokePartitionSelector(String partitionSelectorClassName, Object key, int partitionCount) {
        Class clazz;
        if (this.applicationContext.containsBean(partitionSelectorClassName)) {
            return ((PartitionSelectorStrategy)this.applicationContext.getBean(partitionSelectorClassName, PartitionSelectorStrategy.class)).selectPartition(key, partitionCount);
        }
        try {
            clazz = ClassUtils.forName((String)partitionSelectorClassName, (ClassLoader)this.applicationContext.getClassLoader());
        }
        catch (Exception e) {
            this.logger.error("Failed to load partition selector", (Throwable)e);
            throw new MessageBusException("Failed to load partition selector: " + partitionSelectorClassName, e);
        }
        try {
            Object selector = clazz.newInstance();
            Assert.isInstanceOf(PartitionSelectorStrategy.class, selector);
            this.applicationContext.getBeanFactory().registerSingleton(partitionSelectorClassName, selector);
            this.applicationContext.getBeanFactory().initializeBean(selector, partitionSelectorClassName);
            return ((PartitionSelectorStrategy)selector).selectPartition(key, partitionCount);
        }
        catch (Exception e) {
            this.logger.error("Failed to instantiate partition selector", (Throwable)e);
            throw new MessageBusException("Failed to instantiate partition selector: " + partitionSelectorClassName, e);
        }
    }

    /**
     * Checks that every consumer property key is in the supported set. A {@code null}
     * {@code properties} argument is accepted without any check.
     *
     * @param name the name reported in the error message
     * @param properties the consumer properties; may be {@code null}
     * @param supported the supported property keys
     * @throws IllegalArgumentException if an unsupported key is present
     */
    protected void validateConsumerProperties(String name, Properties properties, Set<Object> supported) {
        if (properties == null) {
            return;
        }
        this.validateProperties(name, properties, supported, "consumer");
    }

    /**
     * Checks that every producer property key is in the supported set, then validates the
     * partitioning settings. A {@code null} {@code properties} argument is accepted
     * without any check.
     *
     * @param name the name reported in error messages
     * @param properties the producer properties; may be {@code null}
     * @param supported the supported property keys
     * @throws IllegalArgumentException if a key is unsupported or partitioning is misconfigured
     */
    protected void validateProducerProperties(String name, Properties properties, Set<Object> supported) {
        if (properties == null) {
            return;
        }
        this.validateProperties(name, properties, supported, "producer");
        this.validatePartitioning(name, properties);
    }

    /**
     * Rejects property keys that are not in the supported set. All offending keys are
     * reported together, comma-separated, in a single exception.
     *
     * @param name the name reported in the error message
     * @param properties the properties to check
     * @param supported the supported property keys
     * @param type the word used in the message to describe the properties' role
     * @throws IllegalArgumentException if at least one key is unsupported
     */
    private void validateProperties(String name, Properties properties, Set<Object> supported, String type) {
        StringBuilder unsupported = new StringBuilder();
        int errors = 0;
        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
            Object key = entry.getKey();
            if (!supported.contains(key)) {
                if (errors > 0) {
                    unsupported.append(",");
                }
                unsupported.append(key);
                errors++;
            }
        }
        if (errors == 0) {
            return;
        }
        String suffix = errors == 1 ? "y: " : "ies: ";
        throw new IllegalArgumentException(this.getClass().getSimpleName() + " does not support " + type + " propert" + suffix + unsupported.toString() + " for " + name + ".");
    }

    /**
     * Validates partitioning settings for a bus without native partitioning. When a
     * partition key expression or extractor class is configured, the
     * {@code next.module.count} property must be present and parse to an integer greater
     * than 1. Nothing is checked when the bus reports native partitioning or no partition
     * key is configured.
     *
     * @param name the module name reported in error messages
     * @param properties the producer properties
     * @throws IllegalArgumentException if {@code next.module.count} is missing, not an
     *         integer, or not greater than 1
     */
    private void validatePartitioning(String name, Properties properties) {
        if (this.isCapable(MessageBus.Capability.NATIVE_PARTITIONING)) {
            return;
        }
        boolean partitioned = StringUtils.hasText((String)properties.getProperty("partitionKeyExpression"))
                || StringUtils.hasText((String)properties.getProperty("partitionKeyExtractorClass"));
        if (!partitioned) {
            return;
        }
        String nextModuleCount = properties.getProperty("next.module.count");
        String missingCount = String.format(this.getClass().getSimpleName() + " requires partitioned data to be sent to a module " + "having 'count' > 1 for '%s'", name);
        Assert.hasText((String)nextModuleCount, missingCount);
        try {
            int count = Integer.parseInt(nextModuleCount);
            String countTooLow = String.format(this.getClass().getSimpleName() + " requires that module '%s' sends partitioned data to a" + " module having 'count' > 1", name);
            Assert.isTrue(count > 1, countTooLow);
        }
        catch (NumberFormatException e) {
            throw new IllegalArgumentException(String.format("Property '%s' for module '%s' does not contain a valid integer, current value is '%s'", "next.module.count", name, nextModuleCount));
        }
    }

    /**
     * Builds an expression string of the form {@code '<expressionRoot>-' + headers['partition']},
     * i.e. the quoted root followed by a dash, concatenated with the partition header.
     *
     * @param expressionRoot the text placed inside the quoted literal
     * @return the expression text
     */
    protected String buildPartitionRoutingExpression(String expressionRoot) {
        return "'" + expressionRoot + "-' + headers['" + PARTITION_HEADER + "']";
    }

    /**
     * Builds a {@link RetryTemplate} with a simple retry policy and an exponential
     * back-off policy. Each setting is read from the properties accessor, with this bus's
     * defaults as fallback.
     *
     * @param properties accessor for the binding's retry settings
     * @return the configured template, or {@code null} when the effective maximum number
     *         of attempts is 1 or less
     */
    protected RetryTemplate buildRetryTemplateIfRetryEnabled(AbstractBusPropertiesAccessor properties) {
        int maxAttempts = properties.getMaxAttempts(this.defaultMaxAttempts);
        if (maxAttempts <= 1) {
            return null;
        }
        SimpleRetryPolicy attemptsPolicy = new SimpleRetryPolicy();
        attemptsPolicy.setMaxAttempts(maxAttempts);

        ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
        backOff.setInitialInterval(properties.getBackOffInitialInterval(this.defaultBackOffInitialInterval));
        backOff.setMultiplier(properties.getBackOffMultiplier(this.defaultBackOffMultiplier));
        backOff.setMaxInterval(properties.getBackOffMaxInterval(this.defaultBackOffMaxInterval));

        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy((RetryPolicy)attemptsPolicy);
        retryTemplate.setBackOffPolicy((BackOffPolicy)backOff);
        return retryTemplate;
    }

    /**
     * Tells whether the name starts with one of the {@code topic:}, {@code queue:} or
     * {@code job:} prefixes.
     *
     * @param name the channel name to test
     * @return {@code true} if the name carries one of those prefixes
     */
    protected boolean isNamedChannel(String name) {
        return name.startsWith(PUBSUB_NAMED_CHANNEL_TYPE_PREFIX) || name.startsWith(P2P_NAMED_CHANNEL_TYPE_PREFIX) || name.startsWith(JOB_CHANNEL_TYPE_PREFIX);
    }

    /**
     * Binds the producer channel directly to an already-bound consumer channel of the same
     * name, when that is permitted.
     *
     * <p>No direct binding is made when the properties disallow it, when the name is a
     * named channel, when the current thread is reverting a direct binding (the marker is
     * cleared as a side effect), or when no consumer binding with this name exists.
     *
     * @param name the binding name
     * @param moduleOutputChannel the producer's output channel
     * @param properties the producer's properties accessor
     * @return {@code true} if the producer was bound directly, {@code false} otherwise
     */
    protected boolean bindNewProducerDirectlyIfPossible(String name, SubscribableChannel moduleOutputChannel, AbstractBusPropertiesAccessor properties) {
        if (!properties.isDirectBindingAllowed() || this.isNamedChannel(name)) {
            return false;
        }
        if (this.revertingDirectBinding.get() != null) {
            // Consume the one-shot marker left by the revert in progress.
            this.revertingDirectBinding.remove();
            return false;
        }
        Binding consumer = null;
        synchronized (this.bindings) {
            for (int i = 0; consumer == null && i < this.bindings.size(); i++) {
                Binding candidate = this.bindings.get(i);
                if (candidate.getName().equals(name) && "consumer".equals(candidate.getType())) {
                    consumer = candidate;
                }
            }
        }
        if (consumer != null) {
            this.bindProducerDirectly(name, moduleOutputChannel, consumer.getChannel(), properties);
            return true;
        }
        return false;
    }

    /**
     * Connects the producer channel straight to the consumer channel. An
     * {@link EventDrivenConsumer} named {@code "outbound." + name} subscribes a
     * {@link DirectHandler} to the producer channel; the resulting direct-producer binding
     * is recorded and started.
     *
     * @param name the binding name
     * @param producerChannel the channel messages are taken from
     * @param consumerChannel the channel messages are forwarded to
     * @param properties the producer's properties accessor, stored on the binding
     */
    private void bindProducerDirectly(String name, SubscribableChannel producerChannel, MessageChannel consumerChannel, AbstractBusPropertiesAccessor properties) {
        DirectHandler handler = new DirectHandler(consumerChannel);
        EventDrivenConsumer consumer = new EventDrivenConsumer(producerChannel, (MessageHandler)handler);
        consumer.setBeanFactory((BeanFactory)this.getBeanFactory());
        consumer.setBeanName("outbound." + name);
        consumer.afterPropertiesSet();
        Binding binding = Binding.forDirectProducer(name, (MessageChannel)producerChannel, (AbstractEndpoint)consumer, properties);
        // Record the binding before starting it.
        this.addBinding(binding);
        binding.start();
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Producer bound directly: " + binding);
        }
    }

    /**
     * Replaces an existing producer binding of the given name with a direct binding to the
     * consumer channel. This happens only when the name is not a named channel, a binding
     * of type {@code "producer"} with that name exists, its channel is a
     * {@link SubscribableChannel}, and its properties allow direct binding. The old
     * producer binding is stopped and removed after the direct one is created.
     *
     * @param name the binding name
     * @param consumerChannel the consumer's channel to forward messages to
     */
    protected void bindExistingProducerDirectlyIfPossible(String name, MessageChannel consumerChannel) {
        if (this.isNamedChannel(name)) {
            return;
        }
        synchronized (this.bindings) {
            Binding producer = null;
            for (Binding candidate : this.bindings) {
                if (candidate.getName().equals(name) && "producer".equals(candidate.getType())) {
                    producer = candidate;
                    break;
                }
            }
            if (producer == null || !(producer.getChannel() instanceof SubscribableChannel)) {
                return;
            }
            AbstractBusPropertiesAccessor accessor = producer.getPropertiesAccessor();
            if (!accessor.isDirectBindingAllowed()) {
                return;
            }
            this.bindProducerDirectly(name, (SubscribableChannel)producer.getChannel(), consumerChannel, accessor);
            producer.stop();
            this.bindings.remove(producer);
        }
    }

    /**
     * Reverts a direct producer binding when its consumer is being removed. If a binding of
     * type {@code "direct"} with the same name as {@code binding} exists, its producer is
     * re-bound through {@code bindProducer}, and the direct binding is then stopped and
     * removed. Any exception is logged and swallowed so that removing the consumer can
     * proceed.
     *
     * <p>A thread-local marker tells {@code bindNewProducerDirectlyIfPossible} not to bind
     * the producer directly again during the re-bind. The marker is always cleared in a
     * {@code finally} block. Without that, a {@code bindProducer} call that fails or never
     * consults the marker would leave it set, and the next unrelated bind on this thread
     * would wrongly skip direct binding.
     *
     * @param binding the consumer binding being deleted
     */
    private void revertDirectBindingIfNecessary(Binding binding) {
        try {
            synchronized (this.bindings) {
                Binding directBinding = null;
                for (Binding producer : this.bindings) {
                    if ("direct".equals(producer.getType()) && binding.getName().equals(producer.getName())) {
                        directBinding = producer;
                        break;
                    }
                }
                if (directBinding != null) {
                    // Re-bind outside the loop: bindProducer adds to the list being scanned above.
                    this.revertingDirectBinding.set(Boolean.TRUE);
                    try {
                        this.bindProducer(directBinding.getName(), directBinding.getChannel(), directBinding.getPropertiesAccessor().getProperties());
                    }
                    finally {
                        // No-op when the marker was already consumed during bindProducer.
                        this.revertingDirectBinding.remove();
                    }
                    directBinding.stop();
                    this.bindings.remove(directBinding);
                    if (this.logger.isInfoEnabled()) {
                        this.logger.info("direct binding reverted: " + directBinding);
                    }
                }
            }
        }
        catch (Exception e) {
            this.logger.error("Could not revert direct binding: " + binding, (Throwable)e);
        }
    }

    /**
     * Manual acknowledgement hook. This implementation does nothing.
     *
     * @param messageHeaders the headers of the messages to acknowledge; ignored here
     */
    public void doManualAck(LinkedList<MessageHeaders> messageHeaders) {
    }

    /**
     * {@link MessageHandler} that forwards every message it receives to a fixed output
     * channel.
     */
    public static class DirectHandler
    implements MessageHandler {
        private final MessageChannel outputChannel;

        /**
         * @param outputChannel the channel every handled message is sent to
         */
        public DirectHandler(MessageChannel outputChannel) {
            this.outputChannel = outputChannel;
        }

        /**
         * Sends the message to the output channel. The boolean result of {@code send} is
         * not inspected.
         *
         * @param message the message to forward
         * @throws MessagingException declared by the signature; not thrown directly by this body
         */
        public void handleMessage(Message<?> message) throws MessagingException {
            this.outputChannel.send(message);
        }
    }

    /**
     * Fluent helper that accumulates objects into a {@link HashSet}. {@link #build()}
     * hands out the backing set itself, not a copy.
     */
    public static class SetBuilder {
        private final Set<Object> elements = new HashSet<Object>();

        /**
         * Adds one element.
         *
         * @param o the element to add
         * @return this builder
         */
        public SetBuilder add(Object o) {
            elements.add(o);
            return this;
        }

        /**
         * Adds every element of the given set.
         *
         * @param set the elements to add
         * @return this builder
         */
        public SetBuilder addAll(Set<Object> set) {
            elements.addAll(set);
            return this;
        }

        /**
         * @return the backing set holding everything added so far
         */
        public Set<Object> build() {
            return elements;
        }
    }

    /**
     * Converts between Java objects and the {@link MimeType} used to describe
     * them, in both directions.
     *
     * <p>{@code byte[]} maps to {@code application/octet-stream}, {@code String}
     * maps to {@code text/plain}, and every other object maps to
     * {@code application/x-java-object;type=<class name>}. Computed types are
     * cached per class name.
     */
    static abstract class JavaClassMimeTypeConversion {
        public static final MimeType APPLICATION_OCTET_STREAM_MIME_TYPE = MimeType.valueOf("application/octet-stream");
        public static final MimeType TEXT_PLAIN_MIME_TYPE = MimeType.valueOf("text/plain");
        /** Cache of computed MIME types, keyed by the unmodified class name. */
        private static final ConcurrentMap<String, MimeType> mimeTypesCache = new ConcurrentHashMap<String, MimeType>();

        JavaClassMimeTypeConversion() {
        }

        /**
         * Returns the MIME type describing {@code obj}.
         *
         * @param obj the object to describe; must not be {@code null}
         * @return the octet-stream type for {@code byte[]}, the plain-text type
         *         for {@code String}, otherwise an
         *         {@code application/x-java-object} type whose {@code type}
         *         parameter carries the class name
         */
        static MimeType mimeTypeFromObject(Object obj) {
            Assert.notNull(obj, "object cannot be null.");
            if (obj instanceof byte[]) {
                return APPLICATION_OCTET_STREAM_MIME_TYPE;
            }
            if (obj instanceof String) {
                return TEXT_PLAIN_MIME_TYPE;
            }
            String className = obj.getClass().getName();
            MimeType cached = mimeTypesCache.get(className);
            if (cached != null) {
                return cached;
            }
            String modifiedClassName = className;
            if (obj.getClass().isArray()) {
                // Array names are quoted and, for object arrays, lose their
                // trailing ';' (presumably because ';' separates MIME
                // parameters); classNameFromMimeType restores it.
                if (modifiedClassName.endsWith(";")) {
                    modifiedClassName = modifiedClassName.substring(0, modifiedClassName.length() - 1);
                }
                modifiedClassName = "\"" + modifiedClassName + "\"";
            }
            MimeType created = MimeType.valueOf("application/x-java-object;type=" + modifiedClassName);
            // putIfAbsent keeps the first published instance, so concurrent
            // callers for the same class all end up with the same MimeType.
            MimeType existing = mimeTypesCache.putIfAbsent(className, created);
            return existing != null ? existing : created;
        }

        /**
         * Extracts the class name carried in the {@code type} parameter of a
         * MIME type, reversing the array-name encoding applied by
         * {@link #mimeTypeFromObject(Object)}.
         *
         * @param mimeType the MIME type to inspect; must not be {@code null}
         * @return the class name, or {@code null} when there is no
         *         {@code type} parameter
         */
        static String classNameFromMimeType(MimeType mimeType) {
            Assert.notNull(mimeType, "mimeType cannot be null.");
            String className = mimeType.getParameter("type");
            if (className == null) {
                return null;
            }
            // Drop the quotes added for array types.
            className = className.replace("\"", "");
            if (className.contains("[L")) {
                // Object-array names end with ';', which was stripped on encoding.
                className = className + ";";
            }
            return className;
        }
    }

    /**
     * Looks up or creates channels of type {@code T} that are shared through
     * the enclosing bus's application context, where they are registered as
     * singleton beans under the channel name.
     *
     * @param <T> the concrete channel type handled by this provider
     */
    protected abstract class SharedChannelProvider<T extends MessageChannel> {
        /** Type a bean must have to be accepted as an existing shared channel. */
        private final Class<T> requiredType;

        /**
         * @param clazz the channel type this provider looks up and creates
         */
        protected SharedChannelProvider(Class<T> clazz) {
            this.requiredType = clazz;
        }

        /**
         * Returns the bean registered under {@code name} if there is one,
         * otherwise creates and registers a new channel. Synchronized on this
         * provider so the lookup and the creation happen as one step.
         *
         * @param name the channel (bean) name
         * @return the existing or newly created channel
         * @throws IllegalArgumentException if a bean with that name exists but
         *         cannot be obtained as the required type
         */
        public final synchronized T lookupOrCreateSharedChannel(String name) {
            T channel = this.lookupSharedChannel(name);
            if (channel == null) {
                channel = this.createAndRegisterChannel(name);
            }
            return channel;
        }

        /**
         * Creates a channel through {@link #createSharedChannel(String)},
         * registers it as a singleton named {@code name} and runs bean
         * initialization on it.
         *
         * @param name the channel (bean) name
         * @return the object returned by bean initialization
         */
        @SuppressWarnings("unchecked")
        public T createAndRegisterChannel(String name) {
            T channel = this.createSharedChannel(name);
            ConfigurableListableBeanFactory beanFactory = MessageBusSupport.this.applicationContext.getBeanFactory();
            beanFactory.registerSingleton(name, channel);
            // initializeBean is declared to return Object (it may hand back a
            // wrapped instance), hence the unchecked cast back to T.
            channel = (T) beanFactory.initializeBean(channel, name);
            if (MessageBusSupport.this.logger.isDebugEnabled()) {
                MessageBusSupport.this.logger.debug("Registered channel:" + name);
            }
            return channel;
        }

        /**
         * Creates a new, not yet registered, channel instance.
         *
         * @param name the name the channel will be registered under
         * @return the new channel
         */
        protected abstract T createSharedChannel(String name);

        /**
         * Returns the bean registered under {@code name}, if any.
         *
         * @param name the channel (bean) name
         * @return the channel, or {@code null} when the application context
         *         contains no bean with that name
         * @throws IllegalArgumentException if the bean exists but cannot be
         *         obtained as the required type; the original failure is
         *         attached as the cause
         */
        public T lookupSharedChannel(String name) {
            if (!MessageBusSupport.this.applicationContext.containsBean(name)) {
                return null;
            }
            try {
                return MessageBusSupport.this.applicationContext.getBean(name, this.requiredType);
            }
            catch (Exception e) {
                throw new IllegalArgumentException("bean '" + name + "' is already registered but does not match the required type", e);
            }
        }
    }

    /**
     * Immutable snapshot of the partitioning-related settings read from a bus
     * properties accessor, together with a partition count.
     */
    protected static class PartitioningMetadata {
        private final String partitionKeyExtractorClass;
        private final Expression partitionKeyExpression;
        private final String partitionSelectorClass;
        private final Expression partitionSelectorExpression;
        private final int partitionCount;

        /**
         * Copies the four partitioning settings out of {@code properties}.
         *
         * @param properties accessor supplying the key-extractor class, key
         *        expression, selector class and selector expression
         * @param partitionCount the partition count to report
         */
        public PartitioningMetadata(AbstractBusPropertiesAccessor properties, int partitionCount) {
            this.partitionKeyExtractorClass = properties.getPartitionKeyExtractorClass();
            this.partitionKeyExpression = properties.getPartitionKeyExpression();
            this.partitionSelectorClass = properties.getPartitionSelectorClass();
            this.partitionSelectorExpression = properties.getPartitionSelectorExpression();
            this.partitionCount = partitionCount;
        }

        /**
         * @return {@code true} when a key-extractor class name with text or a
         *         key expression is configured
         */
        public boolean isPartitionedModule() {
            if (StringUtils.hasText(this.partitionKeyExtractorClass)) {
                return true;
            }
            return this.partitionKeyExpression != null;
        }

        /**
         * @return the partition count given at construction
         */
        public int getPartitionCount() {
            return this.partitionCount;
        }
    }

    /**
     * Partition selector based solely on the key's hash code.
     */
    private class DefaultPartitionSelector
    implements PartitionSelectorStrategy {
        private DefaultPartitionSelector() {
        }

        /**
         * Returns the absolute value of {@code key.hashCode()}, with
         * {@code Integer.MIN_VALUE} mapped to {@code 0} because
         * {@code Math.abs(Integer.MIN_VALUE)} is itself negative.
         *
         * <p>{@code partitionCount} is not used here. NOTE(review): presumably
         * the caller reduces the result modulo the partition count - confirm.
         *
         * @param key the partition key; must not be {@code null}
         * @param partitionCount ignored by this implementation
         * @return a non-negative value derived from the key's hash code
         */
        @Override
        public int selectPartition(Object key, int partitionCount) {
            final int hash = key.hashCode();
            return hash == Integer.MIN_VALUE ? 0 : Math.abs(hash);
        }
    }
}

