package com.atlassian.jira.cluster;

import com.atlassian.event.api.EventPublisher;
import com.atlassian.jira.util.concurrent.ThreadFactories;
import com.atlassian.jira.util.dbc.Assertions;
import com.atlassian.jira.util.thread.JiraThreadLocalUtils;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/atlassian/jira/cluster/OfBizMessageHandlerService.class */
public class OfBizMessageHandlerService implements MessageHandlerService {
    private static final int INITIAL_DELAY = 3;
    private static final int PERIOD = 3;
    private static final int CHANNEL_MAX_LENGTH = 20;
    private static final int MESSAGE_MAX_LENGTH = 200;
    private static final Logger log = LoggerFactory.getLogger(OfBizMessageHandlerService.class);
    private final ClusterNodes clusterNodes;
    private final OfBizClusterMessageStore clusterMessageStore;
    private final EventMessageConsumer eventMessageConsumer;

    @Nullable
    private volatile ScheduledFuture<?> messageHandlerService;
    private final Map<String, Long> lastMessageProcessedByNodeId = new HashMap();
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, ThreadFactories.namedThreadFactory("ClusterMessageHandlerServiceThread"));
    private final Map<String, List<WeakReference<ClusterMessageConsumer>>> listeners = new HashMap();

    public OfBizMessageHandlerService(ClusterNodes clusterNodes, OfBizClusterMessageStore ofBizClusterMessageStore, EventPublisher eventPublisher) {
        this.clusterMessageStore = ofBizClusterMessageStore;
        this.clusterNodes = clusterNodes;
        this.eventMessageConsumer = new EventMessageConsumer(eventPublisher);
        registerListener(EventMessageConsumer.IMPORT_STARTED, this.eventMessageConsumer);
        registerListener(EventMessageConsumer.IMPORT_FINISHED, this.eventMessageConsumer);
        if (getCurrentNode().isClustered()) {
            for (Node node : clusterNodes.all()) {
                if (node.isClustered()) {
                    this.lastMessageProcessedByNodeId.put(node.getNodeId(), ofBizClusterMessageStore.getLatestMessageByNodeId(node.getNodeId()));
                }
            }
        }
    }

    @Override // com.atlassian.jira.cluster.MessageHandlerService
    @Nullable
    public ClusterMessage sendMessage(String str, Message message) {
        ClusterMessage clusterMessage = null;
        if (getCurrentNode().isClustered()) {
            clusterMessage = this.clusterMessageStore.createMessage(getCurrentNode().getNodeId(), str, message.toString());
        }
        return clusterMessage;
    }

    @Override // com.atlassian.jira.cluster.MessageHandlerService
    public List<ClusterMessage> receiveMessages() {
        ArrayList arrayList = new ArrayList();
        Node currentNode = getCurrentNode();
        if (currentNode.isClustered()) {
            for (Node node : this.clusterNodes.all()) {
                if (node.isClustered() && !node.getNodeId().equals(currentNode.getNodeId())) {
                    arrayList.addAll(this.clusterMessageStore.getMessages(node, currentNode, this.lastMessageProcessedByNodeId.get(node.getNodeId())));
                }
            }
        }
        return arrayList;
    }

    @Override // com.atlassian.jira.cluster.MessageHandlerService
    public void start() {
        this.messageHandlerService = this.scheduler.scheduleAtFixedRate(JiraThreadLocalUtils.wrap(this::handleReceivedMessages), 3L, 3L, TimeUnit.SECONDS);
    }

    @Override // com.atlassian.jira.cluster.MessageHandlerService
    public void stop() {
        if (this.messageHandlerService != null) {
            this.messageHandlerService.cancel(false);
        }
        this.scheduler.shutdown();
    }

    private Node getCurrentNode() {
        return this.clusterNodes.current();
    }

    private void handleReceivedMessages() {
        try {
            String nodeId = getCurrentNode().getNodeId();
            Iterator<ClusterMessage> it = receiveMessages().iterator();
            while (it.hasNext()) {
                tryHandleMessage(nodeId, it.next());
            }
        } catch (Throwable th) {
            log.error("There was a problem handling cluster messages", th);
        }
    }

    private void tryHandleMessage(String str, ClusterMessage clusterMessage) {
        Long id = clusterMessage.getId();
        String sourceNode = clusterMessage.getSourceNode();
        String channel = clusterMessage.getMessage().getChannel();
        String supplementalInformation = clusterMessage.getMessage().getSupplementalInformation();
        log.debug("Starting handling cluster message id: `{}`, current node: `{}`, source node: `{}`, channel: `{}`, supplemental information: `{}`", new Object[]{id, str, sourceNode, channel, supplementalInformation});
        try {
            try {
                sendLocalFromNode(channel, supplementalInformation, sourceNode);
                log.debug("Finished handling cluster message id: `{}`, current node: `{}`, source node: `{}`, channel: `{}`, supplemental information: `{}`", new Object[]{id, str, sourceNode, channel, supplementalInformation});
                this.lastMessageProcessedByNodeId.put(sourceNode, id);
            } catch (Throwable th) {
                log.error("There was a problem handling a cluster message", th);
                log.debug("Finished handling cluster message id: `{}`, current node: `{}`, source node: `{}`, channel: `{}`, supplemental information: `{}`", new Object[]{id, str, sourceNode, channel, supplementalInformation});
                this.lastMessageProcessedByNodeId.put(sourceNode, id);
            }
        } catch (Throwable th2) {
            log.debug("Finished handling cluster message id: `{}`, current node: `{}`, source node: `{}`, channel: `{}`, supplemental information: `{}`", new Object[]{id, str, sourceNode, channel, supplementalInformation});
            this.lastMessageProcessedByNodeId.put(sourceNode, id);
            throw th2;
        }
    }

    public void registerListener(String str, ClusterMessageConsumer clusterMessageConsumer) {
        synchronized (this.listeners) {
            this.listeners.computeIfAbsent(str, str2 -> {
                return new ArrayList();
            }).add(new WeakReference<>(clusterMessageConsumer));
        }
    }

    public void unregisterListener(String str, ClusterMessageConsumer clusterMessageConsumer) {
        synchronized (this.listeners) {
            List<WeakReference<ClusterMessageConsumer>> list = this.listeners.get(str);
            if (list != null) {
                removeRef(list, clusterMessageConsumer);
                if (list.isEmpty()) {
                    this.listeners.remove(str);
                }
            }
        }
    }

    public void unregisterListener(ClusterMessageConsumer clusterMessageConsumer) {
        synchronized (this.listeners) {
            Iterator<List<WeakReference<ClusterMessageConsumer>>> it = this.listeners.values().iterator();
            while (it.hasNext()) {
                List<WeakReference<ClusterMessageConsumer>> next = it.next();
                removeRef(next, clusterMessageConsumer);
                if (next.isEmpty()) {
                    it.remove();
                }
            }
        }
    }

    public void sendRemote(String str, String str2) {
        Assertions.notNull("channel", str);
        Assertions.is("channel exceeds max length", str.length() <= 20);
        Assertions.is("message exceeds max length", str2.length() <= MESSAGE_MAX_LENGTH);
        sendMessage(ClusterManager.ALL_NODES, new Message(str, str2));
    }

    private void sendLocalFromNode(String str, String str2, String str3) {
        ArrayList arrayList = new ArrayList();
        synchronized (this.listeners) {
            List<WeakReference<ClusterMessageConsumer>> list = this.listeners.get(str);
            if (list == null) {
                return;
            }
            Iterator<WeakReference<ClusterMessageConsumer>> it = list.iterator();
            while (it.hasNext()) {
                ClusterMessageConsumer clusterMessageConsumer = it.next().get();
                if (clusterMessageConsumer != null) {
                    arrayList.add(clusterMessageConsumer);
                } else {
                    it.remove();
                }
            }
            if (list.isEmpty()) {
                this.listeners.remove(str);
            }
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                ((ClusterMessageConsumer) it2.next()).receive(str, str2, str3);
            }
        }
    }

    private <T> void removeRef(List<WeakReference<T>> list, T t) {
        list.removeIf(weakReference -> {
            return weakReference.get() == t;
        });
    }
}
