/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.util.asyncprocessing;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTestUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.RunnableWithException;
import org.assertj.core.api.Assertions;

public class AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
extends KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> {
    private final List<Input<IN>> inputs = new ArrayList<Input<IN>>();
    private long currentWatermark;
    private final ExecutorService executor;

    public static <K, IN, OUT, OP extends AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>> OP create(FunctionWithException<ExecutorService, OP, Exception> constructor) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture future = new CompletableFuture();
        executor.execute(() -> {
            try {
                future.complete((AsyncKeyedOneInputStreamOperatorTestHarness)constructor.apply((Object)executor));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        return (OP)((AsyncKeyedOneInputStreamOperatorTestHarness)future.get());
    }

    public static <K, IN, OUT> AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT> create(OneInputStreamOperator<IN, OUT> operator, KeySelector<IN, K> keySelector, TypeInformation<K> keyType) throws Exception {
        return AsyncKeyedOneInputStreamOperatorTestHarness.create(operator, keySelector, keyType, 1, 1, 0);
    }

    public static <K, IN, OUT> AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT> create(OneInputStreamOperator<IN, OUT> operator, KeySelector<IN, K> keySelector, TypeInformation<K> keyType, int maxParallelism, int numSubtasks, int subtaskIndex) throws Exception {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        CompletableFuture future = new CompletableFuture();
        executorService.execute(() -> {
            try {
                future.complete(new AsyncKeyedOneInputStreamOperatorTestHarness(executorService, SimpleOperatorFactory.of((StreamOperator)operator), keySelector, keyType, maxParallelism, numSubtasks, subtaskIndex));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        return (AsyncKeyedOneInputStreamOperatorTestHarness)future.get();
    }

    protected AsyncKeyedOneInputStreamOperatorTestHarness(ExecutorService executor, StreamOperatorFactory<OUT> operatorFactory, KeySelector<IN, K> keySelector, TypeInformation<K> keyType, int maxParallelism, int numSubtasks, int subtaskIndex) throws Exception {
        super(operatorFactory, keySelector, keyType, maxParallelism, numSubtasks, subtaskIndex);
        ClosureCleaner.clean(keySelector, (ExecutionConfig.ClosureCleanerLevel)ExecutionConfig.ClosureCleanerLevel.RECURSIVE, (boolean)false);
        this.config.setStatePartitioner(0, keySelector);
        this.config.setStateKeySerializer(keyType.createSerializer(this.executionConfig.getSerializerConfig()));
        this.config.serializeAllConfigs();
        this.executor = executor;
        this.getEnvironment().setExpectedExternalFailureCause(Throwable.class);
    }

    @Override
    public void setup(TypeSerializer<OUT> outputSerializer) {
        super.setup(outputSerializer);
        if (this.operator instanceof MultipleInputStreamOperator) {
            Preconditions.checkState((boolean)this.inputs.isEmpty());
            this.inputs.addAll(((MultipleInputStreamOperator)this.operator).getInputs());
        }
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        this.finishFuture(this.processElementInternal(element));
    }

    public CompletableFuture<Void> processElementInternal(StreamRecord<IN> element) throws Exception {
        if (this.inputs.isEmpty()) {
            return this.execute(() -> RecordProcessorUtils.getRecordProcessor(this.getOneInputOperator()).accept((Object)element));
        }
        Preconditions.checkState((this.inputs.size() == 1 ? 1 : 0) != 0);
        Input input = this.inputs.get(0);
        return this.execute(() -> RecordProcessorUtils.getRecordProcessor((Input)input).accept((Object)element));
    }

    @Override
    public void processWatermark(long watermark) throws Exception {
        this.finishFuture(this.processWatermarkInternal(watermark));
    }

    public CompletableFuture<Void> processWatermarkInternal(long watermark) {
        return this.processWatermarkInternal(new Watermark(watermark));
    }

    @Override
    public void processWatermarkStatus(WatermarkStatus status) throws Exception {
        this.finishFuture(this.processWatermarkStatusInternal(status));
    }

    public CompletableFuture<Void> processWatermarkStatusInternal(WatermarkStatus status) {
        if (this.inputs.isEmpty()) {
            return this.execute(() -> this.getOneInputOperator().processWatermarkStatus(status));
        }
        Preconditions.checkState((this.inputs.size() == 1 ? 1 : 0) != 0);
        Input input = this.inputs.get(0);
        return this.execute(() -> input.processWatermarkStatus(status));
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        this.finishFuture(this.processWatermarkInternal(mark));
    }

    @Override
    public void endInput() throws Exception {
        if (this.operator instanceof BoundedOneInput) {
            this.executeAndGet(() -> ((BoundedOneInput)this.operator).endInput());
        }
    }

    public CompletableFuture<Void> processWatermarkInternal(Watermark mark) {
        this.currentWatermark = mark.getTimestamp();
        if (this.inputs.isEmpty()) {
            return this.execute(() -> this.getOneInputOperator().processWatermark(mark));
        }
        Preconditions.checkState((this.inputs.size() == 1 ? 1 : 0) != 0);
        Input input = this.inputs.get(0);
        return this.execute(() -> input.processWatermark(mark));
    }

    public void processLatencyMarker(LatencyMarker marker) throws Exception {
        this.finishFuture(this.processLatencyMarkerInternal(marker));
    }

    public CompletableFuture<Void> processLatencyMarkerInternal(LatencyMarker marker) {
        if (this.inputs.isEmpty()) {
            return this.execute(() -> this.getOneInputOperator().processLatencyMarker(marker));
        }
        Preconditions.checkState((this.inputs.size() == 1 ? 1 : 0) != 0);
        Input input = this.inputs.get(0);
        return this.execute(() -> input.processLatencyMarker(marker));
    }

    @Override
    public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
        this.finishFuture(this.processRecordAttributesInternal(recordAttributes));
    }

    @Override
    public long getCurrentWatermark() {
        return this.currentWatermark;
    }

    public CompletableFuture<Void> processRecordAttributesInternal(RecordAttributes recordAttributes) {
        if (this.inputs.isEmpty()) {
            return this.execute(() -> this.getOneInputOperator().processRecordAttributes(recordAttributes));
        }
        Preconditions.checkState((this.inputs.size() == 1 ? 1 : 0) != 0);
        Input input = this.inputs.get(0);
        return this.execute(() -> input.processRecordAttributes(recordAttributes));
    }

    public void drainStateRequests() throws Exception {
        this.executeAndGet(() -> AsyncProcessingTestUtil.drain(this.operator));
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        this.executeAndGet(() -> this.operator.prepareSnapshotPreBarrier(checkpointId));
    }

    @Override
    public void close() throws Exception {
        this.executeAndGet(() -> super.close());
        this.executor.shutdown();
    }

    private CompletableFuture<Void> execute(RunnableWithException runnable) {
        return AsyncProcessingTestUtil.execute(this.executor, () -> {
            this.checkEnvState();
            runnable.run();
        });
    }

    private void executeAndGet(RunnableWithException runnable) throws Exception {
        this.finishFuture(this.execute(runnable));
    }

    private void finishFuture(CompletableFuture<Void> future) throws Exception {
        try {
            future.get();
            this.checkEnvState();
        }
        catch (Exception e) {
            AsyncProcessingTestUtil.execute(this.executor, () -> this.mockTask.cleanUp(e)).get();
            throw AsyncProcessingTestUtil.unwrapAsyncException(e);
        }
    }

    private void checkEnvState() {
        if (this.getEnvironment().getActualExternalFailureCause().isPresent()) {
            Assertions.fail((String)"There is an error on other threads", (Throwable)this.getEnvironment().getActualExternalFailureCause().get());
        }
    }
}

