/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.common.source.arrow.reader;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.function.Function;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.Converter;
import org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.DefaultConverter;
import org.apache.seatunnel.shade.org.apache.arrow.memory.BufferAllocator;
import org.apache.seatunnel.shade.org.apache.arrow.memory.RootAllocator;
import org.apache.seatunnel.shade.org.apache.arrow.vector.FieldVector;
import org.apache.seatunnel.shade.org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.seatunnel.shade.org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types;
import org.apache.seatunnel.shade.org.apache.arrow.vector.util.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads an Arrow IPC stream held in a byte array and turns it into {@link SeaTunnelRow}s.
 *
 * <p>Typical usage: construct the reader, call {@link #readArrow()} once to decode every batch
 * in the stream, then drain the decoded rows with {@link #hasNext()} / {@link #next()}.
 * {@link #readArrow()} releases the Arrow resources itself when it finishes; the decoded rows
 * stay available afterwards because they are held in an on-heap list.
 *
 * <p>Arrow columns are matched to SeaTunnel fields by name; Arrow columns whose name is not
 * part of the SeaTunnel row type are ignored.
 */
public class ArrowToSeatunnelRowReader implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(ArrowToSeatunnelRowReader.class);

    // DateTimeFormatter is immutable and thread-safe, so one shared instance per pattern is enough.
    private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss");
    private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Converters discovered through {@link ServiceLoader}; the first one supporting a vector's minor type wins. */
    private static final List<Converter> converters = new ArrayList<Converter>();

    static {
        ServiceLoader.load(Converter.class).forEach(converters::add);
    }

    private final SeaTunnelDataType<?>[] seaTunnelDataTypes;
    /** Index of the next row handed out by {@link #next()}. */
    private int offsetInRowBatch = 0;
    /** Row count of the batch currently loaded into {@link #root}. */
    private int rowCountInOneBatch = 0;
    /** Number of rows fully converted so far, across all batches already processed. */
    private int readRowCount = 0;
    private List<FieldVector> fieldVectors;
    private VectorSchemaRoot root;
    private final ArrowStreamReader arrowStreamReader;
    private final RootAllocator rootAllocator;
    /** SeaTunnel field name -> position of that field inside a {@link SeaTunnelRow}. */
    private final Map<String, Integer> fieldIndexMap = new HashMap<String, Integer>();
    /** All decoded rows of all batches, in stream order. */
    private final List<SeaTunnelRow> seatunnelRowBatch = new ArrayList<SeaTunnelRow>();
    private final DefaultConverter defaultConverter = new DefaultConverter();

    /**
     * Creates a reader over the given Arrow stream bytes. Nothing is decoded until
     * {@link #readArrow()} is called.
     *
     * @param byteArray        serialized Arrow stream
     * @param seaTunnelRowType target row type; its field names select and order the Arrow columns
     */
    public ArrowToSeatunnelRowReader(byte[] byteArray, SeaTunnelRowType seaTunnelRowType) {
        this.seaTunnelDataTypes = seaTunnelRowType.getFieldTypes();

        String[] fieldNames = seaTunnelRowType.getFieldNames();
        for (int position = 0; position < fieldNames.length; position++) {
            this.fieldIndexMap.put(fieldNames[position], position);
        }

        this.rootAllocator = new RootAllocator(Integer.MAX_VALUE);
        this.arrowStreamReader = new ArrowStreamReader(new ByteArrayInputStream(byteArray), this.rootAllocator);
    }

    /**
     * Decodes every record batch of the stream into {@link SeaTunnelRow}s and then closes the
     * Arrow resources. Batches without columns or without rows are skipped.
     *
     * @return this reader, so that rows can be drained with {@link #hasNext()} / {@link #next()}
     * @throws RuntimeException wrapping the {@link IOException} if the stream cannot be read
     */
    public ArrowToSeatunnelRowReader readArrow() {
        try {
            // The root is fetched once and inspected again after every loadNextBatch() call,
            // so the vectors always describe the batch that was loaded last.
            this.root = this.arrowStreamReader.getVectorSchemaRoot();
            while (this.arrowStreamReader.loadNextBatch()) {
                this.fieldVectors = this.root.getFieldVectors();
                int batchRowCount = this.root.getRowCount();
                if (this.fieldVectors.isEmpty() || batchRowCount == 0) {
                    log.debug("one batch in arrow has no data.");
                    continue;
                }
                log.info("one batch in arrow row count size '{}'", batchRowCount);
                this.rowCountInOneBatch = batchRowCount;
                // Append empty rows for this batch; convertSeatunnelRow() fills them column by column.
                for (int i = 0; i < batchRowCount; i++) {
                    this.seatunnelRowBatch.add(new SeaTunnelRow(this.seaTunnelDataTypes.length));
                }
                this.convertSeatunnelRow();
                // Only advance after conversion: readRowCount is the list offset of the current batch.
                this.readRowCount += batchRowCount;
            }
            return this;
        } catch (IOException e) {
            throw new RuntimeException("failed to read arrow stream.", e);
        } finally {
            this.close();
        }
    }

    /** @return {@code true} while there are decoded rows that {@link #next()} has not returned yet */
    public boolean hasNext() {
        return this.offsetInRowBatch < this.readRowCount;
    }

    /**
     * Returns the next decoded row.
     *
     * @throws IllegalStateException if all rows have already been returned
     */
    public SeaTunnelRow next() {
        if (!this.hasNext()) {
            throw new IllegalStateException("no more rows to read.");
        }
        return this.seatunnelRowBatch.get(this.offsetInRowBatch++);
    }

    /**
     * Copies the currently loaded batch, column by column, into the rows that
     * {@link #readArrow()} appended for it.
     *
     * <p>Two different indices are involved: the position inside the Arrow vector is local to
     * the current batch ({@code 0 .. rowCountInOneBatch - 1}), while the position inside
     * {@link #seatunnelRowBatch} is offset by the rows of all previous batches.
     */
    private void convertSeatunnelRow() {
        for (FieldVector fieldVector : this.fieldVectors) {
            Integer fieldIndex = this.fieldIndexMap.get(fieldVector.getField().getName());
            if (fieldIndex == null) {
                // Arrow column that is not part of the SeaTunnel schema: nothing to fill.
                continue;
            }
            Types.MinorType minorType = fieldVector.getMinorType();
            SeaTunnelDataType<?> seaTunnelDataType = this.seaTunnelDataTypes[fieldIndex];
            for (int batchRow = 0; batchRow < this.rowCountInOneBatch; batchRow++) {
                Object fieldValue = this.convertArrowData(batchRow, minorType, fieldVector, seaTunnelDataType);
                fieldValue = this.convertSeatunnelRowValue(seaTunnelDataType.getSqlType(), minorType, fieldValue);
                this.seatunnelRowBatch.get(this.readRowCount + batchRow).setField(fieldIndex.intValue(), fieldValue);
            }
        }
    }

    /** @return number of rows decoded so far */
    public int getReadRowCount() {
        return this.readRowCount;
    }

    /**
     * Adapts a value produced by a {@link Converter} to the Java type expected for the given
     * SeaTunnel SQL type. Values whose runtime type is not recognised for the target type
     * (including {@code null}) and all SQL types not listed here are returned unchanged.
     *
     * @param currentType target SeaTunnel SQL type
     * @param minorType   Arrow minor type of the source vector; may be {@code null} for nested
     *                    values, in which case {@code Long} timestamps are read as epoch millis
     * @param fieldValue  raw value, may be {@code null}
     */
    private Object convertSeatunnelRowValue(SqlType currentType, Types.MinorType minorType, Object fieldValue) {
        switch (currentType) {
            case STRING:
                return toStringValue(fieldValue);
            case DECIMAL:
                return toDecimalValue(fieldValue);
            case DATE:
                return toDateValue(fieldValue);
            case TIME:
                return toTimeValue(fieldValue);
            case TIMESTAMP:
                return toTimestampValue(minorType, fieldValue);
            default:
                return fieldValue;
        }
    }

    /** Returns the textual form of a {@code String} or Arrow {@link Text} value, or {@code null} for anything else. */
    private static String textOf(Object value) {
        if (value instanceof String) {
            return (String) value;
        }
        if (value instanceof Text) {
            return value.toString();
        }
        return null;
    }

    /** STRING: decodes {@code byte[]} as UTF-8 and unwraps {@link Text}; other values pass through. */
    private static Object toStringValue(Object value) {
        if (value instanceof byte[]) {
            // Explicit charset: the no-charset constructor would depend on the JVM's default encoding.
            return new String((byte[]) value, StandardCharsets.UTF_8);
        }
        if (value instanceof Text) {
            return value.toString();
        }
        return value;
    }

    /** DECIMAL: parses textual values into {@link BigDecimal}; other values pass through. */
    private static Object toDecimalValue(Object value) {
        String text = textOf(value);
        return text != null ? new BigDecimal(text) : value;
    }

    /**
     * DATE: integral values are epoch days, textual values use {@code yyyy-MM-dd}, and a
     * {@link LocalDateTime} is truncated to its date; other values pass through.
     */
    private static Object toDateValue(Object value) {
        if (value instanceof Integer) {
            return LocalDate.ofEpochDay(((Integer) value).intValue());
        }
        if (value instanceof Long) {
            return LocalDate.ofEpochDay((Long) value);
        }
        if (value instanceof LocalDateTime) {
            return ((LocalDateTime) value).toLocalDate();
        }
        String text = textOf(value);
        return text != null ? LocalDate.parse(text, DATE_FORMATTER) : value;
    }

    /**
     * TIME: an {@code Integer} is a second-of-day, a {@code Long} is epoch millis resolved in
     * the system default zone, textual values use {@code HH:mm:ss}; other values pass through.
     */
    private static Object toTimeValue(Object value) {
        if (value instanceof Integer) {
            return LocalTime.ofSecondOfDay(((Integer) value).intValue());
        }
        if (value instanceof Long) {
            return Instant.ofEpochMilli((Long) value).atZone(ZoneId.systemDefault()).toLocalDateTime().toLocalTime();
        }
        String text = textOf(value);
        return text != null ? LocalTime.parse(text, TIME_FORMATTER) : value;
    }

    /**
     * TIMESTAMP: a {@code Long} is epoch seconds for the second-precision Arrow timestamp types
     * and epoch millis otherwise, both resolved in the system default zone; textual values use
     * {@code yyyy-MM-dd HH:mm:ss}; other values pass through.
     */
    private static Object toTimestampValue(Types.MinorType minorType, Object value) {
        if (value instanceof Long) {
            long epoch = (Long) value;
            boolean secondPrecision =
                    Types.MinorType.TIMESTAMPSEC == minorType || Types.MinorType.TIMESTAMPSECTZ == minorType;
            Instant instant = secondPrecision ? Instant.ofEpochSecond(epoch) : Instant.ofEpochMilli(epoch);
            return instant.atZone(ZoneId.systemDefault()).toLocalDateTime();
        }
        String text = textOf(value);
        return text != null ? LocalDateTime.parse(text, DATETIME_FORMATTER) : value;
    }

    /**
     * Reads one cell from an Arrow vector using the first registered {@link Converter} that
     * supports the vector's minor type, falling back to the {@link DefaultConverter}.
     * MAP, ARRAY and ROW targets additionally pass per-element converters to the converter.
     *
     * @param rowIndex row position inside {@code fieldVector} (local to the current batch)
     * @throws IllegalArgumentException if {@code seaTunnelDataType} is {@code null}
     */
    private Object convertArrowData(int rowIndex, Types.MinorType minorType, FieldVector fieldVector, SeaTunnelDataType<?> seaTunnelDataType) {
        if (seaTunnelDataType == null) {
            throw new IllegalArgumentException("seaTunnelDataType cannot be null");
        }
        for (Converter converter : converters) {
            if (!converter.support(minorType)) {
                continue;
            }
            switch (seaTunnelDataType.getSqlType()) {
                case MAP:
                    return this.convertMap(rowIndex, converter, fieldVector, (MapType) seaTunnelDataType);
                case ARRAY:
                    return this.convertArray(rowIndex, converter, fieldVector, (ArrayType) seaTunnelDataType);
                case ROW:
                    return this.convertRow(rowIndex, converter, fieldVector, (SeaTunnelRowType) seaTunnelDataType);
                default:
                    return converter.convert(rowIndex, fieldVector);
            }
        }
        return this.defaultConverter.convert(rowIndex, fieldVector);
    }

    /** Converts a map cell, supplying element converters under the keys {@code "KEY"} and {@code "VALUE"}. */
    private Object convertMap(int rowIndex, Converter converter, FieldVector fieldVector, MapType mapType) {
        Map<String, Function> fieldConverters = new HashMap<String, Function>();
        fieldConverters.put("KEY", this.genericsConvert(mapType.getKeyType().getSqlType()));
        fieldConverters.put("VALUE", this.genericsConvert(mapType.getValueType().getSqlType()));
        return converter.convert(rowIndex, fieldVector, fieldConverters);
    }

    /** Converts an array cell, supplying the element converter under the key {@code "ARRAY"}. */
    private Object convertArray(int rowIndex, Converter converter, FieldVector fieldVector, ArrayType arrayType) {
        Map<String, Function> fieldConverters = new HashMap<String, Function>();
        fieldConverters.put("ARRAY", this.genericsConvert(arrayType.getElementType().getSqlType()));
        return converter.convert(rowIndex, fieldVector, fieldConverters);
    }

    /** Converts a nested row cell, supplying one element converter per nested field, keyed by field name. */
    private Object convertRow(int rowIndex, Converter converter, FieldVector fieldVector, SeaTunnelRowType rowType) {
        String[] fieldNames = rowType.getFieldNames();
        List<?> fieldTypes = rowType.getChildren();
        Map<String, Function> fieldConverters = new HashMap<String, Function>();
        for (int i = 0; i < fieldTypes.size(); i++) {
            SqlType nestedType = ((SeaTunnelDataType<?>) fieldTypes.get(i)).getSqlType();
            fieldConverters.put(fieldNames[i], this.genericsConvert(nestedType));
        }
        return converter.convert(rowIndex, fieldVector, fieldConverters);
    }

    /**
     * Builds a converter for nested elements of the given SQL type. No Arrow minor type is
     * known for nested values, so {@code null} is passed for it.
     */
    private Function<Object, Object> genericsConvert(SqlType sqlType) {
        return value -> this.convertSeatunnelRowValue(sqlType, null, value);
    }

    /**
     * Releases the Arrow resources held by this reader. Rows that were already decoded remain
     * readable.
     *
     * <p>The allocator is closed last, after the root and the stream reader, so that anything
     * they still hold has been released by then; it is closed in a {@code finally} block so a
     * failure while closing the reader does not leave the allocator open.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the stream reader fails to close
     */
    @Override
    public void close() {
        try {
            if (this.root != null) {
                this.root.close();
            }
            this.arrowStreamReader.close();
        } catch (IOException e) {
            throw new RuntimeException("failed to close arrow stream reader.", e);
        } finally {
            this.rootAllocator.close();
        }
    }
}

