/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.impl.rel;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.extensions.sql.impl.cep.CEPCall;
import org.apache.beam.sdk.extensions.sql.impl.cep.CEPFieldRef;
import org.apache.beam.sdk.extensions.sql.impl.cep.CEPKind;
import org.apache.beam.sdk.extensions.sql.impl.cep.CEPLiteral;
import org.apache.beam.sdk.extensions.sql.impl.cep.CEPMeasure;
import org.apache.beam.sdk.extensions.sql.impl.cep.CEPOperation;
import org.apache.beam.sdk.extensions.sql.impl.cep.CEPPattern;
import org.apache.beam.sdk.extensions.sql.impl.cep.CEPUtils;
import org.apache.beam.sdk.extensions.sql.impl.cep.OrderKey;
import org.apache.beam.sdk.extensions.sql.impl.nfa.NFA;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSortRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelCollation;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Match;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexCall;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexInputRef;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlKind;

public class BeamMatchRel
extends Match
implements BeamRelNode {
    /**
     * Creates a {@code BeamMatchRel}.
     *
     * <p>This constructor adds no state of its own: every argument is forwarded, unchanged and in
     * the same order, to the {@link Match} super-constructor.
     */
    public BeamMatchRel(
            RelOptCluster cluster,
            RelTraitSet traitSet,
            RelNode input,
            RelDataType rowType,
            RexNode pattern,
            boolean strictStart,
            boolean strictEnd,
            Map<String, RexNode> patternDefinitions,
            Map<String, RexNode> measures,
            RexNode after,
            Map<String, ? extends SortedSet<String>> subsets,
            boolean allRows,
            List<RexNode> partitionKeys,
            RelCollation orderKeys,
            RexNode interval) {
        super(
                cluster,
                traitSet,
                input,
                rowType,
                pattern,
                strictStart,
                strictEnd,
                patternDefinitions,
                measures,
                after,
                subsets,
                allRows,
                partitionKeys,
                orderKeys,
                interval);
    }

    /**
     * Reports the planner cost of this node.
     *
     * <p>Neither {@code planner} nor {@code mq} is consulted: the result is always the tiny cost
     * produced by {@code BeamCostModel.FACTORY}.
     *
     * @param planner unused
     * @param mq unused
     * @return the constant tiny cost
     */
    @Override
    public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
        BeamCostModel constantCost = BeamCostModel.FACTORY.makeTinyCost();
        return constantCost;
    }

    /**
     * Estimates the statistics of this node from those of its input.
     *
     * <p>The input's row count, rate and window are copied into a fresh {@link NodeStats}, which is
     * then scaled with {@code multiply(0.5)}.
     *
     * @param mq metadata query used to obtain the input's statistics
     * @return the scaled estimate
     */
    @Override
    public NodeStats estimateNodeStats(RelMetadataQuery mq) {
        NodeStats upstreamStats = BeamSqlRelUtils.getNodeStats(this.input, mq);
        NodeStats copied =
                NodeStats.create(
                        upstreamStats.getRowCount(),
                        upstreamStats.getRate(),
                        upstreamStats.getWindow());
        return copied.multiply(0.5);
    }

    /**
     * Builds the transform that executes this node.
     *
     * @return a new {@code MatchTransform} configured with this node's partition keys, order keys,
     *     measures, all-rows flag, pattern and pattern definitions
     */
    @Override
    public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
        return new MatchTransform(
                partitionKeys,
                orderKeys,
                measures,
                allRows,
                pattern,
                patternDefinitions);
    }

    /**
     * Creates a new {@code BeamMatchRel} from the supplied arguments.
     *
     * <p>The cluster and trait set are taken from this node; every other constructor argument comes
     * from the corresponding parameter.
     *
     * @return the newly constructed node
     */
    public Match copy(
            RelNode input,
            RelDataType rowType,
            RexNode pattern,
            boolean strictStart,
            boolean strictEnd,
            Map<String, RexNode> patternDefinitions,
            Map<String, RexNode> measures,
            RexNode after,
            Map<String, ? extends SortedSet<String>> subsets,
            boolean allRows,
            List<RexNode> partitionKeys,
            RelCollation orderKeys,
            RexNode interval) {
        return new BeamMatchRel(
                getCluster(),
                getTraitSet(),
                input,
                rowType,
                pattern,
                strictStart,
                strictEnd,
                patternDefinitions,
                measures,
                after,
                subsets,
                allRows,
                partitionKeys,
                orderKeys,
                interval);
    }

    /**
     * Turns each input row into a key/value pair keyed by its partition columns.
     *
     * <p>The key is a row with {@code partitionKeySchema}; each of its values is read from the
     * input row by the name of the corresponding key-schema field. The value of the pair is the
     * unmodified input row.
     */
    private static class MapKeys
    extends DoFn<Row, KV<Row, Row>> {
        /** Schema of the key row emitted for every element. */
        private final Schema partitionKeySchema;

        /**
         * @param partitionKeySchema schema of the key rows to build; its field names are looked up
         *     in every input row
         */
        public MapKeys(Schema partitionKeySchema) {
            this.partitionKeySchema = partitionKeySchema;
        }

        /**
         * Emits {@code KV(keyRow, eleRow)} for one input row.
         *
         * @param eleRow the input row
         * @param out receiver for the keyed row
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element Row eleRow, DoFn.OutputReceiver<KV<Row, Row>> out) {
            Row.Builder keyBuilder = Row.withSchema(this.partitionKeySchema);
            // Values are appended in key-schema field order, which is the order the builder expects.
            for (Schema.Field keyField : this.partitionKeySchema.getFields()) {
                keyBuilder.addValue(eleRow.getValue(keyField.getName()));
            }
            out.output(KV.of(keyBuilder.build(), eleRow));
        }
    }

    /**
     * Sorts the rows of each key group.
     *
     * <p>The rows of a group are copied into a list, sorted with a
     * {@code BeamSortRel.BeamSqlRowComparator} built from the configured order keys, and re-emitted
     * under the same key.
     */
    private static class SortPerKey
    extends DoFn<KV<Row, Iterable<Row>>, KV<Row, Iterable<Row>>> {
        /** Order keys that define the sort. */
        private final ArrayList<OrderKey> orderKeys;

        /**
         * @param orderKeys order keys supplying field index, direction and null ordering
         */
        public SortPerKey(ArrayList<OrderKey> orderKeys) {
            this.orderKeys = orderKeys;
        }

        /**
         * Emits the key of {@code keyRows} paired with its rows in sorted order.
         *
         * @param keyRows one key with all of its rows
         * @param out receiver for the sorted group
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<Row, Iterable<Row>> keyRows, DoFn.OutputReceiver<KV<Row, Iterable<Row>>> out) {
            // The incoming Iterable is copied so that it can be sorted in place.
            ArrayList<Row> rows = new ArrayList<>();
            for (Row row : keyRows.getValue()) {
                rows.add(row);
            }

            ArrayList<Integer> fIndexList = new ArrayList<>();
            ArrayList<Boolean> dirList = new ArrayList<>();
            ArrayList<Boolean> nullDirList = new ArrayList<>();
            // NOTE(review): the order keys are handed to the comparator last-to-first. Whether the
            // comparator treats the first list entry as the most significant key is not visible
            // here - confirm that this reversal is intended.
            for (int i = this.orderKeys.size() - 1; i >= 0; --i) {
                OrderKey thisKey = this.orderKeys.get(i);
                fIndexList.add(thisKey.getIndex());
                dirList.add(thisKey.getDir());
                nullDirList.add(thisKey.getNullFirst());
            }

            rows.sort(new BeamSortRel.BeamSqlRowComparator(fIndexList, dirList, nullDirList));
            out.output(KV.of(keyRows.getKey(), rows));
        }
    }

    /**
     * Runs the pattern over the rows of one key group and emits the matches.
     *
     * <p>For every key group a fresh {@link NFA} is compiled from the pattern and fed the group's
     * rows one at a time. Whenever the NFA returns a non-null result, either all rows contained in
     * that result are emitted ({@code allRows == true}) or a single row with {@code outSchema} is
     * built from the partition key columns and the measures.
     */
    private static class MatchPattern
    extends DoFn<KV<Row, Iterable<Row>>, Row> {
        private final Schema upstreamSchema;
        private final Schema outSchema;
        private final List<CEPFieldRef> parKeys;
        private final ArrayList<CEPPattern> pattern;
        private final List<CEPMeasure> measures;
        private final boolean allRows;

        /**
         * @param upstreamSchema schema of the input rows
         * @param parKeys partition key columns, as indices into {@code upstreamSchema}
         * @param pattern pattern handed to {@code NFA.compile}
         * @param measures measures that define the output columns when {@code allRows} is false
         * @param allRows whether every row of a match is emitted instead of one measure row
         * @param outSchema schema of the measure rows
         */
        MatchPattern(Schema upstreamSchema, List<CEPFieldRef> parKeys, ArrayList<CEPPattern> pattern, List<CEPMeasure> measures, boolean allRows, Schema outSchema) {
            this.upstreamSchema = upstreamSchema;
            this.parKeys = parKeys;
            this.pattern = pattern;
            this.measures = measures;
            this.allRows = allRows;
            this.outSchema = outSchema;
        }

        /**
         * Processes one key group.
         *
         * @param keyRows the partition key row together with the rows of that partition
         * @param out receiver for matched rows or measure rows
         * @throws UnsupportedOperationException if a measure uses an operation or function other
         *     than a field reference, {@code FIRST} or {@code LAST}
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<Row, Iterable<Row>> keyRows, DoFn.OutputReceiver<Row> out) {
            NFA partNFA = NFA.compile(this.pattern, this.upstreamSchema);
            for (Row singleRow : keyRows.getValue()) {
                Map<String, ArrayList<Row>> result = partNFA.processNewRow(singleRow);
                if (result == null) {
                    // A null result means nothing is emitted for this row.
                    continue;
                }
                if (this.allRows) {
                    for (ArrayList<Row> matchedRows : result.values()) {
                        for (Row matchedRow : matchedRows) {
                            out.output(matchedRow);
                        }
                    }
                } else {
                    out.output(this.buildMeasureRow(keyRows.getKey(), result));
                }
            }
        }

        /**
         * Builds the single output row for one match: partition key columns first, then one column
         * per measure.
         *
         * @param parKeyRow the partition key row of the current group
         * @param result rows of the match, keyed by pattern variable name
         * @return a row with {@code outSchema}
         */
        private Row buildMeasureRow(Row parKeyRow, Map<String, ArrayList<Row>> result) {
            Row.Builder newRowBuilder = Row.withSchema(this.outSchema);
            Row.FieldValueBuilder newFieldBuilder = null;

            for (CEPFieldRef parKey : this.parKeys) {
                String fieldName = this.upstreamSchema.getField(parKey.getIndex()).getName();
                if (result.isEmpty()) {
                    break;
                }
                newFieldBuilder = withField(newRowBuilder, newFieldBuilder, fieldName, parKeyRow.getValue(fieldName));
            }

            for (CEPMeasure measure : this.measures) {
                String outName = measure.getName();
                CEPFieldRef patternRef = measure.getField();
                String patternVar = patternRef.getAlpha();
                List<Row> patternRows = result.get(patternVar);
                CEPOperation opr = measure.getOperation();

                Object measureValue;
                if (opr.getClass() == CEPCall.class) {
                    CEPCall call = (CEPCall) opr;
                    CEPKind funcName = call.getOperator().getCepKind();
                    switch (funcName) {
                        case FIRST: {
                            // Operand 0 is the column, operand 1 the offset counted from the start.
                            CEPFieldRef colFirstField = (CEPFieldRef) call.getOperands().get(0);
                            CEPLiteral colFirstIndex = (CEPLiteral) call.getOperands().get(1);
                            Row rowFirstToProc = patternRows.get(colFirstIndex.getDecimal().intValue());
                            measureValue = rowFirstToProc.getValue(colFirstField.getIndex());
                            break;
                        }
                        case LAST: {
                            // Operand 0 is the column, operand 1 the offset counted from the end.
                            CEPFieldRef colLastField = (CEPFieldRef) call.getOperands().get(0);
                            CEPLiteral colLastIndex = (CEPLiteral) call.getOperands().get(1);
                            Row rowLastToProc = patternRows.get(patternRows.size() - 1 - colLastIndex.getDecimal().intValue());
                            measureValue = rowLastToProc.getValue(colLastField.getIndex());
                            break;
                        }
                        default: {
                            throw new UnsupportedOperationException("The measure function is not recognized: " + funcName.name());
                        }
                    }
                } else if (opr.getClass() == CEPFieldRef.class) {
                    // A bare column reference reads from the first row of the pattern variable.
                    Row rowToProc = patternRows.get(0);
                    CEPFieldRef fieldRef = (CEPFieldRef) opr;
                    measureValue = rowToProc.getValue(fieldRef.getIndex());
                } else {
                    throw new UnsupportedOperationException("CEP operation is not recognized: " + opr.getClass().getName());
                }
                newFieldBuilder = withField(newRowBuilder, newFieldBuilder, outName, measureValue);
            }

            return newFieldBuilder == null ? newRowBuilder.build() : newFieldBuilder.build();
        }

        /**
         * Sets one named field, starting the field-value builder from {@code rowBuilder} when no
         * field has been set yet and continuing on {@code fieldBuilder} otherwise.
         *
         * @return the field-value builder to use for the next field
         */
        private static Row.FieldValueBuilder withField(Row.Builder rowBuilder, Row.FieldValueBuilder fieldBuilder, String name, Object value) {
            if (fieldBuilder == null) {
                return rowBuilder.withFieldValue(name, value);
            }
            return fieldBuilder.withFieldValue(name, value);
        }
    }

    /**
     * Transform that executes the enclosing {@code BeamMatchRel} on a single input collection.
     *
     * <p>The input rows are keyed by the partition columns, grouped per key, sorted within each
     * group, and finally run through {@code MatchPattern}; the result carries the schema derived
     * from the enclosing node's row type.
     */
    private class MatchTransform
    extends PTransform<PCollectionList<Row>, PCollection<Row>> {
        private final List<RexNode> parKeys;
        private final RelCollation orderKeys;
        private final Map<String, RexNode> measures;
        private final boolean allRows;
        private final RexNode pattern;
        private final Map<String, RexNode> patternDefs;

        /**
         * @param parKeys partition key expressions; each is cast to {@code RexInputRef} in
         *     {@link #expand}
         * @param orderKeys collation that defines the per-partition sort
         * @param measures output column name to measure expression
         * @param allRows whether every row of a match is emitted
         * @param pattern the pattern expression
         * @param patternDefs pattern variable name to its definition
         */
        public MatchTransform(List<RexNode> parKeys, RelCollation orderKeys, Map<String, RexNode> measures, boolean allRows, RexNode pattern, Map<String, RexNode> patternDefs) {
            this.parKeys = parKeys;
            this.orderKeys = orderKeys;
            this.measures = measures;
            this.allRows = allRows;
            this.pattern = pattern;
            this.patternDefs = patternDefs;
        }

        /**
         * Builds the pipeline fragment for the match.
         *
         * @param pinput must contain exactly one collection
         * @return the matched rows, with the output schema set
         * @throws IllegalArgumentException if {@code pinput} does not hold exactly one collection
         */
        public PCollection<Row> expand(PCollectionList<Row> pinput) {
            Preconditions.checkArgument(pinput.size() == 1, "Wrong number of inputs for %s: %s", BeamMatchRel.class.getSimpleName(), pinput);
            PCollection<Row> upstream = pinput.get(0);
            Schema upstreamSchema = upstream.getSchema();
            Schema outSchema = CalciteUtils.toSchema(BeamMatchRel.this.getRowType());

            // The key schema consists of the upstream fields referenced by the partition keys.
            Schema.Builder schemaBuilder = new Schema.Builder();
            for (RexNode parKey : this.parKeys) {
                RexInputRef varNode = (RexInputRef) parKey;
                schemaBuilder.addField(upstreamSchema.getField(varNode.getIndex()));
            }
            Schema partitionKeySchema = schemaBuilder.build();

            ArrayList<OrderKey> orderKeyList = CEPUtils.makeOrderKeysFromCollation(this.orderKeys);
            ArrayList<CEPPattern> cepPattern = CEPUtils.getCEPPatternFromPattern(upstreamSchema, this.pattern, this.patternDefs);

            List<CEPMeasure> cepMeasures = new ArrayList<>();
            for (Map.Entry<String, RexNode> measure : this.measures.entrySet()) {
                RexNode measureNode = measure.getValue();
                // A FINAL(...) wrapper is unwrapped; the measure is built from its first operand.
                if (measureNode.getClass() == RexCall.class) {
                    RexCall rexCall = (RexCall) measureNode;
                    if (rexCall.getOperator().getKind() == SqlKind.FINAL) {
                        measureNode = rexCall.getOperands().get(0);
                    }
                }
                cepMeasures.add(new CEPMeasure(upstreamSchema, measure.getKey(), CEPOperation.of(measureNode)));
            }
            List<CEPFieldRef> cepParKeys = CEPUtils.getCEPFieldRefFromParKeys(this.parKeys);

            // Key by partition columns -> group -> sort each group -> match.
            return upstream
                    .apply(ParDo.of(new MapKeys(partitionKeySchema)))
                    .setCoder(KvCoder.of(RowCoder.of(partitionKeySchema), RowCoder.of(upstreamSchema)))
                    .apply(GroupByKey.create())
                    .apply(ParDo.of(new SortPerKey(orderKeyList)))
                    .apply(ParDo.of(new MatchPattern(upstreamSchema, cepParKeys, cepPattern, cepMeasures, this.allRows, outSchema)))
                    .setRowSchema(outSchema);
        }
    }
}

