/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter;
import org.apache.seatunnel.connectors.cdc.base.source.split.state.IncrementalSplitState;
import org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils;
import org.bson.BsonDocument;

public final class MongoDBRecordEmitter<T>
extends IncrementalSourceRecordEmitter<T> {
    public MongoDBRecordEmitter(DebeziumDeserializationSchema<T> deserializationSchema, OffsetFactory offsetFactory, SourceReader.Context context) {
        super(deserializationSchema, offsetFactory, context);
    }

    protected void processElement(SourceRecord element, Collector<T> output, SourceSplitStateBase splitState) throws Exception {
        if (WatermarkEvent.isWatermarkEvent((SourceRecord)element)) {
            Offset watermark = this.getOffsetPosition(element);
            if (WatermarkEvent.isLowWatermarkEvent((SourceRecord)element) && splitState.isSnapshotSplitState()) {
                splitState.asSnapshotSplitState().setLowWatermark(watermark);
            } else if (WatermarkEvent.isHighWatermarkEvent((SourceRecord)element) && splitState.isSnapshotSplitState()) {
                splitState.asSnapshotSplitState().setHighWatermark(watermark);
            } else if ((WatermarkEvent.isSchemaChangeBeforeWatermarkEvent((SourceRecord)element) || WatermarkEvent.isSchemaChangeAfterWatermarkEvent((SourceRecord)element)) && splitState.isIncrementalSplitState()) {
                this.emitElement(element, output);
            }
        } else if (MongodbRecordUtils.isDataChangeRecord(element) || MongodbRecordUtils.isHeartbeatEvent(element)) {
            if (splitState.isIncrementalSplitState()) {
                this.updatePositionForStreamSplit(element, splitState);
            }
            this.emitElement(element, output);
        } else {
            this.emitElement(element, output);
        }
    }

    private void updatePositionForStreamSplit(SourceRecord element, SourceSplitStateBase splitState) {
        BsonDocument resumeToken = MongodbRecordUtils.getResumeToken(element);
        IncrementalSplitState streamSplitState = splitState.asIncrementalSplitState();
        ChangeStreamOffset offset = (ChangeStreamOffset)streamSplitState.getStartupOffset();
        if (offset != null) {
            offset.updatePosition(resumeToken);
        }
        splitState.asIncrementalSplitState().setStartupOffset((Offset)offset);
    }
}

