/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Serializable;
import java.lang.invoke.CallSite;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.streaming.api.functions.source.legacy.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTest;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class AbstractUdfStreamOperatorLifecycleTest {
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    private static final List<String> EXPECTED_CALL_ORDER_FULL = Arrays.asList("OPERATOR::setup", "UDF::setRuntimeContext", "OPERATOR::initializeState", "OPERATOR::open", "UDF::open", "OPERATOR::run", "UDF::run", "OPERATOR::prepareSnapshotPreBarrier", "OPERATOR::snapshotState", "OPERATOR::finish", "OPERATOR::close", "UDF::close");
    private static final List<String> EXPECTED_CALL_ORDER_CANCEL_RUNNING = Arrays.asList("OPERATOR::setup", "UDF::setRuntimeContext", "OPERATOR::initializeState", "OPERATOR::open", "UDF::open", "OPERATOR::run", "UDF::run", "OPERATOR::cancel", "UDF::cancel", "OPERATOR::close", "UDF::close");
    private static final String ALL_METHODS_STREAM_OPERATOR = "[close[], finish[], getCurrentKey[], getMetricGroup[], getOperatorAttributes[], getOperatorID[], initializeState[interface org.apache.flink.streaming.api.operators.StreamTaskStateInitializer], notifyCheckpointAborted[long], notifyCheckpointComplete[long], open[], prepareSnapshotPreBarrier[long], setCurrentKey[class java.lang.Object], setKeyContextElement1[class org.apache.flink.streaming.runtime.streamrecord.StreamRecord], setKeyContextElement2[class org.apache.flink.streaming.runtime.streamrecord.StreamRecord], snapshotState[long, long, class org.apache.flink.runtime.checkpoint.CheckpointOptions, interface org.apache.flink.runtime.state.CheckpointStreamFactory]]";
    private static final String ALL_METHODS_RICH_FUNCTION = "[close[], getIterationRuntimeContext[], getRuntimeContext[], open[interface org.apache.flink.api.common.functions.OpenContext], setRuntimeContext[interface org.apache.flink.api.common.functions.RuntimeContext]]";
    private static final List<String> ACTUAL_ORDER_TRACKING = Collections.synchronizedList(new ArrayList(EXPECTED_CALL_ORDER_FULL.size()));

    AbstractUdfStreamOperatorLifecycleTest() {
    }

    @Test
    void testAllMethodsRegisteredInTest() {
        ArrayList<CallSite> methodsWithSignatureString = new ArrayList<CallSite>();
        for (Method method : StreamOperator.class.getMethods()) {
            methodsWithSignatureString.add((CallSite)((Object)(method.getName() + Arrays.toString(method.getParameterTypes()))));
        }
        Collections.sort(methodsWithSignatureString);
        ((ListAssert)Assertions.assertThat(methodsWithSignatureString).as("It seems like new methods have been introduced to " + String.valueOf(StreamOperator.class) + ". Please register them with this test and ensure to document their position in the lifecycle (if applicable).", new Object[0])).hasToString(ALL_METHODS_STREAM_OPERATOR);
        methodsWithSignatureString = new ArrayList();
        for (Method method : RichFunction.class.getMethods()) {
            methodsWithSignatureString.add((CallSite)((Object)(method.getName() + Arrays.toString(method.getParameterTypes()))));
        }
        Collections.sort(methodsWithSignatureString);
        ((ListAssert)Assertions.assertThat(methodsWithSignatureString).as("It seems like new methods have been introduced to " + String.valueOf(RichFunction.class) + ". Please register them with this test and ensure to document their position in the lifecycle (if applicable).", new Object[0])).hasToString(ALL_METHODS_RICH_FUNCTION);
    }

    @Test
    void testLifeCycleFull() throws Exception {
        ACTUAL_ORDER_TRACKING.clear();
        Configuration taskManagerConfig = new Configuration();
        StreamConfig cfg = new StreamConfig(new Configuration());
        MockSourceFunction srcFun = new MockSourceFunction();
        cfg.setStreamOperator(new LifecycleTrackingStreamSource(srcFun, true));
        cfg.setOperatorID(new OperatorID());
        try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();){
            Task task = StreamTaskTest.createTask(SourceStreamTask.class, (ShuffleEnvironment)shuffleEnvironment, cfg, taskManagerConfig, EXECUTOR_RESOURCE.getExecutor());
            task.startTaskThread();
            LifecycleTrackingStreamSource.runStarted.await();
            task.getExecutingThread().join();
            Assertions.assertThat((Comparable)task.getExecutionState()).isEqualTo((Object)ExecutionState.FINISHED);
            Assertions.assertThat(ACTUAL_ORDER_TRACKING).isEqualTo(EXPECTED_CALL_ORDER_FULL);
        }
    }

    @Test
    void testLifeCycleCancel() throws Exception {
        ACTUAL_ORDER_TRACKING.clear();
        Configuration taskManagerConfig = new Configuration();
        StreamConfig cfg = new StreamConfig(new Configuration());
        MockSourceFunction srcFun = new MockSourceFunction();
        cfg.setStreamOperator(new LifecycleTrackingStreamSource(srcFun, false));
        cfg.setOperatorID(new OperatorID());
        try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();){
            Task task = StreamTaskTest.createTask(SourceStreamTask.class, (ShuffleEnvironment)shuffleEnvironment, cfg, taskManagerConfig, EXECUTOR_RESOURCE.getExecutor());
            task.startTaskThread();
            LifecycleTrackingStreamSource.runStarted.await();
            task.cancelExecution();
            task.getExecutingThread().join();
            Assertions.assertThat((Comparable)task.getExecutionState()).isEqualTo((Object)ExecutionState.CANCELED);
            Assertions.assertThat(ACTUAL_ORDER_TRACKING).isEqualTo(EXPECTED_CALL_ORDER_CANCEL_RUNNING);
        }
    }

    private static class LifecycleTrackingStreamSource<OUT, SRC extends SourceFunction<OUT>>
    extends StreamSource<OUT, SRC>
    implements Serializable {
        private static final long serialVersionUID = 2431488948886850562L;
        private transient Thread testCheckpointer;
        private final boolean simulateCheckpointing;
        static OneShotLatch runStarted;
        static OneShotLatch runFinish;

        public LifecycleTrackingStreamSource(SRC sourceFunction, boolean simulateCheckpointing) {
            super(sourceFunction);
            this.simulateCheckpointing = simulateCheckpointing;
            runStarted = new OneShotLatch();
            runFinish = new OneShotLatch();
        }

        public void run(Object lockingObject, Output<StreamRecord<OUT>> collector, OperatorChain<?, ?> operatorChain) throws Exception {
            ACTUAL_ORDER_TRACKING.add("OPERATOR::run");
            super.run(lockingObject, collector, operatorChain);
            runStarted.trigger();
            runFinish.await();
        }

        protected void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
            ACTUAL_ORDER_TRACKING.add("OPERATOR::setup");
            super.setup(containingTask, config, output);
            if (this.simulateCheckpointing) {
                this.testCheckpointer = new Thread(){

                    @Override
                    public void run() {
                        try {
                            runStarted.await();
                            if (this.getContainingTask().isCanceled() || ((Boolean)this.getContainingTask().triggerCheckpointAsync(new CheckpointMetaData(0L, System.currentTimeMillis()), CheckpointOptions.forCheckpointWithDefaultLocation()).get()).booleanValue()) {
                                runFinish.trigger();
                            }
                        }
                        catch (Exception e) {
                            org.junit.jupiter.api.Assertions.fail((Throwable)e);
                        }
                    }
                };
                this.testCheckpointer.start();
            }
        }

        public void snapshotState(StateSnapshotContext context) throws Exception {
            ACTUAL_ORDER_TRACKING.add("OPERATOR::snapshotState");
            super.snapshotState(context);
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            ACTUAL_ORDER_TRACKING.add("OPERATOR::initializeState");
            super.initializeState(context);
        }

        public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
            ACTUAL_ORDER_TRACKING.add("OPERATOR::prepareSnapshotPreBarrier");
            super.prepareSnapshotPreBarrier(checkpointId);
        }

        public void open() throws Exception {
            ACTUAL_ORDER_TRACKING.add("OPERATOR::open");
            super.open();
        }

        public void finish() throws Exception {
            ACTUAL_ORDER_TRACKING.add("OPERATOR::finish");
            super.finish();
        }

        public void close() throws Exception {
            ACTUAL_ORDER_TRACKING.add("OPERATOR::close");
            super.close();
        }

        public void cancel() {
            ACTUAL_ORDER_TRACKING.add("OPERATOR::cancel");
            super.cancel();
        }
    }

    private static class MockSourceFunction
    extends RichSourceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private MockSourceFunction() {
        }

        public void run(SourceFunction.SourceContext<Long> ctx) {
            ACTUAL_ORDER_TRACKING.add("UDF::run");
        }

        public void cancel() {
            ACTUAL_ORDER_TRACKING.add("UDF::cancel");
        }

        public void setRuntimeContext(RuntimeContext t) {
            ACTUAL_ORDER_TRACKING.add("UDF::setRuntimeContext");
            super.setRuntimeContext(t);
        }

        public void open(OpenContext openContext) throws Exception {
            ACTUAL_ORDER_TRACKING.add("UDF::open");
            super.open(openContext);
        }

        public void close() throws Exception {
            ACTUAL_ORDER_TRACKING.add("UDF::close");
            super.close();
        }
    }
}

