/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.impl.rel;

import java.util.List;
import java.util.Set;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.Join;
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.CorrelationId;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.JoinRelType;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rex.RexNode;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.util.Pair;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;

/**
 * A {@link BeamJoinRel} whose transform joins its two inputs with the broadcast joins of
 * {@link Join} ({@code innerBroadcastJoin} / {@code leftOuterBroadcastJoin}).
 *
 * <p>{@link #buildPTransform()} rejects FULL joins, outer joins whose outer side is bounded,
 * and joins where both inputs are unbounded.
 */
public class BeamSideInputJoinRel extends BeamJoinRel {
    /**
     * Creates the relational node; every argument is forwarded unchanged to {@link BeamJoinRel}.
     */
    public BeamSideInputJoinRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right, RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
        super(cluster, traitSet, left, right, condition, variablesSet, joinType);
    }

    /**
     * Returns a new {@code BeamSideInputJoinRel} built from the given traits, condition, inputs
     * and join type, reusing this node's cluster and variable set.
     *
     * @param semiJoinDone not used by this implementation
     */
    @Override
    public org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
        return new BeamSideInputJoinRel(this.getCluster(), traitSet, left, right, conditionExpr, this.variablesSet, joinType);
    }

    /**
     * Validates the join type against the boundedness of both inputs and returns the
     * transform that performs the join.
     *
     * @return a transform taking a two-element {@link PCollectionList} (left input at index 0,
     *     right input at index 1) and producing the joined rows
     * @throws UnsupportedOperationException if the join type is FULL, if the join type is LEFT
     *     and the left input is bounded, if the join type is RIGHT and the right input is
     *     bounded, or if both inputs are unbounded
     */
    @Override
    public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
        if (this.joinType == JoinRelType.FULL) {
            throw new UnsupportedOperationException("FULL OUTER JOIN is not supported when join a bounded table with an unbounded table.");
        }
        BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(this.left);
        BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(this.right);
        boolean leftBounded = leftRelNode.isBounded() == PCollection.IsBounded.BOUNDED;
        boolean rightBounded = rightRelNode.isBounded() == PCollection.IsBounded.BOUNDED;
        // The side named by an outer join (LEFT -> left input, RIGHT -> right input) must not be bounded.
        if ((this.joinType == JoinRelType.LEFT && leftBounded) || (this.joinType == JoinRelType.RIGHT && rightBounded)) {
            throw new UnsupportedOperationException(String.format("%s side of an OUTER JOIN must be Unbounded table.", this.joinType.name()));
        }
        if (leftRelNode.isBounded() == PCollection.IsBounded.UNBOUNDED && rightRelNode.isBounded() == PCollection.IsBounded.UNBOUNDED) {
            throw new UnsupportedOperationException("Side input join can only be used if one table is bounded.");
        }
        return new SideInputJoin();
    }

    /**
     * Joins {@code leftRows} and {@code rightRows} with a broadcast join and projects the
     * result onto the schema derived from this node's row type.
     *
     * <p>If {@code leftRows} is bounded, the two inputs (and their key descriptors) trade
     * places before joining: the original right input drives the join and the original left
     * input is handed to the broadcast join as its argument. In that case any non-INNER join
     * type is executed as a LEFT join on the swapped inputs, and the final projection restores
     * the original column order.
     *
     * @param leftRows rows of the left input
     * @param rightRows rows of the right input
     * @param leftKeyFields join key fields of the left input
     * @param rightKeyFields join key fields of the right input
     * @return the joined rows, with the schema derived from {@code getRowType()}
     * @throws IllegalArgumentException if the inputs are swapped and the join type is LEFT
     * @throws RuntimeException if the effective join type is neither INNER nor LEFT
     */
    public PCollection<Row> sideInputJoin(PCollection<Row> leftRows, PCollection<Row> rightRows, FieldAccessDescriptor leftKeyFields, FieldAccessDescriptor rightKeyFields) {
        boolean swapped = leftRows.isBounded() == PCollection.IsBounded.BOUNDED;
        JoinRelType realJoinType = this.joinType;
        if (swapped && this.joinType != JoinRelType.INNER) {
            // A LEFT join cannot be expressed once the left input has become the broadcast argument.
            Preconditions.checkArgument(realJoinType != JoinRelType.LEFT, "LEFT OUTER JOIN cannot be executed as a side input join when the left input is bounded.");
            realJoinType = JoinRelType.LEFT;
        }
        PCollection<Row> realLeftRows = swapped ? rightRows : leftRows;
        PCollection<Row> realRightRows = swapped ? leftRows : rightRows;
        FieldAccessDescriptor realLeftKeyFields = swapped ? rightKeyFields : leftKeyFields;
        FieldAccessDescriptor realRightKeyFields = swapped ? leftKeyFields : rightKeyFields;

        PCollection<Row> joined;
        switch (realJoinType) {
            case INNER: {
                joined = realLeftRows.apply(Join.<Row, Row>innerBroadcastJoin(realRightRows).on(Join.FieldsEqual.left(realLeftKeyFields).right(realRightKeyFields)));
                break;
            }
            case LEFT: {
                joined = realLeftRows.apply(Join.<Row, Row>leftOuterBroadcastJoin(realRightRows).on(Join.FieldsEqual.left(realLeftKeyFields).right(realRightKeyFields)));
                break;
            }
            default: {
                throw new RuntimeException("Unexpected join type " + realJoinType);
            }
        }

        Schema schema = CalciteUtils.toSchema(this.getRowType());
        // NOTE(review): "lhs"/"rhs" are presumably the field names the Join transform gives to
        // its two sides - confirm against the Join transform before changing them.
        String lhsSelect = "lhs.*";
        String rhsSelect = "rhs.*";
        if (swapped) {
            // Inputs were exchanged above, so the join's right-hand fields are the original left columns.
            return joined.apply(Select.<Row>fieldNames(rhsSelect, lhsSelect).withOutputSchema(schema));
        }
        return joined.apply(Select.<Row>fieldNames(lhsSelect, rhsSelect).withOutputSchema(schema));
    }

    /**
     * Transform returned by {@link #buildPTransform()}: derives the join key fields from the
     * enclosing node's condition and delegates to {@link #sideInputJoin}.
     */
    private class SideInputJoin extends PTransform<PCollectionList<Row>, PCollection<Row>> {
        private SideInputJoin() {
        }

        /**
         * Expands the join over a two-element input list.
         *
         * @param pinput list whose element 0 is the left input and element 1 the right input
         * @return the joined rows produced by {@link #sideInputJoin}
         */
        @Override
        public PCollection<Row> expand(PCollectionList<Row> pinput) {
            Schema leftSchema = pinput.get(0).getSchema();
            Schema rightSchema = pinput.get(1).getSchema();
            // Both inputs are re-windowed only to set the EARLIEST timestamp combiner.
            PCollection<Row> leftRows = pinput.get(0).apply("left_TimestampCombiner", Window.<Row>configure().withTimestampCombiner(TimestampCombiner.EARLIEST));
            PCollection<Row> rightRows = pinput.get(1).apply("right_TimestampCombiner", Window.<Row>configure().withTimestampCombiner(TimestampCombiner.EARLIEST));

            List<Pair<RexNode, RexNode>> pairs = BeamJoinRel.extractJoinRexNodes(BeamSideInputJoinRel.this.condition);
            // The left input's column count is passed as the offset argument for the right-side key lookup.
            int leftRowColumnCount = BeamSqlRelUtils.getBeamRelInput(BeamSideInputJoinRel.this.left).getRowType().getFieldCount();
            FieldAccessDescriptor leftKeyFields = BeamJoinTransforms.getJoinColumns(true, pairs, 0, leftSchema);
            FieldAccessDescriptor rightKeyFields = BeamJoinTransforms.getJoinColumns(false, pairs, leftRowColumnCount, rightSchema);
            return BeamSideInputJoinRel.this.sideInputJoin(leftRows, rightRows, leftKeyFields, rightKeyFields);
        }
    }
}

