/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.common.io;

import java.io.FilterInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.BlockInfo;
import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Public
public abstract class BinaryInputFormat<T>
extends FileInputFormat<T>
implements CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(BinaryInputFormat.class);
    public static final long NATIVE_BLOCK_SIZE = Long.MIN_VALUE;
    private long blockSize = Long.MIN_VALUE;
    private transient DataInputViewStreamWrapper dataInputStream;
    private transient BlockInfo blockInfo;
    private transient BlockBasedInput blockBasedInput = null;
    private long readRecords = 0L;

    /**
     * Configures this input format.
     *
     * <p>This class adds no settings of its own here; the call is forwarded
     * unchanged to the superclass implementation.
     *
     * @param parameters the configuration passed on to the superclass
     */
    @Override
    public void configure(final Configuration parameters) {
        super.configure(parameters);
    }

    /**
     * Sets the block size used when splitting the input files.
     *
     * @param blockSize a positive block size no larger than
     *     {@code Integer.MAX_VALUE}, or {@code Long.MIN_VALUE} (the value of
     *     {@link #NATIVE_BLOCK_SIZE}) to request the file system's default
     * @throws IllegalArgumentException if {@code blockSize} is smaller than 1
     *     and is not the {@code Long.MIN_VALUE} sentinel
     * @throws UnsupportedOperationException if {@code blockSize} exceeds
     *     {@code Integer.MAX_VALUE}
     */
    public void setBlockSize(long blockSize) {
        // Long.MIN_VALUE is the sentinel that stands for "use the native block size".
        final boolean isSentinel = blockSize == Long.MIN_VALUE;
        if (!isSentinel && blockSize < 1L) {
            throw new IllegalArgumentException("The block size parameter must be set and larger than 0.");
        }

        final boolean exceedsIntRange = blockSize > Integer.MAX_VALUE;
        if (exceedsIntRange) {
            throw new UnsupportedOperationException("Currently only block sizes up to Integer.MAX_VALUE are supported");
        }

        this.blockSize = blockSize;
    }

    /**
     * Returns the configured block size.
     *
     * @return the value last passed to {@link #setBlockSize(long)}, or
     *     {@code Long.MIN_VALUE} (the value of {@link #NATIVE_BLOCK_SIZE}) if
     *     none was set
     */
    public long getBlockSize() {
        return blockSize;
    }

    /**
     * Creates one input split per block of every input file.
     *
     * <p>Each file returned by {@link #getFiles()} is cut into consecutive
     * ranges of the configured block size. If the block size is still
     * {@code Long.MIN_VALUE} (the value of {@link #NATIVE_BLOCK_SIZE}), the
     * file system's default block size is used. The last range of a file may
     * be shorter. Each split is assigned the hosts of the first block
     * location (after sorting) that covers its range.
     *
     * <p>If fewer than {@code minNumSplits} splits result, zero-length splits
     * positioned at the end of the last file are appended until the minimum
     * is reached. If there is no input file at all, no padding is possible
     * and the (empty) result is returned as is.
     *
     * @param minNumSplits the minimum number of splits to return
     * @return the created splits, numbered consecutively from 0
     * @throws IOException if the file system cannot be accessed
     */
    @Override
    public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        List<FileStatus> files = this.getFiles();
        List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>(Math.max(minNumSplits, 0));

        for (FileStatus file : files) {
            FileSystem fs = file.getPath().getFileSystem();
            long blockSize = this.blockSize == Long.MIN_VALUE ? fs.getDefaultBlockSize() : this.blockSize;
            long length = file.getLen();
            for (long pos = 0L; pos < length; pos += blockSize) {
                long remainingLength = Math.min(pos + blockSize, length) - pos;
                // Typed as BlockLocation[] (not Object[]) so that getHosts() resolves.
                BlockLocation[] blocks = fs.getFileBlockLocations(file, pos, remainingLength);
                if (blocks != null && blocks.length > 1) {
                    // Bring the locations into their natural order before picking the first one.
                    Arrays.sort(blocks);
                }
                inputSplits.add(new FileInputSplit(inputSplits.size(), file.getPath(), pos, remainingLength, hostsOfFirstBlock(blocks)));
            }
        }

        if (inputSplits.size() < minNumSplits) {
            if (files.isEmpty()) {
                // Without any file there is nothing an empty split could point at.
                LOG.warn(String.format("No input files found for %s; cannot fill up to %d splits.", Arrays.toString(this.getFilePaths()), minNumSplits));
            } else {
                LOG.warn(String.format("With the given block size %d, the files %s cannot be split into %d blocks. Filling up with empty splits...", this.blockSize, Arrays.toString(this.getFilePaths()), minNumSplits));
                FileStatus last = files.get(files.size() - 1);
                BlockLocation[] lastBlocks = last.getPath().getFileSystem().getFileBlockLocations(last, 0L, last.getLen());
                String[] hosts = hostsOfFirstBlock(lastBlocks);
                for (int index = inputSplits.size(); index < minNumSplits; ++index) {
                    inputSplits.add(new FileInputSplit(index, last.getPath(), last.getLen(), 0L, hosts));
                }
            }
        }

        return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
    }

    /**
     * Returns the hosts of the first block location, or an empty array when
     * the file system reported no block locations.
     */
    private static String[] hostsOfFirstBlock(BlockLocation[] blocks) throws IOException {
        if (blocks == null || blocks.length == 0) {
            return new String[0];
        }
        return blocks[0].getHosts();
    }

    /**
     * Collects the status of every input file.
     *
     * <p>A configured path that is a regular file is added directly. For a
     * directory, its direct children that are not directories are added;
     * nested directories are not descended into.
     *
     * @return the file statuses, in the order of the configured paths
     * @throws IOException if a file status or directory listing cannot be
     *     obtained
     */
    protected List<FileStatus> getFiles() throws IOException {
        List<FileStatus> collected = new ArrayList<FileStatus>();
        for (Path filePath : this.getFilePaths()) {
            FileSystem fileSystem = filePath.getFileSystem();
            FileStatus status = fileSystem.getFileStatus(filePath);

            if (!status.isDir()) {
                collected.add(status);
                continue;
            }

            // Directory: take only the plain files directly inside it.
            FileStatus[] children = fileSystem.listStatus(filePath);
            for (FileStatus child : children) {
                if (!child.isDir()) {
                    collected.add(child);
                }
            }
        }
        return collected;
    }

    /**
     * Computes statistics for the input files.
     *
     * <p>The cached statistics are passed to {@code getFileStats} only if they
     * are file-based statistics. If the result is already a
     * {@code SequentialStatistics} it is returned directly; otherwise it is
     * extended via {@link #createStatistics(List, FileInputFormat.FileBaseStatistics)}.
     *
     * @param cachedStats previously computed statistics, may be {@code null}
     * @return the statistics, or {@code null} if none are available or an
     *     error occurred (the error is logged, not rethrown)
     */
    @Override
    public SequentialStatistics getStatistics(BaseStatistics cachedStats) {
        FileInputFormat.FileBaseStatistics cachedFileStats = null;
        if (cachedStats instanceof FileInputFormat.FileBaseStatistics) {
            cachedFileStats = (FileInputFormat.FileBaseStatistics) cachedStats;
        }

        try {
            ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>(1);
            FileInputFormat.FileBaseStatistics stats = this.getFileStats(cachedFileStats, this.getFilePaths(), allFiles);

            if (stats instanceof SequentialStatistics) {
                return (SequentialStatistics) stats;
            }
            return stats == null ? null : this.createStatistics(allFiles, stats);
        } catch (IOException ioex) {
            if (LOG.isWarnEnabled()) {
                LOG.warn(String.format("Could not determine complete statistics for files '%s' due to an I/O error", Arrays.toString(this.getFilePaths())), ioex);
            }
        } catch (Throwable t) {
            if (LOG.isErrorEnabled()) {
                LOG.error(String.format("Unexpected problem while getting the file statistics for files '%s'", Arrays.toString(this.getFilePaths())), t);
            }
        }

        // Reached only after a logged failure.
        return null;
    }

    /**
     * Creates the input splits without requesting a minimum number of splits.
     *
     * @return the result of {@link #createInputSplits(int)} called with 0
     * @throws IOException if the splits cannot be created
     */
    protected FileInputSplit[] getInputSplits() throws IOException {
        final int noMinimum = 0;
        return createInputSplits(noMinimum);
    }

    /**
     * Creates a new, unpopulated {@code BlockInfo}.
     *
     * @return a fresh {@code BlockInfo} instance
     */
    public BlockInfo createBlockInfo() {
        final BlockInfo freshInfo = new BlockInfo();
        return freshInfo;
    }

    /**
     * Creates a {@code BlockInfo} and, if the current split is large enough,
     * populates it from the stream.
     *
     * <p>The block info occupies the last {@code getInfoSize()} bytes of the
     * split. When the split is not longer than that, nothing is read and the
     * unpopulated instance is returned. Reading moves the position of
     * {@code stream}.
     *
     * @return the block info of the current split
     * @throws IOException if seeking or reading fails
     */
    private BlockInfo createAndReadBlockInfo() throws IOException {
        final BlockInfo info = new BlockInfo();
        final long infoSize = info.getInfoSize();

        if (this.splitLength <= infoSize) {
            return info;
        }

        final long infoOffset = this.splitStart + this.splitLength - infoSize;
        this.stream.seek(infoOffset);
        info.read(new DataInputViewStreamWrapper(this.stream));
        return info;
    }

    /**
     * Builds statistics that include the total record count of the given
     * files.
     *
     * <p>For every file at least as long as a block info, the block info
     * stored in the file's last bytes is read and its accumulated record
     * count is added to the total. Shorter files are skipped.
     *
     * @param files the files to inspect
     * @param stats the base file statistics providing modification time and
     *     total input size
     * @return the statistics, or {@code null} if {@code files} is empty
     * @throws IOException if a file cannot be opened or read
     */
    protected SequentialStatistics createStatistics(List<FileStatus> files, FileInputFormat.FileBaseStatistics stats) throws IOException {
        if (files.isEmpty()) {
            return null;
        }

        BlockInfo blockInfo = new BlockInfo();
        long totalCount = 0L;
        for (FileStatus file : files) {
            if (file.getLen() < (long) blockInfo.getInfoSize()) {
                continue;
            }
            FileSystem fs = file.getPath().getFileSystem();
            // try-with-resources closes the stream without a jump statement in
            // a finally block, which could silently discard a pending exception.
            try (FSDataInputStream fdis = fs.open(file.getPath(), blockInfo.getInfoSize())) {
                fdis.seek(file.getLen() - (long) blockInfo.getInfoSize());
                blockInfo.read(new DataInputViewStreamWrapper(fdis));
                totalCount += blockInfo.getAccumulatedRecordCount();
            }
        }

        // Avoid dividing by zero when no records were counted.
        float avgWidth = totalCount == 0L ? 0.0f : (float) stats.getTotalInputSize() / (float) totalCount;
        return new SequentialStatistics(stats.getLastModificationTime(), stats.getTotalInputSize(), avgWidth, totalCount);
    }

    /**
     * Opens the given split and positions the stream at its first record.
     *
     * <p>After the superclass has opened the split, the block info is read,
     * the record counter is reset, and the stream is moved to the first
     * record start reported by the block info. The block info must be
     * assigned before the {@code BlockBasedInput} is constructed, because
     * that constructor reads it from this instance.
     *
     * @param split the split to open
     * @throws IOException if opening, seeking or reading fails
     */
    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);

        this.blockInfo = this.createAndReadBlockInfo();
        this.readRecords = 0L;

        final long firstRecordStart = this.blockInfo.getFirstRecordStart();
        this.stream.seek(this.splitStart + firstRecordStart);

        this.blockBasedInput = new BlockBasedInput(this.stream, (int) firstRecordStart, this.splitLength);
        this.dataInputStream = new DataInputViewStreamWrapper(this.blockBasedInput);
    }

    /**
     * Tells whether all records of the current block have been returned.
     *
     * @return {@code true} once the number of records read is at least the
     *     record count stored in the block info
     * @throws IOException declared by the overridden method; not thrown by
     *     the code in this implementation
     */
    @Override
    public boolean reachedEnd() throws IOException {
        final long recordsInBlock = this.blockInfo.getRecordCount();
        return recordsInBlock <= this.readRecords;
    }

    /**
     * Reads the next record of the current split.
     *
     * @param record the object handed to {@link #deserialize} for reuse
     * @return the deserialized record, or {@code null} if the end of the
     *     split has been reached
     * @throws IOException if deserialization fails
     */
    @Override
    public T nextRecord(T record) throws IOException {
        if (!this.reachedEnd()) {
            final T next = this.deserialize(record, this.dataInputStream);
            // Count the record only after it was deserialized successfully.
            this.readRecords++;
            return next;
        }
        return null;
    }

    /**
     * Deserializes a single record from the given input view.
     *
     * <p>Called by {@code nextRecord} once per record; the value returned
     * here is what {@code nextRecord} returns to its caller.
     *
     * @param var1 the record object that was passed to {@code nextRecord}
     * @param var2 the input view wrapping the block-based input of the
     *     current split
     * @return the deserialized record
     * @throws IOException if reading from the input view fails
     */
    protected abstract T deserialize(T var1, DataInputView var2) throws IOException;

    /**
     * Captures the current reading position.
     *
     * @return a tuple whose first field is the current position within the
     *     block and whose second field is the number of records read so far
     * @throws RuntimeException if no block-based input exists yet, i.e. the
     *     format has not been opened
     * @throws IOException declared by the interface; not thrown by the code
     *     in this implementation
     */
    @Override
    @PublicEvolving
    public Tuple2<Long, Long> getCurrentState() throws IOException {
        final BlockBasedInput input = this.blockBasedInput;
        if (input != null) {
            return new Tuple2<Long, Long>(input.getCurrBlockPos(), this.readRecords);
        }
        throw new RuntimeException("You must have forgotten to call open() on your input format.");
    }

    /**
     * Reopens the given split and restores a previously captured reading
     * position.
     *
     * <p>The split is opened first. Only if that succeeds is the state
     * applied: the record counter is set to {@code state.f1} and the stream
     * is moved to {@code splitStart + state.f0}. If {@link #open} fails, its
     * exception propagates unchanged and no restoration is attempted, so the
     * original failure is never hidden by a secondary one.
     *
     * @param split the split to reopen; must not be {@code null}
     * @param state the state as returned by {@link #getCurrentState()}
     *     (block position, records read); must not be {@code null}
     * @throws IOException if opening, seeking or reading fails
     */
    @Override
    @PublicEvolving
    public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
        Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
        Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");

        // Restoration below must only run on a successfully opened stream.
        this.open(split);

        // open() already loaded the block info; the re-read is kept so the
        // sequence of stream operations on the success path is unchanged.
        this.blockInfo = this.createAndReadBlockInfo();

        long blockPos = state.f0;
        this.readRecords = state.f1;

        this.stream.seek(this.splitStart + blockPos);
        this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength);
        this.dataInputStream = new DataInputViewStreamWrapper(this.blockBasedInput);
    }

    /**
     * Input stream over the payload of a block.
     *
     * <p>A block consists of {@code maxPayloadSize} payload bytes followed by
     * a block info of {@code getInfoSize()} bytes. This stream hands out only
     * payload bytes: when the position within the block reaches the payload
     * size, the block-info bytes are consumed from the underlying stream and
     * the position restarts at 0.
     *
     * <p>Both constructors read the enclosing format's {@code blockInfo}, so
     * that field must be set before an instance is created.
     */
    protected class BlockBasedInput
    extends FilterInputStream {
        /** Number of payload bytes per block (block length minus block-info size). */
        private final int maxPayloadSize;
        /** Number of payload bytes already consumed from the current block. */
        private int blockPos;

        /**
         * Creates an input that starts at the first record start of the
         * enclosing format's block info.
         *
         * @param in the underlying stream
         * @param blockSize the total block size, including the block info
         */
        public BlockBasedInput(FSDataInputStream in, int blockSize) {
            super(in);
            this.blockPos = (int) BinaryInputFormat.this.blockInfo.getFirstRecordStart();
            this.maxPayloadSize = blockSize - BinaryInputFormat.this.blockInfo.getInfoSize();
        }

        /**
         * Creates an input that starts at an explicit position within the
         * block.
         *
         * @param in the underlying stream
         * @param startPos the initial position within the block
         * @param length the total block length, including the block info
         */
        public BlockBasedInput(FSDataInputStream in, int startPos, long length) {
            super(in);
            this.blockPos = startPos;
            this.maxPayloadSize = (int) (length - (long) BinaryInputFormat.this.blockInfo.getInfoSize());
        }

        /**
         * Reads one payload byte, first skipping the block info if the
         * payload of the current block is exhausted.
         *
         * @return the byte read, or -1 at end of stream
         * @throws IOException if the underlying stream fails
         */
        @Override
        public int read() throws IOException {
            if (this.blockPos >= this.maxPayloadSize) {
                this.skipHeader();
            }
            int value = this.in.read();
            if (value >= 0) {
                // Count only bytes actually delivered. This also makes the
                // first byte after a skipped block info count towards the new
                // block, keeping the next block boundary in the right place.
                this.blockPos++;
            }
            return value;
        }

        /** Returns the number of payload bytes consumed from the current block. */
        private long getCurrBlockPos() {
            return this.blockPos;
        }

        /**
         * Consumes the block-info bytes from the underlying stream and resets
         * the block position. Reads repeatedly because a single read may
         * return fewer bytes than requested; stops early at end of stream.
         */
        private void skipHeader() throws IOException {
            byte[] dummy = new byte[BinaryInputFormat.this.blockInfo.getInfoSize()];
            int skipped = 0;
            while (skipped < dummy.length) {
                int n = this.in.read(dummy, skipped, dummy.length - skipped);
                if (n < 0) {
                    break;
                }
                skipped += n;
            }
            this.blockPos = 0;
        }

        /**
         * Reads payload bytes into the whole array; see
         * {@link #read(byte[], int, int)}.
         */
        @Override
        public int read(byte[] b) throws IOException {
            return this.read(b, 0, b.length);
        }

        /**
         * Reads up to {@code len} payload bytes, skipping a block info
         * whenever the payload of the current block has been fully consumed.
         *
         * @param b the destination buffer
         * @param off the offset in {@code b} at which to start storing bytes
         * @param len the maximum number of bytes to read
         * @return the number of bytes stored in {@code b}; -1 only if the end
         *     of the stream was reached before any byte could be read
         * @throws IOException if the underlying stream fails
         */
        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            int totalRead = 0;
            int offset = off;
            int remainingLength = len;
            while (remainingLength > 0) {
                // Never read across the end of the current block's payload.
                int blockLen = Math.min(remainingLength, this.maxPayloadSize - this.blockPos);
                int read = this.in.read(b, offset, blockLen);
                if (read < 0) {
                    // Report bytes already copied instead of dropping them.
                    return totalRead > 0 ? totalRead : read;
                }
                totalRead += read;
                this.blockPos += read;
                offset += read;
                remainingLength -= read;
                if (this.blockPos >= this.maxPayloadSize) {
                    this.skipHeader();
                }
            }
            return totalRead;
        }
    }

    /**
     * File statistics that additionally carry an explicit record count.
     */
    private static class SequentialStatistics
    extends FileInputFormat.FileBaseStatistics {
        /** Record count reported by {@link #getNumberOfRecords()}. */
        private final long numberOfRecords;

        /**
         * Creates the statistics.
         *
         * @param fileModTime forwarded to the superclass constructor
         * @param fileSize forwarded to the superclass constructor
         * @param avgBytesPerRecord forwarded to the superclass constructor
         * @param numberOfRecords the record count to report
         */
        public SequentialStatistics(long fileModTime, long fileSize, float avgBytesPerRecord, long numberOfRecords) {
            super(fileModTime, fileSize, avgBytesPerRecord);
            this.numberOfRecords = numberOfRecords;
        }

        /**
         * Returns the record count given at construction time.
         *
         * @return the number of records
         */
        @Override
        public long getNumberOfRecords() {
            return numberOfRecords;
        }
    }
}

