/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.format.compatible.kafka.connect.json;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import lombok.NonNull;
import org.apache.commons.collections4.MapUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.format.json.JsonToRowConverters;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.google.common.base.Preconditions;

public class NativeKafkaConnectDeserializationSchema
implements DeserializationSchema<SeaTunnelRow> {
    private static final String INCLUDE_SCHEMA_METHOD = "convertToJsonWithEnvelope";
    private static final String EXCLUDE_SCHEMA_METHOD = "convertToJsonWithoutEnvelope";
    private static final String KAFKA_CONNECT_SINK_RECORD_PAYLOAD = "payload";
    public static final String FORMAT = "Kafka.Connect";
    private transient JsonConverter keyConverter;
    private transient JsonConverter valueConverter;
    private transient Method keyConverterMethod;
    private transient Method valueConverterMethod;
    private final SeaTunnelRowType seaTunnelRowType;
    private final JsonToRowConverters.JsonToObjectConverter runtimeConverter;
    private final boolean keySchemaEnable;
    private final boolean valueSchemaEnable;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final CatalogTable catalogTable;

    public NativeKafkaConnectDeserializationSchema(@NonNull CatalogTable catalogTable, boolean keySchemaEnable, boolean valueSchemaEnable, boolean failOnMissingField, boolean ignoreParseErrors) {
        if (catalogTable == null) {
            throw new NullPointerException("catalogTable is marked non-null but is null");
        }
        this.catalogTable = catalogTable;
        this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
        this.keySchemaEnable = keySchemaEnable;
        this.valueSchemaEnable = valueSchemaEnable;
        this.runtimeConverter = new JsonToRowConverters(failOnMissingField, ignoreParseErrors).createRowConverter((SeaTunnelRowType)Preconditions.checkNotNull((Object)this.seaTunnelRowType));
    }

    public SeaTunnelRow deserialize(byte[] message) throws IOException {
        throw new UnsupportedOperationException("Please invoke DeserializationSchema#deserialize(byte[], Collector<SeaTunnelRow>) instead.");
    }

    public void deserialize(ConsumerRecord<byte[], byte[]> msg, Collector<SeaTunnelRow> out) {
        this.tryInitConverter();
        if (msg == null) {
            return;
        }
        Map record = this.convertToSinkRecord(msg);
        RowKind rowKind = RowKind.INSERT;
        Optional<TablePath> tablePath = Optional.ofNullable(this.catalogTable).map(CatalogTable::getTablePath);
        SeaTunnelRow row = this.convertJsonNode(record);
        row.setRowKind(rowKind);
        if (tablePath.isPresent()) {
            row.setTableId(tablePath.toString());
        }
        out.collect((Object)row);
    }

    private SeaTunnelRow convertJsonNode(Map<String, Object> record) {
        if (MapUtils.isEmpty(record)) {
            return null;
        }
        try {
            JsonNode jsonData = JsonUtils.toJsonNode(record);
            return (SeaTunnelRow)this.runtimeConverter.convert(jsonData, null);
        }
        catch (Throwable t) {
            throw CommonError.jsonOperationError((String)FORMAT, (String)record.toString(), (Throwable)t);
        }
    }

    private Map convertToSinkRecord(ConsumerRecord<byte[], byte[]> msg) {
        HashMap<String, String> headersMap = new HashMap<String, String>();
        for (Header header : msg.headers()) {
            String key = header.key();
            String value = new String(header.value());
            headersMap.put(key, value);
        }
        HashMap<String, Object> map = new HashMap<String, Object>();
        map.put("partition", msg.partition());
        map.put("offset", msg.offset());
        map.put("key", msg.key());
        map.put("value", msg.value());
        map.put("timestamp", msg.timestamp());
        map.put("timestampType", msg.timestampType().toString());
        map.put("headers", headersMap);
        return map;
    }

    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return this.seaTunnelRowType;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void tryInitConverter() {
        NativeKafkaConnectDeserializationSchema nativeKafkaConnectDeserializationSchema;
        if (this.keyConverter == null) {
            nativeKafkaConnectDeserializationSchema = this;
            synchronized (nativeKafkaConnectDeserializationSchema) {
                if (this.keyConverter == null) {
                    this.keyConverter = new JsonConverter();
                    this.keyConverter.configure(Collections.singletonMap("schemas.enable", this.keySchemaEnable), true);
                    this.keyConverterMethod = (Method)ReflectionUtils.getDeclaredMethod(JsonConverter.class, (String)(this.keySchemaEnable ? INCLUDE_SCHEMA_METHOD : EXCLUDE_SCHEMA_METHOD), (Class[])new Class[]{Schema.class, Object.class}).get();
                }
            }
        }
        if (this.valueConverter == null) {
            nativeKafkaConnectDeserializationSchema = this;
            synchronized (nativeKafkaConnectDeserializationSchema) {
                if (this.valueConverter == null) {
                    this.valueConverter = new JsonConverter();
                    this.valueConverter.configure(Collections.singletonMap("schemas.enable", this.valueSchemaEnable), false);
                    this.valueConverterMethod = (Method)ReflectionUtils.getDeclaredMethod(JsonConverter.class, (String)(this.valueSchemaEnable ? INCLUDE_SCHEMA_METHOD : EXCLUDE_SCHEMA_METHOD), (Class[])new Class[]{Schema.class, Object.class}).get();
                }
            }
        }
    }

    public NativeKafkaConnectDeserializationSchema(SeaTunnelRowType seaTunnelRowType, JsonToRowConverters.JsonToObjectConverter runtimeConverter, boolean keySchemaEnable, boolean valueSchemaEnable, CatalogTable catalogTable) {
        this.seaTunnelRowType = seaTunnelRowType;
        this.runtimeConverter = runtimeConverter;
        this.keySchemaEnable = keySchemaEnable;
        this.valueSchemaEnable = valueSchemaEnable;
        this.catalogTable = catalogTable;
    }
}

