/*
 * Decompiled with CFR 0.152.
 */
package co.cask.tephra.persist;

import co.cask.tephra.persist.AbstractTransactionLog;
import co.cask.tephra.persist.HDFSUtil;
import co.cask.tephra.persist.TransactionEdit;
import co.cask.tephra.persist.TransactionLogReader;
import co.cask.tephra.persist.TransactionLogWriter;
import com.google.common.base.Throwables;
import java.io.EOFException;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Transaction log backed by a Hadoop {@link SequenceFile} stored at a single {@link Path}.
 *
 * <p>Entries are written as ({@link LongWritable} key, {@link TransactionEdit} value) pairs through
 * {@link LogWriter}, and read back through {@link LogReader}.
 */
public class HDFSTransactionLog
extends AbstractTransactionLog {
    private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLog.class);
    private final FileSystem fs;
    private final Configuration hConf;
    private final Path logPath;

    /**
     * Creates a transaction log bound to the given file.
     *
     * @param fs file system used to open the log file for reading and writing
     * @param hConf Hadoop configuration handed to the sequence file reader/writer and to lease recovery
     * @param logPath location of the log file
     * @param timestamp timestamp passed through to {@link AbstractTransactionLog}
     */
    public HDFSTransactionLog(FileSystem fs, Configuration hConf, Path logPath, long timestamp) {
        super(timestamp);
        this.fs = fs;
        this.hConf = hConf;
        this.logPath = logPath;
    }

    /**
     * Creates a writer that appends to a new sequence file at this log's path.
     *
     * @return a new writer for this log
     * @throws IOException if the underlying sequence file writer cannot be created
     */
    @Override
    protected TransactionLogWriter createWriter() throws IOException {
        return new LogWriter(this.fs, this.hConf, this.logPath);
    }

    /**
     * Returns the final path component of the log file.
     */
    @Override
    public String getName() {
        return this.logPath.getName();
    }

    /**
     * Opens the log file for reading.
     *
     * <p>Before opening, lease recovery is requested for the file via {@link HDFSUtil}; the file
     * length is sampled before recovery and logged again afterwards.
     *
     * @return a reader over the log, or {@code null} if opening the sequence file failed with an
     *     {@link EOFException} (for example because the file is empty or truncated)
     * @throws IOException on any other I/O failure while inspecting, recovering or opening the file
     */
    @Override
    public TransactionLogReader getReader() throws IOException {
        // Length as reported before lease recovery; used below to tell an empty file from a truncated one.
        long length = this.fs.getFileStatus(this.logPath).getLen();
        if (length <= 0L) {
            LOG.warn("File {} might be still open, length is 0", this.logPath);
        }
        HDFSUtil hdfsUtil = new HDFSUtil();
        hdfsUtil.recoverFileLease(this.fs, this.logPath, this.hConf);
        try {
            FileStatus newStatus = this.fs.getFileStatus(this.logPath);
            LOG.info("New file size for {} is {}", this.logPath, newStatus.getLen());
            return new LogReader(new SequenceFile.Reader(this.fs, this.logPath, this.hConf));
        }
        catch (EOFException e) {
            if (length <= 0L) {
                LOG.warn("Could not open " + this.logPath + " for reading. File is empty", e);
            } else {
                // Previously dropped without a trace; still non-fatal, but leave evidence that a
                // non-empty log could not be opened so the caller's null result can be diagnosed.
                LOG.warn("Could not open " + this.logPath + " for reading. Unexpected end of file (length before lease recovery was " + length + ")", e);
            }
            return null;
        }
    }

    /**
     * Reads {@link TransactionEdit} values sequentially from a {@link SequenceFile.Reader}.
     * Keys are read into a scratch {@link LongWritable} and discarded.
     */
    private static final class LogReader
    implements TransactionLogReader {
        private boolean closed;
        private final SequenceFile.Reader reader;
        // Scratch key reused across reads; its value is never exposed to callers.
        private final LongWritable key = new LongWritable();

        /**
         * @param reader the open sequence file reader to consume; closed by {@link #close()}
         */
        public LogReader(SequenceFile.Reader reader) {
            this.reader = reader;
        }

        /**
         * Reads the next edit into a freshly allocated {@link TransactionEdit}.
         *
         * @return the next edit, or {@code null} if there are no more entries or the reader is closed
         * @throws RuntimeException wrapping any {@link IOException} raised by the underlying reader
         */
        @Override
        public TransactionEdit next() {
            try {
                return this.next(new TransactionEdit());
            }
            catch (IOException ioe) {
                throw Throwables.propagate(ioe);
            }
        }

        /**
         * Reads the next edit into the supplied instance.
         *
         * @param reuse instance to populate with the next entry's value
         * @return {@code reuse} if an entry was read, or {@code null} if there are no more entries or
         *     the reader is closed
         * @throws IOException if the underlying reader fails
         */
        @Override
        public TransactionEdit next(TransactionEdit reuse) throws IOException {
            if (this.closed) {
                return null;
            }
            return this.reader.next(this.key, reuse) ? reuse : null;
        }

        /**
         * Closes the underlying reader. Calls after a successful close are no-ops.
         *
         * @throws IOException if closing the underlying reader fails
         */
        @Override
        public void close() throws IOException {
            if (this.closed) {
                return;
            }
            this.reader.close();
            this.closed = true;
        }
    }

    /**
     * Appends log entries to a {@link SequenceFile.Writer} keyed by {@link LongWritable} with
     * {@link TransactionEdit} values.
     */
    private static final class LogWriter
    implements TransactionLogWriter {
        private final SequenceFile.Writer internalWriter;

        /**
         * Creates the sequence file writer for the given path.
         *
         * @param fs file system on which the file is created
         * @param hConf Hadoop configuration for the writer
         * @param logPath location of the log file
         * @throws IOException if the writer cannot be created
         */
        public LogWriter(FileSystem fs, Configuration hConf, Path logPath) throws IOException {
            this.internalWriter = SequenceFile.createWriter(fs, hConf, logPath, LongWritable.class, TransactionEdit.class);
            LOG.info("Created a new TransactionLog writer for {}", logPath);
        }

        /**
         * Appends the entry's key and edit as one record.
         *
         * @param entry the entry to write
         * @throws IOException if the underlying writer fails
         */
        @Override
        public void append(AbstractTransactionLog.Entry entry) throws IOException {
            this.internalWriter.append(entry.getKey(), entry.getEdit());
        }

        /**
         * Delegates to {@link SequenceFile.Writer#syncFs()} on the underlying writer.
         *
         * @throws IOException if the sync fails
         */
        @Override
        public void sync() throws IOException {
            this.internalWriter.syncFs();
        }

        /**
         * Closes the underlying writer.
         *
         * @throws IOException if closing fails
         */
        @Override
        public void close() throws IOException {
            this.internalWriter.close();
        }
    }
}

