/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.mongodb;

import com.mongodb.BasicDBObject;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.UpdateResult;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.JsonValidator;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.mongodb.AbstractMongoProcessor;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StringUtils;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;

@Tags(value={"mongodb", "insert", "update", "write", "put"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Writes the contents of a FlowFile to MongoDB")
@SystemResourceConsideration(resource=SystemResource.MEMORY)
@WritesAttributes(value={@WritesAttribute(attribute="mongo.put.update.match.count", description="The match count from result if update/upsert is performed, otherwise not set."), @WritesAttribute(attribute="mongo.put.update.modify.count", description="The modify count from result if update/upsert is performed, otherwise not set."), @WritesAttribute(attribute="mongo.put.upsert.id", description="The '_id' hex value if upsert is performed, otherwise not set.")})
public class PutMongo
extends AbstractMongoProcessor {
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are written to MongoDB are routed to this relationship").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("All FlowFiles that cannot be written to MongoDB are routed to this relationship").build();
    static final String ATTRIBUTE_UPDATE_MATCH_COUNT = "mongo.put.update.match.count";
    static final String ATTRIBUTE_UPDATE_MODIFY_COUNT = "mongo.put.update.modify.count";
    static final String ATTRIBUTE_UPSERT_ID = "mongo.put.upsert.id";
    static final String MODE_INSERT = "insert";
    static final String MODE_UPDATE = "update";
    static final AllowableValue UPDATE_WITH_DOC = new AllowableValue("doc", "With whole document");
    static final AllowableValue UPDATE_WITH_OPERATORS = new AllowableValue("operators", "With operators enabled");
    static final PropertyDescriptor MODE = new PropertyDescriptor.Builder().name("Mode").description("Indicates whether the processor should insert or update content").required(true).allowableValues(new String[]{"insert", "update"}).defaultValue("insert").build();
    static final PropertyDescriptor UPSERT = new PropertyDescriptor.Builder().name("Upsert").description("When true, inserts a document if no document matches the update query criteria; this property is valid only when using update mode, otherwise it is ignored").required(true).dependsOn(MODE, "update", new String[0]).allowableValues(new String[]{"true", "false"}).addValidator(StandardValidators.BOOLEAN_VALIDATOR).defaultValue("false").build();
    static final PropertyDescriptor UPDATE_QUERY_KEY = new PropertyDescriptor.Builder().name("Update Query Key").description("One or more comma-separated document key names used to build the update query criteria, such as _id").required(false).dependsOn(MODE, "update", new String[0]).addValidator(StandardValidators.NON_BLANK_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    static final PropertyDescriptor UPDATE_QUERY = new PropertyDescriptor.Builder().name("putmongo-update-query").displayName("Update Query").description("Specify a full MongoDB query to be used for the lookup query to do an update/upsert. NOTE: this field is ignored if the '%s' value is not empty.".formatted(UPDATE_QUERY_KEY.getDisplayName())).required(false).dependsOn(MODE, "update", new String[0]).addValidator((Validator)JsonValidator.INSTANCE).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    static final PropertyDescriptor UPDATE_OPERATION_MODE = new PropertyDescriptor.Builder().displayName("Update Mode").name("put-mongo-update-mode").required(true).dependsOn(MODE, "update", new String[0]).allowableValues(new DescribedValue[]{UPDATE_WITH_DOC, UPDATE_WITH_OPERATORS}).defaultValue((DescribedValue)UPDATE_WITH_DOC).description("Choose an update mode. You can either supply a JSON document to use as a direct replacement or specify a document that contains update operators like $set, $unset, and $inc. When Operators mode is enabled, the flowfile content is expected to be the operator part for example: {$set:{\"key\": \"value\"},$inc:{\"count\":1234}} and the update query will come from the configured Update Query property.").build();
    static final PropertyDescriptor UPDATE_METHOD = new PropertyDescriptor.Builder().name("Update Method").dependsOn(UPDATE_OPERATION_MODE, new AllowableValue[]{UPDATE_WITH_OPERATORS}).description("MongoDB method for running collection update operations, such as updateOne or updateMany").allowableValues(AbstractMongoProcessor.UpdateMethod.class).defaultValue((DescribedValue)AbstractMongoProcessor.UpdateMethod.UPDATE_ONE).build();
    static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder().name("Character Set").description("The Character Set in which the data is encoded").required(true).addValidator(StandardValidators.CHARACTER_SET_VALIDATOR).defaultValue("UTF-8").build();
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Stream.concat(PutMongo.getCommonPropertyDescriptors().stream(), Stream.of(MODE, UPSERT, UPDATE_QUERY_KEY, UPDATE_QUERY, UPDATE_OPERATION_MODE, UPDATE_METHOD, CHARACTER_SET)).toList();

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        ArrayList<ValidationResult> problems = new ArrayList<ValidationResult>();
        if (validationContext.getProperty(MODE).getValue().equals(MODE_INSERT)) {
            return problems;
        }
        boolean queryKey = validationContext.getProperty(UPDATE_QUERY_KEY).isSet();
        boolean query = validationContext.getProperty(UPDATE_QUERY).isSet();
        if (queryKey && query) {
            problems.add(new ValidationResult.Builder().valid(false).explanation("Both update query key and update query cannot be set at the same time.").build());
        } else if (!queryKey && !query) {
            problems.add(new ValidationResult.Builder().valid(false).explanation("Either the update query key or the update query field must be set.").build());
        }
        return problems;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        ComponentLog logger = this.getLogger();
        Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
        String processorMode = context.getProperty(MODE).getValue();
        String updateOperationMode = processorMode.equals(MODE_UPDATE) ? context.getProperty(UPDATE_OPERATION_MODE).getValue() : null;
        WriteConcern writeConcern = this.clientService.getWriteConcern();
        try {
            Document doc;
            MongoCollection collection = this.getCollection(context, flowFile).withWriteConcern(writeConcern);
            byte[] content = new byte[(int)flowFile.getSize()];
            session.read(flowFile, in -> StreamUtils.fillBuffer((InputStream)in, (byte[])content, (boolean)true));
            Object object = doc = processorMode.equals(MODE_INSERT) || Objects.equals(updateOperationMode, UPDATE_WITH_DOC.getValue()) ? Document.parse((String)new String(content, charset)) : BasicDBObject.parse((String)new String(content, charset));
            if (MODE_INSERT.equals(processorMode)) {
                collection.insertOne((Object)doc);
                logger.info("inserted {} into MongoDB", new Object[]{flowFile});
            } else {
                UpdateResult updateResult;
                Document updateQuery;
                boolean upsert = context.getProperty(UPSERT).asBoolean();
                String updateKey = context.getProperty(UPDATE_QUERY_KEY).evaluateAttributeExpressions(flowFile).getValue();
                String filterQuery = context.getProperty(UPDATE_QUERY).evaluateAttributeExpressions(flowFile).getValue();
                if (StringUtils.isNotBlank((String)updateKey)) {
                    updateQuery = this.parseUpdateKey(updateKey, (Map)doc);
                    this.removeUpdateKeys(updateKey, (Map)doc);
                } else {
                    updateQuery = Document.parse((String)filterQuery);
                }
                if (Objects.equals(updateOperationMode, UPDATE_WITH_DOC.getValue())) {
                    updateResult = collection.replaceOne((Bson)updateQuery, (Object)doc, new ReplaceOptions().upsert(upsert));
                } else {
                    BasicDBObject update = (BasicDBObject)doc;
                    update.remove((Object)updateKey);
                    UpdateOptions updateOptions = new UpdateOptions().upsert(upsert);
                    AbstractMongoProcessor.UpdateMethod updateQueryMode = (AbstractMongoProcessor.UpdateMethod)context.getProperty(UPDATE_METHOD).asAllowableValue(AbstractMongoProcessor.UpdateMethod.class);
                    if (this.updateModeMatches(AbstractMongoProcessor.UpdateMethod.UPDATE_ONE, updateQueryMode, flowFile)) {
                        updateResult = collection.updateOne((Bson)updateQuery, (Bson)update, updateOptions);
                    } else if (this.updateModeMatches(AbstractMongoProcessor.UpdateMethod.UPDATE_MANY, updateQueryMode, flowFile)) {
                        updateResult = collection.updateMany((Bson)updateQuery, (Bson)update, updateOptions);
                    } else {
                        String flowfileUpdateMode = flowFile.getAttribute("mongodb.update.mode");
                        throw new ProcessException("Unrecognized 'mongodb.update.mode' value '" + flowfileUpdateMode + "'");
                    }
                }
                flowFile = session.putAttribute(flowFile, ATTRIBUTE_UPDATE_MATCH_COUNT, String.valueOf(updateResult.getMatchedCount()));
                flowFile = session.putAttribute(flowFile, ATTRIBUTE_UPDATE_MODIFY_COUNT, String.valueOf(updateResult.getModifiedCount()));
                BsonValue upsertedId = updateResult.getUpsertedId();
                if (upsertedId != null) {
                    String id = upsertedId.isString() ? upsertedId.asString().getValue() : upsertedId.asObjectId().getValue().toString();
                    flowFile = session.putAttribute(flowFile, ATTRIBUTE_UPSERT_ID, id);
                }
                logger.info("updated {} into MongoDB", new Object[]{flowFile});
            }
            session.getProvenanceReporter().send(flowFile, this.getURI(context));
            session.transfer(flowFile, REL_SUCCESS);
        }
        catch (Exception e) {
            logger.error("Failed to insert {} into MongoDB due to {}", new Object[]{flowFile, e, e});
            session.transfer(flowFile, REL_FAILURE);
            context.yield();
        }
    }

    private void removeUpdateKeys(String updateKeyParam, Map doc) {
        String[] parts;
        for (String part : parts = updateKeyParam.split(",[\\s]*")) {
            if (!part.contains(".")) continue;
            doc.remove(part);
        }
    }

    private Document parseUpdateKey(String updateKey, Map doc) {
        Document retVal;
        if (updateKey.equals("_id")) {
            retVal = doc.get("_id") instanceof ObjectId ? new Document("_id", doc.get("_id")) : (ObjectId.isValid((String)((String)doc.get("_id"))) ? new Document("_id", (Object)new ObjectId((String)doc.get("_id"))) : new Document("_id", doc.get("_id")));
        } else if (updateKey.contains(",")) {
            String[] parts = updateKey.split(",[\\s]*");
            retVal = new Document();
            for (String part : parts) {
                retVal.append(part, doc.get(part));
            }
        } else {
            retVal = new Document(updateKey, doc.get(updateKey));
        }
        return retVal;
    }
}

