/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.kafka.connect.source.heartbeat;

import com.mongodb.client.MongoChangeStreamCursor;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.utils.Time;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HeartbeatManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatManager.class);
    public static final String HEARTBEAT_KEY = "HEARTBEAT";
    private final Time time;
    private final MongoChangeStreamCursor<? extends BsonDocument> cursor;
    private final String heartbeatTopicName;
    private final long heartbeatIntervalMS;
    private final Map<String, Object> partitionMap;
    private long lastHeartbeatMS = 0L;
    private String lastResumeToken = "";

    public HeartbeatManager(Time time, MongoChangeStreamCursor<? extends BsonDocument> cursor, long heartbeatIntervalMS, String heartbeatTopicName, Map<String, Object> partitionMap) {
        this.time = time;
        this.cursor = cursor;
        this.heartbeatIntervalMS = heartbeatIntervalMS;
        this.heartbeatTopicName = heartbeatTopicName;
        this.partitionMap = partitionMap;
    }

    public Optional<SourceRecord> heartbeat() {
        if (this.cursor == null) {
            LOGGER.debug("Returning no heartbeat: null cursor");
            return Optional.empty();
        }
        if (this.heartbeatIntervalMS <= 0L) {
            LOGGER.debug("Returning no heartbeat: heartbeatIntervalMS not positive: {}", (Object)this.heartbeatIntervalMS);
            return Optional.empty();
        }
        long currentMS = this.time.milliseconds();
        long timeSinceHeartbeatMS = currentMS - this.lastHeartbeatMS;
        if (timeSinceHeartbeatMS <= this.heartbeatIntervalMS) {
            LOGGER.debug("Returning no heartbeat: timeSinceHeartbeat has not exceeded heartbeatInterval");
            return Optional.empty();
        }
        this.lastHeartbeatMS = currentMS;
        BsonDocument resumeTokenBson = this.cursor.getResumeToken();
        if (resumeTokenBson == null) {
            LOGGER.debug("Returning no heartbeat: cursor resumeToken is null");
            return Optional.empty();
        }
        String resumeToken = resumeTokenBson.toJson();
        if (resumeToken.equals(this.lastResumeToken)) {
            LOGGER.debug("Returning no heartbeat: same resumeToken");
            return Optional.empty();
        }
        LOGGER.info("Generating heartbeat event. {}", (Object)resumeToken);
        HashMap<String, String> sourceOffset = new HashMap<String, String>();
        sourceOffset.put("_id", resumeToken);
        sourceOffset.put(HEARTBEAT_KEY, "true");
        this.lastResumeToken = resumeToken;
        return Optional.of(new SourceRecord(this.partitionMap, sourceOffset, this.heartbeatTopicName, Schema.STRING_SCHEMA, resumeToken, Schema.OPTIONAL_BYTES_SCHEMA, null));
    }
}

