/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.impl.rel;

import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.extensions.sql.impl.TVFSlidingWindowFn;
import org.apache.beam.sdk.extensions.sql.impl.ZetaSqlUserDefinedSQLNativeTableValuedFunction;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelMetadataQuery;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_28_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelOptPlanner;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.core.TableFunctionScan;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.metadata.RelColumnMapping;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.type.RelDataType;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexCall;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexInputRef;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexLiteral;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexNode;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;

public class BeamTableFunctionScanRel
extends TableFunctionScan
implements BeamRelNode {
    public BeamTableFunctionScanRel(RelOptCluster cluster, RelTraitSet traitSet, List<RelNode> inputs, RexNode rexCall, Type elementType, RelDataType rowType, Set<RelColumnMapping> columnMappings) {
        super(cluster, traitSet, inputs, rexCall, elementType, rowType, columnMappings);
    }

    public TableFunctionScan copy(RelTraitSet traitSet, List<RelNode> list, RexNode rexNode, Type type, RelDataType relDataType, Set<RelColumnMapping> set) {
        return new BeamTableFunctionScanRel(this.getCluster(), traitSet, list, rexNode, type, relDataType, set);
    }

    @Override
    public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
        return new Transform();
    }

    private Duration durationParameter(RexNode node) {
        return Duration.millis((long)this.longValue(node));
    }

    private long longValue(RexNode operand) {
        if (operand instanceof RexLiteral) {
            return ((Number)((Object)RexLiteral.value((RexNode)operand))).longValue();
        }
        throw new IllegalArgumentException(String.format("[%s] is not valid.", operand));
    }

    @Override
    public NodeStats estimateNodeStats(BeamRelMetadataQuery mq) {
        return BeamSqlRelUtils.getNodeStats(this.getInput(0), mq);
    }

    @Override
    public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, BeamRelMetadataQuery mq) {
        NodeStats inputEstimates = BeamSqlRelUtils.getNodeStats(this.getInput(0), mq);
        double rowSize = this.getRowType().getFieldCount();
        double cpu = inputEstimates.getRowCount() * rowSize;
        double cpuRate = inputEstimates.getRate() * inputEstimates.getWindow() * rowSize;
        return BeamCostModel.FACTORY.makeCost(cpu, cpuRate);
    }

    private static class SessionWindowDoFn
    extends DoFn<KV<Row, Iterable<Row>>, Row> {
        private final Schema outputSchema;

        public SessionWindowDoFn(Schema schema) {
            this.outputSchema = schema;
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<Row, Iterable<Row>> element, BoundedWindow window, DoFn.OutputReceiver<Row> out) {
            IntervalWindow intervalWindow = (IntervalWindow)window;
            for (Row cur : (Iterable)element.getValue()) {
                Row.Builder builder = Row.withSchema((Schema)this.outputSchema).addValues(cur.getValues()).addValue((Object)intervalWindow.start()).addValue((Object)intervalWindow.end());
                out.output((Object)builder.build());
            }
        }
    }

    private static class SlidingWindowDoFn
    extends DoFn<Row, Row> {
        private final int windowFieldIndex;
        private final SlidingWindows windowFn;
        private final Schema outputSchema;

        public SlidingWindowDoFn(SlidingWindows windowFn, int windowFieldIndex, Schema schema) {
            this.windowFn = windowFn;
            this.windowFieldIndex = windowFieldIndex;
            this.outputSchema = schema;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) {
            Row row = (Row)c.element();
            Collection windows = this.windowFn.assignWindows(row.getDateTime(this.windowFieldIndex).toInstant());
            for (IntervalWindow window : windows) {
                Row.Builder builder = Row.withSchema((Schema)this.outputSchema);
                builder.addValues(row.getValues());
                builder.addValue((Object)window.start());
                builder.addValue((Object)window.end());
                c.output((Object)builder.build());
            }
        }
    }

    private static class FixedWindowDoFn
    extends DoFn<Row, Row> {
        private final int windowFieldIndex;
        private final FixedWindows windowFn;
        private final Schema outputSchema;

        public FixedWindowDoFn(FixedWindows windowFn, int windowFieldIndex, Schema schema) {
            this.windowFn = windowFn;
            this.windowFieldIndex = windowFieldIndex;
            this.outputSchema = schema;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) {
            Row row = (Row)c.element();
            IntervalWindow window = this.windowFn.assignWindow(row.getDateTime(this.windowFieldIndex).toInstant());
            Row.Builder builder = Row.withSchema((Schema)this.outputSchema);
            builder.addValues(row.getValues());
            builder.addValue((Object)window.start());
            builder.addValue((Object)window.end());
            c.output((Object)builder.build());
        }
    }

    private static class SessionKeyDoFn
    extends DoFn<Row, KV<Row, Row>> {
        private final Schema keySchema;
        private final List<Integer> keyIndex;

        public SessionKeyDoFn(Schema keySchema, List<Integer> keyIndex) {
            this.keySchema = keySchema;
            this.keyIndex = keyIndex;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) {
            Row row = (Row)c.element();
            Row.Builder builder = Row.withSchema((Schema)this.keySchema);
            for (Integer i : this.keyIndex) {
                builder.addValue(row.getValue(i.intValue()));
            }
            Row keyRow = builder.build();
            c.output((Object)KV.of((Object)keyRow, (Object)row));
        }
    }

    private class Transform
    extends PTransform<PCollectionList<Row>, PCollection<Row>> {
        private TVFToPTransform tumbleToPTransform = (call, upstream) -> {
            RexInputRef wmCol = (RexInputRef)call.getOperands().get(1);
            Schema outputSchema = CalciteUtils.toSchema(BeamTableFunctionScanRel.this.getRowType());
            FixedWindows windowFn = FixedWindows.of((Duration)BeamTableFunctionScanRel.this.durationParameter((RexNode)call.getOperands().get(2)));
            PCollection streamWithWindowMetadata = ((PCollection)upstream.apply((PTransform)ParDo.of((DoFn)new FixedWindowDoFn(windowFn, wmCol.getIndex(), outputSchema)))).setRowSchema(outputSchema);
            PCollection<Row> windowedStream = this.assignTimestampsAndWindow((PCollection<Row>)streamWithWindowMetadata, wmCol.getIndex(), (WindowFn<Row, IntervalWindow>)windowFn);
            return windowedStream;
        };
        private TVFToPTransform hopToPTransform = (call, upstream) -> {
            RexInputRef wmCol = (RexInputRef)call.getOperands().get(1);
            Schema outputSchema = CalciteUtils.toSchema(BeamTableFunctionScanRel.this.getRowType());
            Duration period = BeamTableFunctionScanRel.this.durationParameter((RexNode)call.getOperands().get(2));
            Duration size = BeamTableFunctionScanRel.this.durationParameter((RexNode)call.getOperands().get(3));
            SlidingWindows windowFn = SlidingWindows.of((Duration)size).every(period);
            PCollection streamWithWindowMetadata = ((PCollection)upstream.apply((PTransform)ParDo.of((DoFn)new SlidingWindowDoFn(windowFn, wmCol.getIndex(), outputSchema)))).setRowSchema(outputSchema);
            TVFSlidingWindowFn specialWindowFn = TVFSlidingWindowFn.of(size, period);
            PCollection<Row> windowedStream = this.assignTimestampsAndWindow((PCollection<Row>)streamWithWindowMetadata, wmCol.getIndex(), (WindowFn<Row, IntervalWindow>)specialWindowFn);
            return windowedStream;
        };
        private TVFToPTransform sessionToPTransform = (call, upstream) -> {
            RexInputRef wmCol = (RexInputRef)call.getOperands().get(1);
            Duration gap = BeamTableFunctionScanRel.this.durationParameter((RexNode)call.getOperands().get(2));
            ArrayList<Integer> keyIndex = new ArrayList<Integer>();
            for (RexNode node : call.getOperands().subList(3, call.getOperands().size())) {
                keyIndex.add(((RexInputRef)node).getIndex());
            }
            Sessions sessions = Sessions.withGapDuration((Duration)gap);
            PCollection<Row> windowedStream = this.assignTimestampsAndWindow((PCollection<Row>)upstream, wmCol.getIndex(), (WindowFn<Row, IntervalWindow>)sessions);
            Schema inputSchema = upstream.getSchema();
            Schema keySchema = this.getKeySchema(inputSchema, keyIndex);
            Schema outputSchema = CalciteUtils.toSchema(BeamTableFunctionScanRel.this.getRowType());
            PCollection streamWithWindowMetadata = ((PCollection)((PCollection)((PCollection)windowedStream.apply("assign_session_key", (PTransform)ParDo.of((DoFn)new SessionKeyDoFn(keySchema, keyIndex)))).setCoder((Coder)KvCoder.of((Coder)RowCoder.of((Schema)keySchema), (Coder)upstream.getCoder())).apply((PTransform)GroupByKey.create())).apply((PTransform)ParDo.of((DoFn)new SessionWindowDoFn(outputSchema)))).setRowSchema(outputSchema);
            PCollection reWindowedStream = (PCollection)streamWithWindowMetadata.apply("reWindowIntoGlobalWindow", (PTransform)Window.into((WindowFn)new GlobalWindows()));
            return reWindowedStream;
        };
        private final ImmutableMap<String, TVFToPTransform> tvfToPTransformMap = ImmutableMap.of((Object)"TUMBLE", (Object)this.tumbleToPTransform, (Object)"HOP", (Object)this.hopToPTransform, (Object)"SESSION", (Object)this.sessionToPTransform);

        private Transform() {
        }

        public PCollection<Row> expand(PCollectionList<Row> input) {
            Preconditions.checkArgument((input.size() == 1 ? 1 : 0) != 0, (String)"Wrong number of inputs for %s, expected 1 input but received: %s", (Object)BeamTableFunctionScanRel.class.getSimpleName(), input);
            String operatorName = ((RexCall)BeamTableFunctionScanRel.this.getCall()).getOperator().getName();
            if (this.tvfToPTransformMap.keySet().contains((Object)operatorName)) {
                return ((TVFToPTransform)this.tvfToPTransformMap.get((Object)operatorName)).toPTransform((RexCall)BeamTableFunctionScanRel.this.getCall(), (PCollection<Row>)input.get(0));
            }
            if (((RexCall)BeamTableFunctionScanRel.this.getCall()).getOperator() instanceof ZetaSqlUserDefinedSQLNativeTableValuedFunction) {
                return input.get(0);
            }
            throw new IllegalArgumentException(String.format("Does not support table_valued function: %s", operatorName));
        }

        private Schema getKeySchema(Schema inputSchema, List<Integer> keys) {
            ArrayList<Schema.Field> fields = new ArrayList<Schema.Field>();
            for (Integer i : keys) {
                fields.add(inputSchema.getField(i.intValue()));
            }
            return Schema.builder().addFields(fields).build();
        }

        private PCollection<Row> assignTimestampsAndWindow(PCollection<Row> upstream, int windowFieldIndex, WindowFn<Row, IntervalWindow> windowFn) {
            PCollection windowedStream = (PCollection)((PCollection)upstream.apply("assignEventTimestamp", (PTransform)WithTimestamps.of((SerializableFunction & Serializable)row -> row.getDateTime(windowFieldIndex).toInstant()).withAllowedTimestampSkew(Duration.millis((long)Long.MAX_VALUE)))).setCoder(upstream.getCoder()).apply((PTransform)Window.into(windowFn));
            return windowedStream;
        }
    }

    private static interface TVFToPTransform {
        public PCollection<Row> toPTransform(RexCall var1, PCollection<Row> var2);
    }
}

