package org.springframework.xd.dirt.plugins;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ThreadUtils;
import org.springframework.integration.channel.ChannelInterceptorAware;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.interceptor.WireTap;
import org.springframework.integration.support.DefaultMessageBuilderFactory;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptorAdapter;
import org.springframework.util.Assert;
import org.springframework.xd.dirt.integration.bus.BusUtils;
import org.springframework.xd.dirt.integration.bus.MessageBus;
import org.springframework.xd.dirt.server.container.ContainerServerApplication;
import org.springframework.xd.dirt.zookeeper.Paths;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnection;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener;
import org.springframework.xd.dirt.zookeeper.ZooKeeperUtils;
import org.springframework.xd.module.ModuleType;
import org.springframework.xd.module.core.Module;

/* loaded from: input_file:org/springframework/xd/dirt/plugins/AbstractMessageBusBinderPlugin.class */
public abstract class AbstractMessageBusBinderPlugin extends AbstractPlugin {
    protected static final String MODULE_INPUT_CHANNEL = "input";
    protected static final String MODULE_OUTPUT_CHANNEL = "output";
    protected static final String JOB_CHANNEL_PREFIX = "job:";
    protected final MessageBus messageBus;
    private volatile PathChildrenCache taps;
    private final TapListener tapListener;
    private final Map<String, MessageChannel> tappableChannels;

    /* renamed from: org.springframework.xd.dirt.plugins.AbstractMessageBusBinderPlugin$2, reason: invalid class name */
    /* loaded from: input_file:org/springframework/xd/dirt/plugins/AbstractMessageBusBinderPlugin$2.class */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type = new int[PathChildrenCacheEvent.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.INITIALIZED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_ADDED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_REMOVED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:org/springframework/xd/dirt/plugins/AbstractMessageBusBinderPlugin$TapLifecycleConnectionListener.class */
    class TapLifecycleConnectionListener implements ZooKeeperConnectionListener {
        TapLifecycleConnectionListener() {
        }

        @Override // org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener
        public void onDisconnect(CuratorFramework curatorFramework) {
            AbstractMessageBusBinderPlugin.this.taps.getListenable().removeListener(AbstractMessageBusBinderPlugin.this.tapListener);
            try {
                AbstractMessageBusBinderPlugin.this.taps.close();
            } catch (Exception e) {
                throw ZooKeeperUtils.wrapThrowable(e);
            }
        }

        @Override // org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener
        public void onSuspend(CuratorFramework curatorFramework) {
        }

        @Override // org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener
        public void onConnect(CuratorFramework curatorFramework) {
            AbstractMessageBusBinderPlugin.this.startTapListener(curatorFramework);
        }

        @Override // org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener
        public void onResume(CuratorFramework curatorFramework) {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/springframework/xd/dirt/plugins/AbstractMessageBusBinderPlugin$TapListener.class */
    public class TapListener implements PathChildrenCacheListener {
        TapListener() {
        }

        public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
            ZooKeeperUtils.logCacheEvent(AbstractMessageBusBinderPlugin.this.logger, pathChildrenCacheEvent);
            switch (AnonymousClass2.$SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[pathChildrenCacheEvent.getType().ordinal()]) {
                case 1:
                default:
                    return;
                case 2:
                    AbstractMessageBusBinderPlugin.this.onTapAdded(pathChildrenCacheEvent.getData());
                    return;
                case ZooKeeperConnection.DEFAULT_MAX_RETRY_ATTEMPTS /* 3 */:
                    AbstractMessageBusBinderPlugin.this.onTapRemoved(pathChildrenCacheEvent.getData());
                    return;
            }
        }
    }

    public AbstractMessageBusBinderPlugin(MessageBus messageBus) {
        this(messageBus, null);
    }

    public AbstractMessageBusBinderPlugin(MessageBus messageBus, ZooKeeperConnection zooKeeperConnection) {
        this.tapListener = new TapListener();
        this.tappableChannels = new HashMap();
        Assert.notNull(messageBus, "MessageBus must not be null.");
        this.messageBus = messageBus;
        if (zooKeeperConnection != null) {
            if (zooKeeperConnection.isConnected()) {
                startTapListener(zooKeeperConnection.getClient());
            }
            zooKeeperConnection.addListener(new TapLifecycleConnectionListener());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void startTapListener(CuratorFramework curatorFramework) {
        String build = Paths.build(Paths.TAPS);
        Paths.ensurePath(curatorFramework, build);
        this.taps = new PathChildrenCache(curatorFramework, build, true, ThreadUtils.newThreadFactory("TapsPathChildrenCache"));
        this.taps.getListenable().addListener(this.tapListener);
        try {
            this.taps.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        } catch (Exception e) {
            throw ZooKeeperUtils.wrapThrowable(e, "failed to start TapListener");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void bindConsumerAndProducers(Module module) {
        boolean trackHistory = module.getDeploymentProperties() != null ? module.getDeploymentProperties().getTrackHistory() : false;
        Properties[] extractConsumerProducerProperties = extractConsumerProducerProperties(module);
        Map<String, Object> map = null;
        if (trackHistory) {
            map = extractHistoryProperties(module);
            addHistoryTag(module, map);
        }
        MessageChannel messageChannel = (MessageChannel) module.getComponent(MODULE_OUTPUT_CHANNEL, MessageChannel.class);
        if (messageChannel != null) {
            bindMessageProducer(messageChannel, getOutputChannelName(module), extractConsumerProducerProperties[1]);
            String buildTapChannelName = buildTapChannelName(module);
            this.tappableChannels.put(buildTapChannelName, messageChannel);
            if (isTapActive(buildTapChannelName)) {
                createAndBindTapChannel(buildTapChannelName, messageChannel);
            }
            if (trackHistory) {
                track(module, messageChannel, map);
            }
        }
        MessageChannel messageChannel2 = (MessageChannel) module.getComponent(MODULE_INPUT_CHANNEL, MessageChannel.class);
        if (messageChannel2 != null) {
            bindMessageConsumer(messageChannel2, getInputChannelName(module), module.getDescriptor().getGroup(), extractConsumerProducerProperties[0]);
            if (trackHistory && module.getType().equals(ModuleType.sink)) {
                track(module, messageChannel2, map);
            }
        }
    }

    private void addHistoryTag(Module module, Map<String, Object> map) {
        String moduleLabel = module.getDescriptor().getModuleLabel();
        if (module.getDescriptor().getSinkChannelName() != null) {
            moduleLabel = moduleLabel + ">" + module.getDescriptor().getSinkChannelName();
        }
        if (module.getDescriptor().getSourceChannelName() != null) {
            moduleLabel = module.getDescriptor().getSourceChannelName() + ">" + moduleLabel;
        }
        map.put("module", moduleLabel);
    }

    private void track(Module module, MessageChannel messageChannel, final Map<String, Object> map) {
        final DefaultMessageBuilderFactory defaultMessageBuilderFactory = module.getComponent("messageBuilderFactory", MessageBuilderFactory.class) == null ? new DefaultMessageBuilderFactory() : (MessageBuilderFactory) module.getComponent("messageBuilderFactory", MessageBuilderFactory.class);
        if (messageChannel instanceof ChannelInterceptorAware) {
            ((ChannelInterceptorAware) messageChannel).addInterceptor(new ChannelInterceptorAdapter() { // from class: org.springframework.xd.dirt.plugins.AbstractMessageBusBinderPlugin.1
                public Message<?> preSend(Message<?> message, MessageChannel messageChannel2) {
                    Collection collection = (Collection) message.getHeaders().get("xdHistory");
                    ArrayList arrayList = collection == null ? new ArrayList(1) : new ArrayList(collection);
                    LinkedHashMap linkedHashMap = new LinkedHashMap();
                    linkedHashMap.putAll(map);
                    linkedHashMap.put("thread", Thread.currentThread().getName());
                    arrayList.add(linkedHashMap);
                    Message<?> build = defaultMessageBuilderFactory.fromMessage(message).setHeader("xdHistory", arrayList).build();
                    linkedHashMap.put("timestamp", build.getHeaders().getTimestamp());
                    return build;
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Properties[] extractConsumerProducerProperties(Module module) {
        Properties properties = new Properties();
        Properties properties2 = new Properties();
        if (module.getDeploymentProperties() != null) {
            for (Map.Entry entry : module.getDeploymentProperties().entrySet()) {
                if (((String) entry.getKey()).startsWith("consumer.")) {
                    properties.put(((String) entry.getKey()).substring("consumer.".length()), entry.getValue());
                } else if (((String) entry.getKey()).startsWith("producer.")) {
                    properties2.put(((String) entry.getKey()).substring("producer.".length()), entry.getValue());
                }
            }
        }
        return new Properties[]{properties, properties2};
    }

    protected final Map<String, Object> extractHistoryProperties(Module module) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        if (module.getProperties() != null) {
            for (Map.Entry entry : module.getProperties().entrySet()) {
                if (entry.getKey() instanceof String) {
                    String str = (String) entry.getKey();
                    if (str.startsWith(ContainerServerApplication.CONTAINER_ATTRIBUTES_PREFIX)) {
                        String substring = str.substring(ContainerServerApplication.CONTAINER_ATTRIBUTES_PREFIX.length());
                        if (substring.equals("id")) {
                            substring = "container.id";
                        }
                        linkedHashMap.put(substring, entry.getValue());
                    } else if (str.equals("xd.stream.name")) {
                        linkedHashMap.put(str.substring(3), entry.getValue());
                    }
                }
            }
        }
        return linkedHashMap;
    }

    @Override // org.springframework.xd.dirt.plugins.AbstractPlugin
    public void beforeShutdown(Module module) {
        unbindConsumer(module);
    }

    @Override // org.springframework.xd.dirt.plugins.AbstractPlugin
    public void removeModule(Module module) {
        super.removeModule(module);
        unbindProducers(module);
    }

    protected abstract String getInputChannelName(Module module);

    protected abstract String getOutputChannelName(Module module);

    protected abstract String buildTapChannelName(Module module);

    private void bindMessageConsumer(MessageChannel messageChannel, String str, String str2, Properties properties) {
        if (!BusUtils.isChannelPubSub(str)) {
            this.messageBus.bindConsumer(str, messageChannel, properties);
            return;
        }
        String str3 = str;
        if (this.messageBus.isCapable(MessageBus.Capability.DURABLE_PUBSUB)) {
            str3 = BusUtils.addGroupToPubSub(str2, str);
        }
        this.messageBus.bindPubSubConsumer(str3, messageChannel, properties);
    }

    private void bindMessageProducer(MessageChannel messageChannel, String str, Properties properties) {
        if (BusUtils.isChannelPubSub(str)) {
            this.messageBus.bindPubSubProducer(str, messageChannel, properties);
        } else {
            this.messageBus.bindProducer(str, messageChannel, properties);
        }
    }

    private void createAndBindTapChannel(String str, MessageChannel messageChannel) {
        this.logger.info("creating and binding tap channel for {}", str);
        if (!(messageChannel instanceof ChannelInterceptorAware)) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("output channel is not interceptor aware. Tap will not be created.");
            }
        } else {
            DirectChannel directChannel = new DirectChannel();
            directChannel.setBeanName(str + ".tap.bridge");
            this.messageBus.bindPubSubProducer(str, directChannel, (Properties) null);
            tapOutputChannel(directChannel, (ChannelInterceptorAware) messageChannel);
        }
    }

    private MessageChannel tapOutputChannel(MessageChannel messageChannel, ChannelInterceptorAware channelInterceptorAware) {
        channelInterceptorAware.addInterceptor(new WireTap(messageChannel));
        return messageChannel;
    }

    protected void unbindConsumer(Module module) {
        MessageChannel messageChannel = (MessageChannel) module.getComponent(MODULE_INPUT_CHANNEL, MessageChannel.class);
        if (messageChannel != null) {
            String inputChannelName = getInputChannelName(module);
            if (this.messageBus.isCapable(MessageBus.Capability.DURABLE_PUBSUB)) {
                inputChannelName = BusUtils.addGroupToPubSub(module.getDescriptor().getGroup(), inputChannelName);
            }
            this.messageBus.unbindConsumer(inputChannelName, messageChannel);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Unbound consumer for " + module.toString());
            }
        }
    }

    protected void unbindProducers(Module module) {
        MessageChannel messageChannel = (MessageChannel) module.getComponent(MODULE_OUTPUT_CHANNEL, MessageChannel.class);
        if (messageChannel != null) {
            this.messageBus.unbindProducer(getOutputChannelName(module), messageChannel);
            String buildTapChannelName = buildTapChannelName(module);
            unbindTapChannel(buildTapChannelName);
            this.tappableChannels.remove(buildTapChannelName);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Unbound producer(s) for " + module.toString());
            }
        }
    }

    private void unbindTapChannel(String str) {
        ChannelInterceptorAware channelInterceptorAware = (MessageChannel) this.tappableChannels.get(str);
        if (channelInterceptorAware instanceof ChannelInterceptorAware) {
            ChannelInterceptorAware channelInterceptorAware2 = channelInterceptorAware;
            ArrayList arrayList = new ArrayList();
            for (WireTap wireTap : channelInterceptorAware2.getChannelInterceptors()) {
                if (wireTap instanceof WireTap) {
                    wireTap.stop();
                } else {
                    arrayList.add(wireTap);
                }
            }
            channelInterceptorAware2.setInterceptors(arrayList);
            this.messageBus.unbindProducers(str);
        }
    }

    @Override // org.springframework.xd.dirt.plugins.AbstractPlugin
    public int getOrder() {
        return 0;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onTapAdded(ChildData childData) {
        String buildTapChannelNameFromPath = buildTapChannelNameFromPath(childData.getPath());
        MessageChannel messageChannel = this.tappableChannels.get(buildTapChannelNameFromPath);
        if (messageChannel != null) {
            createAndBindTapChannel(buildTapChannelNameFromPath, messageChannel);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onTapRemoved(ChildData childData) {
        unbindTapChannel(buildTapChannelNameFromPath(childData.getPath()));
    }

    private boolean isTapActive(String str) {
        Assert.state(this.taps != null, "tap cache not started");
        Iterator it = this.taps.getCurrentData().iterator();
        while (it.hasNext()) {
            if (buildTapChannelNameFromPath(((ChildData) it.next()).getPath()).equals(str)) {
                return true;
            }
        }
        return false;
    }

    private String buildTapChannelNameFromPath(String str) {
        return "tap:" + Paths.stripPath(str);
    }
}
