/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.base.source.reader.external;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.data.Envelope;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.util.SchemaNameAdjuster;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import org.apache.flink.table.types.logical.RowType;

public abstract class JdbcSourceFetchTaskContext
implements FetchTask.Context {
    protected final JdbcSourceConfig sourceConfig;
    protected final JdbcDataSourceDialect dataSourceDialect;
    protected CommonConnectorConfig dbzConnectorConfig;
    protected final SchemaNameAdjuster schemaNameAdjuster;

    public JdbcSourceFetchTaskContext(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
        this.sourceConfig = sourceConfig;
        this.dataSourceDialect = dataSourceDialect;
        this.dbzConnectorConfig = sourceConfig.getDbzConnectorConfig();
        this.schemaNameAdjuster = SchemaNameAdjuster.create();
    }

    @Override
    public TableId getTableId(SourceRecord record) {
        return SourceRecordUtils.getTableId(record);
    }

    @Override
    public boolean isDataChangeRecord(SourceRecord record) {
        return SourceRecordUtils.isDataChangeRecord(record);
    }

    @Override
    public boolean isRecordBetween(SourceRecord record, Object[] splitStart, Object[] splitEnd) {
        RowType splitKeyType = this.getSplitType(this.getDatabaseSchema().tableFor(this.getTableId(record)));
        Object[] key = SourceRecordUtils.getSplitKey(splitKeyType, record, this.getSchemaNameAdjuster());
        return SourceRecordUtils.splitKeyRangeContains(key, splitStart, splitEnd);
    }

    @Override
    public void rewriteOutputBuffer(Map<Struct, SourceRecord> outputBuffer, SourceRecord changeRecord) {
        Struct key = (Struct)changeRecord.key();
        Struct value = (Struct)changeRecord.value();
        if (value != null) {
            Envelope.Operation operation = Envelope.Operation.forCode(value.getString("op"));
            switch (operation) {
                case CREATE: 
                case UPDATE: {
                    Envelope envelope = Envelope.fromSchema(changeRecord.valueSchema());
                    Struct source = value.getStruct("source");
                    Struct after = value.getStruct("after");
                    Instant fetchTs = Instant.ofEpochMilli((Long)source.get("ts_ms"));
                    SourceRecord record = new SourceRecord(changeRecord.sourcePartition(), changeRecord.sourceOffset(), changeRecord.topic(), changeRecord.kafkaPartition(), changeRecord.keySchema(), changeRecord.key(), changeRecord.valueSchema(), envelope.read(after, source, fetchTs));
                    outputBuffer.put(key, record);
                    break;
                }
                case DELETE: {
                    outputBuffer.remove(key);
                    break;
                }
                case READ: {
                    throw new IllegalStateException(String.format("Data change record shouldn't use READ operation, the the record is %s.", changeRecord));
                }
            }
        }
    }

    @Override
    public List<SourceRecord> formatMessageTimestamp(Collection<SourceRecord> snapshotRecords) {
        return snapshotRecords.stream().map(record -> {
            Envelope envelope = Envelope.fromSchema(record.valueSchema());
            Struct value = (Struct)record.value();
            Struct updateAfter = value.getStruct("after");
            Struct source = value.getStruct("source");
            source.put("ts_ms", (Object)0L);
            Instant fetchTs = Instant.ofEpochMilli(value.getInt64("ts_ms"));
            SourceRecord sourceRecord = new SourceRecord(record.sourcePartition(), record.sourceOffset(), record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), envelope.read(updateAfter, source, fetchTs));
            return sourceRecord;
        }).collect(Collectors.toList());
    }

    @Override
    public SourceConfig getSourceConfig() {
        return this.sourceConfig;
    }

    @Override
    public JdbcDataSourceDialect getDataSourceDialect() {
        return this.dataSourceDialect;
    }

    public CommonConnectorConfig getDbzConnectorConfig() {
        return this.dbzConnectorConfig;
    }

    public void setDbzConnectorConfig(CommonConnectorConfig dbzConnectorConfig) {
        this.dbzConnectorConfig = dbzConnectorConfig;
    }

    public SchemaNameAdjuster getSchemaNameAdjuster() {
        return SchemaNameAdjuster.create();
    }

    public abstract RelationalDatabaseSchema getDatabaseSchema();

    public abstract RowType getSplitType(Table var1);

    public abstract ErrorHandler getErrorHandler();

    public abstract JdbcSourceEventDispatcher getDispatcher();

    public abstract OffsetContext getOffsetContext();

    public abstract Partition getPartition();
}

