package org.apache.flink.streaming.runtime.operators.asyncprocessing;

import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequest;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.class */
public class AbstractAsyncStateStreamOperatorTest {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest$TestKeySelector.class */
    public static class TestKeySelector implements KeySelector<Tuple2<Integer, String>, Integer> {
        private static final long serialVersionUID = 1;

        public Integer getKey(Tuple2<Integer, String> tuple2) {
            return (Integer) tuple2.f0;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest$TestOperator.class */
    public static class TestOperator extends AbstractAsyncStateStreamOperator<String> implements OneInputStreamOperator<Tuple2<Integer, String>, String>, Triggerable<Integer, VoidNamespace> {
        private static final long serialVersionUID = 1;
        private final ElementOrder elementOrder;
        final AtomicInteger processed = new AtomicInteger(0);
        final Object objectToWait = new Object();

        TestOperator(ElementOrder elementOrder) {
            this.elementOrder = elementOrder;
        }

        public void open() throws Exception {
            super.open();
        }

        public ElementOrder getElementOrder() {
            return this.elementOrder;
        }

        public void processElement(StreamRecord<Tuple2<Integer, String>> streamRecord) throws Exception {
            this.processed.incrementAndGet();
            synchronized (this.objectToWait) {
                this.objectToWait.wait();
            }
        }

        public void onEventTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
        }

        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
        }

        public int getProcessed() {
            return this.processed.get();
        }

        public void proceed() {
            synchronized (this.objectToWait) {
                this.objectToWait.notify();
            }
        }
    }

    protected KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness(int i, int i2, int i3, ElementOrder elementOrder) throws Exception {
        return new KeyedOneInputStreamOperatorTestHarness<>((OneInputStreamOperator) new TestOperator(elementOrder), (KeySelector) new TestKeySelector(), (TypeInformation) BasicTypeInfo.INT_TYPE_INFO, i, i2, i3);
    }

    @Test
    public void testCreateAsyncExecutionController() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);
        Throwable th = null;
        try {
            createTestHarness.open();
            Assertions.assertThat(createTestHarness.getOperator()).isInstanceOf(AbstractAsyncStateStreamOperator.class);
            Assertions.assertThat(createTestHarness.getOperator().getAsyncExecutionController()).isNotNull();
            if (createTestHarness != null) {
                if (0 == 0) {
                    createTestHarness.close();
                    return;
                }
                try {
                    createTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createTestHarness != null) {
                if (0 != 0) {
                    try {
                        createTestHarness.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createTestHarness.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testRecordProcessorWithFirstStateOrder() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness = createTestHarness(128, 1, 0, ElementOrder.FIRST_STATE_ORDER);
        Throwable th = null;
        try {
            createTestHarness.open();
            TestOperator operator = createTestHarness.getOperator();
            ThrowingConsumer recordProcessor = RecordProcessorUtils.getRecordProcessor(operator);
            ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
            newSingleThreadExecutor.execute(() -> {
                try {
                    recordProcessor.accept(new StreamRecord(Tuple2.of(5, "5")));
                } catch (Exception e) {
                }
            });
            Thread.sleep(1000L);
            Assertions.assertThat(operator.getProcessed()).isEqualTo(1);
            Assertions.assertThat(operator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(1);
            operator.proceed();
            newSingleThreadExecutor.shutdown();
            Thread.sleep(1000L);
            Assertions.assertThat(operator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
            if (createTestHarness != null) {
                if (0 == 0) {
                    createTestHarness.close();
                    return;
                }
                try {
                    createTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createTestHarness != null) {
                if (0 != 0) {
                    try {
                        createTestHarness.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createTestHarness.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testRecordProcessorWithRecordOrder() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);
        Throwable th = null;
        try {
            createTestHarness.open();
            TestOperator operator = createTestHarness.getOperator();
            ThrowingConsumer recordProcessor = RecordProcessorUtils.getRecordProcessor(operator);
            ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
            newSingleThreadExecutor.execute(() -> {
                try {
                    recordProcessor.accept(new StreamRecord(Tuple2.of(5, "5")));
                } catch (Exception e) {
                }
            });
            Thread.sleep(1000L);
            Assertions.assertThat(operator.getProcessed()).isEqualTo(1);
            Assertions.assertThat(operator.getCurrentProcessingContext().getReferenceCount()).isGreaterThan(1);
            operator.proceed();
            newSingleThreadExecutor.shutdown();
            Thread.sleep(1000L);
            Assertions.assertThat(operator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
            if (createTestHarness != null) {
                if (0 == 0) {
                    createTestHarness.close();
                    return;
                }
                try {
                    createTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createTestHarness != null) {
                if (0 != 0) {
                    try {
                        createTestHarness.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createTestHarness.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testCheckpointDrain() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);
        Throwable th = null;
        try {
            createTestHarness.open();
            CheckpointStorageLocationReference checkpointStorageLocationReference = CheckpointStorageLocationReference.getDefault();
            AsyncExecutionController asyncExecutionController = createTestHarness.getOperator().getAsyncExecutionController();
            asyncExecutionController.setStateExecutor(new StateExecutor() { // from class: org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperatorTest.1
                public CompletableFuture<Boolean> executeBatchRequests(Iterable<StateRequest<?, ?, ?>> iterable) {
                    Iterator<StateRequest<?, ?, ?>> it = iterable.iterator();
                    while (it.hasNext()) {
                        it.next().getFuture().complete(true);
                    }
                    return CompletableFuture.completedFuture(true);
                }
            });
            createTestHarness.getOperator().setAsyncKeyedContextElement(new StreamRecord(Tuple2.of(5, "5")), new TestKeySelector());
            asyncExecutionController.handleRequest((State) null, StateRequestType.VALUE_GET, (Object) null);
            createTestHarness.getOperator().postProcessElement();
            Assertions.assertThat(asyncExecutionController.getInFlightRecordNum()).isEqualTo(1);
            createTestHarness.getOperator().snapshotState(1L, 1L, new CheckpointOptions(CheckpointType.CHECKPOINT, checkpointStorageLocationReference), new JobManagerCheckpointStorage().createCheckpointStorage(new JobID()).resolveCheckpointStorageLocation(1L, checkpointStorageLocationReference));
            Assertions.assertThat(asyncExecutionController.getInFlightRecordNum()).isEqualTo(0);
            if (createTestHarness != null) {
                if (0 == 0) {
                    createTestHarness.close();
                    return;
                }
                try {
                    createTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createTestHarness != null) {
                if (0 != 0) {
                    try {
                        createTestHarness.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createTestHarness.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testTimerServiceIsAsync() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);
        Throwable th = null;
        try {
            createTestHarness.open();
            Assertions.assertThat(createTestHarness.getOperator()).isInstanceOf(AbstractAsyncStateStreamOperator.class);
            Assertions.assertThat(createTestHarness.getOperator().getInternalTimerService("test", VoidNamespaceSerializer.INSTANCE, new Triggerable() { // from class: org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperatorTest.2
                public void onEventTime(InternalTimer internalTimer) throws Exception {
                }

                public void onProcessingTime(InternalTimer internalTimer) throws Exception {
                }
            })).isInstanceOf(InternalTimerServiceAsyncImpl.class);
            if (createTestHarness != null) {
                if (0 == 0) {
                    createTestHarness.close();
                    return;
                }
                try {
                    createTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createTestHarness != null) {
                if (0 != 0) {
                    try {
                        createTestHarness.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createTestHarness.close();
                }
            }
            throw th3;
        }
    }
}
