package com.aliyun.datahub.client.impl.batch.arrow;

import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.impl.batch.BatchConstants;
import com.aliyun.datahub.client.impl.batch.BatchSerializer;
import com.aliyun.datahub.client.impl.batch.BatchType;
import com.aliyun.datahub.client.impl.batch.BatchUtils;
import com.aliyun.datahub.client.impl.batch.arrow.ArrowObjectCache;
import com.aliyun.datahub.client.impl.batch.header.BatchHeader;
import com.aliyun.datahub.client.impl.batch.header.BatchHeaderV1;
import com.aliyun.datahub.client.model.BlobRecordData;
import com.aliyun.datahub.client.model.FieldType;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.TupleRecordData;
import com.aliyun.datahub.client.model.protobuf.DatahubProtos;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.complex.impl.UnionMapWriter;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/datahub/client/impl/batch/arrow/ArrowSerializer.class */
public class ArrowSerializer extends BatchSerializer {
    private static final Logger LOGGER = LoggerFactory.getLogger(ArrowSerializer.class);
    private ArrowBuf attrBuffer;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.aliyun.datahub.client.impl.batch.arrow.ArrowSerializer$1, reason: invalid class name */
    /* loaded from: input_file:com/aliyun/datahub/client/impl/batch/arrow/ArrowSerializer$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$aliyun$datahub$client$model$FieldType = new int[FieldType.values().length];

        static {
            try {
                $SwitchMap$com$aliyun$datahub$client$model$FieldType[FieldType.BOOLEAN.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$aliyun$datahub$client$model$FieldType[FieldType.TINYINT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$aliyun$datahub$client$model$FieldType[FieldType.SMALLINT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$aliyun$datahub$client$model$FieldType[FieldType.INTEGER.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$aliyun$datahub$client$model$FieldType[FieldType.BIGINT.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$com$aliyun$datahub$client$model$FieldType[FieldType.TIMESTAMP.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$com$aliyun$datahub$client$model$FieldType[FieldType.FLOAT.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$com$aliyun$datahub$client$model$FieldType[FieldType.DOUBLE.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$com$aliyun$datahub$client$model$FieldType[FieldType.STRING.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$com$aliyun$datahub$client$model$FieldType[FieldType.JSON.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$com$aliyun$datahub$client$model$FieldType[FieldType.DECIMAL.ordinal()] = 11;
            } catch (NoSuchFieldError e11) {
            }
        }
    }

    @Override // com.aliyun.datahub.client.impl.batch.BatchSerializer
    public byte[] serializeRecord(List<RecordEntry> list) {
        try {
            ArrowObjectCache.ArrowCacheKey arrowCacheKey = new ArrowObjectCache.ArrowCacheKey(getProjectName(), getTopicName(), getRecordSchema(list.get(0)));
            VectorSchemaRoot borrowArrowObject = ArrowObjectCache.borrowArrowObject(arrowCacheKey);
            convertRecord(list, borrowArrowObject);
            byte[] serializeByChannel = serializeByChannel(borrowArrowObject);
            this.attrBuffer.close();
            ArrowObjectCache.returnArrowObject(arrowCacheKey, borrowArrowObject);
            return serializeByChannel;
        } catch (Exception e) {
            LOGGER.error("Serialize arrow record failed", e);
            throw new DatahubClientException(e.getMessage());
        }
    }

    @Override // com.aliyun.datahub.client.impl.batch.BatchSerializer
    protected BatchHeader getHeader() {
        BatchHeaderV1 batchHeaderV1 = new BatchHeaderV1();
        batchHeaderV1.setDataType(BatchType.ARROW);
        return batchHeaderV1;
    }

    public void convertRecord(List<RecordEntry> list, VectorSchemaRoot vectorSchemaRoot) throws Exception {
        if (list.get(0).getRecordData() instanceof TupleRecordData) {
            convertTupleData(list, vectorSchemaRoot);
        } else {
            convertBlobData(list, vectorSchemaRoot);
        }
        convertAttribute(list, vectorSchemaRoot);
        vectorSchemaRoot.setRowCount(list.size());
    }

    private void convertTupleData(List<RecordEntry> list, VectorSchemaRoot vectorSchemaRoot) {
        for (int i = 0; i < list.size(); i++) {
            setColumnValue((TupleRecordData) list.get(i).getRecordData(), vectorSchemaRoot, i);
        }
    }

    private void setColumnValue(TupleRecordData tupleRecordData, VectorSchemaRoot vectorSchemaRoot, int i) {
        for (int i2 = 0; i2 < tupleRecordData.getRecordSchema().getFields().size(); i2++) {
            Object field = tupleRecordData.getField(i2);
            if (field == null) {
                vectorSchemaRoot.getVector(i2).setNull(i);
            } else {
                FieldType type = tupleRecordData.getRecordSchema().getField(i2).getType();
                switch (AnonymousClass1.$SwitchMap$com$aliyun$datahub$client$model$FieldType[type.ordinal()]) {
                    case 1:
                        vectorSchemaRoot.getVector(i2).setSafe(i, ((Boolean) field).booleanValue() ? 1 : 0);
                        break;
                    case 2:
                        vectorSchemaRoot.getVector(i2).setSafe(i, BatchUtils.getByteValue(field));
                        break;
                    case 3:
                        vectorSchemaRoot.getVector(i2).setSafe(i, BatchUtils.getShortValue(field));
                        break;
                    case 4:
                        vectorSchemaRoot.getVector(i2).setSafe(i, BatchUtils.getIntValue(field));
                        break;
                    case 5:
                    case 6:
                        vectorSchemaRoot.getVector(i2).setSafe(i, BatchUtils.getLongValue(field));
                        break;
                    case DatahubProtos.RecordEntry.SYSTEM_TIME_FIELD_NUMBER /* 7 */:
                        vectorSchemaRoot.getVector(i2).setSafe(i, BatchUtils.getFloatValue(field));
                        break;
                    case DatahubProtos.RecordEntry.ATTRIBUTES_FIELD_NUMBER /* 8 */:
                        vectorSchemaRoot.getVector(i2).setSafe(i, BatchUtils.getDoubleValue(field));
                        break;
                    case DatahubProtos.RecordEntry.DATA_FIELD_NUMBER /* 9 */:
                    case 10:
                        vectorSchemaRoot.getVector(i2).setSafe(i, field.toString().getBytes(StandardCharsets.UTF_8));
                        break;
                    case 11:
                        vectorSchemaRoot.getVector(i2).setSafe(i, ((BigDecimal) field).toPlainString().getBytes(StandardCharsets.UTF_8));
                        break;
                    default:
                        throw new InvalidParameterException("Unknown value type: " + type);
                }
            }
        }
    }

    private void convertBlobData(List<RecordEntry> list, VectorSchemaRoot vectorSchemaRoot) {
        VarBinaryVector vector = vectorSchemaRoot.getVector(BatchConstants.BLOB_COLUMN_NAME);
        for (int i = 0; i < list.size(); i++) {
            vector.setSafe(i, ((BlobRecordData) list.get(i).getRecordData()).getData());
        }
    }

    private ArrowBuf preAllocateAttributeBuffer(List<RecordEntry> list) {
        long j = 0;
        Iterator<RecordEntry> it = list.iterator();
        while (it.hasNext()) {
            Map<String, String> attributes = it.next().getAttributes();
            if (attributes != null && !attributes.isEmpty()) {
                for (Map.Entry<String, String> entry : attributes.entrySet()) {
                    j += entry.getKey().length() + entry.getValue().length();
                }
            }
        }
        return ArrowUtils.getBufferAllocator().buffer(j);
    }

    private void convertAttribute(List<RecordEntry> list, VectorSchemaRoot vectorSchemaRoot) {
        this.attrBuffer = preAllocateAttributeBuffer(list);
        UnionMapWriter writer = vectorSchemaRoot.getVector(BatchConstants.ATTRIBUTE_COLUMN_NAME).getWriter();
        for (RecordEntry recordEntry : list) {
            writer.startMap();
            setAttribute(recordEntry.getAttributes(), writer, this.attrBuffer);
            writer.endMap();
        }
    }

    private void setAttribute(Map<String, String> map, UnionMapWriter unionMapWriter, ArrowBuf arrowBuf) {
        if (map == null || map.isEmpty()) {
            return;
        }
        for (Map.Entry<String, String> entry : map.entrySet()) {
            unionMapWriter.startEntry();
            byte[] bytes = entry.getKey().getBytes(StandardCharsets.UTF_8);
            int writerIndex = (int) arrowBuf.writerIndex();
            arrowBuf.writeBytes(bytes);
            unionMapWriter.key().varChar().writeVarChar(writerIndex, writerIndex + bytes.length, arrowBuf);
            byte[] bytes2 = entry.getValue().getBytes(StandardCharsets.UTF_8);
            int writerIndex2 = (int) arrowBuf.writerIndex();
            arrowBuf.writeBytes(bytes2);
            unionMapWriter.value().varChar().writeVarChar(writerIndex2, writerIndex2 + bytes2.length, arrowBuf);
            unionMapWriter.endEntry();
        }
    }

    private byte[] serializeByChannel(VectorSchemaRoot vectorSchemaRoot) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(1048576);
        WritableByteChannel newChannel = Channels.newChannel(byteArrayOutputStream);
        ArrowRecordBatch recordBatch = new VectorUnloader(vectorSchemaRoot).getRecordBatch();
        Throwable th = null;
        try {
            WriteChannel writeChannel = new WriteChannel(newChannel);
            Throwable th2 = null;
            try {
                MessageSerializer.serialize(writeChannel, recordBatch);
                byte[] byteArray = byteArrayOutputStream.toByteArray();
                if (writeChannel != null) {
                    if (0 != 0) {
                        try {
                            writeChannel.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        writeChannel.close();
                    }
                }
                return byteArray;
            } catch (Throwable th4) {
                if (writeChannel != null) {
                    if (0 != 0) {
                        try {
                            writeChannel.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        writeChannel.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (recordBatch != null) {
                if (0 != 0) {
                    try {
                        recordBatch.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    recordBatch.close();
                }
            }
        }
    }
}
