/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mongodb.source.reader.fetch;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.changestream.OperationType;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.relational.TableId;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
import org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBFetchTaskContext;
import org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask;
import org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils;
import org.apache.flink.cdc.connectors.mongodb.source.utils.MongoUtils;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.BsonString;
import org.bson.BsonValue;
import org.bson.RawBsonDocument;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoDBScanFetchTask
extends AbstractScanFetchTask {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBScanFetchTask.class);

    public MongoDBScanFetchTask(SnapshotSplit snapshotSplit) {
        super(snapshotSplit);
    }

    protected void executeDataSnapshot(FetchTask.Context context) throws Exception {
        ChangeEventQueue changeEventQueue = context.getQueue();
        MongoDBFetchTaskContext taskContext = (MongoDBFetchTaskContext)context;
        MongoDBSourceConfig sourceConfig = taskContext.getSourceConfig();
        TableId collectionId = this.snapshotSplit.getTableId();
        try (MongoCursor cursor = null;){
            MongoClient mongoClient = MongoUtils.clientFor(sourceConfig);
            MongoCollection<RawBsonDocument> collection = MongoUtils.collectionFor(mongoClient, collectionId, RawBsonDocument.class);
            cursor = collection.find().min((Bson)((BsonDocument)this.snapshotSplit.getSplitStart()[1])).max((Bson)((BsonDocument)this.snapshotSplit.getSplitEnd()[1])).hint((Bson)((BsonDocument)this.snapshotSplit.getSplitStart()[0])).batchSize(sourceConfig.getBatchSize()).noCursorTimeout(sourceConfig.disableCursorTimeout()).cursor();
            while (cursor.hasNext()) {
                if (!this.taskRunning) {
                    throw new InterruptedException("Interrupted while snapshotting collection " + collectionId.identifier());
                }
                BsonDocument valueDocument = this.normalizeSnapshotDocument(collectionId, (BsonDocument)cursor.next());
                BsonDocument keyDocument = new BsonDocument("_id", valueDocument.get((Object)"_id"));
                SourceRecord snapshotRecord = MongoRecordUtils.createSourceRecord(MongoRecordUtils.createPartitionMap(sourceConfig.getScheme(), sourceConfig.getHosts(), collectionId.catalog(), collectionId.table()), MongoRecordUtils.createSourceOffsetMap(keyDocument.getDocument((Object)"_id"), true), collectionId.identifier(), keyDocument, valueDocument);
                changeEventQueue.enqueue((Object)new DataChangeEvent(snapshotRecord));
            }
        }
    }

    protected void executeBackfillTask(FetchTask.Context context, StreamSplit backfillStreamSplit) throws Exception {
        MongoDBStreamFetchTask backfillStreamTask = new MongoDBStreamFetchTask(backfillStreamSplit);
        backfillStreamTask.execute(context);
    }

    protected void dispatchLowWaterMarkEvent(FetchTask.Context context, SourceSplitBase split, Offset lowWatermark) throws InterruptedException {
        ChangeEventQueue changeEventQueue = context.getQueue();
        changeEventQueue.enqueue((Object)new DataChangeEvent(WatermarkEvent.create(MongoRecordUtils.createWatermarkPartitionMap(this.snapshotSplit.getTableId().identifier()), (String)"__mongodb_watermarks", (String)this.snapshotSplit.splitId(), (WatermarkKind)WatermarkKind.LOW, (Offset)lowWatermark)));
    }

    protected void dispatchHighWaterMarkEvent(FetchTask.Context context, SourceSplitBase split, Offset highWatermark) throws InterruptedException {
        ChangeEventQueue changeEventQueue = context.getQueue();
        changeEventQueue.enqueue((Object)new DataChangeEvent(WatermarkEvent.create(MongoRecordUtils.createWatermarkPartitionMap(this.snapshotSplit.getTableId().identifier()), (String)"__mongodb_watermarks", (String)split.splitId(), (WatermarkKind)WatermarkKind.HIGH, (Offset)highWatermark)));
    }

    protected void dispatchEndWaterMarkEvent(FetchTask.Context context, SourceSplitBase split, Offset endWatermark) throws InterruptedException {
        ChangeEventQueue changeEventQueue = context.getQueue();
        changeEventQueue.enqueue((Object)new DataChangeEvent(WatermarkEvent.create(MongoRecordUtils.createWatermarkPartitionMap(this.snapshotSplit.getTableId().identifier()), (String)"__mongodb_watermarks", (String)split.splitId(), (WatermarkKind)WatermarkKind.END, (Offset)endWatermark)));
    }

    private BsonDocument normalizeSnapshotDocument(TableId collectionId, BsonDocument originalDocument) {
        BsonDocument valueDocument = new BsonDocument();
        BsonDocument id = new BsonDocument();
        id.put("_id", originalDocument.get((Object)"_id"));
        valueDocument.put("_id", (BsonValue)id);
        valueDocument.put("operationType", (BsonValue)new BsonString(OperationType.INSERT.getValue()));
        BsonDocument ns = new BsonDocument();
        ns.put("db", (BsonValue)new BsonString(collectionId.catalog()));
        ns.put("coll", (BsonValue)new BsonString(collectionId.table()));
        valueDocument.put("ns", (BsonValue)ns);
        valueDocument.put("documentKey", (BsonValue)new BsonDocument("_id", originalDocument.get((Object)"_id")));
        valueDocument.put("fullDocument", (BsonValue)originalDocument);
        valueDocument.put("ts_ms", (BsonValue)new BsonInt64(System.currentTimeMillis()));
        BsonDocument source = new BsonDocument();
        source.put("snapshot", (BsonValue)new BsonString("true"));
        source.put("ts_ms", (BsonValue)new BsonInt64(0L));
        valueDocument.put("source", (BsonValue)source);
        return valueDocument;
    }
}

