package org.apache.flink.streaming.api.operators;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.CollectorOutput;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({ParameterizedTestExtension.class})
/* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.class */
class StreamSourceContextIdleDetectionTests {
    private TestMethod testMethod;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.streaming.api.operators.StreamSourceContextIdleDetectionTests$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$streaming$api$operators$StreamSourceContextIdleDetectionTests$TestMethod = new int[TestMethod.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$streaming$api$operators$StreamSourceContextIdleDetectionTests$TestMethod[TestMethod.COLLECT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$operators$StreamSourceContextIdleDetectionTests$TestMethod[TestMethod.COLLECT_WITH_TIMESTAMP.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$operators$StreamSourceContextIdleDetectionTests$TestMethod[TestMethod.EMIT_WATERMARK.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests$TestMethod.class */
    public enum TestMethod {
        COLLECT,
        COLLECT_WITH_TIMESTAMP,
        EMIT_WATERMARK
    }

    public StreamSourceContextIdleDetectionTests(TestMethod testMethod) {
        this.testMethod = testMethod;
    }

    @TestTemplate
    void testManualWatermarkContext() throws Exception {
        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        testProcessingTimeService.setCurrentTime(0L);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        SourceFunction.SourceContext<String> sourceContext = StreamSourceContexts.getSourceContext(TimeCharacteristic.EventTime, testProcessingTimeService, new Object(), new CollectorOutput(arrayList), 0L, 100L, true);
        testProcessingTimeService.setCurrentTime(0 + 100);
        arrayList2.add(WatermarkStatus.IDLE);
        Assertions.assertThat(arrayList).isEqualTo(arrayList2);
        testProcessingTimeService.setCurrentTime(0 + (2 * 100));
        testProcessingTimeService.setCurrentTime(0 + (3 * 100));
        Assertions.assertThat(arrayList).isEqualTo(arrayList2);
        arrayList2.add(WatermarkStatus.ACTIVE);
        emitStreamElement(0 + (3 * 100) + (100 / 10), arrayList2, testProcessingTimeService, sourceContext);
        Assertions.assertThat(arrayList).isEqualTo(arrayList2);
        emitStreamElement(0 + (3 * 100) + ((2 * 100) / 10), arrayList2, testProcessingTimeService, sourceContext);
        Assertions.assertThat(arrayList).isEqualTo(arrayList2);
        testProcessingTimeService.setCurrentTime(0 + (4 * 100) + (100 / 10));
        Assertions.assertThat(arrayList).isEqualTo(arrayList2);
        testProcessingTimeService.setCurrentTime(0 + (5 * 100) + (100 / 10));
        arrayList2.add(WatermarkStatus.IDLE);
        Assertions.assertThat(arrayList).isEqualTo(arrayList2);
    }

    private void emitStreamElement(long j, List<StreamElement> list, TestProcessingTimeService testProcessingTimeService, SourceFunction.SourceContext<String> sourceContext) throws Exception {
        testProcessingTimeService.setCurrentTime(j);
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$streaming$api$operators$StreamSourceContextIdleDetectionTests$TestMethod[this.testMethod.ordinal()]) {
            case SourceOperatorTestContext.SUBTASK_INDEX /* 1 */:
                list.add(new StreamRecord("msg"));
                sourceContext.collect("msg");
                return;
            case 2:
                long currentProcessingTime = testProcessingTimeService.getCurrentProcessingTime();
                list.add(new StreamRecord("msg", currentProcessingTime));
                sourceContext.collectWithTimestamp("msg", currentProcessingTime);
                return;
            case 3:
                long currentProcessingTime2 = testProcessingTimeService.getCurrentProcessingTime();
                list.add(new Watermark(currentProcessingTime2));
                sourceContext.emitWatermark(new Watermark(currentProcessingTime2));
                return;
            default:
                return;
        }
    }

    @TestTemplate
    void testAutomaticWatermarkContext() throws Exception {
        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        testProcessingTimeService.setCurrentTime(20L);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        SourceFunction.SourceContext sourceContext = StreamSourceContexts.getSourceContext(TimeCharacteristic.IngestionTime, testProcessingTimeService, new Object(), new CollectorOutput(arrayList), 40L, 100L, true);
        testProcessingTimeService.setCurrentTime(20 + 40);
        arrayList2.add(new Watermark(testProcessingTimeService.getCurrentProcessingTime() - (testProcessingTimeService.getCurrentProcessingTime() % 40)));
        testProcessingTimeService.setCurrentTime(20 + (2 * 40));
        arrayList2.add(new Watermark(testProcessingTimeService.getCurrentProcessingTime() - (testProcessingTimeService.getCurrentProcessingTime() % 40)));
        testProcessingTimeService.setCurrentTime(20 + 100);
        arrayList2.add(WatermarkStatus.IDLE);
        Assertions.assertThat(arrayList).isEqualTo(arrayList2);
        testProcessingTimeService.setCurrentTime(20 + (3 * 40));
        testProcessingTimeService.setCurrentTime(20 + (4 * 40));
        testProcessingTimeService.setCurrentTime(20 + (2 * 100));
        testProcessingTimeService.setCurrentTime(20 + (6 * 40));
        testProcessingTimeService.setCurrentTime(20 + (7 * 40));
        testProcessingTimeService.setCurrentTime(20 + (3 * 100));
        Assertions.assertThat(arrayList).isEqualTo(arrayList2);
        testProcessingTimeService.setCurrentTime(20 + (3 * 100) + (100 / 10));
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$streaming$api$operators$StreamSourceContextIdleDetectionTests$TestMethod[this.testMethod.ordinal()]) {
            case SourceOperatorTestContext.SUBTASK_INDEX /* 1 */:
                arrayList2.add(WatermarkStatus.ACTIVE);
                sourceContext.collect("msg");
                arrayList2.add(new StreamRecord("msg", testProcessingTimeService.getCurrentProcessingTime()));
                arrayList2.add(new Watermark(testProcessingTimeService.getCurrentProcessingTime() - (testProcessingTimeService.getCurrentProcessingTime() % 40)));
                Assertions.assertThat(arrayList).isEqualTo(arrayList2);
                break;
            case 2:
                arrayList2.add(WatermarkStatus.ACTIVE);
                sourceContext.collectWithTimestamp("msg", testProcessingTimeService.getCurrentProcessingTime());
                arrayList2.add(new StreamRecord("msg", testProcessingTimeService.getCurrentProcessingTime()));
                arrayList2.add(new Watermark(testProcessingTimeService.getCurrentProcessingTime() - (testProcessingTimeService.getCurrentProcessingTime() % 40)));
                Assertions.assertThat(arrayList).isEqualTo(arrayList2);
                break;
            case 3:
                sourceContext.emitWatermark(new Watermark(testProcessingTimeService.getCurrentProcessingTime()));
                Assertions.assertThat(arrayList).isEqualTo(arrayList2);
                break;
        }
        testProcessingTimeService.setCurrentTime(20 + (8 * 40));
        testProcessingTimeService.setCurrentTime(20 + (3 * 100) + ((3 * 100) / 10));
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$streaming$api$operators$StreamSourceContextIdleDetectionTests$TestMethod[this.testMethod.ordinal()]) {
            case SourceOperatorTestContext.SUBTASK_INDEX /* 1 */:
                sourceContext.collect("msg");
                arrayList2.add(new StreamRecord("msg", testProcessingTimeService.getCurrentProcessingTime()));
                Assertions.assertThat(arrayList).isEqualTo(arrayList2);
                break;
            case 2:
                sourceContext.collectWithTimestamp("msg", testProcessingTimeService.getCurrentProcessingTime());
                arrayList2.add(new StreamRecord("msg", testProcessingTimeService.getCurrentProcessingTime()));
                Assertions.assertThat(arrayList).isEqualTo(arrayList2);
                break;
            case 3:
                sourceContext.emitWatermark(new Watermark(testProcessingTimeService.getCurrentProcessingTime()));
                Assertions.assertThat(arrayList).isEqualTo(arrayList2);
                break;
        }
        testProcessingTimeService.setCurrentTime(20 + (9 * 40));
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$streaming$api$operators$StreamSourceContextIdleDetectionTests$TestMethod[this.testMethod.ordinal()]) {
            case SourceOperatorTestContext.SUBTASK_INDEX /* 1 */:
            case 2:
                arrayList2.add(new Watermark(testProcessingTimeService.getCurrentProcessingTime() - (testProcessingTimeService.getCurrentProcessingTime() % 40)));
                Assertions.assertThat(arrayList).isEqualTo(arrayList2);
                break;
            case 3:
                Assertions.assertThat(arrayList).isEqualTo(arrayList2);
                break;
        }
        testProcessingTimeService.setCurrentTime(20 + (10 * 40));
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$streaming$api$operators$StreamSourceContextIdleDetectionTests$TestMethod[this.testMethod.ordinal()]) {
            case SourceOperatorTestContext.SUBTASK_INDEX /* 1 */:
            case 2:
                arrayList2.add(new Watermark(testProcessingTimeService.getCurrentProcessingTime() - (testProcessingTimeService.getCurrentProcessingTime() % 40)));
                Assertions.assertThat(arrayList).isEqualTo(arrayList2);
                break;
            case 3:
                Assertions.assertThat(arrayList).isEqualTo(arrayList2);
                break;
        }
        testProcessingTimeService.setCurrentTime(20 + (4 * 100) + (100 / 10));
        Assertions.assertThat(arrayList).isEqualTo(arrayList2);
        testProcessingTimeService.setCurrentTime(20 + (11 * 40));
        if (this.testMethod != TestMethod.EMIT_WATERMARK) {
            arrayList2.add(WatermarkStatus.IDLE);
        }
        Assertions.assertThat(arrayList).isEqualTo(arrayList2);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Parameters(name = "TestMethod = {0}")
    private static Collection<TestMethod[]> timeCharacteristic() {
        return Arrays.asList(new TestMethod[]{TestMethod.COLLECT}, new TestMethod[]{TestMethod.COLLECT_WITH_TIMESTAMP}, new TestMethod[]{TestMethod.EMIT_WATERMARK});
    }
}
