/*
 * Decompiled with CFR 0.152.
 */
package com.terracottatech.frs.recovery;

import com.terracottatech.frs.DeleteFilter;
import com.terracottatech.frs.action.Action;
import com.terracottatech.frs.action.ActionManager;
import com.terracottatech.frs.config.Configuration;
import com.terracottatech.frs.config.FrsProperty;
import com.terracottatech.frs.log.LogManager;
import com.terracottatech.frs.log.LogRecord;
import com.terracottatech.frs.recovery.AbstractFilter;
import com.terracottatech.frs.recovery.Filter;
import com.terracottatech.frs.recovery.RecoveryException;
import com.terracottatech.frs.recovery.RecoveryListener;
import com.terracottatech.frs.recovery.RecoveryManager;
import com.terracottatech.frs.recovery.SkipsFilter;
import com.terracottatech.frs.transaction.TransactionFilter;
import com.terracottatech.frs.util.NullFuture;
import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RecoveryManagerImpl
implements RecoveryManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(RecoveryManager.class);
    private final LogManager logManager;
    private final ActionManager actionManager;
    private final boolean compressedSkipSet;
    private final ReplayFilter replayFilter;
    private final Configuration configuration;

    public RecoveryManagerImpl(LogManager logManager, ActionManager actionManager, Configuration configuration) {
        this.logManager = logManager;
        this.actionManager = actionManager;
        this.compressedSkipSet = configuration.getBoolean(FrsProperty.RECOVERY_COMPRESSED_SKIP_SET);
        this.replayFilter = new ReplayFilter(configuration.getInt(FrsProperty.RECOVERY_MIN_THREAD_COUNT), configuration.getInt(FrsProperty.RECOVERY_MAX_THREAD_COUNT), configuration.getInt(FrsProperty.RECOVERY_REPLAY_BATCH_SIZE), configuration.getDBHome());
        this.configuration = configuration;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Future<Void> recover(RecoveryListener ... listeners) throws RecoveryException, InterruptedException {
        Iterator<LogRecord> i = this.logManager.startup();
        long filter = 0L;
        long put = 0L;
        long ntime = System.nanoTime();
        DeleteFilter deleteFilter = new DeleteFilter(this.replayFilter);
        TransactionFilter transactionFilter = new TransactionFilter(deleteFilter);
        SkipsFilter skipsFilter = new SkipsFilter(transactionFilter, this.logManager.lowestLsn(), this.compressedSkipSet);
        ProgressLoggingFilter progressLoggingFilter = new ProgressLoggingFilter(this.replayFilter.dbHome, skipsFilter, this.logManager.lowestLsn());
        long lastRecoveredLsn = Long.MAX_VALUE;
        try {
            while (i.hasNext()) {
                LogRecord logRecord = i.next();
                Action action = this.actionManager.extract(logRecord);
                long ctime = System.nanoTime();
                filter += ctime - ntime;
                progressLoggingFilter.filter(action, logRecord.getLsn(), false);
                ntime = System.nanoTime();
                put += ntime - ctime;
                this.replayFilter.checkError();
                lastRecoveredLsn = logRecord.getLsn();
            }
        }
        finally {
            this.replayFilter.finish();
            this.replayFilter.checkError();
        }
        if (lastRecoveredLsn != Long.MAX_VALUE && lastRecoveredLsn > this.logManager.lowestLsn()) {
            throw new RecoveryException("Recovery is incomplete for log " + this.configuration.getDBHome() + ". Files may be missing.");
        }
        for (RecoveryListener listener : listeners) {
            listener.recovered();
        }
        LOGGER.debug("count " + this.replayFilter.getReplayCount() + " put " + put + " filter " + filter);
        LOGGER.debug(skipsFilter.toString());
        return new NullFuture();
    }

    private static class ReplayElement {
        private final Action action;
        private final long lsn;

        private ReplayElement(Action action, long lsn) {
            this.action = action;
            this.lsn = lsn;
        }

        void replay() {
            this.action.replay(this.lsn);
        }
    }

    private static class ReplayFilter
    implements Filter<Action>,
    ThreadFactory {
        private final AtomicInteger threadId = new AtomicInteger();
        private final AtomicReference<Throwable> firstError = new AtomicReference();
        private final ExecutorService executorService;
        private final File dbHome;
        private final int replayBatchSize;
        private long replayed = 0L;
        private List<ReplayElement> batch;

        ReplayFilter(int minThreadCount, int maxThreadCount, int replayBatchSize, File dbHome) {
            this.executorService = new ThreadPoolExecutor(minThreadCount, maxThreadCount, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), this, new ThreadPoolExecutor.CallerRunsPolicy());
            this.dbHome = dbHome;
            this.replayBatchSize = replayBatchSize;
            this.batch = new ArrayList<ReplayElement>(replayBatchSize);
        }

        public long getReplayCount() {
            return this.replayed;
        }

        @Override
        public boolean filter(Action element, long lsn, boolean filtered) {
            if (filtered) {
                return false;
            }
            ++this.replayed;
            this.batch.add(new ReplayElement(element, lsn));
            if (this.batch.size() >= this.replayBatchSize) {
                this.submitJob();
            }
            return true;
        }

        private void submitJob() {
            if (this.batch.isEmpty()) {
                return;
            }
            final List<ReplayElement> go = this.batch;
            this.batch = new ArrayList<ReplayElement>(this.replayBatchSize);
            this.executorService.submit(new Runnable(){

                @Override
                public void run() {
                    try {
                        for (ReplayElement a : go) {
                            a.replay();
                        }
                    }
                    catch (Throwable t) {
                        ReplayFilter.this.firstError.compareAndSet(null, t);
                        LOGGER.error("Error replaying record: " + t.getMessage());
                    }
                }
            });
        }

        void checkError() throws RecoveryException {
            Throwable t = this.firstError.get();
            if (t != null) {
                throw new RecoveryException("Caught an error recovering from log at " + this.dbHome.getAbsolutePath(), t);
            }
        }

        void finish() throws InterruptedException {
            this.submitJob();
            this.executorService.shutdown();
            this.executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("Replay Thread - " + this.threadId.getAndIncrement());
            t.setDaemon(true);
            return t;
        }
    }

    private static class ProgressLoggingFilter
    extends AbstractFilter<Action> {
        private final long lowestLsn;
        private int position = 10;
        private long count = 0L;

        ProgressLoggingFilter(File home, Filter<Action> delegate, long lowestLsn) {
            super(delegate);
            LOGGER.info("Starting recovery for " + home.getAbsolutePath());
            this.lowestLsn = lowestLsn;
        }

        @Override
        public boolean filter(Action element, long lsn, boolean filtered) {
            if (this.count-- <= 0L && this.position > 0) {
                LOGGER.info("Recovery progress " + (10 - this.position) * 10 + "%");
                this.count = (lsn - this.lowestLsn) / (long)this.position--;
            }
            if (lsn == this.lowestLsn) {
                LOGGER.info("Recovery progress 100%");
            }
            return this.delegate(element, lsn, filtered);
        }
    }
}

