/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.impl.rel;

import java.util.List;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.RelNode;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.RelWriter;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.core.Aggregate;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.core.AggregateCall;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.type.RelDataType;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.util.ImmutableBitSet;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.util.Pair;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;

/**
 * Beam SQL relational node for a Calcite {@link Aggregate}.
 *
 * <p>In addition to the regular aggregate parameters the node carries an optional
 * {@link WindowFn} and the index of the input field that the window is computed from.
 * {@link #buildPTransform()} yields the transform that assigns timestamps and windows
 * (when a window function is present), groups the rows by key, combines them and merges
 * each key/aggregate pair back into a single output {@link Row}.
 */
public class BeamAggregationRel extends Aggregate implements BeamRelNode {
    /** Window function applied before grouping; {@code null} when no windowing was requested. */
    private final WindowFn<Row, IntervalWindow> windowFn;

    /**
     * Index of the input field handed to the timestamp function; this field is left out of
     * the grouping-key schema.
     */
    private final int windowFieldIndex;

    /**
     * Creates the aggregation node.
     *
     * @param cluster forwarded to {@link Aggregate}
     * @param traits forwarded to {@link Aggregate}
     * @param child forwarded to {@link Aggregate} as the input relation
     * @param indicator forwarded to {@link Aggregate}
     * @param groupSet forwarded to {@link Aggregate}
     * @param groupSets forwarded to {@link Aggregate}
     * @param aggCalls forwarded to {@link Aggregate}
     * @param windowFn window function to apply before grouping, or {@code null} for none
     * @param windowFieldIndex index of the input field the window is derived from
     */
    public BeamAggregationRel(
            RelOptCluster cluster,
            RelTraitSet traits,
            RelNode child,
            boolean indicator,
            ImmutableBitSet groupSet,
            List<ImmutableBitSet> groupSets,
            List<AggregateCall> aggCalls,
            WindowFn<Row, IntervalWindow> windowFn,
            int windowFieldIndex) {
        super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
        this.windowFn = windowFn;
        this.windowFieldIndex = windowFieldIndex;
    }

    /**
     * Adds a {@code window} item describing the window function to the terms written by the
     * superclass. Nothing is added when this node has no window function.
     *
     * @param pw writer receiving the terms
     * @return the same writer that was passed in
     * @throws RuntimeException if the window function is not a {@link FixedWindows},
     *     {@link SlidingWindows} or {@link Sessions}
     */
    @Override
    public RelWriter explainTerms(RelWriter pw) {
        super.explainTerms(pw);
        if (windowFn != null) {
            pw.item("window", describeWindow());
        }
        return pw;
    }

    /**
     * Renders the window function as {@code Name($fieldIndex, param, ...)}.
     *
     * @throws RuntimeException if the window function type is not one of the handled kinds
     */
    private String describeWindow() {
        // Inspect through a wildcard view: the concrete window classes are used here as
        // non-generic types, and testing/casting a WindowFn<Row, IntervalWindow> against a
        // class that fixes a different element type is rejected by the compiler.
        WindowFn<?, ?> fn = windowFn;
        String fnName = fn.getClass().getSimpleName();

        String params;
        if (fn instanceof FixedWindows) {
            FixedWindows fixed = (FixedWindows) fn;
            params = fixed.getSize() + ", " + fixed.getOffset();
        } else if (fn instanceof SlidingWindows) {
            SlidingWindows sliding = (SlidingWindows) fn;
            params = sliding.getPeriod() + ", " + sliding.getSize() + ", " + sliding.getOffset();
        } else if (fn instanceof Sessions) {
            params = ((Sessions) fn).getGapDuration().toString();
        } else {
            throw new RuntimeException("Unknown window function " + fnName);
        }
        return fnName + "($" + windowFieldIndex + ", " + params + ")";
    }

    /** Returns a new transform that performs this aggregation on a single input collection. */
    @Override
    public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
        return new Transform();
    }

    /**
     * Creates a copy of this node with the supplied aggregate parameters, carrying over this
     * node's cluster, window function and window field index.
     */
    @Override
    public Aggregate copy(
            RelTraitSet traitSet,
            RelNode input,
            boolean indicator,
            ImmutableBitSet groupSet,
            List<ImmutableBitSet> groupSets,
            List<AggregateCall> aggCalls) {
        return new BeamAggregationRel(
            getCluster(), traitSet, input, indicator, groupSet, groupSets, aggCalls,
            windowFn, windowFieldIndex);
    }

    /** Transform implementing the aggregation described by the enclosing node. */
    private class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> {

        /**
         * Builds the aggregation pipeline.
         *
         * <p>Steps: optionally assign event timestamps and apply the window function, verify
         * that the resulting windowing is supported, key every row by its group-by fields,
         * combine the values per key, and merge key and aggregates into one output row.
         *
         * @param pinput list that must contain exactly one input collection
         * @return the aggregated rows, with the row schema of the enclosing node set
         * @throws IllegalArgumentException if {@code pinput} does not hold exactly one collection
         * @throws UnsupportedOperationException if the input is not bounded and uses global
         *     windows with the default trigger
         */
        @Override
        public PCollection<Row> expand(PCollectionList<Row> pinput) {
            Preconditions.checkArgument(
                pinput.size() == 1,
                "Wrong number of inputs for %s: %s",
                BeamAggregationRel.class.getSimpleName(),
                pinput);

            PCollection<Row> upstream = pinput.get(0);
            PCollection<Row> windowedStream = upstream;
            if (windowFn != null) {
                // Re-stamp every row from the window field; the skew is set to the maximum
                // value so the new timestamp is accepted whatever the element's current one is.
                upstream =
                    upstream
                        .apply(
                            "assignEventTimestamp",
                            WithTimestamps.of(
                                    new BeamAggregationTransforms.WindowTimestampFn(windowFieldIndex))
                                .withAllowedTimestampSkew(new Duration(Long.MAX_VALUE)))
                        .setCoder(upstream.getCoder());
                windowedStream = upstream.apply(Window.into(windowFn));
            }
            validateWindowIsSupported(windowedStream);

            Schema keySchema = exKeyFieldsSchema(input.getRowType());
            Coder<Row> keyCoder = SchemaCoder.of(keySchema);
            Coder<Row> aggCoder = SchemaCoder.of(exAggFieldsSchema());
            // Converted once; needed both by the merge step and for the output row schema.
            Schema outputSchema = CalciteUtils.toSchema(getRowType());

            PCollection<Row> mergedStream =
                windowedStream
                    .apply(
                        "exCombineBy",
                        WithKeys.of(
                            new BeamAggregationTransforms.AggregationGroupByKeyFn(
                                keySchema, windowFieldIndex, groupSet)))
                    .setCoder(KvCoder.of(keyCoder, upstream.getCoder()))
                    .apply(
                        "combineBy",
                        Combine.perKey(
                            new BeamAggregationTransforms.AggregationAdaptor(
                                getNamedAggCalls(), CalciteUtils.toSchema(input.getRowType()))))
                    .setCoder(KvCoder.of(keyCoder, aggCoder))
                    .apply(
                        "mergeRecord",
                        ParDo.of(
                            new BeamAggregationTransforms.MergeAggregationRecord(
                                outputSchema, windowFieldIndex)));
            mergedStream.setRowSchema(outputSchema);
            return mergedStream;
        }

        /**
         * Rejects a collection that is not bounded while still using global windows with the
         * default trigger.
         *
         * @throws UnsupportedOperationException for that combination
         */
        private void validateWindowIsSupported(PCollection<Row> upstream) {
            WindowingStrategy<?, ?> strategy = upstream.getWindowingStrategy();
            boolean globalWithDefaultTrigger =
                strategy.getWindowFn() instanceof GlobalWindows
                    && strategy.getTrigger() instanceof DefaultTrigger;
            if (globalWithDefaultTrigger && upstream.isBounded() != PCollection.IsBounded.BOUNDED) {
                throw new UnsupportedOperationException("Please explicitly specify windowing in SQL query using HOP/TUMBLE/SESSION functions (default trigger will be used in this case). Unbounded input with global windowing and default trigger is not supported in Beam SQL aggregations. See GroupByKey section in Beam Programming Guide");
            }
        }

        /**
         * Builds the schema of the grouping key: the input fields selected by the group set,
         * without the window field.
         */
        private Schema exKeyFieldsSchema(RelDataType relDataType) {
            Schema inputSchema = CalciteUtils.toSchema(relDataType);
            return groupSet.asList().stream()
                .filter(index -> index != windowFieldIndex)
                .map(index -> inputSchema.getField(index))
                .collect(Schema.toSchema());
        }

        /** Builds the schema of the aggregated values: one field per named aggregate call. */
        private Schema exAggFieldsSchema() {
            return getNamedAggCalls().stream()
                .map(this::aggCallField)
                .collect(Schema.toSchema());
        }

        /** Creates a field named after the aggregate call and typed with the call's result type. */
        private Schema.Field aggCallField(Pair<AggregateCall, String> namedAggCall) {
            return CalciteUtils.toField(namedAggCall.right, namedAggCall.left.getType());
        }
    }
}

