package org.apache.flink.streaming.runtime.operators;

import java.util.ArrayList;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Test that runs two chained operators which each register a processing-time timer and then
 * yield to their mailbox executor, and checks the order of the records they emit.
 */
class StreamTaskOperatorTimerTest {
    /** Prefix of input record values that the test operator treats as timer triggers. */
    private static final String TRIGGER_PREFIX = "trigger:";
    /** Prefix of the record values emitted by the test operator's timer callback. */
    private static final String RESULT_PREFIX = "timer:";

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/StreamTaskOperatorTimerTest$TestOperator.class */
    private static class TestOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
        private final transient MailboxExecutor mailboxExecutor;
        private final int chainIndex;
        private transient int count;

        TestOperator(int i, MailboxExecutor mailboxExecutor) {
            this.chainIndex = i;
            this.mailboxExecutor = mailboxExecutor;
        }

        public void processElement(StreamRecord<String> streamRecord) throws Exception {
            if (!isTriggerEvent(streamRecord)) {
                this.output.collect(streamRecord);
                return;
            }
            int i = this.count;
            ProcessingTimeService processingTimeService = getProcessingTimeService();
            processingTimeService.registerTimer(processingTimeService.getCurrentProcessingTime() + 1000, j -> {
                this.output.collect(new StreamRecord(StreamTaskOperatorTimerTest.RESULT_PREFIX + this.chainIndex + ":" + i));
                this.count--;
            });
            this.count++;
            this.output.collect(streamRecord);
            while (this.count > 0) {
                this.mailboxExecutor.yield();
            }
        }

        private static boolean isTriggerEvent(StreamRecord<String> streamRecord) {
            if (streamRecord.isRecord()) {
                return ((String) streamRecord.getValue()).startsWith(StreamTaskOperatorTimerTest.TRIGGER_PREFIX);
            }
            return false;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/StreamTaskOperatorTimerTest$TestOperatorFactory.class */
    private static class TestOperatorFactory extends AbstractStreamOperatorFactory<String> implements OneInputStreamOperatorFactory<String, String>, YieldingOperatorFactory<String> {
        private MailboxExecutor mailboxExecutor;

        private TestOperatorFactory() {
        }

        public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
            this.mailboxExecutor = mailboxExecutor;
        }

        public <Operator extends StreamOperator<String>> Operator createStreamOperator(StreamOperatorParameters<String> streamOperatorParameters) {
            TestOperator testOperator = new TestOperator(streamOperatorParameters.getStreamConfig().getChainIndex(), this.mailboxExecutor);
            testOperator.setProcessingTimeService(this.processingTimeService);
            testOperator.setup(streamOperatorParameters.getContainingTask(), streamOperatorParameters.getStreamConfig(), streamOperatorParameters.getOutput());
            return testOperator;
        }

        public void setChainingStrategy(ChainingStrategy chainingStrategy) {
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return TestOperator.class;
        }
    }

    /** Package-private no-argument constructor; it performs no initialization. */
    StreamTaskOperatorTimerTest() {}

    /**
     * Runs a chain of two {@link TestOperator}s, feeds it a single trigger record and checks the
     * task output.
     *
     * <p>The expected output is, in order: the trigger record itself, the timer record of the
     * operator with chain index 1, and then the timer record of the operator with chain index 0.
     */
    @Test
    void testOperatorYieldExecutesSelectedTimers() throws Exception {
        final OneInputStreamTaskTestHarness<String, String> testHarness =
                new OneInputStreamTaskTestHarness<>(
                        OneInputStreamTask::new,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);

        testHarness
                .setupOperatorChain(new OperatorID(), new TestOperatorFactory())
                .chain(new OperatorID(), new TestOperatorFactory(), StringSerializer.INSTANCE)
                .finish();

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        final String trigger = TRIGGER_PREFIX + 42;
        testHarness.processElement(new StreamRecord<>(trigger));

        testHarness.endInput();
        testHarness.waitForTaskCompletion();

        // Every output element is expected to be a StreamRecord carrying a String value.
        final ArrayList<String> events = new ArrayList<>();
        testHarness
                .getOutput()
                .forEach(element -> events.add((String) ((StreamRecord<?>) element).getValue()));

        Assertions.assertThat(events)
                .containsExactly(trigger, RESULT_PREFIX + "1:0", RESULT_PREFIX + "0:0");
    }
}
