/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db.commitlog;

import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.db.commitlog.AbstractCommitLogSegmentManager;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.DirectorySizeCalculator;
import org.apache.cassandra.utils.NoSpamLogger;
import org.cassandraunit.shaded.com.google.common.annotations.VisibleForTesting;
import org.cassandraunit.shaded.com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Commit log segment manager that additionally tracks how much CDC data is on disk and
 * tags each segment with a {@link CommitLogSegment.CDCState}.
 *
 * <p>Segments whose state is {@code CONTAINS} when they are discarded are moved into the
 * directory returned by {@link DatabaseDescriptor#getCDCLogLocation()} instead of being deleted.
 * Mutations tracked by CDC are rejected while the segment being allocated from is
 * {@code FORBIDDEN}.
 */
public class CommitLogSegmentManagerCDC
extends AbstractCommitLogSegmentManager {
    static final Logger logger = LoggerFactory.getLogger(CommitLogSegmentManagerCDC.class);
    private final CDCSizeTracker cdcSizeTracker = new CDCSizeTracker(this, new File(DatabaseDescriptor.getCDCLogLocation()));

    public CommitLogSegmentManagerCDC(CommitLog commitLog, String storageDirectory) {
        super(commitLog, storageDirectory);
    }

    /**
     * Starts the CDC size tracker first, then the base segment manager.
     */
    @Override
    void start() {
        this.cdcSizeTracker.start();
        super.start();
    }

    /**
     * Closes the segment, updates size accounting, and then either moves the segment file into
     * the CDC directory (when its state is {@code CONTAINS}) or deletes it (when {@code delete}
     * is set).
     *
     * @param segment segment being retired
     * @param delete  whether a segment without CDC data should be removed from disk
     */
    @Override
    public void discard(CommitLogSegment segment, boolean delete) {
        segment.close();
        this.addSize(-segment.onDiskSize());
        this.cdcSizeTracker.processDiscardedSegment(segment);
        if (segment.getCDCState() == CommitLogSegment.CDCState.CONTAINS) {
            FileUtils.renameWithConfirm(segment.logFile.getAbsolutePath(), CommitLogSegmentManagerCDC.cdcDestinationFor(segment.logFile));
        } else if (delete) {
            FileUtils.deleteWithConfirm(segment.logFile);
        }
    }

    /**
     * Shuts down the CDC size tracker, then the base segment manager.
     */
    @Override
    public void shutdown() {
        this.cdcSizeTracker.shutdown();
        super.shutdown();
    }

    /**
     * Reserves space for the mutation in the segment currently being allocated from, advancing
     * to the next segment for as long as the current one returns no allocation.
     *
     * @param mutation mutation to allocate space for
     * @param size     number of bytes requested
     * @return the allocation obtained from a segment
     * @throws WriteTimeoutException if the mutation is tracked by CDC and the segment it would be
     *                               written to is in the {@code FORBIDDEN} state
     */
    @Override
    public CommitLogSegment.Allocation allocate(Mutation mutation, int size) throws WriteTimeoutException {
        CommitLogSegment segment = this.allocatingFrom();
        this.throwIfForbidden(mutation, segment);
        CommitLogSegment.Allocation alloc = segment.allocate(mutation, size);
        while (alloc == null) {
            // The current segment could not satisfy the request; move on and re-check the CDC gate.
            this.advanceAllocatingFrom(segment);
            segment = this.allocatingFrom();
            this.throwIfForbidden(mutation, segment);
            alloc = segment.allocate(mutation, size);
        }
        if (mutation.trackedByCDC()) {
            segment.setCDCState(CommitLogSegment.CDCState.CONTAINS);
        }
        return alloc;
    }

    /**
     * Rejects a CDC-tracked mutation when the target segment is {@code FORBIDDEN}. Before
     * throwing, a size recalculation is requested and a rate-limited warning is logged.
     */
    private void throwIfForbidden(Mutation mutation, CommitLogSegment segment) throws WriteTimeoutException {
        if (mutation.trackedByCDC() && segment.getCDCState() == CommitLogSegment.CDCState.FORBIDDEN) {
            this.cdcSizeTracker.submitOverflowSizeRecalculation();
            NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 10L, TimeUnit.SECONDS, "Rejecting Mutation containing CDC-enabled table. Free up space in {}.", DatabaseDescriptor.getCDCLogLocation());
            throw new WriteTimeoutException(WriteType.CDC, ConsistencyLevel.LOCAL_ONE, 0, 1);
        }
    }

    /**
     * Moves a replayed segment file into the CDC directory and adds its size to the tracked
     * on-disk CDC total.
     *
     * @param file replayed commit log segment file
     */
    @Override
    void handleReplayedSegment(File file) {
        logger.trace("Moving (Unopened) segment {} to cdc_raw directory after replay", file);
        // Read the length BEFORE the rename: once the file has been moved, this File still points at
        // the old path and File.length() reports 0 for a path that does not exist.
        long replayedBytes = file.length();
        FileUtils.renameWithConfirm(file.getAbsolutePath(), CommitLogSegmentManagerCDC.cdcDestinationFor(file));
        this.cdcSizeTracker.addFlushedSize(replayedBytes);
    }

    /**
     * Creates a new segment and lets the size tracker assign its initial CDC state.
     *
     * @return the newly created segment
     */
    @Override
    public CommitLogSegment createSegment() {
        CommitLogSegment segment = CommitLogSegment.createSegment(this.commitLog, this);
        this.cdcSizeTracker.processNewSegment(segment);
        return segment;
    }

    /**
     * Requests a size recalculation, waits for the configured disk check interval plus 10 ms,
     * and returns the tracked total. If the wait is interrupted the interrupt status is restored
     * and the currently tracked total is returned immediately.
     *
     * @return the tracked CDC size (unflushed estimate plus on-disk size) in bytes
     */
    @VisibleForTesting
    public long updateCDCTotalSize() {
        this.cdcSizeTracker.submitOverflowSizeRecalculation();
        try {
            Thread.sleep(DatabaseDescriptor.getCDCDiskCheckInterval() + 10);
        }
        catch (InterruptedException interrupted) {
            // Do not lose the interrupt: callers further up the stack may need to observe it.
            Thread.currentThread().interrupt();
        }
        return this.cdcSizeTracker.totalCDCSizeOnDisk();
    }

    /** Builds the path a file gets when it is moved into the CDC log directory. */
    private static String cdcDestinationFor(File file) {
        return DatabaseDescriptor.getCDCLogLocation() + File.separator + file.getName();
    }

    /**
     * Tracks the size of the CDC directory plus an estimate for segments that may still receive
     * CDC data, and decides the CDC state of segments based on the configured space limit.
     * Directory walks run on a dedicated single-threaded executor created in {@link #start()}.
     */
    private static class CDCSizeTracker
    extends DirectorySizeCalculator {
        // Permits per second derived from the check interval, which is treated as milliseconds here.
        private final RateLimiter rateLimiter = RateLimiter.create(1000.0 / (double)DatabaseDescriptor.getCDCDiskCheckInterval());
        private ExecutorService cdcSizeCalculationExecutor;
        private final CommitLogSegmentManagerCDC segmentManager;
        private volatile long unflushedCDCSize;
        private volatile long sizeInProgress = 0L;

        CDCSizeTracker(CommitLogSegmentManagerCDC segmentManager, File path) {
            super(path);
            this.segmentManager = segmentManager;
        }

        /**
         * Resets the counters and creates the single-threaded executor used for recalculation.
         * The executor has no queue capacity and a discard policy, so a submission made while a
         * recalculation is already running is dropped.
         */
        public void start() {
            this.size = 0L;
            this.unflushedCDCSize = 0L;
            this.cdcSizeCalculationExecutor = new ThreadPoolExecutor(1, 1, 1000L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.DiscardPolicy());
        }

        /**
         * Assigns the segment's CDC state: {@code FORBIDDEN} when one more default-sized segment
         * would exceed the allowed CDC bytes, {@code PERMITTED} otherwise. A permitted segment is
         * counted into the unflushed estimate at the default segment size. Always requests a
         * recalculation afterwards.
         */
        void processNewSegment(CommitLogSegment segment) {
            synchronized (segment.cdcStateLock) {
                boolean overLimit = (long)this.defaultSegmentSize() + this.totalCDCSizeOnDisk() > this.allowableCDCBytes();
                segment.setCDCState(overLimit ? CommitLogSegment.CDCState.FORBIDDEN : CommitLogSegment.CDCState.PERMITTED);
                // Re-read the state rather than trusting the computed value: the decision is based on
                // whatever state the segment actually reports after the set.
                if (segment.getCDCState() == CommitLogSegment.CDCState.PERMITTED) {
                    this.unflushedCDCSize += (long)this.defaultSegmentSize();
                }
            }
            this.submitOverflowSizeRecalculation();
        }

        /**
         * Updates accounting for a retired segment: a {@code CONTAINS} segment adds its on-disk
         * size to the tracked total, and any segment that is not {@code FORBIDDEN} releases the
         * default segment size from the unflushed estimate. Always requests a recalculation
         * afterwards.
         */
        void processDiscardedSegment(CommitLogSegment segment) {
            synchronized (segment.cdcStateLock) {
                if (segment.getCDCState() == CommitLogSegment.CDCState.CONTAINS) {
                    this.size += segment.onDiskSize();
                }
                if (segment.getCDCState() != CommitLogSegment.CDCState.FORBIDDEN) {
                    this.unflushedCDCSize -= (long)this.defaultSegmentSize();
                }
            }
            this.submitOverflowSizeRecalculation();
        }

        /** Returns the configured CDC space limit converted from megabytes to bytes. */
        private long allowableCDCBytes() {
            return (long)DatabaseDescriptor.getCDCSpaceInMB() * 1024L * 1024L;
        }

        /**
         * Asks the executor to recalculate the CDC directory size. A rejected submission is
         * ignored on purpose; this is a best-effort request.
         */
        public void submitOverflowSizeRecalculation() {
            Runnable recalculation = this::recalculateOverflowSize;
            try {
                this.cdcSizeCalculationExecutor.submit(recalculation);
            }
            catch (RejectedExecutionException ignored) {
                // Best effort: a rejected request is simply dropped.
            }
        }

        /**
         * Waits for a rate limiter permit, recomputes the directory size, and re-evaluates the
         * segment currently being allocated from if it is {@code FORBIDDEN}.
         */
        private void recalculateOverflowSize() {
            this.rateLimiter.acquire();
            this.calculateSize();
            CommitLogSegment allocatingFrom = this.segmentManager.allocatingFrom();
            if (allocatingFrom.getCDCState() == CommitLogSegment.CDCState.FORBIDDEN) {
                this.processNewSegment(allocatingFrom);
            }
        }

        private int defaultSegmentSize() {
            return DatabaseDescriptor.getCommitLogSegmentSize();
        }

        /**
         * Walks the tracked directory, summing file sizes into a scratch counter, and publishes
         * the result to {@code size} only when the walk completes. An I/O failure is handed to
         * {@link CommitLog#handleCommitError(String, Throwable)} and {@code size} is left untouched.
         */
        private void calculateSize() {
            try {
                this.sizeInProgress = 0L;
                Files.walkFileTree(this.path.toPath(), this);
                this.size = this.sizeInProgress;
            }
            catch (IOException ie) {
                CommitLog.handleCommitError("Failed CDC Size Calculation", ie);
            }
        }

        /** Adds the visited file's size to the scratch counter of the walk in progress. */
        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
            this.sizeInProgress += attrs.size();
            return FileVisitResult.CONTINUE;
        }

        private void addFlushedSize(long toAdd) {
            this.size += toAdd;
        }

        /** Returns the unflushed estimate plus the last known on-disk size, in bytes. */
        private long totalCDCSizeOnDisk() {
            return this.unflushedCDCSize + this.size;
        }

        /**
         * Shuts down the recalculation executor. Safe to call when {@link #start()} was never
         * invoked, in which case there is no executor to stop.
         */
        public void shutdown() {
            if (this.cdcSizeCalculationExecutor != null) {
                this.cdcSizeCalculationExecutor.shutdown();
            }
        }
    }
}

