/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.impl.rel;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.RelNode;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.core.CorrelationId;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.core.JoinRelType;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rex.RexCall;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rex.RexInputRef;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rex.RexLiteral;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rex.RexNode;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.util.Pair;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.extensions.joinlibrary.Join;
import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;

public class BeamJoinRel
extends org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.core.Join
implements BeamRelNode {
    /**
     * Creates a Beam join node; every argument is handed unchanged to the Calcite
     * {@code Join} super constructor.
     *
     * @param cluster      planner cluster
     * @param traits       trait set of this node
     * @param left         left input
     * @param right        right input
     * @param condition    join condition
     * @param variablesSet correlation variables passed to the super constructor
     * @param joinType     kind of join
     */
    public BeamJoinRel(
            RelOptCluster cluster,
            RelTraitSet traits,
            RelNode left,
            RelNode right,
            RexNode condition,
            Set<CorrelationId> variablesSet,
            JoinRelType joinType) {
        super(cluster, traits, left, right, condition, variablesSet, joinType);
    }

    /**
     * Builds a new {@link BeamJoinRel} from the given traits, condition, inputs and join type,
     * reusing this node's cluster and variable set.
     *
     * <p>{@code semiJoinDone} is not used by this implementation.
     */
    @Override
    public org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.core.Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
        RelOptCluster ownCluster = getCluster();
        return new BeamJoinRel(
                ownCluster, traitSet, left, right, conditionExpr, variablesSet, joinType);
    }

    /**
     * Returns a fresh {@code Transform} bound to this join node.
     */
    @Override
    public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
        PTransform<PCollectionTuple, PCollection<Row>> joinTransform = new Transform();
        return joinTransform;
    }

    /**
     * Rejects an unbounded input whose windowing strategy does not pass
     * {@link #triggersOncePerWindow}. Inputs that are not unbounded are always accepted.
     *
     * @param pCollection join input to check
     * @throws UnsupportedOperationException if the input is unbounded and its windowing strategy
     *     is not accepted
     */
    private void verifySupportedTrigger(PCollection<Row> pCollection) {
        WindowingStrategy<?, ?> strategy = pCollection.getWindowingStrategy();

        boolean unbounded = PCollection.IsBounded.UNBOUNDED.equals(pCollection.isBounded());
        if (!unbounded) {
            return;
        }
        if (triggersOncePerWindow(strategy)) {
            return;
        }

        throw new UnsupportedOperationException(
                "Joining unbounded PCollections is currently only supported for "
                        + "non-global windows with triggers that are known to produce "
                        + "output once per window,such as the default trigger with zero "
                        + "allowed lateness. In these cases Beam can guarantee it joins "
                        + "all input elements once per window. "
                        + strategy
                        + " is not supported");
    }

    /**
     * Returns {@code true} only when all three hold: the window function is not
     * {@link GlobalWindows}, the trigger is a {@link DefaultTrigger}, and the allowed lateness
     * equals {@link Duration#ZERO}.
     */
    private boolean triggersOncePerWindow(WindowingStrategy windowingStrategy) {
        if (windowingStrategy.getWindowFn() instanceof GlobalWindows) {
            return false;
        }
        if (!(windowingStrategy.getTrigger() instanceof DefaultTrigger)) {
            return false;
        }
        return Duration.ZERO.equals(windowingStrategy.getAllowedLateness());
    }

    /**
     * Joins the two keyed inputs with the join library according to this node's join type, then
     * maps each joined pair through {@code JoinParts2WholeRow} and sets the output coder from
     * this node's row type.
     *
     * @param extractedLeftRows  left rows keyed by their join fields
     * @param extractedRightRows right rows keyed by their join fields
     * @param leftNullRow        row passed as the null value for the left side in RIGHT/FULL joins
     * @param rightNullRow       row passed as the null value for the right side in LEFT/FULL joins
     * @param stageName          prefix for the name of the applied map step
     * @return the joined rows
     */
    private PCollection<Row> standardJoin(PCollection<KV<Row, Row>> extractedLeftRows, PCollection<KV<Row, Row>> extractedRightRows, Row leftNullRow, Row rightNullRow, String stageName) {
        PCollection<KV<Row, KV<Row, Row>>> joinedParts =
                joinKeyedRows(extractedLeftRows, extractedRightRows, leftNullRow, rightNullRow);

        PCollection<Row> wholeRows =
                joinedParts.apply(
                        stageName + "_JoinParts2WholeRow",
                        MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow()));

        return wholeRows.setCoder(CalciteUtils.toBeamSchema(getRowType()).getRowCoder());
    }

    /** Dispatches to the join-library call matching {@code joinType}; anything else is an inner join. */
    private PCollection<KV<Row, KV<Row, Row>>> joinKeyedRows(
            PCollection<KV<Row, Row>> keyedLeft,
            PCollection<KV<Row, Row>> keyedRight,
            Row leftNullRow,
            Row rightNullRow) {
        switch (joinType) {
            case LEFT:
                return Join.leftOuterJoin(keyedLeft, keyedRight, rightNullRow);
            case RIGHT:
                return Join.rightOuterJoin(keyedLeft, keyedRight, leftNullRow);
            case FULL:
                return Join.fullOuterJoin(keyedLeft, keyedRight, leftNullRow, rightNullRow);
            default:
                return Join.innerJoin(keyedLeft, keyedRight);
        }
    }

    /**
     * Joins the two keyed inputs by turning one of them into a side input.
     *
     * <p>When the left input is bounded the sides are swapped: the right input becomes the main
     * input, the left input becomes the side input, {@code leftNullRow} is used as the null row,
     * and any join type other than INNER is executed as LEFT. Otherwise the inputs and this
     * node's join type are used as given.
     *
     * @param extractedLeftRows  left rows keyed by their join fields
     * @param extractedRightRows right rows keyed by their join fields
     * @param leftNullRow        null row for the left side
     * @param rightNullRow       null row for the right side
     * @return the joined rows
     */
    public PCollection<Row> sideInputJoin(PCollection<KV<Row, Row>> extractedLeftRows, PCollection<KV<Row, Row>> extractedRightRows, Row leftNullRow, Row rightNullRow) {
        boolean leftIsBounded = extractedLeftRows.isBounded() == PCollection.IsBounded.BOUNDED;

        if (!leftIsBounded) {
            // No swap: left stays the main input, right becomes the side input.
            return sideInputJoinHelper(
                    joinType, extractedLeftRows, extractedRightRows, rightNullRow, false);
        }

        // Swapped: the roles of the two inputs (and their null rows) are exchanged.
        JoinRelType effectiveJoinType =
                joinType == JoinRelType.INNER ? joinType : JoinRelType.LEFT;
        return sideInputJoinHelper(
                effectiveJoinType, extractedRightRows, extractedLeftRows, leftNullRow, true);
    }

    /**
     * Materializes {@code rightRows} as a multimap view and runs {@code SideInputJoinDoFn} over
     * {@code leftRows} with that view as its side input. The output coder is set from this
     * node's row type.
     *
     * @param joinType     join type handed to the DoFn
     * @param leftRows     main input
     * @param rightRows    input turned into the side-input view
     * @param rightNullRow null row handed to the DoFn
     * @param swapped      swap flag handed to the DoFn
     * @return the joined rows
     */
    private PCollection<Row> sideInputJoinHelper(JoinRelType joinType, PCollection<KV<Row, Row>> leftRows, PCollection<KV<Row, Row>> rightRows, Row rightNullRow, boolean swapped) {
        PCollectionView<Map<Row, Iterable<Row>>> lookupView = rightRows.apply(View.asMultimap());

        BeamJoinTransforms.SideInputJoinDoFn joinFn =
                new BeamJoinTransforms.SideInputJoinDoFn(joinType, rightNullRow, lookupView, swapped);

        PCollection<Row> joined = leftRows.apply(ParDo.of(joinFn).withSideInputs(lookupView));
        return joined.setCoder(CalciteUtils.toBeamSchema(getRowType()).getRowCoder());
    }

    /**
     * Builds the row returned by {@code Row.nullRow} for the Beam schema of the given node's
     * row type. Used for either join input.
     */
    private Row buildNullRow(BeamRelNode relNode) {
        Schema nodeSchema = CalciteUtils.toBeamSchema(relNode.getRowType());
        return Row.nullRow(nodeSchema);
    }

    /**
     * Extracts the equi-join column pairs from this node's join condition.
     *
     * <p>Accepted conditions are a single {@code =} call, or an {@code AND} call whose operands
     * are all {@code =} calls. Every other shape is rejected with an
     * {@link UnsupportedOperationException} rather than an unchecked cast failure, and
     * non-equality operands of an {@code AND} are no longer silently treated as equalities.
     *
     * @param leftRowColumnCount number of columns in the left input, forwarded to
     *     {@link #extractOneJoinColumn}
     * @return one pair per equality, as produced by {@link #extractOneJoinColumn}
     * @throws UnsupportedOperationException if the condition is the literal TRUE (cross join) or
     *     has any unsupported shape
     */
    private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) {
        // A literal TRUE condition means CROSS JOIN. Boolean.TRUE.equals also tolerates a
        // null-valued literal, which previously failed while unboxing.
        if (this.condition instanceof RexLiteral
                && Boolean.TRUE.equals(((RexLiteral) this.condition).getValue())) {
            throw new UnsupportedOperationException("CROSS JOIN is not supported!");
        }
        // Any other literal, a bare column reference, etc. cannot be cast to RexCall below.
        if (!(this.condition instanceof RexCall)) {
            throw new UnsupportedOperationException(
                    "Join condition " + this.condition
                            + " is not supported; expected '=' comparisons, optionally combined with AND");
        }

        RexCall call = (RexCall) this.condition;
        String operatorName = call.getOperator().getName();
        List<Pair<Integer, Integer>> pairs = new ArrayList<>();

        if ("AND".equals(operatorName)) {
            for (RexNode operand : call.getOperands()) {
                // Each conjunct must itself be an equality; anything else is not an equi-join key.
                if (!(operand instanceof RexCall)
                        || !"=".equals(((RexCall) operand).getOperator().getName())) {
                    throw new UnsupportedOperationException(
                            "Only '=' comparisons are supported inside an AND join condition, but found: "
                                    + operand);
                }
                pairs.add(this.extractOneJoinColumn((RexCall) operand, leftRowColumnCount));
            }
        } else if ("=".equals(operatorName)) {
            pairs.add(this.extractOneJoinColumn(call, leftRowColumnCount));
        } else {
            throw new UnsupportedOperationException(
                    "Operator " + operatorName + " is not supported in join condition");
        }
        return pairs;
    }

    /**
     * Converts one two-operand comparison between input columns into a pair of
     * (left column index, right column index).
     *
     * <p>The smaller of the two referenced indexes is taken as the left column; the larger one,
     * minus {@code leftRowColumnCount}, is taken as the index within the right input. Both
     * operands must be direct column references and must refer to different inputs.
     *
     * @param oneCondition       call whose two operands are the compared columns
     * @param leftRowColumnCount number of columns in the left input
     * @return pair of (index in left input, index in right input)
     * @throws UnsupportedOperationException if an operand is not a column reference, or both
     *     operands refer to the same input
     */
    private Pair<Integer, Integer> extractOneJoinColumn(RexCall oneCondition, int leftRowColumnCount) {
        List<RexNode> operands = oneCondition.getOperands();
        // Guard the casts below: expressions such as "b.x + 1" are calls, not input references.
        if (operands.size() != 2
                || !(operands.get(0) instanceof RexInputRef)
                || !(operands.get(1) instanceof RexInputRef)) {
            throw new UnsupportedOperationException(
                    "Only direct column references are supported on both sides of a join comparison, but found: "
                            + oneCondition);
        }

        int firstIndex = ((RexInputRef) operands.get(0)).getIndex();
        int secondIndex = ((RexInputRef) operands.get(1)).getIndex();
        int leftIndex = Math.min(firstIndex, secondIndex);
        int rightIndex = Math.max(firstIndex, secondIndex) - leftRowColumnCount;

        // The min/max split is only meaningful when the columns come from different inputs;
        // otherwise one of the indexes falls outside its input's column range.
        if (leftIndex >= leftRowColumnCount || rightIndex < 0) {
            throw new UnsupportedOperationException(
                    "A join comparison must reference one column from each input, but found: "
                            + oneCondition);
        }
        return new Pair<>(leftIndex, rightIndex);
    }

    /**
     * Joins by applying the left node's transform and then a {@code JoinAsLookup} transform
     * (named {@code "join_as_lookup"}) that is given the join condition, the right node's table
     * cast to {@link BeamSqlSeekableTable}, the right schema and the left field count.
     *
     * @param leftRelNode       node producing the main input
     * @param rightRelNode      node cast to {@link BeamIOSourceRel} to obtain the lookup table
     * @param inputPCollections pipeline inputs the left transform is applied to
     * @return output of the lookup transform
     */
    private PCollection<Row> joinAsLookup(BeamRelNode leftRelNode, BeamRelNode rightRelNode, PCollectionTuple inputPCollections) {
        PCollection<Row> mainInput = inputPCollections.apply(leftRelNode.toPTransform());

        BeamIOSourceRel lookupSource = (BeamIOSourceRel) rightRelNode;
        BeamSqlSeekableTable lookupTable = (BeamSqlSeekableTable) lookupSource.getBeamSqlTable();

        Schema lookupSchema = CalciteUtils.toBeamSchema(rightRelNode.getRowType());
        int mainFieldCount = CalciteUtils.toBeamSchema(leftRelNode.getRowType()).getFieldCount();

        return mainInput.apply(
                "join_as_lookup",
                new BeamJoinTransforms.JoinAsLookup(
                        condition, lookupTable, lookupSchema, mainFieldCount));
    }

    /**
     * Returns {@code true} when the node is a {@link BeamIOSourceRel} whose table is a
     * {@link BeamSqlSeekableTable}.
     */
    private boolean seekable(BeamRelNode relNode) {
        if (!(relNode instanceof BeamIOSourceRel)) {
            return false;
        }
        BeamSqlTable table = ((BeamIOSourceRel) relNode).getBeamSqlTable();
        return table instanceof BeamSqlSeekableTable;
    }

    private class Transform
    extends PTransform<PCollectionTuple, PCollection<Row>> {
        private Transform() {
        }

        public PCollection<Row> expand(PCollectionTuple inputPCollections) {
            BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(BeamJoinRel.this.left);
            Schema leftSchema = CalciteUtils.toBeamSchema(BeamJoinRel.this.left.getRowType());
            BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(BeamJoinRel.this.right);
            if (!BeamJoinRel.this.seekable(leftRelNode) && BeamJoinRel.this.seekable(rightRelNode)) {
                return BeamJoinRel.this.joinAsLookup(leftRelNode, rightRelNode, inputPCollections).setCoder((Coder)CalciteUtils.toBeamSchema(BeamJoinRel.this.getRowType()).getRowCoder());
            }
            PCollection leftRows = (PCollection)inputPCollections.apply("left", leftRelNode.toPTransform());
            PCollection rightRows = (PCollection)inputPCollections.apply("right", rightRelNode.toPTransform());
            BeamJoinRel.this.verifySupportedTrigger((PCollection<Row>)leftRows);
            BeamJoinRel.this.verifySupportedTrigger((PCollection<Row>)rightRows);
            String stageName = BeamSqlRelUtils.getStageName(BeamJoinRel.this);
            WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn();
            WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn();
            List pairs = BeamJoinRel.this.extractJoinColumns(leftRelNode.getRowType().getFieldCount());
            Schema extractKeySchema = (Schema)pairs.stream().map(pair -> leftSchema.getField(((Integer)pair.getKey()).intValue())).collect(Schema.toSchema());
            RowCoder extractKeyRowCoder = extractKeySchema.getRowCoder();
            PCollection extractedLeftRows = ((PCollection)leftRows.apply(stageName + "_left_ExtractJoinFields", (PTransform)MapElements.via((SimpleFunction)new BeamJoinTransforms.ExtractJoinFields(true, pairs)))).setCoder((Coder)KvCoder.of((Coder)extractKeyRowCoder, (Coder)leftRows.getCoder()));
            PCollection extractedRightRows = ((PCollection)rightRows.apply(stageName + "_right_ExtractJoinFields", (PTransform)MapElements.via((SimpleFunction)new BeamJoinTransforms.ExtractJoinFields(false, pairs)))).setCoder((Coder)KvCoder.of((Coder)extractKeyRowCoder, (Coder)rightRows.getCoder()));
            Row leftNullRow = BeamJoinRel.this.buildNullRow(leftRelNode);
            Row rightNullRow = BeamJoinRel.this.buildNullRow(rightRelNode);
            if (leftRows.isBounded() == PCollection.IsBounded.BOUNDED && rightRows.isBounded() == PCollection.IsBounded.BOUNDED || leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED) {
                try {
                    leftWinFn.verifyCompatibility(rightWinFn);
                }
                catch (IncompatibleWindowException e) {
                    throw new IllegalArgumentException("WindowFns must match for a bounded-vs-bounded/unbounded-vs-unbounded join.", e);
                }
                return BeamJoinRel.this.standardJoin((PCollection<KV<Row, Row>>)extractedLeftRows, (PCollection<KV<Row, Row>>)extractedRightRows, leftNullRow, rightNullRow, stageName);
            }
            if (leftRows.isBounded() == PCollection.IsBounded.BOUNDED && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED || leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED && rightRows.isBounded() == PCollection.IsBounded.BOUNDED) {
                if (BeamJoinRel.this.joinType == JoinRelType.FULL) {
                    throw new UnsupportedOperationException("FULL OUTER JOIN is not supported when join a bounded table with an unbounded table.");
                }
                if (BeamJoinRel.this.joinType == JoinRelType.LEFT && leftRows.isBounded() == PCollection.IsBounded.BOUNDED || BeamJoinRel.this.joinType == JoinRelType.RIGHT && rightRows.isBounded() == PCollection.IsBounded.BOUNDED) {
                    throw new UnsupportedOperationException("LEFT side of an OUTER JOIN must be Unbounded table.");
                }
                return BeamJoinRel.this.sideInputJoin((PCollection<KV<Row, Row>>)extractedLeftRows, (PCollection<KV<Row, Row>>)extractedRightRows, leftNullRow, rightNullRow);
            }
            throw new UnsupportedOperationException("The inputs to the JOIN have un-joinnable windowFns: " + leftWinFn + ", " + rightWinFn);
        }
    }
}

