/*
 * Decompiled with CFR 0.152.
 */
package org.ehcache.clustered.client.internal.store;

import java.time.Duration;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.LongSupplier;
import org.ehcache.clustered.client.config.Timeouts;
import org.ehcache.clustered.client.internal.service.ClusterTierValidationException;
import org.ehcache.clustered.client.internal.store.ClusterTierClientEntity;
import org.ehcache.clustered.client.internal.store.InternalClusterTierClientEntity;
import org.ehcache.clustered.common.internal.ServerStoreConfiguration;
import org.ehcache.clustered.common.internal.exceptions.ClusterException;
import org.ehcache.clustered.common.internal.messages.ClusterTierReconnectMessage;
import org.ehcache.clustered.common.internal.messages.EhcacheEntityMessage;
import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse;
import org.ehcache.clustered.common.internal.messages.EhcacheMessageType;
import org.ehcache.clustered.common.internal.messages.EhcacheOperationMessage;
import org.ehcache.clustered.common.internal.messages.EhcacheResponseType;
import org.ehcache.clustered.common.internal.messages.LifeCycleMessageFactory;
import org.ehcache.clustered.common.internal.messages.ReconnectMessageCodec;
import org.ehcache.clustered.common.internal.messages.ServerStoreOpMessage;
import org.ehcache.clustered.common.internal.messages.StateRepositoryOpMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.terracotta.entity.EndpointDelegate;
import org.terracotta.entity.EntityClientEndpoint;
import org.terracotta.entity.EntityMessage;
import org.terracotta.entity.EntityResponse;
import org.terracotta.entity.InvocationBuilder;
import org.terracotta.entity.InvokeFuture;
import org.terracotta.entity.MessageCodecException;
import org.terracotta.exception.EntityException;

public class SimpleClusterTierClientEntity
implements InternalClusterTierClientEntity {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleClusterTierClientEntity.class);
    private static final Set<EhcacheMessageType> GET_STORE_OPS = EnumSet.of(EhcacheMessageType.GET_STORE, EhcacheMessageType.ITERATOR_ADVANCE, EhcacheMessageType.ITERATOR_OPEN, EhcacheMessageType.ITERATOR_CLOSE);
    private final EntityClientEndpoint<EhcacheEntityMessage, EhcacheEntityResponse> endpoint;
    private final LifeCycleMessageFactory messageFactory;
    private final Object lock = new Object();
    private final ReconnectMessageCodec reconnectMessageCodec = new ReconnectMessageCodec();
    private final Map<Class<? extends EhcacheEntityResponse>, List<ClusterTierClientEntity.ResponseListener<? extends EhcacheEntityResponse>>> responseListeners = new ConcurrentHashMap<Class<? extends EhcacheEntityResponse>, List<ClusterTierClientEntity.ResponseListener<? extends EhcacheEntityResponse>>>();
    private final List<ClusterTierClientEntity.DisconnectionListener> disconnectionListeners = new CopyOnWriteArrayList<ClusterTierClientEntity.DisconnectionListener>();
    private final Timeouts timeouts;
    private final String storeIdentifier;
    private final List<ClusterTierClientEntity.ReconnectListener> reconnectListeners = new CopyOnWriteArrayList<ClusterTierClientEntity.ReconnectListener>();
    private volatile boolean connected = true;
    private volatile boolean eventsEnabled;
    private final Executor asyncWorker;

    public SimpleClusterTierClientEntity(EntityClientEndpoint<EhcacheEntityMessage, EhcacheEntityResponse> endpoint, Timeouts timeouts, final String storeIdentifier, Executor asyncWorker) {
        this.endpoint = endpoint;
        this.timeouts = timeouts;
        this.storeIdentifier = storeIdentifier;
        this.asyncWorker = Objects.requireNonNull(asyncWorker);
        this.messageFactory = new LifeCycleMessageFactory();
        endpoint.setDelegate((EndpointDelegate)new EndpointDelegate<EhcacheEntityResponse>(){

            public void handleMessage(EhcacheEntityResponse messageFromServer) {
                LOGGER.trace("Entity response received from server: {}", (Object)messageFromServer);
                SimpleClusterTierClientEntity.this.fireResponseEvent(messageFromServer);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public byte[] createExtendedReconnectData() {
                Object object = SimpleClusterTierClientEntity.this.lock;
                synchronized (object) {
                    ClusterTierReconnectMessage reconnectMessage = new ClusterTierReconnectMessage(SimpleClusterTierClientEntity.this.eventsEnabled);
                    SimpleClusterTierClientEntity.this.reconnectListeners.forEach(reconnectListener -> reconnectListener.onHandleReconnect(reconnectMessage));
                    return SimpleClusterTierClientEntity.this.reconnectMessageCodec.encode(reconnectMessage);
                }
            }

            public void didDisconnectUnexpectedly() {
                LOGGER.info("Cluster tier for cache {} disconnected", (Object)storeIdentifier);
                SimpleClusterTierClientEntity.this.fireDisconnectionEvent();
            }
        });
    }

    void fireDisconnectionEvent() {
        this.connected = false;
        this.disconnectionListeners.forEach(ClusterTierClientEntity.DisconnectionListener::onDisconnection);
    }

    private <T extends EhcacheEntityResponse> void fireResponseEvent(T response) {
        List<ClusterTierClientEntity.ResponseListener<? extends EhcacheEntityResponse>> responseListeners = this.responseListeners.get(response.getClass());
        if (responseListeners == null) {
            LOGGER.warn("Ignoring the response {} as no registered response listener could be found.", response);
            return;
        }
        LOGGER.debug("{} registered response listener(s) for {}", (Object)responseListeners.size(), response.getClass());
        for (ClusterTierClientEntity.ResponseListener<? extends EhcacheEntityResponse> responseListener : responseListeners) {
            Runnable responseProcessing = () -> {
                try {
                    responseListener.onResponse(response);
                }
                catch (TimeoutException e) {
                    LOGGER.debug("Timeout exception processing: {} - resubmitting", (Object)response, (Object)e);
                    this.fireResponseEvent(response);
                }
                catch (Exception e) {
                    LOGGER.warn("Unhandled failure processing: {}", (Object)response, (Object)e);
                }
            };
            try {
                this.asyncWorker.execute(responseProcessing);
            }
            catch (RejectedExecutionException f) {
                LOGGER.warn("Response task execution rejected using inline execution: {}", response, (Object)f);
                responseProcessing.run();
            }
        }
    }

    public void close() {
        this.endpoint.close();
        this.reconnectListeners.clear();
        this.disconnectionListeners.clear();
    }

    @Override
    public Timeouts getTimeouts() {
        return this.timeouts;
    }

    @Override
    public void addReconnectListener(ClusterTierClientEntity.ReconnectListener reconnectListener) {
        this.reconnectListeners.add(reconnectListener);
    }

    @Override
    public void enableEvents(boolean enable) throws ClusterException, TimeoutException {
        if (enable == this.eventsEnabled) {
            return;
        }
        this.invokeAndWaitForComplete((EhcacheOperationMessage)new ServerStoreOpMessage.EnableEventListenerMessage(enable), true);
        this.eventsEnabled = enable;
    }

    @Override
    public void addDisconnectionListener(ClusterTierClientEntity.DisconnectionListener disconnectionListener) {
        this.disconnectionListeners.add(disconnectionListener);
    }

    @Override
    public boolean isConnected() {
        return this.connected;
    }

    @Override
    public <T extends EhcacheEntityResponse> void addResponseListener(Class<T> responseType, ClusterTierClientEntity.ResponseListener<T> responseListener) {
        List<ClusterTierClientEntity.ResponseListener<? extends EhcacheEntityResponse>> responseListeners = this.responseListeners.get(responseType);
        if (responseListeners == null) {
            responseListeners = new CopyOnWriteArrayList<ClusterTierClientEntity.ResponseListener<? extends EhcacheEntityResponse>>();
            this.responseListeners.put(responseType, responseListeners);
        }
        responseListeners.add(responseListener);
    }

    @Override
    public void validate(ServerStoreConfiguration clientStoreConfiguration) throws ClusterTierValidationException, TimeoutException {
        try {
            this.invokeInternalAndWait((InvocationBuilder<EhcacheEntityMessage, EhcacheEntityResponse>)this.endpoint.beginInvoke(), this.timeouts.getConnectionTimeout(), (EhcacheEntityMessage)this.messageFactory.validateServerStore(this.storeIdentifier, clientStoreConfiguration), false);
        }
        catch (ClusterException e) {
            throw new ClusterTierValidationException("Error validating cluster tier '" + this.storeIdentifier + "'", e);
        }
    }

    @Override
    public EhcacheEntityResponse invokeStateRepositoryOperation(StateRepositoryOpMessage message, boolean track) throws ClusterException, TimeoutException {
        return this.invokeAndWaitForRetired((EhcacheOperationMessage)message, track);
    }

    @Override
    public void invokeAndWaitForSend(EhcacheOperationMessage message, boolean track) throws TimeoutException {
        this.invokeInternal((InvocationBuilder<EhcacheEntityMessage, EhcacheEntityResponse>)this.endpoint.beginInvoke().ackSent(), this.getTimeoutDuration(message), (EhcacheEntityMessage)message, track);
    }

    @Override
    public void invokeAndWaitForReceive(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
        this.invokeInternalAndWait((InvocationBuilder<EhcacheEntityMessage, EhcacheEntityResponse>)this.endpoint.beginInvoke().ackReceived(), message, track);
    }

    @Override
    public EhcacheEntityResponse invokeAndWaitForComplete(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
        return this.invokeInternalAndWait((InvocationBuilder<EhcacheEntityMessage, EhcacheEntityResponse>)this.endpoint.beginInvoke().blockGetOnRetire(false), message, track);
    }

    @Override
    public EhcacheEntityResponse invokeAndWaitForRetired(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
        return this.invokeInternalAndWait((InvocationBuilder<EhcacheEntityMessage, EhcacheEntityResponse>)this.endpoint.beginInvoke().blockGetOnRetire(true), message, track);
    }

    private EhcacheEntityResponse invokeInternalAndWait(InvocationBuilder<EhcacheEntityMessage, EhcacheEntityResponse> invocationBuilder, EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
        return this.invokeInternalAndWait(invocationBuilder, this.getTimeoutDuration(message), (EhcacheEntityMessage)message, track);
    }

    private EhcacheEntityResponse invokeInternalAndWait(InvocationBuilder<EhcacheEntityMessage, EhcacheEntityResponse> invocationBuilder, Duration timeLimit, EhcacheEntityMessage message, boolean track) throws ClusterException, TimeoutException {
        try {
            LongSupplier nanosRemaining = Timeouts.nanosStartingFromNow(timeLimit);
            InvokeFuture<EhcacheEntityResponse> future = this.invokeInternal(invocationBuilder, Duration.ofNanos(nanosRemaining.getAsLong()), message, track);
            EhcacheEntityResponse response = SimpleClusterTierClientEntity.waitFor(nanosRemaining.getAsLong(), future);
            if (EhcacheResponseType.FAILURE.equals((Object)response.getResponseType())) {
                throw ((EhcacheEntityResponse.Failure)response).getCause();
            }
            return response;
        }
        catch (EntityException e) {
            throw new RuntimeException(message + " error: " + e.toString(), e);
        }
        catch (TimeoutException e) {
            String msg = "Timeout exceeded for " + message + " message; " + timeLimit;
            TimeoutException timeoutException = new TimeoutException(msg);
            timeoutException.initCause(e);
            LOGGER.info(msg, (Throwable)timeoutException);
            throw timeoutException;
        }
    }

    /*
     * Loose catch block
     */
    private InvokeFuture<EhcacheEntityResponse> invokeInternal(InvocationBuilder<EhcacheEntityMessage, EhcacheEntityResponse> invocationBuilder, Duration timeout, EhcacheEntityMessage message, boolean track) throws TimeoutException {
        boolean interrupted = Thread.interrupted();
        try {
            LongSupplier nanosRemaining = Timeouts.nanosStartingFromNow(timeout);
            while (true) {
                block11: {
                    long nanos = nanosRemaining.getAsLong();
                    if (nanos <= 0L) break block11;
                    InvokeFuture invokeFuture = invocationBuilder.message((EntityMessage)message).invokeWithTimeout(nanos, TimeUnit.NANOSECONDS);
                    return invokeFuture;
                    {
                        catch (InterruptedException e) {
                            interrupted = true;
                            continue;
                        }
                    }
                }
                throw new TimeoutException("Timed out waiting for server response to message: " + message);
                break;
            }
            catch (MessageCodecException e) {
                throw new RuntimeException(message + " error: " + e.getMessage(), e);
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private Duration getTimeoutDuration(EhcacheOperationMessage message) {
        if (GET_STORE_OPS.contains(message.getMessageType())) {
            return this.timeouts.getReadOperationTimeout();
        }
        return this.timeouts.getWriteOperationTimeout();
    }

    private static <T extends EntityResponse> T waitFor(long nanos, InvokeFuture<T> future) throws EntityException, TimeoutException {
        boolean interrupted = false;
        long deadlineTimeout = System.nanoTime() + nanos;
        while (true) {
            try {
                long timeRemaining = deadlineTimeout - System.nanoTime();
                EntityResponse entityResponse = future.getWithTimeout(timeRemaining, TimeUnit.NANOSECONDS);
                return (T)entityResponse;
            }
            catch (InterruptedException e) {
                interrupted = true;
                continue;
            }
            break;
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

