/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.util.asyncprocessing;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperatorV2;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.util.function.RunnableWithException;

public class AsyncProcessingTestUtil {
    public static <OUT> void drain(StreamOperator<OUT> operator) {
        if (operator instanceof AbstractAsyncStateStreamOperator) {
            ((AbstractAsyncStateStreamOperator)operator).drainStateRequests();
        } else if (operator instanceof AbstractAsyncStateStreamOperatorV2) {
            ((AbstractAsyncStateStreamOperatorV2)operator).getAsyncExecutionController().drainInflightRecords(0);
        } else {
            throw new IllegalStateException("Operator is not an AsyncStateProcessingOperator");
        }
    }

    public static CompletableFuture<Void> execute(ExecutorService executor, RunnableWithException processor) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        executor.execute(() -> {
            try {
                processor.run();
                future.complete(null);
            }
            catch (Exception e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    public static Exception unwrapAsyncException(Exception t) {
        while (t != null && t.getCause() != null && t.getCause() != t && (t instanceof ExecutionException || t instanceof RuntimeException) && t.getCause() instanceof Exception) {
            t = (Exception)t.getCause();
        }
        return t;
    }
}

