/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.impl.rel;

import java.util.List;
import java.util.Optional;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.RelNode;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.core.Aggregate;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.core.AggregateCall;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.type.RelDataType;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.util.ImmutableBitSet;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.rule.AggregateWindowField;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;

public class BeamAggregationRel
extends Aggregate
implements BeamRelNode {
    private final int windowFieldIndex;
    private Optional<AggregateWindowField> windowField;

    public BeamAggregationRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls, Optional<AggregateWindowField> windowField) {
        super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
        this.windowField = windowField;
        this.windowFieldIndex = windowField.map(AggregateWindowField::fieldIndex).orElse(-1);
    }

    @Override
    public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
        return new Transform();
    }

    @Override
    public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
        return new BeamAggregationRel(this.getCluster(), traitSet, input, indicator, groupSet, groupSets, aggCalls, this.windowField);
    }

    private class Transform
    extends PTransform<PCollectionTuple, PCollection<Row>> {
        private Transform() {
        }

        public PCollection<Row> expand(PCollectionTuple inputPCollections) {
            RelNode input = BeamAggregationRel.this.getInput();
            String stageName = BeamSqlRelUtils.getStageName(BeamAggregationRel.this) + "_";
            PCollection upstream = (PCollection)inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(input).toPTransform());
            if (BeamAggregationRel.this.windowField.isPresent()) {
                upstream = ((PCollection)upstream.apply(stageName + "assignEventTimestamp", (PTransform)WithTimestamps.of((SerializableFunction)new BeamAggregationTransforms.WindowTimestampFn(BeamAggregationRel.this.windowFieldIndex)).withAllowedTimestampSkew(new Duration(Long.MAX_VALUE)))).setCoder(upstream.getCoder());
            }
            PCollection windowedStream = BeamAggregationRel.this.windowField.isPresent() ? (PCollection)upstream.apply(stageName + "window", (PTransform)Window.into(((AggregateWindowField)BeamAggregationRel.this.windowField.get()).windowFn())) : upstream;
            this.validateWindowIsSupported((PCollection<Row>)windowedStream);
            Schema keySchema = this.exKeyFieldsSchema(input.getRowType());
            RowCoder keyCoder = keySchema.getRowCoder();
            PCollection exCombineByStream = ((PCollection)windowedStream.apply(stageName + "exCombineBy", (PTransform)WithKeys.of((SerializableFunction)new BeamAggregationTransforms.AggregationGroupByKeyFn(keySchema, BeamAggregationRel.this.windowFieldIndex, BeamAggregationRel.this.groupSet)))).setCoder((Coder)KvCoder.of((Coder)keyCoder, (Coder)upstream.getCoder()));
            RowCoder aggCoder = this.exAggFieldsSchema().getRowCoder();
            PCollection aggregatedStream = ((PCollection)exCombineByStream.apply(stageName + "combineBy", (PTransform)Combine.perKey((CombineFnBase.GlobalCombineFn)new BeamAggregationTransforms.AggregationAdaptor(BeamAggregationRel.this.getAggCallList(), CalciteUtils.toBeamSchema(input.getRowType()))))).setCoder((Coder)KvCoder.of((Coder)keyCoder, (Coder)aggCoder));
            PCollection mergedStream = (PCollection)aggregatedStream.apply(stageName + "mergeRecord", (PTransform)ParDo.of((DoFn)new BeamAggregationTransforms.MergeAggregationRecord(CalciteUtils.toBeamSchema(BeamAggregationRel.this.getRowType()), BeamAggregationRel.this.getAggCallList(), BeamAggregationRel.this.windowFieldIndex)));
            mergedStream.setCoder((Coder)CalciteUtils.toBeamSchema(BeamAggregationRel.this.getRowType()).getRowCoder());
            return mergedStream;
        }

        private void validateWindowIsSupported(PCollection<Row> upstream) {
            WindowingStrategy windowingStrategy = upstream.getWindowingStrategy();
            if (windowingStrategy.getWindowFn() instanceof GlobalWindows && windowingStrategy.getTrigger() instanceof DefaultTrigger && upstream.isBounded() != PCollection.IsBounded.BOUNDED) {
                throw new UnsupportedOperationException("Please explicitly specify windowing in SQL query using HOP/TUMBLE/SESSION functions (default trigger will be used in this case). Unbounded input with global windowing and default trigger is not supported in Beam SQL aggregations. See GroupByKey section in Beam Programming Guide");
            }
        }

        private Schema exKeyFieldsSchema(RelDataType relDataType) {
            Schema inputSchema = CalciteUtils.toBeamSchema(relDataType);
            return (Schema)BeamAggregationRel.this.groupSet.asList().stream().filter(i -> i != BeamAggregationRel.this.windowFieldIndex).map(i -> this.newRowField(inputSchema, (int)i)).collect(Schema.toSchema());
        }

        private Schema.Field newRowField(Schema schema, int i) {
            return schema.getField(i);
        }

        private Schema exAggFieldsSchema() {
            return (Schema)BeamAggregationRel.this.getAggCallList().stream().map(this::newRowField).collect(Schema.toSchema());
        }

        private Schema.Field newRowField(AggregateCall aggCall) {
            return Schema.Field.of((String)aggCall.getName(), (Schema.FieldType)CalciteUtils.toFieldType(aggCall.getType())).withNullable(aggCall.getType().isNullable());
        }
    }
}

