/*
 * Decompiled with CFR 0.152.
 */
package org.ehcache.clustered.client.internal.store;

import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import org.ehcache.clustered.client.config.TimeoutDuration;
import org.ehcache.clustered.client.internal.Timeouts;
import org.ehcache.clustered.client.internal.store.ClusterTierClientEntity;
import org.ehcache.clustered.client.internal.store.CommonServerStoreProxy;
import org.ehcache.clustered.client.internal.store.ServerStoreProxy;
import org.ehcache.clustered.common.internal.messages.ClusterTierReconnectMessage;
import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse;
import org.ehcache.clustered.common.internal.store.Chain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StrongServerStoreProxy
implements ServerStoreProxy {
    private static final Logger LOGGER = LoggerFactory.getLogger(StrongServerStoreProxy.class);
    private final CommonServerStoreProxy delegate;
    private final ConcurrentMap<Long, CountDownLatch> hashInvalidationsInProgress = new ConcurrentHashMap<Long, CountDownLatch>();
    private final AtomicReference<CountDownLatch> invalidateAllLatch = new AtomicReference();
    private final ClusterTierClientEntity entity;

    public StrongServerStoreProxy(String cacheId, ClusterTierClientEntity entity, ServerStoreProxy.ServerCallback invalidation) {
        this.delegate = new CommonServerStoreProxy(cacheId, entity, invalidation);
        this.entity = entity;
        this.delegate.addResponseListener(EhcacheEntityResponse.HashInvalidationDone.class, this::hashInvalidationDoneResponseListener);
        this.delegate.addResponseListener(EhcacheEntityResponse.AllInvalidationDone.class, this::allInvalidationDoneResponseListener);
        entity.setReconnectListener(this::reconnectListener);
        entity.setDisconnectionListener(this::disconnectionListener);
    }

    private void disconnectionListener() {
        for (CountDownLatch latch : this.hashInvalidationsInProgress.values()) {
            latch.countDown();
        }
        this.hashInvalidationsInProgress.clear();
        CountDownLatch countDownLatch = this.invalidateAllLatch.get();
        if (countDownLatch != null) {
            countDownLatch.countDown();
        }
    }

    private void allInvalidationDoneResponseListener(EhcacheEntityResponse.AllInvalidationDone response) {
        LOGGER.debug("CLIENT: on cache {}, server notified that clients invalidated all", (Object)this.getCacheId());
        CountDownLatch countDownLatch = this.invalidateAllLatch.getAndSet(null);
        if (countDownLatch != null) {
            LOGGER.debug("CLIENT: on cache {}, count down", (Object)this.getCacheId());
            countDownLatch.countDown();
        }
    }

    private void hashInvalidationDoneResponseListener(EhcacheEntityResponse.HashInvalidationDone response) {
        long key = response.getKey();
        LOGGER.debug("CLIENT: on cache {}, server notified that clients invalidated hash {}", (Object)this.getCacheId(), (Object)key);
        CountDownLatch countDownLatch = (CountDownLatch)this.hashInvalidationsInProgress.remove(key);
        if (countDownLatch != null) {
            countDownLatch.countDown();
        }
    }

    private void reconnectListener(ClusterTierReconnectMessage reconnectMessage) {
        Set<Long> inflightInvalidations = this.hashInvalidationsInProgress.keySet();
        reconnectMessage.addInvalidationsInProgress(inflightInvalidations);
        if (this.invalidateAllLatch.get() != null) {
            reconnectMessage.clearInProgress();
        }
    }

    private <T> T performWaitingForHashInvalidation(long key, Callable<T> c, TimeoutDuration timeout) throws TimeoutException {
        LongSupplier nanosRemaining = Timeouts.nanosStartingFromNow(timeout);
        CountDownLatch latch = new CountDownLatch(1);
        while (true) {
            if (!this.entity.isConnected()) {
                throw new IllegalStateException("Cluster tier manager disconnected");
            }
            CountDownLatch countDownLatch = this.hashInvalidationsInProgress.putIfAbsent(key, latch);
            if (countDownLatch == null) break;
            this.awaitOnLatch(countDownLatch, nanosRemaining);
        }
        try {
            T result = c.call();
            LOGGER.debug("CLIENT: Waiting for invalidations on key {}", (Object)key);
            this.awaitOnLatch(latch, nanosRemaining);
            LOGGER.debug("CLIENT: key {} invalidated on all clients, unblocking call", (Object)key);
            return result;
        }
        catch (Exception ex) {
            this.hashInvalidationsInProgress.remove(key);
            latch.countDown();
            if (ex instanceof TimeoutException) {
                throw (TimeoutException)ex;
            }
            throw new RuntimeException(ex);
        }
    }

    private <T> T performWaitingForAllInvalidation(Callable<T> c, TimeoutDuration timeout) throws TimeoutException {
        LongSupplier nanosRemaining = Timeouts.nanosStartingFromNow(timeout);
        CountDownLatch newLatch = new CountDownLatch(1);
        while (true) {
            if (!this.entity.isConnected()) {
                throw new IllegalStateException("Cluster tier manager disconnected");
            }
            if (this.invalidateAllLatch.compareAndSet(null, newLatch)) break;
            CountDownLatch existingLatch = this.invalidateAllLatch.get();
            if (existingLatch == null) continue;
            this.awaitOnLatch(existingLatch, nanosRemaining);
        }
        try {
            T result = c.call();
            this.awaitOnLatch(newLatch, nanosRemaining);
            LOGGER.debug("CLIENT: all invalidated on all clients, unblocking call");
            return result;
        }
        catch (Exception ex) {
            this.invalidateAllLatch.set(null);
            newLatch.countDown();
            if (ex instanceof TimeoutException) {
                throw (TimeoutException)ex;
            }
            throw new RuntimeException(ex);
        }
    }

    private void awaitOnLatch(CountDownLatch countDownLatch, LongSupplier nanosRemaining) throws TimeoutException {
        boolean interrupted = Thread.interrupted();
        while (true) {
            block9: {
                if (!countDownLatch.await(nanosRemaining.getAsLong(), TimeUnit.NANOSECONDS)) break block9;
                if (!this.entity.isConnected()) {
                    throw new IllegalStateException("Cluster tier manager disconnected");
                }
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
                return;
            }
            try {
                try {
                    throw new TimeoutException();
                }
                catch (InterruptedException e) {
                    interrupted = true;
                    continue;
                }
            }
            catch (Throwable throwable) {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
                throw throwable;
            }
            break;
        }
    }

    @Override
    public String getCacheId() {
        return this.delegate.getCacheId();
    }

    @Override
    public void close() {
        this.delegate.close();
    }

    @Override
    public Chain get(long key) throws TimeoutException {
        return this.delegate.get(key);
    }

    @Override
    public void append(long key, ByteBuffer payLoad) throws TimeoutException {
        this.performWaitingForHashInvalidation(key, () -> {
            this.delegate.append(key, payLoad);
            return null;
        }, this.entity.getTimeouts().getMutativeOperationTimeout());
    }

    @Override
    public Chain getAndAppend(long key, ByteBuffer payLoad) throws TimeoutException {
        return this.performWaitingForHashInvalidation(key, () -> this.delegate.getAndAppend(key, payLoad), this.entity.getTimeouts().getMutativeOperationTimeout());
    }

    @Override
    public void replaceAtHead(long key, Chain expect, Chain update) {
        this.delegate.replaceAtHead(key, expect, update);
    }

    @Override
    public void clear() throws TimeoutException {
        this.performWaitingForAllInvalidation(() -> {
            this.delegate.clear();
            return null;
        }, this.entity.getTimeouts().getMutativeOperationTimeout());
    }
}

