/*
 * Decompiled with CFR 0.152.
 */
package org.ehcache.loaderwriter.writebehind;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.ehcache.config.writebehind.ResilientCacheWriter;
import org.ehcache.exceptions.BulkCacheWritingException;
import org.ehcache.exceptions.CacheWritingException;
import org.ehcache.loaderwriter.writebehind.WriteBehind;
import org.ehcache.loaderwriter.writebehind.operations.DeleteOperation;
import org.ehcache.loaderwriter.writebehind.operations.OperationsFilter;
import org.ehcache.loaderwriter.writebehind.operations.SingleOperation;
import org.ehcache.loaderwriter.writebehind.operations.SingleOperationType;
import org.ehcache.loaderwriter.writebehind.operations.WriteOperation;
import org.ehcache.spi.loaderwriter.CacheLoaderWriter;
import org.ehcache.spi.loaderwriter.WriteBehindConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Skeleton of a write-behind queue: {@link #write(Object, Object)} and {@link #delete(Object)}
 * enqueue operations, and a dedicated daemon thread later applies them to the wrapped
 * {@link CacheLoaderWriter}, either one by one or in batches, with retries.
 *
 * <p>Storage of the pending operations is delegated to subclasses through the abstract
 * methods. All queue mutations performed by this class happen while holding
 * {@code queueWriteLock}; the {@code stopping}/{@code stopped} flags are only read or
 * written under that lock (or the matching read lock).
 *
 * @param <K> key type of the queued operations
 * @param <V> value type of the queued operations
 */
public abstract class AbstractWriteBehindQueue<K, V>
implements WriteBehind<K, V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(WriteBehind.class);
    private static final int MS_IN_SEC = 1000;
    private final long minWriteDelayMs;
    private final long maxWriteDelayMs;
    private final int rateLimitPerSecond;
    private final int maxQueueSize;
    private final boolean writeBatching;
    private final int writeBatchSize;
    private final int retryAttempts;
    private final int retryAttemptDelaySeconds;
    private final Thread processingThread;
    private final ReentrantReadWriteLock queueLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock queueReadLock = this.queueLock.readLock();
    private final ReentrantReadWriteLock.WriteLock queueWriteLock = this.queueLock.writeLock();
    // Signalled when room may have become available for blocked producers.
    private final Condition queueIsFull = this.queueWriteLock.newCondition();
    // Signalled when work was added (or a stop was requested) to wake the processing thread.
    private final Condition queueIsEmpty = this.queueWriteLock.newCondition();
    // Signalled once the processing thread has marked the queue as stopped.
    private final Condition queueIsStopped = this.queueWriteLock.newCondition();
    private final CacheLoaderWriter<K, V> cacheLoaderWriter;
    private boolean stopping = false;
    private boolean stopped = true;
    private volatile OperationsFilter<SingleOperation<K, V>> filter = null;
    private final AtomicLong lastProcessing = new AtomicLong(System.currentTimeMillis());
    private final AtomicLong lastWorkDone = new AtomicLong(System.currentTimeMillis());
    private final AtomicBoolean busyProcessing = new AtomicBoolean(false);

    /**
     * Captures the write-behind settings and prepares (but does not start) the daemon
     * processing thread.
     *
     * @param config            source of delays, rate limit, queue size, batching and retry settings
     * @param cacheLoaderWriter the writer that queued operations are eventually applied to
     */
    public AbstractWriteBehindQueue(WriteBehindConfiguration config, CacheLoaderWriter<K, V> cacheLoaderWriter) {
        // Widen to long before multiplying so large delays cannot overflow int arithmetic.
        this.minWriteDelayMs = config.getMinWriteDelay() * (long) MS_IN_SEC;
        this.maxWriteDelayMs = config.getMaxWriteDelay() * (long) MS_IN_SEC;
        this.rateLimitPerSecond = config.getRateLimitPerSecond();
        this.maxQueueSize = config.getWriteBehindMaxQueueSize();
        this.writeBatching = config.isWriteBatching();
        this.writeBatchSize = config.getWriteBatchSize();
        this.retryAttempts = config.getRetryAttempts();
        this.retryAttemptDelaySeconds = config.getRetryAttemptDelaySeconds();
        this.cacheLoaderWriter = cacheLoaderWriter;
        this.processingThread = new Thread(new ProcessingThread(), cacheLoaderWriter.getClass().getName() + " write-behind");
        this.processingThread.setDaemon(true);
    }

    /**
     * Hands the pending operations over for processing. Invoked with the write lock held and
     * only when {@link #getQueueSize()} is positive; a {@code null} result is treated as
     * "nothing to process".
     */
    protected abstract List<SingleOperation<K, V>> quarantineItems();

    /** Appends an operation to the queue. Invoked with the write lock held. */
    protected abstract void addItem(SingleOperation<K, V> var1);

    /**
     * Puts operations that were quarantined but not processed back into the queue.
     * Invoked with the write lock held.
     */
    protected abstract void reinsertUnprocessedItems(List<SingleOperation<K, V>> var1);

    /**
     * Looks up the pending operation for a key; used by {@link #load(Object)}, which treats
     * {@code null} as "no pending operation".
     */
    protected abstract SingleOperation<K, V> getLatestOperation(K var1);

    /** Notifies the subclass that an operation has been handled and can be forgotten. */
    protected abstract void removeOperation(SingleOperation<K, V> var1);

    /**
     * Starts the processing thread.
     *
     * @throws RuntimeException if the queue is not in the stopped state or the processing
     *                          thread is still alive
     */
    @Override
    public void start() {
        this.queueWriteLock.lock();
        try {
            if (!this.stopped) {
                throw new RuntimeException("The write-behind queue for cache '" + this.cacheLoaderWriter.getClass().getName() + "' can't be started more than once");
            }
            if (this.processingThread.isAlive()) {
                throw new RuntimeException("The thread with name " + this.processingThread.getName() + " already exists and is still running");
            }
            this.stopping = false;
            this.stopped = false;
            this.processingThread.start();
        }
        finally {
            this.queueWriteLock.unlock();
        }
    }

    /**
     * Resolves a key against the pending operations first: a pending write yields its value,
     * any other pending operation yields {@code null}, and only when nothing is pending is
     * the underlying loader consulted.
     *
     * @param key the key to load
     * @return the pending written value, {@code null}, or the loader's result
     * @throws Exception whatever the underlying loader throws
     */
    @Override
    public V load(K key) throws Exception {
        SingleOperation<K, V> operation = this.getLatestOperation(key);
        if (operation == null) {
            return this.cacheLoaderWriter.load(key);
        }
        if (operation.getClass() == WriteOperation.class) {
            return ((WriteOperation<K, V>) operation).getValue();
        }
        return null;
    }

    /**
     * Enqueues a write, blocking while a bounded queue is full.
     *
     * @param key   key to write
     * @param value value to write
     * @throws CacheWritingException if the queue is stopping or stopped
     */
    @Override
    public void write(K key, V value) throws CacheWritingException {
        this.queueWriteLock.lock();
        try {
            this.waitForQueueSizeToDrop();
            if (this.stopping || this.stopped) {
                throw new CacheWritingException("The element '" + value + "' couldn't be added through the write-behind queue for cache '" + this.cacheLoaderWriter.getClass().getName() + "' since it's not started.");
            }
            this.addItem(new WriteOperation<K, V>(key, value));
            // Pass the "room available" signal on to the next blocked producer, if any room is left.
            if (this.getQueueSize() + 1L < (long)this.maxQueueSize) {
                this.queueIsFull.signal();
            }
            this.queueIsEmpty.signal();
        }
        finally {
            this.queueWriteLock.unlock();
        }
    }

    /**
     * Blocks the caller (which must hold the write lock) while a bounded queue is at capacity.
     * If the wait is interrupted the queue is stopped, the interrupt flag is restored and the
     * method returns; callers then reject the operation because the queue is stopped.
     */
    private void waitForQueueSizeToDrop() {
        if (this.maxQueueSize <= 0) {
            return;
        }
        while (this.getQueueSize() >= (long)this.maxQueueSize) {
            try {
                this.queueIsFull.await();
            }
            catch (InterruptedException e) {
                this.stop();
                Thread.currentThread().interrupt();
                // Do not loop again: with the interrupt flag set every further await() would
                // throw immediately, spinning forever (with the lock held) if the queue is
                // still full.
                return;
            }
        }
    }

    /**
     * Enqueues a delete, blocking while a bounded queue is full.
     *
     * @param key key to delete
     * @throws CacheWritingException if the queue is stopping or stopped
     */
    @Override
    public void delete(K key) throws CacheWritingException {
        this.queueWriteLock.lock();
        try {
            this.waitForQueueSizeToDrop();
            if (this.stopping || this.stopped) {
                throw new CacheWritingException("The entry for key '" + key + "' couldn't be deleted through the write-behind " + "queue for cache '" + this.cacheLoaderWriter.getClass().getName() + "' since it's not started.");
            }
            this.addItem(new DeleteOperation<K, V>(key));
            if (this.getQueueSize() + 1L < (long)this.maxQueueSize) {
                this.queueIsFull.signal();
            }
            this.queueIsEmpty.signal();
        }
        finally {
            this.queueWriteLock.unlock();
        }
    }

    /**
     * Requests a stop and waits until the processing thread reports that it has stopped.
     * Returns immediately when the queue is already stopped.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
     *                          interrupted (the interrupt flag is restored first)
     */
    @Override
    public void stop() {
        this.queueWriteLock.lock();
        try {
            if (this.stopped) {
                return;
            }
            this.stopping = true;
            this.queueIsEmpty.signal();
            while (!this.stopped) {
                this.queueIsStopped.await();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        finally {
            this.queueWriteLock.unlock();
        }
    }

    /**
     * Installs the filter applied to quarantined operations before they are processed;
     * {@code null} disables filtering.
     */
    @Override
    public void setOperationsFilter(OperationsFilter<SingleOperation<K, V>> filter) {
        this.filter = filter;
    }

    /**
     * Runs one processing pass: quarantines the pending operations, filters them, applies the
     * batching and rate-limit gates, and then executes them. Operations that are not executed
     * are handed back to the queue.
     *
     * @throws RuntimeException if a pass is already in progress, or if processing fails
     *                          (checked failures are wrapped in {@link CacheWritingException})
     */
    private void processItems() throws RuntimeException {
        // Atomic test-and-set so two callers can never both believe they own the pass.
        if (!this.busyProcessing.compareAndSet(false, true)) {
            throw new RuntimeException("The write behind queue for cache '" + this.cacheLoaderWriter.getClass().getName() + "' is already busy processing.");
        }
        this.lastProcessing.set(System.currentTimeMillis());
        try {
            int workSize;
            List<SingleOperation<K, V>> quarantinedItems;
            this.queueWriteLock.lock();
            try {
                quarantinedItems = this.getQueueSize() > 0L ? this.quarantineItems() : null;
                workSize = quarantinedItems != null ? quarantinedItems.size() : 0;
            }
            finally {
                this.queueWriteLock.unlock();
            }
            if (0 == workSize) {
                LOGGER.debug("{} : processItems() : nothing to process", this.getThreadName());
                return;
            }
            this.filterQuarantined(quarantinedItems);
            if (this.writeBatching && this.writeBatchSize > 0) {
                // Not enough items for a full batch and the max write delay has not elapsed yet.
                if (workSize < this.writeBatchSize && this.maxWriteDelayMs > this.lastProcessing.get() - this.lastWorkDone.get()) {
                    this.waitUntilEnoughWorkItemsAvailable(quarantinedItems, workSize);
                    return;
                }
                if (this.rateLimitPerSecond > 0) {
                    long secondsSinceLastWorkDone = (System.currentTimeMillis() - this.lastWorkDone.get()) / 1000L;
                    long maxBatchSizeSinceLastWorkDone = (long)this.rateLimitPerSecond * secondsSinceLastWorkDone;
                    int batchSize = this.determineBatchSize(quarantinedItems);
                    if ((long)batchSize > maxBatchSizeSinceLastWorkDone) {
                        this.waitUntilEnoughTimeHasPassed(quarantinedItems, batchSize, secondsSinceLastWorkDone);
                        return;
                    }
                }
            }
            try {
                this.lastWorkDone.set(System.currentTimeMillis());
                LOGGER.debug("{} : processItems() : processing started", this.getThreadName());
                this.processQuarantinedItems(quarantinedItems);
            }
            catch (RuntimeException e) {
                this.reassemble(quarantinedItems);
                throw e;
            }
            catch (Exception e) {
                this.reassemble(quarantinedItems);
                throw new CacheWritingException((Throwable)e);
            }
        }
        finally {
            this.busyProcessing.set(false);
            LOGGER.debug("{} : processItems() : processing finished", this.getThreadName());
        }
    }

    /** Dispatches to batched or one-by-one processing depending on the batching settings. */
    private void processQuarantinedItems(List<SingleOperation<K, V>> quarantinedItems) throws Exception {
        LOGGER.debug("{} : processItems() : processing  quarantined items", this.getThreadName());
        if (this.writeBatching && this.writeBatchSize > 0) {
            this.processBatchedOperations(quarantinedItems);
        } else {
            this.processSingleOperation(quarantinedItems);
        }
    }

    /**
     * Takes up to one batch worth of operations from the head of the list, groups them by
     * operation type, executes each group as a batch (with retries), then removes the batch
     * from the list and re-queues whatever is left over.
     */
    private void processBatchedOperations(List<SingleOperation<K, V>> quarantinedItems) throws Exception {
        int batchSize = this.determineBatchSize(quarantinedItems);
        Map<SingleOperationType, List<SingleOperation<K, V>>> separatedItemsPerType = new TreeMap<SingleOperationType, List<SingleOperation<K, V>>>();
        for (int i = 0; i < batchSize; ++i) {
            SingleOperation<K, V> item = quarantinedItems.get(i);
            LOGGER.debug("{} : processItems() : adding {} to next batch", this.getThreadName(), item);
            List<SingleOperation<K, V>> itemsPerType = separatedItemsPerType.get(item.getType());
            if (null == itemsPerType) {
                itemsPerType = new ArrayList<SingleOperation<K, V>>();
                separatedItemsPerType.put(item.getType(), itemsPerType);
            }
            itemsPerType.add(item);
        }
        for (List<SingleOperation<K, V>> itemsPerType : separatedItemsPerType.values()) {
            this.executeBatchWithRetries(itemsPerType);
        }
        for (int i = 0; i < batchSize; ++i) {
            this.removeOperation(quarantinedItems.remove(0));
        }
        if (!quarantinedItems.isEmpty()) {
            this.reassemble(quarantinedItems);
        }
    }

    /**
     * Executes one same-typed group of operations as a batch, retrying up to
     * {@code retryAttempts} additional times. After a partial failure the operations reported
     * as successful are dropped from the group so that only the remainder is retried.
     */
    private void executeBatchWithRetries(List<SingleOperation<K, V>> itemsPerType) throws Exception {
        int executionsLeft = this.retryAttempts + 1;
        while (executionsLeft-- > 0) {
            try {
                itemsPerType.get(0).createBatchOperation(itemsPerType).performBatchOperation(this.cacheLoaderWriter);
                return;
            }
            catch (BulkCacheWritingException bulkFailure) {
                Map<?, ?> failures = bulkFailure.getFailures();
                Set<?> successes = bulkFailure.getSuccesses();
                if (successes != null) {
                    // Walk backwards by index: removing inside a for-each over the same list
                    // would throw ConcurrentModificationException.
                    for (int i = itemsPerType.size() - 1; i >= 0; --i) {
                        if (successes.contains(itemsPerType.get(i).getKey())) {
                            itemsPerType.remove(i);
                        }
                    }
                }
                if (executionsLeft <= 0) {
                    if (failures != null) {
                        for (Map.Entry<?, ?> entry : failures.entrySet()) {
                            LOGGER.warn("Exception while processing key '{}' write behind queue", entry.getKey());
                            this.throwAwayIfResilient(entry.getKey(), null, (Exception)entry.getValue());
                        }
                    }
                    continue;
                }
                if (itemsPerType.isEmpty()) {
                    // Everything in this group was reported successful; nothing left to retry.
                    return;
                }
                LOGGER.warn("Exception while processing write behind queue, retrying in {} seconds, {} retries left : {} ", new Object[]{this.retryAttemptDelaySeconds, executionsLeft, bulkFailure});
                try {
                    this.sleepBeforeRetry();
                }
                catch (InterruptedException e1) {
                    Thread.currentThread().interrupt();
                    throw bulkFailure;
                }
            }
            catch (Exception e) {
                if (executionsLeft <= 0) {
                    LOGGER.warn("Exception while bulk processing in write behind queue", (Throwable)e);
                    for (SingleOperation<K, V> operation : itemsPerType) {
                        this.throwAwayIfResilient(operation.getKey(), this.valueOrNull(operation), e);
                    }
                    continue;
                }
                LOGGER.warn("Exception while processing write behind queue, retrying in {} seconds, {} retries left : {} ", new Object[]{this.retryAttemptDelaySeconds, executionsLeft, e});
                try {
                    this.sleepBeforeRetry();
                }
                catch (InterruptedException e1) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }

    /**
     * Executes the quarantined operations one at a time from the head of the list, retrying
     * each up to {@code retryAttempts} additional times, and removes each operation once it
     * has either succeeded or exhausted its attempts.
     */
    private void processSingleOperation(List<SingleOperation<K, V>> quarantinedItems) throws Exception {
        while (!quarantinedItems.isEmpty()) {
            SingleOperation<K, V> item = quarantinedItems.get(0);
            LOGGER.debug("{} : processItems() : processing {} ", this.getThreadName(), item);
            int executionsLeft = this.retryAttempts + 1;
            while (executionsLeft-- > 0) {
                try {
                    item.performSingleOperation(this.cacheLoaderWriter);
                    break;
                }
                catch (Exception e) {
                    if (executionsLeft <= 0) {
                        LOGGER.warn("Exception while processing key '{}' write behind queue : {}", item.getKey(), (Object)e);
                        this.throwAwayIfResilient(item.getKey(), this.valueOrNull(item), e);
                        continue;
                    }
                    LOGGER.warn("Exception while processing write behind queue, retrying in {} seconds, {} retries left : {}", new Object[]{this.retryAttemptDelaySeconds, executionsLeft, e});
                    try {
                        this.sleepBeforeRetry();
                    }
                    catch (InterruptedException e1) {
                        Thread.currentThread().interrupt();
                        throw new Exception("Exception while processing key '" + item.getKey() + "' write behind queue", e);
                    }
                }
            }
            this.removeOperation(quarantinedItems.remove(0));
        }
    }

    /** Sleeps for the configured retry delay; the multiplication is done in long arithmetic. */
    private void sleepBeforeRetry() throws InterruptedException {
        Thread.sleep((long)this.retryAttemptDelaySeconds * MS_IN_SEC);
    }

    /** Returns the value carried by a WRITE operation, or {@code null} for any other type. */
    private V valueOrNull(SingleOperation<K, V> operation) {
        if (operation.getType() == SingleOperationType.WRITE) {
            return ((WriteOperation<K, V>) operation).getValue();
        }
        return null;
    }

    /**
     * Reports a permanently failed operation to the writer when it is a
     * {@link ResilientCacheWriter}; does nothing otherwise.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void throwAwayIfResilient(Object key, Object value, Exception cause) {
        if (this.cacheLoaderWriter instanceof ResilientCacheWriter) {
            ((ResilientCacheWriter)this.cacheLoaderWriter).throwAway(key, value, cause);
        }
    }

    /** Returns the smaller of the configured batch size and the number of quarantined items. */
    private int determineBatchSize(List<SingleOperation<K, V>> quarantinedItems) {
        return Math.min(this.writeBatchSize, quarantinedItems.size());
    }

    /** Logs that a full batch is not yet available and hands the items back to the queue. */
    private void waitUntilEnoughWorkItemsAvailable(List<SingleOperation<K, V>> quarantinedItems, int workSize) {
        LOGGER.debug("{} : processItems() : only {} work items available, waiting for {} items to fill up a batch", new Object[]{this.getThreadName(), workSize, this.writeBatchSize});
        this.reassemble(quarantinedItems);
    }

    /** Logs that the rate limit would be exceeded and hands the items back to the queue. */
    private void waitUntilEnoughTimeHasPassed(List<SingleOperation<K, V>> quarantinedItems, int batchSize, long secondsSinceLastWorkDone) {
        LOGGER.debug("{} : processItems() : last work was done {} seconds ago, processing {} batch items would exceed the rate limit of {} , waiting for a while.", new Object[]{this.getThreadName(), secondsSinceLastWorkDone, batchSize, this.rateLimitPerSecond});
        this.reassemble(quarantinedItems);
    }

    /**
     * Re-queues unprocessed operations under the write lock and wakes the processing thread.
     * A {@code null} list is ignored.
     */
    private void reassemble(List<SingleOperation<K, V>> quarantinedItems) {
        this.queueWriteLock.lock();
        try {
            if (null == quarantinedItems) {
                return;
            }
            this.reinsertUnprocessedItems(quarantinedItems);
            this.queueIsEmpty.signal();
        }
        finally {
            this.queueWriteLock.unlock();
        }
    }

    @Override
    public abstract long getQueueSize();

    private String getThreadName() {
        return this.processingThread.getName();
    }

    /** Reads the {@code stopped} flag under the read lock. */
    private boolean isStopped() {
        this.queueReadLock.lock();
        try {
            return this.stopped;
        }
        finally {
            this.queueReadLock.unlock();
        }
    }

    private long getLastProcessing() {
        return this.lastProcessing.get();
    }

    /** Applies the currently installed operations filter, if any, to the quarantined items. */
    private void filterQuarantined(List<SingleOperation<K, V>> quarantinedItems) {
        OperationsFilter<SingleOperation<K, V>> operationsFilter = this.filter;
        if (operationsFilter != null) {
            operationsFilter.filter(quarantinedItems);
        }
    }

    /**
     * Body of the processing thread: alternates between a processing pass and a wait for
     * either the minimum write delay or new work, until the queue is marked stopped.
     */
    private final class ProcessingThread
    implements Runnable {
        private ProcessingThread() {
        }

        @Override
        public void run() {
            try {
                while (!AbstractWriteBehindQueue.this.isStopped()) {
                    AbstractWriteBehindQueue.this.processItems();
                    AbstractWriteBehindQueue.this.queueWriteLock.lock();
                    try {
                        try {
                            this.awaitNextPass();
                        }
                        catch (InterruptedException e) {
                            // Treat an interrupt as a stop request. This must NOT call stop():
                            // stop() blocks until this very thread signals queueIsStopped, so
                            // calling it from here would wait on itself forever.
                            AbstractWriteBehindQueue.this.stopping = true;
                            Thread.currentThread().interrupt();
                        }
                        if (AbstractWriteBehindQueue.this.stopping && AbstractWriteBehindQueue.this.getQueueSize() == 0L) {
                            this.stopTheQueueThread();
                        }
                        AbstractWriteBehindQueue.this.queueIsFull.signal();
                    }
                    finally {
                        AbstractWriteBehindQueue.this.queueWriteLock.unlock();
                    }
                }
            }
            finally {
                // Also reached when processItems() throws, so waiters in stop() are always released.
                this.stopTheQueueThread();
            }
        }

        /**
         * Waits, with the write lock held, until the next pass should run: when a minimum
         * write delay is configured, until that delay has elapsed since the last pass started;
         * otherwise until there is work or a stop has been requested.
         */
        private void awaitNextPass() throws InterruptedException {
            long minDelay = AbstractWriteBehindQueue.this.minWriteDelayMs;
            if (minDelay != 0L) {
                long delay = minDelay;
                do {
                    AbstractWriteBehindQueue.this.queueIsEmpty.await(delay, TimeUnit.MILLISECONDS);
                    long actualDelay = System.currentTimeMillis() - AbstractWriteBehindQueue.this.getLastProcessing();
                    delay = actualDelay < minDelay ? minDelay - actualDelay : 0L;
                } while (delay > 0L);
            } else {
                while (!AbstractWriteBehindQueue.this.stopping && AbstractWriteBehindQueue.this.getQueueSize() == 0L) {
                    AbstractWriteBehindQueue.this.queueIsEmpty.await();
                }
            }
        }

        /** Marks the queue as stopped and releases every thread blocked in {@code stop()}. */
        private void stopTheQueueThread() {
            AbstractWriteBehindQueue.this.queueWriteLock.lock();
            try {
                AbstractWriteBehindQueue.this.stopped = true;
                AbstractWriteBehindQueue.this.stopping = false;
                AbstractWriteBehindQueue.this.queueIsStopped.signalAll();
            }
            finally {
                AbstractWriteBehindQueue.this.queueWriteLock.unlock();
            }
        }
    }
}

