/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb;

import com.mongodb.CursorType;
import com.mongodb.ServerAddress;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import io.debezium.DebeziumException;
import io.debezium.connector.mongodb.CollectionId;
import io.debezium.connector.mongodb.ConnectionContext;
import io.debezium.connector.mongodb.DisconnectEvent;
import io.debezium.connector.mongodb.MongoDbChangeSnapshotOplogRecordEmitter;
import io.debezium.connector.mongodb.MongoDbChangeStreamChangeRecordEmitter;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoDbIncrementalSnapshotContext;
import io.debezium.connector.mongodb.MongoDbOffsetContext;
import io.debezium.connector.mongodb.MongoDbPartition;
import io.debezium.connector.mongodb.MongoDbTaskContext;
import io.debezium.connector.mongodb.MongoUtil;
import io.debezium.connector.mongodb.PrimaryElectionEvent;
import io.debezium.connector.mongodb.ReplicaSet;
import io.debezium.connector.mongodb.ReplicaSetOffsetContext;
import io.debezium.connector.mongodb.ReplicaSetPartition;
import io.debezium.connector.mongodb.ReplicaSets;
import io.debezium.connector.mongodb.SourceInfo;
import io.debezium.data.Envelope;
import io.debezium.function.BlockingConsumer;
import io.debezium.pipeline.ConnectorEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import io.debezium.util.Threads;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.errors.ConnectException;
import org.bson.BsonDocument;
import org.bson.BsonString;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoDbStreamingChangeEventSource
implements StreamingChangeEventSource<MongoDbPartition, MongoDbOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbStreamingChangeEventSource.class);
    private static final String AUTHORIZATION_FAILURE_MESSAGE = "Command failed with error 13";
    private static final String OPERATION_FIELD = "op";
    private static final String OBJECT_FIELD = "o";
    private static final String OPERATION_CONTROL = "c";
    private static final String TX_OPS = "applyOps";
    private final MongoDbConnectorConfig connectorConfig;
    private final EventDispatcher<MongoDbPartition, CollectionId> dispatcher;
    private final ErrorHandler errorHandler;
    private final Clock clock;
    private final ConnectionContext connectionContext;
    private final ReplicaSets replicaSets;
    private final MongoDbTaskContext taskContext;

    public MongoDbStreamingChangeEventSource(MongoDbConnectorConfig connectorConfig, MongoDbTaskContext taskContext, ReplicaSets replicaSets, EventDispatcher<MongoDbPartition, CollectionId> dispatcher, ErrorHandler errorHandler, Clock clock) {
        this.connectorConfig = connectorConfig;
        this.connectionContext = taskContext.getConnectionContext();
        this.dispatcher = dispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
        this.replicaSets = replicaSets;
        this.taskContext = taskContext;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void execute(ChangeEventSource.ChangeEventSourceContext context, MongoDbPartition partition, MongoDbOffsetContext offsetContext) throws InterruptedException {
        List<ReplicaSet> validReplicaSets = this.replicaSets.validReplicaSets();
        if (offsetContext == null) {
            offsetContext = this.initializeOffsets(this.connectorConfig, partition, this.replicaSets);
        }
        try {
            if (validReplicaSets.size() == 1) {
                this.streamChangesForReplicaSet(context, partition, validReplicaSets.get(0), offsetContext);
            } else if (validReplicaSets.size() > 1) {
                this.streamChangesForReplicaSets(context, partition, validReplicaSets, offsetContext);
            }
        }
        finally {
            this.taskContext.getConnectionContext().shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void streamChangesForReplicaSet(ChangeEventSource.ChangeEventSourceContext context, MongoDbPartition partition, ReplicaSet replicaSet, MongoDbOffsetContext offsetContext) {
        ConnectionContext.MongoPrimary primaryClient = null;
        try {
            primaryClient = this.establishConnectionToPrimary(partition, replicaSet);
            if (primaryClient != null) {
                AtomicReference<ConnectionContext.MongoPrimary> primaryReference = new AtomicReference<ConnectionContext.MongoPrimary>(primaryClient);
                primaryClient.execute("read from oplog on '" + replicaSet + "'", primary -> {
                    if (this.taskContext.getCaptureMode().isChangeStreams()) {
                        this.readChangeStream((MongoClient)primary, (ConnectionContext.MongoPrimary)primaryReference.get(), replicaSet, context, offsetContext);
                    } else {
                        this.readOplog((MongoClient)primary, (ConnectionContext.MongoPrimary)primaryReference.get(), replicaSet, context, offsetContext);
                    }
                });
            }
        }
        catch (Throwable t) {
            LOGGER.error("Streaming for replica set {} failed", (Object)replicaSet.replicaSetName(), (Object)t);
            this.errorHandler.setProducerThrowable(t);
        }
        finally {
            if (primaryClient != null) {
                primaryClient.stop();
            }
        }
    }

    private void streamChangesForReplicaSets(ChangeEventSource.ChangeEventSourceContext context, MongoDbPartition partition, List<ReplicaSet> replicaSets, MongoDbOffsetContext offsetContext) {
        int threads = replicaSets.size();
        ExecutorService executor = Threads.newFixedThreadPool(MongoDbConnector.class, (String)this.taskContext.serverName(), (String)"replicator-streaming", (int)threads);
        CountDownLatch latch = new CountDownLatch(threads);
        LOGGER.info("Starting {} thread(s) to stream changes for replica sets: {}", (Object)threads, replicaSets);
        replicaSets.forEach(replicaSet -> executor.submit(() -> {
            try {
                this.streamChangesForReplicaSet(context, partition, (ReplicaSet)replicaSet, offsetContext);
            }
            finally {
                latch.countDown();
            }
        }));
        try {
            latch.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown();
    }

    private ConnectionContext.MongoPrimary establishConnectionToPrimary(MongoDbPartition partition, ReplicaSet replicaSet) {
        return this.connectionContext.primaryFor(replicaSet, this.taskContext.filters(), (desc, error) -> {
            if (error.getMessage() != null && error.getMessage().startsWith(AUTHORIZATION_FAILURE_MESSAGE)) {
                throw new ConnectException("Error while attempting to " + desc, error);
            }
            this.dispatcher.dispatchConnectorEvent((Partition)partition, (ConnectorEvent)new DisconnectEvent());
            LOGGER.error("Error while attempting to {}: {}", new Object[]{desc, error.getMessage(), error});
            throw new ConnectException("Error while attempting to " + desc, error);
        });
    }

    private void readOplog(MongoClient primary, ConnectionContext.MongoPrimary primaryClient, ReplicaSet replicaSet, ChangeEventSource.ChangeEventSourceContext context, MongoDbOffsetContext offsetContext) {
        ReplicaSetPartition rsPartition = offsetContext.getReplicaSetPartition(replicaSet);
        ReplicaSetOffsetContext rsOffsetContext = offsetContext.getReplicaSetOffsetContext(replicaSet);
        BsonTimestamp oplogStart = rsOffsetContext.lastOffsetTimestamp();
        OptionalLong txOrder = rsOffsetContext.lastOffsetTxOrder();
        ServerAddress primaryAddress = MongoUtil.getPrimaryAddress(primary);
        LOGGER.info("Reading oplog for '{}' primary {} starting at {}", new Object[]{replicaSet, primaryAddress, oplogStart});
        MongoCollection<Document> oplog = primary.getDatabase("local").getCollection("oplog.rs");
        if (!this.isStartPositionInOplog(oplogStart, oplog)) {
            throw new DebeziumException("Failed to find starting position '" + oplogStart + "' in oplog");
        }
        ReplicaSetOplogContext oplogContext = new ReplicaSetOplogContext(rsPartition, rsOffsetContext, primaryClient, replicaSet);
        Bson filter = null;
        if (!txOrder.isPresent()) {
            LOGGER.info("The last event processed was not transactional, resuming at the oplog event after '{}'", (Object)oplogStart);
            filter = Filters.and(Filters.gt("ts", oplogStart), Filters.exists("fromMigrate", false));
        } else {
            LOGGER.info("The last event processed was transactional, resuming at the oplog event '{}', expecting to skip '{}' events", (Object)oplogStart, (Object)txOrder.getAsLong());
            filter = Filters.and(Filters.gte("ts", oplogStart), Filters.exists("fromMigrate", false));
            oplogContext.setIncompleteEventTimestamp(oplogStart);
            oplogContext.setIncompleteTxOrder(txOrder.getAsLong());
        }
        Bson operationFilter = this.getOplogSkippedOperationsFilter();
        if (operationFilter != null) {
            filter = Filters.and(filter, operationFilter);
        }
        FindIterable<Document> results = oplog.find(filter).sort(new Document("$natural", 1)).oplogReplay(true).cursorType(CursorType.TailableAwait).noCursorTimeout(true);
        if (this.connectorConfig.getCursorMaxAwaitTime() > 0) {
            results = results.maxAwaitTime(this.connectorConfig.getCursorMaxAwaitTime(), TimeUnit.MILLISECONDS);
        }
        try (Iterator cursor = results.iterator();){
            Metronome pause = Metronome.sleeper((Duration)Duration.ofMillis(500L), (Clock)this.clock);
            while (context.isRunning()) {
                Document event = (Document)cursor.tryNext();
                if (event != null) {
                    if (!this.handleOplogEvent(primaryAddress, event, event, 0L, oplogContext)) {
                        return;
                    }
                    try {
                        this.dispatcher.dispatchHeartbeatEvent((Partition)oplogContext.getPartition(), (OffsetContext)oplogContext.getOffset());
                        continue;
                    }
                    catch (InterruptedException e) {
                        LOGGER.info("Replicator thread is interrupted");
                        Thread.currentThread().interrupt();
                        if (cursor != null) {
                            cursor.close();
                        }
                        return;
                    }
                }
                try {
                    pause.pause();
                }
                catch (InterruptedException e) {
                    break;
                }
            }
        }
    }

    private List<String> getChangeStreamSkippedOperationsFilter() {
        EnumSet skippedOperations = this.taskContext.getConnectorConfig().getSkippedOperations();
        ArrayList<String> includedOperations = new ArrayList<String>();
        if (!skippedOperations.contains(Envelope.Operation.CREATE)) {
            includedOperations.add("insert");
        }
        if (!skippedOperations.contains(Envelope.Operation.UPDATE)) {
            includedOperations.add("update");
            includedOperations.add("replace");
        }
        if (!skippedOperations.contains(Envelope.Operation.DELETE)) {
            includedOperations.add("delete");
        }
        return includedOperations;
    }

    private void readChangeStream(MongoClient primary, ConnectionContext.MongoPrimary primaryClient, ReplicaSet replicaSet, ChangeEventSource.ChangeEventSourceContext context, MongoDbOffsetContext offsetContext) {
        ReplicaSetPartition rsPartition = offsetContext.getReplicaSetPartition(replicaSet);
        ReplicaSetOffsetContext rsOffsetContext = offsetContext.getReplicaSetOffsetContext(replicaSet);
        BsonTimestamp oplogStart = rsOffsetContext.lastOffsetTimestamp();
        OptionalLong txOrder = rsOffsetContext.lastOffsetTxOrder();
        ReplicaSetOplogContext oplogContext = new ReplicaSetOplogContext(rsPartition, rsOffsetContext, primaryClient, replicaSet);
        ServerAddress primaryAddress = MongoUtil.getPrimaryAddress(primary);
        LOGGER.info("Reading change stream for '{}' primary {} starting at {}", new Object[]{replicaSet, primaryAddress, oplogStart});
        Bson filters = Filters.in("operationType", this.getChangeStreamSkippedOperationsFilter());
        if (rsOffsetContext.lastResumeToken() == null) {
            filters = Filters.and(filters, Filters.ne("clusterTime", oplogStart));
        }
        ChangeStreamIterable<Document> rsChangeStream = primary.watch(Arrays.asList(Aggregates.match(filters)));
        if (this.taskContext.getCaptureMode().isFullUpdate()) {
            rsChangeStream.fullDocument(FullDocument.UPDATE_LOOKUP);
        }
        if (rsOffsetContext.lastResumeToken() != null) {
            LOGGER.info("Resuming streaming from token '{}'", (Object)rsOffsetContext.lastResumeToken());
            BsonDocument doc = new BsonDocument();
            doc.put("_data", new BsonString(rsOffsetContext.lastResumeToken()));
            rsChangeStream.resumeAfter(doc);
        } else if (oplogStart.getTime() > 0) {
            LOGGER.info("Resume token not available, starting streaming from time '{}'", (Object)oplogStart);
            rsChangeStream.startAtOperationTime(oplogStart);
        }
        if (this.connectorConfig.getCursorMaxAwaitTime() > 0) {
            rsChangeStream.maxAwaitTime(this.connectorConfig.getCursorMaxAwaitTime(), TimeUnit.MILLISECONDS);
        }
        try (Iterator cursor = rsChangeStream.iterator();){
            Metronome pause = Metronome.sleeper((Duration)Duration.ofMillis(500L), (Clock)this.clock);
            while (context.isRunning()) {
                ChangeStreamDocument event = (ChangeStreamDocument)cursor.tryNext();
                if (event != null) {
                    LOGGER.trace("Arrived Change Stream event: {}", (Object)event);
                    if (!this.taskContext.filters().databaseFilter().test(event.getDatabaseName())) {
                        LOGGER.debug("Skipping the event for database '{}' based on database include/exclude list", (Object)event.getDatabaseName());
                    } else {
                        oplogContext.getOffset().changeStreamEvent(event, txOrder);
                        oplogContext.getOffset().getOffset();
                        CollectionId collectionId = new CollectionId(replicaSet.replicaSetName(), event.getNamespace().getDatabaseName(), event.getNamespace().getCollectionName());
                        if (this.taskContext.filters().collectionFilter().test(collectionId)) {
                            try {
                                this.dispatcher.dispatchDataChangeEvent((Partition)oplogContext.getPartition(), (DataCollectionId)collectionId, (ChangeRecordEmitter)new MongoDbChangeStreamChangeRecordEmitter(oplogContext.getPartition(), oplogContext.getOffset(), this.clock, event));
                            }
                            catch (Exception e) {
                                this.errorHandler.setProducerThrowable((Throwable)e);
                                if (cursor != null) {
                                    cursor.close();
                                }
                                return;
                            }
                        }
                    }
                    try {
                        this.dispatcher.dispatchHeartbeatEvent((Partition)oplogContext.getPartition(), (OffsetContext)oplogContext.getOffset());
                        continue;
                    }
                    catch (InterruptedException e) {
                        LOGGER.info("Replicator thread is interrupted");
                        Thread.currentThread().interrupt();
                        if (cursor != null) {
                            cursor.close();
                        }
                        return;
                    }
                }
                try {
                    pause.pause();
                }
                catch (InterruptedException e) {
                    break;
                }
            }
        }
    }

    private boolean isStartPositionInOplog(BsonTimestamp startTime, MongoCollection<Document> oplog) {
        Iterator iterator = oplog.find().iterator();
        if (!iterator.hasNext()) {
            return false;
        }
        BsonTimestamp timestamp = (BsonTimestamp)((Object)((Document)iterator.next()).get((Object)"ts", BsonTimestamp.class));
        if (timestamp == null) {
            return false;
        }
        return timestamp.compareTo(startTime) <= 0;
    }

    private Bson getOplogSkippedOperationsFilter() {
        EnumSet skippedOperations = this.taskContext.getConnectorConfig().getSkippedOperations();
        if (skippedOperations.isEmpty()) {
            return null;
        }
        Bson skippedOperationsFilter = null;
        for (Envelope.Operation operation : skippedOperations) {
            Bson skippedOperationFilter = Filters.ne(OPERATION_FIELD, operation.code());
            if (skippedOperationsFilter == null) {
                skippedOperationsFilter = skippedOperationFilter;
                continue;
            }
            skippedOperationsFilter = Filters.or(skippedOperationsFilter, skippedOperationFilter);
        }
        return skippedOperationsFilter;
    }

    private boolean handleOplogEvent(ServerAddress primaryAddress, Document event, Document masterEvent, long txOrder, ReplicaSetOplogContext oplogContext) {
        String ns = event.getString("ns");
        Document object = (Document)((Object)event.get((Object)OBJECT_FIELD, Document.class));
        if (Objects.isNull(object)) {
            if (LOGGER.isWarnEnabled()) {
                LOGGER.warn("Missing 'o' field in event, so skipping {}", (Object)event.toJson());
            }
            return true;
        }
        if (Objects.isNull(ns) || ns.isEmpty()) {
            String msg = object.getString("msg");
            if ("new primary".equals(msg)) {
                AtomicReference address = new AtomicReference();
                try {
                    oplogContext.getPrimary().executeBlocking("conn", (BlockingConsumer<MongoClient>)((BlockingConsumer)mongoClient -> {
                        ServerAddress currentPrimary = MongoUtil.getPrimaryAddress(mongoClient);
                        address.set(currentPrimary);
                    }));
                }
                catch (InterruptedException e) {
                    LOGGER.error("Get current primary executeBlocking", (Throwable)e);
                }
                ServerAddress serverAddress = (ServerAddress)address.get();
                if (Objects.nonNull(serverAddress) && !serverAddress.equals(primaryAddress)) {
                    LOGGER.info("Found new primary event in oplog, so stopping use of {} to continue with new primary {}", (Object)primaryAddress, (Object)serverAddress);
                } else {
                    LOGGER.info("Found new primary event in oplog, current {} is new primary. Continue to process oplog event.", (Object)primaryAddress);
                }
                this.dispatcher.dispatchConnectorEvent((Partition)oplogContext.getPartition(), (ConnectorEvent)new PrimaryElectionEvent(serverAddress));
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Skipping event with no namespace: {}", (Object)event.toJson());
            }
            return true;
        }
        List<Document> txChanges = this.transactionChanges(event);
        if (!txChanges.isEmpty()) {
            if (Objects.nonNull(oplogContext.getIncompleteEventTimestamp())) {
                if (oplogContext.getIncompleteEventTimestamp().equals(SourceInfo.extractEventTimestamp(event))) {
                    for (Document change : txChanges) {
                        if (++txOrder <= oplogContext.getIncompleteTxOrder()) {
                            LOGGER.debug("Skipping record as it is expected to be already processed: {}", (Object)change);
                            continue;
                        }
                        boolean r = this.handleOplogEvent(primaryAddress, change, event, txOrder, oplogContext);
                        if (r) continue;
                        return false;
                    }
                }
                oplogContext.setIncompleteEventTimestamp(null);
                return true;
            }
            try {
                this.dispatcher.dispatchTransactionStartedEvent((Partition)oplogContext.getPartition(), MongoDbStreamingChangeEventSource.getTransactionId(event), (OffsetContext)oplogContext.getOffset());
                for (Document change : txChanges) {
                    boolean r = this.handleOplogEvent(primaryAddress, change, event, ++txOrder, oplogContext);
                    if (r) continue;
                    return false;
                }
                this.dispatcher.dispatchTransactionCommittedEvent((Partition)oplogContext.getPartition(), (OffsetContext)oplogContext.getOffset());
            }
            catch (InterruptedException e) {
                LOGGER.error("Streaming transaction changes for replica set '{}' was interrupted", (Object)oplogContext.getReplicaSetName());
                throw new ConnectException("Streaming of transaction changes was interrupted for replica set " + oplogContext.getReplicaSetName(), (Throwable)e);
            }
            return true;
        }
        String operation = event.getString(OPERATION_FIELD);
        if (!MongoDbChangeSnapshotOplogRecordEmitter.isValidOperation(operation)) {
            LOGGER.debug("Skipping event with \"op={}\"", (Object)operation);
            return true;
        }
        int delimIndex = ns.indexOf(46);
        if (delimIndex > 0) {
            assert (delimIndex + 1 < ns.length());
            String dbName = ns.substring(0, delimIndex);
            String collectionName = ns.substring(delimIndex + 1);
            if ("$cmd".equals(collectionName)) {
                LOGGER.debug("Skipping database command event: {}", (Object)event.toJson());
                return true;
            }
            if (!this.taskContext.filters().databaseFilter().test(dbName)) {
                LOGGER.debug("Skipping the event for database '{}' based on database include/exclude list", (Object)dbName);
                return true;
            }
            oplogContext.getOffset().oplogEvent(event, masterEvent, txOrder);
            oplogContext.getOffset().getOffset();
            CollectionId collectionId = new CollectionId(oplogContext.getReplicaSetName(), dbName, collectionName);
            if (this.taskContext.filters().collectionFilter().test(collectionId)) {
                try {
                    return this.dispatcher.dispatchDataChangeEvent((Partition)oplogContext.getPartition(), (DataCollectionId)collectionId, (ChangeRecordEmitter)new MongoDbChangeSnapshotOplogRecordEmitter(oplogContext.getPartition(), oplogContext.getOffset(), this.clock, event, false));
                }
                catch (Exception e) {
                    this.errorHandler.setProducerThrowable((Throwable)e);
                    return false;
                }
            }
        }
        return true;
    }

    private List<Document> transactionChanges(Document event) {
        String op = event.getString(OPERATION_FIELD);
        Document o = (Document)((Object)event.get((Object)OBJECT_FIELD, Document.class));
        if (!(OPERATION_CONTROL.equals(op) && Objects.nonNull(o) && o.containsKey(TX_OPS))) {
            return Collections.emptyList();
        }
        return (List)((Object)o.get((Object)TX_OPS, List.class));
    }

    protected MongoDbOffsetContext initializeOffsets(MongoDbConnectorConfig connectorConfig, MongoDbPartition partition, ReplicaSets replicaSets) {
        LinkedHashMap<ReplicaSet, Document> positions = new LinkedHashMap<ReplicaSet, Document>();
        replicaSets.onEachReplicaSet(replicaSet -> {
            LOGGER.info("Determine Snapshot Offset for replica-set {}", (Object)replicaSet.replicaSetName());
            ConnectionContext.MongoPrimary primaryClient = this.establishConnectionToPrimary(partition, (ReplicaSet)replicaSet);
            if (primaryClient != null) {
                try {
                    primaryClient.execute("get oplog position", primary -> positions.put((ReplicaSet)replicaSet, MongoUtil.getOplogEntry(primary, -1, LOGGER)));
                }
                finally {
                    LOGGER.info("Stopping primary client");
                    primaryClient.stop();
                }
            }
        });
        return new MongoDbOffsetContext(new SourceInfo(connectorConfig), new TransactionContext(), new MongoDbIncrementalSnapshotContext<CollectionId>(false), positions);
    }

    private static String getTransactionId(Document event) {
        Long operationId = event.getLong("h");
        if (operationId != null && operationId != 0L) {
            return Long.toString(operationId);
        }
        return MongoUtil.getOplogSessionTransactionId(event);
    }

    private class ReplicaSetOplogContext {
        private final ReplicaSetPartition partition;
        private final ReplicaSetOffsetContext offset;
        private final ConnectionContext.MongoPrimary primary;
        private final ReplicaSet replicaSet;
        private BsonTimestamp incompleteEventTimestamp;
        private long incompleteTxOrder = 0L;

        ReplicaSetOplogContext(ReplicaSetPartition partition, ReplicaSetOffsetContext offsetContext, ConnectionContext.MongoPrimary primary, ReplicaSet replicaSet) {
            this.partition = partition;
            this.offset = offsetContext;
            this.primary = primary;
            this.replicaSet = replicaSet;
        }

        ReplicaSetPartition getPartition() {
            return this.partition;
        }

        ReplicaSetOffsetContext getOffset() {
            return this.offset;
        }

        ConnectionContext.MongoPrimary getPrimary() {
            return this.primary;
        }

        String getReplicaSetName() {
            return this.replicaSet.replicaSetName();
        }

        BsonTimestamp getIncompleteEventTimestamp() {
            return this.incompleteEventTimestamp;
        }

        public void setIncompleteEventTimestamp(BsonTimestamp incompleteEventTimestamp) {
            this.incompleteEventTimestamp = incompleteEventTimestamp;
        }

        public long getIncompleteTxOrder() {
            return this.incompleteTxOrder;
        }

        public void setIncompleteTxOrder(long incompleteTxOrder) {
            this.incompleteTxOrder = incompleteTxOrder;
        }
    }
}

