/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mongodb.source.utils;

import com.mongodb.MongoCommandException;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Sorts;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import io.debezium.relational.TableId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.cdc.connectors.mongodb.internal.MongoDBEnvelope;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
import org.apache.flink.cdc.connectors.mongodb.source.connection.MongoClientPool;
import org.apache.flink.cdc.connectors.mongodb.source.offset.ChangeStreamDescriptor;
import org.apache.flink.cdc.connectors.mongodb.source.utils.CollectionDiscoveryUtils;
import org.apache.flink.util.Preconditions;
import org.bson.BsonDocument;
import org.bson.BsonDouble;
import org.bson.BsonInt32;
import org.bson.BsonString;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoUtils {
    private static final Logger LOG = LoggerFactory.getLogger(MongoUtils.class);
    public static final BsonDouble COMMAND_SUCCEED_FLAG = new BsonDouble(1.0);
    public static final int FAILED_TO_PARSE_ERROR = 9;
    public static final int UNAUTHORIZED_ERROR = 13;
    public static final int ILLEGAL_OPERATION_ERROR = 20;
    public static final int INVALIDATED_RESUME_TOKEN_ERROR = 260;
    public static final int CHANGE_STREAM_FATAL_ERROR = 280;
    public static final int CHANGE_STREAM_HISTORY_LOST = 286;
    public static final int BSON_OBJECT_TOO_LARGE = 10334;
    public static final int UNKNOWN_FIELD_ERROR = 40415;
    private static final Set<Integer> INVALID_CHANGE_STREAM_ERRORS = new HashSet<Integer>(Arrays.asList(260, 280, 286, 10334));
    private static final String RESUME_TOKEN = "resume token";
    private static final String NOT_FOUND = "not found";
    private static final String DOES_NOT_EXIST = "does not exist";
    private static final String INVALID_RESUME_TOKEN = "invalid resume token";
    private static final String NO_LONGER_IN_THE_OPLOG = "no longer be in the oplog";

    private MongoUtils() {
    }

    public static ChangeStreamDescriptor getChangeStreamDescriptor(MongoDBSourceConfig sourceConfig, List<String> discoveredDatabases, List<String> discoveredCollections) {
        ChangeStreamDescriptor changeStreamFilter;
        List<String> databaseList = sourceConfig.getDatabaseList();
        List<String> collectionList = sourceConfig.getCollectionList();
        if (collectionList != null) {
            if (CollectionDiscoveryUtils.isIncludeListExplicitlySpecified(collectionList, discoveredCollections)) {
                changeStreamFilter = ChangeStreamDescriptor.collection(TableId.parse(discoveredCollections.get(0)));
            } else {
                Pattern namespaceRegex = CollectionDiscoveryUtils.includeListAsFlatPattern(collectionList);
                if (databaseList != null) {
                    if (CollectionDiscoveryUtils.isIncludeListExplicitlySpecified(databaseList, discoveredDatabases)) {
                        changeStreamFilter = ChangeStreamDescriptor.database(discoveredDatabases.get(0), namespaceRegex);
                    } else {
                        Pattern databaseRegex = CollectionDiscoveryUtils.includeListAsFlatPattern(databaseList);
                        changeStreamFilter = ChangeStreamDescriptor.deployment(databaseRegex, namespaceRegex);
                    }
                } else {
                    changeStreamFilter = ChangeStreamDescriptor.deployment(null, namespaceRegex);
                }
            }
        } else if (databaseList != null) {
            if (CollectionDiscoveryUtils.isIncludeListExplicitlySpecified(databaseList, discoveredDatabases)) {
                changeStreamFilter = ChangeStreamDescriptor.database(discoveredDatabases.get(0));
            } else {
                Pattern databaseRegex = CollectionDiscoveryUtils.includeListAsFlatPattern(databaseList);
                changeStreamFilter = ChangeStreamDescriptor.deployment(databaseRegex);
            }
        } else {
            changeStreamFilter = ChangeStreamDescriptor.deployment();
        }
        return changeStreamFilter;
    }

    public static ChangeStreamIterable<Document> getChangeStreamIterable(MongoDBSourceConfig sourceConfig, ChangeStreamDescriptor descriptor) {
        return MongoUtils.getChangeStreamIterable(MongoUtils.clientFor(sourceConfig), descriptor.getDatabase(), descriptor.getCollection(), descriptor.getDatabaseRegex(), descriptor.getNamespaceRegex(), sourceConfig.getBatchSize(), sourceConfig.isUpdateLookup(), sourceConfig.isFullDocPrePostImageEnabled());
    }

    public static ChangeStreamIterable<Document> getChangeStreamIterable(MongoClient mongoClient, ChangeStreamDescriptor descriptor, int batchSize, boolean updateLookup, boolean fullDocPrePostImage) {
        return MongoUtils.getChangeStreamIterable(mongoClient, descriptor.getDatabase(), descriptor.getCollection(), descriptor.getDatabaseRegex(), descriptor.getNamespaceRegex(), batchSize, updateLookup, fullDocPrePostImage);
    }

    public static ChangeStreamIterable<Document> getChangeStreamIterable(MongoClient mongoClient, @Nullable String database, @Nullable String collection, @Nullable Pattern databaseRegex, @Nullable Pattern namespaceRegex, int batchSize, boolean updateLookup, boolean fullDocPrePostImage) {
        ChangeStreamIterable<Document> changeStream;
        if (StringUtils.isNotEmpty((CharSequence)database) && StringUtils.isNotEmpty((CharSequence)collection)) {
            MongoCollection<Document> coll = mongoClient.getDatabase(database).getCollection(collection);
            LOG.info("Preparing change stream for collection {}.{}", (Object)database, (Object)collection);
            changeStream = coll.watch();
        } else if (StringUtils.isNotEmpty((CharSequence)database) && namespaceRegex != null) {
            MongoDatabase db = mongoClient.getDatabase(database);
            ArrayList<Bson> pipeline = new ArrayList<Bson>();
            pipeline.add(CollectionDiscoveryUtils.ADD_NS_FIELD);
            Bson nsFilter = Filters.regex("_ns_", namespaceRegex);
            pipeline.add(Aggregates.match(nsFilter));
            LOG.info("Preparing change stream for database {} with namespace regex filter {}", (Object)database, (Object)namespaceRegex);
            changeStream = db.watch(pipeline);
        } else if (StringUtils.isNotEmpty((CharSequence)database)) {
            MongoDatabase db = mongoClient.getDatabase(database);
            LOG.info("Preparing change stream for database {}", (Object)database);
            changeStream = db.watch();
        } else if (namespaceRegex != null) {
            ArrayList<Bson> pipeline = new ArrayList<Bson>();
            pipeline.add(CollectionDiscoveryUtils.ADD_NS_FIELD);
            Bson nsFilter = Filters.regex("_ns_", namespaceRegex);
            if (databaseRegex != null) {
                Bson dbFilter = Filters.regex("ns.db", databaseRegex);
                nsFilter = Filters.and(dbFilter, nsFilter);
                LOG.info("Preparing change stream for deployment with database regex filter {} and namespace regex filter {}", (Object)databaseRegex, (Object)namespaceRegex);
            } else {
                LOG.info("Preparing change stream for deployment with namespace regex filter {}", (Object)namespaceRegex);
            }
            pipeline.add(Aggregates.match(nsFilter));
            changeStream = mongoClient.watch(pipeline);
        } else if (databaseRegex != null) {
            ArrayList<Bson> pipeline = new ArrayList<Bson>();
            pipeline.add(Aggregates.match(Filters.regex("ns.db", databaseRegex)));
            LOG.info("Preparing change stream for deployment  with database regex filter {}", (Object)databaseRegex);
            changeStream = mongoClient.watch(pipeline);
        } else {
            LOG.info("Preparing change stream for deployment");
            changeStream = mongoClient.watch();
        }
        if (batchSize > 0) {
            changeStream.batchSize(batchSize);
        }
        if (fullDocPrePostImage) {
            if (StringUtils.isNotEmpty((CharSequence)database) && StringUtils.isNotEmpty((CharSequence)collection)) {
                changeStream.fullDocument(FullDocument.REQUIRED);
                changeStream.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
            } else {
                changeStream.fullDocument(FullDocument.WHEN_AVAILABLE);
                changeStream.fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE);
            }
        } else if (updateLookup) {
            changeStream.fullDocument(FullDocument.UPDATE_LOOKUP);
        }
        return changeStream;
    }

    @Nullable
    public static BsonDocument getLatestResumeToken(MongoClient mongoClient, ChangeStreamDescriptor descriptor) {
        ChangeStreamIterable<Document> changeStreamIterable = MongoUtils.getChangeStreamIterable(mongoClient, descriptor, 1, false, false);
        try (MongoCursor changeStreamCursor = changeStreamIterable.cursor();){
            ChangeStreamDocument firstResult = (ChangeStreamDocument)changeStreamCursor.tryNext();
            BsonDocument bsonDocument = firstResult != null ? firstResult.getResumeToken() : changeStreamCursor.getResumeToken();
            return bsonDocument;
        }
    }

    public static boolean isCommandSucceed(BsonDocument commandResult) {
        return commandResult != null && COMMAND_SUCCEED_FLAG.equals(commandResult.getDouble("ok"));
    }

    public static String commandErrorMessage(BsonDocument commandResult) {
        return Optional.ofNullable(commandResult).map(doc -> doc.getString("errmsg")).map(BsonString::getValue).orElse(null);
    }

    public static BsonDocument collStats(MongoClient mongoClient, TableId collectionId) {
        BsonDocument collStatsCommand = new BsonDocument("collStats", new BsonString(collectionId.table()));
        return mongoClient.getDatabase(collectionId.catalog()).runCommand((Bson)collStatsCommand, BsonDocument.class);
    }

    public static BsonDocument splitVector(MongoClient mongoClient, TableId collectionId, BsonDocument keyPattern, int maxChunkSizeMB) {
        return MongoUtils.splitVector(mongoClient, collectionId, keyPattern, maxChunkSizeMB, null, null);
    }

    public static BsonDocument splitVector(MongoClient mongoClient, TableId collectionId, BsonDocument keyPattern, int maxChunkSizeMB, @Nullable BsonDocument min, @Nullable BsonDocument max) {
        BsonDocument splitVectorCommand = new BsonDocument("splitVector", new BsonString(collectionId.identifier())).append("keyPattern", keyPattern).append("maxChunkSize", new BsonInt32(maxChunkSizeMB));
        Optional.ofNullable(min).ifPresent(v -> splitVectorCommand.append("min", (BsonValue)v));
        Optional.ofNullable(max).ifPresent(v -> splitVectorCommand.append("max", (BsonValue)v));
        return mongoClient.getDatabase(collectionId.catalog()).runCommand((Bson)splitVectorCommand, BsonDocument.class);
    }

    public static BsonTimestamp getCurrentClusterTime(MongoClient mongoClient) {
        BsonDocument isMasterResult = MongoUtils.isMaster(mongoClient);
        if (!MongoUtils.isCommandSucceed(isMasterResult)) {
            throw new IllegalStateException("Failed to execute isMaster command: " + MongoUtils.commandErrorMessage(isMasterResult));
        }
        return isMasterResult.getDocument("$clusterTime").getTimestamp("clusterTime");
    }

    public static BsonDocument isMaster(MongoClient mongoClient) {
        BsonDocument isMasterCommand = new BsonDocument("isMaster", new BsonInt32(1));
        return mongoClient.getDatabase("admin").runCommand((Bson)isMasterCommand, BsonDocument.class);
    }

    public static List<BsonDocument> readChunks(MongoClient mongoClient, BsonDocument collectionMetadata) {
        MongoCollection<BsonDocument> chunks = MongoUtils.collectionFor(mongoClient, TableId.parse("config.chunks"), BsonDocument.class);
        ArrayList<BsonDocument> collectionChunks = new ArrayList<BsonDocument>();
        Bson filter = Filters.or(new BsonDocument("ns", collectionMetadata.get("_id")), new BsonDocument("uuid", collectionMetadata.get("uuid")));
        chunks.find(filter).projection(Projections.include("min", "max", "shard")).sort(Sorts.ascending("min")).into(collectionChunks);
        return collectionChunks;
    }

    @Nullable
    public static BsonDocument readCollectionMetadata(MongoClient mongoClient, TableId collectionId) {
        MongoCollection<BsonDocument> collection = MongoUtils.collectionFor(mongoClient, TableId.parse("config.collections"), BsonDocument.class);
        return (BsonDocument)collection.find(Filters.eq("_id", collectionId.identifier())).projection(Projections.include("_id", "uuid", "dropped", "key")).first();
    }

    public static <T> MongoCollection<T> collectionFor(MongoClient mongoClient, TableId collectionId, Class<T> documentClass) {
        return mongoClient.getDatabase(collectionId.catalog()).getCollection(collectionId.table()).withDocumentClass(documentClass);
    }

    public static String getMongoVersion(MongoDBSourceConfig sourceConfig) {
        MongoClient client = MongoClientPool.getInstance().getOrCreateMongoClient(sourceConfig);
        return client.getDatabase("config").runCommand(new BsonDocument("buildinfo", new BsonString(""))).get("version").toString();
    }

    public static MongoClient clientFor(MongoDBSourceConfig sourceConfig) {
        return MongoClientPool.getInstance().getOrCreateMongoClient(sourceConfig);
    }

    public static String buildConnectionString(@Nullable String username, @Nullable String password, String scheme, String hosts, @Nullable String connectionOptions) {
        StringBuilder sb = new StringBuilder(scheme).append("://");
        if (StringUtils.isNotEmpty((CharSequence)username) && StringUtils.isNotEmpty((CharSequence)password)) {
            sb.append(MongoDBEnvelope.encodeValue(username)).append(":").append(MongoDBEnvelope.encodeValue(password)).append("@");
        }
        sb.append((String)Preconditions.checkNotNull((Object)hosts));
        if (StringUtils.isNotEmpty((CharSequence)connectionOptions)) {
            sb.append("/?").append(connectionOptions);
        }
        return sb.toString();
    }

    public static boolean checkIfChangeStreamCursorExpires(MongoCommandException e) {
        return INVALID_CHANGE_STREAM_ERRORS.contains(e.getCode());
    }

    public static boolean checkIfResumeTokenExpires(MongoCommandException e) {
        if (e.getCode() != 280) {
            return false;
        }
        String errorMessage = e.getErrorMessage().toLowerCase(Locale.ROOT);
        return errorMessage.contains(RESUME_TOKEN) && (errorMessage.contains(NOT_FOUND) || errorMessage.contains(DOES_NOT_EXIST) || errorMessage.contains(INVALID_RESUME_TOKEN) || errorMessage.contains(NO_LONGER_IN_THE_OPLOG));
    }
}

