package org.redisson.pubsub;

import io.netty.util.Timeout;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EventListener;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import jodd.util.StringPool;
import org.redisson.PubSubPatternStatusListener;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisNodeNotFoundException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/redisson/pubsub/PublishSubscribeService.class */
public class PublishSubscribeService {
    private static final Logger log = LoggerFactory.getLogger(PublishSubscribeService.class);
    private final ConnectionManager connectionManager;
    private final MasterSlaveServersConfig config;
    // Striped per-channel locks; getSemaphore(...) selects one by channel-name hash.
    private final AsyncSemaphore[] locks = new AsyncSemaphore[50];
    // Guards the pools of reusable connection entries kept in entry2PubSubConnection.
    private final AsyncSemaphore freePubSubLock = new AsyncSemaphore(1);
    // (channel, node) -> connection entry currently serving that subscription.
    private final ConcurrentMap<PubSubKey, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<>();
    // node -> connection entries that were registered as reusable by addFreeConnectionEntry(...).
    private final ConcurrentMap<MasterSlaveEntry, PubSubEntry> entry2PubSubConnection = new ConcurrentHashMap<>();
    // Not referenced anywhere else in this class.
    private final Queue<PubSubConnectionEntry> emptyQueue = new LinkedList<>();
    private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this);
    private final CountDownLatchPubSub countDownLatchPubSub = new CountDownLatchPubSub(this);
    private final LockPubSub lockPubSub = new LockPubSub(this);

    /* loaded from: input_file:org/redisson/pubsub/PublishSubscribeService$PubSubEntry.class */
    /**
     * Per-node bookkeeping: the subscription keys associated with the node and
     * the queue of connection entries kept for it.
     */
    public static class PubSubEntry {
        // Concurrent set backed by a ConcurrentHashMap.
        Set<PubSubKey> keys = Collections.newSetFromMap(new ConcurrentHashMap<>());
        Queue<PubSubConnectionEntry> entries = new ConcurrentLinkedQueue<>();

        /** @return the live (not copied) set of keys */
        public Set<PubSubKey> getKeys() {
            return this.keys;
        }

        /** @return the live (not copied) queue of connection entries */
        public Queue<PubSubConnectionEntry> getEntries() {
            return this.entries;
        }
    }

    /* loaded from: input_file:org/redisson/pubsub/PublishSubscribeService$PubSubKey.class */
    /**
     * Immutable map key identifying a subscription by channel name and the
     * node entry it is bound to. Equality and hash code use both components.
     */
    public static class PubSubKey {
        private final ChannelName channelName;
        private final MasterSlaveEntry entry;

        /**
         * @param channelName      channel (or pattern) name
         * @param masterSlaveEntry node entry the subscription is bound to
         */
        public PubSubKey(ChannelName channelName, MasterSlaveEntry masterSlaveEntry) {
            this.channelName = channelName;
            this.entry = masterSlaveEntry;
        }

        public ChannelName getChannelName() {
            return this.channelName;
        }

        public MasterSlaveEntry getEntry() {
            return this.entry;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            // Exact class match: a subclass instance is never equal to a PubSubKey.
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            PubSubKey pubSubKey = (PubSubKey) obj;
            return Objects.equals(this.channelName, pubSubKey.channelName) && Objects.equals(this.entry, pubSubKey.entry);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.channelName, this.entry);
        }
    }

    /**
     * Creates the service and fills every slot of the striped lock array
     * with its own single-permit semaphore.
     *
     * @param connectionManager        connection manager used for node lookup and timers
     * @param masterSlaveServersConfig configuration supplying timeout and retry settings
     */
    public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig masterSlaveServersConfig) {
        this.connectionManager = connectionManager;
        this.config = masterSlaveServersConfig;
        Arrays.setAll(this.locks, slot -> new AsyncSemaphore(1));
    }

    /** @return the connection manager this service was created with */
    public ConnectionManager getConnectionManager() {
        return connectionManager;
    }

    /** @return the {@code LockPubSub} helper bound to this service */
    public LockPubSub getLockPubSub() {
        return lockPubSub;
    }

    /** @return the {@code CountDownLatchPubSub} helper bound to this service */
    public CountDownLatchPubSub getCountDownLatchPubSub() {
        return countDownLatchPubSub;
    }

    /** @return the {@code SemaphorePubSub} helper bound to this service */
    public SemaphorePubSub getSemaphorePubSub() {
        return semaphorePubSub;
    }

    /**
     * Looks up the connection entry registered for the channel on the node
     * that currently owns the channel's slot.
     *
     * @param channelName channel to look up
     * @return the registered entry, or {@code null} when none is registered
     */
    public PubSubConnectionEntry getPubSubEntry(ChannelName channelName) {
        PubSubKey key = createKey(channelName);
        return name2PubSubConnection.get(key);
    }

    /**
     * Subscribes the listeners to a channel pattern.
     *
     * <p>For an ordinary pattern a single subscription is made on the node
     * owning the pattern's slot. For a multi-entity pattern (see
     * {@link #isMultiEntity}) the subscription is made on every entry returned
     * by the connection manager; status listeners are wrapped so the wrapped
     * listener's {@code onStatus} is only forwarded once the shared counter,
     * initialised to the number of entries, reaches zero.
     *
     * @param channelName            pattern to subscribe to
     * @param codec                  codec used for the subscription
     * @param redisPubSubListenerArr listeners to attach
     * @return future completed with the connection entries of all subscriptions
     */
    public CompletableFuture<Collection<PubSubConnectionEntry>> psubscribe(ChannelName channelName, Codec codec, RedisPubSubListener<?>... redisPubSubListenerArr) {
        if (!isMultiEntity(channelName)) {
            return subscribe(PubSubType.PSUBSCRIBE, codec, channelName, getEntry(channelName), redisPubSubListenerArr)
                    .thenApply(connectionEntry -> Collections.singletonList(connectionEntry));
        }

        Collection<MasterSlaveEntry> entrySet = this.connectionManager.getEntrySet();
        AtomicInteger statusCounter = new AtomicInteger(entrySet.size());
        RedisPubSubListener<?>[] wrappedListeners = Arrays.stream(redisPubSubListenerArr).map(listener -> {
            if (listener instanceof PubSubPatternStatusListener) {
                return new PubSubPatternStatusListener((PubSubPatternStatusListener) listener) {
                    @Override
                    public boolean onStatus(PubSubType pubSubType, CharSequence charSequence) {
                        // Forward the status only for the last node to report.
                        if (statusCounter.decrementAndGet() == 0) {
                            return super.onStatus(pubSubType, charSequence);
                        }
                        return true;
                    }
                };
            }
            return listener;
        }).toArray(RedisPubSubListener[]::new);

        // Typed collection: the element type is needed to call getNow(...) below.
        Collection<CompletableFuture<PubSubConnectionEntry>> futures = new ArrayList<>();
        for (MasterSlaveEntry masterSlaveEntry : entrySet) {
            futures.add(subscribe(PubSubType.PSUBSCRIBE, codec, channelName, masterSlaveEntry, wrappedListeners));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(future -> future.getNow(null))
                        .collect(Collectors.toList()));
    }

    /**
     * Tells whether the channel must be handled on every node: true only in
     * cluster mode and only for names starting with {@code __keyspace@} or
     * {@code __keyevent@}.
     */
    private boolean isMultiEntity(ChannelName channelName) {
        if (!this.connectionManager.isClusterMode()) {
            return false;
        }
        if (channelName.toString().startsWith("__keyspace@")) {
            return true;
        }
        return channelName.toString().startsWith("__keyevent@");
    }

    /**
     * Subscribes the listeners to a regular channel on the node that owns the
     * channel's slot.
     *
     * @return future completed with the connection entry serving the subscription
     */
    public CompletableFuture<PubSubConnectionEntry> subscribe(Codec codec, ChannelName channelName, RedisPubSubListener<?>... redisPubSubListenerArr) {
        MasterSlaveEntry node = getEntry(channelName);
        return subscribe(PubSubType.SUBSCRIBE, codec, channelName, node, redisPubSubListenerArr);
    }

    /**
     * Subscribes the listeners to a sharded channel ({@code SSUBSCRIBE}) on the
     * node that owns the channel's slot.
     *
     * @return future completed with the connection entry serving the subscription
     */
    public CompletableFuture<PubSubConnectionEntry> ssubscribe(Codec codec, ChannelName channelName, RedisPubSubListener<?>... redisPubSubListenerArr) {
        MasterSlaveEntry node = getEntry(channelName);
        return subscribe(PubSubType.SSUBSCRIBE, codec, channelName, node, redisPubSubListenerArr);
    }

    /**
     * Acquires the channel's striped lock and then starts the subscription.
     * A timer fails the returned future if the lock is not obtained within
     * {@code timeout + retryInterval * retryAttempts} milliseconds; when the
     * lock arrives after that, it is released again without subscribing.
     */
    private CompletableFuture<PubSubConnectionEntry> subscribe(PubSubType pubSubType, Codec codec, ChannelName channelName, MasterSlaveEntry masterSlaveEntry, RedisPubSubListener<?>... redisPubSubListenerArr) {
        CompletableFuture<PubSubConnectionEntry> promise = new CompletableFuture<>();
        AsyncSemaphore channelLock = getSemaphore(channelName);
        int timeout = this.config.getTimeout() + (this.config.getRetryInterval() * this.config.getRetryAttempts());
        Timeout lockTimer = this.connectionManager.newTimeout(fired -> {
            promise.completeExceptionally(new RedisTimeoutException("Unable to acquire subscription lock after " + timeout + "ms. Try to increase 'timeout', 'subscriptionsPerConnection', 'subscriptionConnectionPoolSize' parameters."));
        }, timeout, TimeUnit.MILLISECONDS);
        channelLock.acquire(() -> {
            boolean timerStopped = lockTimer.cancel();
            if (timerStopped && !promise.isDone()) {
                subscribe(codec, channelName, masterSlaveEntry, promise, pubSubType, channelLock, new AtomicInteger(), redisPubSubListenerArr);
                return;
            }
            // Timer already fired or the promise was completed elsewhere: hand the lock back.
            channelLock.release();
        });
        return promise;
    }

    /**
     * Starts a {@code SUBSCRIBE} for the named channel without arming the
     * overall subscription timeout. The supplied semaphore is passed through to
     * the subscription logic, which releases it when the attempt finishes.
     *
     * @param str            channel name
     * @param asyncSemaphore lock handed to the subscription logic
     * @return future completed with the connection entry serving the subscription
     */
    public CompletableFuture<PubSubConnectionEntry> subscribeNoTimeout(Codec codec, String str, AsyncSemaphore asyncSemaphore, RedisPubSubListener<?>... redisPubSubListenerArr) {
        CompletableFuture<PubSubConnectionEntry> promise = new CompletableFuture<>();
        ChannelName channel = new ChannelName(str);
        MasterSlaveEntry node = getEntry(new ChannelName(str));
        subscribeNoTimeout(codec, channel, node, promise, PubSubType.SUBSCRIBE, asyncSemaphore, new AtomicInteger(), redisPubSubListenerArr);
        return promise;
    }

    /**
     * Returns the striped lock for the channel, chosen by the channel's hash
     * code modulo the number of locks.
     */
    public AsyncSemaphore getSemaphore(ChannelName channelName) {
        // The remainder lies strictly between -length and +length, so abs() cannot overflow.
        int remainder = channelName.hashCode() % this.locks.length;
        int index = Math.abs(remainder);
        return this.locks[index];
    }

    /** Builds the map key for the channel on the node currently resolved for it. */
    private PubSubKey createKey(ChannelName channelName) {
        MasterSlaveEntry node = getEntry(channelName);
        return new PubSubKey(channelName, node);
    }

    /**
     * Starts the subscription and then arms the standard timeout
     * ({@code timeout + retryInterval * retryAttempts} ms) on the promise.
     */
    private void subscribe(Codec codec, ChannelName channelName, MasterSlaveEntry masterSlaveEntry, CompletableFuture<PubSubConnectionEntry> completableFuture, PubSubType pubSubType, AsyncSemaphore asyncSemaphore, AtomicInteger atomicInteger, RedisPubSubListener<?>... redisPubSubListenerArr) {
        subscribeNoTimeout(codec, channelName, masterSlaveEntry, completableFuture, pubSubType, asyncSemaphore, atomicInteger, redisPubSubListenerArr);
        int waitMillis = this.config.getTimeout() + (this.config.getRetryInterval() * this.config.getRetryAttempts());
        timeout(completableFuture, waitMillis);
    }

    /**
     * Arms the default timeout on the future:
     * {@code timeout + retryInterval * retryAttempts} milliseconds.
     */
    public void timeout(CompletableFuture<?> completableFuture) {
        int waitMillis = this.config.getTimeout() + (this.config.getRetryInterval() * this.config.getRetryAttempts());
        timeout(completableFuture, waitMillis);
    }

    /**
     * Schedules a timer that fails the future with a
     * {@code RedisTimeoutException} after {@code j} milliseconds. The timer is
     * cancelled as soon as the future completes for any reason.
     *
     * @param completableFuture future to guard
     * @param j                 delay in milliseconds
     */
    public void timeout(CompletableFuture<?> completableFuture, long j) {
        Timeout timer = this.connectionManager.newTimeout(fired -> {
            completableFuture.completeExceptionally(new RedisTimeoutException("Unable to acquire subscription lock after " + j + "ms. Try to increase 'timeout', 'subscriptionsPerConnection', 'subscriptionConnectionPoolSize' parameters."));
        }, j, TimeUnit.MILLISECONDS);
        completableFuture.whenComplete((value, error) -> timer.cancel());
    }

    /**
     * Core subscription routine.
     *
     * <p>If a connection entry is already registered for (channel, node), the
     * listeners are simply attached to it. Otherwise {@code freePubSubLock} is
     * acquired and either the head of the node's entry queue is reused, or a new
     * connection is requested through {@code connect(...)} with a retry timer.
     *
     * @param completableFuture promise completed with the entry serving the subscription
     * @param asyncSemaphore    per-channel lock; released by this method or by the
     *                          callbacks it installs
     * @param atomicInteger     number of connection retries performed so far
     */
    private void subscribeNoTimeout(Codec codec, ChannelName channelName, MasterSlaveEntry masterSlaveEntry, CompletableFuture<PubSubConnectionEntry> completableFuture, PubSubType pubSubType, AsyncSemaphore asyncSemaphore, AtomicInteger atomicInteger, RedisPubSubListener<?>... redisPubSubListenerArr) {
        PubSubConnectionEntry pubSubConnectionEntry = this.name2PubSubConnection.get(new PubSubKey(channelName, masterSlaveEntry));
        if (pubSubConnectionEntry != null) {
            // Channel already has an entry on this node: only attach the listeners.
            addListeners(channelName, completableFuture, pubSubType, asyncSemaphore, pubSubConnectionEntry, redisPubSubListenerArr);
        } else {
            this.freePubSubLock.acquire(() -> {
                // The promise was completed while waiting for the lock: give both locks back.
                if (completableFuture.isDone()) {
                    asyncSemaphore.release();
                    this.freePubSubLock.release();
                    return;
                }
                // Re-resolve the node through its client; fall back to the entry passed in.
                MasterSlaveEntry masterSlaveEntry2 = (MasterSlaveEntry) Optional.ofNullable(this.connectionManager.getEntry(masterSlaveEntry.getClient())).orElse(masterSlaveEntry);
                PubSubEntry orDefault = this.entry2PubSubConnection.getOrDefault(masterSlaveEntry2, new PubSubEntry());
                PubSubConnectionEntry peek = orDefault.getEntries().peek();
                if (peek == null) {
                    // No queued entry for this node: request a connection via connect(...).
                    this.freePubSubLock.release();
                    CompletableFuture<RedisPubSubConnection> connect = connect(codec, channelName, masterSlaveEntry2, completableFuture, pubSubType, asyncSemaphore, redisPubSubListenerArr);
                    // After retryInterval: fail once the attempts are used up; otherwise, if the
                    // connect future can still be cancelled, start over and count the attempt.
                    this.connectionManager.newTimeout(timeout -> {
                        if (atomicInteger.get() == this.config.getRetryAttempts()) {
                            connect.completeExceptionally(new RedisTimeoutException("Unable to acquire connection for subscription after " + atomicInteger.get() + " attempts. Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
                        } else if (connect.cancel(true)) {
                            subscribe(codec, channelName, masterSlaveEntry, completableFuture, pubSubType, asyncSemaphore, atomicInteger, redisPubSubListenerArr);
                            atomicInteger.incrementAndGet();
                        }
                    }, this.config.getRetryInterval(), TimeUnit.MILLISECONDS);
                    return;
                }
                int tryAcquire = peek.tryAcquire();
                // NOTE(review): neither freePubSubLock nor asyncSemaphore is released on this path.
                if (tryAcquire == -1) {
                    throw new IllegalStateException();
                }
                PubSubConnectionEntry putIfAbsent = this.name2PubSubConnection.putIfAbsent(new PubSubKey(channelName, masterSlaveEntry2), peek);
                if (putIfAbsent != null) {
                    // Another entry is already registered for this key: undo the acquire and use it.
                    peek.release();
                    this.freePubSubLock.release();
                    addListeners(channelName, completableFuture, pubSubType, asyncSemaphore, putIfAbsent, redisPubSubListenerArr);
                } else {
                    // A result of 0 removes the entry from the queue
                    // (presumably it has no capacity left — confirm against PubSubConnectionEntry).
                    if (tryAcquire == 0) {
                        orDefault.getEntries().poll();
                    }
                    this.freePubSubLock.release();
                    CompletableFuture<Void> addListeners = addListeners(channelName, completableFuture, pubSubType, asyncSemaphore, peek, redisPubSubListenerArr);
                    peek.subscribe(codec, pubSubType, channelName, addListeners);
                    // A failed subscribe triggers an unsubscribe for the channel.
                    addListeners.whenComplete((r7, th) -> {
                        if (th != null) {
                            unsubscribe(channelName, pubSubType);
                        }
                    });
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Resolves the node entry for the channel: the slot is calculated from the
     * channel's name and then looked up in the connection manager.
     */
    public MasterSlaveEntry getEntry(ChannelName channelName) {
        final ConnectionManager manager = this.connectionManager;
        return manager.getEntry(manager.calcSlot(channelName.getName()));
    }

    /**
     * Attaches the listeners to the connection entry and wires the entry's
     * subscribe-success future to the caller's promise.
     *
     * <p>On failure the promise is failed and the channel lock released. On
     * success the promise is completed with the entry. If the promise was
     * already completed elsewhere (for example by a timeout), the listeners
     * just added are removed again and, when none remain for the channel, the
     * channel is unsubscribed before the lock is released.
     *
     * @return the entry's subscribe-success future for the channel
     */
    private CompletableFuture<Void> addListeners(ChannelName channelName, CompletableFuture<PubSubConnectionEntry> completableFuture, PubSubType pubSubType, AsyncSemaphore asyncSemaphore, PubSubConnectionEntry pubSubConnectionEntry, RedisPubSubListener<?>... redisPubSubListenerArr) {
        for (RedisPubSubListener<?> redisPubSubListener : redisPubSubListenerArr) {
            pubSubConnectionEntry.addListener(channelName, redisPubSubListener);
        }
        CompletableFuture<Void> successFuture = pubSubConnectionEntry.getSubscribeFuture(channelName, pubSubType).getSuccessFuture();
        successFuture.whenComplete((result, subscribeError) -> {
            if (subscribeError != null) {
                completableFuture.completeExceptionally(subscribeError);
                asyncSemaphore.release();
                return;
            }
            if (completableFuture.complete(pubSubConnectionEntry)) {
                asyncSemaphore.release();
                return;
            }
            // The promise was already completed by someone else: roll the listeners back.
            for (RedisPubSubListener<?> redisPubSubListener : redisPubSubListenerArr) {
                pubSubConnectionEntry.removeListener(channelName, redisPubSubListener);
            }
            if (pubSubConnectionEntry.hasListeners(channelName)) {
                asyncSemaphore.release();
            } else {
                // Parameter names must not repeat those of the enclosing lambda.
                unsubscribe(pubSubType, channelName).whenComplete((ignored, unsubscribeError) -> {
                    asyncSemaphore.release();
                });
            }
        });
        return successFuture;
    }

    /**
     * Requests the next pub/sub connection from the node entry. When no entry
     * is given, returns a future already failed with
     * {@code RedisNodeNotFoundException} naming the channel's slot.
     */
    private CompletableFuture<RedisPubSubConnection> nextPubSubConnection(MasterSlaveEntry masterSlaveEntry, ChannelName channelName) {
        if (masterSlaveEntry == null) {
            CompletableFuture<RedisPubSubConnection> failed = new CompletableFuture<>();
            failed.completeExceptionally(new RedisNodeNotFoundException("Node for slot: " + this.connectionManager.calcSlot(channelName.getName()) + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings."));
            return failed;
        }
        return masterSlaveEntry.nextPubSubConnection();
    }

    /**
     * Obtains a pub/sub connection for the node and subscribes the channel on it.
     *
     * <p>A failure of the caller's promise is propagated to the connection
     * future. When the connection arrives, a new entry is registered for
     * (channel, node) under {@code freePubSubLock}; if another entry was
     * registered in the meantime, the fresh connection is returned to the node
     * and the listeners are attached to the existing entry instead. When the
     * connection future fails, the channel lock is released and, unless the
     * future was cancelled, the promise is failed with the same error.
     *
     * @return the connection future, so callers can cancel or fail it
     */
    private CompletableFuture<RedisPubSubConnection> connect(Codec codec, ChannelName channelName, MasterSlaveEntry masterSlaveEntry, CompletableFuture<PubSubConnectionEntry> completableFuture, PubSubType pubSubType, AsyncSemaphore asyncSemaphore, RedisPubSubListener<?>... redisPubSubListenerArr) {
        CompletableFuture<RedisPubSubConnection> nextPubSubConnection = nextPubSubConnection(masterSlaveEntry, channelName);
        completableFuture.whenComplete((pubSubConnectionEntry, promiseError) -> {
            if (promiseError != null) {
                nextPubSubConnection.completeExceptionally(promiseError);
            }
        });
        nextPubSubConnection.whenComplete((redisPubSubConnection, connectError) -> {
            if (connectError == null) {
                this.freePubSubLock.acquire(() -> {
                    PubSubConnectionEntry newEntry = new PubSubConnectionEntry(redisPubSubConnection, this.connectionManager);
                    int tryAcquire = newEntry.tryAcquire();
                    PubSubConnectionEntry existing = this.name2PubSubConnection.putIfAbsent(new PubSubKey(channelName, masterSlaveEntry), newEntry);
                    if (existing != null) {
                        // Lost the race: give the connection back and reuse the registered entry.
                        masterSlaveEntry.returnPubSubConnection(redisPubSubConnection);
                        this.freePubSubLock.release();
                        addListeners(channelName, completableFuture, pubSubType, asyncSemaphore, existing, redisPubSubListenerArr);
                    } else {
                        // A positive result registers the entry in the node's queue for reuse.
                        if (tryAcquire > 0) {
                            addFreeConnectionEntry(channelName, newEntry);
                        }
                        this.freePubSubLock.release();
                        CompletableFuture<Void> addListeners = addListeners(channelName, completableFuture, pubSubType, asyncSemaphore, newEntry, redisPubSubListenerArr);
                        newEntry.subscribe(codec, pubSubType, channelName, addListeners);
                        // Parameter names must not repeat those of the enclosing lambdas.
                        addListeners.whenComplete((ignored, subscribeError) -> {
                            if (subscribeError != null) {
                                unsubscribe(channelName, pubSubType);
                            }
                        });
                    }
                });
                return;
            }
            asyncSemaphore.release();
            if (nextPubSubConnection.isCancelled()) {
                return;
            }
            completableFuture.completeExceptionally(connectError);
        });
        return nextPubSubConnection;
    }

    /**
     * Removes the channel's registration and sends the unsubscribe command.
     * No channel lock is taken here. The returned future completes once a
     * status of the same type for the same channel is received; at that point
     * the entry is released and, when {@code release()} returns 1, its
     * connection is handed back to the node.
     *
     * @return a completed future when nothing is registered or the connection
     *         manager is shutting down; otherwise a future completed on the
     *         matching status
     */
    public CompletableFuture<Void> unsubscribe(final PubSubType pubSubType, final ChannelName channelName) {
        final PubSubConnectionEntry connectionEntry = this.name2PubSubConnection.remove(createKey(channelName));
        if (connectionEntry == null) {
            return CompletableFuture.completedFuture(null);
        }
        if (this.connectionManager.isShuttingDown()) {
            return CompletableFuture.completedFuture(null);
        }

        final CompletableFuture<Void> done = new CompletableFuture<>();
        connectionEntry.unsubscribe(pubSubType, channelName, new BaseRedisPubSubListener() {
            @Override
            public boolean onStatus(PubSubType pubSubType2, CharSequence charSequence) {
                boolean sameSubscription = pubSubType2 == pubSubType && charSequence.equals(channelName);
                if (!sameSubscription) {
                    return false;
                }
                int released = connectionEntry.release();
                if (released == 1) {
                    PublishSubscribeService.this.getEntry(channelName).returnPubSubConnection(connectionEntry.getConnection());
                }
                done.complete(null);
                return true;
            }
        });
        return done;
    }

    /** Drops the queue of connection entries kept for the given node entry. */
    public void remove(MasterSlaveEntry masterSlaveEntry) {
        entry2PubSubConnection.remove(masterSlaveEntry);
    }

    /**
     * Unsubscribes the channel on the node currently resolved for it, taking
     * the channel lock for the duration.
     *
     * @return future completed with the codec the channel was subscribed with,
     *         or {@code null} when nothing was registered
     */
    public CompletableFuture<Codec> unsubscribe(ChannelName channelName, PubSubType pubSubType) {
        MasterSlaveEntry node = getEntry(channelName);
        return unsubscribe(channelName, node, pubSubType);
    }

    /**
     * Unsubscribes the channel on the given node while holding the channel lock.
     *
     * <p>The registration is removed first; when there was one, the entry is
     * also taken out of the node's queue (under {@code freePubSubLock}), the
     * codec is read from the connection's channel map matching the unsubscribe
     * type, and the unsubscribe command is sent. The lock is released and the
     * future completed when the matching status arrives.
     *
     * @return future completed with the codec, or {@code null} when nothing was
     *         registered or the connection manager is shutting down
     */
    private CompletableFuture<Codec> unsubscribe(ChannelName channelName, MasterSlaveEntry masterSlaveEntry, PubSubType pubSubType) {
        if (this.connectionManager.isShuttingDown()) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Codec> result = new CompletableFuture<>();
        AsyncSemaphore channelLock = getSemaphore(channelName);
        channelLock.acquire(() -> {
            PubSubConnectionEntry connectionEntry = this.name2PubSubConnection.remove(new PubSubKey(channelName, masterSlaveEntry));
            if (connectionEntry == null) {
                channelLock.release();
                result.complete(null);
                return;
            }
            this.freePubSubLock.acquire(() -> {
                PubSubEntry pool = this.entry2PubSubConnection.getOrDefault(masterSlaveEntry, new PubSubEntry());
                pool.getEntries().remove(connectionEntry);
                this.freePubSubLock.release();

                final Codec codec;
                if (pubSubType == PubSubType.PUNSUBSCRIBE) {
                    codec = connectionEntry.getConnection().getPatternChannels().get(channelName);
                } else if (pubSubType == PubSubType.SUNSUBSCRIBE) {
                    codec = connectionEntry.getConnection().getShardedChannels().get(channelName);
                } else {
                    codec = connectionEntry.getConnection().getChannels().get(channelName);
                }
                connectionEntry.unsubscribe(pubSubType, channelName, new BaseRedisPubSubListener() {
                    @Override
                    public boolean onStatus(PubSubType pubSubType2, CharSequence charSequence) {
                        boolean sameSubscription = pubSubType2 == pubSubType && charSequence.equals(channelName);
                        if (!sameSubscription) {
                            return false;
                        }
                        channelLock.release();
                        result.complete(codec);
                        return true;
                    }
                });
            });
        });
        return result;
    }

    /**
     * Appends the connection entry to the queue kept for the channel's node,
     * creating the node's {@code PubSubEntry} on first use.
     */
    private void addFreeConnectionEntry(ChannelName channelName, PubSubConnectionEntry pubSubConnectionEntry) {
        PubSubEntry pool = this.entry2PubSubConnection.computeIfAbsent(getEntry(channelName), node -> new PubSubEntry());
        pool.getEntries().add(pubSubConnectionEntry);
    }

    /**
     * Re-creates every subscription whose channel hashes to the given slot.
     *
     * <p>For each matching registration the channel is unsubscribed from the
     * node it is bound to and subscribed again with the same codec and
     * listeners. The command used matches the kind of subscription found on the
     * connection: {@code SUBSCRIBE} for regular channels, {@code SSUBSCRIBE}
     * for sharded channels and {@code PSUBSCRIBE} for patterns.
     *
     * @param i slot whose subscriptions must be reattached
     */
    public void reattachPubSub(int i) {
        this.name2PubSubConnection.entrySet().stream()
                .filter(mapping -> this.connectionManager.calcSlot(mapping.getKey().getChannelName().getName()) == i)
                .forEach(mapping -> {
                    ChannelName channelName = mapping.getKey().getChannelName();
                    MasterSlaveEntry masterSlaveEntry = mapping.getKey().getEntry();
                    PubSubConnectionEntry connectionEntry = mapping.getValue();
                    RedisPubSubConnection connection = connectionEntry.getConnection();

                    Codec codec = connection.getChannels().get(channelName);
                    if (codec != null) {
                        Queue<RedisPubSubListener<?>> listeners = connectionEntry.getListeners(channelName);
                        unsubscribe(channelName, masterSlaveEntry, PubSubType.UNSUBSCRIBE);
                        subscribe(codec, channelName, listeners.toArray(new RedisPubSubListener[0]));
                    }

                    Codec shardedCodec = connection.getShardedChannels().get(channelName);
                    if (shardedCodec != null) {
                        Queue<RedisPubSubListener<?>> listeners = connectionEntry.getListeners(channelName);
                        unsubscribe(channelName, masterSlaveEntry, PubSubType.SUNSUBSCRIBE);
                        // Sharded channels must come back via SSUBSCRIBE with their own codec.
                        ssubscribe(shardedCodec, channelName, listeners.toArray(new RedisPubSubListener[0]));
                    }

                    Codec patternCodec = connection.getPatternChannels().get(channelName);
                    if (patternCodec != null) {
                        Queue<RedisPubSubListener<?>> listeners = connectionEntry.getListeners(channelName);
                        unsubscribe(channelName, masterSlaveEntry, PubSubType.PUNSUBSCRIBE);
                        psubscribe(channelName, patternCodec, listeners.toArray(new RedisPubSubListener[0]));
                    }
                });
    }

    /**
     * Re-creates the subscriptions held by the given connection.
     *
     * <p>Searches the per-node queues for the first entry wrapping the
     * connection. When found, the entry is removed from its queue (under
     * {@code freePubSubLock}) and the connection's regular, sharded and pattern
     * channels are reattached in that order. Nothing happens when no queued
     * entry wraps the connection.
     */
    public void reattachPubSub(RedisPubSubConnection redisPubSubConnection) {
        for (Map.Entry<MasterSlaveEntry, PubSubEntry> mapping : this.entry2PubSubConnection.entrySet()) {
            MasterSlaveEntry node = mapping.getKey();
            PubSubEntry pool = mapping.getValue();
            for (PubSubConnectionEntry candidate : pool.getEntries()) {
                if (!candidate.getConnection().equals(redisPubSubConnection)) {
                    continue;
                }
                this.freePubSubLock.acquire(() -> {
                    pool.getEntries().remove(candidate);
                    this.freePubSubLock.release();
                });
                reattachPubSubListeners(redisPubSubConnection.getChannels().keySet(), node, candidate, PubSubType.UNSUBSCRIBE);
                reattachPubSubListeners(redisPubSubConnection.getShardedChannels().keySet(), node, candidate, PubSubType.SUNSUBSCRIBE);
                reattachPubSubListeners(redisPubSubConnection.getPatternChannels().keySet(), node, candidate, PubSubType.PUNSUBSCRIBE);
                return;
            }
        }
    }

    /**
     * Unsubscribes every channel in the set and, for channels that still have
     * listeners, subscribes them again once the unsubscribe has produced a
     * codec. The resubscribe command is chosen from the unsubscribe type.
     *
     * @param set                   channels (or patterns) to reattach
     * @param masterSlaveEntry      node the channels are currently bound to
     * @param pubSubConnectionEntry entry holding the listeners
     * @param pubSubType            unsubscribe command type for these channels
     */
    private void reattachPubSubListeners(Set<ChannelName> set, MasterSlaveEntry masterSlaveEntry, PubSubConnectionEntry pubSubConnectionEntry, PubSubType pubSubType) {
        for (ChannelName channelName : set) {
            Queue<RedisPubSubListener<?>> listeners = pubSubConnectionEntry.getListeners(channelName);
            // Always unsubscribe, even when there is nothing to resubscribe afterwards.
            CompletableFuture<Codec> unsubscribe = unsubscribe(channelName, masterSlaveEntry, pubSubType);
            if (listeners.isEmpty()) {
                // Skip only this channel; the remaining channels still need reattaching.
                continue;
            }
            unsubscribe.whenComplete((codec, th) -> {
                if (codec == null) {
                    return;
                }
                if (pubSubType == PubSubType.PUNSUBSCRIBE) {
                    psubscribe(channelName, (Collection<RedisPubSubListener<?>>) listeners, codec);
                } else if (pubSubType == PubSubType.SUNSUBSCRIBE) {
                    ssubscribe(channelName, (Collection<RedisPubSubListener<?>>) listeners, codec);
                } else {
                    subscribe(channelName, (Collection<RedisPubSubListener<?>>) listeners, codec);
                }
            });
        }
    }

    /**
     * Resubscribes the listeners to a regular channel. On failure the attempt
     * is repeated after one second, with no retry limit; on success the
     * resubscription is logged.
     */
    private void subscribe(ChannelName channelName, Collection<RedisPubSubListener<?>> collection, Codec codec) {
        RedisPubSubListener<?>[] listeners = collection.toArray(new RedisPubSubListener[0]);
        subscribe(codec, channelName, listeners).whenComplete((connectionEntry, error) -> {
            if (error == null) {
                log.info("listeners of '{}' channel to '{}' have been resubscribed", channelName, connectionEntry.getConnection().getRedisClient());
                return;
            }
            this.connectionManager.newTimeout(task -> subscribe(channelName, collection, codec), 1L, TimeUnit.SECONDS);
        });
    }

    /**
     * Resubscribes the listeners to a sharded channel. On failure the attempt
     * is repeated after one second, with no retry limit; on success the
     * resubscription is logged.
     */
    private void ssubscribe(ChannelName channelName, Collection<RedisPubSubListener<?>> collection, Codec codec) {
        RedisPubSubListener<?>[] listeners = collection.toArray(new RedisPubSubListener[0]);
        ssubscribe(codec, channelName, listeners).whenComplete((connectionEntry, error) -> {
            if (error == null) {
                log.info("listeners of '{}' channel to '{}' have been resubscribed", channelName, connectionEntry.getConnection().getRedisClient());
                return;
            }
            this.connectionManager.newTimeout(task -> ssubscribe(channelName, collection, codec), 1L, TimeUnit.SECONDS);
        });
    }

    /**
     * Resubscribes the listeners to a channel pattern. On failure the attempt
     * is repeated after one second, with no retry limit; on success the
     * resubscription is logged.
     */
    private void psubscribe(ChannelName channelName, Collection<RedisPubSubListener<?>> collection, Codec codec) {
        RedisPubSubListener<?>[] listeners = collection.toArray(new RedisPubSubListener[0]);
        psubscribe(channelName, codec, listeners).whenComplete((connectionEntries, error) -> {
            if (error == null) {
                log.info("listeners of '{}' channel-pattern to '{}' have been resubscribed", channelName, connectionEntries);
                return;
            }
            this.connectionManager.newTimeout(task -> psubscribe(channelName, collection, codec), 1L, TimeUnit.SECONDS);
        });
    }

    /**
     * Removes one listener from the channel while holding the channel lock.
     *
     * <p>The listener is removed from the entry registered on each relevant
     * node (a single node, or every node for multi-entity channels). An entry
     * left without listeners for the channel is unsubscribed. The lock is
     * released and the future completed once every node has been processed.
     *
     * @param eventListener listener to remove
     * @return future completed when the removal has finished on all nodes
     */
    public CompletableFuture<Void> removeListenerAsync(PubSubType pubSubType, ChannelName channelName, EventListener eventListener) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        AsyncSemaphore channelLock = getSemaphore(channelName);
        channelLock.acquire(() -> {
            Collection<MasterSlaveEntry> nodes = Collections.singletonList(getEntry(channelName));
            if (isMultiEntity(channelName)) {
                nodes = this.connectionManager.getEntrySet();
            }
            AtomicInteger pending = new AtomicInteger(nodes.size());
            // Counts one node as processed; the last one releases the lock and completes.
            Runnable nodeDone = () -> {
                if (pending.decrementAndGet() == 0) {
                    channelLock.release();
                    result.complete(null);
                }
            };
            for (MasterSlaveEntry node : nodes) {
                PubSubConnectionEntry connectionEntry = this.name2PubSubConnection.get(new PubSubKey(channelName, node));
                if (connectionEntry == null) {
                    nodeDone.run();
                    continue;
                }
                connectionEntry.removeListener(channelName, eventListener);
                if (connectionEntry.hasListeners(channelName)) {
                    nodeDone.run();
                } else {
                    unsubscribe(pubSubType, channelName).whenComplete((ignored, error) -> nodeDone.run());
                }
            }
        });
        return result;
    }

    /**
     * Removes listeners, identified by their integer ids, from the channel
     * while holding the channel lock.
     *
     * <p>The ids are removed from the entry registered on each relevant node
     * (a single node, or every node for multi-entity channels). An entry left
     * without listeners for the channel is unsubscribed. The lock is released
     * and the future completed once every node has been processed.
     *
     * @param numArr ids of the listeners to remove
     * @return future completed when the removal has finished on all nodes
     */
    public CompletableFuture<Void> removeListenerAsync(PubSubType pubSubType, ChannelName channelName, Integer... numArr) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        AsyncSemaphore channelLock = getSemaphore(channelName);
        channelLock.acquire(() -> {
            Collection<MasterSlaveEntry> nodes = Collections.singletonList(getEntry(channelName));
            if (isMultiEntity(channelName)) {
                nodes = this.connectionManager.getEntrySet();
            }
            AtomicInteger pending = new AtomicInteger(nodes.size());
            // Counts one node as processed; the last one releases the lock and completes.
            Runnable nodeDone = () -> {
                if (pending.decrementAndGet() == 0) {
                    channelLock.release();
                    result.complete(null);
                }
            };
            for (MasterSlaveEntry node : nodes) {
                PubSubConnectionEntry connectionEntry = this.name2PubSubConnection.get(new PubSubKey(channelName, node));
                if (connectionEntry == null) {
                    nodeDone.run();
                    continue;
                }
                for (Integer listenerId : numArr) {
                    connectionEntry.removeListener(channelName, listenerId.intValue());
                }
                if (connectionEntry.hasListeners(channelName)) {
                    nodeDone.run();
                } else {
                    unsubscribe(pubSubType, channelName).whenComplete((ignored, error) -> nodeDone.run());
                }
            }
        });
        return result;
    }

    /**
     * Unsubscribes the channel if it still has listeners, holding the channel
     * lock for the duration.
     *
     * <p>Waiting for the lock is bounded by
     * {@code timeout + retryInterval * retryAttempts} milliseconds; when the
     * bound is exceeded the returned future fails with
     * {@code RedisTimeoutException}. If the lock is granted only after that
     * failure, it is released immediately so the lock stripe is not left
     * permanently held.
     *
     * @return future completed when the channel has been unsubscribed or when
     *         there was nothing to unsubscribe
     */
    public CompletableFuture<Void> removeAllListenersAsync(PubSubType pubSubType, ChannelName channelName) {
        AsyncSemaphore semaphore = getSemaphore(channelName);
        int timeout = this.config.getTimeout() + (this.config.getRetryInterval() * this.config.getRetryAttempts());
        CompletableFuture<Void> lockAcquired = new CompletableFuture<>();
        Timeout timeoutTask = this.connectionManager.newTimeout(fired -> {
            lockAcquired.completeExceptionally(new RedisTimeoutException("Remove listeners operation timeout: (" + timeout + "ms) for " + channelName + " topic"));
        }, timeout, TimeUnit.MILLISECONDS);
        semaphore.acquire(() -> {
            // The lock wait is over, so the timer is no longer needed.
            timeoutTask.cancel();
            if (!lockAcquired.complete(null)) {
                // The timeout already failed the future, so the continuation below
                // will never run and cannot release the permit: release it here.
                semaphore.release();
            }
        });
        return lockAcquired.thenCompose(ignored -> {
            PubSubConnectionEntry pubSubEntry = getPubSubEntry(channelName);
            if (pubSubEntry == null) {
                semaphore.release();
                return CompletableFuture.completedFuture(null);
            }
            if (pubSubEntry.hasListeners(channelName)) {
                return unsubscribe(pubSubType, channelName).whenComplete((result, error) -> {
                    semaphore.release();
                });
            }
            semaphore.release();
            return CompletableFuture.completedFuture(null);
        });
    }

    /** Renders both internal maps for diagnostics. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("PublishSubscribeService [name2PubSubConnection=");
        text.append(this.name2PubSubConnection);
        text.append(", entry2PubSubConnection=").append(this.entry2PubSubConnection);
        text.append(StringPool.RIGHT_SQ_BRACKET);
        return text.toString();
    }
}
