/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.base.source.reader;

import io.debezium.document.Array;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.TableChanges;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IncrementalSourceRecordEmitter<T>
implements RecordEmitter<SourceRecords, T, SourceSplitState> {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalSourceRecordEmitter.class);
    private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER = new FlinkJsonTableChangeSerializer();
    protected final DebeziumDeserializationSchema<T> debeziumDeserializationSchema;
    protected final SourceReaderMetrics sourceReaderMetrics;
    protected final boolean includeSchemaChanges;
    protected final OutputCollector<T> outputCollector;
    protected final OffsetFactory offsetFactory;

    public IncrementalSourceRecordEmitter(DebeziumDeserializationSchema<T> debeziumDeserializationSchema, SourceReaderMetrics sourceReaderMetrics, boolean includeSchemaChanges, OffsetFactory offsetFactory) {
        this.debeziumDeserializationSchema = debeziumDeserializationSchema;
        this.sourceReaderMetrics = sourceReaderMetrics;
        this.includeSchemaChanges = includeSchemaChanges;
        this.outputCollector = new OutputCollector();
        this.offsetFactory = offsetFactory;
    }

    public void emitRecord(SourceRecords sourceRecords, SourceOutput<T> output, SourceSplitState splitState) throws Exception {
        Iterator<SourceRecord> elementIterator = sourceRecords.iterator();
        while (elementIterator.hasNext()) {
            this.processElement(elementIterator.next(), output, splitState);
        }
    }

    protected void processElement(SourceRecord element, SourceOutput<T> output, SourceSplitState splitState) throws Exception {
        if (WatermarkEvent.isWatermarkEvent(element)) {
            LOG.trace("Process WatermarkEvent: {}; splitState = {}", (Object)element, (Object)splitState);
            Offset watermark = this.getWatermark(element);
            if (WatermarkEvent.isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
                LOG.trace("Set HighWatermark {} for {}", (Object)watermark, (Object)splitState);
                splitState.asSnapshotSplitState().setHighWatermark(watermark);
            }
        } else if (SourceRecordUtils.isSchemaChangeEvent(element) && splitState.isStreamSplitState()) {
            LOG.trace("Process SchemaChangeEvent: {}; splitState = {}", (Object)element, (Object)splitState);
            HistoryRecord historyRecord = SourceRecordUtils.getHistoryRecord(element);
            Array tableChanges = historyRecord.document().getArray("tableChanges");
            TableChanges changes = TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
            for (TableChanges.TableChange tableChange : changes) {
                splitState.asStreamSplitState().recordSchema(tableChange.getId(), tableChange);
            }
            if (this.includeSchemaChanges) {
                this.emitElement(element, output);
            }
        } else if (SourceRecordUtils.isDataChangeRecord(element)) {
            LOG.trace("Process DataChangeRecord: {}; splitState = {}", (Object)element, (Object)splitState);
            this.updateStreamSplitState(splitState, element);
            this.reportMetrics(element);
            this.emitElement(element, output);
        } else if (SourceRecordUtils.isHeartbeatEvent(element)) {
            LOG.trace("Process Heartbeat: {}; splitState = {}", (Object)element, (Object)splitState);
            this.updateStreamSplitState(splitState, element);
        } else {
            LOG.info("Meet unknown element {} for splitState = {}, just skip.", (Object)element, (Object)splitState);
            this.sourceReaderMetrics.addNumRecordsInErrors(1L);
        }
    }

    private void updateStreamSplitState(SourceSplitState splitState, SourceRecord element) {
        if (splitState.isStreamSplitState()) {
            Offset position = this.getOffsetPosition(element);
            splitState.asStreamSplitState().setStartingOffset(position);
        }
    }

    private Offset getWatermark(SourceRecord watermarkEvent) {
        return this.getOffsetPosition(watermarkEvent.sourceOffset());
    }

    public Offset getOffsetPosition(SourceRecord dataRecord) {
        return this.getOffsetPosition(dataRecord.sourceOffset());
    }

    public Offset getOffsetPosition(Map<String, ?> offset) {
        HashMap<String, String> offsetStrMap = new HashMap<String, String>();
        for (Map.Entry<String, ?> entry : offset.entrySet()) {
            offsetStrMap.put(entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString());
        }
        return this.offsetFactory.newOffset(offsetStrMap);
    }

    protected void emitElement(SourceRecord element, SourceOutput<T> output) throws Exception {
        this.sourceReaderMetrics.markRecord();
        this.sourceReaderMetrics.updateRecordCounters(element);
        this.outputCollector.output = output;
        this.outputCollector.currentMessageTimestamp = SourceRecordUtils.getMessageTimestamp(element);
        this.debeziumDeserializationSchema.deserialize(element, this.outputCollector);
    }

    protected void reportMetrics(SourceRecord element) {
        Long fetchTimestamp;
        Long messageTimestamp = SourceRecordUtils.getMessageTimestamp(element);
        if (messageTimestamp != null && messageTimestamp > 0L && (fetchTimestamp = SourceRecordUtils.getFetchTimestamp(element)) != null) {
            this.sourceReaderMetrics.recordFetchDelay(fetchTimestamp - messageTimestamp);
        }
    }

    protected static class OutputCollector<T>
    implements Collector<T> {
        public SourceOutput<T> output;
        public Long currentMessageTimestamp;

        protected OutputCollector() {
        }

        public void collect(T record) {
            if (this.currentMessageTimestamp != null && this.currentMessageTimestamp > 0L) {
                this.output.collect(record, this.currentMessageTimestamp.longValue());
            } else {
                this.output.collect(record);
            }
        }

        public void close() {
        }
    }
}

