/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Memtable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodic task that keeps the estimated memory used by memtables within a budget.
 *
 * <p>Only column family stores whose compaction strategy reports
 * {@code isAffectedByMeteredFlusher()} are considered for flushing. Stores that are not
 * affected instead have their strategy's reserved memtable size subtracted from the budget.
 *
 * <p>Use the singleton {@link #instance} and call {@link #start()} to schedule the task.
 */
public class MeteredFlusher
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(MeteredFlusher.class);

    /** Number of bytes in one megabyte; converts the configured memtable budget to bytes. */
    private static final long BYTES_PER_MB = 0x100000L;

    /** Orders size snapshots ascending, so the largest store ends up last in a sorted list. */
    private static final Comparator<SizedStore> BY_SNAPSHOT_SIZE = new Comparator<SizedStore>(){

        @Override
        public int compare(SizedStore lhs, SizedStore rhs) {
            return Long.compare(lhs.liveSize, rhs.liveSize);
        }
    };

    public static final MeteredFlusher instance = new MeteredFlusher();
    private final ScheduledExecutorService executor = new DebuggableScheduledThreadPoolExecutor("MeteredFlusher");

    private MeteredFlusher() {
    }

    /**
     * Schedules this task on the internal executor: first run after one second, then
     * repeatedly with a fixed delay of one second between runs.
     */
    public void start() {
        this.executor.scheduleWithFixedDelay(this, 1L, 1L, TimeUnit.SECONDS);
    }

    /**
     * Performs one metering pass.
     *
     * <ol>
     *   <li>Computes the allowed budget and the size currently being flushed.</li>
     *   <li>Flushes every affected store whose live size exceeds its share of the remaining
     *       budget; the live sizes of the other stores are summed.</li>
     *   <li>If that sum plus the flushing size still exceeds the budget, flushes the remaining
     *       stores from largest to smallest until the estimate fits or no store is left.</li>
     * </ol>
     */
    @Override
    public void run() {
        long allowedSize = this.calculateAllowedSize();
        long flushingSize = this.calculateFlushingSize();
        if (flushingSize > 0L) {
            logger.debug("Currently flushing {} bytes of {} max", flushingSize, allowedSize);
        }

        List<ColumnFamilyStore> affectedCFs = this.affectedColumnFamilies();
        long liveSize = 0L;
        for (ColumnFamilyStore cfs : affectedCFs) {
            int maxInFlight = MeteredFlusher.maxMemtablesInFlight(cfs);
            long size = cfs.getTotalMemtableLiveSize();
            // A store is "high-traffic" when it alone uses more than 1/maxInFlight of the
            // budget that is not already tied up in flushing memtables.
            if (allowedSize > flushingSize && size > (allowedSize - flushingSize) / (long)maxInFlight) {
                logger.info("Flushing high-traffic column family {} (estimated {} bytes)", cfs, size);
                cfs.forceFlush();
            } else {
                liveSize += size;
            }
        }

        if (liveSize + flushingSize <= allowedSize) {
            return;
        }
        logger.info("Estimated {} live and {} flushing bytes used by all memtables", liveSize, flushingSize);

        // Snapshot each store's size once before sorting. A comparator that re-reads
        // getTotalMemtableLiveSize() on every comparison can observe different values for the
        // same store if the size changes during the sort, which violates the Comparator
        // contract and may make Collections.sort throw IllegalArgumentException.
        // NOTE(review): assumes live sizes can change concurrently with this task - confirm.
        List<SizedStore> candidates = new ArrayList<SizedStore>(affectedCFs.size());
        for (ColumnFamilyStore cfs : affectedCFs) {
            candidates.add(new SizedStore(cfs, cfs.getTotalMemtableLiveSize()));
        }
        Collections.sort(candidates, BY_SNAPSHOT_SIZE);

        // Flush largest first. The flushing size is recomputed on every iteration rather than
        // reused (presumably because flushes may complete while this loop runs).
        while (!candidates.isEmpty()) {
            flushingSize = this.calculateFlushingSize();
            if (liveSize + flushingSize <= allowedSize) {
                break;
            }
            ColumnFamilyStore cfs = candidates.remove(candidates.size() - 1).cfs;
            // Re-read the current size: the snapshot is used for ordering only.
            long size = cfs.getTotalMemtableLiveSize();
            if (size > 0L) {
                logger.info("Flushing {} to free up {} bytes", cfs, size);
                liveSize -= size;
                cfs.forceFlush();
            }
        }
        logger.trace("Memtable memory usage is {} bytes with {} live", liveSize + flushingSize, liveSize);
    }

    /**
     * Computes the divisor used to derive a single store's share of the budget:
     * {@code ceil((2 + flushWriters + flushQueueSize) / (1 + indexesBackedByCfs))}.
     *
     * @param cfs the store whose share is being computed
     * @return the rounded-up ratio described above
     */
    private static int maxMemtablesInFlight(ColumnFamilyStore cfs) {
        // NOTE(review): the constant 2 presumably accounts for memtables outside the flush
        // writers/queue (e.g. the live one) - confirm against the upstream source.
        double pipelineSlots = (double)(2 + DatabaseDescriptor.getFlushWriters() + DatabaseDescriptor.getFlushQueueSize());
        double storesSharingSlots = (double)(1 + cfs.indexManager.getIndexesBackedByCfs().size());
        return (int)Math.ceil(pipelineSlots / storesSharingSlots);
    }

    /**
     * Collects the stores that this task is allowed to flush.
     *
     * @return a new mutable list of all stores from {@code ColumnFamilyStore.all()} whose
     *         compaction strategy reports {@code isAffectedByMeteredFlusher()}
     */
    private List<ColumnFamilyStore> affectedColumnFamilies() {
        List<ColumnFamilyStore> affected = new ArrayList<ColumnFamilyStore>();
        for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) {
            if (cfs.getCompactionStrategy().isAffectedByMeteredFlusher()) {
                affected.add(cfs);
            }
        }
        return affected;
    }

    /**
     * Computes the memtable budget, in bytes, available to affected stores.
     *
     * @return the configured total memtable space (megabytes converted to bytes) minus the
     *         reserved memtable size of every store that is not affected by this flusher
     */
    private long calculateAllowedSize() {
        // Widen to long before multiplying so the conversion to bytes cannot overflow an int.
        long allowed = (long)DatabaseDescriptor.getTotalMemtableSpaceInMB() * BYTES_PER_MB;
        for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) {
            if (!cfs.getCompactionStrategy().isAffectedByMeteredFlusher()) {
                allowed -= cfs.getCompactionStrategy().getMemtableReservedSize();
            }
        }
        return allowed;
    }

    /**
     * Estimates the bytes held by memtables that are no longer live.
     *
     * @return the sum of the live sizes of all memtables pending flush in affected stores,
     *         plus the live size of the current memtable of the store referenced by
     *         {@code Memtable.activelyMeasuring} when that store is non-null and affected
     */
    private long calculateFlushingSize() {
        // Read the static field once so the null check and the use see the same reference.
        ColumnFamilyStore measuredCFS = Memtable.activelyMeasuring;
        long flushing = 0L;
        if (measuredCFS != null && measuredCFS.getCompactionStrategy().isAffectedByMeteredFlusher()) {
            flushing = measuredCFS.getMemtableThreadSafe().getLiveSize();
        }
        for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) {
            if (!cfs.getCompactionStrategy().isAffectedByMeteredFlusher()) {
                continue;
            }
            for (Memtable memtable : cfs.getMemtablesPendingFlush()) {
                flushing += memtable.getLiveSize();
            }
        }
        return flushing;
    }

    /** Immutable pairing of a store with the live size observed when the pair was created. */
    private static final class SizedStore {
        final ColumnFamilyStore cfs;
        final long liveSize;

        SizedStore(ColumnFamilyStore cfs, long liveSize) {
            this.cfs = cfs;
            this.liveSize = liveSize;
        }
    }
}

