/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.impl.rel;

import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.CorrelationId;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Join;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.JoinRelType;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;

public class BeamSideInputJoinRel
extends BeamJoinRel {
    public BeamSideInputJoinRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right, RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
        super(cluster, traitSet, left, right, condition, variablesSet, joinType);
    }

    public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
        return new BeamSideInputJoinRel(this.getCluster(), traitSet, left, right, conditionExpr, (Set<CorrelationId>)this.variablesSet, joinType);
    }

    @Override
    public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
        if (this.joinType == JoinRelType.FULL) {
            throw new UnsupportedOperationException("FULL OUTER JOIN is not supported when join a bounded table with an unbounded table.");
        }
        BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(this.left);
        BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(this.right);
        if (this.joinType == JoinRelType.LEFT && leftRelNode.isBounded() == PCollection.IsBounded.BOUNDED || this.joinType == JoinRelType.RIGHT && rightRelNode.isBounded() == PCollection.IsBounded.BOUNDED) {
            throw new UnsupportedOperationException(String.format("%s side of an OUTER JOIN must be Unbounded table.", this.joinType.name()));
        }
        return new SideInputJoin();
    }

    public PCollection<Row> sideInputJoin(PCollection<KV<Row, Row>> extractedLeftRows, PCollection<KV<Row, Row>> extractedRightRows, Schema leftSchema, Schema rightSchema) {
        Row realRightNullRow;
        PCollection<KV<Row, Row>> realRightRows;
        boolean swapped = extractedLeftRows.isBounded() == PCollection.IsBounded.BOUNDED;
        JoinRelType realJoinType = swapped && this.joinType != JoinRelType.INNER ? JoinRelType.LEFT : this.joinType;
        PCollection<KV<Row, Row>> realLeftRows = swapped ? extractedRightRows : extractedLeftRows;
        PCollection<KV<Row, Row>> pCollection = realRightRows = swapped ? extractedLeftRows : extractedRightRows;
        if (swapped) {
            Schema leftNullSchema = this.buildNullSchema(leftSchema);
            realRightRows = BeamJoinRel.setValueCoder(realRightRows, SchemaCoder.of((Schema)leftNullSchema));
            realRightNullRow = Row.nullRow((Schema)leftNullSchema);
        } else {
            Schema rightNullSchema = this.buildNullSchema(rightSchema);
            realRightRows = BeamJoinRel.setValueCoder(realRightRows, SchemaCoder.of((Schema)rightNullSchema));
            realRightNullRow = Row.nullRow((Schema)rightNullSchema);
        }
        return this.sideInputJoinHelper(realJoinType, realLeftRows, realRightRows, realRightNullRow, swapped);
    }

    private PCollection<Row> sideInputJoinHelper(JoinRelType joinType, PCollection<KV<Row, Row>> leftRows, PCollection<KV<Row, Row>> rightRows, Row rightNullRow, boolean swapped) {
        PCollectionView rowsView = (PCollectionView)rightRows.apply((PTransform)View.asMultimap());
        Schema schema = CalciteUtils.toSchema(this.getRowType());
        return ((PCollection)leftRows.apply((PTransform)ParDo.of((DoFn)new BeamJoinTransforms.SideInputJoinDoFn(joinType, rightNullRow, (PCollectionView<Map<Row, Iterable<Row>>>)rowsView, swapped, schema)).withSideInputs(new PCollectionView[]{rowsView}))).setRowSchema(schema);
    }

    private class SideInputJoin
    extends PTransform<PCollectionList<Row>, PCollection<Row>> {
        private SideInputJoin() {
        }

        public PCollection<Row> expand(PCollectionList<Row> pinput) {
            Schema leftSchema = CalciteUtils.toSchema(BeamSideInputJoinRel.this.left.getRowType());
            Schema rightSchema = CalciteUtils.toSchema(BeamSideInputJoinRel.this.right.getRowType());
            PCollectionList keyedInputs = (PCollectionList)pinput.apply((PTransform)new BeamJoinRel.ExtractJoinKeys(BeamSideInputJoinRel.this));
            PCollection extractedLeftRows = keyedInputs.get(0);
            PCollection extractedRightRows = keyedInputs.get(1);
            return BeamSideInputJoinRel.this.sideInputJoin((PCollection<KV<Row, Row>>)extractedLeftRows, (PCollection<KV<Row, Row>>)extractedRightRows, leftSchema, rightSchema);
        }
    }
}

