/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.impl.interpreter;

import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rex.RexCall;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rex.RexCorrelVariable;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rex.RexFieldAccess;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rex.RexInputRef;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rex.RexLiteral;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rex.RexLocalRef;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rex.RexNode;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rex.RexProgram;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.runtime.SqlFunctions;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.schema.impl.ScalarFunctionImpl;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.sql.type.SqlTypeName;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.sql.validate.SqlUserDefinedFunction;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.util.NlsString;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironment;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCaseExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCastExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCorrelVariableExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlDefaultExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlDotExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlLocalRefExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlOperatorExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlUdfExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowEndExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowStartExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.DateOperators;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.StringOperators;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlDivideExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlMinusExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlModExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlPlusExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.array.BeamSqlArrayExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.array.BeamSqlArrayItemExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.collection.BeamSqlCardinalityExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.collection.BeamSqlSingleElementExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlEqualsExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlGreaterThanExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlGreaterThanOrEqualsExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlIsNotNullExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlIsNullExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlLessThanExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlLikeExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlNotEqualsExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentDateExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimeExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimeMinusExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimePlusExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlIntervalMultiplyExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlAndExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlNotExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlOrExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.map.BeamSqlMapExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.map.BeamSqlMapItemExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAbsExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAcosExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAsinExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAtan2Expression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAtanExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlCeilExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlCosExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlCotExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlDegreesExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlExpExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlFloorExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlLnExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlLogExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlPiExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlPowerExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRadiansExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRandExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRandIntegerExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRoundExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlSignExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlSinExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlTanExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlTruncateExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret.BeamSqlReinterpretExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.row.BeamSqlFieldAccessExpression;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;

/**
 * {@link BeamSqlExpressionExecutor} that converts the expressions of a Calcite
 * {@link RexProgram} into {@link BeamSqlExpression} trees once, at construction time, and
 * then evaluates the program's condition and projections against each input row.
 */
public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
    /** All expressions of the program's expression list, converted in list order. */
    private final List<BeamSqlExpression> exprs;
    /** Row filter; a constant boolean {@code true} when the program has no condition. */
    private final BeamSqlExpression filterCondition;
    /** Converted project list; its values form the result of {@link #execute}. */
    private final List<BeamSqlExpression> projections;

    /**
     * Converts the expression list, the optional condition and the project list of
     * {@code program} into Beam SQL expressions.
     *
     * @param program the Calcite program to execute
     * @throws IllegalStateException if a converted expression rejects its operands
     * @throws UnsupportedOperationException if a node kind or operator has no Beam counterpart
     */
    public BeamSqlFnExecutor(RexProgram program) {
        this.exprs = buildExpressions(program.getExprList());
        RexNode condition = program.getCondition();
        if (condition == null) {
            // No WHERE-style condition: every row passes.
            this.filterCondition = BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true);
        } else {
            this.filterCondition = buildExpression(condition);
        }
        this.projections = buildExpressions(program.getProjectList());
    }

    /**
     * Converts {@code rexNode} (recursively, including operands) and validates the result
     * through {@link BeamSqlExpression#accept()}.
     *
     * @param rexNode the Calcite expression node to convert
     * @return the validated Beam SQL expression
     * @throws IllegalStateException if the built expression does not accept its operands
     * @throws UnsupportedOperationException if the node kind or operator is not supported
     */
    static BeamSqlExpression buildExpression(RexNode rexNode) {
        BeamSqlExpression expression = getBeamSqlExpression(rexNode);
        if (!expression.accept()) {
            throw new IllegalStateException(expression.getClass().getSimpleName() + " does not accept the operands.(" + rexNode + ")");
        }
        return expression;
    }

    /** Converts every node of {@code nodes}, in order, with {@link #buildExpression(RexNode)}. */
    private static List<BeamSqlExpression> buildExpressions(List<? extends RexNode> nodes) {
        List<BeamSqlExpression> built = new ArrayList<>(nodes.size());
        for (RexNode node : nodes) {
            built.add(buildExpression(node));
        }
        return built;
    }

    /**
     * Dispatches on the concrete kind of {@code rexNode} and builds the matching expression
     * without validating it; validation happens in {@link #buildExpression(RexNode)}.
     *
     * @throws UnsupportedOperationException if the node kind is not one of the handled kinds
     */
    private static BeamSqlExpression getBeamSqlExpression(RexNode rexNode) {
        if (rexNode instanceof RexLiteral) {
            return buildLiteral((RexLiteral) rexNode);
        }
        if (rexNode instanceof RexInputRef) {
            RexInputRef inputRef = (RexInputRef) rexNode;
            return new BeamSqlInputRefExpression(inputRef.getType().getSqlTypeName(), inputRef.getIndex());
        }
        if (rexNode instanceof RexCorrelVariable) {
            RexCorrelVariable correlVariable = (RexCorrelVariable) rexNode;
            return new BeamSqlCorrelVariableExpression(correlVariable.getType().getSqlTypeName(), correlVariable.id.getId());
        }
        if (rexNode instanceof RexLocalRef) {
            RexLocalRef localRef = (RexLocalRef) rexNode;
            return new BeamSqlLocalRefExpression(localRef.getType().getSqlTypeName(), localRef.getIndex());
        }
        if (rexNode instanceof RexFieldAccess) {
            RexFieldAccess fieldAccess = (RexFieldAccess) rexNode;
            BeamSqlExpression referenceExpression = buildExpression(fieldAccess.getReferenceExpr());
            int nestedFieldIndex = fieldAccess.getField().getIndex();
            SqlTypeName nestedFieldType = fieldAccess.getField().getType().getSqlTypeName();
            return new BeamSqlFieldAccessExpression(referenceExpression, nestedFieldIndex, nestedFieldType);
        }
        if (rexNode instanceof RexCall) {
            return buildCall((RexCall) rexNode);
        }
        throw new UnsupportedOperationException(String.format("%s is not supported yet", rexNode.getClass().toString()));
    }

    /**
     * Builds a {@link BeamSqlPrimitive} for a literal.
     *
     * <p>The literal's own type name ({@code getTypeName()}) and the SQL type name of its
     * declared type ({@code getType().getSqlTypeName()}) are read separately; when the former
     * is numeric the value is converted to the Java class matching the latter.
     */
    private static BeamSqlExpression buildLiteral(RexLiteral node) {
        SqlTypeName type = node.getTypeName();
        Object value = node.getValue();
        if (SqlTypeName.CHAR_TYPES.contains(type) && value instanceof NlsString) {
            // Store the wrapped value rather than the NlsString itself.
            return BeamSqlPrimitive.of(type, ((NlsString) value).getValue());
        }
        if (isDateNode(type, value)) {
            // Calendar-backed DATE/TIMESTAMP literals are stored as Joda DateTime instants.
            return BeamSqlPrimitive.of(type, new DateTime(((Calendar) value).getTimeInMillis()));
        }
        SqlTypeName realType = node.getType().getSqlTypeName();
        Object realValue = value;
        if (SqlTypeName.NUMERIC_TYPES.contains(type)) {
            realValue = convertNumericLiteral(node, type, realType, value);
        }
        return BeamSqlPrimitive.of(realType, realValue);
    }

    /**
     * Converts a numeric literal value to the boxed Java type that corresponds to
     * {@code realType}.
     *
     * @param node the literal, used only for the error message
     * @param type the literal's own type name, used only for the error message
     * @param realType the target SQL type
     * @param value the raw literal value
     * @throws IllegalStateException if {@code realType} is not a supported numeric type
     */
    private static Object convertNumericLiteral(RexLiteral node, SqlTypeName type, SqlTypeName realType, Object value) {
        switch (realType) {
            case TINYINT:
                return SqlFunctions.toByte(value);
            case SMALLINT:
                return SqlFunctions.toShort(value);
            case INTEGER:
                return SqlFunctions.toInt(value);
            case BIGINT:
                return SqlFunctions.toLong(value);
            case FLOAT:
                return SqlFunctions.toFloat(value);
            case DOUBLE:
                return SqlFunctions.toDouble(value);
            case DECIMAL:
                return SqlFunctions.toBigDecimal(value);
            default:
                // The original message lacked the space before "to" and ran the two type names together.
                throw new IllegalStateException("Unsupported conversion: Attempted convert node " + node.toString() + " of type " + type + " to " + realType);
        }
    }

    /**
     * Builds the expression for an operator or function call. Operands are converted (and
     * validated) first, then the operator name selects the expression class; names with no
     * dedicated case are accepted only when the operator is a {@link SqlUserDefinedFunction}.
     *
     * @throws UnsupportedOperationException if the operator is not supported
     */
    private static BeamSqlExpression buildCall(RexCall node) {
        String opName = node.op.getName();
        List<BeamSqlExpression> subExps = buildExpressions(node.getOperands());
        // Result type of the call; several operators pick a numeric or datetime variant from it.
        SqlTypeName returnType = node.type.getSqlTypeName();
        switch (opName) {
            // Logical operators
            case "AND":
                return new BeamSqlAndExpression(subExps);
            case "OR":
                return new BeamSqlOrExpression(subExps);
            case "NOT":
                return new BeamSqlNotExpression(subExps);

            // Comparison operators
            case "=":
                return new BeamSqlEqualsExpression(subExps);
            case "<>":
                return new BeamSqlNotEqualsExpression(subExps);
            case ">":
                return new BeamSqlGreaterThanExpression(subExps);
            case ">=":
                return new BeamSqlGreaterThanOrEqualsExpression(subExps);
            case "<":
                return new BeamSqlLessThanExpression(subExps);
            case "<=":
                return new BeamSqlLessThanOrEqualsExpression(subExps);
            case "LIKE":
                return new BeamSqlLikeExpression(subExps);

            // Arithmetic operators; non-numeric results use the datetime/interval variants
            case "+":
                if (SqlTypeName.NUMERIC_TYPES.contains(returnType)) {
                    return new BeamSqlPlusExpression(subExps);
                }
                return new BeamSqlDatetimePlusExpression(subExps);
            case "-":
                if (SqlTypeName.NUMERIC_TYPES.contains(returnType)) {
                    return new BeamSqlMinusExpression(subExps);
                }
                return new BeamSqlDatetimeMinusExpression(subExps, returnType);
            case "*":
                if (SqlTypeName.NUMERIC_TYPES.contains(returnType)) {
                    return new BeamSqlMultiplyExpression(subExps);
                }
                return new BeamSqlIntervalMultiplyExpression(subExps);
            case "/":
            case "/INT":
                return new BeamSqlDivideExpression(subExps);
            case "MOD":
                return new BeamSqlModExpression(subExps);

            // Math functions
            case "ABS":
                return new BeamSqlAbsExpression(subExps);
            case "ROUND":
                return new BeamSqlRoundExpression(subExps);
            case "LN":
                return new BeamSqlLnExpression(subExps);
            case "LOG10":
                return new BeamSqlLogExpression(subExps);
            case "EXP":
                return new BeamSqlExpExpression(subExps);
            case "ACOS":
                return new BeamSqlAcosExpression(subExps);
            case "ASIN":
                return new BeamSqlAsinExpression(subExps);
            case "ATAN":
                return new BeamSqlAtanExpression(subExps);
            case "COT":
                return new BeamSqlCotExpression(subExps);
            case "DEGREES":
                return new BeamSqlDegreesExpression(subExps);
            case "RADIANS":
                return new BeamSqlRadiansExpression(subExps);
            case "COS":
                return new BeamSqlCosExpression(subExps);
            case "SIN":
                return new BeamSqlSinExpression(subExps);
            case "TAN":
                return new BeamSqlTanExpression(subExps);
            case "SIGN":
                return new BeamSqlSignExpression(subExps);
            case "POWER":
                return new BeamSqlPowerExpression(subExps);
            case "PI":
                return new BeamSqlPiExpression();
            case "ATAN2":
                return new BeamSqlAtan2Expression(subExps);
            case "TRUNCATE":
                return new BeamSqlTruncateExpression(subExps);
            case "RAND":
                return new BeamSqlRandExpression(subExps);
            case "RAND_INTEGER":
                return new BeamSqlRandIntegerExpression(subExps);

            // String functions
            case "||":
                return new BeamSqlOperatorExpression(StringOperators.CONCAT, subExps);
            case "POSITION":
                return new BeamSqlOperatorExpression(StringOperators.POSITION, subExps);
            case "CHAR_LENGTH":
            case "CHARACTER_LENGTH":
                return new BeamSqlOperatorExpression(StringOperators.CHAR_LENGTH, subExps);
            case "UPPER":
                return new BeamSqlOperatorExpression(StringOperators.UPPER, subExps);
            case "LOWER":
                return new BeamSqlOperatorExpression(StringOperators.LOWER, subExps);
            case "TRIM":
                return new BeamSqlOperatorExpression(StringOperators.TRIM, subExps);
            case "SUBSTRING":
                return new BeamSqlOperatorExpression(StringOperators.SUBSTRING, subExps);
            case "OVERLAY":
                return new BeamSqlOperatorExpression(StringOperators.OVERLAY, subExps);
            case "INITCAP":
                return new BeamSqlOperatorExpression(StringOperators.INIT_CAP, subExps);

            // Date/time functions
            case "Reinterpret":
                return new BeamSqlReinterpretExpression(subExps, returnType);
            case "CEIL":
                if (SqlTypeName.NUMERIC_TYPES.contains(returnType)) {
                    return new BeamSqlCeilExpression(subExps);
                }
                return new BeamSqlOperatorExpression(DateOperators.DATETIME_CEIL, subExps);
            case "FLOOR":
                if (SqlTypeName.NUMERIC_TYPES.contains(returnType)) {
                    return new BeamSqlFloorExpression(subExps);
                }
                return new BeamSqlOperatorExpression(DateOperators.DATETIME_FLOOR, subExps);
            case "EXTRACT_DATE":
            case "EXTRACT":
                return new BeamSqlOperatorExpression(DateOperators.EXTRACT, subExps);
            case "LOCALTIME":
            case "CURRENT_TIME":
                return new BeamSqlCurrentTimeExpression(subExps);
            case "CURRENT_TIMESTAMP":
            case "LOCALTIMESTAMP":
                return new BeamSqlCurrentTimestampExpression(subExps);
            case "CURRENT_DATE":
                return new BeamSqlCurrentDateExpression();
            case "DATETIME_PLUS":
                return new BeamSqlDatetimePlusExpression(subExps);

            // Collections and structured values
            case "ARRAY":
                return new BeamSqlArrayExpression(subExps);
            case "MAP":
                return new BeamSqlMapExpression(subExps);
            case "ITEM":
                // ITEM is resolved by the output type of its first operand.
                switch (subExps.get(0).getOutputType()) {
                    case MAP:
                        return new BeamSqlMapItemExpression(subExps, returnType);
                    case ARRAY:
                        return new BeamSqlArrayItemExpression(subExps, returnType);
                    default:
                        throw new UnsupportedOperationException("Operator: " + opName + " is not supported yet");
                }
            case "ELEMENT":
                return new BeamSqlSingleElementExpression(subExps, returnType);
            case "CARDINALITY":
                return new BeamSqlCardinalityExpression(subExps, returnType);
            case "DOT":
                return new BeamSqlDotExpression(subExps, returnType);
            case "DEFAULT":
                return new BeamSqlDefaultExpression();

            // Conditional, cast and null checks
            case "CASE":
                return new BeamSqlCaseExpression(subExps);
            case "CAST":
                return new BeamSqlCastExpression(subExps, returnType);
            case "IS NULL":
                return new BeamSqlIsNullExpression(subExps.get(0));
            case "IS NOT NULL":
                return new BeamSqlIsNotNullExpression(subExps.get(0));

            // Windowing functions
            case "HOP":
            case "TUMBLE":
            case "SESSION":
                return new BeamSqlWindowExpression(subExps, returnType);
            case "HOP_START":
            case "TUMBLE_START":
            case "SESSION_START":
                return new BeamSqlWindowStartExpression();
            case "HOP_END":
            case "TUMBLE_END":
            case "SESSION_END":
                return new BeamSqlWindowEndExpression();

            default:
                // Names without a dedicated case are supported only as user-defined functions.
                if (node.getOperator() instanceof SqlUserDefinedFunction) {
                    SqlUserDefinedFunction udf = (SqlUserDefinedFunction) node.getOperator();
                    ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction();
                    return new BeamSqlUdfExpression(fn.method, subExps, returnType);
                }
                throw new UnsupportedOperationException("Operator: " + opName + " is not supported yet");
        }
    }

    /**
     * Returns whether a literal is a DATE or TIMESTAMP whose value is a {@link Calendar}.
     *
     * @param type the literal's type name
     * @param value the literal's raw value, possibly {@code null}
     */
    private static boolean isDateNode(SqlTypeName type, Object value) {
        if (!(value instanceof Calendar)) {
            return false;
        }
        return type == SqlTypeName.DATE || type == SqlTypeName.TIMESTAMP;
    }

    /** No-op: all expressions are already built by the constructor. */
    @Override
    public void prepare() {
        // Nothing to prepare.
    }

    /**
     * Evaluates the filter condition against {@code inputRow} and, when it holds, evaluates
     * every projection.
     *
     * @param inputRow the row to evaluate against
     * @param window the window passed through to each expression
     * @param env the environment from which a copy carrying this executor's expressions is made
     * @return the projected values in project-list order, or {@code null} when the filter
     *     condition evaluates to {@code false}
     */
    @Override
    @Nullable
    public List<Object> execute(Row inputRow, BoundedWindow window, BeamSqlExpressionEnvironment env) {
        BeamSqlExpressionEnvironment localEnv = env.copyWithLocalRefExprs(this.exprs);
        // NOTE(review): behavior when the condition evaluates to SQL NULL depends on
        // BeamSqlPrimitive#getBoolean, which is not visible here - confirm.
        boolean rowPasses = this.filterCondition.evaluate(inputRow, window, localEnv).getBoolean();
        if (!rowPasses) {
            return null;
        }
        List<Object> values = new ArrayList<>(this.projections.size());
        for (BeamSqlExpression projection : this.projections) {
            values.add(projection.evaluate(inputRow, window, localEnv).getValue());
        }
        return values;
    }

    /** No-op: this executor holds no resources to release. */
    @Override
    public void close() {
        // Nothing to close.
    }
}

