package org.apache.flink.streaming.runtime.operators.asyncprocessing;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperatorTest;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.class */
public class AbstractAsyncStateStreamOperatorV2Test {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test$KeyedOneInputStreamOperatorV2TestHarness.class */
    public static class KeyedOneInputStreamOperatorV2TestHarness<K, IN, OUT> extends KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> {
        public KeyedOneInputStreamOperatorV2TestHarness(StreamOperatorFactory<OUT> streamOperatorFactory, KeySelector<IN, K> keySelector, TypeInformation<K> typeInformation, int i, int i2, int i3) throws Exception {
            super(streamOperatorFactory, keySelector, typeInformation, i, i2, i3);
        }

        public StreamOperator<OUT> getBaseOperator() {
            return this.operator;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test$SingleInputTestOperator.class */
    private static class SingleInputTestOperator extends AbstractAsyncStateStreamOperatorV2<String> implements MultipleInputStreamOperator<String>, Triggerable<Integer, VoidNamespace> {
        private static final long serialVersionUID = 1;
        final AtomicInteger processed;
        private final ElementOrder elementOrder;
        final Object objectToWait;
        final Input input;

        public SingleInputTestOperator(StreamOperatorParameters<String> streamOperatorParameters, ElementOrder elementOrder) {
            super(streamOperatorParameters, 1);
            this.processed = new AtomicInteger(0);
            this.objectToWait = new Object();
            this.elementOrder = elementOrder;
            this.input = new AbstractInput<Tuple2<Integer, String>, String>(this, 1) { // from class: org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperatorV2Test.SingleInputTestOperator.1
                public void processElement(StreamRecord<Tuple2<Integer, String>> streamRecord) throws Exception {
                    SingleInputTestOperator.this.processed.incrementAndGet();
                    synchronized (SingleInputTestOperator.this.objectToWait) {
                        SingleInputTestOperator.this.objectToWait.wait();
                    }
                }
            };
        }

        public void open() throws Exception {
            super.open();
        }

        public List<Input> getInputs() {
            return Collections.singletonList(this.input);
        }

        public ElementOrder getElementOrder() {
            return this.elementOrder;
        }

        public void onEventTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
        }

        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
        }

        public int getProcessed() {
            return this.processed.get();
        }

        public void proceed() {
            synchronized (this.objectToWait) {
                this.objectToWait.notify();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test$TestOperatorFactory.class */
    public static class TestOperatorFactory extends AbstractStreamOperatorFactory<String> {
        private final ElementOrder elementOrder;

        TestOperatorFactory(ElementOrder elementOrder) {
            this.elementOrder = elementOrder;
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> streamOperatorParameters) {
            return new SingleInputTestOperator(streamOperatorParameters, this.elementOrder);
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return SingleInputTestOperator.class;
        }
    }

    protected KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness(int i, int i2, int i3, ElementOrder elementOrder) throws Exception {
        return new KeyedOneInputStreamOperatorV2TestHarness<>(new TestOperatorFactory(elementOrder), new AbstractAsyncStateStreamOperatorTest.TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO, i, i2, i3);
    }

    @Test
    public void testCreateAsyncExecutionController() throws Exception {
        KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);
        Throwable th = null;
        try {
            createTestHarness.open();
            Assertions.assertThat(createTestHarness.getBaseOperator()).isInstanceOf(AbstractAsyncStateStreamOperatorV2.class);
            Assertions.assertThat(createTestHarness.getBaseOperator().getAsyncExecutionController()).isNotNull();
            if (createTestHarness != null) {
                if (0 == 0) {
                    createTestHarness.close();
                    return;
                }
                try {
                    createTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createTestHarness != null) {
                if (0 != 0) {
                    try {
                        createTestHarness.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createTestHarness.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testRecordProcessorWithFirstStateOrder() throws Exception {
        KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness = createTestHarness(128, 1, 0, ElementOrder.FIRST_STATE_ORDER);
        Throwable th = null;
        try {
            createTestHarness.open();
            SingleInputTestOperator baseOperator = createTestHarness.getBaseOperator();
            ThrowingConsumer recordProcessor = RecordProcessorUtils.getRecordProcessor(baseOperator.getInputs().get(0));
            ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
            newSingleThreadExecutor.execute(() -> {
                try {
                    recordProcessor.accept(new StreamRecord(Tuple2.of(5, "5")));
                } catch (Exception e) {
                }
            });
            Thread.sleep(1000L);
            Assertions.assertThat(baseOperator.getProcessed()).isEqualTo(1);
            Assertions.assertThat(baseOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(1);
            baseOperator.proceed();
            newSingleThreadExecutor.shutdown();
            Thread.sleep(1000L);
            Assertions.assertThat(baseOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
            if (createTestHarness != null) {
                if (0 == 0) {
                    createTestHarness.close();
                    return;
                }
                try {
                    createTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createTestHarness != null) {
                if (0 != 0) {
                    try {
                        createTestHarness.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createTestHarness.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testRecordProcessorWithRecordOrder() throws Exception {
        KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);
        Throwable th = null;
        try {
            createTestHarness.open();
            SingleInputTestOperator baseOperator = createTestHarness.getBaseOperator();
            ThrowingConsumer recordProcessor = RecordProcessorUtils.getRecordProcessor(baseOperator.getInputs().get(0));
            ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
            newSingleThreadExecutor.execute(() -> {
                try {
                    recordProcessor.accept(new StreamRecord(Tuple2.of(5, "5")));
                } catch (Exception e) {
                }
            });
            Thread.sleep(1000L);
            Assertions.assertThat(baseOperator.getProcessed()).isEqualTo(1);
            Assertions.assertThat(baseOperator.getCurrentProcessingContext().getReferenceCount()).isGreaterThan(1);
            baseOperator.proceed();
            newSingleThreadExecutor.shutdown();
            Thread.sleep(1000L);
            Assertions.assertThat(baseOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
            if (createTestHarness != null) {
                if (0 == 0) {
                    createTestHarness.close();
                    return;
                }
                try {
                    createTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createTestHarness != null) {
                if (0 != 0) {
                    try {
                        createTestHarness.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createTestHarness.close();
                }
            }
            throw th3;
        }
    }
}
