/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.formats.parquet;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.CheckpointedPosition;
import org.apache.flink.connector.file.src.util.Pool;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetInputFile;
import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
import org.apache.flink.formats.parquet.utils.SerializableConfiguration;
import org.apache.flink.formats.parquet.vector.ColumnBatchFactory;
import org.apache.flink.formats.parquet.vector.ParquetDecimalVector;
import org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil;
import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
import org.apache.flink.table.data.columnar.vector.ColumnVector;
import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ParquetVectorizedInputFormat<T, SplitT extends FileSourceSplit>
implements BulkFormat<T, SplitT> {
    private static final Logger LOG = LoggerFactory.getLogger(ParquetVectorizedInputFormat.class);
    private static final long serialVersionUID = 1L;
    protected final SerializableConfiguration hadoopConfig;
    private final String[] projectedFields;
    private final LogicalType[] projectedTypes;
    private final ColumnBatchFactory<SplitT> batchFactory;
    private final int batchSize;
    protected final boolean isUtcTimestamp;
    private final boolean isCaseSensitive;

    public ParquetVectorizedInputFormat(SerializableConfiguration hadoopConfig, RowType projectedType, ColumnBatchFactory<SplitT> batchFactory, int batchSize, boolean isUtcTimestamp, boolean isCaseSensitive) {
        this.hadoopConfig = hadoopConfig;
        this.projectedFields = projectedType.getFieldNames().toArray(new String[0]);
        this.projectedTypes = projectedType.getChildren().toArray(new LogicalType[0]);
        this.batchFactory = batchFactory;
        this.batchSize = batchSize;
        this.isUtcTimestamp = isUtcTimestamp;
        this.isCaseSensitive = isCaseSensitive;
    }

    /**
     * Opens the Parquet file behind {@code split} and builds a reader limited to the split's byte
     * range.
     *
     * <p>The requested schema is derived from the file schema and the projection, installed on the
     * file reader, and validated before the filtered record count is queried. If any step after
     * opening the file reader fails, the file reader is closed before the failure is propagated so
     * that the opened file is not leaked.
     *
     * @param config configuration used to size the pool of reader batches
     * @param split the file split to read
     * @return a reader positioned at the start of the split
     * @throws IOException if the file cannot be opened or the schema check fails
     */
    public ParquetReader createReader(Configuration config, SplitT split) throws IOException {
        Path filePath = split.path();
        long rangeStart = split.offset();
        long rangeEnd = rangeStart + split.length();
        FileSystem fs = filePath.getFileSystem();

        // Resolve the file length before opening the stream: if the status lookup fails there is
        // no open stream left behind.
        long fileLength = fs.getFileStatus(filePath).getLen();
        ParquetInputFile inputFile = new ParquetInputFile(fs.open(filePath), fileLength);

        FilterCompat.Filter filter = ParquetInputFormat.getFilter(this.hadoopConfig.conf());
        ParquetReadOptions parquetReadOptions =
                ParquetReadOptions.builder()
                        .withRange(rangeStart, rangeEnd)
                        .withRecordFilter(filter)
                        .build();
        ParquetFileReader parquetFileReader = ParquetFileReader.open(inputFile, parquetReadOptions);
        try {
            Set<Integer> unknownFieldsIndices = new HashSet<>();
            MessageType fileSchema = parquetFileReader.getFooter().getFileMetaData().getSchema();
            MessageType requestedSchema = this.clipParquetSchema(fileSchema, unknownFieldsIndices);
            // The projection is installed before the filtered record count is requested.
            parquetFileReader.setRequestedSchema(requestedSchema);
            this.checkSchema(fileSchema, requestedSchema);
            long totalRowCount = parquetFileReader.getFilteredRecordCount();
            Pool<ParquetReaderBatch<T>> poolOfBatches =
                    this.createPoolOfBatches(split, requestedSchema, this.numBatchesToCirculate(config));
            return new ParquetReader(
                    parquetFileReader, requestedSchema, unknownFieldsIndices, totalRowCount, poolOfBatches);
        } catch (IOException | RuntimeException | Error failure) {
            // Nobody else holds the file reader yet, so it must be released here.
            try {
                parquetFileReader.close();
            } catch (IOException | RuntimeException closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Returns how many reader batches are created for the pool of a single reader.
     *
     * @param config configuration from which {@link SourceReaderOptions#ELEMENT_QUEUE_CAPACITY} is
     *     read
     * @return the configured element queue capacity
     */
    protected int numBatchesToCirculate(Configuration config) {
        int queueCapacity = config.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY);
        return queueCapacity;
    }

    /**
     * Re-creates a reader for {@code split} and advances it to the checkpointed record position.
     *
     * <p>The checkpointed position must be present and must carry no byte offset; only the number
     * of records after the offset is used to seek.
     *
     * @param config configuration used to size the pool of reader batches
     * @param split the split to restore; must carry a reader position
     * @return a reader that skips the already-emitted records
     * @throws IOException if the underlying reader cannot be created
     * @throws IllegalArgumentException if the split has no reader position or the position has an
     *     offset other than {@link CheckpointedPosition#NO_OFFSET}
     */
    public ParquetReader restoreReader(Configuration config, SplitT split) throws IOException {
        // Checked explicitly instead of with `assert`, which is skipped unless the JVM runs with
        // assertions enabled.
        CheckpointedPosition checkpointedPosition =
                split.getReaderPosition()
                        .orElseThrow(
                                () ->
                                        new IllegalArgumentException(
                                                "The split to restore must carry a reader position"));
        Preconditions.checkArgument(
                checkpointedPosition.getOffset() == CheckpointedPosition.NO_OFFSET,
                "The offset of CheckpointedPosition should always be NO_OFFSET");

        ParquetReader reader = this.createReader(config, split);
        reader.seek(checkpointedPosition.getRecordsAfterOffset());
        return reader;
    }

    /**
     * Reports whether files of this format may be divided into several splits.
     *
     * @return always {@code true}
     */
    public boolean isSplittable() {
        return true;
    }

    /**
     * Builds the schema requested from the file: one Parquet type per projected field, in
     * projection order.
     *
     * <p>A projected field found in the file schema uses the file's type. A field that is not
     * found gets a type converted from its logical type, a warning is logged, and its index is
     * added to {@code unknownFieldsIndices}. In case-insensitive mode, names are compared after
     * lower-casing with {@link Locale#ROOT}.
     *
     * @param parquetSchema schema of the file
     * @param unknownFieldsIndices receives the projection indices of fields missing from the file
     * @return a message type named {@code flink-parquet} holding the selected types
     * @throws FlinkRuntimeException in case-insensitive mode, if two file fields share the same
     *     lower-cased name
     */
    private MessageType clipParquetSchema(GroupType parquetSchema, Collection<Integer> unknownFieldsIndices) {
        int fieldCount = this.projectedFields.length;
        Type[] clipped = new Type[fieldCount];

        if (!this.isCaseSensitive) {
            // Index the file's fields by lower-cased name; a collision makes the lookup ambiguous.
            HashMap<String, Type> byLowerCaseName = new HashMap<>();
            for (Type fileField : parquetSchema.getFields()) {
                String key = fileField.getName().toLowerCase(Locale.ROOT);
                if (byLowerCaseName.putIfAbsent(key, fileField) != null) {
                    throw new FlinkRuntimeException("Parquet with case insensitive mode should have no duplicate key: " + key);
                }
            }

            for (int idx = 0; idx < fieldCount; ++idx) {
                String projectedName = this.projectedFields[idx];
                String lowerCaseName = projectedName.toLowerCase(Locale.ROOT);
                Type match = byLowerCaseName.get(lowerCaseName);
                if (match == null) {
                    LOG.warn("{} does not exist in {}, will fill the field with null.", projectedName, parquetSchema);
                    // The placeholder type is named with the lower-cased field name.
                    match = ParquetSchemaConverter.convertToParquetType(lowerCaseName, this.projectedTypes[idx]);
                    unknownFieldsIndices.add(idx);
                }
                clipped[idx] = match;
            }
        } else {
            for (int idx = 0; idx < fieldCount; ++idx) {
                String projectedName = this.projectedFields[idx];
                if (parquetSchema.containsField(projectedName)) {
                    clipped[idx] = parquetSchema.getType(projectedName);
                } else {
                    LOG.warn("{} does not exist in {}, will fill the field with null.", projectedName, parquetSchema);
                    clipped[idx] = ParquetSchemaConverter.convertToParquetType(projectedName, this.projectedTypes[idx]);
                    unknownFieldsIndices.add(idx);
                }
            }
        }

        return Types.buildMessage().addFields(clipped).named("flink-parquet");
    }

    /**
     * Verifies that the requested schema can be served from the file schema.
     *
     * <p>For each checked column: if the file contains the column path, the file's column
     * descriptor must equal the requested one; if the file does not contain it, the requested
     * column must have a maximum definition level other than zero.
     *
     * @param fileSchema schema read from the file footer
     * @param requestedSchema schema produced for the projection
     * @throws RuntimeException if the requested schema's field count differs from the number of
     *     projected fields
     * @throws UnsupportedOperationException if a column exists in the file with a different
     *     descriptor
     * @throws IOException if a column with maximum definition level zero is missing from the file
     */
    private void checkSchema(MessageType fileSchema, MessageType requestedSchema) throws IOException, UnsupportedOperationException {
        int fieldCount = requestedSchema.getFieldCount();
        if (this.projectedFields.length != fieldCount) {
            throw new RuntimeException("The quality of field type is incompatible with the request schema!");
        }

        // Fetch both lists once instead of once per loop iteration; they are only read here.
        List<String[]> requestedPaths = requestedSchema.getPaths();
        List<ColumnDescriptor> requestedColumns = requestedSchema.getColumns();

        // NOTE(review): the loop is bounded by the field count but indexes the column lists;
        // presumably these only line up when every field is a single column - confirm for
        // nested types.
        for (int i = 0; i < fieldCount; ++i) {
            String[] colPath = requestedPaths.get(i);
            ColumnDescriptor requestedColumn = requestedColumns.get(i);
            if (fileSchema.containsPath(colPath)) {
                ColumnDescriptor fileColumn = fileSchema.getColumnDescription(colPath);
                if (!fileColumn.equals(requestedColumn)) {
                    throw new UnsupportedOperationException("Schema evolution not supported.");
                }
            } else if (requestedColumn.getMaxDefinitionLevel() == 0) {
                // A column with definition level zero has no representation for missing values.
                throw new IOException("Required column is missing in data file. Col: " + Arrays.toString(colPath));
            }
        }
    }

    /**
     * Creates a pool and fills it with {@code numBatches} freshly created reader batches.
     *
     * @param split split passed to the batch factory for every batch
     * @param requestedSchema schema used to create the column vectors
     * @param numBatches pool capacity and number of batches to add
     * @return the filled pool
     */
    private Pool<ParquetReaderBatch<T>> createPoolOfBatches(SplitT split, MessageType requestedSchema, int numBatches) {
        Pool<ParquetReaderBatch<T>> batches = new Pool<>(numBatches);
        for (int remaining = numBatches; remaining > 0; --remaining) {
            // Every batch gets the pool's recycler so it can hand itself back after use.
            ParquetReaderBatch<T> batch = this.createReaderBatch(split, requestedSchema, batches.recycler());
            batches.add(batch);
        }
        return batches;
    }

    /**
     * Creates one reader batch: writable vectors for the requested schema, a columnar batch over
     * their readable views, and the format-specific wrapper around both.
     *
     * @param split split passed to the batch factory
     * @param requestedSchema schema used to create the writable vectors
     * @param recycler recycler handed to the created batch
     * @return the new reader batch
     */
    private ParquetReaderBatch<T> createReaderBatch(SplitT split, MessageType requestedSchema, Pool.Recycler<ParquetReaderBatch<T>> recycler) {
        WritableColumnVector[] writable = this.createWritableVectors(requestedSchema);
        ColumnVector[] readable = this.createReadableVectors(writable);
        VectorizedColumnBatch batch = this.batchFactory.create(split, readable);
        return this.createReaderBatch(writable, batch, recycler);
    }

    /**
     * Creates one writable column vector per projected type, each with capacity
     * {@code batchSize}.
     *
     * @param requestedSchema schema whose fields and columns describe the Parquet side of each
     *     projected type
     * @return the vectors, in projection order
     */
    private WritableColumnVector[] createWritableVectors(MessageType requestedSchema) {
        int count = this.projectedTypes.length;
        WritableColumnVector[] vectors = new WritableColumnVector[count];
        List<Type> parquetFields = requestedSchema.getFields();
        int idx = 0;
        while (idx < count) {
            LogicalType logicalType = this.projectedTypes[idx];
            Type parquetField = parquetFields.get(idx);
            vectors[idx] =
                    ParquetSplitReaderUtil.createWritableColumnVector(
                            this.batchSize, logicalType, parquetField, requestedSchema.getColumns(), 0);
            ++idx;
        }
        return vectors;
    }

    /**
     * Produces the readable view of each writable vector.
     *
     * <p>Vectors whose projected type is {@code DECIMAL} are wrapped in a
     * {@link ParquetDecimalVector}; all other vectors are used as they are.
     *
     * @param writableVectors vectors in projection order
     * @return readable vectors in the same order
     */
    private ColumnVector[] createReadableVectors(WritableColumnVector[] writableVectors) {
        int count = writableVectors.length;
        ColumnVector[] readable = new ColumnVector[count];
        for (int idx = 0; idx < count; ++idx) {
            boolean isDecimal = this.projectedTypes[idx].getTypeRoot() == LogicalTypeRoot.DECIMAL;
            if (isDecimal) {
                readable[idx] = new ParquetDecimalVector(writableVectors[idx]);
            } else {
                readable[idx] = writableVectors[idx];
            }
        }
        return readable;
    }

    /**
     * Creates the format-specific batch wrapper around the given vectors.
     *
     * @param writableVectors vectors the reader fills for every batch
     * @param columnarBatch batch created by the batch factory over the readable views of those
     *     vectors
     * @param recycler recycler of the pool the batch is added to
     * @return the reader batch
     */
    protected abstract ParquetReaderBatch<T> createReaderBatch(
            WritableColumnVector[] writableVectors,
            VectorizedColumnBatch columnarBatch,
            Pool.Recycler<ParquetReaderBatch<T>> recycler);

    /**
     * A reusable unit of work for the reader: the writable vectors that are filled from the file,
     * the columnar batch over them, and the recycler used to return the unit to its pool.
     *
     * @param <T> type of the records produced from the batch
     */
    protected abstract static class ParquetReaderBatch<T> {
        /** Vectors the reader resets and fills for every batch. */
        private final WritableColumnVector[] writableVectors;

        /** Columnar batch whose row count the reader sets after filling the vectors. */
        protected final VectorizedColumnBatch columnarBatch;

        /** Returns this batch to the pool it came from. */
        private final Pool.Recycler<ParquetReaderBatch<T>> recycler;

        /**
         * @param writableVectors vectors to fill
         * @param columnarBatch batch over the vectors
         * @param recycler recycler used by {@link #recycle()}
         */
        protected ParquetReaderBatch(
                WritableColumnVector[] writableVectors,
                VectorizedColumnBatch columnarBatch,
                Pool.Recycler<ParquetReaderBatch<T>> recycler) {
            this.recycler = recycler;
            this.columnarBatch = columnarBatch;
            this.writableVectors = writableVectors;
        }

        /** Hands this batch back through its recycler. */
        public void recycle() {
            recycler.recycle(this);
        }

        /**
         * Converts the filled batch into a record iterator.
         *
         * @param rowsReturned number of rows the reader had returned before this batch was read
         * @return iterator over the records of this batch
         * @throws IOException if the conversion fails
         */
        public abstract BulkFormat.RecordIterator<T> convertAndGetIterator(long rowsReturned) throws IOException;
    }

    private class ParquetReader
    implements BulkFormat.Reader<T> {
        private ParquetFileReader reader;
        private final MessageType requestedSchema;
        private final Set<Integer> unknownFieldsIndices;
        private final long totalRowCount;
        private final Pool<ParquetReaderBatch<T>> pool;
        private long rowsReturned;
        private long totalCountLoadedSoFar;
        private ColumnReader[] columnReaders;
        private long recordsToSkip;

        /**
         * Creates a reader that has not yet loaded or returned any row.
         *
         * @param reader open Parquet file reader; closed by {@link #close()}
         * @param requestedSchema schema used to create the column readers
         * @param unknownFieldsIndices indices of fields for which no column reader is created
         * @param totalRowCount number of rows after which the reader reports the end of input
         * @param pool pool from which batches are taken
         */
        private ParquetReader(
                ParquetFileReader reader,
                MessageType requestedSchema,
                Set<Integer> unknownFieldsIndices,
                long totalRowCount,
                Pool<ParquetReaderBatch<T>> pool) {
            // Progress counters start at zero.
            this.rowsReturned = 0L;
            this.totalCountLoadedSoFar = 0L;
            this.recordsToSkip = 0L;

            this.reader = reader;
            this.requestedSchema = requestedSchema;
            this.unknownFieldsIndices = unknownFieldsIndices;
            this.totalRowCount = totalRowCount;
            this.pool = pool;
        }

        /**
         * Reads the next batch of records.
         *
         * <p>A batch is taken from the pool and filled. If there is nothing left to read, the
         * batch is recycled and {@code null} is returned. Otherwise the batch is converted into
         * an iterator, and any records still to be skipped after a seek are consumed from it
         * before it is returned.
         *
         * @return iterator over the batch, or {@code null} at the end of input
         * @throws IOException if reading fails or the thread is interrupted while waiting for a
         *     batch
         */
        @Nullable
        public BulkFormat.RecordIterator<T> readBatch() throws IOException {
            ParquetReaderBatch<T> batch = this.getCachedEntry();
            // Captured before filling, because filling advances rowsReturned.
            long firstRowOfBatch = this.rowsReturned;
            if (this.nextBatch(batch)) {
                BulkFormat.RecordIterator<T> records = batch.convertAndGetIterator(firstRowOfBatch);
                this.skipRecord(records);
                return records;
            }
            batch.recycle();
            return null;
        }

        /**
         * Fills {@code batch} with the next rows of the current row group.
         *
         * <p>The vectors are reset and the batch row count is set to zero first. A new row group
         * is loaded when every loaded row has been returned. The batch receives at most
         * {@code batchSize} rows and never crosses a row group boundary. Columns without a column
         * reader are filled with nulls.
         *
         * @param batch batch to fill
         * @return {@code false} if all rows were already returned, {@code true} otherwise
         * @throws IOException if loading a row group or reading a column fails
         */
        private boolean nextBatch(ParquetReaderBatch<T> batch) throws IOException {
            WritableColumnVector[] vectors = batch.writableVectors;
            for (int col = 0; col < vectors.length; ++col) {
                vectors[col].reset();
            }
            batch.columnarBatch.setNumRows(0);

            if (this.rowsReturned >= this.totalRowCount) {
                return false;
            }
            if (this.rowsReturned == this.totalCountLoadedSoFar) {
                this.readNextRowGroup();
            }

            long leftInRowGroup = this.totalCountLoadedSoFar - this.rowsReturned;
            int rowsInBatch = (int) Math.min((long) ParquetVectorizedInputFormat.this.batchSize, leftInRowGroup);
            for (int col = 0; col < this.columnReaders.length; ++col) {
                ColumnReader columnReader = this.columnReaders[col];
                if (columnReader != null) {
                    columnReader.readToVector(rowsInBatch, batch.writableVectors[col]);
                } else {
                    // No reader was created for this column, so it has no data to read.
                    batch.writableVectors[col].fillWithNulls();
                }
            }

            this.rowsReturned += (long) rowsInBatch;
            batch.columnarBatch.setNumRows(rowsInBatch);
            return true;
        }

        /**
         * Loads the next row group and creates a column reader for every field that is not in
         * {@code unknownFieldsIndices}; the entries of unknown fields stay {@code null}.
         *
         * @throws IOException if the file has no further row group although more rows are
         *     expected, or if reading fails
         */
        private void readNextRowGroup() throws IOException {
            PageReadStore rowGroup = this.reader.readNextRowGroup();
            if (rowGroup == null) {
                throw new IOException("expecting more rows but reached last block. Read " + this.rowsReturned + " out of " + this.totalRowCount);
            }

            List<Type> fields = this.requestedSchema.getFields();
            int fieldCount = fields.size();
            this.columnReaders = new ColumnReader[fieldCount];
            for (int idx = 0; idx < fieldCount; ++idx) {
                if (!this.unknownFieldsIndices.contains(idx)) {
                    this.columnReaders[idx] =
                            ParquetSplitReaderUtil.createColumnReader(
                                    ParquetVectorizedInputFormat.this.isUtcTimestamp,
                                    ParquetVectorizedInputFormat.this.projectedTypes[idx],
                                    fields.get(idx),
                                    this.requestedSchema.getColumns(),
                                    rowGroup,
                                    0);
                }
            }

            this.totalCountLoadedSoFar += rowGroup.getRowCount();
        }

        /**
         * Advances the reader by {@code rowCount} rows. Allowed only before any row group has
         * been loaded.
         *
         * <p>Row groups that fit entirely within the remaining count are skipped without being
         * read and are counted as both loaded and returned. The remainder is stored as the number
         * of records to drop from the following batches.
         *
         * @param rowCount number of rows to skip
         * @throws UnsupportedOperationException if rows have already been loaded
         */
        public void seek(long rowCount) {
            if (this.totalCountLoadedSoFar != 0L) {
                throw new UnsupportedOperationException("Only support seek at first.");
            }

            long remaining = rowCount;
            for (BlockMetaData rowGroup : this.reader.getRowGroups()) {
                long groupRows = rowGroup.getRowCount();
                if (groupRows > remaining) {
                    // The target lies inside this row group; it has to be read.
                    break;
                }
                this.reader.skipNextRowGroup();
                this.rowsReturned += groupRows;
                this.totalCountLoadedSoFar += groupRows;
                remaining -= groupRows;
            }
            this.recordsToSkip = remaining;
        }

        /**
         * Takes a batch from the pool.
         *
         * @return a batch from the pool
         * @throws IOException if the thread is interrupted while polling the pool; the interrupt
         *     flag is restored and the {@link InterruptedException} is attached as the cause
         */
        private ParquetReaderBatch<T> getCachedEntry() throws IOException {
            try {
                return this.pool.pollEntry();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // Keep the original exception as the cause so the interruption stays traceable.
                throw new IOException("Interrupted", e);
            }
        }

        /**
         * Consumes records from {@code records} until no record is left to skip or the iterator
         * is exhausted. A remaining skip count carries over to the next batch.
         *
         * @param records iterator of the batch that was just read
         */
        private void skipRecord(BulkFormat.RecordIterator<T> records) {
            while (this.recordsToSkip > 0L) {
                if (records.next() == null) {
                    // Batch exhausted before the skip count reached zero.
                    break;
                }
                --this.recordsToSkip;
            }
        }

        /**
         * Closes the underlying file reader. Calling it again after a successful close has no
         * effect.
         *
         * @throws IOException if closing the file reader fails
         */
        public void close() throws IOException {
            ParquetFileReader open = this.reader;
            if (open == null) {
                return;
            }
            open.close();
            // Cleared only after a successful close, so a failed close can be retried.
            this.reader = null;
        }
    }
}

