/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb;

import com.mongodb.client.model.Filters;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.IntFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.InvalidTableException;
import org.apache.beam.sdk.io.mongodb.FindQuery;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.FieldTypeDescriptors;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.SelectHelpers;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.JsonToRow;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.ToJson;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexCall;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexInputRef;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexLiteral;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexNode;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.type.SqlTypeName;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.checkerframework.dataflow.qual.SideEffectFree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoDbTable
extends SchemaBaseBeamTable
implements Serializable {
    /** Logger used to report the filter that is pushed down to MongoDB. */
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(MongoDbTable.class);
    /**
     * Shape a table location must have. The named groups {@code credsHostPort}, {@code database}
     * and {@code collection} are read by the constructor; {@code usernamePassword} and
     * {@code password} are optional sub-groups of the connection part.
     */
    @VisibleForTesting
    final @UnknownKeyFor @NonNull @Initialized Pattern locationPattern = Pattern.compile("(?<credsHostPort>mongodb://(?<usernamePassword>.*(?<password>:.*)?@)?.+:\\d+)/(?<database>.+)/(?<collection>.+)");
    /** Collection name, taken from the {@code collection} group of the table location. */
    @VisibleForTesting
    final @UnknownKeyFor @NonNull @Initialized String dbCollection;
    /** Database name, taken from the {@code database} group of the table location. */
    @VisibleForTesting
    final @UnknownKeyFor @NonNull @Initialized String dbName;
    /** Connection URI (scheme, optional credentials, host and port), taken from the {@code credsHostPort} group. */
    @VisibleForTesting
    final @UnknownKeyFor @NonNull @Initialized String dbUri;

    /**
     * Creates a table backed by the MongoDB collection named in the table's location.
     *
     * @param table table definition; its schema becomes this table's schema and its location is
     *     parsed into connection URI, database name and collection name
     * @throws InvalidTableException if the location is missing or does not have the form
     *     {@code mongodb://(username:password@)?host:port/database/collection}
     */
    MongoDbTable(@UnknownKeyFor @NonNull @Initialized Table table) {
        super(table.getSchema());
        String location = table.getLocation();
        // Pattern.matcher(null) would fail with a bare NullPointerException; report a missing
        // location the same way as a malformed one so the caller learns what is expected.
        Matcher matcher = location == null ? null : this.locationPattern.matcher(location);
        if (matcher == null || !matcher.matches()) {
            throw new InvalidTableException("MongoDb location must be in the following format: 'mongodb://(username:password@)?localhost:27017/database/collection' but was: " + location);
        }
        this.dbUri = matcher.group("credsHostPort");
        this.dbName = matcher.group("database");
        this.dbCollection = matcher.group("collection");
    }

    /**
     * Reads the configured collection without any push-down and converts each document into a
     * {@link Row} with this table's schema.
     *
     * @param begin root of the pipeline the read is expanded on
     * @return the rows produced from the collection's documents
     */
    @Override
    public @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized Row> buildIOReader(@UnknownKeyFor @NonNull @Initialized PBegin begin) {
        MongoDbIO.Read source = MongoDbIO.read()
            .withUri(this.dbUri)
            .withDatabase(this.dbName)
            .withCollection(this.dbCollection);
        PCollection<Document> documents = source.expand(begin);
        return documents.apply(DocumentToRow.withSchema(this.getSchema()));
    }

    /**
     * Reads the configured collection, pushing the supported part of the filter and the field
     * selection down to MongoDB through a {@link FindQuery}.
     *
     * @param begin root of the pipeline the read is expanded on
     * @param filters filter produced for this table; anything other than a
     *     {@link DefaultTableFilter} is treated as a {@code MongoDbFilter}
     * @param fieldNames fields to select; when empty no projection is set on the query
     * @return rows whose schema is derived from this table's schema and {@code fieldNames}
     */
    @Override
    public @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized Row> buildIOReader(@UnknownKeyFor @NonNull @Initialized PBegin begin, @UnknownKeyFor @NonNull @Initialized BeamSqlTableFilter filters, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> fieldNames) {
        MongoDbIO.Read source = MongoDbIO.read()
            .withUri(this.dbUri)
            .withDatabase(this.dbName)
            .withCollection(this.dbCollection);

        // Schema of the rows that come out once the field selection is applied.
        FieldAccessDescriptor selection = FieldAccessDescriptor.withFieldNames(fieldNames).resolve(this.getSchema());
        Schema outputSchema = SelectHelpers.getOutputSchema(this.getSchema(), selection);

        FindQuery query = FindQuery.create();
        if (!(filters instanceof DefaultTableFilter)) {
            MongoDbFilter mongoFilter = (MongoDbFilter) filters;
            if (!mongoFilter.getSupported().isEmpty()) {
                Bson predicate = this.constructPredicate(mongoFilter.getSupported());
                LOG.info("Pushing down the following filter: {}", predicate);
                query = query.withFilters(predicate);
            }
        }
        if (!fieldNames.isEmpty()) {
            query = query.withProjection(fieldNames);
        }

        PCollection<Document> documents = source.withQueryFn(query).expand(begin);
        return documents.apply(DocumentToRow.withSchema(outputSchema));
    }

    /**
     * Converts the incoming rows to BSON documents and writes them to the configured collection.
     *
     * @param input rows to write
     * @return the output of the MongoDB write transform
     */
    @Override
    public @UnknownKeyFor @NonNull @Initialized POutput buildIOWriter(@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized Row> input) {
        PCollection<Document> documents = input.apply(RowToDocument.convert());
        MongoDbIO.Write sink = MongoDbIO.write()
            .withUri(this.dbUri)
            .withDatabase(this.dbName)
            .withCollection(this.dbCollection);
        return documents.apply(sink);
    }

    /**
     * Reports the level of project push-down this table offers.
     *
     * @return always {@link ProjectSupport#WITH_FIELD_REORDERING}
     */
    @Override
    public @UnknownKeyFor @NonNull @Initialized ProjectSupport supportsProjects() {
        return ProjectSupport.WITH_FIELD_REORDERING;
    }

    /**
     * Splits the given predicate nodes into those that can be pushed down to MongoDB and those
     * that cannot.
     *
     * @param filter predicate nodes to classify
     * @return a {@code MongoDbFilter} holding the supported and unsupported nodes
     */
    @Override
    public @UnknownKeyFor @NonNull @Initialized BeamSqlTableFilter constructFilter(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized RexNode> filter) {
        MongoDbFilter classified = MongoDbFilter.create(filter);
        return classified;
    }

    /**
     * Builds one MongoDB filter from the supported predicate nodes.
     *
     * @param supported non-empty list of nodes to translate
     * @return the single translated filter when there is exactly one node, otherwise the
     *     conjunction of all translated filters
     */
    private @UnknownKeyFor @NonNull @Initialized Bson constructPredicate(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized RexNode> supported) {
        assert (!supported.isEmpty());
        List<Bson> translated = supported.stream()
            .map(this::translateRexNodeToBson)
            .collect(Collectors.toList());
        // A lone predicate is returned as-is rather than wrapped in a one-element $and.
        return translated.size() == 1 ? translated.get(0) : Filters.and(translated);
    }

    /**
     * Translates a filter node into the equivalent MongoDB {@link Bson} filter.
     *
     * <ul>
     *   <li>A boolean column reference becomes {@code field == true}.
     *   <li>{@code AND} / {@code OR} translate every operand recursively and combine the results.
     *   <li>{@code NOT} over a single column reference negates that column's filter.
     *   <li>A comparison or {@code IN} between exactly one column reference and literal(s) becomes
     *       the matching {@link Filters} call, with the literal converted to the column's Java type.
     * </ul>
     *
     * @param node filter expression to translate
     * @return the equivalent MongoDB filter
     * @throws RuntimeException if the node's type, shape or kind cannot be translated
     */
    private @UnknownKeyFor @NonNull @Initialized Bson translateRexNodeToBson(@UnknownKeyFor @NonNull @Initialized RexNode node) {
        IntFunction<String> fieldIdToName = i -> this.getSchema().getField(i).getName();
        if (!(node instanceof RexCall)) {
            // Ex: `where boolean_field`
            if (node instanceof RexInputRef && node.getType().getSqlTypeName().equals(SqlTypeName.BOOLEAN)) {
                return Filters.eq(fieldIdToName.apply(((RexInputRef) node).getIndex()), true);
            }
            throw new RuntimeException("Was expecting a RexCall or a boolean RexInputRef, but received: " + node.getClass().getSimpleName());
        }
        RexCall compositeNode = (RexCall) node;
        SqlKind kind = node.getKind();

        // Logical connectives are handled before looking at operand types. MongoDbFilter accepts
        // AND/OR whose operands mix a bare boolean column with other calls (`bool_col OR x > 5`);
        // such a node has exactly one direct column operand and must not be mistaken for a
        // column-vs-literal comparison.
        if (kind == SqlKind.AND || kind == SqlKind.OR) {
            List<Bson> operands = compositeNode.getOperands().stream()
                .map(this::translateRexNodeToBson)
                .collect(Collectors.toList());
            return kind == SqlKind.AND ? Filters.and(operands) : Filters.or(operands);
        }

        List<RexLiteral> literals = new ArrayList<>();
        List<RexInputRef> inputRefs = new ArrayList<>();
        for (RexNode operand : compositeNode.getOperands()) {
            if (operand instanceof RexLiteral) {
                literals.add((RexLiteral) operand);
            } else if (operand instanceof RexInputRef) {
                inputRefs.add((RexInputRef) operand);
            }
        }
        if (inputRefs.size() != 1) {
            throw new RuntimeException("Encountered an unexpected node kind: " + kind.toString());
        }

        RexInputRef inputRef = inputRefs.get(0);
        String inputFieldName = fieldIdToName.apply(inputRef.getIndex());
        if (literals.isEmpty()) {
            // Ex: `where not boolean_field`
            if (kind != SqlKind.NOT) {
                throw new RuntimeException("Cannot create a filter for an unsupported node: " + node.toString());
            }
            return Filters.not(this.translateRexNodeToBson(inputRef));
        }

        // Convert the literal to the same Java type as the field it is compared to.
        Object literal = this.convertToExpectedType(inputRef, literals.get(0));
        // The filters below are written as `field <op> literal`. When the expression is
        // `literal <op> field` the operator has to be mirrored (`5 < x` is `x > 5`);
        // reverse() leaves symmetric kinds such as EQUALS untouched.
        SqlKind comparison = compositeNode.getOperands().get(0) instanceof RexLiteral ? kind.reverse() : kind;
        switch (comparison) {
            case IN:
                // Pass the values as an Iterable: handing the list to the varargs overload would
                // wrap it as a single element and match against `[[v1, v2, ...]]`.
                return Filters.in(inputFieldName, (Iterable<?>) this.convertToExpectedType(inputRef, literals));
            case EQUALS:
                return Filters.eq(inputFieldName, literal);
            case NOT_EQUALS:
                return Filters.not(Filters.eq(inputFieldName, literal));
            case LESS_THAN:
                return Filters.lt(inputFieldName, literal);
            case GREATER_THAN:
                return Filters.gt(inputFieldName, literal);
            case GREATER_THAN_OR_EQUAL:
                return Filters.gte(inputFieldName, literal);
            case LESS_THAN_OR_EQUAL:
                return Filters.lte(inputFieldName, literal);
            default:
                throw new RuntimeException("Encountered an unexpected node kind: " + kind.toString());
        }
    }

    /**
     * Converts a literal to the Java type of the column it is compared with.
     *
     * @param inputRef reference to the column; its index selects the field in this table's schema
     * @param literal literal to convert
     * @return the literal's value as an instance of the raw Java type of the field's Beam type
     */
    private @UnknownKeyFor @NonNull @Initialized Object convertToExpectedType(@UnknownKeyFor @NonNull @Initialized RexInputRef inputRef, @UnknownKeyFor @NonNull @Initialized RexLiteral literal) {
        Schema.FieldType fieldType = this.getSchema().getField(inputRef.getIndex()).getType();
        Class<?> javaType = FieldTypeDescriptors.javaTypeForFieldType(fieldType).getRawType();
        return literal.getValueAs(javaType);
    }

    /**
     * Converts every literal to the Java type of the column it is compared with.
     *
     * @param inputRef reference to the column the literals are compared with
     * @param literals literals to convert
     * @return a {@link List} with the converted values, in the order of {@code literals}
     */
    private @UnknownKeyFor @NonNull @Initialized Object convertToExpectedType(@UnknownKeyFor @NonNull @Initialized RexInputRef inputRef, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized RexLiteral> literals) {
        List<Object> converted = new ArrayList<>();
        for (RexLiteral literal : literals) {
            converted.add(this.convertToExpectedType(inputRef, literal));
        }
        return converted;
    }

    /**
     * Reports the boundedness of the data read from this table.
     *
     * @return always {@link PCollection.IsBounded#BOUNDED}
     */
    @Override
    public PCollection.@UnknownKeyFor @NonNull @Initialized IsBounded isBounded() {
        // The type annotations sit on the nested type itself; placing them before the qualifying
        // `PCollection.` would annotate a scoping construct, which the compiler rejects.
        return PCollection.IsBounded.BOUNDED;
    }

    /**
     * Derives table statistics from the document count reported for the configured collection.
     *
     * @param options pipeline options (not used by this implementation)
     * @return {@link BeamTableStatistics#BOUNDED_UNKNOWN} when the reported count is negative,
     *     otherwise bounded statistics carrying the count as row count
     */
    @Override
    public @UnknownKeyFor @NonNull @Initialized BeamTableStatistics getTableStatistics(@UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
        MongoDbIO.Read source = MongoDbIO.read()
            .withUri(this.dbUri)
            .withDatabase(this.dbName)
            .withCollection(this.dbCollection);
        long documentCount = source.getDocumentCount();
        return documentCount < 0L
            ? BeamTableStatistics.BOUNDED_UNKNOWN
            : BeamTableStatistics.createBoundedTableStatistics(Double.valueOf(documentCount));
    }

    /**
     * Partition of a predicate's nodes into those that can be translated to a MongoDB filter and
     * those that cannot.
     */
    static class MongoDbFilter
    implements BeamSqlTableFilter {
        private @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized RexNode> supported;
        private @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized RexNode> unsupported;

        /**
         * @param supported nodes that can be pushed down
         * @param unsupported nodes that cannot be pushed down
         */
        public MongoDbFilter(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized RexNode> supported, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized RexNode> unsupported) {
            this.supported = supported;
            this.unsupported = unsupported;
        }

        /** Returns the nodes that cannot be pushed down. */
        @Override
        public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized RexNode> getNotSupported() {
            return this.unsupported;
        }

        /** Returns the number of expressions contained in the supported nodes. */
        @Override
        public @UnknownKeyFor @NonNull @Initialized int numSupported() {
            return BeamSqlTableFilter.expressionsInFilter(this.supported);
        }

        /** Returns the nodes that can be pushed down. */
        public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized RexNode> getSupported() {
            return this.supported;
        }

        /**
         * Renders both node lists as {@code [supported{...}, unsupported{...}]}; the nodes inside
         * each pair of braces are concatenated without a separator.
         */
        @SideEffectFree
        @Override
        public @UnknownKeyFor @NonNull @Initialized String toString() {
            String supportedText = this.supported.stream()
                .map(RexNode::toString)
                .collect(Collectors.joining("", "supported{", "}"));
            String unsupportedText = this.unsupported.stream()
                .map(RexNode::toString)
                .collect(Collectors.joining("", "unsupported{", "}"));
            return "[" + supportedText + ", " + unsupportedText + "]";
        }

        /**
         * Classifies each node of a predicate as supported or unsupported.
         *
         * @param predicateCNF predicate nodes to classify
         * @return a filter holding both groups, each in encounter order
         * @throws RuntimeException if a node is not of SQL type BOOLEAN, or if classification meets
         *     a node that is neither a call nor a column reference
         */
        public static @UnknownKeyFor @NonNull @Initialized MongoDbFilter create(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized RexNode> predicateCNF) {
            ImmutableList.Builder<RexNode> pushable = ImmutableList.builder();
            ImmutableList.Builder<RexNode> remaining = ImmutableList.builder();
            for (RexNode node : predicateCNF) {
                if (!node.getType().getSqlTypeName().equals(SqlTypeName.BOOLEAN)) {
                    throw new RuntimeException("Predicate node '" + node.getClass().getSimpleName() + "' should be a boolean expression, but was: " + node.getType().getSqlTypeName());
                }
                if (MongoDbFilter.isSupported(node)) {
                    pushable.add(node);
                } else {
                    remaining.add(node);
                }
            }
            return new MongoDbFilter(pushable.build(), remaining.build());
        }

        /**
         * Decides whether a node can be pushed down.
         *
         * <p>Supported are: a column reference; a comparison or {@code NOT} whose operands are only
         * column references and literals with exactly one column reference among them; and an
         * {@code AND}/{@code OR} whose operands are all supported.
         *
         * @throws RuntimeException if the node is neither a call nor a column reference
         */
        private static @UnknownKeyFor @NonNull @Initialized boolean isSupported(@UnknownKeyFor @NonNull @Initialized RexNode node) {
            if (!(node instanceof RexCall)) {
                if (node instanceof RexInputRef) {
                    return true;
                }
                throw new RuntimeException("Encountered an unexpected node type: " + node.getClass().getSimpleName());
            }

            RexCall call = (RexCall) node;
            SqlKind kind = node.getKind();
            if (kind.belongsTo(SqlKind.COMPARISON) || kind.equals(SqlKind.NOT)) {
                int columnCount = 0;
                for (RexNode operand : call.getOperands()) {
                    if (operand instanceof RexInputRef) {
                        columnCount++;
                    } else if (!(operand instanceof RexLiteral)) {
                        // Anything other than a column or a literal rules the node out.
                        return false;
                    }
                }
                return columnCount == 1;
            }
            if (kind.equals(SqlKind.AND) || kind.equals(SqlKind.OR)) {
                return call.getOperands().stream().allMatch(MongoDbFilter::isSupported);
            }
            return false;
        }
    }

    /** Transform that turns Beam {@link Row}s into BSON {@link Document}s by way of JSON. */
    public static class RowToDocument
    extends PTransform<PCollection<Row>, PCollection<Document>> {
        private RowToDocument() {
        }

        /** Creates a new instance of the transform. */
        public static @UnknownKeyFor @NonNull @Initialized RowToDocument convert() {
            return new RowToDocument();
        }

        /**
         * Serializes each row to a JSON string, then parses that string into a document.
         *
         * @param input rows to convert
         * @return the resulting documents
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized Document> expand(@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized Row> input) {
            PCollection<String> json = input.apply("Transform Rows to JSON", ToJson.of());
            return json.apply("Produce documents from JSON", MapElements.via(new ObjectToDocumentFn()));
        }

        /** Parses a JSON string into a {@link Document}. */
        @VisibleForTesting
        static class ObjectToDocumentFn
        extends SimpleFunction<String, Document> {
            ObjectToDocumentFn() {
            }

            @Override
            public @UnknownKeyFor @NonNull @Initialized Document apply(@UnknownKeyFor @NonNull @Initialized String input) {
                Document parsed = Document.parse(input);
                return parsed;
            }
        }
    }

    /** Transform that turns BSON {@link Document}s into Beam {@link Row}s of a given schema by way of JSON. */
    public static class DocumentToRow
    extends PTransform<PCollection<Document>, PCollection<Row>> {
        /** Schema of the rows this transform produces. */
        private final @UnknownKeyFor @NonNull @Initialized Schema schema;

        private DocumentToRow(@UnknownKeyFor @NonNull @Initialized Schema schema) {
            this.schema = schema;
        }

        /**
         * Creates a transform producing rows of the given schema.
         *
         * @param schema schema the JSON form of each document is parsed against
         */
        public static @UnknownKeyFor @NonNull @Initialized DocumentToRow withSchema(@UnknownKeyFor @NonNull @Initialized Schema schema) {
            return new DocumentToRow(schema);
        }

        /**
         * Renders each document as a JSON string, parses that string into a row of the configured
         * schema and sets that schema on the output collection.
         *
         * @param input documents to convert
         * @return the resulting rows
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized Row> expand(@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized Document> input) {
            return input
                .apply("Convert Document to JSON", ParDo.of(new DocumentToJsonStringConverter()))
                .apply("Transform JSON to Row", JsonToRow.withSchema(this.schema))
                .setRowSchema(this.schema);
        }

        /** Emits the JSON rendering of each document, written in {@link JsonMode#RELAXED} mode. */
        @VisibleForTesting
        static class DocumentToJsonStringConverter
        extends DoFn<Document, String> {
            DocumentToJsonStringConverter() {
            }

            /**
             * Outputs the current element as a JSON string.
             *
             * @param context gives access to the input document and receives the JSON output
             */
            @DoFn.ProcessElement
            public void processElement(@UnknownKeyFor @NonNull @Initialized ProcessContext context) {
                // ProcessContext is the member type inherited from DoFn<Document, String>, so
                // element() is already a Document and output() takes a String - no casts needed.
                JsonWriterSettings relaxed = JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build();
                context.output(context.element().toJson(relaxed));
            }
        }
    }
}

