/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.table;

import com.esri.core.geometry.ogc.OGCGeometry;
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Optional;
import org.apache.flink.cdc.connectors.shaded.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.cdc.connectors.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.cdc.connectors.shaded.com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import org.apache.flink.cdc.debezium.table.DeserializationRuntimeConverter;
import org.apache.flink.cdc.debezium.table.DeserializationRuntimeConverterFactory;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;

public class MySqlDeserializationConverterFactory {
    public static DeserializationRuntimeConverterFactory instance() {
        return new DeserializationRuntimeConverterFactory(){
            private static final long serialVersionUID = 1L;

            @Override
            public Optional<DeserializationRuntimeConverter> createUserDefinedConverter(LogicalType logicalType, ZoneId serverTimeZone) {
                switch (logicalType.getTypeRoot()) {
                    case CHAR: 
                    case VARCHAR: {
                        return MySqlDeserializationConverterFactory.createStringConverter();
                    }
                    case ARRAY: {
                        return MySqlDeserializationConverterFactory.createArrayConverter((ArrayType)logicalType);
                    }
                }
                return Optional.empty();
            }
        };
    }

    private static Optional<DeserializationRuntimeConverter> createStringConverter() {
        final ObjectMapper objectMapper = new ObjectMapper();
        final ObjectWriter objectWriter = objectMapper.writer();
        return Optional.of(new DeserializationRuntimeConverter(){
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) throws Exception {
                if ("io.debezium.data.geometry.Point".equals(schema.name()) || "io.debezium.data.geometry.Geometry".equals(schema.name())) {
                    try {
                        Struct geometryStruct = (Struct)dbzObj;
                        byte[] wkb = geometryStruct.getBytes("wkb");
                        String geoJson = OGCGeometry.fromBinary(ByteBuffer.wrap(wkb)).asGeoJson();
                        JsonNode originGeoNode = objectMapper.readTree(geoJson);
                        Optional<Integer> srid = Optional.ofNullable(geometryStruct.getInt32("srid"));
                        HashMap<String, Object> geometryInfo = new HashMap<String, Object>();
                        String geometryType = originGeoNode.get("type").asText();
                        geometryInfo.put("type", geometryType);
                        if (geometryType.equals("GeometryCollection")) {
                            geometryInfo.put("geometries", originGeoNode.get("geometries"));
                        } else {
                            geometryInfo.put("coordinates", originGeoNode.get("coordinates"));
                        }
                        geometryInfo.put("srid", srid.orElse(0));
                        return StringData.fromString((String)objectWriter.writeValueAsString(geometryInfo));
                    }
                    catch (Exception e) {
                        throw new IllegalArgumentException(String.format("Failed to convert %s to geometry JSON.", dbzObj), e);
                    }
                }
                return StringData.fromString((String)dbzObj.toString());
            }
        });
    }

    private static Optional<DeserializationRuntimeConverter> createArrayConverter(ArrayType arrayType) {
        if (MySqlDeserializationConverterFactory.hasFamily(arrayType.getElementType(), LogicalTypeFamily.CHARACTER_STRING)) {
            return Optional.of(new DeserializationRuntimeConverter(){
                private static final long serialVersionUID = 1L;

                @Override
                public Object convert(Object dbzObj, Schema schema) throws Exception {
                    if ("io.debezium.data.EnumSet".equals(schema.name()) && dbzObj instanceof String) {
                        String[] enums = ((String)dbzObj).split(",");
                        Object[] elements = new StringData[enums.length];
                        for (int i = 0; i < enums.length; ++i) {
                            elements[i] = StringData.fromString((String)enums[i]);
                        }
                        return new GenericArrayData(elements);
                    }
                    throw new IllegalArgumentException(String.format("Unable convert to Flink ARRAY type from unexpected value '%s', only SET type could be converted to ARRAY type for MySQL", dbzObj));
                }
            });
        }
        return Optional.empty();
    }

    private static boolean hasFamily(LogicalType logicalType, LogicalTypeFamily family) {
        return logicalType.getTypeRoot().getFamilies().contains(family);
    }
}

