package org.apache.flink.streaming.api.operators;

import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTest;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.class */
class AbstractUdfStreamOperatorLifecycleTest {
    /**
     * Expected {@code toString()} of the sorted list of {@code name + parameterTypes} entries
     * built reflectively from {@code StreamOperator.class.getMethods()} in
     * {@code testAllMethodsRegisteredInTest}.
     */
    private static final String ALL_METHODS_STREAM_OPERATOR = "[close[], finish[], getCurrentKey[], getMetricGroup[], getOperatorAttributes[], getOperatorID[], initializeState[interface org.apache.flink.streaming.api.operators.StreamTaskStateInitializer], notifyCheckpointAborted[long], notifyCheckpointComplete[long], open[], prepareSnapshotPreBarrier[long], setCurrentKey[class java.lang.Object], setKeyContextElement1[class org.apache.flink.streaming.runtime.streamrecord.StreamRecord], setKeyContextElement2[class org.apache.flink.streaming.runtime.streamrecord.StreamRecord], snapshotState[long, long, class org.apache.flink.runtime.checkpoint.CheckpointOptions, interface org.apache.flink.runtime.state.CheckpointStreamFactory]]";

    /**
     * Expected {@code toString()} of the sorted list of {@code name + parameterTypes} entries
     * built reflectively from {@code RichFunction.class.getMethods()} in
     * {@code testAllMethodsRegisteredInTest}.
     */
    private static final String ALL_METHODS_RICH_FUNCTION = "[close[], getIterationRuntimeContext[], getRuntimeContext[], open[class org.apache.flink.configuration.Configuration], open[interface org.apache.flink.api.common.functions.OpenContext], setRuntimeContext[interface org.apache.flink.api.common.functions.RuntimeContext]]";

    /** Extension whose executor is handed to the tasks created by the lifecycle tests. */
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    /** Call order asserted by {@code testLifeCycleFull} (task expected to end up FINISHED). */
    private static final List<String> EXPECTED_CALL_ORDER_FULL = Arrays.asList("OPERATOR::setup", "UDF::setRuntimeContext", "OPERATOR::initializeState", "OPERATOR::open", "UDF::open", "OPERATOR::run", "UDF::run", "OPERATOR::prepareSnapshotPreBarrier", "OPERATOR::snapshotState", "OPERATOR::finish", "OPERATOR::close", "UDF::close");

    /** Call order asserted by {@code testLifeCycleCancel} (task expected to end up CANCELED). */
    private static final List<String> EXPECTED_CALL_ORDER_CANCEL_RUNNING = Arrays.asList("OPERATOR::setup", "UDF::setRuntimeContext", "OPERATOR::initializeState", "OPERATOR::open", "UDF::open", "OPERATOR::run", "UDF::run", "OPERATOR::cancel", "UDF::cancel", "OPERATOR::close", "UDF::close");

    /**
     * Shared log of lifecycle calls, appended to by the tracking operator and the mock UDF and
     * cleared at the start of each lifecycle test. Wrapped in a synchronized list because entries
     * are added from code that does not run on the test thread.
     */
    private static final List<String> ACTUAL_ORDER_TRACKING = Collections.synchronizedList(new ArrayList<>(EXPECTED_CALL_ORDER_FULL.size()));

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest$LifecycleTrackingStreamSource.class */
    private static class LifecycleTrackingStreamSource<OUT, SRC extends SourceFunction<OUT>> extends StreamSource<OUT, SRC> implements Serializable {
        private static final long serialVersionUID = 2431488948886850562L;
        private transient Thread testCheckpointer;
        private final boolean simulateCheckpointing;
        static OneShotLatch runStarted;
        static OneShotLatch runFinish;

        public LifecycleTrackingStreamSource(SRC src, boolean z) {
            super(src);
            this.simulateCheckpointing = z;
            runStarted = new OneShotLatch();
            runFinish = new OneShotLatch();
        }

        public void run(Object obj, Output<StreamRecord<OUT>> output, OperatorChain<?, ?> operatorChain) throws Exception {
            AbstractUdfStreamOperatorLifecycleTest.ACTUAL_ORDER_TRACKING.add("OPERATOR::run");
            super.run(obj, output, operatorChain);
            runStarted.trigger();
            runFinish.await();
        }

        public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<OUT>> output) {
            AbstractUdfStreamOperatorLifecycleTest.ACTUAL_ORDER_TRACKING.add("OPERATOR::setup");
            super.setup(streamTask, streamConfig, output);
            if (this.simulateCheckpointing) {
                this.testCheckpointer = new Thread() { // from class: org.apache.flink.streaming.api.operators.AbstractUdfStreamOperatorLifecycleTest.LifecycleTrackingStreamSource.1
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        try {
                            LifecycleTrackingStreamSource.runStarted.await();
                            if (LifecycleTrackingStreamSource.this.getContainingTask().isCanceled() || ((Boolean) LifecycleTrackingStreamSource.this.getContainingTask().triggerCheckpointAsync(new CheckpointMetaData(0L, System.currentTimeMillis()), CheckpointOptions.forCheckpointWithDefaultLocation()).get()).booleanValue()) {
                                LifecycleTrackingStreamSource.runFinish.trigger();
                            }
                        } catch (Exception e) {
                            Assertions.fail(e);
                        }
                    }
                };
                this.testCheckpointer.start();
            }
        }

        public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
            AbstractUdfStreamOperatorLifecycleTest.ACTUAL_ORDER_TRACKING.add("OPERATOR::snapshotState");
            super.snapshotState(stateSnapshotContext);
        }

        public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
            AbstractUdfStreamOperatorLifecycleTest.ACTUAL_ORDER_TRACKING.add("OPERATOR::initializeState");
            super.initializeState(stateInitializationContext);
        }

        public void prepareSnapshotPreBarrier(long j) throws Exception {
            AbstractUdfStreamOperatorLifecycleTest.ACTUAL_ORDER_TRACKING.add("OPERATOR::prepareSnapshotPreBarrier");
            super.prepareSnapshotPreBarrier(j);
        }

        public void open() throws Exception {
            AbstractUdfStreamOperatorLifecycleTest.ACTUAL_ORDER_TRACKING.add("OPERATOR::open");
            super.open();
        }

        public void finish() throws Exception {
            AbstractUdfStreamOperatorLifecycleTest.ACTUAL_ORDER_TRACKING.add("OPERATOR::finish");
            super.finish();
        }

        public void close() throws Exception {
            AbstractUdfStreamOperatorLifecycleTest.ACTUAL_ORDER_TRACKING.add("OPERATOR::close");
            super.close();
        }

        public void cancel() {
            AbstractUdfStreamOperatorLifecycleTest.ACTUAL_ORDER_TRACKING.add("OPERATOR::cancel");
            super.cancel();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest$MockSourceFunction.class */
    private static class MockSourceFunction extends RichSourceFunction<Long> {
        private static final long serialVersionUID = 1;

        private MockSourceFunction() {
        }

        public void run(SourceFunction.SourceContext<Long> sourceContext) {
            AbstractUdfStreamOperatorLifecycleTest.ACTUAL_ORDER_TRACKING.add("UDF::run");
        }

        public void cancel() {
            AbstractUdfStreamOperatorLifecycleTest.ACTUAL_ORDER_TRACKING.add("UDF::cancel");
        }

        public void setRuntimeContext(RuntimeContext runtimeContext) {
            AbstractUdfStreamOperatorLifecycleTest.ACTUAL_ORDER_TRACKING.add("UDF::setRuntimeContext");
            super.setRuntimeContext(runtimeContext);
        }

        public void open(OpenContext openContext) throws Exception {
            AbstractUdfStreamOperatorLifecycleTest.ACTUAL_ORDER_TRACKING.add("UDF::open");
            super.open(openContext);
        }

        public void close() throws Exception {
            AbstractUdfStreamOperatorLifecycleTest.ACTUAL_ORDER_TRACKING.add("UDF::close");
            super.close();
        }
    }

    /** Package-private no-argument constructor; performs no initialization of its own. */
    AbstractUdfStreamOperatorLifecycleTest() {}

    /**
     * Verifies that the public methods reported reflectively for {@link StreamOperator} and
     * {@link RichFunction} still match the listings recorded in this test, so that newly added
     * methods are noticed.
     */
    @Test
    void testAllMethodsRegisteredInTest() {
        assertPublicMethodsMatch(StreamOperator.class, ALL_METHODS_STREAM_OPERATOR);
        assertPublicMethodsMatch(RichFunction.class, ALL_METHODS_RICH_FUNCTION);
    }

    /**
     * Builds a sorted list of {@code name + parameterTypes} entries from
     * {@code type.getMethods()} and asserts that its {@code toString()} equals the expected
     * listing.
     *
     * @param type the type whose public methods are listed
     * @param expectedListing the expected string form of the sorted listing
     */
    private static void assertPublicMethodsMatch(Class<?> type, String expectedListing) {
        List<String> signatures = new ArrayList<>();
        for (Method method : type.getMethods()) {
            signatures.add(method.getName() + Arrays.toString(method.getParameterTypes()));
        }
        Collections.sort(signatures);
        org.assertj.core.api.Assertions.assertThat(signatures)
                .as("It seems like new methods have been introduced to " + type + ". Please register them with this test and ensure to document their position in the lifecycle (if applicable).")
                .hasToString(expectedListing);
    }

    /**
     * Runs a source task with simulated checkpointing until it terminates on its own, then
     * asserts that the task reached {@code FINISHED} and that the recorded lifecycle calls equal
     * {@code EXPECTED_CALL_ORDER_FULL}.
     *
     * @throws Exception if creating, running or joining the task fails, or if closing the
     *     shuffle environment fails
     */
    @Test
    void testLifeCycleFull() throws Exception {
        ACTUAL_ORDER_TRACKING.clear();

        Configuration taskManagerConfig = new Configuration();
        StreamConfig streamConfig = new StreamConfig(new Configuration());
        streamConfig.setStreamOperator(new LifecycleTrackingStreamSource<>(new MockSourceFunction(), true));
        streamConfig.setOperatorID(new OperatorID());
        streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // try-with-resources closes the shuffle environment on both the normal and failure paths.
        try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) {
            Task task = StreamTaskTest.createTask(SourceStreamTask.class, shuffleEnvironment, streamConfig, taskManagerConfig, EXECUTOR_RESOURCE.getExecutor());
            task.startTaskThread();

            LifecycleTrackingStreamSource.runStarted.await();

            // Wait for the task thread to terminate before inspecting the recorded calls.
            task.getExecutingThread().join();

            org.assertj.core.api.Assertions.assertThat(task.getExecutionState()).isEqualTo(ExecutionState.FINISHED);
            org.assertj.core.api.Assertions.assertThat(ACTUAL_ORDER_TRACKING).isEqualTo(EXPECTED_CALL_ORDER_FULL);
        }
    }

    /**
     * Starts a source task without simulated checkpointing, cancels it once the operator's
     * {@code run} has signalled that it started, then asserts that the task reached
     * {@code CANCELED} and that the recorded lifecycle calls equal
     * {@code EXPECTED_CALL_ORDER_CANCEL_RUNNING}.
     *
     * @throws Exception if creating, running or joining the task fails, or if closing the
     *     shuffle environment fails
     */
    @Test
    void testLifeCycleCancel() throws Exception {
        ACTUAL_ORDER_TRACKING.clear();

        Configuration taskManagerConfig = new Configuration();
        StreamConfig streamConfig = new StreamConfig(new Configuration());
        streamConfig.setStreamOperator(new LifecycleTrackingStreamSource<>(new MockSourceFunction(), false));
        streamConfig.setOperatorID(new OperatorID());
        streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // try-with-resources closes the shuffle environment on both the normal and failure paths.
        try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) {
            Task task = StreamTaskTest.createTask(SourceStreamTask.class, shuffleEnvironment, streamConfig, taskManagerConfig, EXECUTOR_RESOURCE.getExecutor());
            task.startTaskThread();

            LifecycleTrackingStreamSource.runStarted.await();

            // No checkpointer thread exists in this configuration, so nothing here triggers
            // runFinish; presumably cancellation is what unblocks the operator — confirm.
            task.cancelExecution();

            // Wait for the task thread to terminate before inspecting the recorded calls.
            task.getExecutingThread().join();

            org.assertj.core.api.Assertions.assertThat(task.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
            org.assertj.core.api.Assertions.assertThat(ACTUAL_ORDER_TRACKING).isEqualTo(EXPECTED_CALL_ORDER_CANCEL_RUNNING);
        }
    }
}
