/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.pubsub;

import io.netty.util.Timeout;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EventListener;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.redisson.PubSubPatternStatusListener;
import org.redisson.PubSubStatusListener;
import org.redisson.api.RFuture;
import org.redisson.api.listener.FlushListener;
import org.redisson.api.listener.TrackingListener;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.RedisNodeNotFoundException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.config.ShardedSubscriptionMode;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.AsyncSemaphore;
import org.redisson.misc.Tuple;
import org.redisson.pubsub.CountDownLatchPubSub;
import org.redisson.pubsub.LockPubSub;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.SemaphorePubSub;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PublishSubscribeService {
    private static final Logger log = LoggerFactory.getLogger(PublishSubscribeService.class);
    private final ConnectionManager connectionManager;
    // Taken from the connection manager's service manager in the constructor.
    private final MasterSlaveServersConfig config;
    // Striped per-channel locks; getSemaphore() selects one by channel-name hash. Each slot is filled in the constructor.
    private final AsyncSemaphore[] locks = new AsyncSemaphore[50];
    // Guards the bookkeeping of reusable pub/sub connections (acquired in subscribeNoTimeout, connect and unsubscribeLocked).
    private final AsyncSemaphore freePubSubLock = new AsyncSemaphore(1);
    // Channel name -> connection entries currently serving that channel.
    private final Map<ChannelName, Collection<PubSubConnectionEntry>> name2entry = new ConcurrentHashMap<ChannelName, Collection<PubSubConnectionEntry>>();
    // (channel name, master/slave entry) -> connection entry serving that channel; filled in addListeners.
    private final ConcurrentMap<PubSubKey, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<PubSubKey, PubSubConnectionEntry>();
    // Per master/slave entry: connection entries that still reported free capacity when they were registered (see connect).
    private final ConcurrentMap<MasterSlaveEntry, PubSubEntry> entry2PubSubConnection = new ConcurrentHashMap<MasterSlaveEntry, PubSubEntry>();
    // (channel name, client connections entry) -> connection entry; only used when a specific client entry is requested.
    private final Map<Tuple<ChannelName, ClientConnectionsEntry>, PubSubConnectionEntry> key2connection = new ConcurrentHashMap<Tuple<ChannelName, ClientConnectionsEntry>, PubSubConnectionEntry>();
    private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this);
    private final CountDownLatchPubSub countDownLatchPubSub = new CountDownLatchPubSub(this);
    private final LockPubSub lockPubSub = new LockPubSub(this);
    // Connection entries for which registerClientTrackingListener has already issued CLIENT TRACKING.
    private final Set<PubSubConnectionEntry> trackedEntries = Collections.newSetFromMap(new ConcurrentHashMap());
    // When true, the String-based subscribeNoTimeout and unsubscribeLocked use SSUBSCRIBE/SUNSUBSCRIBE.
    private boolean shardingSupported = false;
    private boolean patternSupported = true;
    // Listener id (identity hash of the user listener) -> ids of the per-entry listeners registered on its behalf.
    private final Map<Integer, Collection<Integer>> flushListeners = new ConcurrentHashMap<Integer, Collection<Integer>>();

    /**
     * Creates the service for the given connection manager.
     *
     * @param connectionManager source of cluster entries, configuration and timers
     */
    public PublishSubscribeService(ConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
        this.config = connectionManager.getServiceManager().getConfig();
        // Each stripe is an independent semaphore created with one permit.
        Arrays.setAll(this.locks, idx -> new AsyncSemaphore(1));
    }

    /**
     * Returns the lock pub/sub helper owned by this service.
     *
     * @return the {@link LockPubSub} instance created together with this service
     */
    public LockPubSub getLockPubSub() {
        return lockPubSub;
    }

    /**
     * Returns the count-down-latch pub/sub helper owned by this service.
     *
     * @return the {@link CountDownLatchPubSub} instance created together with this service
     */
    public CountDownLatchPubSub getCountDownLatchPubSub() {
        return countDownLatchPubSub;
    }

    /**
     * Returns the semaphore pub/sub helper owned by this service.
     *
     * @return the {@link SemaphorePubSub} instance created together with this service
     */
    public SemaphorePubSub getSemaphorePubSub() {
        return semaphorePubSub;
    }

    /**
     * Sums the listener counts of the given channels.
     *
     * <p>For every channel only the first registered connection entry is consulted;
     * channels without any registered entry contribute nothing.
     *
     * @param channelNames channels to inspect
     * @return total number of listeners reported for the channels
     */
    public int countListeners(List<ChannelName> channelNames) {
        int total = 0;
        for (ChannelName name : channelNames) {
            Collection<PubSubConnectionEntry> registered = this.name2entry.get(name);
            if (registered == null) {
                continue;
            }
            Iterator<PubSubConnectionEntry> cursor = registered.iterator();
            if (cursor.hasNext()) {
                total += cursor.next().countListeners(name);
            }
        }
        return total;
    }

    /**
     * Tells whether any connection entry is registered for the channel.
     *
     * @param channelName channel to look up
     * @return {@code true} if the channel is present in the channel-to-entries map
     */
    public boolean hasEntry(ChannelName channelName) {
        return name2entry.containsKey(channelName);
    }

    /**
     * Subscribes the listeners to a channel pattern (PSUBSCRIBE).
     *
     * <p>If {@link #isMultiEntity(ChannelName)} holds for the pattern, one subscription is issued per
     * entry of the connection manager. Pattern status listeners are wrapped so that their
     * {@code onStatus} callback is forwarded only after the shared counter, which starts at the
     * number of entries, has reached zero. Otherwise a single subscription is issued on the entry
     * resolved for the channel.
     *
     * @param channelName pattern to subscribe to
     * @param codec codec passed to the underlying subscription
     * @param listeners listeners to attach
     * @return future with the connection entries serving the subscription; a node-not-found future
     *         if no entry can be resolved for a single-node subscription
     */
    public CompletableFuture<Collection<PubSubConnectionEntry>> psubscribe(ChannelName channelName, Codec codec, RedisPubSubListener<?> ... listeners) {
        if (!this.isMultiEntity(channelName)) {
            MasterSlaveEntry target = this.getEntry(channelName);
            if (target == null) {
                int slot = this.connectionManager.calcSlot(channelName.getName());
                return this.connectionManager.getServiceManager().createNodeNotFoundFuture(channelName.toString(), slot);
            }
            CompletableFuture<PubSubConnectionEntry> single = this.subscribe(PubSubType.PSUBSCRIBE, codec, ChannelName.newList(channelName), target, null, listeners);
            return single.thenApply(res -> Collections.singletonList(res));
        }

        Collection<MasterSlaveEntry> masters = this.connectionManager.getEntrySet();
        final AtomicInteger pendingStatuses = new AtomicInteger(masters.size());

        // Status listeners are decorated; every other listener is passed through untouched.
        RedisPubSubListener<?>[] wrapped = new RedisPubSubListener<?>[listeners.length];
        for (int i = 0; i < listeners.length; i++) {
            RedisPubSubListener<?> candidate = listeners[i];
            if (candidate instanceof PubSubPatternStatusListener) {
                wrapped[i] = new PubSubPatternStatusListener((PubSubPatternStatusListener) candidate) {

                    @Override
                    public void onStatus(PubSubType type, CharSequence channel) {
                        if (pendingStatuses.get() == 0 || pendingStatuses.decrementAndGet() == 0) {
                            super.onStatus(type, channel);
                        }
                    }
                };
            } else {
                wrapped[i] = candidate;
            }
        }

        List<CompletableFuture<PubSubConnectionEntry>> subscriptions = new ArrayList<CompletableFuture<PubSubConnectionEntry>>();
        for (MasterSlaveEntry master : masters) {
            // A fresh channel list is created for every entry.
            subscriptions.add(this.subscribe(PubSubType.PSUBSCRIBE, codec, ChannelName.newList(channelName), master, master.getEntry(), wrapped));
        }
        CompletableFuture<Void> all = CompletableFuture.allOf(subscriptions.toArray(new CompletableFuture[0]));
        return all.thenApply(r -> subscriptions.stream().map(v -> v.getNow(null)).collect(Collectors.toList()));
    }

    /**
     * Tells whether a subscription to the channel has to be issued on every entry.
     *
     * @param channelName channel to check
     * @return {@code true} when the configuration is not a single-node configuration and the
     *         channel is a keyspace channel
     */
    public boolean isMultiEntity(ChannelName channelName) {
        if (this.connectionManager.getServiceManager().getCfg().isSingleConfig()) {
            return false;
        }
        return channelName.isKeyspace();
    }

    /**
     * Registers a flush listener on the tracking channel of every entry.
     *
     * <p>For each entry a dedicated pub/sub listener is created. It notifies {@code listener}
     * with the address of that entry's client whenever a {@code null} message arrives on the
     * tracking channel. The ids of the per-entry listeners are recorded under the id of
     * {@code listener} so that {@link #removeFlushListenerAsync(int)} can remove them.
     *
     * @param commandExecutor executor used to enable client tracking
     * @param listener listener to notify on flush
     * @return future with the listener id (identity hash code of {@code listener})
     */
    public CompletableFuture<Integer> subscribe(CommandAsyncExecutor commandExecutor, final FlushListener listener) {
        final int listenerId = System.identityHashCode(listener);
        List<CompletableFuture<PubSubConnectionEntry>> subscriptions = new ArrayList<CompletableFuture<PubSubConnectionEntry>>();
        for (final MasterSlaveEntry entry : this.connectionManager.getEntrySet()) {
            RedisPubSubListener<Object> perEntryListener = new RedisPubSubListener<Object>() {

                @Override
                public void onMessage(CharSequence channel, Object msg) {
                    if (msg != null) {
                        return;
                    }
                    if (channel.equals(ChannelName.TRACKING.toString())) {
                        listener.onFlush(entry.getClient().getAddr());
                    }
                }
            };
            this.flushListeners
                    .computeIfAbsent(listenerId, k -> new HashSet<Integer>())
                    .add(System.identityHashCode(perEntryListener));
            subscriptions.add(this.subscribe(PubSubType.SUBSCRIBE, StringCodec.INSTANCE, ChannelName.newList(ChannelName.TRACKING), entry, entry.getEntry(), perEntryListener));
        }
        return this.registerClientTrackingListener(commandExecutor, subscriptions, listenerId, null);
    }

    /**
     * Registers a tracking listener on the tracking channel of every entry.
     *
     * <p>For each entry a dedicated pub/sub listener is created that forwards every non-null
     * message received on the tracking channel to {@code listener}. The ids of the per-entry
     * listeners are recorded in the same map the flush listeners use, keyed by the id of
     * {@code listener}.
     *
     * @param commandExecutor executor used to enable client tracking
     * @param listener listener to notify on change
     * @return future with the listener id (identity hash code of {@code listener})
     */
    public CompletableFuture<Integer> subscribe(CommandAsyncExecutor commandExecutor, final TrackingListener listener) {
        final int listenerId = System.identityHashCode(listener);
        List<CompletableFuture<PubSubConnectionEntry>> subscriptions = new ArrayList<CompletableFuture<PubSubConnectionEntry>>();
        for (MasterSlaveEntry entry : this.connectionManager.getEntrySet()) {
            RedisPubSubListener<Object> perEntryListener = new RedisPubSubListener<Object>() {

                @Override
                public void onMessage(CharSequence channel, Object msg) {
                    if (msg == null) {
                        return;
                    }
                    if (channel.equals(ChannelName.TRACKING.toString())) {
                        listener.onChange((String) msg);
                    }
                }
            };
            this.flushListeners
                    .computeIfAbsent(listenerId, k -> new HashSet<Integer>())
                    .add(System.identityHashCode(perEntryListener));
            subscriptions.add(this.subscribe(PubSubType.SUBSCRIBE, StringCodec.INSTANCE, ChannelName.newList(ChannelName.TRACKING), entry, entry.getEntry(), perEntryListener));
        }
        return this.registerClientTrackingListener(commandExecutor, subscriptions, listenerId, null);
    }

    /**
     * Waits for all given subscriptions and enables client tracking on the connections that
     * have not been handled yet.
     *
     * @param commandExecutor executor used to send CLIENT TRACKING
     * @param ffs subscription futures, one per connection entry
     * @param listenerId id returned to the caller once everything has completed
     * @param key key passed to the keyed {@code readAsync} variant; {@code null} selects the variant without a key
     * @return future completed with {@code listenerId}
     */
    private CompletableFuture<Integer> registerClientTrackingListener(CommandAsyncExecutor commandExecutor, List<CompletableFuture<PubSubConnectionEntry>> ffs, int listenerId, String key) {
        CompletableFuture<Void> future = CompletableFuture.allOf(ffs.toArray(new CompletableFuture[0]));
        return future.thenCompose(r -> {
            // Only entries that are not yet recorded as tracked need the CLIENT TRACKING command.
            List ees = ffs.stream().map(v -> (PubSubConnectionEntry)v.join()).filter(e -> !this.trackedEntries.contains(e)).collect(Collectors.toList());
            if (ees.isEmpty()) {
                return CompletableFuture.completedFuture(listenerId);
            }
            // Entries are recorded as tracked before the commands below have completed.
            this.trackedEntries.addAll(ees);
            ArrayList futures = new ArrayList();
            for (PubSubConnectionEntry ee : ees) {
                RedisPubSubConnection c = ee.getConnection();
                // The id of the pub/sub connection is used as the REDIRECT target.
                RFuture idFuture = c.async(RedisCommands.CLIENT_ID, new Object[0]);
                CompletionStage f = idFuture.thenCompose(id -> {
                    if (key != null) {
                        return commandExecutor.readAsync(c.getRedisClient(), key, (Codec)StringCodec.INSTANCE, RedisCommands.CLIENT_TRACKING, "ON", "REDIRECT", id);
                    }
                    return commandExecutor.readAsync(c.getRedisClient(), (Codec)StringCodec.INSTANCE, RedisCommands.CLIENT_TRACKING, "ON", "REDIRECT", id);
                });
                futures.add(f.toCompletableFuture());
            }
            CompletableFuture<Void> f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
            return f.thenApply(r2 -> listenerId);
        });
    }

    /**
     * Removes every per-entry listener that was registered under the given listener id.
     *
     * @param listenerId id returned by one of the tracking/flush {@code subscribe} methods
     * @return future completed when all removals have finished; already completed if the id is unknown
     */
    public CompletableFuture<Void> removeFlushListenerAsync(int listenerId) {
        Collection<Integer> entryListenerIds = this.flushListeners.remove(listenerId);
        if (entryListenerIds == null) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<?>[] removals = entryListenerIds.stream()
                .map(id -> this.removeListenerAsync(PubSubType.UNSUBSCRIBE, ChannelName.newList(ChannelName.TRACKING), id))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(removals);
    }

    /**
     * Registers a tracking listener for a single key.
     *
     * <p>The listener is subscribed to the tracking channel on the client entries of the
     * master/slave entry that owns {@code key}; unless the read mode is
     * {@code MASTER_SLAVE}, frozen client entries are skipped. {@code listener} is notified
     * only for messages equal to {@code key}.
     *
     * @param key key to track
     * @param codec codec passed to the underlying subscription
     * @param commandExecutor executor used to enable client tracking
     * @param listener listener to notify on change
     * @return future with the id of the registered pub/sub listener; a node-not-found future
     *         when no entry can be resolved for {@code key}
     */
    public CompletableFuture<Integer> subscribe(final String key, Codec codec, CommandAsyncExecutor commandExecutor, final TrackingListener listener) {
        MasterSlaveEntry entry = this.connectionManager.getEntry(key);
        if (entry == null) {
            // Same handling as the other subscribe methods: report the missing node through the
            // returned future instead of failing with a NullPointerException below.
            int slot = this.connectionManager.calcSlot(key);
            return this.connectionManager.getServiceManager().createNodeNotFoundFuture(key, slot);
        }
        RedisPubSubListener<Object> redisPubSubListener = new RedisPubSubListener<Object>(){

            @Override
            public void onMessage(CharSequence channel, Object msg) {
                if (channel.equals(ChannelName.TRACKING.toString()) && key.equals(msg)) {
                    listener.onChange((String)msg);
                }
            }
        };
        int listenerId = System.identityHashCode(redisPubSubListener);
        Collection<ClientConnectionsEntry> entries = entry.getAllEntries();
        if (this.config.getReadMode() != ReadMode.MASTER_SLAVE) {
            entries = entry.getAllEntries().stream().filter(e -> !e.isFreezed()).collect(Collectors.toList());
        }
        List<CompletableFuture<PubSubConnectionEntry>> ffs = new ArrayList<CompletableFuture<PubSubConnectionEntry>>();
        for (ClientConnectionsEntry ee : entries) {
            CompletableFuture<PubSubConnectionEntry> future = this.subscribe(PubSubType.SUBSCRIBE, codec, ChannelName.newList(ChannelName.TRACKING), entry, ee, redisPubSubListener);
            ffs.add(future);
        }
        return this.registerClientTrackingListener(commandExecutor, ffs, listenerId, key);
    }

    /**
     * Starts acquiring the striped locks of all given channels.
     *
     * @param channelNames channels whose locks are needed
     * @return tuple of (a semaphore created with zero permits that is released once every
     *         distinct channel lock has been acquired, the set of channel locks being acquired)
     */
    private Tuple<AsyncSemaphore, Set<AsyncSemaphore>> acquire(List<ChannelName> channelNames) {
        AsyncSemaphore gate = new AsyncSemaphore(0);
        // Several channels may map to the same stripe; the set keeps each lock once.
        HashSet<AsyncSemaphore> stripes = new HashSet<AsyncSemaphore>();
        for (ChannelName name : channelNames) {
            stripes.add(this.getSemaphore(name));
        }
        CompletableFuture<?>[] pending = stripes.stream()
                .map(stripe -> stripe.acquire())
                .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(pending).thenAccept(r -> gate.release());
        return new Tuple<AsyncSemaphore, Set<AsyncSemaphore>>(gate, stripes);
    }

    /**
     * Subscribes the listeners to a single channel.
     *
     * @param codec codec passed to the underlying subscription
     * @param channelName channel to subscribe to
     * @param listeners listeners to attach
     * @return future with the connection entries serving the subscription
     */
    public CompletableFuture<List<PubSubConnectionEntry>> subscribe(Codec codec, ChannelName channelName, RedisPubSubListener<?> ... listeners) {
        List<ChannelName> singleChannel = ChannelName.newList(channelName);
        return this.subscribe(codec, singleChannel, listeners);
    }

    /**
     * Subscribes the listeners to the given channels (SUBSCRIBE).
     *
     * <p>The first channel decides the routing. If {@link #isMultiEntity(ChannelName)} holds for
     * it, one subscription is issued per entry of the connection manager. Status listeners are
     * wrapped so that their {@code onStatus} callback is forwarded only after the shared
     * counter, which starts at the number of entries, has reached zero. Otherwise a single
     * subscription is issued on the entry resolved for the first channel.
     *
     * @param codec codec passed to the underlying subscription
     * @param channelNames channels to subscribe to; the first element is used for routing
     * @param listeners listeners to attach
     * @return future with the connection entries serving the subscription; a node-not-found future
     *         if no entry can be resolved for a single-node subscription
     */
    public CompletableFuture<List<PubSubConnectionEntry>> subscribe(Codec codec, List<ChannelName> channelNames, RedisPubSubListener<?> ... listeners) {
        ChannelName first = channelNames.get(0);
        if (!this.isMultiEntity(first)) {
            MasterSlaveEntry target = this.getEntry(first);
            if (target == null) {
                int slot = this.connectionManager.calcSlot(first.getName());
                return this.connectionManager.getServiceManager().createNodeNotFoundFuture(first.toString(), slot);
            }
            CompletableFuture<PubSubConnectionEntry> single = this.subscribe(PubSubType.SUBSCRIBE, codec, new ArrayList<ChannelName>(channelNames), target, null, listeners);
            return single.thenApply(res -> Collections.singletonList(res));
        }

        Collection<MasterSlaveEntry> masters = this.connectionManager.getEntrySet();
        final AtomicInteger pendingStatuses = new AtomicInteger(masters.size());

        // Status listeners are decorated; every other listener is passed through untouched.
        RedisPubSubListener<?>[] wrapped = new RedisPubSubListener<?>[listeners.length];
        for (int i = 0; i < listeners.length; i++) {
            RedisPubSubListener<?> candidate = listeners[i];
            if (candidate instanceof PubSubStatusListener) {
                PubSubStatusListener source = (PubSubStatusListener) candidate;
                wrapped[i] = new PubSubStatusListener(source.getListener(), source.getNames()) {

                    @Override
                    public void onStatus(PubSubType type, CharSequence channel) {
                        if (pendingStatuses.get() == 0 || pendingStatuses.decrementAndGet() == 0) {
                            super.onStatus(type, channel);
                        }
                    }
                };
            } else {
                wrapped[i] = candidate;
            }
        }

        List<CompletableFuture<PubSubConnectionEntry>> subscriptions = new ArrayList<CompletableFuture<PubSubConnectionEntry>>();
        for (MasterSlaveEntry master : masters) {
            // Every entry gets its own copy of the channel list.
            subscriptions.add(this.subscribe(PubSubType.SUBSCRIBE, codec, new ArrayList<ChannelName>(channelNames), master, master.getEntry(), wrapped));
        }
        CompletableFuture<Void> all = CompletableFuture.allOf(subscriptions.toArray(new CompletableFuture[0]));
        return all.thenApply(r -> subscriptions.stream().map(v -> v.getNow(null)).collect(Collectors.toList()));
    }

    /**
     * Subscribes the listeners to the given channels using SSUBSCRIBE.
     *
     * @param codec codec passed to the underlying subscription
     * @param channelNames channels to subscribe to; the first element is used for routing
     * @param listeners listeners to attach
     * @return future with the connection entry serving the subscription; a node-not-found future
     *         if no entry can be resolved for the first channel
     */
    public CompletableFuture<PubSubConnectionEntry> ssubscribe(Codec codec, List<ChannelName> channelNames, RedisPubSubListener<?> ... listeners) {
        ChannelName first = channelNames.get(0);
        MasterSlaveEntry target = this.getEntry(first);
        if (target != null) {
            return this.subscribe(PubSubType.SSUBSCRIBE, codec, new ArrayList<ChannelName>(channelNames), target, null, listeners);
        }
        int slot = this.connectionManager.calcSlot(first.getName());
        return this.connectionManager.getServiceManager().createNodeNotFoundFuture(first.toString(), slot);
    }

    /**
     * Common subscription path: takes the channel locks, then starts the subscription while
     * enforcing the configured subscription timeout.
     *
     * @param type subscribe command type
     * @param codec codec passed on to the subscription
     * @param channelNames channels to subscribe to
     * @param entry master/slave entry the subscription is routed to
     * @param clientEntry specific client entry to use, or {@code null}
     * @param listeners listeners to attach
     * @return promise completed by the subscription flow or failed with a {@link RedisTimeoutException}
     */
    private CompletableFuture<PubSubConnectionEntry> subscribe(PubSubType type, Codec codec, List<ChannelName> channelNames, MasterSlaveEntry entry, ClientConnectionsEntry clientEntry, RedisPubSubListener<?> ... listeners) {
        CompletableFuture<PubSubConnectionEntry> promise = new CompletableFuture<PubSubConnectionEntry>();
        Tuple<AsyncSemaphore, Set<AsyncSemaphore>> locks = this.acquire(channelNames);
        // Aggregate semaphore from acquire(): it gets its permit once all per-channel locks are held.
        AsyncSemaphore lock = locks.getT1();
        int timeout = this.config.getSubscriptionTimeout();
        long start = System.nanoTime();
        // Fails the promise if the locks are not obtained within the subscription timeout.
        Timeout lockTimeout = this.connectionManager.getServiceManager().newTimeout(t -> promise.completeExceptionally(new RedisTimeoutException("Unable to acquire subscription lock after " + timeout + "ms. Try to increase 'subscriptionTimeout', 'subscriptionsPerConnection', 'subscriptionConnectionPoolSize' parameters.")), timeout, TimeUnit.MILLISECONDS);
        lock.acquire().thenAccept(r -> {
            // The lock timeout already fired, or the promise finished elsewhere: hand the lock back and stop.
            if (!lockTimeout.cancel() || promise.isDone()) {
                lock.release();
                return;
            }
            // Time spent waiting for the locks is deducted from the timeout applied to the subscription itself.
            long newTimeout = (long)timeout - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            this.subscribeNoTimeout(codec, channelNames, entry, clientEntry, promise, type, lock, new AtomicInteger(), listeners);
            this.timeout(promise, newTimeout);
        });
        // Second acquisition on the same aggregate semaphore; its callback frees every per-channel lock.
        // NOTE(review): presumably this completes only after the flow above releases `lock` — depends on AsyncSemaphore ordering, confirm.
        lock.acquire().thenAccept(rr -> ((Set)locks.getT2()).forEach(l -> l.release()));
        return promise;
    }

    /**
     * Subscribes to a channel without installing a timeout on the returned promise.
     *
     * <p>SSUBSCRIBE is used when sharding is marked as supported, SUBSCRIBE otherwise.
     *
     * @param codec codec passed on to the subscription
     * @param channelName channel to subscribe to
     * @param semaphore lock handed to the subscription flow, which releases it
     * @param listeners listeners to attach
     * @return promise completed by the subscription flow; a node-not-found future if no entry
     *         can be resolved for the channel
     */
    CompletableFuture<PubSubConnectionEntry> subscribeNoTimeout(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?> ... listeners) {
        MasterSlaveEntry target = this.getEntry(new ChannelName(channelName));
        if (target != null) {
            PubSubType type;
            if (this.shardingSupported) {
                type = PubSubType.SSUBSCRIBE;
            } else {
                type = PubSubType.SUBSCRIBE;
            }
            CompletableFuture<PubSubConnectionEntry> promise = new CompletableFuture<PubSubConnectionEntry>();
            this.subscribeNoTimeout(codec, ChannelName.newList(channelName), target, null, promise, type, semaphore, new AtomicInteger(), listeners);
            return promise;
        }
        int slot = this.connectionManager.calcSlot(channelName);
        return this.connectionManager.getServiceManager().createNodeNotFoundFuture(channelName, slot);
    }

    /**
     * Picks the striped lock responsible for the channel.
     *
     * @param channelName channel whose lock is requested
     * @return the semaphore at index {@code |hashCode % locks.length|}
     */
    AsyncSemaphore getSemaphore(ChannelName channelName) {
        int remainder = channelName.hashCode() % this.locks.length;
        int stripe = Math.abs(remainder);
        return this.locks[stripe];
    }

    /**
     * Installs the configured subscription timeout on the promise.
     *
     * @param promise promise to fail when the timeout elapses
     */
    void timeout(CompletableFuture<?> promise) {
        long configuredTimeout = this.config.getSubscriptionTimeout();
        this.timeout(promise, configuredTimeout);
    }

    /**
     * Fails the promise with a {@link RedisTimeoutException} after the given delay unless it
     * completes earlier; the timer task is cancelled as soon as the promise completes.
     *
     * @param promise promise to guard
     * @param timeout delay in milliseconds
     */
    void timeout(CompletableFuture<?> promise, long timeout) {
        Timeout expiry = this.connectionManager.getServiceManager().newTimeout(t -> {
            String message = "Unable to acquire subscription lock after " + timeout + "ms. Try to increase 'subscriptionTimeout', 'subscriptionsPerConnection', 'subscriptionConnectionPoolSize' parameters. ";
            promise.completeExceptionally(new RedisTimeoutException(message));
        }, timeout, TimeUnit.MILLISECONDS);
        promise.whenComplete((result, error) -> expiry.cancel());
    }

    /**
     * Performs one subscription attempt and gives up once the configured number of retry attempts is reached.
     *
     * <p>On exhaustion the lock is released and the promise fails: with a
     * {@link RedisNodeNotFoundException} if there is still no entry for the first channel,
     * otherwise with a {@link RedisTimeoutException}. While no entry is known, the attempt is
     * re-scheduled after the retry interval.
     *
     * @param codec codec passed on to the subscription
     * @param channelNames channels to subscribe to; the first element is used for routing
     * @param promise promise to complete
     * @param type subscribe command type
     * @param lock lock to release when the flow ends
     * @param attempts attempt counter shared across retries
     * @param listeners listeners to attach
     */
    private void trySubscribe(Codec codec, List<ChannelName> channelNames, CompletableFuture<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, AtomicInteger attempts, RedisPubSubListener<?> ... listeners) {
        ChannelName first = channelNames.get(0);

        if (attempts.get() == this.config.getRetryAttempts()) {
            lock.release();
            Throwable failure;
            if (this.getEntry(first) == null) {
                failure = new RedisNodeNotFoundException("Node for name: " + first + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings.");
            } else {
                failure = new RedisTimeoutException("Unable to acquire connection for subscription after " + attempts.get() + " attempts. Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.");
            }
            promise.completeExceptionally(failure);
            return;
        }

        attempts.incrementAndGet();
        MasterSlaveEntry target = this.getEntry(first);
        if (target != null) {
            this.subscribeNoTimeout(codec, channelNames, target, null, promise, type, lock, attempts, listeners);
            return;
        }
        // No node known for the channel yet: try again after the retry interval.
        this.connectionManager.getServiceManager().newTimeout(
                tt -> this.trySubscribe(codec, channelNames, promise, type, lock, attempts, listeners),
                this.config.getRetryInterval(), TimeUnit.MILLISECONDS);
    }

    /**
     * Subscription flow without a timeout: first tries to attach the listeners to connection
     * entries already serving the channels, then to a reusable connection entry, and finally
     * falls back to opening a new connection via {@code connect}.
     *
     * @param codec codec passed on to the subscription
     * @param channelNames channels to subscribe to; channels already served are removed from this list by addListeners
     * @param entry master/slave entry the subscription is routed to
     * @param clientEntry specific client entry to use, or {@code null}
     * @param promise promise to complete
     * @param type subscribe command type
     * @param lock lock released when the flow ends
     * @param attempts attempt counter forwarded to {@code connect}
     * @param listeners listeners to attach
     */
    private void subscribeNoTimeout(Codec codec, List<ChannelName> channelNames, MasterSlaveEntry entry, ClientConnectionsEntry clientEntry, CompletableFuture<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, AtomicInteger attempts, RedisPubSubListener<?> ... listeners) {
        // Lookup-only pass (freeEntry == null): reuse entries that already serve the channels.
        CompletableFuture<Boolean> future = this.addListeners(channelNames, entry, clientEntry, type, listeners, null, () -> {}, promise, lock);
        future.thenAccept(r1 -> {
            // true: addListeners already completed (or failed) the promise.
            if (r1.booleanValue()) {
                return;
            }
            this.freePubSubLock.acquire().thenAccept(c -> {
                if (promise.isDone()) {
                    lock.release();
                    this.freePubSubLock.release();
                    return;
                }
                PubSubEntry freePubSubConnections = this.entry2PubSubConnection.getOrDefault(entry, new PubSubEntry());
                PubSubConnectionEntry freeEntry = freePubSubConnections.getEntries().peek();
                // A reusable entry bound to a different client than the requested one cannot be used.
                if (freeEntry != null && clientEntry != null && !clientEntry.getClient().equals(freeEntry.getConnection().getRedisClient())) {
                    freeEntry = null;
                }
                if (freeEntry == null) {
                    this.freePubSubLock.release();
                    this.connect(codec, channelNames, entry, clientEntry, promise, type, lock, attempts, listeners);
                    return;
                }
                int remainFreeAmount = freeEntry.tryAcquire();
                if (remainFreeAmount == -1) {
                    throw new IllegalStateException();
                }
                PubSubConnectionEntry fe = freeEntry;
                // Second pass registers `fe` for the channels; the releaser undoes the tryAcquire above and frees freePubSubLock.
                CompletableFuture<Boolean> listenersFuture = this.addListeners(channelNames, entry, clientEntry, type, listeners, freeEntry, () -> {
                    fe.release();
                    this.freePubSubLock.release();
                }, promise, lock);
                listenersFuture.thenAccept(r2 -> {
                    if (r2.booleanValue()) {
                        return;
                    }
                    for (ChannelName channelName : channelNames) {
                        Collection coll = this.name2entry.computeIfAbsent(channelName, k -> Collections.newSetFromMap(new ConcurrentHashMap()));
                        coll.add(fe);
                    }
                    // tryAcquire reported no remaining capacity: drop the entry from the reusable queue.
                    if (remainFreeAmount == 0) {
                        freePubSubConnections.getEntries().poll();
                    }
                    this.freePubSubLock.release();
                    fe.subscribe(codec, channelNames, promise, type, lock, listeners);
                });
            });
        });
    }

    /**
     * Attaches the listeners to connection entries that already serve the given channels and,
     * when {@code freeEntry} is provided, registers it for the channels that have none.
     *
     * <p>Channels found on an existing entry are removed from {@code channelNames}, so after the
     * call the list holds only the channels that still need a real subscription.
     *
     * @param channelNames channels to process; mutated as described above
     * @param entry master/slave entry used for the per-channel lookup key
     * @param clientEntry specific client entry, or {@code null}
     * @param type subscribe command type
     * @param listeners listeners to attach
     * @param freeEntry entry to register for channels without one, or {@code null} for lookup only
     * @param releaser invoked when the promise is settled here (failure, or nothing left to subscribe)
     * @param promise promise of the overall subscription
     * @param lock lock released once the promise has been settled here
     * @return future yielding {@code true} if the promise was settled by this method,
     *         {@code false} if channels remain to be subscribed
     */
    private CompletableFuture<Boolean> addListeners(List<ChannelName> channelNames, MasterSlaveEntry entry, ClientConnectionsEntry clientEntry, PubSubType type, RedisPubSubListener<?>[] listeners, PubSubConnectionEntry freeEntry, Runnable releaser, CompletableFuture<PubSubConnectionEntry> promise, AsyncSemaphore lock) {
        HashMap<CompletableFuture<Void>, Tuple<ChannelName, PubSubConnectionEntry>> releaseFutures = new HashMap<CompletableFuture<Void>, Tuple<ChannelName, PubSubConnectionEntry>>();
        // First existing entry found; used as the promise result when nothing is left to subscribe.
        AtomicReference<PubSubConnectionEntry> ref = new AtomicReference<PubSubConnectionEntry>();
        // Iterate over a snapshot because channelNames is modified inside the loop.
        for (ChannelName channelName : channelNames.toArray(new ChannelName[0])) {
            Object key;
            PubSubConnectionEntry oldEntry = null;
            if (clientEntry != null) {
                key = new Tuple<ChannelName, ClientConnectionsEntry>(channelName, clientEntry);
                oldEntry = freeEntry == null ? this.key2connection.get(key) : this.key2connection.putIfAbsent((Tuple<ChannelName, ClientConnectionsEntry>)key, freeEntry);
                if (channelName.isTracking()) {
                    clientEntry.getTrackedConnectionsHolder().incUsage();
                }
            }
            key = new PubSubKey(channelName, entry);
            PubSubConnectionEntry oe = freeEntry == null ? (PubSubConnectionEntry)this.name2PubSubConnection.get(key) : this.name2PubSubConnection.putIfAbsent((PubSubKey)key, freeEntry);
            // Without a specific client entry, the per-master/slave-entry mapping decides.
            if (clientEntry == null) {
                oldEntry = oe;
            }
            if (oldEntry == null) continue;
            ref.compareAndSet(null, oldEntry);
            channelNames.remove(channelName);
            CompletableFuture<Void> f = oldEntry.addListeners(channelName, type, listeners);
            releaseFutures.put(f, new Tuple<ChannelName, PubSubConnectionEntry>(channelName, oldEntry));
        }
        CompletableFuture<Void> ff = CompletableFuture.allOf(releaseFutures.keySet().toArray(new CompletableFuture[0]));
        return ff.handle((r, ex) -> {
            if (ex != null) {
                // Attaching failed: fail the promise, roll back the attached listeners, then release the lock.
                releaser.run();
                promise.completeExceptionally((Throwable)ex);
                CompletableFuture[] fff = (CompletableFuture[])releaseFutures.values().stream().map(t -> ((PubSubConnectionEntry)t.getT2()).release(type, (ChannelName)t.getT1(), listeners)).toArray(CompletableFuture[]::new);
                CompletableFuture<Void> f1 = CompletableFuture.allOf(fff);
                f1.whenComplete((r1, e) -> lock.release());
                return true;
            }
            if (channelNames.isEmpty()) {
                // Every channel was served by an existing entry.
                releaser.run();
                if (!promise.complete((PubSubConnectionEntry)ref.get())) {
                    // The promise was already completed elsewhere: roll back the listeners before releasing the lock.
                    CompletableFuture[] fff = (CompletableFuture[])releaseFutures.values().stream().map(t -> ((PubSubConnectionEntry)t.getT2()).release(type, (ChannelName)t.getT1(), listeners)).toArray(CompletableFuture[]::new);
                    CompletableFuture<Void> f1 = CompletableFuture.allOf(fff);
                    f1.whenComplete((r1, e) -> lock.release());
                } else {
                    lock.release();
                }
                return true;
            }
            return false;
        });
    }

    /**
     * Resolves the write entry responsible for the channel's slot.
     *
     * @param channelName channel to resolve
     * @return the entry returned by the connection manager for the slot; callers in this class handle {@code null}
     */
    private MasterSlaveEntry getEntry(ChannelName channelName) {
        return this.connectionManager.getWriteEntry(this.connectionManager.calcSlot(channelName.getName()));
    }

    /**
     * Requests a new pub/sub connection from {@code msEntry} and subscribes through it,
     * falling back to {@code trySubscribe} if no connection is available after the retry interval.
     *
     * @param codec codec passed on to the subscription
     * @param channelNames channels to subscribe to
     * @param msEntry master/slave entry that provides the connection
     * @param clientEntry specific client entry to use, or {@code null}
     * @param promise promise of the overall subscription
     * @param type subscribe command type
     * @param lock lock released when the flow ends
     * @param attempts attempt counter forwarded to {@code trySubscribe}
     * @param listeners listeners to attach
     */
    private void connect(Codec codec, List<ChannelName> channelNames, MasterSlaveEntry msEntry, ClientConnectionsEntry clientEntry, CompletableFuture<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, AtomicInteger attempts, RedisPubSubListener<?> ... listeners) {
        CompletableFuture<RedisPubSubConnection> connFuture = msEntry.nextPubSubConnection(clientEntry);
        this.connectionManager.getServiceManager().newTimeout(t -> {
            // Nothing to do if the connection future could not be cancelled and did not fail.
            if (!connFuture.cancel(false) && !connFuture.isCompletedExceptionally()) {
                return;
            }
            this.trySubscribe(codec, channelNames, promise, type, lock, attempts, listeners);
        }, this.config.getRetryInterval(), TimeUnit.MILLISECONDS);
        // A failed promise (e.g. timeout) also fails the pending connection future.
        promise.whenComplete((res, e) -> {
            if (e != null) {
                connFuture.completeExceptionally((Throwable)e);
            }
        });
        connFuture.thenAccept(conn -> this.freePubSubLock.acquire().thenAccept(c -> {
            PubSubConnectionEntry entry = new PubSubConnectionEntry((RedisPubSubConnection)conn, this.connectionManager, msEntry);
            int remainFreeAmount = entry.tryAcquire();
            // The releaser returns the unused connection to msEntry and frees freePubSubLock.
            CompletableFuture<Boolean> listenerFuture = this.addListeners(channelNames, msEntry, clientEntry, type, listeners, entry, () -> {
                msEntry.returnPubSubConnection(entry.getConnection());
                this.freePubSubLock.release();
            }, promise, lock);
            listenerFuture.thenAccept(r -> {
                // true: addListeners already settled the promise.
                if (r.booleanValue()) {
                    return;
                }
                for (ChannelName channelName : channelNames) {
                    Collection coll = this.name2entry.computeIfAbsent(channelName, k -> Collections.newSetFromMap(new ConcurrentHashMap()));
                    coll.add(entry);
                }
                // The entry still has capacity: make it available for reuse by later subscriptions.
                if (remainFreeAmount > 0) {
                    PubSubEntry psEntry = this.entry2PubSubConnection.computeIfAbsent(msEntry, e -> new PubSubEntry());
                    psEntry.getEntries().add(entry);
                }
                this.freePubSubLock.release();
                entry.subscribe(codec, channelNames, promise, type, lock, listeners);
            });
        }));
    }

    /**
     * Unsubscribes from the channel using the first connection entry registered for it.
     *
     * <p>SUNSUBSCRIBE is used when sharding is marked as supported, UNSUBSCRIBE otherwise.
     *
     * @param channelName channel to unsubscribe from
     * @return future of the unsubscription; failed with a {@link RedisException} if no entry is
     *         registered for the channel
     */
    CompletableFuture<Void> unsubscribeLocked(ChannelName channelName) {
        Collection<PubSubConnectionEntry> holders = this.name2entry.get(channelName);
        if (holders != null && !holders.isEmpty()) {
            PubSubType topicType = this.shardingSupported ? PubSubType.SUNSUBSCRIBE : PubSubType.UNSUBSCRIBE;
            return this.unsubscribeLocked(topicType, channelName, holders.iterator().next());
        }
        CompletableFuture<Void> failed = new CompletableFuture<Void>();
        failed.completeExceptionally(new RedisException("Channel: " + channelName + " is not registered"));
        return failed;
    }

    /**
     * Removes the channel's bookkeeping for the connection entry and sends the unsubscribe command.
     *
     * @param topicType unsubscribe command type; also the status type awaited as confirmation
     * @param channelName channel to unsubscribe from
     * @param ce connection entry serving the channel
     * @return future completed after the matching status arrived and the entry was released;
     *         already completed if the service is shutting down
     */
    CompletableFuture<Void> unsubscribeLocked(final PubSubType topicType, final ChannelName channelName, final PubSubConnectionEntry ce) {
        this.remove(channelName, ce);
        // During shutdown only the local bookkeeping is cleaned up; no command is sent.
        if (this.connectionManager.getServiceManager().isShuttingDown()) {
            return CompletableFuture.completedFuture(null);
        }
        final CompletableFuture<Void> result = new CompletableFuture<Void>();
        BaseRedisPubSubListener listener = new BaseRedisPubSubListener(){

            @Override
            public void onStatus(PubSubType type, CharSequence channel) {
                if (type == topicType && channel.equals(channelName)) {
                    PublishSubscribeService.this.freePubSubLock.acquire().thenAccept(c -> {
                        try {
                            PublishSubscribeService.this.release(ce);
                        }
                        catch (Exception e) {
                            result.completeExceptionally(e);
                        }
                        finally {
                            PublishSubscribeService.this.freePubSubLock.release();
                        }
                        // No effect if the future was already failed in the catch block above.
                        result.complete(null);
                    });
                }
            }
        };
        ce.unsubscribe(topicType, channelName, listener);
        return result;
    }

    /**
     * Removes every mapping that ties the channel to the given connection entry:
     * {@code name2PubSubConnection}, {@code key2connection} (when a client entry is found for
     * the connection's Redis client) and {@code name2entry}.
     * <p>
     * When the tracked-connections usage counter of the client entry drops to zero the holder
     * is reset and the connection entry is removed from {@code trackedEntries}.
     *
     * @param channelName channel whose mappings are removed
     * @param entry       connection entry being detached from the channel
     */
    private void remove(ChannelName channelName, PubSubConnectionEntry entry) {
        this.name2PubSubConnection.remove(new PubSubKey(channelName, entry.getEntry()));

        ClientConnectionsEntry clientEntry = entry.getEntry().getEntry(entry.getConnection().getRedisClient());
        if (clientEntry != null) {
            this.key2connection.remove(new Tuple<ChannelName, ClientConnectionsEntry>(channelName, clientEntry));
            boolean noLongerUsed = clientEntry.getTrackedConnectionsHolder().decUsage() == 0;
            if (noLongerUsed) {
                clientEntry.getTrackedConnectionsHolder().reset();
                this.trackedEntries.remove(entry);
            }
        }

        // Returning null from the remapping function drops the channel key once its set is empty.
        this.name2entry.computeIfPresent(channelName, (name, registered) -> {
            registered.remove(entry);
            return registered.isEmpty() ? null : registered;
        });
    }

    /**
     * Releases one usage of the connection entry and updates the per-node queue of entries.
     * <p>
     * A free entry is removed from the queue and its connection is returned to the owning
     * {@link MasterSlaveEntry}. An entry that is not free is removed from the queue when its
     * connection is closed, and otherwise added to the queue if it is not already there.
     *
     * @param entry connection entry to release
     */
    private void release(PubSubConnectionEntry entry) {
        entry.release();

        if (!entry.isFree()) {
            PubSubEntry holder = this.entry2PubSubConnection.computeIfAbsent(entry.getEntry(), key -> new PubSubEntry());
            if (entry.getConnection().isClosed()) {
                holder.getEntries().remove(entry);
            } else if (!holder.getEntries().contains(entry)) {
                holder.getEntries().add(entry);
            }
            return;
        }

        PubSubEntry existing = (PubSubEntry)this.entry2PubSubConnection.get(entry.getEntry());
        if (existing != null) {
            existing.getEntries().remove(entry);
        }
        entry.getEntry().returnPubSubConnection(entry.getConnection());
    }

    /**
     * Forgets everything registered for the given node entry: its queue of connection entries
     * and every connection entry in {@code name2entry} that belongs to it. Channels left
     * without any connection entry are removed from {@code name2entry} as well.
     *
     * @param entry node entry being removed
     */
    public void remove(MasterSlaveEntry entry) {
        this.entry2PubSubConnection.remove(entry);
        this.name2entry.values().removeIf(connections -> {
            connections.removeIf(connection -> connection.getEntry().equals(entry));
            return connections.isEmpty();
        });
    }

    /**
     * Unsubscribes from the channel through the first connection entry registered for it.
     *
     * @param channelName channel (or pattern) to unsubscribe from
     * @param topicType   unsubscribe command type
     * @return future produced by {@code unsubscribe(ChannelName, PubSubConnectionEntry, PubSubType)},
     *         or a future failed with {@link RedisException} when no entry is registered for
     *         the channel
     */
    public CompletableFuture<Codec> unsubscribe(ChannelName channelName, PubSubType topicType) {
        Collection<PubSubConnectionEntry> coll = this.name2entry.get(channelName);
        if (coll != null) {
            // No channel lock is held at this point and the set is mutated by the remove(...)
            // methods, so an isEmpty() check followed by iterator().next() could throw
            // NoSuchElementException. Asking a single iterator avoids that window.
            Iterator<PubSubConnectionEntry> it = coll.iterator();
            if (it.hasNext()) {
                return this.unsubscribe(channelName, it.next(), topicType);
            }
        }
        RedisException ex = new RedisException("Channel: " + channelName + " is not registered");
        CompletableFuture<Codec> promise = new CompletableFuture<Codec>();
        promise.completeExceptionally(ex);
        return promise;
    }

    /**
     * Unsubscribes the given connection entry from the channel while holding the channel's
     * semaphore.
     * <p>
     * The codec registered on the connection for the channel is looked up before the
     * unsubscribe is issued: from the pattern channels for {@code PUNSUBSCRIBE}, from the
     * sharded channels for {@code SUNSUBSCRIBE}, and from the regular channels otherwise.
     *
     * @param channelName channel (or pattern) to unsubscribe from
     * @param entry       connection entry holding the subscription
     * @param topicType   unsubscribe command type
     * @return future with the codec that was registered for the channel; completed with
     *         {@code null} straight away during shutdown
     */
    CompletableFuture<Codec> unsubscribe(ChannelName channelName, PubSubConnectionEntry entry, PubSubType topicType) {
        if (this.connectionManager.getServiceManager().isShuttingDown()) {
            return CompletableFuture.completedFuture(null);
        }

        AsyncSemaphore lock = this.getSemaphore(channelName);
        CompletableFuture<Void> acquired = lock.acquire();
        return acquired.thenCompose(ignored -> {
            final Codec entryCodec;
            if (topicType == PubSubType.PUNSUBSCRIBE) {
                entryCodec = entry.getConnection().getPatternChannels().get(channelName);
            } else if (topicType == PubSubType.SUNSUBSCRIBE) {
                entryCodec = entry.getConnection().getShardedChannels().get(channelName);
            } else {
                entryCodec = entry.getConnection().getChannels().get(channelName);
            }

            // The semaphore is released whether the unsubscribe succeeded or failed.
            return this.unsubscribeLocked(topicType, channelName, entry)
                    .whenComplete((res, err) -> lock.release())
                    .thenApply(res -> entryCodec);
        });
    }

    /**
     * Re-establishes every subscription whose channel name hashes to the given slot.
     * <p>
     * For each matching entry of {@code name2PubSubConnection} the channel is unsubscribed from
     * its current connection and subscribed again with the same listeners, separately for
     * regular, sharded and pattern subscriptions, each with the codec registered for that
     * subscription kind.
     *
     * @param slot slot number as computed by {@code connectionManager.calcSlot(..)}
     */
    public void reattachPubSub(int slot) {
        this.name2PubSubConnection.entrySet().stream().filter(e -> this.connectionManager.calcSlot(((PubSubKey)e.getKey()).getChannelName().getName()) == slot).forEach(entry -> {
            ChannelName channelName = ((PubSubKey)entry.getKey()).getChannelName();
            PubSubConnectionEntry pubSubEntry = (PubSubConnectionEntry)entry.getValue();

            Codec codec = pubSubEntry.getConnection().getChannels().get(channelName);
            if (codec != null) {
                Queue<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
                this.unsubscribe(channelName, pubSubEntry, PubSubType.UNSUBSCRIBE);
                this.subscribe(codec, ChannelName.newList(channelName), listeners.toArray(new RedisPubSubListener[0]));
            }

            Codec scodec = pubSubEntry.getConnection().getShardedChannels().get(channelName);
            if (scodec != null) {
                Queue<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
                this.unsubscribe(channelName, pubSubEntry, PubSubType.SUNSUBSCRIBE);
                // Use the sharded channel's own codec; the regular-channel codec looked up above
                // is a different registration and may be null here.
                this.ssubscribe(scodec, ChannelName.newList(channelName), listeners.toArray(new RedisPubSubListener[0]));
            }

            Codec patternCodec = pubSubEntry.getConnection().getPatternChannels().get(channelName);
            if (patternCodec != null) {
                Queue<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
                this.unsubscribe(channelName, pubSubEntry, PubSubType.PUNSUBSCRIBE);
                this.psubscribe(channelName, patternCodec, listeners.toArray(new RedisPubSubListener[0]));
            }
        });
    }

    /**
     * Re-establishes all subscriptions held by the given pub/sub connection: regular channels
     * first, then sharded channels, then patterns. Does nothing when the connection manager
     * has no entry for the connection's Redis client.
     *
     * @param redisPubSubConnection connection whose subscriptions are reattached
     */
    public void reattachPubSub(RedisPubSubConnection redisPubSubConnection) {
        MasterSlaveEntry owner = this.connectionManager.getEntry(redisPubSubConnection.getRedisClient());
        if (owner != null) {
            this.reattachPubSubListeners(redisPubSubConnection.getChannels().keySet(), owner, PubSubType.UNSUBSCRIBE);
            this.reattachPubSubListeners(redisPubSubConnection.getShardedChannels().keySet(), owner, PubSubType.SUNSUBSCRIBE);
            this.reattachPubSubListeners(redisPubSubConnection.getPatternChannels().keySet(), owner, PubSubType.PUNSUBSCRIBE);
        }
    }

    /**
     * Unsubscribes each of the given channels from the connection entry registered for the
     * node and, when that entry had listeners, subscribes them again once the unsubscribe
     * completes with a non-null codec.
     *
     * @param channels  channel names (or patterns) to reattach
     * @param en        node entry the subscriptions were registered on
     * @param topicType unsubscribe type; it also selects which resubscribe method is used
     */
    private void reattachPubSubListeners(Set<ChannelName> channels, MasterSlaveEntry en, PubSubType topicType) {
        for (ChannelName channelName : channels) {
            PubSubConnectionEntry entry = (PubSubConnectionEntry)this.name2PubSubConnection.get(new PubSubKey(channelName, en));
            if (entry == null) {
                continue;
            }

            // The listener queue is fetched before the unsubscribe call is issued.
            Queue<RedisPubSubListener<?>> listeners = entry.getListeners(channelName);
            CompletableFuture<Codec> codecFuture = this.unsubscribe(channelName, entry, topicType);
            if (listeners.isEmpty()) {
                continue;
            }

            codecFuture.whenComplete((subscribeCodec, e) -> {
                if (e != null) {
                    log.error(e.getMessage(), (Throwable)e);
                } else if (subscribeCodec != null) {
                    if (topicType == PubSubType.SUNSUBSCRIBE) {
                        this.ssubscribe(channelName, (Collection<RedisPubSubListener<?>>)listeners, (Codec)subscribeCodec);
                    } else if (topicType == PubSubType.PUNSUBSCRIBE) {
                        this.psubscribe(en, channelName, (Collection<RedisPubSubListener<?>>)listeners, (Codec)subscribeCodec);
                    } else {
                        this.subscribe(channelName, (Collection<RedisPubSubListener<?>>)listeners, (Codec)subscribeCodec);
                    }
                }
            });
        }
    }

    /**
     * Resubscribes the listeners to a regular channel, retrying every second until it succeeds.
     * <p>
     * For a multi-entity channel the target is the first node entry that has no subscription
     * registered for the channel yet. When no target entry is available the general subscribe
     * method is used and the first resulting connection entry is reported.
     *
     * @param channelName    channel to subscribe to
     * @param listeners      listeners to attach
     * @param subscribeCodec codec for the subscription
     */
    private void subscribe(ChannelName channelName, Collection<RedisPubSubListener<?>> listeners, Codec subscribeCodec) {
        if (this.connectionManager.getServiceManager().isShuttingDown()) {
            log.warn("listeners of '{}' channel haven't been resubscribed due to Redisson shutdown process", (Object)channelName);
            return;
        }

        MasterSlaveEntry target = this.getEntry(channelName);
        if (this.isMultiEntity(channelName)) {
            target = this.connectionManager.getEntrySet().stream()
                    .filter(candidate -> !this.name2PubSubConnection.containsKey(new PubSubKey(channelName, (MasterSlaveEntry)candidate)))
                    .findFirst()
                    .orElse(null);
        }

        CompletableFuture<PubSubConnectionEntry> subscribeFuture;
        if (target != null) {
            subscribeFuture = this.subscribe(PubSubType.SUBSCRIBE, subscribeCodec, ChannelName.newList(channelName), target, null, listeners.toArray(new RedisPubSubListener[0]));
        } else {
            subscribeFuture = this.subscribe(subscribeCodec, ChannelName.newList(channelName), listeners.toArray(new RedisPubSubListener[0])).thenApply(r -> (PubSubConnectionEntry)r.iterator().next());
        }

        subscribeFuture.whenComplete((res, e) -> {
            if (e == null) {
                log.info("listeners of '{}' channel have been resubscribed to '{}'", (Object)channelName, res);
            } else {
                // Failed attempt: schedule another try in one second.
                this.connectionManager.getServiceManager().newTimeout(task -> this.subscribe(channelName, listeners, subscribeCodec), 1L, TimeUnit.SECONDS);
            }
        });
    }

    /**
     * Resubscribes the listeners to a sharded channel, retrying every second until it succeeds.
     * Logs a warning and does nothing when Redisson is shutting down.
     *
     * @param channelName    sharded channel to subscribe to
     * @param listeners      listeners to attach
     * @param subscribeCodec codec for the subscription
     */
    private void ssubscribe(ChannelName channelName, Collection<RedisPubSubListener<?>> listeners, Codec subscribeCodec) {
        if (this.connectionManager.getServiceManager().isShuttingDown()) {
            log.warn("listeners of '{}' channel haven't been resubscribed due to Redisson shutdown process", (Object)channelName);
            return;
        }

        RedisPubSubListener[] listenerArray = listeners.toArray(new RedisPubSubListener[0]);
        CompletableFuture<PubSubConnectionEntry> subscribeFuture = this.ssubscribe(subscribeCodec, ChannelName.newList(channelName), listenerArray);
        subscribeFuture.whenComplete((res, e) -> {
            if (e == null) {
                log.info("listeners of '{}' sharded-channel have been resubscribed to '{}'", (Object)channelName, res);
            } else {
                // Failed attempt: schedule another try in one second.
                this.connectionManager.getServiceManager().newTimeout(task -> this.ssubscribe(channelName, listeners, subscribeCodec), 1L, TimeUnit.SECONDS);
            }
        });
    }

    /**
     * Resubscribes the listeners to a channel pattern, retrying every second until it succeeds.
     * <p>
     * For a multi-entity pattern the target is the first node entry that is not
     * {@code oldEntry} and has no subscription registered for the pattern yet. When no target
     * entry is available the whole operation is retried after one second.
     *
     * @param oldEntry       node entry the pattern was previously subscribed on
     * @param channelName    pattern to subscribe to
     * @param listeners      listeners to attach
     * @param subscribeCodec codec for the subscription
     */
    private void psubscribe(MasterSlaveEntry oldEntry, ChannelName channelName, Collection<RedisPubSubListener<?>> listeners, Codec subscribeCodec) {
        if (this.connectionManager.getServiceManager().isShuttingDown()) {
            log.warn("listeners of '{}' channel-pattern haven't been resubscribed due to Redisson shutdown process", (Object)channelName);
            return;
        }

        MasterSlaveEntry target = this.getEntry(channelName);
        if (this.isMultiEntity(channelName)) {
            target = this.connectionManager.getEntrySet().stream()
                    .filter(candidate -> !this.name2PubSubConnection.containsKey(new PubSubKey(channelName, (MasterSlaveEntry)candidate)) && candidate != oldEntry)
                    .findFirst()
                    .orElse(null);
        }

        if (target != null) {
            CompletableFuture<PubSubConnectionEntry> subscribeFuture = this.subscribe(PubSubType.PSUBSCRIBE, subscribeCodec, ChannelName.newList(channelName), target, null, listeners.toArray(new RedisPubSubListener[0]));
            subscribeFuture.whenComplete((res, e) -> {
                if (e == null) {
                    log.info("listeners of '{}' channel-pattern have been resubscribed to '{}'", (Object)channelName, res);
                } else {
                    // Failed attempt: schedule another try in one second.
                    this.connectionManager.getServiceManager().newTimeout(task -> this.psubscribe(oldEntry, channelName, listeners, subscribeCodec), 1L, TimeUnit.SECONDS);
                }
            });
            return;
        }

        // No node entry to subscribe on right now: try again in one second.
        this.connectionManager.getServiceManager().newTimeout(task -> this.psubscribe(oldEntry, channelName, listeners, subscribeCodec), 1L, TimeUnit.SECONDS);
    }

    /**
     * Removes the given listener instance from the listed channels.
     *
     * @param type         unsubscribe type used for entries left without listeners
     * @param channelNames channels (or patterns) the listener is removed from
     * @param listener     listener instance to remove
     * @return future completed when the removal has finished
     */
    public CompletableFuture<Void> removeListenerAsync(PubSubType type, List<ChannelName> channelNames, EventListener listener) {
        BiConsumer<ChannelName, PubSubConnectionEntry> detach = (name, connectionEntry) -> connectionEntry.removeListener(name, listener);
        return this.removeListenerAsync(type, channelNames, detach);
    }

    /**
     * Removes the listeners with the given ids from the listed channels.
     *
     * @param type         unsubscribe type used for entries left without listeners
     * @param channelNames channels (or patterns) the listeners are removed from
     * @param listenerIds  ids of the listeners to remove
     * @return future completed when the removal has finished
     */
    public CompletableFuture<Void> removeListenerAsync(PubSubType type, List<ChannelName> channelNames, Integer ... listenerIds) {
        BiConsumer<ChannelName, PubSubConnectionEntry> detach = (name, connectionEntry) -> {
            for (int id : listenerIds) {
                connectionEntry.removeListener(name, id);
            }
        };
        return this.removeListenerAsync(type, channelNames, detach);
    }

    /**
     * Applies {@code consumer} to every connection entry registered for the given channels and
     * unsubscribes the entries that are left without listeners for the channel.
     * <p>
     * The work starts once the semaphore obtained from {@code acquire(channelNames)} has been
     * acquired; if that does not happen within the configured subscription timeout the
     * returned future fails with {@link RedisTimeoutException}.
     *
     * @param type     unsubscribe type passed to {@code unsubscribeLocked}
     * @param names    candidate channel names; those absent from {@code name2entry} are ignored
     * @param consumer listener-removal action invoked for each (channel, entry) pair
     * @return future completed when all resulting unsubscribe operations have finished;
     *         completed immediately with {@code null} during shutdown
     */
    private CompletableFuture<Void> removeListenerAsync(PubSubType type, List<ChannelName> names, BiConsumer<ChannelName, PubSubConnectionEntry> consumer) {
        if (this.connectionManager.getServiceManager().isShuttingDown()) {
            return CompletableFuture.completedFuture(null);
        }
        // Only channel names that are keys of name2entry are processed.
        List<ChannelName> channelNames = names.stream().filter(cn -> this.name2entry.containsKey(cn)).collect(Collectors.toList());
        // acquire(..) is defined outside this method: T1 is the semaphore awaited below, T2 is a
        // set of semaphores released at the end of this method.
        Tuple<AsyncSemaphore, Set<AsyncSemaphore>> locks = this.acquire(channelNames);
        AsyncSemaphore semaphore = locks.getT1();
        CompletableFuture<Void> sf = semaphore.acquire();
        int timeout = this.config.getSubscriptionTimeout();
        // Created here so the timeout exception can carry this call site as a suppressed exception.
        Exception stackTrace = new Exception("Stack trace");
        Timeout r = this.connectionManager.getServiceManager().newTimeout(t -> {
            RedisTimeoutException ee = new RedisTimeoutException("Remove listeners operation timeout: (" + timeout + "ms) for " + channelNames + " topic");
            ee.addSuppressed(stackTrace);
            sf.completeExceptionally(ee);
        }, timeout, TimeUnit.MILLISECONDS);
        CompletionStage result = sf.thenCompose(res -> {
            // The semaphore was acquired, so the timeout task is no longer needed.
            r.cancel();
            if (this.connectionManager.getServiceManager().isShuttingDown()) {
                semaphore.release();
                return CompletableFuture.completedFuture(null);
            }
            // Re-read name2entry and keep only channels that still have at least one entry.
            Map<ChannelName, Collection> name2entries = channelNames.stream().filter(cn -> {
                Collection<PubSubConnectionEntry> entries = this.name2entry.get(cn);
                return entries != null && !entries.isEmpty();
            }).collect(Collectors.toMap(cn -> cn, cn -> this.name2entry.get(cn)));
            if (name2entries.isEmpty()) {
                semaphore.release();
                return CompletableFuture.completedFuture(null);
            }
            ArrayList futures = new ArrayList();
            name2entries.forEach((channelName, entries) -> {
                for (PubSubConnectionEntry entry : entries) {
                    consumer.accept((ChannelName)channelName, entry);
                    // Unsubscribe only when the entry has no listeners left for the channel;
                    // an unsubscribe failure is converted into a normal null result.
                    CompletionStage<Object> f = !entry.hasListeners((ChannelName)channelName) ? this.unsubscribeLocked(type, (ChannelName)channelName, entry).exceptionally(ex -> null) : CompletableFuture.completedFuture(null);
                    futures.add(f);
                }
            });
            CompletableFuture<Void> ff = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
            return ff.whenComplete((v, e) -> semaphore.release());
        });
        // NOTE(review): a second acquire on the same semaphore; when it completes, every
        // semaphore in T2 is released. Presumably this completes only after the permit taken
        // above has been released - confirm against AsyncSemaphore and acquire(..). The permit
        // obtained by this second acquire is not released in this method.
        semaphore.acquire().thenAccept(rr -> ((Set)locks.getT2()).forEach(l -> l.release()));
        return result;
    }

    /**
     * Unsubscribes every connection entry that still has listeners for each of the given
     * channels.
     * <p>
     * Channels absent from {@code name2entry} are skipped. For each remaining channel the work
     * starts once the channel's semaphore has been acquired; if that does not happen within
     * the configured subscription timeout the operation for that channel fails with
     * {@link RedisTimeoutException}.
     *
     * @param type         unsubscribe type passed to {@code unsubscribeLocked}
     * @param channelNames channels (or patterns) to clear
     * @return future completed when the operations for all channels have finished
     */
    public CompletableFuture<Void> removeAllListenersAsync(PubSubType type, ChannelName ... channelNames) {
        List<CompletableFuture<Void>> fs = new ArrayList<CompletableFuture<Void>>();
        for (ChannelName channelName : channelNames) {
            if (!this.name2entry.containsKey(channelName)) {
                continue;
            }
            AsyncSemaphore semaphore = this.getSemaphore(channelName);
            CompletableFuture<Void> sf = semaphore.acquire();
            int timeout = this.config.getSubscriptionTimeout();
            Timeout timeoutTask = this.connectionManager.getServiceManager().newTimeout(t -> sf.completeExceptionally(new RedisTimeoutException("Remove listeners operation timeout: (" + timeout + "ms) for " + channelName + " topic")), timeout, TimeUnit.MILLISECONDS);
            CompletableFuture<Void> f = sf.thenCompose(r -> {
                // The semaphore was acquired, so the timeout task is no longer needed; cancel
                // it instead of leaving it scheduled until the timeout elapses.
                timeoutTask.cancel();

                Collection<PubSubConnectionEntry> entries = this.name2entry.getOrDefault(channelName, Collections.<PubSubConnectionEntry>emptySet());
                List<CompletableFuture<Void>> futures = new ArrayList<CompletableFuture<Void>>();
                for (PubSubConnectionEntry entry : entries) {
                    if (!entry.hasListeners(channelName)) {
                        continue;
                    }
                    futures.add(this.unsubscribeLocked(type, channelName, entry));
                }
                if (futures.isEmpty()) {
                    // Nothing to unsubscribe (no entries, or none with listeners).
                    semaphore.release();
                    return CompletableFuture.completedFuture(null);
                }
                return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((res, e) -> semaphore.release());
            });
            fs.add(f);
        }
        return CompletableFuture.allOf(fs.toArray(new CompletableFuture[0]));
    }

    /**
     * Probes the server with {@code RedisCommands.PUBSUB_NUMPAT} and marks pattern
     * subscriptions as unsupported when the command fails. A successful probe leaves the
     * flag unchanged.
     *
     * @param connection connection used for the probe
     */
    public void checkPatternSupport(RedisConnection connection) {
        try {
            connection.sync(RedisCommands.PUBSUB_NUMPAT, new Object[0]);
        }
        catch (Exception probeFailure) {
            // Any failure of the probe is treated as "patterns are not available".
            this.setPatternSupported(false);
        }
    }

    /**
     * Decides whether sharded pub/sub is used, based on the configured mode.
     * <p>
     * {@code ON} enables it unconditionally. {@code AUTO} enables it only when the
     * {@code RedisCommands.PUBSUB_SHARDNUMSUB} probe succeeds. Any other mode leaves the flag
     * unchanged.
     *
     * @param mode       configured sharded subscription mode
     * @param connection connection used for the probe in {@code AUTO} mode
     */
    public void checkShardingSupport(ShardedSubscriptionMode mode, RedisConnection connection) {
        if (mode == ShardedSubscriptionMode.ON) {
            this.setShardingSupported(true);
            return;
        }
        if (mode != ShardedSubscriptionMode.AUTO) {
            return;
        }
        try {
            connection.sync(RedisCommands.PUBSUB_SHARDNUMSUB, new Object[0]);
            this.setShardingSupported(true);
        }
        catch (Exception probeFailure) {
            // Probe failed: the flag keeps its current value.
        }
    }

    /**
     * Returns the current value of the pattern-support flag.
     *
     * @return {@code true} when pattern subscriptions are marked as supported
     */
    public boolean isPatternSupported() {
        return this.patternSupported;
    }

    /**
     * Sets the pattern-support flag.
     *
     * @param patternSupported new value of the flag
     */
    public void setPatternSupported(boolean patternSupported) {
        this.patternSupported = patternSupported;
    }

    /**
     * Sets the sharded pub/sub flag.
     *
     * @param shardingSupported new value of the flag
     */
    public void setShardingSupported(boolean shardingSupported) {
        this.shardingSupported = shardingSupported;
    }

    /**
     * Returns the current value of the sharded pub/sub flag.
     *
     * @return {@code true} when sharded pub/sub is marked as supported
     */
    public boolean isShardingSupported() {
        return this.shardingSupported;
    }

    /**
     * Returns the name of the publish command to use.
     *
     * @return name of {@code RedisCommands.SPUBLISH} when sharded pub/sub is enabled,
     *         name of {@code RedisCommands.PUBLISH} otherwise
     */
    public String getPublishCommand() {
        return this.shardingSupported
                ? RedisCommands.SPUBLISH.getName()
                : RedisCommands.PUBLISH.getName();
    }

    /**
     * Renders the two connection maps of this service for diagnostics.
     *
     * @return text containing {@code name2PubSubConnection} and {@code entry2PubSubConnection}
     */
    public String toString() {
        StringBuilder text = new StringBuilder("PublishSubscribeService [name2PubSubConnection=");
        text.append(this.name2PubSubConnection);
        text.append(", entry2PubSubConnection=");
        text.append(this.entry2PubSubConnection);
        text.append("]");
        return text.toString();
    }

    /**
     * Immutable map key that pairs a channel name with the node entry it is subscribed on.
     * Two keys are equal when they are of the same class and both components are equal.
     */
    public static class PubSubKey {
        private final ChannelName channelName;
        private final MasterSlaveEntry entry;

        /**
         * @param channelName channel (or pattern) name
         * @param entry       node entry holding the subscription
         */
        public PubSubKey(ChannelName channelName, MasterSlaveEntry entry) {
            this.channelName = channelName;
            this.entry = entry;
        }

        /** @return the channel name component of this key */
        public ChannelName getChannelName() {
            return channelName;
        }

        /** @return the node entry component of this key */
        public MasterSlaveEntry getEntry() {
            return entry;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            PubSubKey other = (PubSubKey)o;
            if (!Objects.equals(channelName, other.channelName)) {
                return false;
            }
            return Objects.equals(entry, other.entry);
        }

        public int hashCode() {
            // Same value as Objects.hash(channelName, entry).
            return Arrays.hashCode(new Object[]{channelName, entry});
        }

        public String toString() {
            StringBuilder text = new StringBuilder("PubSubKey{channelName=");
            text.append(channelName);
            text.append(", entry=");
            text.append(entry);
            text.append('}');
            return text.toString();
        }
    }

    /**
     * Holder for a queue of pub/sub connection entries, backed by a
     * {@link ConcurrentLinkedQueue}.
     */
    public static class PubSubEntry {
        Queue<PubSubConnectionEntry> entries = new ConcurrentLinkedQueue<>();

        /** @return the queue itself, not a copy */
        public Queue<PubSubConnectionEntry> getEntries() {
            return entries;
        }
    }
}

