/*
 * Decompiled with CFR 0.152.
 */
package com.terracottatech.frs.log;

import com.terracottatech.frs.io.Chunk;
import com.terracottatech.frs.io.Direction;
import com.terracottatech.frs.io.IOManager;
import com.terracottatech.frs.log.ChunkProcessing;
import com.terracottatech.frs.log.LogManager;
import com.terracottatech.frs.log.LogRecord;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Formatter;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bridges a reverse log read and the consumer of the recovered records.
 *
 * <p>{@link #recover()} starts a daemon thread that seeks to the end of the {@link IOManager},
 * reads chunks with {@link Direction#REVERSE}, submits each chunk to a worker pool as a
 * {@link ChunkProcessing} task and places the resulting futures on a bounded queue. The single
 * {@link RecordIterator} returned by {@link #iterator()} takes futures off that queue and hands
 * out their {@link LogRecord}s.
 *
 * <p>The class also implements {@link Future}: {@link #get()} blocks until the reader thread has
 * ended and the iterator has been marked done, and {@link #cancel(boolean)} stops both.
 *
 * <p>NOTE(review): the iterator state ({@code list}, {@code lsn}) is not guarded, so it looks
 * like it is meant to be consumed by one thread at a time - confirm against callers.
 */
public class ChunkExchange implements Iterable<LogRecord>, Future<Void> {
    private final String forceLogRegionFormat;
    /** Bounded hand-off between the reader thread (producer) and the iterator (consumer). */
    private final BlockingQueue<Future<List<LogRecord>>> queue;
    private final ExecutorService chunkProcessor;
    private final IOManager io;
    /** Set when the reader thread has finished, and also by {@link #cancel(boolean)}. */
    private volatile boolean ioDone = false;
    /** Number of chunks queued so far; written only by the reader thread. */
    private volatile int count = 0;
    private final AtomicInteger returned = new AtomicInteger(0);
    /** Guarded by {@code this}; negative until {@link #offerLsns(long, long)} has been called. */
    private long lastLsn = -1L;
    private long lowestLsn = -1L;
    /** Guarded by {@code this}; first failure reported by the reader thread, if any. */
    private Exception exception;
    /** Written by {@link #recover()}, read by whichever threads wait for or stop the reader. */
    private volatile Thread runner;
    private final RecordIterator master;
    /** Written only by the reader thread, read by other threads via {@link #getTotalRead()}. */
    private volatile long totalRead;
    // Deliberately left on the LogManager category so existing log configuration keeps applying.
    private static final Logger LOGGER = LoggerFactory.getLogger(LogManager.class);

    /**
     * @param io source of the chunks to read
     * @param forceLogRegionFormat passed through unchanged to every {@link ChunkProcessing}
     * @param maxQueue capacity of the queue of pending chunk results
     */
    ChunkExchange(IOManager io, String forceLogRegionFormat, int maxQueue) {
        this.io = io;
        this.forceLogRegionFormat = forceLogRegionFormat;
        this.queue = new LinkedBlockingQueue<Future<List<LogRecord>>>(maxQueue);
        this.chunkProcessor = Executors.newCachedThreadPool(new ThreadFactory() {
            int count = 1;

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                t.setName("unpack thread - " + this.count++);
                return t;
            }
        });
        this.master = new RecordIterator();
    }

    /** Returns the value of the {@code returned} counter (never modified inside this class). */
    public int returned() {
        return this.returned.get();
    }

    /** Returns the number of chunks the reader thread has queued so far. */
    public int count() {
        return this.count;
    }

    /**
     * Blocks until the LSN bounds have been published or the reader has failed.
     *
     * @return the last LSN passed to {@link #offerLsns(long, long)} (clamped, see there)
     * @throws InterruptedException if interrupted while waiting
     * @throws RuntimeException wrapping the reader failure, if one was recorded
     */
    public synchronized long getLastLsn() throws InterruptedException {
        this.awaitLsns();
        return this.lastLsn;
    }

    /**
     * Blocks until the LSN bounds have been published or the reader has failed.
     *
     * @return the lowest LSN passed to {@link #offerLsns(long, long)}
     * @throws InterruptedException if interrupted while waiting
     * @throws RuntimeException wrapping the reader failure, if one was recorded
     */
    public synchronized long getLowestLsn() throws InterruptedException {
        this.awaitLsns();
        return this.lowestLsn;
    }

    /** Waits on this monitor until {@code lastLsn} is set or a reader failure is recorded. */
    private synchronized void awaitLsns() throws InterruptedException {
        while (this.exception == null && this.lastLsn < 0L) {
            this.wait();
        }
        this.checkReadException();
    }

    /** Rethrows a recorded reader failure wrapped in a {@link RuntimeException}. */
    private synchronized void checkReadException() {
        if (this.exception != null) {
            throw new RuntimeException(this.exception);
        }
    }

    /**
     * Publishes the LSN bounds once; later calls are ignored after a positive {@code lastLsn}
     * has been stored.
     *
     * @param lowest lowest LSN, stored as given
     * @param last last LSN; values below 100 are stored as 99
     *             (NOTE(review): LSNs below 100 appear to be reserved - confirm)
     */
    public synchronized void offerLsns(long lowest, long last) {
        if (this.lastLsn > 0L) {
            return;
        }
        if (last < 100L) {
            last = 99L;
        }
        this.lastLsn = last;
        this.lowestLsn = lowest;
        // getLastLsn() and getLowestLsn() may both be waiting; a single notify() could strand one.
        this.notifyAll();
    }

    /** Records a reader failure and wakes every thread waiting for the LSN bounds. */
    private synchronized void exceptionThrownInRecovery(Exception exp) {
        this.exception = exp;
        this.notifyAll();
    }

    /** Records {@code failure} unless the iterator is already done (i.e. we are shutting down). */
    private void reportUnlessDone(Exception failure) {
        if (!this.master.isDone()) {
            this.exceptionThrownInRecovery(failure);
        }
    }

    /** Starts the daemon "Recovery Exchange" thread that runs the read loop. */
    void recover() {
        Thread reader = new Thread() {

            @Override
            public void run() {
                ChunkExchange.this.readLoop();
            }
        };
        reader.setDaemon(true);
        reader.setName("Recovery Exchange");
        this.runner = reader;
        reader.start();
    }

    /**
     * Body of the reader thread: reads chunks in reverse until the source is exhausted or the
     * iterator is done, queueing one processing future per chunk.
     *
     * @return the total length of all chunks read
     */
    private long readLoop() {
        long waiting = 0L;
        long reading = 0L;
        long fill = 0L;
        Chunk chunk = null;
        try {
            this.io.seek(IOManager.Seek.END.getValue());
            chunk = this.io.read(Direction.REVERSE);
            long last = System.nanoTime();
            boolean first = true;
            while (chunk != null && !this.master.isDone()) {
                this.totalRead += chunk.length();
                reading += System.nanoTime() - last;
                last = System.nanoTime();
                fill += (long) this.queue.size();
                ChunkProcessing cp = new ChunkProcessing(chunk, this.forceLogRegionFormat);
                Future<List<LogRecord>> f = this.chunkProcessor.submit(cp);
                // The processing task now owns the chunk. Drop our reference so the finally block
                // cannot close it behind the task's back if a later read fails.
                chunk = null;
                while (f != null) {
                    try {
                        this.queue.put(f);
                        f = null;
                    } catch (InterruptedException ie) {
                        if (!this.master.isDone()) {
                            // Not a shutdown request: retry the hand-off.
                            continue;
                        }
                        // Shutting down: nobody will consume this result, so release it here.
                        for (LogRecord l : f.get()) {
                            l.close();
                        }
                        throw ie;
                    }
                }
                ++this.count;
                waiting += System.nanoTime() - last;
                last = System.nanoTime();
                chunk = this.io.read(Direction.REVERSE);
                if (first) {
                    // Markers are queried only after the second read has been issued.
                    this.offerLsns(this.io.getMinimumMarker(), this.io.getCurrentMarker());
                    first = false;
                }
            }
            if (first) {
                // Nothing was read at all: publish the placeholder bounds so waiters wake up.
                this.offerLsns(99L, 99L);
            }
        } catch (InterruptedException i) {
            this.reportUnlessDone(i);
        } catch (IOException ioe) {
            this.reportUnlessDone(ioe);
        } catch (RuntimeException t) {
            this.reportUnlessDone(t);
        } catch (Throwable t) {
            throw new AssertionError((Object) t);
        } finally {
            try {
                // Only a chunk that was read but never submitted can still be referenced here.
                if (chunk instanceof Closeable) {
                    try {
                        ((Closeable) chunk).close();
                    } catch (IOException ioe) {
                        this.reportUnlessDone(ioe);
                    }
                }
                this.cleanup();
            } finally {
                // Must be set even if cleanup fails, otherwise hasNext() would poll forever.
                this.ioDone = true;
            }
        }
        if (LOGGER.isDebugEnabled()) {
            int queued = this.count;
            LOGGER.debug(String.format("==PERFORMANCE(logread)== waiting: %.3f active: %.3f queue: %d", (double) waiting * 1.0E-6, (double) reading * 1.0E-6, queued == 0 ? 0L : fill / (long) queued));
        }
        return this.totalRead;
    }

    /**
     * Rewinds the IO manager to the beginning and shuts the worker pool down, waiting until all
     * submitted chunk tasks have terminated. The pool is shut down even if the seek fails.
     */
    private void cleanup() {
        try {
            this.io.seek(IOManager.Seek.BEGINNING.getValue());
        } catch (IOException ioe) {
            LOGGER.info("unable to shutdown recovery", ioe);
        } finally {
            this.chunkProcessor.shutdown();
            while (!this.chunkProcessor.isTerminated()) {
                try {
                    this.chunkProcessor.awaitTermination(10L, TimeUnit.SECONDS);
                } catch (InterruptedException ie) {
                    // Keep waiting: the pending tasks hold resources that must be produced first.
                    LOGGER.debug("cleanup interrupted", ie);
                }
            }
        }
    }

    /** Returns the total length of all chunks read so far. */
    long getTotalRead() {
        return this.totalRead;
    }

    /**
     * Waits, for at most the given time, for the reader thread to end and the iterator to be
     * marked done. Deliberately not synchronized: the reader and the iterator both need this
     * object's monitor to make progress.
     *
     * @return {@code true} if the iterator was done before the deadline
     */
    private boolean waitForDone(long t, TimeUnit tu) throws InterruptedException {
        long deadline = System.nanoTime() + tu.toNanos(t);
        Thread reader = this.runner;
        if (reader != null) {
            long millis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            // join(0) means "wait forever", so only join when real time is left.
            if (millis > 0L) {
                reader.join(millis);
            }
        }
        long remaining = Math.max(0L, deadline - System.nanoTime());
        return this.master.waitForIterator(remaining, TimeUnit.NANOSECONDS);
    }

    /**
     * Stops recovery: marks the IO side as done and finishes the iterator, which interrupts and
     * joins the reader thread and releases queued records.
     *
     * @param bln ignored
     * @return always {@code true}
     * @throws RuntimeException propagated from {@link RecordIterator#setDone()}
     */
    @Override
    public boolean cancel(boolean bln) {
        this.ioDone = true;
        this.master.setDone();
        return true;
    }

    /**
     * Blocks until the reader thread has ended and the iterator has been marked done.
     *
     * @return always {@code null}
     * @throws InterruptedException if interrupted while joining the reader thread
     * @throws RuntimeException if interrupted while waiting for the iterator
     */
    @Override
    public Void get() throws InterruptedException, ExecutionException {
        Thread reader = this.runner;
        if (reader != null) {
            reader.join();
        }
        this.master.waitForIterator();
        return null;
    }

    /**
     * Like {@link #get()}, but gives up once the timeout has elapsed. A zero or negative timeout
     * only succeeds if the iterator is already done.
     *
     * @return always {@code null}
     * @throws InterruptedException if interrupted while waiting
     * @throws TimeoutException if the iterator was not done within the timeout
     */
    @Override
    public Void get(long l, TimeUnit tu) throws InterruptedException, ExecutionException, TimeoutException {
        if (!this.waitForDone(l, tu)) {
            throw new TimeoutException("recovery did not finish within " + l + " " + tu);
        }
        return null;
    }

    /**
     * Returns the {@code ioDone} flag.
     *
     * <p>NOTE(review): the flag is also set when the reader thread finishes normally, so this
     * reports {@code true} after an uncancelled run as well; kept as-is for existing callers.
     */
    @Override
    public boolean isCancelled() {
        return this.ioDone;
    }

    /** Empties the queue, waiting for each pending result and closing every record in it. */
    private void drainQueue() {
        Future<List<LogRecord>> result = this.queue.poll();
        while (result != null) {
            try {
                for (LogRecord lr : result.get()) {
                    lr.close();
                }
            } catch (IOException ioe) {
                LOGGER.warn("possible resource leak", ioe);
            } catch (ExecutionException ex) {
                LOGGER.warn("possible resource leak", ex.getCause());
            } catch (InterruptedException ie) {
                LOGGER.warn("possible resource leak", ie);
            }
            result = this.queue.poll();
        }
    }

    /** Returns {@code true} once the IO side is done and the iterator has been marked done. */
    @Override
    public synchronized boolean isDone() {
        return this.ioDone && this.master.isDone();
    }

    /** Returns the one shared iterator; every call returns the same instance. */
    @Override
    public Iterator<LogRecord> iterator() {
        return this.master;
    }

    /**
     * Consumer side of the exchange: pulls unpacked chunks off the queue and yields their
     * records until the queue is exhausted or a record older than the lowest LSN is reached.
     */
    class RecordIterator implements Iterator<LogRecord> {
        long loaded = 0L;
        long unloaded = 0L;
        long recordCount = 0L;
        long recordWait = 0L;
        long recordMiss = 0L;
        /** LSN of the record most recently returned by {@link #next()}. */
        long lsn;
        volatile boolean isDone = false;
        /** Records of the chunk currently being consumed. */
        List<LogRecord> list = Collections.emptyList();

        public String toString() {
            return "RecordIterator{loaded=" + this.loaded + ", unloaded=" + this.unloaded + ", recordCount=" + this.recordCount + ", recordMiss=" + this.recordMiss + '}';
        }

        /**
         * Loads the next non-empty chunk if necessary. Marks the iterator done, and returns
         * {@code false}, when nothing is left or the next record is below the lowest LSN.
         *
         * @throws RuntimeException if the reader failed, chunk processing failed, or the calling
         *         thread was interrupted while polling
         */
        @Override
        public boolean hasNext() {
            if (this.isDone) {
                return false;
            }
            while (this.list.isEmpty() && !(ChunkExchange.this.ioDone && ChunkExchange.this.queue.isEmpty())) {
                try {
                    Future<List<LogRecord>> pre = ChunkExchange.this.queue.poll(3L, TimeUnit.MILLISECONDS);
                    if (pre == null) {
                        // Nothing arrived in time: surface a reader failure instead of spinning.
                        ChunkExchange.this.checkReadException();
                        ++this.recordMiss;
                    } else {
                        long nano = System.nanoTime();
                        this.list = pre.get();
                        this.recordWait += System.nanoTime() - nano;
                        ++this.recordCount;
                    }
                } catch (ExecutionException ex) {
                    throw new RuntimeException(ex.getCause());
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ie);
                }
            }
            if (this.list.isEmpty() || this.list.get(0).getLsn() < ChunkExchange.this.lowestLsn) {
                this.setDone();
                return false;
            }
            return true;
        }

        /**
         * Returns the next record.
         *
         * @throws NoSuchElementException if the iterator is done, no record is available, or the
         *         next record is below the lowest LSN
         */
        @Override
        public LogRecord next() {
            if (this.isDone) {
                throw new NoSuchElementException("no more records to recover");
            }
            if (this.list.isEmpty() && !this.hasNext()) {
                throw new NoSuchElementException();
            }
            LogRecord head = this.list.remove(0);
            if (head.getLsn() < ChunkExchange.this.lowestLsn) {
                this.setDone();
                throw new NoSuchElementException("earliest valid record has been already been recovered " + head.getLsn() + " < " + ChunkExchange.this.lowestLsn);
            }
            this.lsn = head.getLsn();
            assert (this.lsn <= ChunkExchange.this.lastLsn);
            ++this.recordCount;
            return head;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        /**
         * Blocks until the iterator has been marked done.
         *
         * @throws RuntimeException wrapping the {@link InterruptedException} if interrupted
         */
        synchronized void waitForIterator() {
            try {
                while (!this.isDone) {
                    this.wait();
                }
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(ie);
            }
        }

        /**
         * Blocks until the iterator has been marked done or the timeout elapses.
         *
         * @return {@code true} if the iterator is done, {@code false} on timeout
         * @throws InterruptedException if interrupted while waiting
         */
        synchronized boolean waitForIterator(long timeout, TimeUnit unit) throws InterruptedException {
            long deadline = System.nanoTime() + unit.toNanos(timeout);
            while (!this.isDone) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0L) {
                    return false;
                }
                TimeUnit.NANOSECONDS.timedWait(this, remaining);
            }
            return true;
        }

        boolean isDone() {
            return this.isDone;
        }

        /**
         * Finishes iteration: validates the last returned LSN, wakes waiters, closes buffered
         * records, stops and joins the reader thread and drains the queue.
         *
         * @throws RuntimeException if the reader failed, if the lowest LSN is at least 100 and
         *         differs from the last returned LSN, if closing a buffered record failed (thrown
         *         after the rest of the teardown has run), or if interrupted while joining
         */
        synchronized void setDone() {
            ChunkExchange.this.checkReadException();
            if (ChunkExchange.this.lowestLsn >= 100L && this.lsn != ChunkExchange.this.lowestLsn) {
                throw new RuntimeException("bad recovery lowest lsn: " + ChunkExchange.this.lowestLsn + " lsn:" + this.lsn);
            }
            LOGGER.debug("lowest lsn: " + ChunkExchange.this.lowestLsn + " lsn:" + this.lsn);
            this.isDone = true;
            this.notifyAll();

            // Detach the buffered records first so a repeated setDone() cannot close them twice.
            List<LogRecord> pending = this.list;
            this.list = Collections.emptyList();
            IOException closeFailure = null;
            for (LogRecord lr : pending) {
                try {
                    lr.close();
                } catch (IOException ioe) {
                    // Keep going: the remaining records and the reader still have to be released.
                    if (closeFailure == null) {
                        closeFailure = ioe;
                    }
                }
            }

            Thread reader = ChunkExchange.this.runner;
            if (reader != null) {
                // Wakes the reader if it is blocked handing a chunk to the (now unread) queue.
                reader.interrupt();
                try {
                    reader.join();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("recovery interrupted", ie);
                }
            }
            ChunkExchange.this.drainQueue();
            if (closeFailure != null) {
                throw new RuntimeException(closeFailure);
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("==PERFORMANCE(readIterator)== loaded: %d unloaded: %d count: %d miss: %d avg. wait: %d", this.loaded, this.unloaded, this.recordCount, this.recordMiss, this.recordCount == 0L ? 0L : this.recordWait / this.recordCount));
            }
        }
    }
}

