/*
 * Decompiled with CFR 0.152.
 */
package net.sf.ehcache.distribution;

import java.io.Serializable;
import java.rmi.UnmarshalException;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import net.sf.ehcache.CacheException;
import net.sf.ehcache.Ehcache;
import net.sf.ehcache.Element;
import net.sf.ehcache.Status;
import net.sf.ehcache.distribution.CachePeer;
import net.sf.ehcache.distribution.EventMessage;
import net.sf.ehcache.distribution.RMISynchronousCacheReplicator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RMIAsynchronousCacheReplicator
extends RMISynchronousCacheReplicator {
    private static final Logger LOG = LoggerFactory.getLogger(RMIAsynchronousCacheReplicator.class.getName());
    protected Thread replicationThread = new ReplicationThread();
    protected int asynchronousReplicationInterval;
    protected final Queue<CacheEventMessage> replicationQueue = new ConcurrentLinkedQueue<CacheEventMessage>();

    public RMIAsynchronousCacheReplicator(boolean replicatePuts, boolean replicatePutsViaCopy, boolean replicateUpdates, boolean replicateUpdatesViaCopy, boolean replicateRemovals, int asynchronousReplicationInterval) {
        super(replicatePuts, replicatePutsViaCopy, replicateUpdates, replicateUpdatesViaCopy, replicateRemovals);
        this.asynchronousReplicationInterval = asynchronousReplicationInterval;
        this.status = Status.STATUS_ALIVE;
        this.replicationThread.start();
    }

    private void replicationThreadMain() {
        while (true) {
            if (this.alive() && this.replicationQueue != null && this.replicationQueue.isEmpty()) {
                try {
                    Thread.sleep(this.asynchronousReplicationInterval);
                }
                catch (InterruptedException e) {
                    LOG.debug("Spool Thread interrupted.");
                    return;
                }
            }
            if (this.notAlive()) {
                return;
            }
            try {
                this.flushReplicationQueue();
                continue;
            }
            catch (Throwable e) {
                LOG.error("Exception on flushing of replication queue: " + e.getMessage() + ". Continuing...", e);
                continue;
            }
            break;
        }
    }

    public final void notifyElementPut(Ehcache cache, Element element) throws CacheException {
        if (this.notAlive()) {
            return;
        }
        if (!this.replicatePuts) {
            return;
        }
        if (this.replicatePutsViaCopy) {
            if (!element.isSerializable()) {
                if (LOG.isWarnEnabled()) {
                    LOG.warn("Object with key " + element.getObjectKey() + " is not Serializable and cannot be replicated.");
                }
                return;
            }
            this.addToReplicationQueue(new CacheEventMessage(0, cache, element, null));
        } else {
            if (!element.isKeySerializable()) {
                if (LOG.isWarnEnabled()) {
                    LOG.warn("Object with key " + element.getObjectKey() + " does not have a Serializable key and cannot be replicated via invalidate.");
                }
                return;
            }
            this.addToReplicationQueue(new CacheEventMessage(1, cache, null, element.getKey()));
        }
    }

    public final void notifyElementUpdated(Ehcache cache, Element element) throws CacheException {
        if (this.notAlive()) {
            return;
        }
        if (!this.replicateUpdates) {
            return;
        }
        if (this.replicateUpdatesViaCopy) {
            if (!element.isSerializable()) {
                if (LOG.isWarnEnabled()) {
                    LOG.warn("Object with key " + element.getObjectKey() + " is not Serializable and cannot be updated via copy.");
                }
                return;
            }
            this.addToReplicationQueue(new CacheEventMessage(0, cache, element, null));
        } else {
            if (!element.isKeySerializable()) {
                if (LOG.isWarnEnabled()) {
                    LOG.warn("Object with key " + element.getObjectKey() + " does not have a Serializable key and cannot be replicated via invalidate.");
                }
                return;
            }
            this.addToReplicationQueue(new CacheEventMessage(1, cache, null, element.getKey()));
        }
    }

    public final void notifyElementRemoved(Ehcache cache, Element element) throws CacheException {
        if (this.notAlive()) {
            return;
        }
        if (!this.replicateRemovals) {
            return;
        }
        if (!element.isKeySerializable()) {
            if (LOG.isWarnEnabled()) {
                LOG.warn("Key " + element.getObjectKey() + " is not Serializable and cannot be replicated.");
            }
            return;
        }
        this.addToReplicationQueue(new CacheEventMessage(1, cache, null, element.getKey()));
    }

    public void notifyRemoveAll(Ehcache cache) {
        if (this.notAlive()) {
            return;
        }
        if (!this.replicateRemovals) {
            return;
        }
        this.addToReplicationQueue(new CacheEventMessage(3, cache, null, null));
    }

    protected void addToReplicationQueue(CacheEventMessage cacheEventMessage) {
        if (!this.replicationThread.isAlive()) {
            LOG.error("CacheEventMessages cannot be added to the replication queue because the replication thread has died.");
        } else {
            this.replicationQueue.add(cacheEventMessage);
        }
    }

    private void flushReplicationQueue() {
        int eventMessagesNotResolved;
        CacheEventMessage head = this.replicationQueue.peek();
        if (head == null) {
            return;
        }
        Ehcache cache = head.cache;
        List cachePeers = RMIAsynchronousCacheReplicator.listRemoteCachePeers(cache);
        int limit = this.replicationQueue.size();
        List resolvedEventMessages = this.extractAndResolveEventMessages(limit);
        for (int j = 0; j < cachePeers.size(); ++j) {
            CachePeer cachePeer = (CachePeer)cachePeers.get(j);
            try {
                cachePeer.send(resolvedEventMessages);
                continue;
            }
            catch (UnmarshalException e) {
                String message = e.getMessage();
                if (message.contains("Read time out") || message.contains("Read timed out")) {
                    LOG.warn("Unable to send message to remote peer due to socket read timeout. Consider increasing the socketTimeoutMillis setting in the cacheManagerPeerListenerFactory. Message was: " + message);
                    continue;
                }
                LOG.debug("Unable to send message to remote peer.  Message was: " + message);
                continue;
            }
            catch (Throwable t) {
                LOG.warn("Unable to send message to remote peer.  Message was: " + t.getMessage(), t);
            }
        }
        if (LOG.isWarnEnabled() && (eventMessagesNotResolved = limit - resolvedEventMessages.size()) > 0) {
            LOG.warn(eventMessagesNotResolved + " messages were discarded on replicate due to reclamation of " + "SoftReferences by the VM. Consider increasing the maximum heap size and/or setting the " + "starting heap size to a higher value.");
        }
    }

    private List extractAndResolveEventMessages(int limit) {
        CacheEventMessage message;
        ArrayList<EventMessage> list = new ArrayList<EventMessage>();
        for (int i = 0; i < limit && (message = this.replicationQueue.poll()) != null; ++i) {
            EventMessage eventMessage = message.getEventMessage();
            if (eventMessage == null || !eventMessage.isValid()) continue;
            list.add(eventMessage);
        }
        return list;
    }

    public final void dispose() {
        this.status = Status.STATUS_SHUTDOWN;
        this.flushReplicationQueue();
    }

    public Object clone() throws CloneNotSupportedException {
        super.clone();
        return new RMIAsynchronousCacheReplicator(this.replicatePuts, this.replicatePutsViaCopy, this.replicateUpdates, this.replicateUpdatesViaCopy, this.replicateRemovals, this.asynchronousReplicationInterval);
    }

    private static class CacheEventMessage {
        private final Ehcache cache;
        private final EventMessage eventMessage;

        public CacheEventMessage(int event, Ehcache cache, Element element, Serializable key) {
            this.eventMessage = new EventMessage(event, key, element);
            this.cache = cache;
        }

        public final EventMessage getEventMessage() {
            return this.eventMessage;
        }
    }

    private final class ReplicationThread
    extends Thread {
        public ReplicationThread() {
            super("Replication Thread");
            this.setDaemon(true);
            this.setPriority(5);
        }

        public final void run() {
            RMIAsynchronousCacheReplicator.this.replicationThreadMain();
        }
    }
}

