/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.cdc.debezium.row;

import io.debezium.data.Envelope;
import io.debezium.relational.TableId;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.api.event.EventType;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventDispatcher;
import org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventHandler;
import org.apache.seatunnel.api.table.type.MetadataUtil;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import org.apache.seatunnel.connectors.cdc.debezium.AbstractDebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverterFactory;
import org.apache.seatunnel.connectors.cdc.debezium.MetadataConverter;
import org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters;
import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class SeaTunnelRowDebeziumDeserializeSchema
extends AbstractDebeziumDeserializationSchema<SeaTunnelRow> {
    private static final Logger log = LoggerFactory.getLogger(SeaTunnelRowDebeziumDeserializeSchema.class);
    private static final long serialVersionUID = 1L;
    private static final String DEFAULT_TABLE_NAME_KEY = null;
    private final MetadataConverter[] metadataConverters;
    private final ZoneId serverTimeZone;
    private final DebeziumDeserializationConverterFactory userDefinedConverterFactory;
    private final SchemaChangeResolver schemaChangeResolver;
    private final TableSchemaChangeEventHandler tableSchemaChangeHandler;
    private List<CatalogTable> tables;
    private Map<String, SeaTunnelRowDebeziumDeserializationConverters> tableRowConverters;

    SeaTunnelRowDebeziumDeserializeSchema(MetadataConverter[] metadataConverters, List<CatalogTable> tables, ZoneId serverTimeZone, DebeziumDeserializationConverterFactory userDefinedConverterFactory, SchemaChangeResolver schemaChangeResolver, Map<TableId, Struct> tableIdTableChangeMap) {
        super(tableIdTableChangeMap);
        this.metadataConverters = metadataConverters;
        this.serverTimeZone = serverTimeZone;
        this.userDefinedConverterFactory = userDefinedConverterFactory;
        this.tables = (List)Preconditions.checkNotNull(tables);
        this.schemaChangeResolver = schemaChangeResolver;
        this.tableSchemaChangeHandler = new TableSchemaChangeEventDispatcher();
        this.tableRowConverters = SeaTunnelRowDebeziumDeserializeSchema.createTableRowConverters(tables, metadataConverters, serverTimeZone, userDefinedConverterFactory);
    }

    @Override
    public void deserialize(SourceRecord record, Collector<SeaTunnelRow> collector) throws Exception {
        super.deserialize(record, collector);
        if (WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(record)) {
            collector.markSchemaChangeBeforeCheckpoint();
            return;
        }
        if (WatermarkEvent.isSchemaChangeAfterWatermarkEvent(record)) {
            collector.markSchemaChangeAfterCheckpoint();
            return;
        }
        if (SourceRecordUtils.isSchemaChangeEvent(record)) {
            this.deserializeSchemaChangeRecord(record, collector);
            return;
        }
        if (SourceRecordUtils.isDataChangeRecord(record)) {
            this.deserializeDataChangeRecord(record, collector);
            return;
        }
        log.debug("Unsupported record {}, just skip.", (Object)record);
    }

    private void deserializeSchemaChangeRecord(SourceRecord record, Collector<SeaTunnelRow> collector) {
        SchemaChangeEvent schemaChangeEvent = null;
        try {
            if (this.schemaChangeResolver != null) {
                schemaChangeEvent = this.schemaChangeResolver.resolve(record, this.tables);
            }
        }
        catch (Exception e) {
            log.warn("Failed to resolve schemaChangeEvent, just skip.", (Throwable)e);
            return;
        }
        if (schemaChangeEvent == null) {
            log.warn("Unsupported resolve schemaChangeEvent {}, just skip.", (Object)record);
            return;
        }
        boolean tableExist = false;
        for (int i = 0; i < this.tables.size(); ++i) {
            CatalogTable changeBefore = this.tables.get(i);
            if (!schemaChangeEvent.tablePath().equals((Object)changeBefore.getTablePath())) continue;
            tableExist = true;
            log.debug("Table[{}] change before: {}", (Object)schemaChangeEvent.tablePath(), (Object)changeBefore.getTableSchema());
            CatalogTable changeAfter = null;
            if (EventType.SCHEMA_CHANGE_UPDATE_COLUMNS.equals((Object)schemaChangeEvent.getEventType())) {
                AlterTableColumnsEvent alterTableColumnsEvent = (AlterTableColumnsEvent)schemaChangeEvent;
                for (AlterTableColumnEvent event : alterTableColumnsEvent.getEvents()) {
                    TableSchema changeAfterSchema = this.tableSchemaChangeHandler.reset(changeBefore.getTableSchema()).apply((SchemaChangeEvent)event);
                    changeAfter = CatalogTable.of((TableIdentifier)changeBefore.getTableId(), (TableSchema)changeAfterSchema, (Map)changeBefore.getOptions(), (List)changeBefore.getPartitionKeys(), (String)changeBefore.getComment());
                    event.setChangeAfter(changeAfter);
                    changeBefore = changeAfter;
                }
            } else {
                TableSchema changeAfterSchema = this.tableSchemaChangeHandler.reset(changeBefore.getTableSchema()).apply(schemaChangeEvent);
                changeAfter = CatalogTable.of((TableIdentifier)changeBefore.getTableId(), (TableSchema)changeAfterSchema, (Map)changeBefore.getOptions(), (List)changeBefore.getPartitionKeys(), (String)changeBefore.getComment());
            }
            this.tables.set(i, changeAfter);
            schemaChangeEvent.setChangeAfter(changeAfter);
            log.debug("Table[{}] change after: {}", (Object)schemaChangeEvent.tablePath(), (Object)changeAfter.getTableSchema());
            break;
        }
        if (!tableExist) {
            log.error("Not found table {}, skip schema change event {}", (Object)schemaChangeEvent.tablePath());
        }
        this.tableRowConverters = SeaTunnelRowDebeziumDeserializeSchema.createTableRowConverters(this.tables, this.metadataConverters, this.serverTimeZone, this.userDefinedConverterFactory);
        collector.collect(schemaChangeEvent);
    }

    private void deserializeDataChangeRecord(SourceRecord record, Collector<SeaTunnelRow> collector) throws Exception {
        SeaTunnelRowDebeziumDeserializationConverters converters;
        Envelope.Operation operation = Envelope.operationFor((SourceRecord)record);
        Struct messageStruct = (Struct)record.value();
        Schema valueSchema = record.valueSchema();
        TablePath tablePath = SourceRecordUtils.getTablePath(record);
        String tableId = tablePath.toString();
        if (this.tables.size() > 1) {
            converters = this.tableRowConverters.get(tableId);
            if (converters == null) {
                log.debug("Ignore newly added table {}", (Object)tableId);
                return;
            }
        } else {
            converters = this.tableRowConverters.get(DEFAULT_TABLE_NAME_KEY);
        }
        Long fetchTimestamp = SourceRecordUtils.getFetchTimestamp(record);
        Long messageTimestamp = SourceRecordUtils.getMessageTimestamp(record);
        long delay = -1L;
        if (fetchTimestamp != null && messageTimestamp != null) {
            delay = fetchTimestamp - messageTimestamp;
        }
        if (operation == Envelope.Operation.CREATE || operation == Envelope.Operation.READ) {
            SeaTunnelRow insert = this.extractAfterRow(converters, record, messageStruct, valueSchema);
            insert.setRowKind(RowKind.INSERT);
            insert.setTableId(tableId);
            MetadataUtil.setDelay((SeaTunnelRow)insert, (Long)delay);
            MetadataUtil.setEventTime((SeaTunnelRow)insert, (Long)fetchTimestamp);
            collector.collect((Object)insert);
        } else if (operation == Envelope.Operation.DELETE) {
            SeaTunnelRow delete = this.extractBeforeRow(converters, record, messageStruct, valueSchema);
            delete.setRowKind(RowKind.DELETE);
            delete.setTableId(tableId);
            MetadataUtil.setDelay((SeaTunnelRow)delete, (Long)delay);
            MetadataUtil.setEventTime((SeaTunnelRow)delete, (Long)fetchTimestamp);
            collector.collect((Object)delete);
        } else if (operation == Envelope.Operation.UPDATE) {
            SeaTunnelRow before = this.extractBeforeRow(converters, record, messageStruct, valueSchema);
            before.setRowKind(RowKind.UPDATE_BEFORE);
            before.setTableId(tableId);
            MetadataUtil.setDelay((SeaTunnelRow)before, (Long)delay);
            MetadataUtil.setEventTime((SeaTunnelRow)before, (Long)fetchTimestamp);
            collector.collect((Object)before);
            SeaTunnelRow after = this.extractAfterRow(converters, record, messageStruct, valueSchema);
            after.setRowKind(RowKind.UPDATE_AFTER);
            after.setTableId(tableId);
            MetadataUtil.setDelay((SeaTunnelRow)after, (Long)delay);
            MetadataUtil.setEventTime((SeaTunnelRow)after, (Long)fetchTimestamp);
            collector.collect((Object)after);
        } else {
            log.warn("Received {} operation, skip", (Object)operation);
        }
    }

    private SeaTunnelRow extractAfterRow(SeaTunnelRowDebeziumDeserializationConverters runtimeConverter, SourceRecord record, Struct value, Schema valueSchema) throws Exception {
        Schema afterSchema = valueSchema.field("after").schema();
        Struct after = value.getStruct("after");
        return runtimeConverter.convert(record, after, afterSchema);
    }

    private SeaTunnelRow extractBeforeRow(SeaTunnelRowDebeziumDeserializationConverters runtimeConverter, SourceRecord record, Struct value, Schema valueSchema) throws Exception {
        Schema beforeSchema = valueSchema.field("before").schema();
        Struct before = value.getStruct("before");
        return runtimeConverter.convert(record, before, beforeSchema);
    }

    @Override
    public List<CatalogTable> getProducedType() {
        return this.tables;
    }

    @Override
    public SchemaChangeResolver getSchemaChangeResolver() {
        return this.schemaChangeResolver;
    }

    @Override
    public void restoreCheckpointProducedType(List<CatalogTable> checkpointDataType) {
        if (this.schemaChangeResolver == null) {
            return;
        }
        Map<TablePath, CatalogTable> latestTableMap = this.tables.stream().collect(Collectors.toMap(CatalogTable::getTablePath, t -> t));
        Map<TablePath, CatalogTable> restoreTableMap = checkpointDataType.stream().collect(Collectors.toMap(CatalogTable::getTablePath, t -> t));
        for (TablePath tablePath : restoreTableMap.keySet()) {
            CatalogTable latestTable = latestTableMap.get(tablePath);
            CatalogTable restoreTable = restoreTableMap.get(tablePath);
            if (latestTable == null) {
                log.info("Ignore restore table[{}] has been deleted.", (Object)tablePath);
                continue;
            }
            log.info("Table[{}] restore before: {}", (Object)tablePath, (Object)latestTable.getSeaTunnelRowType());
            latestTableMap.put(tablePath, restoreTable);
            log.info("Table[{}] restore after: {}", (Object)tablePath, (Object)restoreTable.getSeaTunnelRowType());
        }
        this.tables = new ArrayList<CatalogTable>(latestTableMap.values());
        this.tableRowConverters = SeaTunnelRowDebeziumDeserializeSchema.createTableRowConverters(this.tables, this.metadataConverters, this.serverTimeZone, this.userDefinedConverterFactory);
    }

    private static Map<String, SeaTunnelRowDebeziumDeserializationConverters> createTableRowConverters(List<CatalogTable> tables, MetadataConverter[] metadataConverters, ZoneId serverTimeZone, DebeziumDeserializationConverterFactory userDefinedConverterFactory) {
        HashMap<String, SeaTunnelRowDebeziumDeserializationConverters> tableRowConverters = new HashMap<String, SeaTunnelRowDebeziumDeserializationConverters>();
        if (tables.size() > 1) {
            for (CatalogTable table : tables) {
                SeaTunnelRowDebeziumDeserializationConverters itemRowConverter = new SeaTunnelRowDebeziumDeserializationConverters(table.getSeaTunnelRowType(), metadataConverters, serverTimeZone, userDefinedConverterFactory);
                tableRowConverters.put(table.getTablePath().toString(), itemRowConverter);
            }
            return tableRowConverters;
        }
        SeaTunnelRowDebeziumDeserializationConverters tableRowConverter = new SeaTunnelRowDebeziumDeserializationConverters(tables.get(0).getSeaTunnelRowType(), metadataConverters, serverTimeZone, userDefinedConverterFactory);
        tableRowConverters.put(DEFAULT_TABLE_NAME_KEY, tableRowConverter);
        return tableRowConverters;
    }

    public static Builder builder() {
        return new Builder();
    }

    public static class Builder {
        private List<CatalogTable> tables;
        private MetadataConverter[] metadataConverters = new MetadataConverter[0];
        private ZoneId serverTimeZone = ZoneId.systemDefault();
        private DebeziumDeserializationConverterFactory userDefinedConverterFactory = DebeziumDeserializationConverterFactory.DEFAULT;
        private Map<TableId, Struct> tableIdTableChangeMap = new HashMap<TableId, Struct>();
        private SchemaChangeResolver schemaChangeResolver;

        public SeaTunnelRowDebeziumDeserializeSchema build() {
            return new SeaTunnelRowDebeziumDeserializeSchema(this.metadataConverters, this.tables, this.serverTimeZone, this.userDefinedConverterFactory, this.schemaChangeResolver, this.tableIdTableChangeMap);
        }

        public Builder setTables(List<CatalogTable> tables) {
            this.tables = tables;
            return this;
        }

        public Builder setMetadataConverters(MetadataConverter[] metadataConverters) {
            this.metadataConverters = metadataConverters;
            return this;
        }

        public Builder setServerTimeZone(ZoneId serverTimeZone) {
            this.serverTimeZone = serverTimeZone;
            return this;
        }

        public Builder setUserDefinedConverterFactory(DebeziumDeserializationConverterFactory userDefinedConverterFactory) {
            this.userDefinedConverterFactory = userDefinedConverterFactory;
            return this;
        }

        public Builder setTableIdTableChangeMap(Map<TableId, Struct> tableIdTableChangeMap) {
            this.tableIdTableChangeMap = tableIdTableChangeMap;
            return this;
        }

        public Builder setSchemaChangeResolver(SchemaChangeResolver schemaChangeResolver) {
            this.schemaChangeResolver = schemaChangeResolver;
            return this;
        }

        private Builder() {
        }
    }
}

