/*
 * Decompiled with CFR 0.152.
 */
package com.terracottatech.frs.log;

import com.terracottatech.frs.Snapshot;
import com.terracottatech.frs.config.Configuration;
import com.terracottatech.frs.config.FrsProperty;
import com.terracottatech.frs.io.BufferSource;
import com.terracottatech.frs.io.Chunk;
import com.terracottatech.frs.io.IOManager;
import com.terracottatech.frs.io.IOStatistics;
import com.terracottatech.frs.io.MaskingBufferSource;
import com.terracottatech.frs.io.SplittingBufferSource;
import com.terracottatech.frs.log.AtomicCommitList;
import com.terracottatech.frs.log.ChunkExchange;
import com.terracottatech.frs.log.CommitList;
import com.terracottatech.frs.log.CopyingPacker;
import com.terracottatech.frs.log.LogMachineState;
import com.terracottatech.frs.log.LogManager;
import com.terracottatech.frs.log.LogRecord;
import com.terracottatech.frs.log.LogRegionFactory;
import com.terracottatech.frs.log.LogWriteError;
import com.terracottatech.frs.log.Signature;
import com.terracottatech.frs.log.SnapshotRecord;
import com.terracottatech.frs.log.StackingCommitList;
import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
import java.util.Collections;
import java.util.Formatter;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StagingLogManager
implements LogManager {
    // Logger is registered under the LogManager interface name, not this class.
    private static final Logger LOGGER = LoggerFactory.getLogger(LogManager.class);
    // Writer thread; created and started at the end of startup(), joined in shutdown().
    private IODaemon daemon;
    // Commit list currently receiving appends; advanced by WriteQueuer and replaced in enterNormalState().
    private volatile CommitList currentRegion;
    // Next LSN to hand out (see _append); initial value 100, reset from recovery in enterNormalState().
    private final AtomicLong currentLsn = new AtomicLong(100L);
    // Lowest LSN reported by lowestLsn(); raised by updateLowestLsn().
    private final AtomicLong lowestLsn = new AtomicLong(0L);
    // End LSN of the last package the IODaemon wrote; initial value is one below the initial currentLsn.
    private final AtomicLong highestOnDisk = new AtomicLong(99L);
    // Checksum signature handed to the CopyingPacker.
    private Signature checksumStyle;
    private final IOManager io;
    // Lifecycle state; every transition goes through LogMachineState methods.
    private volatile LogMachineState state = LogMachineState.IDLE;
    // Despite the constant-style names, these two are per-instance settings assigned in the constructors.
    private int MAX_QUEUE_SIZE;
    private int RECOVERY_QUEUE_SIZE = 64;
    private String forceLogRegionFormat;
    // Recovery exchange created by startup(); null until then.
    private ChunkExchange exchanger;
    // Bounded hand-off (capacity 8) from WriteQueuer (offer) to IODaemon (take).
    private final BlockingQueue<WritingPackage> queue = new ArrayBlockingQueue<WritingPackage>(8);
    // Buffer source selected in the four-argument constructor.
    private BufferSource buffers;

    /**
     * Creates a manager with ADLER32 checksums, an {@link AtomicCommitList}
     * based at LSN 100 and the default buffer source.
     *
     * @param io the IO layer that records are written to
     */
    public StagingLogManager(IOManager io) {
        this(
            Signature.ADLER32,
            new AtomicCommitList(100L, 1024, 200),
            io,
            null);
    }

    /**
     * Creates a manager whose checksum style, region format, queue sizes and
     * commit-list implementation are read from {@code config}.
     *
     * <p>The instance is first initialised with the same defaults as the
     * single-argument constructor; configuration values then overwrite them.
     * A commit-list name other than {@code "ATOMIC"} or {@code "STACKING"}
     * leaves the default commit list in place.
     *
     * @param io     the IO layer that records are written to
     * @param src    buffer source to use, or {@code null} for the default
     * @param config configuration the settings are read from
     */
    public StagingLogManager(IOManager io, BufferSource src, Configuration config) {
        this(Signature.ADLER32, new AtomicCommitList(100L, 1024, 200), io, src);

        String checksumName = config.getString(FrsProperty.IO_CHECKSUM);
        checksumStyle = Signature.valueOf(checksumName);
        forceLogRegionFormat = config.getString(FrsProperty.FORCE_LOG_REGION_FORMAT);
        MAX_QUEUE_SIZE = config.getInt(FrsProperty.IO_COMMIT_QUEUE_SIZE);
        RECOVERY_QUEUE_SIZE = config.getInt(FrsProperty.IO_RECOVERY_QUEUE_SIZE);

        String commitListKind = config.getString(FrsProperty.IO_COMMITLIST);
        // IO_WAIT is only read when one of the known kinds is selected.
        switch (commitListKind) {
            case "ATOMIC":
                currentRegion = new AtomicCommitList(100L, MAX_QUEUE_SIZE, config.getInt(FrsProperty.IO_WAIT));
                break;
            case "STACKING":
                currentRegion = new StackingCommitList(100L, MAX_QUEUE_SIZE, config.getInt(FrsProperty.IO_WAIT));
                break;
            default:
                break;
        }
    }

    /**
     * Base constructor that every other constructor delegates to.
     *
     * @param check checksum signature to use
     * @param list  initial commit list; its base LSN seeds the LSN counter
     * @param io    the IO layer that records are written to
     * @param src   buffer source, or {@code null} to use a masking source over
     *              a splitting source
     */
    public StagingLogManager(Signature check, CommitList list, IOManager io, BufferSource src) {
        this.io = io;
        currentRegion = list;
        currentLsn.set(list.getBaseLsn());
        checksumStyle = check;
        forceLogRegionFormat = (String) FrsProperty.FORCE_LOG_REGION_FORMAT.defaultValue();
        MAX_QUEUE_SIZE = 1024;
        if (src == null) {
            buffers = new MaskingBufferSource(new SplittingBufferSource(512, 0x1000000));
        } else {
            buffers = src;
        }
    }

    /**
     * @return the current value of the LSN counter, i.e. the LSN the next
     *         appended record will receive
     */
    @Override
    public long currentLsn() {
        final long next = currentLsn.get();
        return next;
    }

    /** @return the base LSN of the commit list currently receiving appends */
    long firstCommitListLsn() {
        final CommitList region = currentRegion;
        return region.getBaseLsn();
    }

    /**
     * Raises the lowest LSN, capped at the highest LSN already on disk, and
     * tells the IO layer about the new minimum marker.
     *
     * <p>Nothing happens when the manager is not accepting records, when the
     * capped value does not exceed the current lowest LSN, or when another
     * thread changed the lowest LSN first.
     *
     * @param lsn requested new lowest LSN
     * @throws AssertionError   if called before recovery has finished
     * @throws RuntimeException wrapping any {@link IOException} from the IO layer,
     *                          except {@link ClosedByInterruptException}, which is
     *                          logged and converted to a thread interrupt
     */
    @Override
    public void updateLowestLsn(long lsn) {
        final long previousLowest = this.lowestLsn.get();
        final long durable = this.highestOnDisk.get();

        if (this.exchanger == null || !this.exchanger.isDone()) {
            throw new AssertionError("cannot update lowest lsn until recovery is finished");
        }
        if (!this.state.acceptRecords()) {
            return;
        }

        // Never move the minimum marker past what has actually been written.
        final long candidate = Math.min(lsn, durable);
        if (candidate <= previousLowest) {
            return;
        }

        try {
            if (this.lowestLsn.compareAndSet(previousLowest, candidate)) {
                this.io.setMinimumMarker(candidate);
                this.io.clean(0L);
            }
        } catch (ClosedByInterruptException in) {
            LOGGER.debug("cleaning was interrupted", (Throwable) in);
            Thread.currentThread().interrupt();
        } catch (IOException ioe) {
            throw new RuntimeException(ioe);
        }
    }

    /** @return the lowest LSN as last set by recovery or {@link #updateLowestLsn(long)} */
    @Override
    public long lowestLsn() {
        final long lowest = lowestLsn.get();
        return lowest;
    }

    /**
     * Moves the manager out of the bootstrapping state using the LSN range
     * reported by recovery, then wakes any thread blocked in
     * {@link #waitForNormalState()}. Does nothing unless the state is
     * currently bootstrapping.
     *
     * @param lastLsn highest LSN found during recovery; appends resume at {@code lastLsn + 1}
     * @param lowest  lowest LSN found during recovery; values below 100 are raised to 100
     */
    private synchronized void enterNormalState(long lastLsn, long lowest) {
        if (this.state.isBootstrapping()) {
            final long nextLsn = lastLsn + 1L;
            this.currentLsn.set(nextLsn);
            this.highestOnDisk.set(lastLsn);
            this.lowestLsn.set(Math.max(lowest, 100L));
            this.currentRegion = this.currentRegion.create(nextLsn);
            this.state = this.state.progress();
            this.notifyAll();
        }
    }

    /**
     * Blocks on this object's monitor while the state reports that it is
     * starting.
     *
     * @throws InterruptedException if interrupted while waiting
     * @throws RuntimeException     if the state left "starting" without
     *                              reaching one that accepts records
     */
    private synchronized void waitForNormalState() throws InterruptedException {
        while (this.state.starting()) {
            this.wait();
        }
        if (this.state.acceptRecords()) {
            return;
        }
        throw new RuntimeException("normal state not achieved");
    }

    /**
     * Creates a {@link ChunkExchange} over the IO layer and starts recovery on it.
     *
     * @return the exchange that was started, typed as its {@code Future} view
     */
    private Future<Void> recover() {
        final int queueSize = this.RECOVERY_QUEUE_SIZE;
        ChunkExchange exchange = new ChunkExchange(this.io, this.forceLogRegionFormat, queueSize);
        LOGGER.debug("recovery queue size: " + queueSize);
        exchange.recover();
        return exchange;
    }

    /**
     * Bootstraps the log: resets the state machine if it is not idle, starts
     * recovery and switches to normal operation.
     *
     * <p>The IO daemon thread is created and started in every case, including
     * when this method exits with an exception.
     *
     * @return an iterator over the recovered records, or {@code null} when the
     *         state machine did not enter {@code BOOTSTRAP}
     * @throws RuntimeException wrapping an {@link InterruptedException} raised
     *                          while obtaining the recovered LSN range
     */
    @Override
    public Iterator<LogRecord> startup() {
        if (this.state != LogMachineState.IDLE) {
            this.state = this.state.reset();
        }
        this.state = this.state.bootstrap();

        try {
            if (this.state != LogMachineState.BOOTSTRAP) {
                return null;
            }
            this.exchanger = (ChunkExchange) this.recover();
            try {
                this.enterNormalState(this.exchanger.getLastLsn(), this.exchanger.getLowestLsn());
            } catch (InterruptedException ioe) {
                throw new RuntimeException(ioe);
            }
            return this.exchanger.iterator();
        } finally {
            this.daemon = new IODaemon();
            this.daemon.start();
        }
    }

    /**
     * Shuts the log down: closes the current commit list, wakes and joins the
     * IO daemon, cancels the recovery exchange, closes the IO layer and returns
     * the state machine to idle.
     *
     * <p>If the state machine does not move to {@code SHUTDOWN}, or the
     * transition throws, only the IO layer is closed and the method returns
     * early.
     *
     * @throws AssertionError if the daemon is still alive after the join, or if
     *                        the state is not an error state and the last issued
     *                        LSN is not the highest LSN on disk
     */
    @Override
    public void shutdown() {
        try {
            this.state = this.state.shutdown();
            if (this.state != LogMachineState.SHUTDOWN) {
                this.closeIoQuietly();
                return;
            }
        } catch (Throwable t) {
            this.closeIoQuietly();
            LOGGER.error("was in " + this.state + " at shutdown", t);
            return;
        }

        // Seal the active commit list at the last LSN that was handed out.
        final CommitList active = this.currentRegion;
        active.close(this.currentLsn.get() - 1L);
        this.queueEmptyWritingPackageForShutdown();

        try {
            this.daemon.join();
        } catch (InterruptedException ie) {
            LOGGER.error("error waiting for write thread to close", (Throwable) ie);
        }
        if (this.daemon.isAlive()) {
            throw new AssertionError();
        }

        final boolean everythingOnDisk = this.currentLsn.get() - 1L == this.highestOnDisk.get();
        if (!this.state.isErrorState() && !everythingOnDisk) {
            throw new AssertionError();
        }

        try {
            this.exchanger.cancel(true);
            this.exchanger.get();
        } catch (ExecutionException | InterruptedException | RuntimeException failure) {
            LOGGER.error("error during shutdown", (Throwable) failure);
        }

        this.closeIoQuietly();
        this.state = this.state.idle();
    }

    /** Closes the IO layer, logging (not propagating) any {@link IOException}. */
    private void closeIoQuietly() {
        try {
            this.io.close();
        } catch (IOException ioe) {
            LOGGER.error("error closing io", (Throwable) ioe);
        }
    }

    /**
     * Offers an always-empty package to the write queue so that an IO daemon
     * blocked in {@code queue.take()} wakes up during shutdown.
     *
     * <p>The offer is non-blocking and its result is ignored. The placeholder
     * commit list only supports the calls needed for an empty package
     * ({@code isEmpty}, {@code iterator}, {@code getWriteFuture}); every other
     * method throws {@link UnsupportedOperationException}.
     */
    private void queueEmptyWritingPackageForShutdown() {
        final CommitList wakeUpMarker = new CommitList() {

            @Override
            public boolean isEmpty() {
                return true;
            }

            @Override
            public Iterator<LogRecord> iterator() {
                return Collections.<LogRecord>emptyList().iterator();
            }

            @Override
            public Future<Void> getWriteFuture() {
                return CompletableFuture.completedFuture(null);
            }

            @Override
            public boolean append(LogRecord record, boolean sync) {
                throw new UnsupportedOperationException();
            }

            @Override
            public boolean close(long lsn) {
                throw new UnsupportedOperationException();
            }

            @Override
            public void waitForContiguous() throws InterruptedException {
                throw new UnsupportedOperationException();
            }

            @Override
            public CommitList next() {
                throw new UnsupportedOperationException();
            }

            @Override
            public boolean isSyncRequested() {
                throw new UnsupportedOperationException();
            }

            @Override
            public long getEndLsn() {
                throw new UnsupportedOperationException();
            }

            @Override
            public long getBaseLsn() {
                throw new UnsupportedOperationException();
            }

            @Override
            public void written() {
                throw new UnsupportedOperationException();
            }

            @Override
            public void exceptionThrown(Exception exp) {
                throw new UnsupportedOperationException();
            }

            @Override
            public CommitList create(long baseLsn) {
                throw new UnsupportedOperationException();
            }
        };
        this.queue.offer(new WritingPackage(wakeUpMarker, null));
    }

    /**
     * Assigns the next LSN to {@code record} and places it in a commit list.
     *
     * <p>The record is appended even if {@code updateLsn} throws: the append
     * loop sits in a {@code finally} block, so the LSN that was already taken
     * from the counter is not left without a record.
     *
     * @param record the record to append
     * @param sync   forwarded to {@link CommitList#append(LogRecord, boolean)}
     * @return the commit list that accepted the record
     * @throws LogWriteError    if the manager is not accepting records
     * @throws RuntimeException wrapping an {@link InterruptedException} raised
     *                          while waiting for startup to finish
     */
    private CommitList _append(LogRecord record, boolean sync) {
        if (!this.state.acceptRecords()) {
            throw new LogWriteError();
        }
        while (this.state.starting()) {
            try {
                this.waitForNormalState();
            } catch (InterruptedException it) {
                throw new RuntimeException(it);
            }
        }

        CommitList target = this.currentRegion;
        final long lsn = this.currentLsn.getAndIncrement();
        try {
            record.updateLsn(lsn);
        } finally {
            int attempts = 0;
            // Randomised threshold so that contending threads do not all block at once.
            int spinLimit = 2 + Math.round((float) (Math.random() * 1024.0));
            while (!target.append(record, sync)) {
                if (attempts++ > spinLimit) {
                    // Too many rejections: wait for this list to be written before moving on.
                    this.futureWait(target);
                    spinLimit += Math.round((float) (Math.random() * 512.0));
                }
                target = target.next();
            }
        }
        return target;
    }

    /**
     * Waits for the write future of {@code mine} to complete.
     *
     * <p>Interruption and write failure are both ignored here; the caller
     * carries on with its append loop. The interrupt status is not restored.
     *
     * @param mine commit list whose write to wait for
     */
    private void futureWait(CommitList mine) {
        final Future<Void> pendingWrite = mine.getWriteFuture();
        try {
            pendingWrite.get();
        } catch (InterruptedException | ExecutionException ignored) {
            // intentionally ignored, see method comment
        }
    }

    /**
     * Appends {@code record} without requesting a sync.
     *
     * @param record the record to append
     * @return the write future of the commit list that accepted the record
     */
    @Override
    public Future<Void> append(LogRecord record) {
        final CommitList accepted = this._append(record, false);
        return accepted.getWriteFuture();
    }

    /**
     * Appends {@code record} and requests a sync for its commit list.
     *
     * @param record the record to append
     * @return the write future of the commit list that accepted the record
     */
    @Override
    public Future<Void> appendAndSync(LogRecord record) {
        final CommitList accepted = this._append(record, true);
        return accepted.getWriteFuture();
    }

    /**
     * Appends a {@link SnapshotRecord} and waits, uninterruptibly, for its
     * write to complete.
     *
     * @return the snapshot, or {@code null} if the thread was interrupted while
     *         waiting (the interrupt status is restored in that case)
     * @throws ExecutionException if the write failed
     */
    @Override
    public Snapshot snapshot() throws ExecutionException {
        final SnapshotRecord record = new SnapshotRecord();
        boolean wasInterrupted = false;
        final Future<Void> pendingWrite = this.append(record);

        // Keep waiting across interrupts; remember that one happened.
        for (;;) {
            try {
                pendingWrite.get();
                break;
            } catch (InterruptedException e) {
                wasInterrupted = true;
            }
        }

        if (!wasInterrupted) {
            return record;
        }
        Thread.currentThread().interrupt();
        return null;
    }

    /**
     * Appends a {@link SnapshotRecord} without waiting for it to be written.
     *
     * @return a future that yields the snapshot once the write has completed
     */
    @Override
    public Future<Snapshot> snapshotAsync() {
        final SnapshotRecord record = new SnapshotRecord();
        return new SnapshotFuture(record, this.append(record));
    }

    /**
     * Returns the IO layer's statistics.
     *
     * @return the statistics reported by the IO layer, or an all-zero
     *         placeholder if collecting them failed with an {@link IOException}
     *         (the failure is logged)
     */
    @Override
    public IOStatistics getIOStatistics() {
        IOStatistics result;
        try {
            result = this.io.getStatistics();
        } catch (IOException ioe) {
            LOGGER.error("error collecting io statistics", (Throwable) ioe);
            result = new IOStatistics() {

                @Override
                public long getTotalAvailable() {
                    return 0L;
                }

                @Override
                public long getTotalUsed() {
                    return 0L;
                }

                @Override
                public long getTotalWritten() {
                    return 0L;
                }

                @Override
                public long getTotalRead() {
                    return 0L;
                }

                @Override
                public long getLiveSize() {
                    return 0L;
                }

                @Override
                public long getExpiredSize() {
                    return 0L;
                }
            };
        }
        return result;
    }

    /** Compiler-generated accessor exposing the private {@code checksumStyle} field of {@code manager}. */
    static /* synthetic */ Signature access$000(StagingLogManager manager) {
        final Signature style = manager.checksumStyle;
        return style;
    }

    /** Compiler-generated accessor exposing the private {@code forceLogRegionFormat} field of {@code manager}. */
    static /* synthetic */ String access$100(StagingLogManager manager) {
        final String format = manager.forceLogRegionFormat;
        return format;
    }

    /**
     * Future view of a snapshot: it completes when the underlying write
     * completes and then yields the snapshot. Cancellation is not supported.
     */
    private static class SnapshotFuture implements Future<Snapshot> {
        private final Snapshot snapshot;
        private final Future<Void> write;

        private SnapshotFuture(Snapshot snapshot, Future<Void> write) {
            this.write = write;
            this.snapshot = snapshot;
        }

        /** Always throws; this future cannot be cancelled. */
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        /** @return always {@code false} */
        @Override
        public boolean isCancelled() {
            return false;
        }

        /** @return whether the underlying write has completed */
        @Override
        public boolean isDone() {
            return write.isDone();
        }

        /** Waits for the write, then returns the snapshot. */
        @Override
        public Snapshot get() throws InterruptedException, ExecutionException {
            write.get();
            return snapshot;
        }

        /**
         * Waits up to the given timeout for the write.
         *
         * @return the snapshot if the write reports done afterwards, otherwise {@code null}
         */
        @Override
        public Snapshot get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            write.get(timeout, unit);
            if (write.isDone()) {
                return snapshot;
            }
            return null;
        }
    }

    /**
     * Pairs a commit list with the factory that packs it into a {@link Chunk}.
     * Packing happens at most once per package, either via {@link #run()} or
     * lazily from {@link #take()}.
     */
    static class WritingPackage implements Runnable {
        private final CommitList list;
        private volatile LogRegionFactory factory;
        private volatile Chunk data;

        WritingPackage(CommitList list, LogRegionFactory factory) {
            this.factory = factory;
            this.list = list;
        }

        /**
         * Packs the commit list unless that has already been done. The second
         * check runs while holding the commit list's monitor.
         */
        @Override
        public void run() {
            if (this.data != null) {
                return;
            }
            synchronized (this.list) {
                if (this.data == null) {
                    this.data = this.factory.pack(this.list);
                }
            }
        }

        /** @return whether the underlying commit list is empty */
        boolean isEmpty() {
            return list.isEmpty();
        }

        /** @return the end LSN of the underlying commit list */
        public long endLsn() {
            return list.getEndLsn();
        }

        /** @return the base LSN of the underlying commit list */
        public long baseLsn() {
            return list.getBaseLsn();
        }

        /** @return whether the underlying commit list requested a sync */
        public boolean doSync() {
            return list.isSyncRequested();
        }

        /** Notifies the underlying commit list that it has been written. */
        public void written() {
            list.written();
        }

        /**
         * Returns the packed chunk, packing first if necessary, and then drops
         * this package's references to both the chunk and the factory.
         *
         * @return the packed chunk
         */
        public Chunk take() {
            try {
                this.run();
                assert this.data != null;
                return this.data;
            } finally {
                // Release references whether or not packing succeeded.
                this.data = null;
                this.factory = null;
            }
        }
    }

    /**
     * Writer thread. Takes packed commit lists from the write queue, writes
     * them through the IO layer in LSN order and records the highest LSN that
     * reached disk. It also starts the {@link WriteQueuer} that feeds the queue
     * and stops it again before exiting.
     */
    private class IODaemon
    extends Thread {
        // Accumulated nanoseconds spent blocked on the queue.
        long waiting;
        // Accumulated nanoseconds spent handling a taken package.
        long writing;
        // Running total of the values returned by io.write().
        long written;

        IODaemon() {
            this.setDaemon(true);
            this.setName("IO - " + StagingLogManager.this.io.toString());
            this.setPriority(10);
        }

        /**
         * Main loop: runs while records are accepted or while LSNs have been
         * issued that are not on disk yet. A write failure moves the state
         * machine through {@code checkException} and ends the loop.
         *
         * @throws AssertionError   if a package's base LSN does not directly
         *                          follow the IO layer's current marker, or if a
         *                          non-empty package is left in the queue after a
         *                          clean exit from the loop
         * @throws RuntimeException wrapping an {@link InterruptedException}
         *                          raised while draining or joining the queuer
         */
        @Override
        public void run() {
            WriteQueuer queuer = new WriteQueuer();
            queuer.start();
            while (StagingLogManager.this.state.acceptRecords() || StagingLogManager.this.currentLsn.get() - 1L != StagingLogManager.this.highestOnDisk.get()) {
                long start = System.nanoTime();
                try {
                    WritingPackage packer = StagingLogManager.this.queue.take();
                    long taken = System.nanoTime();
                    try {
                        this.waiting += taken - start;
                        if (packer.isEmpty()) {
                            // Empty package (e.g. the shutdown wake-up): nothing to write, just sync.
                            StagingLogManager.this.io.sync();
                            continue;
                        }
                        Chunk c = packer.take();
                        // Computed once so the check and the message agree. The previous message
                        // concatenated the marker and the literal 1 as text ("41" + 1 -> "411")
                        // instead of printing marker + 1.
                        long expectedBase = StagingLogManager.this.io.getCurrentMarker() + 1L;
                        if (expectedBase != packer.baseLsn()) {
                            throw new AssertionError((Object)("lsns not sequenced " + expectedBase + " != " + packer.baseLsn()));
                        }
                        this.written += StagingLogManager.this.io.write(c, packer.endLsn());
                        if (c instanceof Closeable) {
                            ((Closeable)c).close();
                        }
                        if (packer.doSync()) {
                            StagingLogManager.this.io.sync();
                        }
                        StagingLogManager.this.highestOnDisk.set(packer.endLsn());
                        packer.written();
                    }
                    catch (Exception e) {
                        // Report the failure to the commit list, update the state machine, stop writing.
                        packer.list.exceptionThrown(e);
                        StagingLogManager.this.state = StagingLogManager.this.state.checkException(e);
                        break;
                    }
                    finally {
                        this.writing += System.nanoTime() - taken;
                    }
                }
                catch (InterruptedException ie) {
                    StagingLogManager.this.state = StagingLogManager.this.state.checkException(ie);
                }
            }
            try {
                if (StagingLogManager.this.state.isErrorState()) {
                    // After an error: consume queued packages without writing them until the
                    // end LSN of the last one taken matches the last LSN that was issued.
                    long floatingLsn = StagingLogManager.this.highestOnDisk.get();
                    while (StagingLogManager.this.currentLsn.get() - 1L != floatingLsn) {
                        CommitList next = StagingLogManager.this.queue.take().list;
                        floatingLsn = next.getEndLsn();
                    }
                } else if (!StagingLogManager.this.queue.isEmpty()) {
                    // Clean exit: anything still queued must be an empty package.
                    while (!StagingLogManager.this.queue.isEmpty()) {
                        if (!StagingLogManager.this.queue.poll().list.isEmpty()) {
                            throw new AssertionError((Object)"non-empty queue");
                        }
                    }
                }
                queuer.done();
                queuer.join();
            }
            catch (InterruptedException ie) {
                throw new RuntimeException(ie);
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(new Formatter(new StringBuilder()).format("==PERFORMANCE(logwrite)== waiting: %.3f active: %.3f written: %d", (double)this.waiting * 1.0E-6, (double)this.writing * 1.0E-6, this.written).out().toString());
                LOGGER.debug("==PERFORMANCE(memory)==" + StagingLogManager.this.buffers.toString());
            }
        }
    }

    private class WriteQueuer
    extends Thread {
        long waiting;
        long processing;
        volatile boolean stopped = false;
        private final LogRegionFactory regionFactory = new CopyingPacker(StagingLogManager.access$000(StagingLogManager.this), StagingLogManager.access$100(StagingLogManager.this), StagingLogManager.access$200(StagingLogManager.this));
        private final ExecutorService asyncPacker = Executors.newCachedThreadPool(new ThreadFactory(){
            int count = 1;

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("async packing thread - " + this.count++);
                t.setDaemon(true);
                return t;
            }
        });

        WriteQueuer() {
            this.setDaemon(true);
            this.setName("Write Queue Manager - " + StagingLogManager.this.io.toString());
            this.setPriority(10);
        }

        void done() {
            this.stopped = true;
            this.interrupt();
        }

        /*
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public void run() {
            long last = System.nanoTime();
            long turns = 0L;
            long size = 0L;
            int fill = 0;
            try {
                while (!this.stopped) {
                    CommitList oldRegion = StagingLogManager.this.currentRegion;
                    try {
                        WritingPackage wp;
                        if (!StagingLogManager.this.state.acceptRecords() && StagingLogManager.this.currentLsn.get() - 1L >= oldRegion.getBaseLsn()) {
                            oldRegion.close(StagingLogManager.this.currentLsn.get() - 1L);
                        }
                        long mark = System.nanoTime();
                        this.processing += mark - last;
                        oldRegion.waitForContiguous();
                        last = System.nanoTime();
                        this.waiting += last - mark;
                        last = System.nanoTime();
                        StagingLogManager.this.currentRegion = oldRegion.next();
                        if (oldRegion.isEmpty()) {
                            oldRegion.written();
                            if (StagingLogManager.this.state.acceptRecords()) continue;
                        }
                        if ((wp = new WritingPackage(oldRegion, this.regionFactory)).isEmpty()) continue;
                        wp.run();
                        while (!StagingLogManager.this.queue.offer(wp, 200L, TimeUnit.MICROSECONDS) && !this.stopped) {
                        }
                        size += (long)StagingLogManager.this.queue.size();
                        int lf = (int)(oldRegion.getEndLsn() - oldRegion.getBaseLsn());
                        fill += lf;
                        ++turns;
                        if (!StagingLogManager.this.state.acceptRecords() || !oldRegion.isSyncRequested()) continue;
                        try {
                            oldRegion.getWriteFuture().get();
                        }
                        catch (ExecutionException executionException) {}
                    }
                    catch (InterruptedException ie) {
                        oldRegion.exceptionThrown(ie);
                        StagingLogManager.this.state = StagingLogManager.this.state.checkException(ie);
                    }
                    catch (Exception t) {
                        oldRegion.exceptionThrown(t);
                        StagingLogManager.this.state = StagingLogManager.this.state.checkException(t);
                    }
                }
                this.asyncPacker.shutdown();
                if (turns == 0L) {
                    turns = 1L;
                }
                if (!LOGGER.isDebugEnabled()) return;
                LOGGER.debug(new Formatter(new StringBuilder()).format("==PERFORMANCE(processing)== waiting: %.3f active: %.3f ave. queue: %d fill: %d", (double)this.waiting * 1.0E-6, (double)this.processing * 1.0E-6, size / turns, (long)fill / turns).out().toString());
                return;
            }
            catch (OutOfMemoryError oome) {
                LOGGER.error("on write queue thread", (Throwable)oome);
            }
        }
    }
}

