/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod.impl.transport.netty;

import com.github.benmanes.caffeine.cache.Caffeine;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DecoderException;
import io.reactivex.rxjava3.core.Completable;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Consumer;
import java.util.function.Function;
import net.jcip.annotations.GuardedBy;
import org.infinispan.client.hotrod.CacheTopologyInfo;
import org.infinispan.client.hotrod.FailoverRequestBalancingStrategy;
import org.infinispan.client.hotrod.configuration.ClientIntelligence;
import org.infinispan.client.hotrod.configuration.ClusterConfiguration;
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.configuration.ServerConfiguration;
import org.infinispan.client.hotrod.event.impl.ClientEventDispatcher;
import org.infinispan.client.hotrod.event.impl.ClientListenerNotifier;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.client.hotrod.exceptions.RemoteIllegalLifecycleStateException;
import org.infinispan.client.hotrod.exceptions.RemoteNodeSuspectException;
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.client.hotrod.impl.ClientTopology;
import org.infinispan.client.hotrod.impl.TopologyInfo;
import org.infinispan.client.hotrod.impl.Util;
import org.infinispan.client.hotrod.impl.consistenthash.ConsistentHash;
import org.infinispan.client.hotrod.impl.consistenthash.SegmentConsistentHash;
import org.infinispan.client.hotrod.impl.operations.AddClientListenerOperation;
import org.infinispan.client.hotrod.impl.operations.ClientListenerOperation;
import org.infinispan.client.hotrod.impl.operations.DelegatingHotRodOperation;
import org.infinispan.client.hotrod.impl.operations.HotRodBulkOperation;
import org.infinispan.client.hotrod.impl.operations.HotRodOperation;
import org.infinispan.client.hotrod.impl.operations.NoCachePingOperation;
import org.infinispan.client.hotrod.impl.operations.NoHotRodOperation;
import org.infinispan.client.hotrod.impl.topology.CacheInfo;
import org.infinispan.client.hotrod.impl.topology.ClusterInfo;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelHandler;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelRecord;
import org.infinispan.client.hotrod.impl.transport.netty.HeaderDecoder;
import org.infinispan.client.hotrod.impl.transport.netty.OperationChannel;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.commons.stat.CounterTracker;
import org.infinispan.commons.time.TimeService;
import org.infinispan.commons.util.concurrent.CompletionStages;

public class OperationDispatcher {
    private static final Log log = LogFactory.getLog(OperationDispatcher.class, Log.class);
    public static final String DEFAULT_CLUSTER_NAME = "___DEFAULT-CLUSTER___";
    private final StampedLock lock = new StampedLock();
    private final List<ClusterInfo> clusters;
    private final int maxRetries;
    private final AtomicLong retryCounter = new AtomicLong();
    @GuardedBy(value="lock")
    private final TopologyInfo topologyInfo;
    @GuardedBy(value="lock")
    private CompletableFuture<Void> clusterSwitchStage;
    @GuardedBy(value="lock")
    private Set<HotRodOperation<?>> priorAgeOperations = null;
    @GuardedBy(value="lock")
    private final Set<SocketAddress> connectionFailedServers;
    private final ChannelHandler channelHandler;
    private final ExecutorService executorService;
    private final TimeService timeService;
    private final ClientListenerNotifier clientListenerNotifier;
    private final CounterTracker totalRetriesMetric;
    private volatile boolean isRunning;

    public OperationDispatcher(Configuration configuration, ExecutorService executorService, TimeService timeService, ClientListenerNotifier clientListenerNotifier, Consumer<ChannelPipeline> pipelineDecorator) {
        this.executorService = executorService;
        this.timeService = timeService;
        this.clientListenerNotifier = clientListenerNotifier;
        this.maxRetries = configuration.maxRetries();
        this.connectionFailedServers = configuration.serverFailureTimeout() > 0 ? Collections.newSetFromMap(Caffeine.newBuilder().expireAfterWrite((long)configuration.serverFailureTimeout(), TimeUnit.MILLISECONDS).build().asMap()) : ConcurrentHashMap.newKeySet();
        ArrayList<InetSocketAddress> initialServers = new ArrayList<InetSocketAddress>();
        for (ServerConfiguration server : configuration.servers()) {
            initialServers.add(InetSocketAddress.createUnresolved(server.host(), server.port()));
        }
        ClusterInfo mainCluster = new ClusterInfo(DEFAULT_CLUSTER_NAME, initialServers, configuration.clientIntelligence(), configuration.security().ssl().sniHostName());
        this.topologyInfo = new TopologyInfo(configuration, mainCluster);
        ArrayList<ClusterInfo> clustersDefinitions = new ArrayList<ClusterInfo>();
        if (log.isDebugEnabled()) {
            log.debugf("Statically configured servers: %s", initialServers);
            log.debugf("Tcp no delay = %b; client socket timeout = %d ms; connect timeout = %d ms", configuration.tcpNoDelay(), configuration.socketTimeout(), configuration.connectionTimeout());
        }
        if (!configuration.clusters().isEmpty()) {
            for (ClusterConfiguration clusterConfiguration : configuration.clusters()) {
                ArrayList<InetSocketAddress> alternateServers = new ArrayList<InetSocketAddress>();
                for (ServerConfiguration server : clusterConfiguration.getCluster()) {
                    alternateServers.add(InetSocketAddress.createUnresolved(server.host(), server.port()));
                }
                ClientIntelligence intelligence = clusterConfiguration.getClientIntelligence() != null ? clusterConfiguration.getClientIntelligence() : configuration.clientIntelligence();
                String sniHostName = clusterConfiguration.sniHostName() != null ? clusterConfiguration.sniHostName() : configuration.security().ssl().sniHostName();
                ClusterInfo alternateCluster = new ClusterInfo(clusterConfiguration.getClusterName(), alternateServers, intelligence, sniHostName);
                log.debugf("Add secondary cluster: %s", alternateCluster);
                clustersDefinitions.add(alternateCluster);
            }
            clustersDefinitions.add(mainCluster);
        }
        this.clusters = List.copyOf(clustersDefinitions);
        this.channelHandler = new ChannelHandler(configuration, this.topologyInfo.getCluster().getSniHostName(), executorService, this, pipelineDecorator);
        this.topologyInfo.getOrCreateCacheInfo("");
        this.totalRetriesMetric = configuration.metricRegistry().createCounter("connection.pool.retries", "The total number of retries", Map.of(), null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CacheInfo getCacheInfo(String cacheName) {
        long stamp = this.lock.tryOptimisticRead();
        CacheInfo cacheInfo = this.topologyInfo.getCacheInfo(cacheName);
        if (!this.lock.validate(stamp)) {
            stamp = this.lock.readLock();
            try {
                cacheInfo = this.topologyInfo.getCacheInfo(cacheName);
            }
            finally {
                this.lock.unlockRead(stamp);
            }
        }
        return cacheInfo;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    ClusterInfo getClusterInfo() {
        long stamp = this.lock.readLock();
        try {
            ClusterInfo clusterInfo = this.topologyInfo.getCluster();
            return clusterInfo;
        }
        finally {
            this.lock.unlockRead(stamp);
        }
    }

    public TimeService getTimeService() {
        return this.timeService;
    }

    public ClientListenerNotifier getClientListenerNotifier() {
        return this.clientListenerNotifier;
    }

    public void start() {
        this.isRunning = true;
        Util.await(CompletionStages.performSequentially(this.topologyInfo.getCluster().getInitialServers().iterator(), sa -> this.channelHandler.startChannelIfNeeded((SocketAddress)sa).exceptionally(t -> {
            if (log.isTraceEnabled()) {
                log.tracef((Throwable)t, "Ignoring exception establishing a connection to initial server %s", sa);
            }
            return null;
        })));
    }

    public void stop() {
        try {
            this.isRunning = false;
            this.channelHandler.close();
        }
        catch (Exception e) {
            log.warn("Exception while shutting down the operation dispatcher.", e);
        }
    }

    public OperationChannel getHandlerForAddress(SocketAddress socketAddress) {
        return this.channelHandler.getChannelForAddress(socketAddress);
    }

    public <E> CompletionStage<E> execute(HotRodOperation<E> operation) {
        assert (!operation.isInstanceOf(ClientListenerOperation.class));
        return this.execute(operation, Set.of());
    }

    public <E, O extends HotRodOperation<E>> CompletionStage<E> executeBulk(String cacheName, HotRodBulkOperation<?, E, O> operation) {
        return operation.executeOperations(this.identifyOperationTarget(cacheName, this.connectionFailedServers), this::executeOnSingleAddress);
    }

    public CompletionStage<Channel> executeAddListener(ClientListenerOperation operation) {
        return this.executeAddListener(operation, this.getBalancer(operation.getCacheName()).nextServer(Set.of()));
    }

    public CompletionStage<Channel> executeAddListener(ClientListenerOperation operation, SocketAddress target) {
        this.clientListenerNotifier.addDispatcher(ClientEventDispatcher.create(operation, target, () -> {}, operation.getRemoteCache()));
        operation.whenComplete((channel, t) -> {
            if (t != null) {
                log.errorf("Error encountered trying to add listener %s", operation.listener);
                this.clientListenerNotifier.removeClientListener(operation.listenerId);
            } else {
                SocketAddress unresolvedAddress = ChannelRecord.of(channel);
                if (unresolvedAddress != target) {
                    this.clientListenerNotifier.addDispatcher(ClientEventDispatcher.create(operation, unresolvedAddress, () -> {}, operation.getRemoteCache()));
                }
                this.addListener(unresolvedAddress, operation.listenerId);
                this.clientListenerNotifier.startClientListener(operation.listenerId);
            }
        });
        return this.executeOnSingleAddress(operation, target);
    }

    private Function<Object, SocketAddress> identifyOperationTarget(String cacheName, Set<SocketAddress> failedServers) {
        CacheInfo info = this.getCacheInfo(cacheName);
        if (info != null && info.getConsistentHash() != null) {
            ConsistentHash ch = info.getConsistentHash();
            return ch::getServer;
        }
        FailoverRequestBalancingStrategy frbs = this.getBalancer(cacheName);
        return obj -> frbs.nextServer(failedServers);
    }

    private <E> CompletionStage<E> execute(HotRodOperation<E> operation, Set<SocketAddress> opFailedServers) {
        Object routingObj = operation.getRoutingObject();
        SocketAddress targetAddress = null;
        if (routingObj != null) {
            targetAddress = this.addressForObject(routingObj, operation.getCacheName(), opFailedServers);
        }
        if (targetAddress == null) {
            targetAddress = this.getBalancer(operation.getCacheName()).nextServer(opFailedServers);
        }
        return this.executeOnSingleAddress(operation, targetAddress);
    }

    public SocketAddress addressForObject(Object routingObject, String cacheName) {
        return this.addressForObject(routingObject, cacheName, Set.of());
    }

    protected SocketAddress addressForObject(Object routingObject, String cacheName, Set<SocketAddress> opFailedServers) {
        SocketAddress server;
        CacheInfo cacheInfo = this.getCacheInfo(cacheName);
        if (cacheInfo != null && cacheInfo.getConsistentHash() != null && (server = cacheInfo.getConsistentHash().getServer(routingObject)) != null && !opFailedServers.contains(server)) {
            return server;
        }
        return null;
    }

    public <E> CompletionStage<E> executeOnSingleAddress(HotRodOperation<E> operation, SocketAddress socketAddress) {
        if (!this.connectionFailedServers.isEmpty() && this.connectionFailedServers.contains(socketAddress)) {
            log.tracef("Server %s is suspected, trying another for %s", socketAddress, operation);
            socketAddress = this.getBalancer(operation.getCacheName()).nextServer(this.connectionFailedServers);
        }
        log.tracef("Dispatching %s to %s", operation, socketAddress);
        return this.channelHandler.submitOperation(operation, Objects.requireNonNull(socketAddress));
    }

    public FailoverRequestBalancingStrategy getBalancer(String cacheName) {
        return this.topologyInfo.getOrCreateCacheInfo(cacheName).getBalancer();
    }

    public ClientIntelligence getClientIntelligence() {
        return this.getClusterInfo().getIntelligence();
    }

    public CacheTopologyInfo getCacheTopologyInfo(String cacheName) {
        return this.getCacheInfo(cacheName).getCacheTopologyInfo();
    }

    public ClientTopology getClientTopologyInfo(String cacheName) {
        return this.getCacheInfo(cacheName).getClientTopologyRef().get();
    }

    public ChannelHandler getChannelHandler() {
        return this.channelHandler;
    }

    public Map<SocketAddress, Set<Integer>> getPrimarySegmentsByAddress(String cacheName) {
        CacheInfo cacheInfo = this.getCacheInfo(cacheName);
        return cacheInfo != null ? cacheInfo.getPrimarySegments() : null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Collection<InetSocketAddress> getServers() {
        long stamp = this.lock.readLock();
        try {
            Collection<InetSocketAddress> collection = this.topologyInfo.getAllServers();
            return collection;
        }
        finally {
            this.lock.unlockRead(stamp);
        }
    }

    @GuardedBy(value="lock")
    private boolean fromPreviousAge(HotRodOperation<?> operation) {
        return this.priorAgeOperations != null && this.priorAgeOperations.contains(operation);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void updateTopology(String cacheName, HotRodOperation<?> operation, int responseTopologyId, InetSocketAddress[] addresses, SocketAddress[][] segmentOwners, short hashFunctionVersion) {
        long stamp = this.lock.writeLock();
        try {
            CacheInfo cacheInfo = this.topologyInfo.getCacheInfo(cacheName);
            assert (cacheInfo != null) : "The cache info must exist before receiving a topology update";
            if (this.priorAgeOperations == null && responseTopologyId != cacheInfo.getTopologyId()) {
                CacheInfo newCacheInfo;
                List<InetSocketAddress> addressList = Arrays.asList(addresses);
                Log.HOTROD.newTopology(responseTopologyId, -1, addresses.length, addressList);
                if (hashFunctionVersion >= 0) {
                    SegmentConsistentHash consistentHash = this.createConsistentHash(segmentOwners, hashFunctionVersion, cacheInfo.getCacheName());
                    newCacheInfo = cacheInfo.withNewHash(responseTopologyId, addressList, consistentHash, segmentOwners.length);
                } else {
                    newCacheInfo = cacheInfo.withNewServers(responseTopologyId, addressList);
                }
                this.updateCacheInfo(cacheName, newCacheInfo);
            } else if (log.isTraceEnabled()) {
                log.tracef("[%s] Ignoring outdated topology: topology id = %s, previous topology age = %s, servers = %s", new Object[]{cacheInfo.getCacheName(), responseTopologyId, this.fromPreviousAge(operation), Arrays.toString(addresses)});
            }
        }
        finally {
            this.lock.unlockWrite(stamp);
        }
    }

    private SegmentConsistentHash createConsistentHash(SocketAddress[][] segmentOwners, short hashFunctionVersion, String cacheNameString) {
        if (log.isTraceEnabled()) {
            if (hashFunctionVersion == 0) {
                log.tracef("[%s] Not using a consistent hash function (hash function version == 0).", cacheNameString);
            } else {
                log.tracef("[%s] Updating client hash function with %s number of segments", cacheNameString, segmentOwners.length);
            }
        }
        return this.topologyInfo.createConsistentHash(segmentOwners.length, hashFunctionVersion, segmentOwners);
    }

    @GuardedBy(value="lock")
    protected void updateCacheInfo(String cacheName, CacheInfo newCacheInfo) {
        List<InetSocketAddress> newServers = newCacheInfo.getServers();
        CacheInfo oldCacheInfo = this.topologyInfo.getCacheInfo(cacheName);
        List<InetSocketAddress> oldServers = oldCacheInfo.getServers();
        HashSet<InetSocketAddress> addedServers = new HashSet<InetSocketAddress>(newServers);
        oldServers.forEach(addedServers::remove);
        HashSet<InetSocketAddress> removedServers = new HashSet<InetSocketAddress>(oldServers);
        newServers.forEach(removedServers::remove);
        if (log.isTraceEnabled()) {
            String cacheNameString = newCacheInfo.getCacheName();
            log.tracef("[%s] Current list: %s", cacheNameString, oldServers);
            log.tracef("[%s] New list: %s", cacheNameString, newServers);
            log.tracef("[%s] Added servers: %s", cacheNameString, addedServers);
            log.tracef("[%s] Removed servers: %s", cacheNameString, removedServers);
        }
        for (SocketAddress socketAddress : addedServers) {
            Log.HOTROD.newServerAdded(socketAddress);
            this.channelHandler.startChannelIfNeeded(socketAddress);
        }
        this.topologyInfo.updateCacheInfo(cacheName, oldCacheInfo, newCacheInfo);
        for (SocketAddress socketAddress : removedServers) {
            Log.HOTROD.removingServer(socketAddress);
            this.connectionFailedServers.remove(socketAddress);
            this.closeChannel(socketAddress);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void trySwitchCluster() {
        ClusterInfo cluster;
        int ageBeforeSwitch;
        long stamp = this.lock.writeLock();
        try {
            ageBeforeSwitch = this.topologyInfo.getTopologyAge();
            cluster = this.topologyInfo.getCluster();
            if (this.clusterSwitchStage != null) {
                if (log.isTraceEnabled()) {
                    log.trace("Cluster switch is already in progress");
                }
                return;
            }
            this.clusterSwitchStage = new CompletableFuture();
        }
        finally {
            this.lock.unlockWrite(stamp);
        }
        this.checkServersAlive(cluster.getInitialServers()).thenCompose(alive -> {
            if (alive.booleanValue()) {
                if (log.isTraceEnabled()) {
                    log.tracef("Cluster %s is still alive, not switching", cluster);
                }
                return CompletableFuture.completedFuture(null);
            }
            if (log.isTraceEnabled()) {
                log.tracef("Trying to switch cluster away from '%s'", cluster.getName());
            }
            return this.findLiveCluster(cluster, ageBeforeSwitch);
        }).thenAccept(newCluster -> {
            if (newCluster != null) {
                this.automaticSwitchToCluster((ClusterInfo)newCluster, cluster, ageBeforeSwitch);
            }
        }).whenComplete((__, t) -> this.completeClusterSwitch());
    }

    private CompletionStage<Boolean> checkServersAlive(Collection<InetSocketAddress> servers) {
        if (servers.isEmpty()) {
            return CompletableFuture.completedFuture(false);
        }
        AtomicInteger remainingResponses = new AtomicInteger(servers.size());
        CompletableFuture<Boolean> allFuture = new CompletableFuture<Boolean>();
        for (SocketAddress socketAddress : servers) {
            this.executeOnSingleAddress(new NoCachePingOperation(), socketAddress).whenComplete((result, throwable) -> {
                if (throwable != null) {
                    if (log.isTraceEnabled()) {
                        log.tracef((Throwable)throwable, "Error checking whether this server is alive: %s", server);
                    }
                    if (remainingResponses.decrementAndGet() == 0) {
                        allFuture.complete(false);
                    }
                } else {
                    log.tracef("Ping to server %s succeeded", server);
                    allFuture.complete(true);
                }
            });
        }
        return allFuture;
    }

    private CompletionStage<ClusterInfo> findLiveCluster(ClusterInfo failedCluster, int ageBeforeSwitch) {
        ArrayList<ClusterInfo> candidateClusters = new ArrayList<ClusterInfo>();
        for (ClusterInfo cluster : this.clusters) {
            String clusterName = cluster.getName();
            if (clusterName.equals(failedCluster.getName())) continue;
            candidateClusters.add(cluster);
        }
        Iterator<ClusterInfo> clusterIterator = candidateClusters.iterator();
        return this.findLiveCluster0(false, null, clusterIterator, ageBeforeSwitch);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletionStage<ClusterInfo> findLiveCluster0(boolean alive, ClusterInfo testedCluster, Iterator<ClusterInfo> clusterIterator, int ageBeforeSwitch) {
        long stamp = this.lock.writeLock();
        try {
            if (this.clusterSwitchStage == null || this.topologyInfo.getTopologyAge() != ageBeforeSwitch) {
                log.debugf("Cluster switch already completed by another thread, bailing out", new Object[0]);
                CompletableFuture<Object> completableFuture = CompletableFuture.completedFuture(null);
                return completableFuture;
            }
        }
        finally {
            this.lock.unlockWrite(stamp);
        }
        if (alive) {
            return CompletableFuture.completedFuture(testedCluster);
        }
        if (!clusterIterator.hasNext()) {
            log.debugf("All cluster addresses viewed and none worked: %s", this.clusters);
            return CompletableFuture.completedFuture(null);
        }
        ClusterInfo nextCluster = clusterIterator.next();
        return this.checkServersAlive(nextCluster.getInitialServers()).thenCompose(aliveNext -> this.findLiveCluster0((boolean)aliveNext, nextCluster, clusterIterator, ageBeforeSwitch));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void automaticSwitchToCluster(ClusterInfo newCluster, ClusterInfo failedCluster, int ageBeforeSwitch) {
        long stamp = this.lock.writeLock();
        try {
            if (this.clusterSwitchStage == null || this.priorAgeOperations != null) {
                log.debugf("Cluster switch already completed by another thread, bailing out", new Object[0]);
                return;
            }
            log.debugf("Switching to cluster %s, servers: %s", newCluster.getName(), newCluster.getInitialServers());
            this.markPendingOperationsAsPriorAge();
            for (SocketAddress socketAddress : this.topologyInfo.getAllServers()) {
                this.closeChannel(socketAddress);
                this.connectionFailedServers.remove(socketAddress);
            }
            this.topologyInfo.switchCluster(newCluster);
        }
        finally {
            this.lock.unlockWrite(stamp);
        }
        if (!newCluster.getName().equals(DEFAULT_CLUSTER_NAME)) {
            Log.HOTROD.switchedToCluster(newCluster.getName());
        } else {
            Log.HOTROD.switchedBackToMainCluster();
        }
    }

    private void closeChannel(SocketAddress server) {
        List<HotRodOperation<?>> ops = this.channelHandler.closeChannel(server);
        if (!ops.isEmpty()) {
            this.executorService.submit(() -> {
                TransportException transportException = log.connectionClosed(server, server);
                for (HotRodOperation op : ops) {
                    this.handleResponse(op, -1L, server, null, (Throwable)transportException);
                }
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean manualSwitchToCluster(String clusterName) {
        if (this.clusters.isEmpty()) {
            log.debugf("No alternative clusters configured, so can't switch cluster", new Object[0]);
            return false;
        }
        ClusterInfo newCluster = null;
        for (ClusterInfo cluster : this.clusters) {
            if (!cluster.getName().equals(clusterName)) continue;
            newCluster = cluster;
        }
        if (newCluster == null) {
            log.debugf("Cluster named %s does not exist in the configuration", clusterName);
            return false;
        }
        long stamp = this.lock.writeLock();
        boolean shouldComplete = false;
        try {
            if (this.clusterSwitchStage != null) {
                log.debugf("Another cluster switch is already in progress, overriding it", new Object[0]);
                shouldComplete = true;
            }
            log.debugf("Switching to cluster %s, servers: %s", clusterName, newCluster.getInitialServers());
            this.markPendingOperationsAsPriorAge();
            this.topologyInfo.switchCluster(newCluster);
        }
        finally {
            this.lock.unlockWrite(stamp);
        }
        if (!clusterName.equals(DEFAULT_CLUSTER_NAME)) {
            Log.HOTROD.manuallySwitchedToCluster(clusterName);
        } else {
            Log.HOTROD.manuallySwitchedBackToMainCluster();
        }
        if (shouldComplete) {
            this.completeClusterSwitch();
        }
        return true;
    }

    @GuardedBy(value="lock")
    private void markPendingOperationsAsPriorAge() {
        Set<Object> set;
        if (this.priorAgeOperations == null) {
            set = ConcurrentHashMap.newKeySet();
            this.priorAgeOperations = set;
        } else {
            set = this.priorAgeOperations;
        }
        NoHotRodOperation endPlaceholder = NoHotRodOperation.instance();
        set.add(endPlaceholder);
        this.channelHandler.pendingOperationFlowable().concatMapCompletable(op -> {
            set.add(op);
            return Completable.fromCompletionStage((CompletionStage)op.asCompletableFuture().whenComplete((___, t) -> set.remove(op)));
        }).subscribe(() -> {
            if (set.isEmpty()) {
                long stamp = this.lock.writeLock();
                try {
                    this.priorAgeOperations = null;
                }
                finally {
                    this.lock.unlockWrite(stamp);
                }
            }
        }, t -> log.fatal("Problem occurred while configuring prior age operations for cluster failover", (Throwable)t));
        set.remove(endPlaceholder);
        if (set.isEmpty()) {
            this.priorAgeOperations = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void completeClusterSwitch() {
        CompletableFuture<Void> localStage;
        long stamp = this.lock.writeLock();
        try {
            localStage = this.clusterSwitchStage;
            this.clusterSwitchStage = null;
        }
        finally {
            this.lock.unlockWrite(stamp);
        }
        if (localStage != null) {
            localStage.complete(null);
        }
    }

    public <E> void handleResponse(HotRodOperation<E> op, long messageId, Channel channel, E returnValue, Throwable t) {
        this.handleResponse(op, messageId, ChannelRecord.of(channel), returnValue, t);
    }

    public <E> void handleResponse(HotRodOperation<E> op, long messageId, SocketAddress unresolvedAddress, E returnValue, Throwable t) {
        if (log.isTraceEnabled()) {
            log.tracef("Completing message %d op %s with value %s or exception %s", new Object[]{messageId, op, org.infinispan.commons.util.Util.toStr(returnValue), t});
        }
        if (t != null) {
            RetryingHotRodOperation<?> retryOp = this.checkException(t, unresolvedAddress, op);
            if (retryOp != null) {
                if (op instanceof AddClientListenerOperation) {
                    AddClientListenerOperation aclo = (AddClientListenerOperation)op;
                    this.logAndRetryOrFail(t, retryOp, aclo);
                } else {
                    this.logAndRetryOrFail(t, retryOp);
                }
            }
        } else {
            op.asCompletableFuture().complete(returnValue);
        }
    }

    private void addListener(SocketAddress sa, byte[] listenerId) {
        OperationChannel operationChannel = this.channelHandler.getChannelForAddress(sa);
        if (operationChannel == null) {
            throw new IllegalStateException("Channel is not running for address " + String.valueOf(sa));
        }
        HeaderDecoder headerDecoder = (HeaderDecoder)operationChannel.getChannel().pipeline().get("header-decoder");
        headerDecoder.addListener(listenerId);
    }

    public void removeListener(SocketAddress sa, byte[] listenerId) {
        OperationChannel operationChannel = this.channelHandler.getChannelForAddress(sa);
        if (operationChannel != null) {
            HeaderDecoder headerDecoder = (HeaderDecoder)operationChannel.getChannel().pipeline().get("header-decoder");
            headerDecoder.removeListener(listenerId);
        }
    }

    public SocketAddress unresolvedAddressForChannel(Channel c) {
        return ChannelRecord.of(c);
    }

    private RetryingHotRodOperation<?> checkException(Throwable cause, SocketAddress unresolvedAddress, HotRodOperation<?> op) {
        while (cause instanceof DecoderException && cause.getCause() != null) {
            cause = cause.getCause();
        }
        if (!op.supportRetry() || this.isServerError(cause) && !(cause instanceof RemoteIllegalLifecycleStateException) && !(cause instanceof RemoteNodeSuspectException)) {
            op.asCompletableFuture().completeExceptionally(cause);
            return null;
        }
        if (Thread.interrupted()) {
            InterruptedException e = new InterruptedException();
            e.addSuppressed(cause);
            op.asCompletableFuture().completeExceptionally(e);
            return null;
        }
        RetryingHotRodOperation<?> retrying = RetryingHotRodOperation.retryingOp(op);
        if (unresolvedAddress != null) {
            retrying.addFailedServer(unresolvedAddress);
        }
        return retrying;
    }

    protected final boolean isServerError(Throwable t) {
        return t instanceof HotRodClientException && ((HotRodClientException)t).isServerError();
    }

    protected void logAndRetryOrFail(Throwable t, RetryingHotRodOperation<?> op) {
        if (this.canRetry(t, op)) {
            this.execute(op, op.getFailedServers());
        }
    }

    protected void logAndRetryOrFail(Throwable t, RetryingHotRodOperation<?> op, AddClientListenerOperation aclo) {
        if (this.canRetry(t, op)) {
            FailoverRequestBalancingStrategy balancer = this.getBalancer(op.getCacheName());
            SocketAddress sa = balancer.nextServer(op.getFailedServers());
            if (this.connectionFailedServers.contains(sa)) {
                sa = balancer.nextServer(op.getFailedServers());
            }
            this.clientListenerNotifier.addDispatcher(ClientEventDispatcher.create(aclo, sa, () -> {}, aclo.getRemoteCache()));
            this.executeOnSingleAddress(op, sa);
        }
    }

    protected boolean canRetry(Throwable t, RetryingHotRodOperation<?> op) {
        int retryAttempt = op.incrementRetry();
        if (retryAttempt <= this.maxRetries) {
            if (log.isTraceEnabled()) {
                log.tracef(t, "Exception encountered in %s. Retry %d out of %d", this, retryAttempt, this.maxRetries);
            }
            this.totalRetriesMetric.increment();
            this.retryCounter.incrementAndGet();
            op.reset();
            return true;
        }
        Log.HOTROD.exceptionAndNoRetriesLeft(retryAttempt, this.maxRetries, t);
        op.asCompletableFuture().completeExceptionally(t);
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void handleConnectionFailure(OperationChannel operationChannel, Throwable t) {
        if (t != null) {
            boolean allInitialServersFailed;
            long stamp = this.lock.writeLock();
            try {
                this.connectionFailedServers.add(operationChannel.getAddress());
                allInitialServersFailed = this.connectionFailedServers.containsAll(this.topologyInfo.getCluster().getInitialServers());
                if (log.isTraceEnabled()) {
                    log.tracef("Connection attempt failed, we now have %d servers with no established connections: %s", this.connectionFailedServers.size(), this.connectionFailedServers);
                }
                if (!allInitialServersFailed || this.clusters.isEmpty()) {
                    this.resetCachesWithFailedServers();
                }
            }
            finally {
                this.lock.unlockWrite(stamp);
            }
            if (allInitialServersFailed && !this.clusters.isEmpty()) {
                this.trySwitchCluster();
            }
            this.handleChannelFailure(operationChannel, t);
        } else {
            SocketAddress unresolvedAddress = operationChannel.getAddress();
            ChannelRecord.set(operationChannel.getChannel(), unresolvedAddress);
            this.connectionFailedServers.remove(unresolvedAddress);
            log.tracef("OperationChannel connected: %s", operationChannel);
        }
    }

    @GuardedBy(value="lock")
    private void resetCachesWithFailedServers() {
        ArrayList<String> failedCaches = new ArrayList<String>();
        this.topologyInfo.forEachCache((cacheNameBytes, cacheInfo) -> {
            boolean canReset;
            List<InetSocketAddress> cacheServers = cacheInfo.getServers();
            boolean currentServersHaveFailed = this.connectionFailedServers.containsAll(cacheServers);
            boolean bl = canReset = !cacheServers.equals(this.topologyInfo.getCluster().getInitialServers());
            if (currentServersHaveFailed && canReset) {
                failedCaches.add(cacheInfo.getCacheName());
            }
        });
        if (!failedCaches.isEmpty()) {
            Log.HOTROD.revertCacheToInitialServerList(failedCaches);
            for (String cacheName : failedCaches) {
                this.topologyInfo.reset(cacheName);
            }
        }
    }

    public void handleChannelFailure(Channel channel, Throwable t) {
        assert (channel.eventLoop().inEventLoop());
        if (!this.isRunning) {
            log.tracef("Dispatcher is not running, ignoring received exception: " + t.toString(), new Object[0]);
            return;
        }
        SocketAddress unresolved = ChannelRecord.of(channel);
        OperationChannel operationChannel = this.channelHandler.getChannelForAddress(unresolved);
        if (operationChannel != null) {
            this.handleChannelFailure(operationChannel, t);
        }
    }

    private void handleChannelFailure(OperationChannel operationChannel, Throwable t) {
        Iterable<HotRodOperation<?>> ops = operationChannel.reconnect(t);
        for (HotRodOperation<?> op : ops) {
            this.handleResponse(op, -1L, operationChannel.getAddress(), null, t);
        }
    }

    public String getCurrentClusterName() {
        return this.getClusterInfo().getName();
    }

    public long getRetries() {
        return this.retryCounter.get();
    }

    public ConsistentHash getConsistentHash(String cacheName) {
        return this.getCacheInfo(cacheName).getConsistentHash();
    }

    public int getTopologyId(String cacheName) {
        return this.getCacheInfo(cacheName).getTopologyId();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Collection<InetSocketAddress> getServers(String cacheName) {
        long stamp = this.lock.readLock();
        try {
            List<InetSocketAddress> list = this.topologyInfo.getServers(cacheName);
            return list;
        }
        finally {
            this.lock.unlockRead(stamp);
        }
    }

    public void addCacheTopologyInfoIfAbsent(String cacheName) {
        this.topologyInfo.getOrCreateCacheInfo(cacheName);
    }

    public Set<SocketAddress> getConnectionFailedServers() {
        return this.connectionFailedServers;
    }

    static class RetryingHotRodOperation<T>
    extends DelegatingHotRodOperation<T> {
        private final Set<SocketAddress> failedServers = new HashSet<SocketAddress>();
        private int retryCount;

        static <T> RetryingHotRodOperation<T> retryingOp(HotRodOperation<T> op) {
            if (op instanceof RetryingHotRodOperation) {
                RetryingHotRodOperation operation = (RetryingHotRodOperation)op;
                return operation;
            }
            return new RetryingHotRodOperation<T>(op);
        }

        RetryingHotRodOperation(HotRodOperation<T> op) {
            super(op);
        }

        void addFailedServer(SocketAddress socketAddress) {
            this.failedServers.add(socketAddress);
        }

        int incrementRetry() {
            return ++this.retryCount;
        }

        public Set<SocketAddress> getFailedServers() {
            return this.failedServers;
        }
    }
}

