/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.pubsub;

import io.netty.channel.ChannelFuture;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EventListener;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.PubSubPatternStatusListener;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisNodeNotFoundException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.SubscribeListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.AsyncSemaphore;
import org.redisson.pubsub.CountDownLatchPubSub;
import org.redisson.pubsub.LockPubSub;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.SemaphorePubSub;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PublishSubscribeService {
    private static final Logger log = LoggerFactory.getLogger(PublishSubscribeService.class);
    // Collaborators handed in through the constructor.
    private final ConnectionManager connectionManager;
    private final MasterSlaveServersConfig config;
    // Fixed pool of 50 semaphores (each built with an argument of 1, presumably a single permit);
    // getSemaphore() maps a channel name onto one of them by hash code.
    private final AsyncSemaphore[] locks = new AsyncSemaphore[50];
    // Acquired by the subscribe/unsubscribe/reattach paths while they inspect or modify the
    // per-node queues held in entry2PubSubConnection.
    private final AsyncSemaphore freePubSubLock = new AsyncSemaphore(1);
    // (channel name, master/slave entry) -> connection entry currently registered for that subscription.
    private final ConcurrentMap<PubSubKey, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<PubSubKey, PubSubConnectionEntry>();
    // Master/slave entry -> holder of connection entries that are offered for reuse by new subscriptions.
    private final ConcurrentMap<MasterSlaveEntry, PubSubEntry> entry2PubSubConnection = new ConcurrentHashMap<MasterSlaveEntry, PubSubEntry>();
    // NOTE(review): not referenced anywhere else in the visible part of this class.
    private final Queue<PubSubConnectionEntry> emptyQueue = new LinkedList<PubSubConnectionEntry>();
    // Helper objects constructed with a reference to this service and exposed through the getters below.
    private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this);
    private final CountDownLatchPubSub countDownLatchPubSub = new CountDownLatchPubSub(this);
    private final LockPubSub lockPubSub = new LockPubSub(this);

    /**
     * Stores the collaborators and populates every slot of the semaphore pool.
     *
     * @param connectionManager connection manager used for slot/entry lookups and timeouts
     * @param config            configuration supplying timeout and subscriptions-per-connection values
     */
    public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
        this.connectionManager = connectionManager;
        this.config = config;
        // One fresh semaphore per pool slot, in ascending index order.
        Arrays.setAll(this.locks, index -> new AsyncSemaphore(1));
    }

    /**
     * @return the {@link LockPubSub} helper owned by this service
     */
    public LockPubSub getLockPubSub() {
        return lockPubSub;
    }

    /**
     * @return the {@link CountDownLatchPubSub} helper owned by this service
     */
    public CountDownLatchPubSub getCountDownLatchPubSub() {
        return countDownLatchPubSub;
    }

    /**
     * @return the {@link SemaphorePubSub} helper owned by this service
     */
    public SemaphorePubSub getSemaphorePubSub() {
        return semaphorePubSub;
    }

    /**
     * Looks up the connection entry registered for a channel.
     *
     * @param channelName channel to look up
     * @return the registered entry, or {@code null} when nothing is mapped to the channel's key
     */
    public PubSubConnectionEntry getPubSubEntry(ChannelName channelName) {
        PubSubKey lookupKey = createKey(channelName);
        return name2PubSubConnection.get(lookupKey);
    }

    /**
     * Subscribes the listeners to a channel pattern.
     *
     * <p>When {@link #isMultiEntity(ChannelName)} holds for the pattern, a subscription is issued
     * against every entry returned by the connection manager and the returned future completes
     * with all resulting connection entries; otherwise a single subscription is issued against
     * the entry that owns the pattern's slot.
     *
     * @param channelName pattern to subscribe to
     * @param codec       codec passed on to the subscription
     * @param listeners   listeners to register
     * @return future holding the connection entries serving the subscription; it fails as soon as
     *         any underlying subscription fails
     */
    public RFuture<Collection<PubSubConnectionEntry>> psubscribe(ChannelName channelName, Codec codec, RedisPubSubListener<?> ... listeners) {
        if (!isMultiEntity(channelName)) {
            RedissonPromise<Collection<PubSubConnectionEntry>> single = new RedissonPromise<>();
            MasterSlaveEntry target = getEntry(channelName);
            subscribe(PubSubType.PSUBSCRIBE, codec, channelName, target, listeners).onComplete((connEntry, failure) -> {
                if (failure == null) {
                    single.trySuccess(Collections.singletonList(connEntry));
                } else {
                    single.tryFailure(failure);
                }
            });
            return single;
        }

        Collection<MasterSlaveEntry> masters = connectionManager.getEntrySet();
        AtomicInteger pendingStatuses = new AtomicInteger(masters.size());

        // Status listeners are wrapped so the delegate is notified only by the last of the
        // per-entry status callbacks; all other listeners are passed through untouched.
        RedisPubSubListener<?>[] wrapped = new RedisPubSubListener<?>[listeners.length];
        for (int i = 0; i < listeners.length; i++) {
            RedisPubSubListener<?> original = listeners[i];
            if (!(original instanceof PubSubPatternStatusListener)) {
                wrapped[i] = original;
                continue;
            }
            wrapped[i] = new PubSubPatternStatusListener((PubSubPatternStatusListener) original) {

                @Override
                public boolean onStatus(PubSubType type, CharSequence channel) {
                    if (pendingStatuses.decrementAndGet() != 0) {
                        return true;
                    }
                    return super.onStatus(type, channel);
                }
            };
        }

        RedissonPromise<Collection<PubSubConnectionEntry>> aggregate = new RedissonPromise<>();
        Queue<PubSubConnectionEntry> collected = new ConcurrentLinkedQueue<>();
        AtomicInteger pendingSubscriptions = new AtomicInteger(masters.size());
        for (MasterSlaveEntry master : masters) {
            subscribe(PubSubType.PSUBSCRIBE, codec, channelName, master, wrapped).onComplete((connEntry, failure) -> {
                if (failure != null) {
                    aggregate.tryFailure(failure);
                    return;
                }
                collected.add(connEntry);
                if (pendingSubscriptions.decrementAndGet() == 0) {
                    aggregate.trySuccess(collected);
                }
            });
        }
        return aggregate;
    }

    /**
     * Tells whether a channel has to be handled on every entry rather than on a single one.
     *
     * @param channelName channel or pattern name
     * @return {@code true} only in cluster mode and only for names starting with
     *         {@code __keyspace@} or {@code __keyevent@}
     */
    private boolean isMultiEntity(ChannelName channelName) {
        if (!connectionManager.isClusterMode()) {
            return false;
        }
        String name = channelName.toString();
        return name.startsWith("__keyspace@") || name.startsWith("__keyevent@");
    }

    /**
     * Subscribes the listeners to a channel on the entry that owns the channel's slot.
     *
     * @param codec       codec passed on to the subscription
     * @param channelName channel to subscribe to
     * @param listeners   listeners to register
     * @return future holding the connection entry that serves the subscription
     */
    public RFuture<PubSubConnectionEntry> subscribe(Codec codec, ChannelName channelName, RedisPubSubListener<?> ... listeners) {
        MasterSlaveEntry target = getEntry(channelName);
        return subscribe(PubSubType.SUBSCRIBE, codec, channelName, target, listeners);
    }

    /**
     * Acquires the channel's semaphore and then runs the actual subscription.
     *
     * <p>If the promise is already done by the time the semaphore is obtained, the semaphore is
     * released straight away and nothing else happens.
     *
     * @return the promise that the subscription will complete
     */
    private RFuture<PubSubConnectionEntry> subscribe(PubSubType type, Codec codec, ChannelName channelName, MasterSlaveEntry entry, RedisPubSubListener<?> ... listeners) {
        RedissonPromise<PubSubConnectionEntry> outcome = new RedissonPromise<>();
        AsyncSemaphore channelLock = getSemaphore(channelName);
        channelLock.acquire(() -> {
            if (!outcome.isDone()) {
                // From here on the callee is responsible for releasing channelLock.
                subscribe(codec, channelName, entry, outcome, type, channelLock, listeners);
                return;
            }
            channelLock.release();
        });
        return outcome;
    }

    /**
     * Subscribes to a channel using a semaphore supplied by the caller.
     *
     * <p>The semaphore is not acquired here; it is only passed on as the lock that the
     * subscription path releases.
     *
     * @param codec       codec passed on to the subscription
     * @param channelName plain channel name, wrapped into a {@link ChannelName}
     * @param semaphore   lock handed to the subscription path
     * @param listeners   listeners to register
     * @return the promise that the subscription will complete
     */
    public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?> ... listeners) {
        ChannelName channel = new ChannelName(channelName);
        RedissonPromise<PubSubConnectionEntry> outcome = new RedissonPromise<>();
        subscribe(codec, channel, getEntry(channel), outcome, PubSubType.SUBSCRIBE, semaphore, listeners);
        return outcome;
    }

    /**
     * Picks the pooled semaphore for a channel.
     *
     * @param channelName channel whose hash code selects the pool slot
     * @return the semaphore at index {@code |hashCode % poolSize|}
     */
    public AsyncSemaphore getSemaphore(ChannelName channelName) {
        // The remainder lies strictly between -poolSize and poolSize, so negating it cannot overflow.
        int remainder = channelName.hashCode() % locks.length;
        int index = remainder < 0 ? -remainder : remainder;
        return locks[index];
    }

    /**
     * Builds the map key for a channel, pairing it with the entry that owns the channel's slot.
     */
    private PubSubKey createKey(ChannelName channelName) {
        return new PubSubKey(channelName, getEntry(channelName));
    }

    /**
     * Binds a channel to a pub/sub connection and registers the listeners on it.
     *
     * <p>Resolution order:
     * <ol>
     *   <li>If a connection entry is already mapped to (channel, entry), only the listeners are added.</li>
     *   <li>Otherwise, under {@code freePubSubLock}, a reusable connection entry of the node is
     *       taken from the free queue and the (p)subscribe command is sent on it.</li>
     *   <li>If no reusable entry exists, {@link #connect} opens a new connection.</li>
     * </ol>
     *
     * <p>The caller must already hold {@code lock}; every path of this method (directly or through
     * {@code addListeners}/{@code connect}) is responsible for releasing it.
     *
     * @param codec       codec passed to the (p)subscribe command
     * @param channelName channel or pattern name
     * @param entry       node entry the channel belongs to; may be {@code null} when no node is
     *                    known for the slot, in which case the promise is failed by {@code connect}
     * @param promise     completed with the connection entry, or failed
     * @param type        {@code PSUBSCRIBE} for patterns, anything else is treated as a plain subscribe
     * @param lock        per-channel semaphore held by the caller
     * @param listeners   listeners to register
     */
    private void subscribe(Codec codec, ChannelName channelName, MasterSlaveEntry entry, RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?> ... listeners) {
        PubSubConnectionEntry connEntry = this.name2PubSubConnection.get(new PubSubKey(channelName, entry));
        if (connEntry != null) {
            this.addListeners(channelName, promise, type, lock, connEntry, listeners);
            return;
        }
        this.freePubSubLock.acquire(() -> {
            if (promise.isDone()) {
                lock.release();
                this.freePubSubLock.release();
                return;
            }
            if (entry == null) {
                // No node is known for this slot. Dereferencing the entry (or using it as a
                // ConcurrentHashMap key) would throw while both semaphores are held. connect()
                // instead fails the promise with RedisNodeNotFoundException and releases both.
                this.connect(codec, channelName, entry, promise, type, lock, listeners);
                return;
            }
            // Prefer the entry currently registered for the node's client; fall back to the given
            // entry when the connection manager has none (ofNullable: a missing entry must not throw).
            MasterSlaveEntry msEntry = Optional.ofNullable(this.connectionManager.getEntry(entry.getClient())).orElse(entry);
            PubSubEntry freePubSubConnections = this.entry2PubSubConnection.getOrDefault(msEntry, new PubSubEntry());
            PubSubConnectionEntry freeEntry = freePubSubConnections.getEntries().peek();
            if (freeEntry == null) {
                // connect() takes over releasing freePubSubLock and lock.
                this.connect(codec, channelName, msEntry, promise, type, lock, listeners);
                return;
            }
            int remainFreeAmount = freeEntry.tryAcquire();
            if (remainFreeAmount == -1) {
                // A queued entry without capacity is an invariant violation. Report it through the
                // promise and give both semaphores back instead of throwing with them held.
                this.freePubSubLock.release();
                lock.release();
                promise.tryFailure(new IllegalStateException("Pooled pub/sub connection has no free subscription slot for channel: " + channelName));
                return;
            }
            PubSubKey key = new PubSubKey(channelName, msEntry);
            PubSubConnectionEntry oldEntry = this.name2PubSubConnection.putIfAbsent(key, freeEntry);
            if (oldEntry != null) {
                // Another connection entry is already registered under this key: hand the slot
                // back and attach the listeners to the registered entry.
                freeEntry.release();
                this.freePubSubLock.release();
                this.addListeners(channelName, promise, type, lock, oldEntry, listeners);
                return;
            }
            if (remainFreeAmount == 0) {
                // The entry is now fully used; take it out of the free queue.
                freePubSubConnections.getEntries().poll();
            }
            this.freePubSubLock.release();
            RFuture<Void> subscribeFuture = this.addListeners(channelName, promise, type, lock, freeEntry, listeners);
            ChannelFuture future = PubSubType.PSUBSCRIBE == type ? freeEntry.psubscribe(codec, channelName) : freeEntry.subscribe(codec, channelName);
            future.addListener(f -> {
                if (!f.isSuccess()) {
                    if (!promise.isDone()) {
                        subscribeFuture.cancel(false);
                    }
                    return;
                }
                // Command was written: cancel the subscribe future once the configured timeout elapses.
                this.connectionManager.newTimeout(timeout -> subscribeFuture.cancel(false), this.config.getTimeout(), TimeUnit.MILLISECONDS);
            });
        });
    }

    /**
     * Resolves the entry the connection manager associates with the slot calculated from the
     * channel name.
     *
     * @return the entry for the channel's slot, as returned by the connection manager
     */
    private MasterSlaveEntry getEntry(ChannelName channelName) {
        final int slotIndex = connectionManager.calcSlot(channelName.getName());
        return connectionManager.getEntry(slotIndex);
    }

    /**
     * Registers the listeners on a connection entry and completes the promise once the entry's
     * subscribe future for the channel completes.
     *
     * <p>The completion callback does not inspect the outcome of the subscribe future: it always
     * tries to succeed the promise. If the promise was already completed elsewhere, the listeners
     * are removed again and, when the channel is left without listeners, the channel is
     * unsubscribed before the lock is released.
     *
     * @return the subscribe future obtained from the connection entry
     */
    private RFuture<Void> addListeners(ChannelName channelName, RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, PubSubConnectionEntry connEntry, RedisPubSubListener<?> ... listeners) {
        for (int i = 0; i < listeners.length; i++) {
            connEntry.addListener(channelName, listeners[i]);
        }
        RFuture<Void> subscribed = connEntry.getSubscribeFuture(channelName, type).getSuccessFuture();
        subscribed.onComplete((ignored, failure) -> {
            if (promise.trySuccess(connEntry)) {
                lock.release();
                return;
            }
            // The promise was completed by someone else: roll the registration back.
            for (RedisPubSubListener<?> registered : listeners) {
                connEntry.removeListener(channelName, registered);
            }
            if (connEntry.hasListeners(channelName)) {
                lock.release();
                return;
            }
            unsubscribe(type, channelName).onComplete((r, ex) -> lock.release());
        });
        return subscribed;
    }

    /**
     * Requests the next pub/sub connection from an entry.
     *
     * @param entry       entry to take the connection from; may be {@code null}
     * @param channelName channel used only to report the slot when {@code entry} is {@code null}
     * @return the entry's connection future, or a future already failed with
     *         {@link RedisNodeNotFoundException} when {@code entry} is {@code null}
     */
    private RFuture<RedisPubSubConnection> nextPubSubConnection(MasterSlaveEntry entry, ChannelName channelName) {
        if (entry != null) {
            return entry.nextPubSubConnection();
        }
        int slot = connectionManager.calcSlot(channelName.getName());
        String message = "Node for slot: " + slot + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings.";
        return RedissonPromise.newFailedFuture(new RedisNodeNotFoundException(message));
    }

    /**
     * Obtains a new pub/sub connection from {@code msEntry}, registers it for the channel and
     * sends the (p)subscribe command on it.
     *
     * <p>Invoked from the subscribe path while {@code freePubSubLock} and {@code lock} are held;
     * every branch of the connection callback releases {@code freePubSubLock}, and {@code lock}
     * is released either here (connection failure) or by {@code addListeners}.
     *
     * @param codec       codec passed to the (p)subscribe command
     * @param channelName channel or pattern name
     * @param msEntry     entry to take the connection from ({@code null} results in a failed promise)
     * @param promise     completed with the connection entry, or failed
     * @param type        {@code PSUBSCRIBE} for patterns, otherwise a plain subscribe is sent
     * @param lock        per-channel semaphore held by the caller
     * @param listeners   listeners to register
     */
    private void connect(Codec codec, ChannelName channelName, MasterSlaveEntry msEntry, RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?> ... listeners) {
        RFuture<RedisPubSubConnection> connFuture = this.nextPubSubConnection(msEntry, channelName);
        // If the promise fails first, push that failure into the connection future.
        // NOTE(review): relies on connFuture actually being an RPromise at runtime - confirm.
        promise.onComplete((res, e) -> {
            if (e != null) {
                ((RPromise)connFuture).tryFailure((Throwable)e);
            }
        });
        connFuture.onComplete((conn, ex) -> {
            if (ex != null) {
                // No connection: give both semaphores back and report the failure.
                this.freePubSubLock.release();
                lock.release();
                promise.tryFailure((Throwable)ex);
                return;
            }
            PubSubConnectionEntry entry = new PubSubConnectionEntry((RedisPubSubConnection)conn, this.config.getSubscriptionsPerConnection());
            // Presumably reserves one subscription slot on the new entry and returns how many remain.
            int remainFreeAmount = entry.tryAcquire();
            PubSubKey key = new PubSubKey(channelName, msEntry);
            PubSubConnectionEntry oldEntry = this.name2PubSubConnection.putIfAbsent(key, entry);
            if (oldEntry != null) {
                // An entry is already registered under this key: return the fresh connection and
                // attach the listeners to the registered entry instead.
                msEntry.returnPubSubConnection(entry);
                this.freePubSubLock.release();
                this.addListeners(channelName, promise, type, lock, oldEntry, listeners);
                return;
            }
            if (remainFreeAmount > 0) {
                // The entry can still take more subscriptions; offer it for reuse.
                // NOTE(review): the free queue is chosen via getEntry(channelName), not msEntry - confirm they agree.
                this.addFreeConnectionEntry(channelName, entry);
            }
            this.freePubSubLock.release();
            RFuture<Void> subscribeFuture = this.addListeners(channelName, promise, type, lock, entry, listeners);
            ChannelFuture future = PubSubType.PSUBSCRIBE == type ? entry.psubscribe(codec, channelName) : entry.subscribe(codec, channelName);
            future.addListener(future1 -> {
                if (!future1.isSuccess()) {
                    // Write failed: cancel the pending subscribe future unless the promise is already done.
                    if (!promise.isDone()) {
                        subscribeFuture.cancel(false);
                    }
                    return;
                }
                // Command was written: cancel the subscribe future once the configured timeout elapses.
                this.connectionManager.newTimeout(timeout -> subscribeFuture.cancel(false), this.config.getTimeout(), TimeUnit.MILLISECONDS);
            });
        });
    }

    /**
     * Removes the channel's mapping and sends the matching (p)unsubscribe command.
     *
     * <p>No semaphore is acquired here. Callers within this class invoke it while holding the
     * channel's semaphore.
     *
     * @param topicType   {@code UNSUBSCRIBE} sends an unsubscribe; any other value sends a punsubscribe
     * @param channelName channel or pattern name
     * @return a future completed when the status message for this type and channel is observed;
     *         already succeeded when no mapping existed or the connection manager is shutting down
     */
    public RFuture<Void> unsubscribe(final PubSubType topicType, final ChannelName channelName) {
        // The mapping is removed before the shutdown check, so it is dropped in either case.
        final PubSubConnectionEntry entry = (PubSubConnectionEntry)this.name2PubSubConnection.remove(this.createKey(channelName));
        if (entry == null || this.connectionManager.isShuttingDown()) {
            return RedissonPromise.newSucceededFuture(null);
        }
        // Set once the status message has been handled, so the timeout below can skip its fallback.
        final AtomicBoolean executed = new AtomicBoolean();
        final RedissonPromise<Void> result = new RedissonPromise<Void>();
        BaseRedisPubSubListener listener = new BaseRedisPubSubListener(){

            @Override
            public boolean onStatus(PubSubType type, CharSequence channel) {
                if (type == topicType && channel.equals(channelName)) {
                    executed.set(true);
                    // When release() reports 1 the entry is put back on the free-connection queue.
                    if (entry.release() == 1) {
                        PublishSubscribeService.this.addFreeConnectionEntry(channelName, entry);
                    }
                    result.trySuccess(null);
                    return true;
                }
                return false;
            }
        };
        ChannelFuture future = topicType == PubSubType.UNSUBSCRIBE ? entry.unsubscribe(channelName, listener) : entry.punsubscribe(channelName, listener);
        future.addListener(f -> {
            if (!f.isSuccess()) {
                // NOTE(review): result is not completed on this path; callers waiting on it keep waiting.
                return;
            }
            // If no status message arrived within the configured timeout, feed a synthetic one
            // into the connection so the listener above still runs.
            this.connectionManager.newTimeout(timeout -> {
                if (executed.get()) {
                    return;
                }
                entry.getConnection().onMessage(new PubSubStatusMessage(topicType, channelName));
            }, this.config.getTimeout(), TimeUnit.MILLISECONDS);
        });
        return result;
    }

    /**
     * Drops the free-connection holder kept for the given entry, if any.
     *
     * @param entry entry whose holder is removed from {@code entry2PubSubConnection}
     */
    public void remove(MasterSlaveEntry entry) {
        entry2PubSubConnection.remove(entry);
    }

    /**
     * Unsubscribes a channel on the entry that owns the channel's slot.
     *
     * @param channelName channel or pattern name
     * @param topicType   unsubscribe flavour to send
     * @return future produced by the entry-specific unsubscribe
     */
    public RFuture<Codec> unsubscribe(ChannelName channelName, PubSubType topicType) {
        MasterSlaveEntry owner = getEntry(channelName);
        return unsubscribe(channelName, owner, topicType);
    }

    /**
     * Unsubscribes a channel registered under a specific entry and reports the codec the channel
     * was subscribed with.
     *
     * <p>Acquires the channel's semaphore, removes the (channel, entry) mapping, removes the
     * connection entry from the node's free queue under {@code freePubSubLock}, then sends the
     * (p)unsubscribe command. The channel's semaphore is released when the status message for
     * the command is handled.
     *
     * @param channelName channel or pattern name
     * @param e           entry the channel is registered under
     * @param topicType   {@code PUNSUBSCRIBE} for patterns; any other value sends a plain unsubscribe
     * @return future holding the codec looked up from the connection (may be {@code null});
     *         succeeds with {@code null} when shutting down or when no mapping existed
     */
    private RFuture<Codec> unsubscribe(final ChannelName channelName, MasterSlaveEntry e, final PubSubType topicType) {
        if (this.connectionManager.isShuttingDown()) {
            return RedissonPromise.newSucceededFuture(null);
        }
        final RedissonPromise<Codec> result = new RedissonPromise<Codec>();
        final AsyncSemaphore lock = this.getSemaphore(channelName);
        lock.acquire(() -> {
            PubSubConnectionEntry entry = (PubSubConnectionEntry)this.name2PubSubConnection.remove(new PubSubKey(channelName, e));
            if (entry == null) {
                // Nothing registered for this key: nothing to unsubscribe.
                lock.release();
                result.trySuccess(null);
                return;
            }
            this.freePubSubLock.acquire(() -> {
                // Take the connection entry out of the node's free queue; getOrDefault supplies a
                // throw-away holder when the node has none, making the remove a no-op.
                PubSubEntry ee = this.entry2PubSubConnection.getOrDefault(e, new PubSubEntry());
                Queue<PubSubConnectionEntry> freePubSubConnections = ee.getEntries();
                freePubSubConnections.remove(entry);
                this.freePubSubLock.release();
                // Capture the codec before the unsubscribe command is sent.
                final Codec entryCodec = topicType == PubSubType.PUNSUBSCRIBE ? entry.getConnection().getPatternChannels().get(channelName) : entry.getConnection().getChannels().get(channelName);
                // Set once the status message has been handled, so the timeout below can skip its fallback.
                final AtomicBoolean executed = new AtomicBoolean();
                BaseRedisPubSubListener listener = new BaseRedisPubSubListener(){

                    @Override
                    public boolean onStatus(PubSubType type, CharSequence channel) {
                        if (type == topicType && channel.equals(channelName)) {
                            executed.set(true);
                            lock.release();
                            result.trySuccess(entryCodec);
                            return true;
                        }
                        return false;
                    }
                };
                ChannelFuture future = topicType == PubSubType.PUNSUBSCRIBE ? entry.punsubscribe(channelName, listener) : entry.unsubscribe(channelName, listener);
                future.addListener(f -> {
                    if (!f.isSuccess()) {
                        // NOTE(review): neither the lock nor the result is completed on this path.
                        return;
                    }
                    // If no status message arrived within the configured timeout, feed a synthetic
                    // one into the connection so the listener above still runs.
                    this.connectionManager.newTimeout(timeout -> {
                        if (executed.get()) {
                            return;
                        }
                        entry.getConnection().onMessage(new PubSubStatusMessage(topicType, channelName));
                    }, this.config.getTimeout(), TimeUnit.MILLISECONDS);
                });
            });
        });
        return result;
    }

    /**
     * Appends a connection entry to the free queue of the node that owns the channel's slot,
     * creating the node's holder on first use.
     */
    private void addFreeConnectionEntry(ChannelName channelName, PubSubConnectionEntry entry) {
        entry2PubSubConnection
                .computeIfAbsent(getEntry(channelName), owner -> new PubSubEntry())
                .getEntries()
                .add(entry);
    }

    /**
     * Re-subscribes every registered channel whose name hashes to the given slot.
     *
     * <p>For each matching mapping the channel is unsubscribed from the entry stored in the key
     * and immediately subscribed again (through the public subscribe/psubscribe methods) with
     * the listeners and codec found on the current connection. Plain and pattern subscriptions
     * are handled independently.
     *
     * @param slot slot number to match against the channel names
     */
    public void reattachPubSub(int slot) {
        for (Map.Entry<PubSubKey, PubSubConnectionEntry> mapping : name2PubSubConnection.entrySet()) {
            ChannelName channel = mapping.getKey().getChannelName();
            if (connectionManager.calcSlot(channel.getName()) != slot) {
                continue;
            }
            PubSubConnectionEntry pubSubEntry = mapping.getValue();
            MasterSlaveEntry owner = mapping.getKey().getEntry();

            Codec channelCodec = pubSubEntry.getConnection().getChannels().get(channel);
            if (channelCodec != null) {
                Queue<RedisPubSubListener<?>> current = pubSubEntry.getListeners(channel);
                unsubscribe(channel, owner, PubSubType.UNSUBSCRIBE);
                subscribe(channelCodec, channel, current.toArray(new RedisPubSubListener[0]));
            }

            // Looked up only after the plain-channel branch has run, as a separate subscription kind.
            Codec patternCodec = pubSubEntry.getConnection().getPatternChannels().get(channel);
            if (patternCodec != null) {
                Queue<RedisPubSubListener<?>> current = pubSubEntry.getListeners(channel);
                unsubscribe(channel, owner, PubSubType.PUNSUBSCRIBE);
                psubscribe(channel, patternCodec, current.toArray(new RedisPubSubListener[0]));
            }
        }
    }

    /**
     * Moves all subscriptions of a connection found in the free queues elsewhere.
     *
     * <p>The first queued connection entry wrapping {@code redisPubSubConnection} is removed from
     * its free queue (under {@code freePubSubLock}) and each of the connection's channels and
     * pattern channels is unsubscribed and re-subscribed with its current listeners. Processing
     * stops after that first match.
     *
     * @param redisPubSubConnection connection whose subscriptions are reattached
     */
    public void reattachPubSub(RedisPubSubConnection redisPubSubConnection) {
        for (Map.Entry<MasterSlaveEntry, PubSubEntry> mapping : entry2PubSubConnection.entrySet()) {
            MasterSlaveEntry owner = mapping.getKey();
            PubSubEntry holder = mapping.getValue();
            for (PubSubConnectionEntry candidate : holder.getEntries()) {
                if (!candidate.getConnection().equals(redisPubSubConnection)) {
                    continue;
                }
                freePubSubLock.acquire(() -> {
                    holder.getEntries().remove(candidate);
                    freePubSubLock.release();
                });
                for (ChannelName plainChannel : redisPubSubConnection.getChannels().keySet()) {
                    Queue<RedisPubSubListener<?>> attached = candidate.getListeners(plainChannel);
                    reattachPubSubListeners(plainChannel, owner, attached, PubSubType.UNSUBSCRIBE);
                }
                for (ChannelName patternChannel : redisPubSubConnection.getPatternChannels().keySet()) {
                    Queue<RedisPubSubListener<?>> attached = candidate.getListeners(patternChannel);
                    reattachPubSubListeners(patternChannel, owner, attached, PubSubType.PUNSUBSCRIBE);
                }
                return;
            }
        }
    }

    /**
     * Unsubscribes a channel from an entry and, when there are listeners and the unsubscribe
     * yielded a codec, subscribes the listeners again.
     *
     * <p>The unsubscribe is always issued; the re-subscription is skipped when
     * {@code listeners} is empty or the unsubscribe future produced no codec.
     *
     * @param channelName channel or pattern name
     * @param en          entry the channel is currently registered under
     * @param listeners   listeners to carry over
     * @param topicType   {@code PUNSUBSCRIBE} re-subscribes as a pattern, anything else as a channel
     */
    private void reattachPubSubListeners(ChannelName channelName, MasterSlaveEntry en, Collection<RedisPubSubListener<?>> listeners, PubSubType topicType) {
        RFuture<Codec> codecFuture = unsubscribe(channelName, en, topicType);
        if (!listeners.isEmpty()) {
            codecFuture.onComplete((previousCodec, failure) -> {
                if (previousCodec != null) {
                    if (topicType == PubSubType.PUNSUBSCRIBE) {
                        psubscribe(channelName, listeners, previousCodec);
                    } else {
                        subscribe(channelName, listeners, previousCodec);
                    }
                }
            });
        }
    }

    /**
     * Re-subscribes listeners to a channel, retrying after one second for as long as the
     * subscription fails, and logs the successful re-subscription.
     */
    private void subscribe(ChannelName channelName, Collection<RedisPubSubListener<?>> listeners, Codec subscribeCodec) {
        RedisPubSubListener<?>[] listenerArray = listeners.toArray(new RedisPubSubListener[0]);
        subscribe(subscribeCodec, channelName, listenerArray).onComplete((connEntry, failure) -> {
            if (failure == null) {
                log.info("listeners of '{}' channel to '{}' have been resubscribed", channelName, connEntry.getConnection().getRedisClient());
                return;
            }
            connectionManager.newTimeout(task -> subscribe(channelName, listeners, subscribeCodec), 1L, TimeUnit.SECONDS);
        });
    }

    /**
     * Re-subscribes listeners to a channel pattern, retrying after one second for as long as the
     * subscription fails, and logs the successful re-subscription.
     */
    private void psubscribe(ChannelName channelName, Collection<RedisPubSubListener<?>> listeners, Codec subscribeCodec) {
        RedisPubSubListener<?>[] listenerArray = listeners.toArray(new RedisPubSubListener[0]);
        psubscribe(channelName, subscribeCodec, listenerArray).onComplete((connEntries, failure) -> {
            if (failure == null) {
                log.info("listeners of '{}' channel-pattern to '{}' have been resubscribed", channelName, connEntries);
                return;
            }
            connectionManager.newTimeout(task -> psubscribe(channelName, listeners, subscribeCodec), 1L, TimeUnit.SECONDS);
        });
    }

    /**
     * Removes a listener from a channel and unsubscribes the channel wherever that leaves it
     * without listeners.
     *
     * <p>Runs under the channel's semaphore. In the multi-entity case every entry known to the
     * connection manager is visited, otherwise only the entry owning the channel's slot. The
     * semaphore is released and the returned future succeeded once every visited entry has been
     * accounted for.
     *
     * @param type        unsubscribe flavour used when a channel ends up without listeners
     * @param channelName channel or pattern name
     * @param listener    listener to remove
     * @return future completed when all visited entries are processed
     */
    public RFuture<Void> removeListenerAsync(PubSubType type, ChannelName channelName, EventListener listener) {
        RedissonPromise<Void> promise = new RedissonPromise<>();
        AsyncSemaphore semaphore = getSemaphore(channelName);
        semaphore.acquire(() -> {
            Collection<MasterSlaveEntry> slotOwner = Collections.singletonList(getEntry(channelName));
            Collection<MasterSlaveEntry> targets = isMultiEntity(channelName) ? connectionManager.getEntrySet() : slotOwner;

            AtomicInteger remaining = new AtomicInteger(targets.size());
            // Accounts for one processed entry; the last one releases the semaphore and finishes.
            Runnable markOneDone = () -> {
                if (remaining.decrementAndGet() == 0) {
                    semaphore.release();
                    promise.trySuccess(null);
                }
            };

            for (MasterSlaveEntry target : targets) {
                PubSubConnectionEntry connEntry = name2PubSubConnection.get(new PubSubKey(channelName, target));
                if (connEntry == null) {
                    markOneDone.run();
                    continue;
                }
                connEntry.removeListener(channelName, listener);
                if (connEntry.hasListeners(channelName)) {
                    markOneDone.run();
                } else {
                    unsubscribe(type, channelName).onComplete((r, ex) -> markOneDone.run());
                }
            }
        });
        return promise;
    }

    /**
     * Removes listeners, identified by id, from a channel and unsubscribes the channel wherever
     * that leaves it without listeners.
     *
     * <p>Runs under the channel's semaphore. In the multi-entity case every entry known to the
     * connection manager is visited, otherwise only the entry owning the channel's slot. The
     * semaphore is released and the returned future succeeded once every visited entry has been
     * accounted for.
     *
     * @param type        unsubscribe flavour used when a channel ends up without listeners
     * @param channelName channel or pattern name
     * @param listenerIds ids of the listeners to remove
     * @return future completed when all visited entries are processed
     */
    public RFuture<Void> removeListenerAsync(PubSubType type, ChannelName channelName, Integer ... listenerIds) {
        RedissonPromise<Void> promise = new RedissonPromise<>();
        AsyncSemaphore semaphore = getSemaphore(channelName);
        semaphore.acquire(() -> {
            Collection<MasterSlaveEntry> slotOwner = Collections.singletonList(getEntry(channelName));
            Collection<MasterSlaveEntry> targets = isMultiEntity(channelName) ? connectionManager.getEntrySet() : slotOwner;

            AtomicInteger remaining = new AtomicInteger(targets.size());
            // Accounts for one processed entry; the last one releases the semaphore and finishes.
            Runnable markOneDone = () -> {
                if (remaining.decrementAndGet() == 0) {
                    semaphore.release();
                    promise.trySuccess(null);
                }
            };

            for (MasterSlaveEntry target : targets) {
                PubSubConnectionEntry connEntry = name2PubSubConnection.get(new PubSubKey(channelName, target));
                if (connEntry == null) {
                    markOneDone.run();
                    continue;
                }
                // Unboxed on purpose so the id-based removeListener overload is selected.
                for (int id : listenerIds) {
                    connEntry.removeListener(channelName, id);
                }
                if (connEntry.hasListeners(channelName)) {
                    markOneDone.run();
                } else {
                    unsubscribe(type, channelName).onComplete((r, ex) -> markOneDone.run());
                }
            }
        });
        return promise;
    }

    /**
     * @return a diagnostic string listing both internal maps
     */
    @Override
    public String toString() {
        return new StringBuilder("PublishSubscribeService [name2PubSubConnection=")
                .append(name2PubSubConnection)
                .append(", entry2PubSubConnection=")
                .append(entry2PubSubConnection)
                .append("]")
                .toString();
    }

    /**
     * Per-node holder kept in {@code entry2PubSubConnection}: a set of keys and a queue of
     * connection entries, both backed by concurrent collections.
     */
    public static class PubSubEntry {
        Set<PubSubKey> keys = Collections.newSetFromMap(new ConcurrentHashMap<PubSubKey, Boolean>());
        Queue<PubSubConnectionEntry> entries = new ConcurrentLinkedQueue<>();

        /**
         * @return the live key set (not a copy)
         */
        public Set<PubSubKey> getKeys() {
            return keys;
        }

        /**
         * @return the live queue of connection entries (not a copy)
         */
        public Queue<PubSubConnectionEntry> getEntries() {
            return entries;
        }
    }

    /**
     * Immutable map key pairing a channel name with the master/slave entry it is registered on.
     * Equality and hash code are derived from both components.
     */
    public static class PubSubKey {
        private final ChannelName channelName;
        private final MasterSlaveEntry entry;

        /**
         * @param channelName channel or pattern name
         * @param entry       entry the channel is registered on
         */
        public PubSubKey(ChannelName channelName, MasterSlaveEntry entry) {
            this.channelName = channelName;
            this.entry = entry;
        }

        /**
         * @return the channel name component
         */
        public ChannelName getChannelName() {
            return channelName;
        }

        /**
         * @return the entry component
         */
        public MasterSlaveEntry getEntry() {
            return entry;
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            PubSubKey other = (PubSubKey) o;
            if (!Objects.equals(channelName, other.channelName)) {
                return false;
            }
            return Objects.equals(entry, other.entry);
        }

        @Override
        public int hashCode() {
            return Objects.hash(channelName, entry);
        }
    }
}

