/*
 * Decompiled with CFR 0.152.
 */
package org.apache.linkis.engineconnplugin.flink.client.sql.operation.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.types.Row;
import org.apache.linkis.engineconnplugin.flink.client.context.ExecutionContext;
import org.apache.linkis.engineconnplugin.flink.client.result.AbstractResult;
import org.apache.linkis.engineconnplugin.flink.client.result.BatchResult;
import org.apache.linkis.engineconnplugin.flink.client.result.ChangelogResult;
import org.apache.linkis.engineconnplugin.flink.client.result.ResultUtil;
import org.apache.linkis.engineconnplugin.flink.client.result.TypedResult;
import org.apache.linkis.engineconnplugin.flink.client.shims.errorcode.FlinkErrorCodeSummary;
import org.apache.linkis.engineconnplugin.flink.client.shims.exception.JobExecutionException;
import org.apache.linkis.engineconnplugin.flink.client.shims.exception.SqlExecutionException;
import org.apache.linkis.engineconnplugin.flink.client.sql.operation.AbstractJobOperation;
import org.apache.linkis.engineconnplugin.flink.client.sql.operation.result.ColumnInfo;
import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SelectOperation
extends AbstractJobOperation {
    private static final Logger LOG = LoggerFactory.getLogger(SelectOperation.class);
    private final String query;
    private AbstractResult<?, ?> result;
    private TableSchema resultSchema;
    private List<ColumnInfo> columnInfos;

    public SelectOperation(FlinkEngineConnContext context, String query) {
        super(context);
        this.query = query;
        this.noMoreResult = false;
    }

    @Override
    protected JobID submitJob() throws SqlExecutionException {
        JobID jobId = this.executeQueryInternal(this.context.getExecutionContext(), this.query);
        List resultSchemaColumns = this.resultSchema.getTableColumns();
        this.columnInfos = new ArrayList<ColumnInfo>();
        for (TableColumn column : resultSchemaColumns) {
            this.columnInfos.add(ColumnInfo.create(column.getName(), column.getType().getLogicalType()));
        }
        return jobId;
    }

    @Override
    protected void cancelJobInternal() throws JobExecutionException {
        LOG.info("Start to cancel job {} and result retrieval.", (Object)this.getJobId());
        this.result.close();
        super.cancelJobInternal();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected Optional<Tuple2<List<Row>, List<Boolean>>> fetchJobResults() throws SqlExecutionException {
        Optional<Tuple2<List<Row>, List<Boolean>>> ret;
        Object object = this.lock;
        synchronized (object) {
            if (this.result == null) {
                LOG.error("The job for this query has been canceled.");
                throw new SqlExecutionException(FlinkErrorCodeSummary.QUERY_CANCELED.getErrorDesc());
            }
            ret = this.result instanceof ChangelogResult ? this.fetchStreamingResult() : this.fetchBatchResult();
        }
        return ret;
    }

    @Override
    protected List<ColumnInfo> getColumnInfos() {
        return this.columnInfos;
    }

    private Optional<Tuple2<List<Row>, List<Boolean>>> fetchBatchResult() throws SqlExecutionException {
        BatchResult batchResult = (BatchResult)this.result;
        TypedResult<List<Row>> typedResult = batchResult.retrieveChanges();
        if (typedResult.getType() == TypedResult.ResultType.PAYLOAD) {
            List<Row> payload = typedResult.getPayload();
            return Optional.of(Tuple2.of(payload, null));
        }
        return Optional.of(Tuple2.of(Collections.emptyList(), null));
    }

    private Optional<Tuple2<List<Row>, List<Boolean>>> fetchStreamingResult() throws SqlExecutionException {
        ChangelogResult changLogResult = (ChangelogResult)this.result;
        TypedResult<List<Tuple2<Boolean, Row>>> typedResult = changLogResult.retrieveChanges();
        if (typedResult.getType() == TypedResult.ResultType.EOS) {
            return Optional.of(Tuple2.of(Collections.emptyList(), Collections.emptyList()));
        }
        if (typedResult.getType() == TypedResult.ResultType.PAYLOAD) {
            List<Tuple2<Boolean, Row>> payload = typedResult.getPayload();
            ArrayList<Object> data = new ArrayList<Object>();
            ArrayList<Object> changeFlags = new ArrayList<Object>();
            for (Tuple2<Boolean, Row> tuple : payload) {
                data.add(tuple.f1);
                changeFlags.add(tuple.f0);
            }
            return Optional.of(Tuple2.of(data, changeFlags));
        }
        return Optional.of(Tuple2.of(Collections.emptyList(), Collections.emptyList()));
    }

    private JobID executeQueryInternal(ExecutionContext executionContext, String query) throws SqlExecutionException {
        TableResult tableResult;
        Table table = this.createTable(executionContext, executionContext.getTableEnvironment(), query);
        boolean isChangelogResult = executionContext.getEnvironment().getExecution().inStreamingMode();
        this.resultSchema = this.removeTimeAttributes(table.getSchema());
        if (isChangelogResult) {
            this.result = ResultUtil.createChangelogResult(executionContext.getFlinkConfig(), executionContext.getEnvironment(), this.resultSchema, executionContext.getExecutionConfig());
        } else {
            executionContext.getExecutionConfig().setExecutionMode(ExecutionMode.BATCH);
            this.result = ResultUtil.createBatchResult(this.resultSchema, executionContext.getExecutionConfig());
        }
        this.result.setFlinkListeners(this.getFlinkListeners());
        String tableName = String.format("_tmp_table_%s", UUID.randomUUID().toString().replace("-", ""));
        try {
            tableResult = executionContext.wrapClassLoader(tableEnv -> {
                tableEnv.registerTableSinkInternal(tableName, this.result.getTableSink());
                return table.executeInsert(tableName);
            });
        }
        catch (Exception t) {
            this.result.close();
            LOG.error(String.format("Invalid SQL query, sql is %s.", query), (Throwable)t);
            throw new SqlExecutionException(FlinkErrorCodeSummary.INVALID_SQL_QUERY.getErrorDesc(), (Throwable)t);
        }
        finally {
            executionContext.wrapClassLoader(tableEnv -> tableEnv.dropTemporaryTable(tableName));
        }
        return tableResult.getJobClient().map(jobClient -> {
            JobID jobId = jobClient.getJobID();
            LOG.info("Submit flink job: {} successfully.", (Object)jobId);
            this.result.startRetrieval((JobClient)jobClient);
            return jobId;
        }).orElseThrow(() -> new SqlExecutionException(FlinkErrorCodeSummary.NOT_JOB_ASD_ADMIN.getErrorDesc()));
    }

    private Table createTable(ExecutionContext context, TableEnvironment tableEnv, String selectQuery) throws SqlExecutionException {
        try {
            return context.wrapClassLoader(() -> tableEnv.sqlQuery(selectQuery));
        }
        catch (Exception t) {
            throw new SqlExecutionException(FlinkErrorCodeSummary.INVALID_SQL_STATEMENT.getErrorDesc(), (Throwable)t);
        }
    }

    private TableSchema removeTimeAttributes(TableSchema schema) {
        TableSchema.Builder builder = TableSchema.builder();
        for (int i = 0; i < schema.getFieldCount(); ++i) {
            DataType dataType = schema.getFieldDataTypes()[i];
            DataType convertedType = DataTypeUtils.replaceLogicalType((DataType)dataType, (LogicalType)LogicalTypeUtils.removeTimeAttributes((LogicalType)dataType.getLogicalType()));
            builder.field(schema.getFieldNames()[i], convertedType);
        }
        return builder.build();
    }
}

