/*
 * Decompiled with CFR 0.152.
 */
package org.ehcache.clustered.server.store;

import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.ehcache.clustered.common.Consistency;
import org.ehcache.clustered.common.internal.ServerStoreConfiguration;
import org.ehcache.clustered.common.internal.exceptions.ClusterException;
import org.ehcache.clustered.common.internal.exceptions.InvalidOperationException;
import org.ehcache.clustered.common.internal.exceptions.InvalidStoreException;
import org.ehcache.clustered.common.internal.exceptions.LifecycleException;
import org.ehcache.clustered.common.internal.messages.ClusterTierReconnectMessage;
import org.ehcache.clustered.common.internal.messages.EhcacheEntityMessage;
import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse;
import org.ehcache.clustered.common.internal.messages.EhcacheMessageType;
import org.ehcache.clustered.common.internal.messages.EhcacheOperationMessage;
import org.ehcache.clustered.common.internal.messages.LifecycleMessage;
import org.ehcache.clustered.common.internal.messages.ReconnectMessageCodec;
import org.ehcache.clustered.common.internal.messages.ServerStoreOpMessage;
import org.ehcache.clustered.common.internal.messages.StateRepositoryOpMessage;
import org.ehcache.clustered.common.internal.store.Chain;
import org.ehcache.clustered.common.internal.store.ClusterTierEntityConfiguration;
import org.ehcache.clustered.common.internal.store.Element;
import org.ehcache.clustered.server.CommunicatorServiceConfiguration;
import org.ehcache.clustered.server.KeySegmentMapper;
import org.ehcache.clustered.server.ServerSideServerStore;
import org.ehcache.clustered.server.ServerStoreCompatibility;
import org.ehcache.clustered.server.ServerStoreEventListener;
import org.ehcache.clustered.server.internal.messages.EhcacheDataSyncMessage;
import org.ehcache.clustered.server.internal.messages.EhcacheMessageTrackerCatchup;
import org.ehcache.clustered.server.internal.messages.EhcacheMessageTrackerMessage;
import org.ehcache.clustered.server.internal.messages.PassiveReplicationMessage;
import org.ehcache.clustered.server.management.ClusterTierManagement;
import org.ehcache.clustered.server.offheap.InternalChain;
import org.ehcache.clustered.server.state.EhcacheStateContext;
import org.ehcache.clustered.server.state.EhcacheStateService;
import org.ehcache.clustered.server.state.InvalidationTracker;
import org.ehcache.clustered.server.state.config.EhcacheStoreStateServiceConfig;
import org.ehcache.clustered.server.store.ClusterTierDump;
import org.ehcache.clustered.server.store.LockManagerImpl;
import org.ehcache.clustered.server.store.NoopLockManager;
import org.ehcache.clustered.server.store.ServerLockManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.terracotta.client.message.tracker.OOOMessageHandler;
import org.terracotta.client.message.tracker.OOOMessageHandlerConfiguration;
import org.terracotta.entity.ActiveInvokeContext;
import org.terracotta.entity.ActiveServerEntity;
import org.terracotta.entity.BasicServiceConfiguration;
import org.terracotta.entity.ClientCommunicator;
import org.terracotta.entity.ClientDescriptor;
import org.terracotta.entity.ClientSourceId;
import org.terracotta.entity.ConfigurationException;
import org.terracotta.entity.EntityMessage;
import org.terracotta.entity.EntityResponse;
import org.terracotta.entity.EntityUserException;
import org.terracotta.entity.IEntityMessenger;
import org.terracotta.entity.InvokeContext;
import org.terracotta.entity.MessageCodecException;
import org.terracotta.entity.PassiveSynchronizationChannel;
import org.terracotta.entity.ServiceConfiguration;
import org.terracotta.entity.ServiceException;
import org.terracotta.entity.ServiceRegistry;
import org.terracotta.entity.StateDumpCollector;

public class ClusterTierActiveEntity
implements ActiveServerEntity<EhcacheEntityMessage, EhcacheEntityResponse> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClusterTierActiveEntity.class);

    /** System property that overrides {@link #DEFAULT_SYNC_DATA_SIZE_THRESHOLD}. */
    static final String SYNC_DATA_SIZE_PROP = "ehcache.sync.data.size.threshold";
    private static final long DEFAULT_SYNC_DATA_SIZE_THRESHOLD = 0x200000L;

    /** System property that overrides {@link #DEFAULT_SYNC_DATA_GETS_THRESHOLD}. */
    static final String SYNC_DATA_GETS_PROP = "ehcache.sync.data.gets.threshold";
    private static final int DEFAULT_SYNC_DATA_GETS_THRESHOLD = 8192;

    /** System property that overrides {@link #DEFAULT_CHAIN_COMPACTION_THRESHOLD}. */
    static final String CHAIN_COMPACTION_THRESHOLD_PROP = "ehcache.server.chain.compaction.threshold";
    private static final int DEFAULT_CHAIN_COMPACTION_THRESHOLD = 8;

    /** Upper bound on the number of threads of {@link #SYNC_GETS_EXECUTOR}. */
    private static final int MAX_SYNC_CONCURRENCY = 1;
    private static final Executor SYNC_GETS_EXECUTOR = new ThreadPoolExecutor(0, MAX_SYNC_CONCURRENCY, 20L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

    private final String storeIdentifier;
    private final ServerStoreConfiguration configuration;
    private final ClientCommunicator clientCommunicator;
    private final EhcacheStateService stateService;
    private final OOOMessageHandler<EhcacheEntityMessage, EhcacheEntityResponse> messageHandler;
    private final IEntityMessenger<EhcacheEntityMessage, EhcacheEntityResponse> entityMessenger;
    private final ServerStoreCompatibility storeCompatibility = new ServerStoreCompatibility();

    /** Starts out {@code true}; {@link #loadExisting()} resets it to {@code false}. */
    private final AtomicBoolean reconnectComplete = new AtomicBoolean(true);

    /** Source of the ids used as keys of {@link #clientsWaitingForInvalidation}. */
    private final AtomicInteger invalidationIdGenerator = new AtomicInteger();

    /** Invalidation rounds still awaiting client acknowledgements, keyed by invalidation id. */
    private final ConcurrentMap<Integer, InvalidationHolder> clientsWaitingForInvalidation = new ConcurrentHashMap<>();
    private final ReconnectMessageCodec reconnectMessageCodec = new ReconnectMessageCodec();
    private final ClusterTierManagement management;
    private final String managerIdentifier;

    /** Guards the one-time hand-off of {@link #inflightInvalidations} in the store operation path. */
    private final Object inflightInvalidationsMutex = new Object();

    /** Assigned by {@link #loadExisting()}; set back to {@code null} once the store operation path has re-fired the entries. */
    private volatile List<InvalidationTuple> inflightInvalidations;

    /** Clients that asked for store events; every access synchronizes on the set itself. */
    private final Set<ClientDescriptor> eventListeners = new HashSet<>();

    /** Known clients; the value is {@code FALSE} after connect and {@code TRUE} once the client validated the store. */
    private final Map<ClientDescriptor, Boolean> connectedClients = new ConcurrentHashMap<>();

    /** Server-side iterators still open for each client, keyed by the id returned to that client. */
    private final Map<ClientDescriptor, Map<UUID, Iterator<Map.Entry<Long, Chain>>>> liveIterators = new ConcurrentHashMap<>();

    /** Chain length above which an append requests chain resolution. */
    private final int chainCompactionLimit;
    private final ServerLockManager lockManager;

    // Both thresholds are read from system properties once per instance, falling back to the defaults above.
    private final long dataSizeThreshold = Long.getLong(SYNC_DATA_SIZE_PROP, DEFAULT_SYNC_DATA_SIZE_THRESHOLD);
    private final int dataGetsThreshold = Integer.getInteger(SYNC_DATA_GETS_PROP, DEFAULT_SYNC_DATA_GETS_THRESHOLD);
    private volatile Integer dataMapInitialCapacity = null;

    /**
     * Builds the active entity for one cluster tier and looks up the platform
     * services it depends on.
     *
     * @param registry            registry used to resolve the communicator, state service,
     *                            entity messenger and message handler
     * @param entityConfiguration supplies the store identifier, store configuration and
     *                            manager identifier; must not be {@code null}
     * @param defaultMapper       key segment mapper handed to the state service configuration
     * @throws ConfigurationException if {@code entityConfiguration} is {@code null} or a
     *                                required service lookup fails; in the latter case the
     *                                {@link ServiceException} is attached as the cause
     */
    public ClusterTierActiveEntity(ServiceRegistry registry, ClusterTierEntityConfiguration entityConfiguration, KeySegmentMapper defaultMapper) throws ConfigurationException {
        if (entityConfiguration == null) {
            throw new ConfigurationException("ClusteredStoreEntityConfiguration cannot be null");
        }
        this.storeIdentifier = entityConfiguration.getStoreIdentifier();
        this.configuration = entityConfiguration.getConfiguration();
        this.managerIdentifier = entityConfiguration.getManagerIdentifier();
        try {
            this.clientCommunicator = (ClientCommunicator)registry.getService((ServiceConfiguration)new CommunicatorServiceConfiguration());
            this.stateService = (EhcacheStateService)registry.getService((ServiceConfiguration)new EhcacheStoreStateServiceConfig(entityConfiguration.getManagerIdentifier(), defaultMapper));
            this.entityMessenger = (IEntityMessenger)registry.getService((ServiceConfiguration)new BasicServiceConfiguration(IEntityMessenger.class));
            this.messageHandler = (OOOMessageHandler)registry.getService((ServiceConfiguration)new OOOMessageHandlerConfiguration(this.managerIdentifier + "###" + this.storeIdentifier, ClusterTierActiveEntity::isTrackedMessage));
        }
        catch (ServiceException e) {
            // Keep the original failure reachable through getCause() instead of flattening it to a message.
            ConfigurationException failure = new ConfigurationException("Unable to retrieve service: " + e.getMessage());
            failure.initCause(e);
            throw failure;
        }
        if (this.entityMessenger == null) {
            throw new AssertionError((Object)"Server failed to retrieve IEntityMessenger service.");
        }
        this.management = new ClusterTierManagement(registry, this.stateService, true, this.storeIdentifier, entityConfiguration.getManagerIdentifier());
        this.chainCompactionLimit = Integer.getInteger(CHAIN_COMPACTION_THRESHOLD_PROP, DEFAULT_CHAIN_COMPACTION_THRESHOLD);
        // Real locking is only needed when a loader-writer is configured.
        this.lockManager = this.configuration.isLoaderWriterConfigured() ? new LockManagerImpl() : new NoopLockManager();
    }

    /**
     * Tells whether a message is tracked: only operation messages can be, and for
     * those the decision is delegated to
     * {@code EhcacheMessageType.isTrackedOperationMessage}.
     *
     * @param msg the message to classify
     * @return {@code true} if {@code msg} is a tracked operation message
     */
    static boolean isTrackedMessage(EhcacheEntityMessage msg) {
        if (!(msg instanceof EhcacheOperationMessage)) {
            return false;
        }
        EhcacheOperationMessage operation = (EhcacheOperationMessage) msg;
        return EhcacheMessageType.isTrackedOperationMessage(operation.getMessageType());
    }

    /**
     * Contributes this entity's state to a state dump: the tier description, the
     * number of connected clients and one single-entry map per client holding its
     * descriptor string.
     *
     * @param dump collector receiving the state
     */
    public void addStateTo(StateDumpCollector dump) {
        ClusterTierDump.dump(dump, this.managerIdentifier, this.storeIdentifier, this.configuration);

        // Work on a copy so the count and the list describe the same set of clients.
        Set<ClientDescriptor> snapshot = new HashSet<>(this.getConnectedClients());
        List<Map<String, String>> clientStates = new ArrayList<>(snapshot.size());
        for (ClientDescriptor client : snapshot) {
            Map<String, String> state = new HashMap<>(1);
            state.put("clientDescriptor", client.toString());
            clientStates.add(state);
        }

        dump.addState("clientCount", String.valueOf(snapshot.size()));
        dump.addState("clients", clientStates);
    }

    /**
     * Creates the backing store for this tier, attaches a fresh {@code Listener}
     * to it and notifies management that the entity was created.
     *
     * @throws ConfigurationException propagated from the store creation
     */
    public void createNew() throws ConfigurationException {
        this.stateService
            .createStore(this.storeIdentifier, this.configuration, true)
            .setEventListener(new Listener());
        this.management.entityCreated();
    }

    /**
     * @return the current inflight invalidation list, or {@code null} when none is pending
     */
    List<InvalidationTuple> getInflightInvalidations() {
        List<InvalidationTuple> current = this.inflightInvalidations;
        return current;
    }

    /**
     * Loads the already existing store for this tier. A fresh inflight invalidation
     * list is installed first (and populated for non-strong tiers), then the store is
     * loaded and given a new {@code Listener}, the reconnect flag is reset and
     * management is told that promotion completed.
     */
    public void loadExisting() {
        this.inflightInvalidations = new ArrayList<>();

        boolean strong = this.isStrong();
        if (!strong) {
            LOGGER.debug("Preparing for handling inflight invalidations");
            this.addInflightInvalidationsForEventualCaches();
        }

        this.stateService
            .loadStore(this.storeIdentifier, this.configuration)
            .setEventListener(new Listener());

        this.reconnectComplete.set(false);
        this.management.entityPromotionCompleted();
    }

    /**
     * Registers a newly connected client as not yet validated.
     *
     * @param clientDescriptor the client that connected
     */
    public void connected(ClientDescriptor clientDescriptor) {
        Boolean notYetValidated = Boolean.FALSE;
        this.connectedClients.put(clientDescriptor, notYetValidated);
    }

    /**
     * Cleans up every piece of server-side state held for a client that went away.
     * <p>
     * In order: the client is treated as having acknowledged every pending
     * invalidation, invalidations it originated are discarded, its locks are swept,
     * its open iterators and event listener registration are dropped, and finally it
     * is removed from the connected clients.
     *
     * @param clientDescriptor the client that disconnected
     */
    public void disconnected(ClientDescriptor clientDescriptor) {
        // The departed client can never acknowledge, so acknowledge on its behalf.
        for (Integer invalidationId : this.clientsWaitingForInvalidation.keySet()) {
            this.clientInvalidated(clientDescriptor, invalidationId);
        }

        // Nobody is left to be notified about invalidations this client originated.
        Iterator<Map.Entry<Integer, InvalidationHolder>> pending = this.clientsWaitingForInvalidation.entrySet().iterator();
        while (pending.hasNext()) {
            ClientDescriptor originator = pending.next().getValue().clientDescriptorWaitingForInvalidation;
            if (originator != null && originator.equals(clientDescriptor)) {
                pending.remove();
            }
        }

        // Without write-behind, keys still locked by the client are removed from the store while sweeping.
        this.lockManager.sweepLocksForClient(clientDescriptor, this.configuration.isWriteBehindConfigured() ? null : heldKeys -> heldKeys.forEach(key -> this.stateService.getStore(this.storeIdentifier).remove(key)));
        this.liveIterators.remove(clientDescriptor);
        this.removeEventListener(clientDescriptor, this.stateService.getStore(this.storeIdentifier));
        this.connectedClients.remove(clientDescriptor);
    }

    /**
     * Entry point for client messages: hands the message to the message handler,
     * which calls back into {@code invokeActiveInternal} to process it.
     *
     * @param context invocation context of the calling client
     * @param message the message to process
     * @return the response produced for the message
     * @throws EntityUserException propagated from the message handler
     */
    public EhcacheEntityResponse invokeActive(ActiveInvokeContext<EhcacheEntityResponse> context, EhcacheEntityMessage message) throws EntityUserException {
        EhcacheEntityResponse response = this.messageHandler.invoke(context, message, this::invokeActiveInternal);
        return response;
    }

    /**
     * Routes an operation message to the store, lifecycle or state repository
     * handler, inside a state-service processing context.
     * <p>
     * A {@link ClusterException} is returned to the caller as a failure response;
     * any other {@link Exception} is logged and wrapped in an
     * {@link InvalidOperationException} failure response. Messages that are not
     * operation messages, or whose type matches none of the three families, raise
     * an {@link AssertionError}.
     *
     * @param context invocation context of the calling client
     * @param message the message to process
     * @return the handler's response, or a failure response
     */
    private EhcacheEntityResponse invokeActiveInternal(InvokeContext context, EhcacheEntityMessage message) {
        try {
            if (!(message instanceof EhcacheOperationMessage)) {
                throw new AssertionError("Unsupported message : " + message.getClass());
            }
            EhcacheOperationMessage operation = (EhcacheOperationMessage) message;
            try (EhcacheStateContext ignored = this.stateService.beginProcessing(operation, this.storeIdentifier)) {
                EhcacheMessageType type = operation.getMessageType();
                if (EhcacheMessageType.isStoreOperationMessage(type)) {
                    return this.invokeServerStoreOperation(context, (ServerStoreOpMessage) message);
                }
                if (EhcacheMessageType.isLifecycleMessage(type)) {
                    return this.invokeLifeCycleOperation(context, (LifecycleMessage) message);
                }
                if (EhcacheMessageType.isStateRepoOperationMessage(type)) {
                    return this.invokeStateRepositoryOperation((StateRepositoryOpMessage) message);
                }
                throw new AssertionError("Unsupported message : " + message.getClass());
            }
        } catch (ClusterException e) {
            return EhcacheEntityResponse.failure(e);
        } catch (Exception e) {
            LOGGER.error("Unexpected exception raised during operation: " + message, e);
            return EhcacheEntityResponse.failure(new InvalidOperationException(e));
        }
    }

    /**
     * Forwards a state repository message to the state service's repository manager.
     *
     * @param message the state repository operation
     * @return the repository manager's response
     */
    private EhcacheEntityResponse invokeStateRepositoryOperation(StateRepositoryOpMessage message) {
        EhcacheEntityResponse response = this.stateService.getStateRepositoryManager().invoke(message);
        return response;
    }

    /**
     * Handles lifecycle messages; store validation is the only one supported.
     *
     * @param context invocation context, expected to be an {@link ActiveInvokeContext}
     * @param message the lifecycle message
     * @return a success response once validation passed
     * @throws ClusterException if validation fails
     */
    private EhcacheEntityResponse invokeLifeCycleOperation(InvokeContext context, LifecycleMessage message) throws ClusterException {
        ActiveInvokeContext<?> activeContext = (ActiveInvokeContext<?>) context;
        switch (message.getMessageType()) {
            case VALIDATE_SERVER_STORE:
                this.validateServerStore(activeContext.getClientDescriptor(), (LifecycleMessage.ValidateServerStore) message);
                return EhcacheEntityResponse.success();
            default:
                throw new AssertionError("Unsupported LifeCycle operation " + message);
        }
    }

    /**
     * Checks the client's view of the store configuration against the server's and,
     * on success, marks the client as validated.
     *
     * @param clientDescriptor    the validating client
     * @param validateServerStore message carrying the client's store configuration
     * @throws ClusterException if the store does not exist or the compatibility check rejects the configuration
     */
    private void validateServerStore(ClientDescriptor clientDescriptor, LifecycleMessage.ValidateServerStore validateServerStore) throws ClusterException {
        ServerStoreConfiguration requested = validateServerStore.getStoreConfiguration();
        LOGGER.info("Client {} validating cluster tier '{}'", clientDescriptor, this.storeIdentifier);

        ServerSideServerStore existing = this.stateService.getStore(this.storeIdentifier);
        if (existing == null) {
            throw new InvalidStoreException("cluster tier '" + this.storeIdentifier + "' does not exist");
        }

        this.storeCompatibility.verify(existing.getStoreConfiguration(), requested);
        this.connectedClients.put(clientDescriptor, Boolean.TRUE);
    }

    /**
     * Executes a single server store operation on behalf of the calling client.
     * <p>
     * The method resolves the calling client and the backing store, re-fires any
     * inflight invalidations still pending (done by the first operation that finds
     * the list non-null), and then dispatches on the message type.
     * <p>
     * NOTE(review): the decompiler flagged this method with "Removed try catching
     * itself - possible behaviour change"; compare with the original sources before
     * relying on the exact exception flow.
     *
     * @param context invocation context, cast to {@link ActiveInvokeContext} to obtain the client descriptor
     * @param message the store operation to execute
     * @return the response for the operation
     * @throws ClusterException if the cluster tier's store cannot be found (raised as a
     *                          {@link LifecycleException}) or the store operation fails
     */
    private EhcacheEntityResponse invokeServerStoreOperation(InvokeContext context, ServerStoreOpMessage message) throws ClusterException {
        ActiveInvokeContext activeInvokeContext = (ActiveInvokeContext)context;
        ClientDescriptor clientDescriptor = activeInvokeContext.getClientDescriptor();
        ServerSideServerStore cacheStore = this.stateService.getStore(this.storeIdentifier);
        if (cacheStore == null) {
            throw new LifecycleException("cluster tier does not exist : '" + this.storeIdentifier + "'");
        }
        // Cheap unsynchronized check first; the list is re-checked under the mutex so
        // that exactly one caller takes ownership of it and re-sends the invalidations.
        if (this.inflightInvalidations != null) {
            Object object = this.inflightInvalidationsMutex;
            synchronized (object) {
                if (this.inflightInvalidations != null) {
                    List<InvalidationTuple> tmpInflightInvalidations = this.inflightInvalidations;
                    // Clearing the field before re-sending means later operations skip this block.
                    this.inflightInvalidations = null;
                    LOGGER.debug("Stalling all operations for cluster tier {} for firing inflight invalidations again.", (Object)this.storeIdentifier);
                    tmpInflightInvalidations.forEach(invalidationState -> {
                        if (invalidationState.isClearInProgress()) {
                            this.invalidateAll(invalidationState.getClientDescriptor());
                        }
                        invalidationState.getInvalidationsInProgress().forEach(hashInvalidationToBeResent -> this.invalidateHashForClient(invalidationState.getClientDescriptor(), (long)hashInvalidationToBeResent));
                    });
                }
            }
        }
        switch (message.getMessageType()) {
            case GET_STORE: {
                ServerStoreOpMessage.GetMessage getMessage = (ServerStoreOpMessage.GetMessage)message;
                try {
                    return EhcacheEntityResponse.getResponse((Chain)cacheStore.get(getMessage.getKey()));
                }
                catch (TimeoutException e) {
                    throw new AssertionError("Server side store is not expected to throw timeout exception", e);
                }
            }
            case APPEND: {
                Chain newChain;
                ServerStoreOpMessage.AppendMessage appendMessage = (ServerStoreOpMessage.AppendMessage)message;
                long key = appendMessage.getKey();
                // The invalidation is recorded before the store is mutated.
                InvalidationTracker invalidationTracker = this.stateService.getInvalidationTracker(this.storeIdentifier);
                if (invalidationTracker != null) {
                    invalidationTracker.trackHashInvalidation(key);
                }
                try {
                    cacheStore.append(key, appendMessage.getPayload());
                    // Re-read the chain so the post-append state is what gets passed on below.
                    newChain = cacheStore.get(key);
                }
                catch (TimeoutException e) {
                    throw new AssertionError("Server side store is not expected to throw timeout exception", e);
                }
                this.sendMessageToSelfAndDeferRetirement((ActiveInvokeContext<EhcacheEntityResponse>)activeInvokeContext, (ServerStoreOpMessage.KeyBasedServerStoreOpMessage)appendMessage, newChain);
                this.invalidateHashForClient(clientDescriptor, key);
                // Chains longer than the configured limit trigger a resolution request.
                if (newChain.length() > this.chainCompactionLimit) {
                    this.requestChainResolution(clientDescriptor, key, newChain);
                }
                // Without write-behind, the append also releases the lock on the key.
                if (!this.configuration.isWriteBehindConfigured()) {
                    this.lockManager.unlock(key);
                }
                return EhcacheEntityResponse.success();
            }
            case GET_AND_APPEND: {
                Chain newChain;
                Chain result;
                ServerStoreOpMessage.GetAndAppendMessage getAndAppendMessage = (ServerStoreOpMessage.GetAndAppendMessage)message;
                LOGGER.trace("Message {} : GET_AND_APPEND on key {} from client {}", new Object[]{message, getAndAppendMessage.getKey(), context.getClientSource().toLong()});
                InvalidationTracker invalidationTracker = this.stateService.getInvalidationTracker(this.storeIdentifier);
                if (invalidationTracker != null) {
                    invalidationTracker.trackHashInvalidation(getAndAppendMessage.getKey());
                }
                try {
                    // 'result' is what the client receives; 'newChain' is the chain re-read after the append.
                    result = cacheStore.getAndAppend(getAndAppendMessage.getKey(), getAndAppendMessage.getPayload());
                    newChain = cacheStore.get(getAndAppendMessage.getKey());
                }
                catch (TimeoutException e) {
                    throw new AssertionError("Server side store is not expected to throw timeout exception", e);
                }
                this.sendMessageToSelfAndDeferRetirement((ActiveInvokeContext<EhcacheEntityResponse>)activeInvokeContext, (ServerStoreOpMessage.KeyBasedServerStoreOpMessage)getAndAppendMessage, newChain);
                LOGGER.debug("Send invalidations for key {}", (Object)getAndAppendMessage.getKey());
                this.invalidateHashForClient(clientDescriptor, getAndAppendMessage.getKey());
                return EhcacheEntityResponse.getResponse((Chain)result);
            }
            case REPLACE: {
                ServerStoreOpMessage.ReplaceAtHeadMessage replaceAtHeadMessage = (ServerStoreOpMessage.ReplaceAtHeadMessage)message;
                cacheStore.replaceAtHead(replaceAtHeadMessage.getKey(), replaceAtHeadMessage.getExpect(), replaceAtHeadMessage.getUpdate());
                return EhcacheEntityResponse.success();
            }
            case CLIENT_INVALIDATION_ACK: {
                ServerStoreOpMessage.ClientInvalidationAck clientInvalidationAck = (ServerStoreOpMessage.ClientInvalidationAck)message;
                int invalidationId = clientInvalidationAck.getInvalidationId();
                LOGGER.debug("SERVER: got notification of invalidation ack in cache {} from {} (ID {})", new Object[]{this.storeIdentifier, clientDescriptor, invalidationId});
                this.clientInvalidated(clientDescriptor, invalidationId);
                return EhcacheEntityResponse.success();
            }
            case CLIENT_INVALIDATION_ALL_ACK: {
                ServerStoreOpMessage.ClientInvalidationAllAck clientInvalidationAllAck = (ServerStoreOpMessage.ClientInvalidationAllAck)message;
                int invalidationId = clientInvalidationAllAck.getInvalidationId();
                LOGGER.debug("SERVER: got notification of invalidation ack in cache {} from {} (ID {})", new Object[]{this.storeIdentifier, clientDescriptor, invalidationId});
                this.clientInvalidated(clientDescriptor, invalidationId);
                return EhcacheEntityResponse.success();
            }
            case CLEAR: {
                LOGGER.info("Clearing cluster tier {}", (Object)this.storeIdentifier);
                try {
                    cacheStore.clear();
                }
                catch (TimeoutException e) {
                    throw new AssertionError("Server side store is not expected to throw timeout exception", e);
                }
                InvalidationTracker invalidationTracker = this.stateService.getInvalidationTracker(this.storeIdentifier);
                if (invalidationTracker != null) {
                    invalidationTracker.setClearInProgress(true);
                }
                this.invalidateAll(clientDescriptor);
                return EhcacheEntityResponse.success();
            }
            case LOCK: {
                ServerStoreOpMessage.LockMessage lockMessage = (ServerStoreOpMessage.LockMessage)message;
                // A successful lock answers with the current chain for the hash.
                if (this.lockManager.lock(lockMessage.getHash(), activeInvokeContext.getClientDescriptor())) {
                    try {
                        Chain chain = cacheStore.get(lockMessage.getHash());
                        return EhcacheEntityResponse.lockSuccess((Chain)chain);
                    }
                    catch (TimeoutException e) {
                        throw new AssertionError("Server side store is not expected to throw timeout exception", e);
                    }
                }
                return EhcacheEntityResponse.lockFailure();
            }
            case UNLOCK: {
                ServerStoreOpMessage.UnlockMessage unlockMessage = (ServerStoreOpMessage.UnlockMessage)message;
                this.lockManager.unlock(unlockMessage.getHash());
                return EhcacheEntityResponse.success();
            }
            case ITERATOR_OPEN: {
                ServerStoreOpMessage.IteratorOpenMessage iteratorOpenMessage = (ServerStoreOpMessage.IteratorOpenMessage)message;
                try {
                    Iterator iterator = cacheStore.iterator();
                    List<Map.Entry<Long, Chain>> batch = this.iteratorBatch(iterator, iteratorOpenMessage.getBatchSize());
                    if (iterator.hasNext()) {
                        // More data remains: keep the iterator server-side under a fresh id.
                        UUID id;
                        Map liveIterators = this.liveIterators.computeIfAbsent(clientDescriptor, client -> new ConcurrentHashMap());
                        // Retry with a new random id until one is not already registered for this client.
                        while (liveIterators.putIfAbsent(id = UUID.randomUUID(), iterator) != null) {
                        }
                        return EhcacheEntityResponse.iteratorBatchResponse((UUID)id, batch, (boolean)false);
                    }
                    // The first batch exhausted the iterator, so nothing is registered for the id sent back.
                    return EhcacheEntityResponse.iteratorBatchResponse((UUID)UUID.randomUUID(), batch, (boolean)true);
                }
                catch (TimeoutException e) {
                    throw new AssertionError("Server side store is not expected to throw timeout exception", e);
                }
            }
            case ITERATOR_CLOSE: {
                ServerStoreOpMessage.IteratorCloseMessage iteratorCloseMessage = (ServerStoreOpMessage.IteratorCloseMessage)message;
                // Returning null from the remapping function drops the client's entry once its last iterator is gone.
                this.liveIterators.computeIfPresent(clientDescriptor, (client, iterators) -> {
                    iterators.remove(iteratorCloseMessage.getIdentity());
                    if (iterators.isEmpty()) {
                        return null;
                    }
                    return iterators;
                });
                return EhcacheEntityResponse.success();
            }
            case ITERATOR_ADVANCE: {
                ServerStoreOpMessage.IteratorAdvanceMessage iteratorAdvanceMessage = (ServerStoreOpMessage.IteratorAdvanceMessage)message;
                UUID id = iteratorAdvanceMessage.getIdentity();
                Iterator iterator = (Iterator)this.liveIterators.getOrDefault(clientDescriptor, Collections.emptyMap()).get(id);
                if (iterator == null) {
                    return EhcacheEntityResponse.failure((ClusterException)new InvalidOperationException("Referenced iterator is already closed (or never existed)"));
                }
                List<Map.Entry<Long, Chain>> batch = this.iteratorBatch(iterator, iteratorAdvanceMessage.getBatchSize());
                if (iterator.hasNext()) {
                    return EhcacheEntityResponse.iteratorBatchResponse((UUID)id, batch, (boolean)false);
                }
                // Exhausted: unregister the iterator (and the client's entry if it was the last one).
                this.liveIterators.computeIfPresent(clientDescriptor, (client, iterators) -> {
                    iterators.remove(id);
                    return iterators.isEmpty() ? null : iterators;
                });
                return EhcacheEntityResponse.iteratorBatchResponse((UUID)id, batch, (boolean)true);
            }
            case ENABLE_EVENT_LISTENER: {
                ServerStoreOpMessage.EnableEventListenerMessage enableEventListenerMessage = (ServerStoreOpMessage.EnableEventListenerMessage)message;
                if (enableEventListenerMessage.isEnable()) {
                    this.addEventListener(clientDescriptor, cacheStore);
                } else {
                    this.removeEventListener(clientDescriptor, cacheStore);
                }
                return EhcacheEntityResponse.success();
            }
        }
        // Reached only for message types without a case above.
        throw new AssertionError((Object)("Unsupported ServerStore operation : " + message));
    }

    /**
     * Registers a client as event listener and switches store events on when the
     * client was not registered before.
     *
     * @param clientDescriptor the client asking for events
     * @param cacheStore       the store whose events are enabled
     */
    private void addEventListener(ClientDescriptor clientDescriptor, ServerSideServerStore cacheStore) {
        synchronized (this.eventListeners) {
            boolean newlyRegistered = this.eventListeners.add(clientDescriptor);
            if (newlyRegistered) {
                cacheStore.enableEvents(true);
            }
        }
    }

    /**
     * Unregisters a client as event listener and switches store events off when
     * that removal left no listener behind.
     *
     * @param clientDescriptor the client no longer interested in events
     * @param cacheStore       the store whose events may be disabled
     */
    private void removeEventListener(ClientDescriptor clientDescriptor, ServerSideServerStore cacheStore) {
        synchronized (this.eventListeners) {
            boolean wasRegistered = this.eventListeners.remove(clientDescriptor);
            if (!wasRegistered) {
                return;
            }
            if (this.eventListeners.isEmpty()) {
                cacheStore.enableEvents(false);
            }
        }
    }

    /**
     * @param clientDescriptor the client to look up
     * @return {@code true} if the client is currently registered as event listener
     */
    private boolean isEventsEnabledFor(ClientDescriptor clientDescriptor) {
        boolean registered;
        synchronized (this.eventListeners) {
            registered = this.eventListeners.contains(clientDescriptor);
        }
        return registered;
    }

    /**
     * @return a copy of the registered event listeners, taken while holding the set's monitor
     */
    Set<ClientDescriptor> getEventListeners() {
        Set<ClientDescriptor> copy;
        synchronized (this.eventListeners) {
            copy = new HashSet<>(this.eventListeners);
        }
        return copy;
    }

    /**
     * Pulls entries from the iterator until the summed {@code remaining()} of the
     * element payloads collected so far reaches {@code batchSize}, the iterator is
     * exhausted, or the running sum overflows to a negative value. Each entry is
     * copied into an immutable key/value pair.
     *
     * @param iterator  source of store entries; advanced by this call
     * @param batchSize payload size at which collection stops
     * @return the collected entries, possibly empty
     */
    private List<Map.Entry<Long, Chain>> iteratorBatch(Iterator<Map.Entry<Long, Chain>> iterator, int batchSize) {
        List<Map.Entry<Long, Chain>> batch = new ArrayList<>();
        for (int collected = 0; iterator.hasNext() && collected < batchSize && collected >= 0; ) {
            Map.Entry<Long, Chain> entry = iterator.next();
            batch.add(new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), entry.getValue()));
            for (Element element : entry.getValue()) {
                collected += element.getPayload().remaining();
            }
        }
        return batch;
    }

    /**
     * Starts an "invalidate all" round: every validated client except the originator
     * is asked to invalidate, and the round is registered under a new invalidation id
     * until the acknowledgements arrive. When there is nobody to ask, the round is
     * completed immediately.
     *
     * @param originatingClientDescriptor the client that triggered the invalidation; may be {@code null}
     */
    private void invalidateAll(ClientDescriptor originatingClientDescriptor) {
        final int invalidationId = this.invalidationIdGenerator.getAndIncrement();

        HashSet<ClientDescriptor> targets = new HashSet<>(this.getValidatedClients());
        if (originatingClientDescriptor != null) {
            targets.remove(originatingClientDescriptor);
        }

        // Register the round before sending anything out.
        InvalidationHolder holder = new InvalidationHolder(originatingClientDescriptor, targets);
        this.clientsWaitingForInvalidation.put(invalidationId, holder);

        LOGGER.debug("SERVER: requesting {} client(s) invalidation of all in cache {} (ID {})", targets.size(), this.storeIdentifier, invalidationId);
        for (ClientDescriptor target : targets) {
            LOGGER.debug("SERVER: asking client {} to invalidate all from cache {} (ID {})", target, this.storeIdentifier, invalidationId);
            try {
                this.clientCommunicator.sendNoResponse(target, EhcacheEntityResponse.clientInvalidateAll(invalidationId));
            } catch (MessageCodecException mce) {
                throw new AssertionError("Codec error", mce);
            }
        }

        if (targets.isEmpty()) {
            this.clientInvalidated(holder.clientDescriptorWaitingForInvalidation, invalidationId);
        }
    }

    /**
     * Records that {@code clientDescriptor} has finished the invalidation identified by
     * {@code invalidationId} and, once no client is left to invalidate, signals completion.
     * <p>
     * Completion depends on the holder's key and on the consistency mode:
     * <ul>
     *   <li>{@code key == null} (invalidate-all): a strong store notifies the waiting client with
     *       {@code allInvalidationDone}; otherwise a {@code ClearInvalidationCompleteMessage} is sent
     *       to self and the invalidation tracker, if any, has its clear-in-progress flag reset.</li>
     *   <li>{@code key != null}: a strong store notifies the waiting client with
     *       {@code hashInvalidationDone}; otherwise an {@code InvalidationCompleteMessage} is sent to
     *       self and the key is untracked in the invalidation tracker, if any.</li>
     * </ul>
     *
     * @param clientDescriptor client reporting that it invalidated
     * @param invalidationId   ID under which the invalidation was registered
     * @throws AssertionError if a message cannot be encoded
     */
    private void clientInvalidated(ClientDescriptor clientDescriptor, int invalidationId) {
        InvalidationHolder invalidationHolder = this.clientsWaitingForInvalidation.get(invalidationId);
        if (invalidationHolder == null) {
            // No invalidation is registered under this ID.
            // The descriptor is passed as a logging argument so the "{}" placeholder is actually substituted.
            LOGGER.debug("Ignoring invalidation from client {}", clientDescriptor);
            return;
        }

        invalidationHolder.clientsHavingToInvalidate.remove(clientDescriptor);
        // remove(...) != null ensures only one caller performs the completion step.
        if (invalidationHolder.clientsHavingToInvalidate.isEmpty() && this.clientsWaitingForInvalidation.remove(invalidationId) != null) {
            try {
                Long key = invalidationHolder.key;
                if (key == null) {
                    if (this.isStrong()) {
                        this.clientCommunicator.sendNoResponse(invalidationHolder.clientDescriptorWaitingForInvalidation, EhcacheEntityResponse.allInvalidationDone());
                        LOGGER.debug("SERVER: notifying originating client that all other clients invalidated all in cache {} from {} (ID {})", this.storeIdentifier, clientDescriptor, invalidationId);
                    } else {
                        this.entityMessenger.messageSelf(new PassiveReplicationMessage.ClearInvalidationCompleteMessage());
                        InvalidationTracker invalidationTracker = this.stateService.getInvalidationTracker(this.storeIdentifier);
                        if (invalidationTracker != null) {
                            invalidationTracker.setClearInProgress(false);
                        }
                    }
                } else if (this.isStrong()) {
                    this.clientCommunicator.sendNoResponse(invalidationHolder.clientDescriptorWaitingForInvalidation, EhcacheEntityResponse.hashInvalidationDone(key.longValue()));
                    LOGGER.debug("SERVER: notifying originating client that all other clients invalidated key {} in cache {} from {} (ID {})", key, this.storeIdentifier, clientDescriptor, invalidationId);
                } else {
                    this.entityMessenger.messageSelf(new PassiveReplicationMessage.InvalidationCompleteMessage(key));
                    InvalidationTracker invalidationTracker = this.stateService.getInvalidationTracker(this.storeIdentifier);
                    if (invalidationTracker != null) {
                        invalidationTracker.untrackHashInvalidation(key.longValue());
                    }
                }
            } catch (MessageCodecException mce) {
                throw new AssertionError("Codec error", mce);
            }
        }
    }

    /**
     * Asks every validated client other than the originator to invalidate the given hash.
     * <p>
     * The pending invalidation is registered in {@code clientsWaitingForInvalidation} under a newly
     * generated ID before any request is sent. When there is no client to ask, completion is
     * reported straight away through {@code clientInvalidated}.
     *
     * @param originatingClientDescriptor client on whose behalf the invalidation is run; may be
     *                                    {@code null}, in which case no client is excluded
     * @param key                         hash to invalidate
     * @throws AssertionError if a request cannot be encoded
     */
    private void invalidateHashForClient(ClientDescriptor originatingClientDescriptor, long key) {
        int invalidationId = this.invalidationIdGenerator.getAndIncrement();
        Set<ClientDescriptor> validatedClients = this.getValidatedClients();
        // Typed (not raw) concurrent key set: keeps the for-each below and the holder construction type-safe.
        Set<ClientDescriptor> clientsToInvalidate = ConcurrentHashMap.newKeySet(validatedClients.size());
        clientsToInvalidate.addAll(validatedClients);
        if (originatingClientDescriptor != null) {
            clientsToInvalidate.remove(originatingClientDescriptor);
        }

        InvalidationHolder invalidationHolder = new InvalidationHolder(originatingClientDescriptor, clientsToInvalidate, key);
        this.clientsWaitingForInvalidation.put(invalidationId, invalidationHolder);

        LOGGER.debug("SERVER: requesting {} client(s) invalidation of hash {} in cache {} (ID {})", clientsToInvalidate.size(), key, this.storeIdentifier, invalidationId);
        for (ClientDescriptor clientDescriptorThatHasToInvalidate : clientsToInvalidate) {
            LOGGER.debug("SERVER: asking client {} to invalidate hash {} from cache {} (ID {})", clientDescriptorThatHasToInvalidate, key, this.storeIdentifier, invalidationId);
            try {
                this.clientCommunicator.sendNoResponse(clientDescriptorThatHasToInvalidate, EhcacheEntityResponse.clientInvalidateHash(key, invalidationId));
            } catch (MessageCodecException mce) {
                throw new AssertionError("Codec error", mce);
            }
        }

        // Nobody to wait for: complete the invalidation right now.
        if (clientsToInvalidate.isEmpty()) {
            this.clientInvalidated(invalidationHolder.clientDescriptorWaitingForInvalidation, invalidationId);
        }
    }

    /**
     * Sends a resolve request for {@code key} and {@code chain} to the given client without
     * expecting a response.
     *
     * @throws AssertionError if the request cannot be encoded
     */
    private void requestChainResolution(ClientDescriptor clientDescriptor, long key, Chain chain) {
        try {
            EhcacheEntityResponse request = EhcacheEntityResponse.resolveRequest(key, chain);
            clientCommunicator.sendNoResponse(clientDescriptor, request);
        } catch (MessageCodecException codecFailure) {
            throw new AssertionError("Codec error", codecFailure);
        }
    }

    /**
     * Sends itself a {@code ChainReplicationMessage} built from the message key, the new chain,
     * the context's current and oldest transaction IDs and the client ID, deferring retirement
     * of {@code message}.
     *
     * @throws AssertionError if the replication message cannot be encoded
     */
    private void sendMessageToSelfAndDeferRetirement(ActiveInvokeContext<EhcacheEntityResponse> context, ServerStoreOpMessage.KeyBasedServerStoreOpMessage message, Chain newChain) {
        try {
            long originClientId = context.getClientSource().toLong();
            PassiveReplicationMessage.ChainReplicationMessage replication = new PassiveReplicationMessage.ChainReplicationMessage(
                message.getKey(),
                newChain,
                context.getCurrentTransactionId(),
                context.getOldestTransactionId(),
                originClientId);
            entityMessenger.messageSelfAndDeferRetirement(message, replication);
        } catch (MessageCodecException codecFailure) {
            throw new AssertionError("Codec error", codecFailure);
        }
    }

    /**
     * Moves the state of this store's invalidation tracker, if one exists, into
     * {@code inflightInvalidations} as a tuple with no client descriptor, then clears the tracker.
     */
    private void addInflightInvalidationsForEventualCaches() {
        InvalidationTracker tracker = stateService.getInvalidationTracker(storeIdentifier);
        if (tracker == null) {
            return;
        }
        InvalidationTuple pending = new InvalidationTuple(null, tracker.getTrackedKeys(), tracker.isClearInProgress());
        inflightInvalidations.add(pending);
        tracker.clear();
    }

    /**
     * Stops tracking messages of the given client source in the message handler.
     *
     * @param sourceId client source that was destroyed
     */
    public void notifyDestroyed(ClientSourceId sourceId) {
        messageHandler.untrackClient(sourceId);
    }

    /**
     * Starts reconnect handling.
     * <p>
     * First sends itself an {@code EhcacheMessageTrackerCatchup} carrying every recorded message
     * whose request is non-null. The returned handler then, for each reconnecting client, decodes
     * the reconnect payload, registers in-flight invalidations for strong stores, recreates the
     * lock state, re-adds the event listener when the client had events enabled, and finally marks
     * the client as connected and validated.
     *
     * @return handler invoked with the client descriptor and its reconnect payload
     * @throws AssertionError if the catch-up message cannot be encoded
     */
    public ActiveServerEntity.ReconnectHandler startReconnect() {
        try {
            entityMessenger.messageSelf(new EhcacheMessageTrackerCatchup(
                messageHandler.getRecordedMessages()
                    .filter(recorded -> recorded.getRequest() != null)
                    .collect(Collectors.toList())));
        } catch (MessageCodecException codecFailure) {
            throw new AssertionError("Codec error", codecFailure);
        }

        return (reconnectingClient, payload) -> {
            if (inflightInvalidations == null) {
                throw new AssertionError("Load existing was not invoked before handleReconnect");
            }
            ClusterTierReconnectMessage reconnect = reconnectMessageCodec.decode(payload);
            ServerSideServerStore store = stateService.getStore(storeIdentifier);
            addInflightInvalidationsForStrongCache(reconnectingClient, reconnect, store);
            lockManager.createLockStateAfterFailover(reconnectingClient, reconnect.getLocksHeld());
            if (reconnect.isEventsEnabled()) {
                addEventListener(reconnectingClient, store);
            }
            LOGGER.info("Client '{}' successfully reconnected to newly promoted ACTIVE after failover.", reconnectingClient);
            connectedClients.put(reconnectingClient, Boolean.TRUE);
        };
    }

    /**
     * For a store configured with {@code Consistency.STRONG}, records the invalidations the
     * reconnecting client reported as in progress (hashes and the clear flag) in
     * {@code inflightInvalidations}. Does nothing for any other consistency.
     */
    private void addInflightInvalidationsForStrongCache(ClientDescriptor clientDescriptor, ClusterTierReconnectMessage reconnectMessage, ServerSideServerStore serverStore) {
        if (!serverStore.getStoreConfiguration().getConsistency().equals(Consistency.STRONG)) {
            return;
        }
        Set<Long> reportedInvalidations = reconnectMessage.getInvalidationsInProgress();
        LOGGER.debug("Number of Inflight Invalidations from client ID {} for cache {} is {}.", clientDescriptor.getSourceId().toLong(), storeIdentifier, reportedInvalidations.size());
        InvalidationTuple pending = new InvalidationTuple(clientDescriptor, reportedInvalidations, reconnectMessage.isClearInProgress());
        inflightInvalidations.add(pending);
    }

    /**
     * Pushes the state belonging to one concurrency key to a passive through {@code syncChannel}.
     * <ul>
     *   <li>Key {@code 1}: forwards every sync message the state repository manager produces for
     *       this store.</li>
     *   <li>Key {@code 0x7FFFFFFE} ({@code Integer.MAX_VALUE - 1}): sends the message-tracker
     *       replication.</li>
     *   <li>Any other key: streams the chains of segment {@code concurrencyKey - 2}. The reads run
     *       on {@code SYNC_GETS_EXECUTOR}; the resulting messages are handed over through a
     *       {@code SynchronousQueue} and sent from the calling thread.</li>
     * </ul>
     *
     * @param syncChannel    channel the sync messages are written to
     * @param concurrencyKey key selecting which part of the state to synchronize
     * @throws RuntimeException wrapping an {@code InterruptedException}, {@code ExecutionException}
     *                          or {@code TimeoutException} raised while waiting for the reads
     */
    public void synchronizeKeyToPassive(PassiveSynchronizationChannel<EhcacheEntityMessage> syncChannel, int concurrencyKey) {
        LOGGER.info("Sync started for concurrency key {}.", (Object)concurrencyKey);
        if (concurrencyKey == 1) {
            this.stateService.getStateRepositoryManager().syncMessageFor(this.storeIdentifier).forEach(arg_0 -> syncChannel.synchronizeToPassive(arg_0));
        } else if (concurrencyKey == 0x7FFFFFFE) {
            this.sendMessageTrackerReplication(syncChannel);
        } else {
            boolean interrupted = false;
            SynchronousQueue messageQ = new SynchronousQueue();
            // Segment index: the concurrency key minus 2.
            int segmentId = concurrencyKey - 1 - 1;
            // Passed to the reader so it can interrupt this thread if it fails (see doGetsForSync).
            Thread thisThread = Thread.currentThread();
            CompletableFuture<Void> asyncGet = CompletableFuture.runAsync(() -> this.doGetsForSync(segmentId, messageQ, syncChannel, thisThread), SYNC_GETS_EXECUTOR);
            try {
                try {
                    // Run each handed-over handler on this thread until one returns false
                    // (putMessage enqueues such a terminator when called with a null channel).
                    while (((DataSyncMessageHandler)messageQ.take()).execute()) {
                    }
                }
                catch (InterruptedException e) {
                    interrupted = true;
                }
                if (interrupted) {
                    // The interrupt may come from a failed reader: wait up to 10 seconds so its
                    // failure surfaces as ExecutionException; otherwise report the interrupt itself.
                    // NOTE(review): the thread's interrupt status is not restored before wrapping - confirm intended.
                    asyncGet.get(10L, TimeUnit.SECONDS);
                    throw new InterruptedException();
                }
                asyncGet.get();
            }
            catch (InterruptedException | ExecutionException | TimeoutException e) {
                throw new RuntimeException(e);
            }
        }
        LOGGER.info("Sync complete for concurrency key {}.", (Object)concurrencyKey);
    }

    /**
     * Reads every key of one segment from the store and hands the non-empty chains to
     * {@code messageQ} in batches, followed by a terminating handler.
     * <p>
     * A batch is flushed as soon as its accumulated payload size exceeds {@code dataSizeThreshold}
     * or the number of chains in it reaches {@code dataGetsThreshold}. For segment 0 the size of
     * the first flushed batch is fed to {@code adjustInitialCapacity}.
     * <p>
     * On any failure {@code waitingThread} is interrupted before the failure is rethrown.
     *
     * @param segmentId     index into the store's segment key sets
     * @param messageQ      queue the batch handlers are put on
     * @param syncChannel   channel the batches will eventually be written to
     * @param waitingThread thread to interrupt if this method fails
     */
    private void doGetsForSync(int segmentId, BlockingQueue<DataSyncMessageHandler> messageQ, PassiveSynchronizationChannel<EhcacheEntityMessage> syncChannel, Thread waitingThread) {
        int numKeyGets = 0;
        long dataSize = 0L;
        try {
            ServerSideServerStore store = this.stateService.getStore(this.storeIdentifier);
            Set<Long> keys = store.getSegmentKeySets().get(segmentId);
            int remainingKeys = keys.size();
            Map<Long, Chain> mappingsToSend = new HashMap<>(this.computeInitialMapCapacity(remainingKeys));
            boolean capacityAdjusted = false;
            for (Long key : keys) {
                Chain chain;
                try {
                    chain = store.get(key.longValue());
                    if (chain.isEmpty()) {
                        // Nothing stored under this key any more: skip it.
                        --remainingKeys;
                        continue;
                    }
                    ++numKeyGets;
                } catch (TimeoutException e) {
                    // Keep the original exception as the cause so the failure can be diagnosed.
                    throw new AssertionError("Server side store is not expected to throw timeout exception", e);
                }
                for (Element element : chain) {
                    dataSize += (long) element.getPayload().remaining();
                }
                mappingsToSend.put(key, chain);
                if (dataSize <= this.dataSizeThreshold && numKeyGets < this.dataGetsThreshold) {
                    continue;
                }
                // Threshold reached: flush the current batch and start a new one.
                this.putMessage(messageQ, syncChannel, mappingsToSend);
                if (!capacityAdjusted && segmentId == 0) {
                    capacityAdjusted = true;
                    this.adjustInitialCapacity(numKeyGets);
                }
                remainingKeys -= numKeyGets;
                mappingsToSend = new HashMap<>(this.computeMapCapacity(remainingKeys, numKeyGets));
                dataSize = 0L;
                numKeyGets = 0;
            }
            if (!mappingsToSend.isEmpty()) {
                this.putMessage(messageQ, syncChannel, mappingsToSend);
            }
            // A null channel makes putMessage enqueue the terminating handler.
            this.putMessage(messageQ, null, null);
        } catch (Throwable e) {
            waitingThread.interrupt();
            throw e;
        }
    }

    /**
     * Puts one handler on {@code messageQ}, blocking until it can be accepted.
     * <p>
     * With a non-null {@code syncChannel} the handler sends an {@code EhcacheDataSyncMessage}
     * built from {@code mappingsToSend} and returns {@code true}. With a {@code null} channel the
     * handler does nothing and returns {@code false}.
     *
     * @param messageQ       queue the handler is put on
     * @param syncChannel    channel to write the batch to, or {@code null} to enqueue the
     *                       {@code false}-returning handler
     * @param mappingsToSend batch of key/chain mappings; only read when {@code syncChannel} is non-null
     * @throws RuntimeException wrapping the {@code InterruptedException} if the put is interrupted;
     *                          the thread's interrupt status is restored first
     */
    private void putMessage(BlockingQueue<DataSyncMessageHandler> messageQ, PassiveSynchronizationChannel<EhcacheEntityMessage> syncChannel, Map<Long, Chain> mappingsToSend) {
        try {
            if (syncChannel != null) {
                EhcacheDataSyncMessage msg = new EhcacheDataSyncMessage(mappingsToSend);
                messageQ.put(() -> {
                    syncChannel.synchronizeToPassive(msg);
                    return true;
                });
            } else {
                messageQ.put(() -> false);
            }
        } catch (InterruptedException e) {
            // Re-assert the interrupt so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Chooses the initial capacity for the next batch map.
     * <ul>
     *   <li>fewer than 16 remaining keys: 16</li>
     *   <li>otherwise, fewer than 32 expected gets: 32</li>
     *   <li>otherwise: the smaller of the two counts divided by a 0.75 load factor, plus one</li>
     * </ul>
     *
     * @param remainingSize number of keys still to be read
     * @param expectedGets  number of entries the previous batch held
     * @return capacity to pass to the {@code HashMap} constructor
     */
    private int computeMapCapacity(int remainingSize, int expectedGets) {
        final int minimumCapacity = 16;
        final int smallBatchCapacity = 32;
        final float loadFactor = 0.75f;

        if (remainingSize < minimumCapacity) {
            return minimumCapacity;
        }
        if (expectedGets < smallBatchCapacity) {
            return smallBatchCapacity;
        }
        int expectedEntries = Math.min(remainingSize, expectedGets);
        return (int) ((float) expectedEntries / loadFactor + 1.0f);
    }

    /**
     * Updates {@code dataMapInitialCapacity} from an observed batch size: 32 when fewer than
     * 32 keys were read, otherwise the count divided by a 0.75 load factor, plus one.
     *
     * @param actualKeyGets number of keys the observed batch held
     */
    private void adjustInitialCapacity(int actualKeyGets) {
        if (actualKeyGets < 32) {
            this.dataMapInitialCapacity = 32;
        } else {
            this.dataMapInitialCapacity = (int) ((float) actualKeyGets / 0.75f + 1.0f);
        }
    }

    /**
     * Chooses the initial capacity of the first batch map for a segment.
     * <p>
     * {@code dataMapInitialCapacity} is initialised from {@code dataGetsThreshold} on first use.
     * The result is 16 for fewer than 16 keys, a capacity sized for {@code totalKeys} when that is
     * below {@code dataMapInitialCapacity}, and {@code dataMapInitialCapacity} otherwise.
     *
     * @param totalKeys number of keys in the segment
     * @return capacity to pass to the {@code HashMap} constructor
     */
    private int computeInitialMapCapacity(int totalKeys) {
        if (this.dataMapInitialCapacity == null) {
            // Lazily derive the default from the gets threshold and a 0.75 load factor.
            this.dataMapInitialCapacity = (int) ((float) this.dataGetsThreshold / 0.75f + 1.0f);
        }

        final int capacity;
        if (totalKeys < 16) {
            capacity = 16;
        } else if (totalKeys < this.dataMapInitialCapacity) {
            capacity = (int) ((float) totalKeys / 0.75f + 1.0f);
        } else {
            capacity = this.dataMapInitialCapacity;
        }
        return capacity;
    }

    /**
     * Sends the passive an {@code EhcacheMessageTrackerMessage} holding, for every tracked client
     * (keyed by its source ID), the recorded responses of that client keyed by transaction ID.
     * Nothing is sent when no client is tracked.
     *
     * @param syncChannel channel the tracker message is written to
     */
    private void sendMessageTrackerReplication(PassiveSynchronizationChannel<EhcacheEntityMessage> syncChannel) {
        Map<Long, Map<Long, EhcacheEntityResponse>> responsesByClient = messageHandler.getTrackedClients()
            .collect(Collectors.toMap(
                ClientSourceId::toLong,
                trackedClient -> messageHandler.getRecordedMessages()
                    .filter(recorded -> recorded.getClientSourceId().toLong() == trackedClient.toLong())
                    .collect(Collectors.toMap(
                        recorded -> recorded.getTransactionId(),
                        recorded -> (EhcacheEntityResponse) recorded.getResponse()))));

        if (responsesByClient.isEmpty()) {
            return;
        }
        syncChannel.synchronizeToPassive(new EhcacheMessageTrackerMessage(responsesByClient));
    }

    /**
     * Destroys this cluster tier: asks the state service to destroy the server store, then
     * destroys the message handler and closes management.
     * <p>
     * A {@code ClusterException} from the state service is logged and does not stop the rest of
     * the teardown.
     */
    public void destroy() {
        LOGGER.info("Destroying cluster tier '{}'", storeIdentifier);
        try {
            stateService.destroyServerStore(storeIdentifier);
        } catch (ClusterException destroyFailure) {
            LOGGER.error("Failed to destroy server store - does not exist", destroyFailure);
        }
        messageHandler.destroy();
        management.close();
    }

    /**
     * Returns the key set of {@code connectedClients}, i.e. every client that has an entry in the
     * map, whatever its validation flag. The set is the map's own key-set view, not a copy.
     */
    protected Set<ClientDescriptor> getConnectedClients() {
        return connectedClients.keySet();
    }

    /**
     * Returns a new set holding every client whose entry in {@code connectedClients} is mapped
     * to {@code true}.
     */
    protected Set<ClientDescriptor> getValidatedClients() {
        Set<ClientDescriptor> validated = new HashSet<>();
        for (Map.Entry<ClientDescriptor, Boolean> connection : connectedClients.entrySet()) {
            if (connection.getValue()) {
                validated.add(connection.getKey());
            }
        }
        return validated;
    }

    /**
     * Returns the map of pending invalidations keyed by invalidation ID. This is the
     * {@code clientsWaitingForInvalidation} field itself, not a copy.
     */
    ConcurrentMap<Integer, InvalidationHolder> getClientsWaitingForInvalidation() {
        return clientsWaitingForInvalidation;
    }

    /** Tells whether this tier's configuration uses {@code Consistency.STRONG}. */
    private boolean isStrong() {
        Consistency configured = configuration.getConsistency();
        return Consistency.STRONG == configured;
    }

    /**
     * Immutable holder for a set of in-progress invalidations: the client they belong to
     * (may be {@code null}), the hashes being invalidated, and whether a clear is in progress.
     */
    private static class InvalidationTuple {
        private final ClientDescriptor clientDescriptor;
        private final Set<Long> invalidationsInProgress;
        private final boolean isClearInProgress;

        InvalidationTuple(ClientDescriptor clientDescriptor, Set<Long> invalidationsInProgress, boolean isClearInProgress) {
            this.isClearInProgress = isClearInProgress;
            this.invalidationsInProgress = invalidationsInProgress;
            this.clientDescriptor = clientDescriptor;
        }

        /** @return the client the invalidations belong to, as given to the constructor */
        ClientDescriptor getClientDescriptor() {
            return clientDescriptor;
        }

        /** @return the hashes whose invalidation is in progress (the constructor's set, not a copy) */
        Set<Long> getInvalidationsInProgress() {
            return invalidationsInProgress;
        }

        /** @return {@code true} when a clear was in progress */
        boolean isClearInProgress() {
            return isClearInProgress;
        }
    }

    /**
     * State of one pending invalidation: the client waiting for its completion, the clients that
     * still have to invalidate, and the hash concerned ({@code null} when no key was given).
     */
    static class InvalidationHolder {
        final ClientDescriptor clientDescriptorWaitingForInvalidation;
        final Set<ClientDescriptor> clientsHavingToInvalidate;
        final Long key;

        /** Creates a holder without a key. */
        InvalidationHolder(ClientDescriptor clientDescriptorWaitingForInvalidation, Set<ClientDescriptor> clientsHavingToInvalidate) {
            this(clientDescriptorWaitingForInvalidation, clientsHavingToInvalidate, null);
        }

        /** Creates a holder for the given key. */
        InvalidationHolder(ClientDescriptor clientDescriptorWaitingForInvalidation, Set<ClientDescriptor> clientsHavingToInvalidate, Long key) {
            this.key = key;
            this.clientsHavingToInvalidate = clientsHavingToInvalidate;
            this.clientDescriptorWaitingForInvalidation = clientDescriptorWaitingForInvalidation;
        }
    }

    @FunctionalInterface
    private static interface DataSyncMessageHandler {
        public boolean execute();
    }

    /**
     * Store event listener that forwards append and eviction events to every validated client.
     */
    private class Listener implements ServerStoreEventListener {
        private Listener() {
        }

        /**
         * Sends a server-append notification, carrying a duplicate of the appended buffer and the
         * chain as it was before the append, to each validated client.
         */
        public void onAppend(Chain beforeAppend, ByteBuffer appended) {
            Set<ClientDescriptor> recipients = new HashSet<>(getValidatedClients());
            for (ClientDescriptor recipient : recipients) {
                LOGGER.debug("SERVER: append happened in cache {}; notifying client {} ", storeIdentifier, recipient);
                try {
                    clientCommunicator.sendNoResponse(recipient, EhcacheEntityResponse.serverAppend(appended.duplicate(), beforeAppend));
                } catch (MessageCodecException codecFailure) {
                    throw new AssertionError("Codec error", codecFailure);
                }
            }
        }

        /**
         * Asks each validated client to invalidate the evicted hash. The detached evicted chain is
         * included only for clients that have events enabled; others receive {@code null}.
         * The chain is not detached when there is no validated client.
         */
        public void onEviction(long key, InternalChain evictedChain) {
            Set<ClientDescriptor> recipients = new HashSet<>(getValidatedClients());
            if (recipients.isEmpty()) {
                return;
            }
            Chain detachedChain = evictedChain.detach();
            for (ClientDescriptor recipient : recipients) {
                LOGGER.debug("SERVER: eviction happened; asking client {} to invalidate hash {} from cache {}", recipient, key, storeIdentifier);
                try {
                    Chain evictedPayload = isEventsEnabledFor(recipient) ? detachedChain : null;
                    clientCommunicator.sendNoResponse(recipient, EhcacheEntityResponse.serverInvalidateHash(key, evictedPayload));
                } catch (MessageCodecException codecFailure) {
                    throw new AssertionError("Codec error", codecFailure);
                }
            }
        }
    }
}

