package com.atlassian.stash.internal.topic;

import com.atlassian.bitbucket.topic.MessageEvent;
import com.atlassian.bitbucket.topic.Topic;
import com.atlassian.bitbucket.topic.TopicListener;
import com.atlassian.bitbucket.topic.TopicService;
import com.atlassian.bitbucket.topic.TopicSettings;
import com.atlassian.bitbucket.util.Timer;
import com.atlassian.bitbucket.util.TimerUtils;
import com.atlassian.bitbucket.util.concurrent.ExecutorUtils;
import com.atlassian.hazelcast.serialization.OsgiSafe;
import com.atlassian.stash.internal.cluster.HazelcastClusterNode;
import com.atlassian.stash.internal.server.InternalApplicationPropertiesService;
import com.google.common.base.Preconditions;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import java.io.Serializable;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import javax.annotation.PreDestroy;
import org.osgi.framework.AdminPermission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/bitbucket-service-impl-5.16.0.jar:com/atlassian/stash/internal/topic/HazelcastTopicService.class */
public class HazelcastTopicService implements TopicService {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) HazelcastTopicService.class);
    private final int defaultTopicQueueSize;
    private final ExecutorService executorService;
    private final HazelcastInstance hazelcast;
    private final int maxTopicQueueSize;
    private final InternalApplicationPropertiesService propertiesService;
    private volatile boolean destroyed;
    private final Map<String, AtomicInteger> activeDispatchCountByTopic = new ConcurrentHashMap();
    private final Map<String, DefaultTopic> topics = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/bitbucket-service-impl-5.16.0.jar:com/atlassian/stash/internal/topic/HazelcastTopicService$DefaultTopic.class */
    public class DefaultTopic<T extends Serializable> implements Topic<T> {
        private final BlockingQueue<MessageEvent<T>> messages;
        private final TopicSettings<T> settings;
        private final String timerString;
        private final String topicName;
        private ITopic<OsgiSafe<T>> hazelcastTopic;
        private String registrationId;
        private volatile long nextLogTimestamp;
        private final AtomicLong droppedEventCount = new AtomicLong();
        private final Object lock = new Object();
        private final List<Subscription<T>> subscriptions = new CopyOnWriteArrayList();

        DefaultTopic(@Nonnull String str, @Nonnull TopicSettings<T> topicSettings) {
            this.settings = (TopicSettings) Objects.requireNonNull(topicSettings, "settings");
            this.topicName = (String) Objects.requireNonNull(str, "topicName");
            this.messages = new LinkedBlockingQueue(topicSettings.getQueueSize());
            this.timerString = "TopicListener.onMessage('" + str + "', T)";
        }

        @Override // com.atlassian.bitbucket.topic.Topic
        @Nonnull
        public TopicSettings<T> getSettings() {
            return this.settings;
        }

        @Override // com.atlassian.bitbucket.topic.Topic
        public void publish(@Nonnull T t) {
            HazelcastTopicService.this.checkNotDestroyed();
            getHazelcastTopic().publish(new OsgiSafe<>(t));
        }

        @Override // com.atlassian.bitbucket.topic.Topic
        @Nonnull
        public String subscribe(@Nonnull TopicListener<T> topicListener) {
            HazelcastTopicService.this.checkNotDestroyed();
            Subscription<T> subscription = new Subscription<>((TopicListener) Objects.requireNonNull(topicListener, AdminPermission.LISTENER));
            synchronized (this.lock) {
                this.subscriptions.add(subscription);
                if (this.subscriptions.size() == 1) {
                    register();
                }
            }
            return subscription.getId();
        }

        public String getName() {
            return this.topicName;
        }

        @Override // com.atlassian.bitbucket.topic.Topic
        public boolean unsubscribe(@Nonnull String str) {
            HazelcastTopicService.this.checkNotDestroyed();
            Objects.requireNonNull(str, "subscriptionId");
            synchronized (this.lock) {
                if (!this.subscriptions.removeIf(subscription -> {
                    return str.equals(subscription.getId());
                })) {
                    return false;
                }
                if (this.subscriptions.isEmpty()) {
                    unregister();
                    this.messages.clear();
                }
                return true;
            }
        }

        private T castMessage(Object obj) {
            return (T) this.settings.getMessageType().map(cls -> {
                return (Serializable) cls.cast(obj);
            }).orElse((Serializable) obj);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void destroy() {
            synchronized (this.lock) {
                unregister();
                int size = this.messages.size();
                this.messages.clear();
                this.subscriptions.clear();
                long j = this.droppedEventCount.get();
                if (j > 0 || size > 0) {
                    Logger logger = HazelcastTopicService.log;
                    Object[] objArr = new Object[3];
                    objArr[0] = this.topicName;
                    objArr[1] = Integer.valueOf(size);
                    objArr[2] = j > 0 ? " (" + j + " were dropped in the last minute because the queue was full)" : "";
                    logger.info("[{}] shutting down. {} messages were still in the queue{}.", objArr);
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void dispatchNextMessage() {
            MessageEvent<T> poll = this.messages.poll();
            if (poll == null) {
                return;
            }
            int i = 0;
            Iterator<Subscription<T>> it = this.subscriptions.iterator();
            while (it.hasNext()) {
                TopicListener<T> listener = it.next().getListener();
                try {
                    Timer start = TimerUtils.start(this.timerString);
                    Throwable th = null;
                    try {
                        try {
                            listener.onMessage(poll);
                            i++;
                            if (start != null) {
                                if (0 != 0) {
                                    try {
                                        start.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    start.close();
                                }
                            }
                        } catch (Throwable th3) {
                            th = th3;
                            throw th3;
                            break;
                        }
                    } catch (Throwable th4) {
                        if (start != null) {
                            if (th != null) {
                                try {
                                    start.close();
                                } catch (Throwable th5) {
                                    th.addSuppressed(th5);
                                }
                            } else {
                                start.close();
                            }
                        }
                        throw th4;
                        break;
                    }
                } catch (Exception e) {
                    HazelcastTopicService.log.warn("[{}] error dispatching message to listener {}", this.topicName, listener.getClass(), e);
                }
            }
            if (!HazelcastTopicService.log.isTraceEnabled() || i <= 0) {
                return;
            }
            Logger logger = HazelcastTopicService.log;
            Object[] objArr = new Object[4];
            objArr[0] = this.topicName;
            objArr[1] = poll.getSource().getAddress();
            objArr[2] = Integer.valueOf(i);
            objArr[3] = i > 1 ? "s" : "";
            logger.trace("[{}] successfully dispatched a message from {} to {} listener{}", objArr);
        }

        private ITopic<OsgiSafe<T>> getHazelcastTopic() {
            if (this.hazelcastTopic == null) {
                this.hazelcastTopic = HazelcastTopicService.this.getHazelcastTopic(this.topicName);
            }
            return this.hazelcastTopic;
        }

        private void maybeLogFullQueueWarning() {
            long j = this.droppedEventCount.get();
            long nanoTime = System.nanoTime();
            if (j <= 0 || nanoTime <= this.nextLogTimestamp) {
                return;
            }
            synchronized (this.lock) {
                if (nanoTime <= this.nextLogTimestamp) {
                    return;
                }
                this.nextLogTimestamp = nanoTime + TimeUnit.MINUTES.toNanos(1L);
                HazelcastTopicService.log.warn("[{}] Message queue is full. Dropped {} events in the last minute.", this.topicName, Long.valueOf(this.droppedEventCount.getAndSet(0L)));
            }
        }

        private void register() {
            if (this.registrationId != null) {
                unregister();
            }
            this.registrationId = getHazelcastTopic().addMessageListener(message -> {
                T castMessage = castMessage(((OsgiSafe) message.getMessageObject()).getValue());
                if (this.settings.isDedupePendingMessages() && this.messages.stream().anyMatch(messageEvent -> {
                    return Objects.equals(messageEvent.getMessage(), castMessage);
                })) {
                    HazelcastTopicService.log.trace("[{}] not queueing message because an identical message is already in the queue", this.topicName);
                    return;
                }
                if (this.messages.offer(new DefaultMessageEvent(HazelcastClusterNode.transform(message.getPublishingMember()), message.getPublishTime(), this.topicName, castMessage))) {
                    HazelcastTopicService.log.trace("[{}] queued a message from {}", this.topicName, message.getPublishingMember());
                    HazelcastTopicService.this.scheduleDispatch(this);
                } else {
                    this.droppedEventCount.incrementAndGet();
                }
                maybeLogFullQueueWarning();
            });
        }

        private void unregister() {
            if (this.registrationId == null || this.hazelcastTopic == null) {
                return;
            }
            this.hazelcastTopic.removeMessageListener(this.registrationId);
            this.registrationId = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/bitbucket-service-impl-5.16.0.jar:com/atlassian/stash/internal/topic/HazelcastTopicService$Subscription.class */
    public class Subscription<T extends Serializable> {
        private final String id;
        private final TopicListener<T> listener;

        private Subscription(TopicListener<T> topicListener) {
            this.listener = topicListener;
            this.id = UUID.randomUUID().toString();
        }

        public String getId() {
            return this.id;
        }

        public TopicListener<T> getListener() {
            return this.listener;
        }
    }

    public HazelcastTopicService(ExecutorService executorService, HazelcastInstance hazelcastInstance, InternalApplicationPropertiesService internalApplicationPropertiesService, int i, int i2) {
        this.defaultTopicQueueSize = i;
        this.executorService = executorService;
        this.hazelcast = hazelcastInstance;
        this.maxTopicQueueSize = i2;
        this.propertiesService = internalApplicationPropertiesService;
    }

    @Override // com.atlassian.bitbucket.topic.TopicService
    @Nonnull
    public <T extends Serializable> Topic<T> getTopic(@Nonnull String str, @Nonnull TopicSettings<T> topicSettings) {
        return this.topics.computeIfAbsent(str, str2 -> {
            return new DefaultTopic(str, new TopicSettings.Builder(topicSettings).queueSize(getQueueSize(str2, topicSettings.getQueueSize())).build());
        });
    }

    @PreDestroy
    public void destroy() {
        this.destroyed = true;
        ExecutorUtils.shutdown(this.executorService, log);
        this.topics.values().forEach(obj -> {
            ((DefaultTopic) obj).destroy();
        });
        this.topics.clear();
        this.activeDispatchCountByTopic.clear();
    }

    @Override // com.atlassian.bitbucket.topic.TopicService
    public <T extends Serializable> void publish(@Nonnull String str, @Nonnull T t) {
        checkNotDestroyed();
        getHazelcastTopic(str).publish(new OsgiSafe<>(t));
    }

    @Override // com.atlassian.bitbucket.topic.TopicService
    @Nonnull
    public <T extends Serializable> String subscribe(@Nonnull String str, @Nonnull TopicListener<T> topicListener) {
        checkNotDestroyed();
        return getTopic(str, new TopicSettings.Builder(getMessageType(topicListener)).build()).subscribe(topicListener);
    }

    @Override // com.atlassian.bitbucket.topic.TopicService
    public boolean unsubscribe(@Nonnull String str) {
        checkNotDestroyed();
        Iterator<DefaultTopic> it = this.topics.values().iterator();
        while (it.hasNext()) {
            if (it.next().unsubscribe(str)) {
                return true;
            }
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void checkNotDestroyed() {
        Preconditions.checkState(!this.destroyed, "The TopicService has already been destroyed");
    }

    private void dispatchNextMessage(DefaultTopic<?> defaultTopic) {
        AtomicInteger computeIfAbsent = this.activeDispatchCountByTopic.computeIfAbsent(defaultTopic.getName(), str -> {
            return new AtomicInteger(0);
        });
        int incrementAndGet = computeIfAbsent.incrementAndGet();
        if (incrementAndGet > 1) {
            return;
        }
        while (incrementAndGet > 0 && !this.destroyed) {
            defaultTopic.dispatchNextMessage();
            incrementAndGet = computeIfAbsent.decrementAndGet();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T extends Serializable> ITopic<OsgiSafe<T>> getHazelcastTopic(String str) {
        return this.hazelcast.getTopic("topicService:" + str);
    }

    private int getQueueSize(String str, int i) {
        if (i != -1) {
            return limitQueueSize(str, i);
        }
        int min = Math.min(this.propertiesService.getProperty("topic." + str.replaceAll("\\s+", "_") + ".message.max.queue", Integer.MIN_VALUE), this.maxTopicQueueSize);
        if (min == Integer.MIN_VALUE) {
            return this.defaultTopicQueueSize;
        }
        log.info("[{}] set message queue size to {} as configured in the application properties", str, Integer.valueOf(min));
        return limitQueueSize(str, min);
    }

    private int limitQueueSize(String str, int i) {
        if (i <= this.maxTopicQueueSize) {
            return i;
        }
        log.info("[{}] reducing message queue size to {} because the configured value ({}) exceeds the maximum allowed queue size", str, Integer.valueOf(this.maxTopicQueueSize), Integer.valueOf(i));
        return this.maxTopicQueueSize;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void scheduleDispatch(DefaultTopic<?> defaultTopic) {
        this.executorService.submit(() -> {
            dispatchNextMessage(defaultTopic);
        });
    }

    private static <T extends Serializable> Class<T> getMessageType(TopicListener<T> topicListener) {
        for (Type type : topicListener.getClass().getGenericInterfaces()) {
            if (type instanceof ParameterizedType) {
                ParameterizedType parameterizedType = (ParameterizedType) type;
                if (parameterizedType.getRawType().equals(TopicListener.class)) {
                    return (Class) parameterizedType.getActualTypeArguments()[0];
                }
            }
        }
        return null;
    }
}
