/*
 * Decompiled with CFR 0.152.
 */
package org.apache.linkis.engineconnplugin.flink.client.result;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import org.apache.flink.util.AbstractID;
import org.apache.linkis.engineconnplugin.flink.client.result.AbstractResult;
import org.apache.linkis.engineconnplugin.flink.client.result.CollectBatchTableSink;
import org.apache.linkis.engineconnplugin.flink.client.result.TypedResult;
import org.apache.linkis.engineconnplugin.flink.client.shims.errorcode.FlinkErrorCodeSummary;
import org.apache.linkis.engineconnplugin.flink.client.shims.exception.JobExecutionException;
import org.apache.linkis.engineconnplugin.flink.client.shims.exception.SqlExecutionException;
import org.apache.linkis.engineconnplugin.flink.listener.FlinkStatusListener;
import org.apache.linkis.engineconnplugin.flink.listener.RowsType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Result holder for a batch query whose rows are collected through a Flink accumulator.
 *
 * <p>The rows are written by a {@link CollectBatchTableSink} into an accumulator with a
 * randomly generated name. Once the job's {@link JobExecutionResult} is available, the
 * accumulator content is deserialized and handed out exactly once through
 * {@link #retrieveChanges()}.
 *
 * @param <C> type parameter forwarded to {@link AbstractResult}; not used directly here
 */
public class BatchResult<C>
extends AbstractResult<C, Row> {
    private final Logger LOG = LoggerFactory.getLogger(this.getClass());
    /** Name of the accumulator the sink writes into and the handler reads from. */
    private final String accumulatorName;
    private final CollectBatchTableSink tableSink;
    /** Guards {@link #resultTable} and {@link #allResultRetrieved}. */
    private final Object resultLock;
    /** First failure observed while waiting for / decoding the job result, if any. */
    private final AtomicReference<SqlExecutionException> executionException = new AtomicReference<>();
    /** Deserialized rows; stays {@code null} until the job result has been processed. */
    private List<Row> resultTable;
    /** Set once the rows have been returned to a caller of {@link #retrieveChanges()}. */
    private boolean allResultRetrieved = false;

    /**
     * Creates the result together with its collecting sink.
     *
     * @param tableSchema schema passed to the collecting sink
     * @param outputType row type used to create the row serializer
     * @param config execution config used to create the row serializer
     */
    public BatchResult(TableSchema tableSchema, RowTypeInfo outputType, ExecutionConfig config) {
        this.accumulatorName = new AbstractID().toString();
        TypeSerializer<Row> rowSerializer = outputType.createSerializer(config);
        this.tableSink = new CollectBatchTableSink(this.accumulatorName, rowSerializer, tableSchema);
        this.resultLock = new Object();
    }

    /**
     * Registers the asynchronous pipeline that waits for the job's execution result and
     * decodes the collected rows. Any failure in that pipeline is stored (only the first
     * one is kept) and later rethrown by {@link #retrieveChanges()}.
     *
     * @param jobClient client of the submitted job
     */
    @Override
    public void startRetrieval(JobClient jobClient) {
        CompletableFuture.completedFuture(jobClient)
            .thenCompose(JobClient::getJobExecutionResult)
            .thenAccept(new ResultRetrievalHandler())
            .whenComplete((unused, throwable) -> {
                if (throwable != null) {
                    // compareAndSet(null, ...) keeps an already recorded failure untouched.
                    this.executionException.compareAndSet(
                        null,
                        new SqlExecutionException(
                            FlinkErrorCodeSummary.ERROR_SUBMITTING_JOB.getErrorDesc(), throwable));
                }
            });
    }

    /**
     * Returns the collected rows.
     *
     * <ul>
     *   <li>empty result while the rows have not been set yet,</li>
     *   <li>a payload with all rows on the first call after they became available,</li>
     *   <li>end-of-stream on every call after that.</li>
     * </ul>
     *
     * @return the typed result as described above
     * @throws SqlExecutionException if the retrieval pipeline recorded a failure
     */
    @Override
    public TypedResult<List<Row>> retrieveChanges() throws SqlExecutionException {
        synchronized (this.resultLock) {
            SqlExecutionException failure = this.executionException.get();
            if (failure != null) {
                throw failure;
            }
            if (this.resultTable == null) {
                return TypedResult.empty();
            }
            if (this.allResultRetrieved) {
                return TypedResult.endOfStream();
            }
            this.allResultRetrieved = true;
            return TypedResult.payload(this.resultTable);
        }
    }

    /** @return the sink that collects the rows into the accumulator */
    @Override
    public TableSink<?> getTableSink() {
        return this.tableSink;
    }

    /** No-op: this class performs no cleanup of its own on close. */
    @Override
    public void close() {
    }

    /**
     * Consumes the finished job's result: reads the accumulator, deserializes the rows,
     * publishes them under {@link #resultLock} and notifies the status listeners.
     */
    private class ResultRetrievalHandler
    implements Consumer<JobExecutionResult> {
        private ResultRetrievalHandler() {
        }

        /**
         * @param jobExecutionResult result of the finished job
         * @throws RuntimeException if the accumulator is missing or its content cannot be
         *     deserialized; the original exception is attached as the cause
         */
        @Override
        public void accept(JobExecutionResult jobExecutionResult) {
            try {
                ArrayList<byte[]> accResult =
                    jobExecutionResult.getAccumulatorResult(BatchResult.this.accumulatorName);
                if (accResult == null) {
                    throw new JobExecutionException(FlinkErrorCodeSummary.NOT_RETRIEVE_RESULT.getErrorDesc());
                }
                List<Row> rows =
                    SerializedListAccumulator.deserializeList(accResult, BatchResult.this.tableSink.getSerializer());
                // Publish the whole table at once so readers never see a partial result.
                synchronized (BatchResult.this.resultLock) {
                    BatchResult.this.resultTable = rows;
                }
                BatchResult.this.LOG.info("Accept the result, row is {}.", rows.size());
                BatchResult.this.getFlinkStatusListeners()
                    .forEach(listener -> listener.onSuccess(rows.size(), RowsType.Fetched()));
            }
            catch (IOException | ClassNotFoundException | JobExecutionException e) {
                // Listeners are informed first; the rethrow then fails the future chain so
                // that startRetrieval's whenComplete records the error.
                BatchResult.this.getFlinkStatusListeners()
                    .forEach(listener -> listener.onFailed(
                        "Serialization error while deserialize collected data.", e, RowsType.Fetched()));
                throw new RuntimeException("Serialization error while deserialize collected data.", e);
            }
        }
    }
}

