/*
 * Decompiled with CFR 0.152.
 */
package org.shoal.ha.cache.impl.interceptor;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.shoal.adapter.store.commands.AbstractSaveCommand;
import org.shoal.adapter.store.commands.NoOpCommand;
import org.shoal.adapter.store.commands.SaveCommand;
import org.shoal.ha.cache.api.DataStoreAlreadyClosedException;
import org.shoal.ha.cache.api.DataStoreContext;
import org.shoal.ha.cache.api.DataStoreException;
import org.shoal.ha.cache.impl.command.Command;
import org.shoal.ha.cache.impl.interceptor.CommandCollector;
import org.shoal.ha.cache.impl.interceptor.ReplicationFramePayloadCommand;
import org.shoal.ha.cache.impl.util.ASyncReplicationManager;

public class ReplicationCommandTransmitterWithMap<K, V>
implements Runnable,
CommandCollector<K, V> {
    private static final Logger _logger = Logger.getLogger("org.shoal.ha.cache.interceptor.transmit");
    private static final Logger _statsLogger = Logger.getLogger("org.shoal.ha.cache.stats");
    private DataStoreContext<K, V> dsc;
    private volatile String targetName;
    private ScheduledFuture future;
    private static final String TRANSMITTER_FREQUECNCY_PROP_NAME = "org.shoal.cache.transmitter.frequency.in.millis";
    private static final String MAX_BATCH_SIZE_PROP_NAME = "org.shoal.cache.transmitter.max.batch.size";
    private static int TRANSMITTER_FREQUECNCY_IN_MILLIS = 100;
    private static int MAX_BATCH_SIZE = 30;
    private AtomicReference<BatchedCommandMapDataFrame> mapRef;
    ASyncReplicationManager asyncReplicationManager = ASyncReplicationManager._getInstance();
    private long timeStamp = System.currentTimeMillis();
    ThreadPoolExecutor executor;
    private AtomicBoolean openStatus = new AtomicBoolean(true);
    private AtomicInteger activeBatchCount = new AtomicInteger(1);
    private CountDownLatch latch = new CountDownLatch(1);
    private static AtomicInteger _sendBatchCount;

    @Override
    public void initialize(String targetName, DataStoreContext<K, V> rsInfo) {
        this.executor = ASyncReplicationManager._getInstance().getExecutorService();
        this.targetName = targetName;
        this.dsc = rsInfo;
        BatchedCommandMapDataFrame batch = new BatchedCommandMapDataFrame(this.openStatus.get());
        this.mapRef = new AtomicReference<BatchedCommandMapDataFrame>(batch);
        this.future = this.asyncReplicationManager.getScheduledThreadPoolExecutor().scheduleAtFixedRate(this, TRANSMITTER_FREQUECNCY_IN_MILLIS, TRANSMITTER_FREQUECNCY_IN_MILLIS, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        try {
            if (this.openStatus.compareAndSet(true, false)) {
                this.future.cancel(false);
                if (_logger.isLoggable(Level.FINE)) {
                    _logger.log(Level.FINE, "(ReplicationCommandTransmitterWithMap) BEGIN Flushing all batched data upon shutdown..." + this.activeBatchCount.get() + " to be flushed...");
                }
                BatchedCommandMapDataFrame closedBatch = new BatchedCommandMapDataFrame(false);
                BatchedCommandMapDataFrame batch = this.mapRef.getAndSet(closedBatch);
                this.asyncReplicationManager.getExecutorService().submit(batch);
                this.dsc.getDataStoreMBean().incrementBatchSentCount();
                for (int loopCount = 0; loopCount < 5; ++loopCount) {
                    if (this.activeBatchCount.get() <= 0) continue;
                    try {
                        this.latch.await(5L, TimeUnit.SECONDS);
                        continue;
                    }
                    catch (InterruptedException inEx) {
                        // empty catch block
                    }
                }
                if (_logger.isLoggable(Level.FINE)) {
                    _logger.log(Level.FINE, "(ReplicationCommandTransmitterWithMap) DONE Flushing all batched data upon shutdown...");
                }
            }
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Override
    public void addCommand(Command<K, V> cmd) throws DataStoreException {
        this.addCommandToBatch(cmd, true);
    }

    @Override
    public void removeCommand(Command<K, V> cmd) throws DataStoreException {
        this.addCommandToBatch(cmd, false);
    }

    private void addCommandToBatch(Command<K, V> cmd, boolean isAdd) throws DataStoreException {
        boolean done = false;
        while (!done) {
            BatchedCommandMapDataFrame batch = this.mapRef.get();
            done = batch.doAddOrRemove(cmd, isAdd);
            if (done) continue;
            BatchedCommandMapDataFrame frame = new BatchedCommandMapDataFrame(this.openStatus.get());
            frame.doAddOrRemove(cmd, isAdd);
            done = this.mapRef.compareAndSet(batch, frame);
            if (!done || !frame.isValid()) continue;
            this.activeBatchCount.incrementAndGet();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        try {
            this.dsc.acquireReadLock();
            if (this.openStatus.get()) {
                BatchedCommandMapDataFrame batch = this.mapRef.get();
                batch.flushAndTransmit();
            }
        }
        catch (DataStoreAlreadyClosedException dsEx) {
        }
        catch (DataStoreException dsEx) {
            _logger.log(Level.WARNING, "Error during flush...");
        }
        finally {
            this.dsc.releaseReadLock();
        }
    }

    static {
        try {
            TRANSMITTER_FREQUECNCY_IN_MILLIS = Integer.valueOf(System.getProperty(TRANSMITTER_FREQUECNCY_PROP_NAME, "" + TRANSMITTER_FREQUECNCY_IN_MILLIS));
            _statsLogger.log(Level.CONFIG, "USING org.shoal.cache.transmitter.frequency.in.millis = " + TRANSMITTER_FREQUECNCY_IN_MILLIS);
        }
        catch (Exception ex) {
            _statsLogger.log(Level.CONFIG, "USING org.shoal.cache.transmitter.frequency.in.millis = " + TRANSMITTER_FREQUECNCY_IN_MILLIS);
        }
        try {
            MAX_BATCH_SIZE = Integer.valueOf(System.getProperty(MAX_BATCH_SIZE_PROP_NAME, "" + MAX_BATCH_SIZE));
            _statsLogger.log(Level.CONFIG, "USING org.shoal.cache.transmitter.max.batch.size = " + MAX_BATCH_SIZE);
        }
        catch (Exception ex) {
            _statsLogger.log(Level.CONFIG, "USING org.shoal.cache.transmitter.max.batch.size = " + MAX_BATCH_SIZE);
        }
        _logger.log(Level.FINE, "USING ReplicationCommandTransmitterWithMap");
        _sendBatchCount = new AtomicInteger(0);
    }

    private class BatchedCommandMapDataFrame
    implements Runnable {
        private int myBatchNumber;
        private AtomicInteger inFlightCount = new AtomicInteger(0);
        private AtomicBoolean batchThresholdReached = new AtomicBoolean(false);
        private AtomicBoolean alreadySent = new AtomicBoolean(false);
        private volatile ConcurrentHashMap<Object, ConcurrentLinkedQueue<Command>> map = new ConcurrentHashMap();
        private AtomicInteger removedKeysSize = new AtomicInteger(0);
        private volatile ConcurrentLinkedQueue removedKeys = new ConcurrentLinkedQueue();
        private volatile long lastTS = System.currentTimeMillis();
        private boolean validBatch;

        BatchedCommandMapDataFrame(boolean validBatch) {
            this.validBatch = validBatch;
            this.myBatchNumber = _sendBatchCount.incrementAndGet();
        }

        private boolean isValid() {
            return this.validBatch;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private boolean doAddOrRemove(Command cmd, boolean isAdd) throws DataStoreException {
            if (!this.validBatch) {
                throw new DataStoreAlreadyClosedException("Cannot add a command to a Batch after the DataStore has been closed");
            }
            boolean result = false;
            if (!this.batchThresholdReached.get()) {
                int inCount = 0;
                try {
                    this.inFlightCount.incrementAndGet();
                    if (!this.batchThresholdReached.get()) {
                        if (isAdd) {
                            ConcurrentLinkedQueue cmdList = this.map.get(cmd.getKey());
                            if (cmdList == null) {
                                cmdList = new ConcurrentLinkedQueue();
                                ConcurrentLinkedQueue<Command> cmdList1 = this.map.putIfAbsent(cmd.getKey(), cmdList);
                                cmdList = cmdList1 != null ? cmdList1 : cmdList;
                            }
                            cmdList.add(cmd);
                            result = true;
                            if (this.map.size() >= MAX_BATCH_SIZE) {
                                this.batchThresholdReached.compareAndSet(false, true);
                            }
                        } else {
                            this.map.remove(cmd.getKey());
                            this.removedKeys.add(cmd.getKey());
                            int removedSz = this.removedKeysSize.incrementAndGet();
                            result = true;
                            if (removedSz >= 2 * MAX_BATCH_SIZE) {
                                this.batchThresholdReached.compareAndSet(false, true);
                            }
                        }
                    }
                }
                finally {
                    inCount = this.inFlightCount.decrementAndGet();
                }
                if (this.batchThresholdReached.get() && inCount == 0 && this.alreadySent.compareAndSet(false, true)) {
                    if (_statsLogger.isLoggable(Level.FINE)) {
                        _statsLogger.log(Level.FINE, "doAddOrRemove batchThresholdReached.get()=" + this.batchThresholdReached.get() + "; inFlightCount = " + inCount + "; ");
                        _statsLogger.log(Level.FINE, "Sending batch# " + this.myBatchNumber + " to " + ReplicationCommandTransmitterWithMap.this.targetName + "; wasActive for (" + (System.currentTimeMillis() - this.lastTS) + " millis");
                    }
                    ReplicationCommandTransmitterWithMap.this.asyncReplicationManager.getExecutorService().submit(this);
                    ReplicationCommandTransmitterWithMap.this.dsc.getDataStoreMBean().incrementBatchSentCount();
                }
            }
            return result;
        }

        void flushAndTransmit() throws DataStoreException {
            ReplicationCommandTransmitterWithMap.this.dsc.getDataStoreMBean().incrementFlushThreadWakeupCount();
            if (!(this.alreadySent.get() || this.map.size() <= 0 && this.removedKeysSize.get() <= 0)) {
                if (this.lastTS == ReplicationCommandTransmitterWithMap.this.timeStamp) {
                    if (_statsLogger.isLoggable(Level.FINE)) {
                        _statsLogger.log(Level.FINE, "flushAndTransmit will flush data because lastTS = " + this.lastTS + "; timeStamp = " + ReplicationCommandTransmitterWithMap.this.timeStamp + "; lastTS = " + this.lastTS + "; map.size() = " + this.map.size() + "; removedKeys.size() = " + this.removedKeysSize.get());
                    }
                    NoOpCommand nc = null;
                    while (this.doAddOrRemove(nc = new NoOpCommand(), true)) {
                    }
                    ReplicationCommandTransmitterWithMap.this.dsc.getDataStoreMBean().incrementFlushThreadFlushedCount();
                } else {
                    if (_statsLogger.isLoggable(Level.FINER)) {
                        _statsLogger.log(Level.FINER, "flushAndTransmit will NOT flush data because lastTS = " + this.lastTS + "; timeStamp = " + ReplicationCommandTransmitterWithMap.this.timeStamp + "; lastTS = " + this.lastTS + "; map.size() = " + this.map.size() + "; removedKeys.size() = " + this.removedKeysSize.get());
                    }
                    ReplicationCommandTransmitterWithMap.this.timeStamp = this.lastTS;
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                ReplicationFramePayloadCommand rfCmd = new ReplicationFramePayloadCommand();
                rfCmd.setTargetInstance(ReplicationCommandTransmitterWithMap.this.targetName);
                try {
                    for (ConcurrentLinkedQueue<Command> cmdList : this.map.values()) {
                        AbstractSaveCommand saveCmd = null;
                        for (Command cmd : cmdList) {
                            if (cmd.getOpcode() == 102) continue;
                            if (cmd.getOpcode() == 33) {
                                SaveCommand thisSaveCommand = (SaveCommand)cmd;
                                if (saveCmd != null && saveCmd.getVersion() >= thisSaveCommand.getVersion()) continue;
                                saveCmd = thisSaveCommand;
                                continue;
                            }
                            rfCmd.addComamnd(cmd);
                        }
                        if (saveCmd == null) continue;
                        rfCmd.addComamnd(saveCmd);
                    }
                    rfCmd.setRemovedKeys(this.removedKeys);
                    ReplicationCommandTransmitterWithMap.this.dsc.getCommandManager().execute(rfCmd);
                }
                catch (IOException ioEx) {
                    _logger.log(Level.WARNING, "Batch operation (ASyncCommandList failed...", ioEx);
                }
            }
            finally {
                if (this.validBatch && ReplicationCommandTransmitterWithMap.this.activeBatchCount.decrementAndGet() <= 0 && !ReplicationCommandTransmitterWithMap.this.openStatus.get()) {
                    ReplicationCommandTransmitterWithMap.this.latch.countDown();
                }
                if (_logger.isLoggable(Level.FINE)) {
                    _logger.log(Level.FINE, "(ReplicationCommandTransmitterWithMap) Completed one batch. Still " + ReplicationCommandTransmitterWithMap.this.activeBatchCount.get() + " to be flushed...");
                }
            }
        }
    }
}

