/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db.commitlog;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOError;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CRC32;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.UnserializableColumnFamilyException;
import org.apache.cassandra.db.commitlog.BatchCommitLogExecutorService;
import org.apache.cassandra.db.commitlog.CommitLogHeader;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.db.commitlog.ICommitLogExecutorService;
import org.apache.cassandra.db.commitlog.PeriodicCommitLogExecutorService;
import org.apache.cassandra.io.DeletionService;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommitLog {
    /**
     * Upper bound (1024) on the number of replay mutation futures left outstanding;
     * this is the same value as the threshold at which {@link #recover(File[])}
     * blocks on the submitted futures before continuing.
     */
    private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
    static final Logger logger = LoggerFactory.getLogger(CommitLog.class);
    /** The single shared instance; the constructor is private. */
    public static final CommitLog instance = new CommitLog();
    /**
     * Segments tracked by this commit log. New segments are appended at the tail,
     * and the tail element is the one returned by {@code currentSegment()}.
     */
    private final Deque<CommitLogSegment> segments = new ArrayDeque<CommitLogSegment>();
    /**
     * Executor through which writes, context lookups and segment discards are
     * submitted; a batch or periodic implementation is chosen in the constructor.
     */
    private final ICommitLogExecutorService executor;
    /**
     * Segment length at which {@code LogRecordAdder} rolls over to a new segment
     * (presumably bytes -- confirm against CommitLogSegment.length()). The default
     * of 0x8000000 (134217728) is replaced in the constructor by
     * {@code DatabaseDescriptor.getCommitLogSegmentSize()}.
     */
    private volatile int segmentSize = 0x8000000;

    /**
     * Builds the singleton: ensures the configured directories exist, reads the
     * configured segment size, opens the first segment and selects the executor
     * implementation that matches the configured sync mode.
     *
     * @throws IOError if directory creation or reading the segment size fails
     *         with an {@link IOException}
     */
    private CommitLog() {
        try {
            DatabaseDescriptor.createAllDirectories();
            segmentSize = DatabaseDescriptor.getCommitLogSegmentSize();
        }
        catch (IOException ioe) {
            throw new IOError(ioe);
        }

        // Start with one fresh segment so currentSegment() always has a tail element.
        segments.add(new CommitLogSegment());

        // Batch sync gets the batch executor; every other mode gets the periodic one.
        if (DatabaseDescriptor.getCommitLogSync() == Config.CommitLogSync.batch) {
            executor = new BatchCommitLogExecutorService();
        }
        else {
            executor = new PeriodicCommitLogExecutorService(this);
        }
    }

    /**
     * Forgets every tracked segment and starts over with a single new one.
     * The dropped segments are only removed from the in-memory deque; this
     * method neither closes nor deletes them, and it does not go through the
     * executor.
     */
    public void resetUnsafe() {
        segments.clear();
        CommitLogSegment fresh = new CommitLogSegment();
        segments.add(fresh);
    }

    /**
     * Tells whether one of the tracked segments is backed by the named file.
     *
     * @param name file name to look for; matched as a suffix of each segment's path
     * @return {@code true} if any tracked segment's path ends with {@code name}
     */
    private boolean manages(String name) {
        Iterator<CommitLogSegment> it = segments.iterator();
        while (it.hasNext()) {
            if (it.next().getPath().endsWith(name)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Replays every commit log file in the configured commit log directory that
     * is not backing one of the segments currently tracked by {@link #instance},
     * then deletes each replayed file together with its header file.
     *
     * Files are replayed in the order given by {@code FileUtils.FileComparator}.
     * A segment file that cannot be deleted is reported at error level but does
     * not abort the method.
     *
     * @throws IOException if the commit log directory cannot be listed, or if
     *         replay or header deletion fails
     */
    public static void recover() throws IOException {
        String directory = DatabaseDescriptor.getCommitLogLocation();
        File[] files = new File(directory).listFiles(new FilenameFilter(){

            @Override
            public boolean accept(File dir, String name) {
                return CommitLogSegment.possibleCommitLogFile(name) && !instance.manages(name);
            }
        });
        // listFiles() yields null (not an empty array) when the path is not a
        // directory or an I/O error occurs; fail loudly instead of with an NPE.
        if (files == null) {
            throw new IOException("Unable to list commitlog directory " + directory);
        }
        if (files.length == 0) {
            logger.info("No commitlog files found; skipping replay");
            return;
        }
        Arrays.sort(files, new FileUtils.FileComparator());
        logger.info("Replaying " + StringUtils.join((Object[])files, ", "));
        CommitLog.recover(files);
        for (File f : files) {
            FileUtils.delete(CommitLogHeader.getHeaderPathFromSegmentPath(f.getAbsolutePath()));
            if (!f.delete()) {
                logger.error("Unable to remove " + f + "; you should remove it manually or next restart will replay it again (harmless, but time-consuming)");
            }
        }
        logger.info("Log replay complete");
    }

    /**
     * Replays the mutations stored in the given commit log files and then
     * flushes every table that received at least one replayed mutation.
     *
     * For each file, replay starts at the position recorded in its header; if
     * the header cannot be read the file is replayed from the beginning. Each
     * entry is read as: an int size, a long checksum of the size, the serialized
     * mutation, and a long checksum accumulated over the size and the payload.
     * Reading of a file stops at the first entry whose size is below 10, whose
     * size checksum does not match, or that is cut short by end-of-file. An
     * entry whose payload checksum does not match is skipped and reading
     * continues with the next entry. Mutations that reference an unknown column
     * family are counted and skipped.
     *
     * NOTE: this code was recovered by a decompiler, which flagged a possible
     * change in try/catch structure relative to the original source.
     *
     * @param clogs commit log segment files to replay, in replay order
     * @throws IOException if reading a segment or flushing fails
     */
    public static void recover(File[] clogs) throws IOException {
        HashSet<Table> tablesRecovered = new HashSet<Table>();
        ArrayList futures = new ArrayList();
        // Scratch buffer for serialized mutations; grown on demand and reused across entries.
        byte[] bytes = new byte[4096];
        HashMap<Integer, AtomicInteger> invalidMutations = new HashMap<Integer, AtomicInteger>();
        nextSegment: for (File file : clogs) {
            int bufferSize = (int)Math.min(Math.max(file.length(), 1L), 0x2000000L);
            BufferedRandomAccessFile reader = new BufferedRandomAccessFile(new File(file.getAbsolutePath()), "r", bufferSize, true);
            try {
                CommitLogHeader clHeader = null;
                int replayPosition = 0;
                String headerPath = CommitLogHeader.getHeaderPathFromSegmentPath(file.getAbsolutePath());
                try {
                    clHeader = CommitLogHeader.readCommitLogHeader(headerPath);
                    replayPosition = clHeader.getReplayPosition();
                }
                catch (IOException ioe) {
                    // No usable header: leave clHeader null and replay the whole file.
                    logger.info(headerPath + " incomplete, missing or corrupt.  Everything is ok, don't panic.  CommitLog will be replayed from the beginning");
                    logger.debug("exception was", (Throwable)ioe);
                }
                if (replayPosition < 0 || (long)replayPosition > reader.length()) {
                    logger.debug("skipping replay of fully-flushed {}", (Object)file);
                    continue;
                }
                reader.seek(replayPosition);
                if (logger.isDebugEnabled()) {
                    logger.debug("Replaying " + file + " starting at " + reader.getFilePointer());
                }
                while (!reader.isEOF()) {
                    long claimedCRC32;
                    int serializedSize;
                    if (logger.isDebugEnabled()) {
                        logger.debug("Reading mutation at " + reader.getFilePointer());
                    }
                    CRC32 checksum = new CRC32();
                    try {
                        serializedSize = reader.readInt();
                        if (serializedSize < 10) {
                            // Too small to be a real entry: stop reading this file.
                            continue nextSegment;
                        }
                        long claimedSizeChecksum = reader.readLong();
                        // CRC32.update(int) folds in only the low eight bits of its argument.
                        checksum.update(serializedSize);
                        if (checksum.getValue() != claimedSizeChecksum) {
                            // Size field is not trustworthy: stop reading this file.
                            continue nextSegment;
                        }
                        if (serializedSize > bytes.length) {
                            bytes = new byte[(int)(1.2 * (double)serializedSize)];
                        }
                        reader.readFully(bytes, 0, serializedSize);
                        claimedCRC32 = reader.readLong();
                    }
                    catch (EOFException eof) {
                        // Truncated final entry: nothing more to read from this file.
                        continue nextSegment;
                    }
                    // Same CRC32 instance as above, so the value covers size byte plus payload.
                    checksum.update(bytes, 0, serializedSize);
                    if (claimedCRC32 != checksum.getValue()) {
                        // Payload checksum mismatch: skip this entry and try the next one.
                        continue;
                    }
                    ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes, 0, serializedSize);
                    RowMutation rm = null;
                    try {
                        rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn));
                    }
                    catch (UnserializableColumnFamilyException ex) {
                        // Tally skipped mutations per column family id for the summary below.
                        AtomicInteger skipped = invalidMutations.get(ex.cfId);
                        if (skipped == null) {
                            invalidMutations.put(ex.cfId, new AtomicInteger(1));
                        }
                        else {
                            skipped.incrementAndGet();
                        }
                        continue;
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug(String.format("replaying mutation for %s.%s: %s", rm.getTable(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies(), ", ") + "}"));
                    }
                    Table table = Table.open(rm.getTable());
                    tablesRecovered.add(table);
                    final ArrayList<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(rm.getColumnFamilies());
                    final long entryLocation = reader.getFilePointer();
                    final CommitLogHeader finalHeader = clHeader;
                    final RowMutation frm = rm;
                    WrappedRunnable runnable = new WrappedRunnable(){

                        @Override
                        public void runMayThrow() throws IOException {
                            RowMutation newRm = new RowMutation(frm.getTable(), frm.key());
                            for (ColumnFamily columnFamily : columnFamilies) {
                                // Drop column families that no longer exist, and -- when a header
                                // is available -- those that are not dirty or whose recorded
                                // position is at or past this entry.
                                if (CFMetaData.getCF(columnFamily.id()) == null
                                    || (finalHeader != null
                                        && (!finalHeader.isDirty(columnFamily.id())
                                            || entryLocation <= (long)finalHeader.getPosition(columnFamily.id())))) {
                                    continue;
                                }
                                newRm.add(columnFamily);
                            }
                            if (!newRm.isEmpty()) {
                                Table.open(newRm.getTable()).apply(newRm, null, false);
                            }
                        }
                    };
                    futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
                    // Bound the number of in-flight replay tasks.
                    if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT) {
                        FBUtilities.waitOnFutures(futures);
                        futures.clear();
                    }
                }
            }
            finally {
                FileUtils.closeQuietly(reader);
                logger.info("Finished reading " + file);
            }
        }
        for (Map.Entry<Integer, AtomicInteger> entry : invalidMutations.entrySet()) {
            logger.info(String.format("Skipped %d mutations from unknown (probably removed) CF with id %d", entry.getValue().intValue(), entry.getKey()));
        }
        // Drain whatever replay tasks are still pending before flushing.
        FBUtilities.waitOnFutures(futures);
        logger.debug("Finished waiting on mutations from recovery");
        futures.clear();
        for (Table table : tablesRecovered) {
            futures.addAll(table.flush());
        }
        FBUtilities.waitOnFutures(futures);
    }

    /**
     * Returns the segment at the tail of the deque, which is where new
     * segments are appended.
     *
     * @return the most recently added segment
     */
    private CommitLogSegment currentSegment() {
        CommitLogSegment tail = segments.getLast();
        return tail;
    }

    /**
     * Obtains the context of the current segment by running the lookup on the
     * commit log executor and blocking until it completes.
     *
     * @return the context reported by the current segment
     * @throws RuntimeException wrapping an {@link InterruptedException} if the
     *         calling thread is interrupted while waiting (the interrupt status
     *         is restored first), or wrapping an {@link ExecutionException} if
     *         the task itself fails
     */
    public CommitLogSegment.CommitLogContext getContext() {
        Callable<CommitLogSegment.CommitLogContext> task = new Callable<CommitLogSegment.CommitLogContext>(){

            @Override
            public CommitLogSegment.CommitLogContext call() throws Exception {
                return CommitLog.this.currentSegment().getContext();
            }
        };
        try {
            return this.executor.submit(task).get();
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for code further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Hands a mutation to the commit log executor to be appended to the
     * current segment.
     *
     * @param rowMutation the mutation to record
     * @param serializedRow the serialized form passed through to the segment's write call
     * @throws IOException if the executor reports an I/O failure
     */
    public void add(RowMutation rowMutation, Object serializedRow) throws IOException {
        LogRecordAdder adder = new LogRecordAdder(rowMutation, serializedRow);
        executor.add(adder);
    }

    /**
     * Runs the segment-discard pass for one column family on the commit log
     * executor and blocks until it has finished.
     *
     * @param cfId id of the column family whose dirty markers are updated
     * @param context the context (segment and position) up to which the column
     *        family's entries are no longer needed
     * @throws IOException declared for callers; failures inside the task surface
     *         wrapped in a {@link RuntimeException}
     * @throws RuntimeException wrapping an {@link InterruptedException} if the
     *         calling thread is interrupted while waiting (the interrupt status
     *         is restored first), or wrapping an {@link ExecutionException} if
     *         the task itself fails
     */
    public void discardCompletedSegments(final Integer cfId, final CommitLogSegment.CommitLogContext context) throws IOException {
        Callable<Object> task = new Callable<Object>(){

            @Override
            public Object call() throws IOException {
                CommitLog.this.discardCompletedSegmentsInternal(context, cfId);
                return null;
            }
        };
        try {
            this.executor.submit(task).get();
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for code further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Walks the segments from the head of the deque up to the segment named by
     * {@code context}. Every earlier segment has the column family switched off
     * in its header; such a segment is closed, scheduled for deletion and
     * removed when its header reports it safe to delete and it is not the last
     * segment, otherwise its header is rewritten. The context's own segment gets
     * the column family switched on at the context position, its header is
     * written, and the walk ends there.
     *
     * @param context segment and position marking how far the column family is flushed
     * @param id id of the column family being processed
     * @throws IOException if closing a segment or writing a header fails
     */
    private void discardCompletedSegmentsInternal(CommitLogSegment.CommitLogContext context, Integer id) throws IOException {
        if (logger.isDebugEnabled()) {
            logger.debug("discard completed log segments for " + context + ", column family " + id + ".");
        }
        for (Iterator<CommitLogSegment> it = segments.iterator(); it.hasNext(); ) {
            CommitLogSegment current = it.next();
            CommitLogHeader currentHeader = current.getHeader();

            if (current.equals(context.getSegment())) {
                // Reached the segment the context points into: record the position and stop.
                if (logger.isDebugEnabled()) {
                    logger.debug("Marking replay position " + context.position + " on commit log " + current);
                }
                currentHeader.turnOn(id, context.position);
                current.writeHeader();
                return;
            }

            currentHeader.turnOff(id);
            // Never drop the final segment, even if nothing in it is dirty.
            boolean obsolete = currentHeader.isSafeToDelete() && it.hasNext();
            if (!obsolete) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Not safe to delete commit log " + current + "; dirty is " + currentHeader.dirtyString());
                }
                current.writeHeader();
                continue;
            }

            logger.info("Discarding obsolete commit log:" + current);
            current.close();
            DeletionService.executeDelete(current.getHeaderPath());
            DeletionService.executeDelete(current.getPath());
            it.remove();
        }
    }

    /**
     * Syncs the current (tail) segment.
     *
     * @throws IOException if the segment's sync fails
     */
    void sync() throws IOException {
        CommitLogSegment active = currentSegment();
        active.sync();
    }

    /**
     * Shuts the commit log executor down and waits for it to terminate.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void shutdownBlocking() throws InterruptedException {
        final ICommitLogExecutorService service = executor;
        service.shutdown();
        service.awaitTermination();
    }

    /**
     * Unit of work that appends one mutation to the current segment and rolls
     * over to a new segment once the current one has reached the configured
     * size. It can be scheduled either as a {@link Runnable} or as a
     * {@link Callable}; the callable form simply runs and returns {@code null}.
     */
    class LogRecordAdder
    implements Callable,
    Runnable {
        final RowMutation rowMutation;
        final Object serializedRow;

        LogRecordAdder(RowMutation rm, Object serializedRow) {
            this.rowMutation = rm;
            this.serializedRow = serializedRow;
        }

        /**
         * Writes the mutation, then syncs and opens a new segment if the
         * current one is full.
         *
         * @throws IOError if the write or sync fails with an {@link IOException}
         */
        @Override
        public void run() {
            try {
                currentSegment().write(rowMutation, serializedRow);
                boolean full = currentSegment().length() >= (long)segmentSize;
                if (!full) {
                    return;
                }
                // Make the finished segment durable before switching to a new one.
                sync();
                segments.add(new CommitLogSegment());
            }
            catch (IOException ioe) {
                throw new IOError(ioe);
            }
        }

        /** Callable adapter around {@link #run()}; always returns {@code null}. */
        public Object call() throws Exception {
            run();
            return null;
        }
    }
}

