/*
 * Decompiled with CFR 0.152.
 */
package org.apache.linkis.engineconnplugin.flink.client.result;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.experimental.SocketStreamIterator;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.linkis.engineconnplugin.flink.client.result.AbstractResult;
import org.apache.linkis.engineconnplugin.flink.client.result.CollectStreamTableSink;
import org.apache.linkis.engineconnplugin.flink.client.result.TypedResult;
import org.apache.linkis.engineconnplugin.flink.client.shims.errorcode.FlinkErrorCodeSummary;
import org.apache.linkis.engineconnplugin.flink.client.shims.exception.SqlExecutionException;
import org.apache.linkis.engineconnplugin.flink.listener.RowsType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ChangelogResult
extends AbstractResult<ApplicationId, Tuple2<Boolean, Row>> {
    private static final Logger LOG = LoggerFactory.getLogger(ChangelogResult.class);
    private final SocketStreamIterator<Tuple2<Boolean, Row>> iterator;
    private final CollectStreamTableSink collectTableSink;
    private final ResultRetrievalThread retrievalThread;
    private CompletableFuture<JobExecutionResult> jobExecutionResultFuture;
    private final Object resultLock;
    private AtomicReference<SqlExecutionException> executionException = new AtomicReference();
    private final List<Tuple2<Boolean, Row>> changeRecordBuffer;
    private final int maxBufferSize;

    ChangelogResult(RowTypeInfo outputType, TableSchema tableSchema, ExecutionConfig config, InetAddress gatewayAddress, int gatewayPort, int maxBufferSize) throws SqlExecutionException {
        this.resultLock = new Object();
        TypeInformation socketType = Types.TUPLE((TypeInformation[])new TypeInformation[]{Types.BOOLEAN, outputType});
        TypeSerializer serializer = socketType.createSerializer(config);
        try {
            this.iterator = new SocketStreamIterator(gatewayPort, gatewayAddress, serializer);
        }
        catch (IOException e) {
            throw new SqlExecutionException(FlinkErrorCodeSummary.NOT_SOCKET_RETRIEVAL.getErrorDesc(), (Throwable)e);
        }
        this.collectTableSink = new CollectStreamTableSink(this.iterator.getBindAddress(), this.iterator.getPort(), (TypeSerializer<Tuple2<Boolean, Row>>)serializer, tableSchema);
        this.retrievalThread = new ResultRetrievalThread();
        this.changeRecordBuffer = new ArrayList<Tuple2<Boolean, Row>>();
        this.maxBufferSize = maxBufferSize;
    }

    @Override
    public void startRetrieval(JobClient jobClient) {
        this.retrievalThread.setName(jobClient.getJobID().toHexString() + "-JobResult-Fetch-Thread");
        this.retrievalThread.start();
        this.jobExecutionResultFuture = ((CompletableFuture)CompletableFuture.completedFuture(jobClient).thenCompose(JobClient::getJobExecutionResult)).whenComplete((unused, throwable) -> {
            if (throwable != null) {
                LOG.warn("throwable is not null", throwable);
                this.executionException.compareAndSet(null, new SqlExecutionException(FlinkErrorCodeSummary.ERROR_SUBMITTING_JOB.getErrorDesc(), throwable));
            } else {
                LOG.warn("throwable is null");
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public TypedResult<List<Tuple2<Boolean, Row>>> retrieveChanges() throws SqlExecutionException {
        Object object = this.resultLock;
        synchronized (object) {
            if (this.isRetrieving() && this.executionException.get() == null) {
                if (this.changeRecordBuffer.isEmpty()) {
                    return TypedResult.empty();
                }
                ArrayList<Tuple2<Boolean, Row>> change = new ArrayList<Tuple2<Boolean, Row>>(this.changeRecordBuffer);
                this.changeRecordBuffer.clear();
                this.resultLock.notifyAll();
                return TypedResult.payload(change);
            }
            if (!this.isRetrieving() && !this.changeRecordBuffer.isEmpty()) {
                ArrayList<Tuple2<Boolean, Row>> change = new ArrayList<Tuple2<Boolean, Row>>(this.changeRecordBuffer);
                this.changeRecordBuffer.clear();
                return TypedResult.payload(change);
            }
            return this.handleMissingResult();
        }
    }

    @Override
    public TableSink<?> getTableSink() {
        return this.collectTableSink;
    }

    @Override
    public void close() {
        this.retrievalThread.isRunning = false;
        this.iterator.close();
    }

    private <T> TypedResult<T> handleMissingResult() throws SqlExecutionException {
        if (!this.jobExecutionResultFuture.isDone()) {
            return TypedResult.empty();
        }
        if (this.executionException.get() != null) {
            throw this.executionException.get();
        }
        return TypedResult.endOfStream();
    }

    private boolean isRetrieving() {
        return this.retrievalThread.isRunning;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processRecord(Tuple2<Boolean, Row> change) {
        Object object = this.resultLock;
        synchronized (object) {
            while (this.changeRecordBuffer.size() >= this.maxBufferSize) {
                try {
                    this.getFlinkStreamingResultSetListeners().forEach(listener -> listener.onResultSetPulled(this.changeRecordBuffer.size()));
                    this.resultLock.wait();
                }
                catch (InterruptedException interruptedException) {}
            }
            this.changeRecordBuffer.add(change);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Accept the streaming result, row is {}.", change.f1);
        }
    }

    private class ResultRetrievalThread
    extends Thread {
        volatile boolean isRunning = true;
        private boolean isStatusListenersNotified = false;

        private ResultRetrievalThread() {
        }

        @Override
        public void run() {
            int rows = 0;
            try {
                while (this.isRunning && ChangelogResult.this.iterator.hasNext()) {
                    Tuple2 change = (Tuple2)ChangelogResult.this.iterator.next();
                    ChangelogResult.this.processRecord((Tuple2<Boolean, Row>)change);
                    ++rows;
                }
            }
            catch (Exception e) {
                LOG.warn(this.getName() + " has finished with an error, ignore it.", (Throwable)e);
            }
            if (!ChangelogResult.this.changeRecordBuffer.isEmpty()) {
                this.dealOrFailed(() -> {
                    ChangelogResult.this.getFlinkStreamingResultSetListeners().forEach(listener -> listener.onResultSetPulled(ChangelogResult.this.changeRecordBuffer.size()));
                    return null;
                });
            }
            try {
                ChangelogResult.this.jobExecutionResultFuture.get();
            }
            catch (Exception e) {
                LOG.warn(this.getName() + " has finished with an error, ignore it.", (Throwable)e);
            }
            int totalRows = rows;
            LOG.warn("executionException is", (Throwable)ChangelogResult.this.executionException.get());
            if (!this.isStatusListenersNotified) {
                this.dealOrFailed(() -> {
                    SqlExecutionException exception = (SqlExecutionException)((Object)((Object)ChangelogResult.this.executionException.get()));
                    if (exception != null) {
                        ChangelogResult.this.getFlinkStatusListeners().forEach(listener -> listener.onFailed(ExceptionUtils.getRootCauseMessage((Throwable)exception), exception, RowsType.Fetched()));
                    } else {
                        ChangelogResult.this.getFlinkStatusListeners().forEach(listener -> listener.onSuccess(totalRows, RowsType.Fetched()));
                    }
                    return null;
                });
            }
            this.isRunning = false;
        }

        private void dealOrFailed(Supplier<Void> supplier) {
            try {
                supplier.get();
            }
            catch (Exception e) {
                LOG.error("Listener execute failed!", (Throwable)e);
                this.isStatusListenersNotified = true;
                ChangelogResult.this.getFlinkStatusListeners().forEach(listener -> listener.onFailed(ExceptionUtils.getRootCauseMessage((Throwable)e), e, RowsType.Fetched()));
            }
        }
    }
}

