/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.impl.rel;

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.BeamRecordCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.linq4j.Ord;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.rel.RelNode;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.rel.RelWriter;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.rel.core.Aggregate;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.rel.core.AggregateCall;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.rel.type.RelDataType;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.util.ImmutableBitSet;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.util.Util;
import org.joda.time.Duration;

/**
 * {@link BeamRelNode} implementation of a Calcite {@link Aggregate}.
 *
 * <p>{@link #buildBeamPipeline} expands the node into a sequence of Beam stages: optional
 * event-timestamp assignment, windowing, keying, per-key combining and a final merge step that
 * produces records of this node's row type.
 *
 * <p>The pipeline code uses raw types and explicit casts because the element type of the keyed
 * collections is not imported in this file.
 */
public class BeamAggregationRel extends Aggregate implements BeamRelNode {
    /** Index of the input field used as event timestamp; {@code -1} disables timestamp assignment. */
    private int windowFieldIdx = -1;
    /** Window function handed to {@code Window.into} when the pipeline is built. */
    private WindowFn<BeamRecord, BoundedWindow> windowFn;
    /** Trigger applied to the windowing transform. */
    private Trigger trigger;
    /** Allowed lateness applied to the windowing transform. */
    private Duration allowedLatence = Duration.ZERO;

    /**
     * Creates the node. The first seven arguments are forwarded unchanged to {@link Aggregate};
     * the remaining four configure the windowing applied in {@link #buildBeamPipeline}.
     *
     * @param windowFn window function for the windowing stage
     * @param trigger trigger for the windowing stage
     * @param windowFieldIdx index of the event-timestamp field, or {@code -1} for none
     * @param allowedLatence allowed lateness for the windowing stage
     */
    public BeamAggregationRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls, WindowFn windowFn, Trigger trigger, int windowFieldIdx, Duration allowedLatence) {
        super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
        this.windowFieldIdx = windowFieldIdx;
        this.allowedLatence = allowedLatence;
        this.windowFn = windowFn;
        this.trigger = trigger;
    }

    /**
     * Builds the Beam pipeline for this aggregation on top of the input node's pipeline.
     *
     * <p>Every stage name is prefixed with the stage name of this node followed by {@code "_"}.
     *
     * @param inputPCollections passed through to the input node's {@code buildBeamPipeline}
     * @param sqlEnv passed through to the input node's {@code buildBeamPipeline}
     * @return the output of the final "mergeRecord" stage, with the record coder of this node's
     *     row type set on it
     * @throws Exception declared by the {@link BeamRelNode} contract
     */
    @Override
    public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) throws Exception {
        RelNode child = this.getInput();
        String prefix = BeamSqlRelUtils.getStageName(this) + "_";

        PCollection source = BeamSqlRelUtils.getBeamRelInput(child).buildBeamPipeline(inputPCollections, sqlEnv);

        // Stage 1 (optional): assign event timestamps, only when a window field was configured.
        if (this.windowFieldIdx != -1) {
            PTransform stampEventTime = WithTimestamps.of((SerializableFunction)new BeamAggregationTransforms.WindowTimestampFn(this.windowFieldIdx));
            PCollection stamped = (PCollection)source.apply(prefix + "assignEventTimestamp", stampEventTime);
            // The coder is read from the collection *before* reassignment and carried over.
            source = stamped.setCoder(source.getCoder());
        }

        // Stage 2: windowing with the configured function, trigger and allowed lateness.
        PTransform windowing = Window.into(this.windowFn).triggering(this.trigger).withAllowedLateness(this.allowedLatence).accumulatingFiredPanes();
        PCollection windowed = (PCollection)source.apply(prefix + "window", windowing);

        // Stage 3: attach a key to each record; the value keeps the upstream coder.
        BeamRecordCoder keyCoder = this.exKeyFieldsSchema(child.getRowType()).getRecordCoder();
        PTransform attachKey = WithKeys.of((SerializableFunction)new BeamAggregationTransforms.AggregationGroupByKeyFn(this.windowFieldIdx, this.groupSet));
        PCollection keyed = (PCollection)windowed.apply(prefix + "exCombineBy", attachKey);
        keyed = keyed.setCoder((Coder)KvCoder.of((Coder)keyCoder, (Coder)source.getCoder()));

        // Stage 4: combine the values of each key with the aggregation adaptor.
        BeamRecordCoder aggCoder = this.exAggFieldsSchema().getRecordCoder();
        PTransform combineValues = Combine.perKey((CombineFnBase.GlobalCombineFn)new BeamAggregationTransforms.AggregationAdaptor(this.getAggCallList(), CalciteUtils.toBeamRowType(child.getRowType())));
        PCollection combined = (PCollection)keyed.apply(prefix + "combineBy", combineValues);
        combined = combined.setCoder((Coder)KvCoder.of((Coder)keyCoder, (Coder)aggCoder));

        // Stage 5: merge each key/aggregate pair into a record of this node's row type.
        PTransform mergeKeyAndAggs = ParDo.of((DoFn)new BeamAggregationTransforms.MergeAggregationRecord(CalciteUtils.toBeamRowType(this.getRowType()), this.getAggCallList(), this.windowFieldIdx));
        PCollection merged = (PCollection)combined.apply(prefix + "mergeRecord", mergeKeyAndAggs);
        merged.setCoder((Coder)CalciteUtils.toBeamRowType(this.getRowType()).getRecordCoder());
        return merged;
    }

    /**
     * Derives the schema of the grouping key: the fields of {@code relDataType} listed in
     * {@code groupSet}, in that order, leaving out the field at {@code windowFieldIdx}.
     */
    private BeamRecordSqlType exKeyFieldsSchema(RelDataType relDataType) {
        BeamRecordSqlType inputSchema = CalciteUtils.toBeamRowType(relDataType);
        List<String> keyNames = new ArrayList<String>();
        List<Integer> keyTypes = new ArrayList<Integer>();
        for (Integer boxedIdx : this.groupSet.asList()) {
            int fieldIdx = boxedIdx;
            if (fieldIdx != this.windowFieldIdx) {
                keyNames.add(inputSchema.getFieldNameByIndex(fieldIdx));
                keyTypes.add(inputSchema.getFieldTypeByIndex(fieldIdx));
            }
        }
        return BeamRecordSqlType.create(keyNames, keyTypes);
    }

    /**
     * Derives the schema of the aggregated values: one field per aggregate call, using the call's
     * name and the Java type that {@code CalciteUtils.toJavaType} maps its SQL type name to.
     */
    private BeamRecordSqlType exAggFieldsSchema() {
        List<AggregateCall> calls = this.getAggCallList();
        List<String> aggNames = new ArrayList<String>();
        List<Integer> aggTypes = new ArrayList<Integer>();
        for (int pos = 0; pos < calls.size(); pos++) {
            AggregateCall call = calls.get(pos);
            aggNames.add(call.name);
            aggTypes.add(CalciteUtils.toJavaType(call.type.getSqlTypeName()));
        }
        return BeamRecordSqlType.create(aggNames, aggTypes);
    }

    /**
     * Returns a new {@code BeamAggregationRel} built from the given arguments that reuses this
     * node's cluster, window function, trigger, window field index and allowed lateness.
     */
    @Override
    public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
        return new BeamAggregationRel(
                this.getCluster(),
                traitSet,
                input,
                indicator,
                groupSet,
                groupSets,
                aggCalls,
                this.windowFn,
                this.trigger,
                this.windowFieldIdx,
                this.allowedLatence);
    }

    /** Replaces the window function used by {@link #buildBeamPipeline}. */
    public void setWindowFn(WindowFn windowFn) {
        this.windowFn = windowFn;
    }

    /** Replaces the trigger used by {@link #buildBeamPipeline}. */
    public void setTrigger(Trigger trigger) {
        this.trigger = trigger;
    }

    /**
     * Writes this node's explain terms to {@code pw} and returns {@code pw}.
     *
     * <p>"group" is always written. "window", "trigger", "event_time", "groups", "indicator" and
     * "aggs" are written conditionally through {@code itemIf}. When {@code pw.nest()} is false,
     * each aggregate call is additionally written as its own item, labelled by
     * {@code Util.first(name, "agg#" + ordinal)}. This override does not call
     * {@code super.explainTerms}.
     */
    @Override
    public RelWriter explainTerms(RelWriter pw) {
        // Each call is made on the writer returned by the previous one, as in a fluent chain.
        RelWriter writer = pw.item("group", this.groupSet);
        writer = writer.itemIf("window", this.windowFn, this.windowFn != null);
        writer = writer.itemIf("trigger", this.trigger, this.trigger != null);
        writer = writer.itemIf("event_time", this.windowFieldIdx, this.windowFieldIdx != -1);
        writer = writer.itemIf("groups", this.groupSets, this.getGroupType() != Aggregate.Group.SIMPLE);
        writer = writer.itemIf("indicator", this.indicator, this.indicator);
        writer.itemIf("aggs", this.aggCalls, pw.nest());

        if (!pw.nest()) {
            for (Ord<AggregateCall> indexedCall : Ord.zip(this.aggCalls)) {
                String label = Util.first(indexedCall.e.name, "agg#" + indexedCall.i);
                pw.item(label, indexedCall.e);
            }
        }
        return pw;
    }
}

