/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.io.Serializable;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.StreamRecordMatchers;
import org.apache.flink.streaming.util.WatermarkMatchers;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
import org.assertj.core.api.HamcrestCondition;
import org.junit.jupiter.api.Test;

class TimestampsAndWatermarksOperatorTest {
    private static final long AUTO_WATERMARK_INTERVAL = 50L;

    TimestampsAndWatermarksOperatorTest() {
    }

    @Test
    void inputWatermarksAreNotForwarded() throws Exception {
        OneInputStreamOperatorTestHarness testHarness = TimestampsAndWatermarksOperatorTest.createTestHarness(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new PeriodicWatermarkGenerator()).withTimestampAssigner((TimestampAssignerSupplier & Serializable)ctx -> new LongExtractor()));
        testHarness.processWatermark(TimestampsAndWatermarksOperatorTest.createLegacyWatermark(42L));
        testHarness.setProcessingTime(50L);
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
    }

    @Test
    void inputStatusesAreNotForwarded() throws Exception {
        OneInputStreamOperatorTestHarness testHarness = TimestampsAndWatermarksOperatorTest.createTestHarness(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new PeriodicWatermarkGenerator()).withTimestampAssigner((TimestampAssignerSupplier & Serializable)ctx -> new LongExtractor()));
        testHarness.processWatermarkStatus(WatermarkStatus.IDLE);
        testHarness.setProcessingTime(50L);
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
    }

    @Test
    void longMaxInputWatermarkIsForwarded() throws Exception {
        OneInputStreamOperatorTestHarness testHarness = TimestampsAndWatermarksOperatorTest.createTestHarness(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new PeriodicWatermarkGenerator()).withTimestampAssigner((TimestampAssignerSupplier & Serializable)ctx -> new LongExtractor()));
        testHarness.processWatermark(TimestampsAndWatermarksOperatorTest.createLegacyWatermark(Long.MAX_VALUE));
        Assertions.assertThat((Object)TimestampsAndWatermarksOperatorTest.pollNextLegacyWatermark(testHarness)).is((Condition)HamcrestCondition.matching(WatermarkMatchers.legacyWatermark(Long.MAX_VALUE)));
    }

    @Test
    void periodicWatermarksEmitOnPeriodicEmitStreamMode() throws Exception {
        OneInputStreamOperatorTestHarness testHarness = TimestampsAndWatermarksOperatorTest.createTestHarness(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new PeriodicWatermarkGenerator()).withTimestampAssigner((TimestampAssignerSupplier & Serializable)ctx -> new LongExtractor()));
        testHarness.processElement(new StreamRecord((Object)2L, 1L));
        testHarness.setProcessingTime(50L);
        Assertions.assertThat(TimestampsAndWatermarksOperatorTest.pollNextStreamRecord(testHarness)).is((Condition)HamcrestCondition.matching(StreamRecordMatchers.streamRecord(2L, 2L)));
        Assertions.assertThat((Object)TimestampsAndWatermarksOperatorTest.pollNextLegacyWatermark(testHarness)).is((Condition)HamcrestCondition.matching(WatermarkMatchers.legacyWatermark(1L)));
        testHarness.processElement(new StreamRecord((Object)4L, 1L));
        testHarness.setProcessingTime(100L);
        Assertions.assertThat(TimestampsAndWatermarksOperatorTest.pollNextStreamRecord(testHarness)).is((Condition)HamcrestCondition.matching(StreamRecordMatchers.streamRecord(4L, 4L)));
        Assertions.assertThat((Object)TimestampsAndWatermarksOperatorTest.pollNextLegacyWatermark(testHarness)).is((Condition)HamcrestCondition.matching(WatermarkMatchers.legacyWatermark(3L)));
    }

    @Test
    void periodicWatermarksBatchMode() throws Exception {
        OneInputStreamOperatorTestHarness testHarness = TimestampsAndWatermarksOperatorTest.createBatchHarness(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new PeriodicWatermarkGenerator()).withTimestampAssigner((TimestampAssignerSupplier & Serializable)ctx -> new LongExtractor()));
        testHarness.processElement(new StreamRecord((Object)2L, 1L));
        testHarness.setProcessingTime(50L);
        Assertions.assertThat(TimestampsAndWatermarksOperatorTest.pollNextStreamRecord(testHarness)).is((Condition)HamcrestCondition.matching(StreamRecordMatchers.streamRecord(2L, 2L)));
        Assertions.assertThat((Object)TimestampsAndWatermarksOperatorTest.pollNextLegacyWatermark(testHarness)).isNull();
        testHarness.processElement(new StreamRecord((Object)4L, 1L));
        testHarness.setProcessingTime(100L);
        Assertions.assertThat(TimestampsAndWatermarksOperatorTest.pollNextStreamRecord(testHarness)).is((Condition)HamcrestCondition.matching(StreamRecordMatchers.streamRecord(4L, 4L)));
        Assertions.assertThat((Object)TimestampsAndWatermarksOperatorTest.pollNextLegacyWatermark(testHarness)).isNull();
    }

    @Test
    void periodicWatermarksOnlyEmitOnPeriodicEmitStreamMode() throws Exception {
        OneInputStreamOperatorTestHarness testHarness = TimestampsAndWatermarksOperatorTest.createTestHarness(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new PeriodicWatermarkGenerator()).withTimestampAssigner((TimestampAssignerSupplier & Serializable)ctx -> new LongExtractor()));
        testHarness.processElement(new StreamRecord((Object)2L, 1L));
        Assertions.assertThat(TimestampsAndWatermarksOperatorTest.pollNextStreamRecord(testHarness)).is((Condition)HamcrestCondition.matching(StreamRecordMatchers.streamRecord(2L, 2L)));
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
    }

    @Test
    void periodicWatermarksDoNotRegressStreamMode() throws Exception {
        OneInputStreamOperatorTestHarness testHarness = TimestampsAndWatermarksOperatorTest.createTestHarness(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new PeriodicWatermarkGenerator()).withTimestampAssigner((TimestampAssignerSupplier & Serializable)ctx -> new LongExtractor()));
        testHarness.processElement(new StreamRecord((Object)4L, 1L));
        testHarness.setProcessingTime(50L);
        Assertions.assertThat(TimestampsAndWatermarksOperatorTest.pollNextStreamRecord(testHarness)).is((Condition)HamcrestCondition.matching(StreamRecordMatchers.streamRecord(4L, 4L)));
        Assertions.assertThat((Object)TimestampsAndWatermarksOperatorTest.pollNextLegacyWatermark(testHarness)).is((Condition)HamcrestCondition.matching(WatermarkMatchers.legacyWatermark(3L)));
        testHarness.processElement(new StreamRecord((Object)2L, 1L));
        testHarness.setProcessingTime(50L);
        Assertions.assertThat(TimestampsAndWatermarksOperatorTest.pollNextStreamRecord(testHarness)).is((Condition)HamcrestCondition.matching(StreamRecordMatchers.streamRecord(2L, 2L)));
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
    }

    @Test
    void punctuatedWatermarksEmitImmediatelyStreamMode() throws Exception {
        OneInputStreamOperatorTestHarness testHarness = TimestampsAndWatermarksOperatorTest.createTestHarness(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new PunctuatedWatermarkGenerator()).withTimestampAssigner((TimestampAssignerSupplier & Serializable)ctx -> new TupleExtractor()));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)true, (Object)2L), 1L));
        Assertions.assertThat(TimestampsAndWatermarksOperatorTest.pollNextStreamRecord(testHarness)).is((Condition)HamcrestCondition.matching(StreamRecordMatchers.streamRecord(new Tuple2((Object)true, (Object)2L), 2L)));
        Assertions.assertThat((Object)TimestampsAndWatermarksOperatorTest.pollNextLegacyWatermark(testHarness)).is((Condition)HamcrestCondition.matching(WatermarkMatchers.legacyWatermark(2L)));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)true, (Object)4L), 1L));
        Assertions.assertThat(TimestampsAndWatermarksOperatorTest.pollNextStreamRecord(testHarness)).is((Condition)HamcrestCondition.matching(StreamRecordMatchers.streamRecord(new Tuple2((Object)true, (Object)4L), 4L)));
        Assertions.assertThat((Object)TimestampsAndWatermarksOperatorTest.pollNextLegacyWatermark(testHarness)).is((Condition)HamcrestCondition.matching(WatermarkMatchers.legacyWatermark(4L)));
    }

    @Test
    void punctuatedWatermarksBatchMode() throws Exception {
        OneInputStreamOperatorTestHarness testHarness = TimestampsAndWatermarksOperatorTest.createBatchHarness(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new PunctuatedWatermarkGenerator()).withTimestampAssigner((TimestampAssignerSupplier & Serializable)ctx -> new TupleExtractor()));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)true, (Object)2L), 1L));
        Assertions.assertThat(TimestampsAndWatermarksOperatorTest.pollNextStreamRecord(testHarness)).is((Condition)HamcrestCondition.matching(StreamRecordMatchers.streamRecord(new Tuple2((Object)true, (Object)2L), 2L)));
        Assertions.assertThat((Object)TimestampsAndWatermarksOperatorTest.pollNextLegacyWatermark(testHarness)).isNull();
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)true, (Object)4L), 1L));
        Assertions.assertThat(TimestampsAndWatermarksOperatorTest.pollNextStreamRecord(testHarness)).is((Condition)HamcrestCondition.matching(StreamRecordMatchers.streamRecord(new Tuple2((Object)true, (Object)4L), 4L)));
        Assertions.assertThat((Object)TimestampsAndWatermarksOperatorTest.pollNextLegacyWatermark(testHarness)).isNull();
    }

    @Test
    void punctuatedWatermarksDoNotRegressStreamMode() throws Exception {
        OneInputStreamOperatorTestHarness testHarness = TimestampsAndWatermarksOperatorTest.createTestHarness(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new PunctuatedWatermarkGenerator()).withTimestampAssigner((TimestampAssignerSupplier & Serializable)ctx -> new TupleExtractor()));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)true, (Object)4L), 1L));
        Assertions.assertThat(TimestampsAndWatermarksOperatorTest.pollNextStreamRecord(testHarness)).is((Condition)HamcrestCondition.matching(StreamRecordMatchers.streamRecord(new Tuple2((Object)true, (Object)4L), 4L)));
        Assertions.assertThat((Object)TimestampsAndWatermarksOperatorTest.pollNextLegacyWatermark(testHarness)).is((Condition)HamcrestCondition.matching(WatermarkMatchers.legacyWatermark(4L)));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)true, (Object)2L), 1L));
        Assertions.assertThat(TimestampsAndWatermarksOperatorTest.pollNextStreamRecord(testHarness)).is((Condition)HamcrestCondition.matching(StreamRecordMatchers.streamRecord(new Tuple2((Object)true, (Object)2L), 2L)));
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
    }

    @Test
    void testNegativeTimestamps() throws Exception {
        long[] values;
        OneInputStreamOperatorTestHarness testHarness = TimestampsAndWatermarksOperatorTest.createTestHarness(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new NeverWatermarkGenerator()).withTimestampAssigner((TimestampAssignerSupplier & Serializable)ctx -> new LongExtractor()));
        for (long value : values = new long[]{Long.MIN_VALUE, -1L, 0L, 1L, 2L, 3L, Long.MAX_VALUE}) {
            testHarness.processElement(new StreamRecord((Object)value));
        }
        for (long value : values) {
            Assertions.assertThat((long)TimestampsAndWatermarksOperatorTest.pollNextStreamRecord(testHarness).getTimestamp()).isEqualTo(value);
        }
    }

    @Test
    void testGetMetricGroup() throws Exception {
        AtomicReference lastTimestampGauge = new AtomicReference();
        OneInputStreamOperatorTestHarness testHarness = TimestampsAndWatermarksOperatorTest.createTestHarness(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> {
            WatermarkGeneratorWithMetrics generator = new WatermarkGeneratorWithMetrics(ctx.getMetricGroup());
            lastTimestampGauge.set(generator.lastTimestampGauge);
            return generator;
        }).withTimestampAssigner((TimestampAssignerSupplier & Serializable)ctx -> new LongExtractor()));
        testHarness.processElement(new StreamRecord((Object)42L));
        Assertions.assertThat((Long)((Long)((Gauge)lastTimestampGauge.get()).getValue())).isEqualTo(42L);
    }

    @Test
    void watermarksWithIdlenessUnderBackpressure() throws Exception {
        int i;
        long idleTimeout = 100L;
        TimestampsAndWatermarksOperator operator = new TimestampsAndWatermarksOperator(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new PunctuatedWatermarkGenerator()).withTimestampAssigner((TimestampAssignerSupplier & Serializable)ctx -> new TupleExtractor()).withIdleness(Duration.ofMillis(idleTimeout)), true);
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(operator);
        testHarness.open();
        TaskIOMetricGroup taskIOMetricGroup = testHarness.getEnvironment().getMetricGroup().getIOMetricGroup();
        taskIOMetricGroup.getHardBackPressuredTimePerSecond().markStart();
        for (i = 0; i < 10; ++i) {
            testHarness.advanceTime(idleTimeout);
        }
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
        taskIOMetricGroup.getHardBackPressuredTimePerSecond().markEnd();
        taskIOMetricGroup.getSoftBackPressuredTimePerSecond().markStart();
        for (i = 10; i < 20; ++i) {
            testHarness.advanceTime(idleTimeout);
        }
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
        taskIOMetricGroup.getSoftBackPressuredTimePerSecond().markEnd();
        for (i = 20; i < 30; ++i) {
            testHarness.advanceTime(idleTimeout);
        }
        Assertions.assertThat(testHarness.getOutput()).containsExactly(new Object[]{WatermarkStatus.IDLE});
    }

    private static <T> OneInputStreamOperatorTestHarness<T, T> createTestHarness(WatermarkStrategy<T> watermarkStrategy) throws Exception {
        TimestampsAndWatermarksOperator operator = new TimestampsAndWatermarksOperator(watermarkStrategy, true);
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(operator);
        testHarness.getExecutionConfig().setAutoWatermarkInterval(50L);
        testHarness.open();
        return testHarness;
    }

    private static <T> OneInputStreamOperatorTestHarness<T, T> createBatchHarness(WatermarkStrategy<T> watermarkStrategy) throws Exception {
        TimestampsAndWatermarksOperator operator = new TimestampsAndWatermarksOperator(watermarkStrategy, false);
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(operator);
        testHarness.open();
        return testHarness;
    }

    private static <T> StreamRecord<T> pollNextStreamRecord(OneInputStreamOperatorTestHarness<?, T> testHarness) {
        return (StreamRecord)testHarness.getOutput().poll();
    }

    private static org.apache.flink.streaming.api.watermark.Watermark pollNextLegacyWatermark(OneInputStreamOperatorTestHarness<?, ?> testHarness) {
        return (org.apache.flink.streaming.api.watermark.Watermark)testHarness.getOutput().poll();
    }

    private static org.apache.flink.streaming.api.watermark.Watermark createLegacyWatermark(long timestamp) {
        return new org.apache.flink.streaming.api.watermark.Watermark(timestamp);
    }

    private static class WatermarkGeneratorWithMetrics
    implements WatermarkGenerator<Long>,
    Serializable {
        private long lastTimestamp;
        Gauge<Long> lastTimestampGauge;

        public WatermarkGeneratorWithMetrics(MetricGroup metricGroup) {
            this.lastTimestampGauge = metricGroup.gauge("lastTimestamp", () -> this.lastTimestamp);
        }

        public void onEvent(Long event, long eventTimestamp, WatermarkOutput output) {
            this.lastTimestamp = eventTimestamp;
        }

        public void onPeriodicEmit(WatermarkOutput output) {
        }
    }

    private static class NeverWatermarkGenerator
    implements WatermarkGenerator<Long>,
    Serializable {
        private NeverWatermarkGenerator() {
        }

        public void onEvent(Long event, long eventTimestamp, WatermarkOutput output) {
        }

        public void onPeriodicEmit(WatermarkOutput output) {
        }
    }

    private static class PunctuatedWatermarkGenerator
    implements WatermarkGenerator<Tuple2<Boolean, Long>>,
    Serializable {
        private PunctuatedWatermarkGenerator() {
        }

        public void onEvent(Tuple2<Boolean, Long> event, long eventTimestamp, WatermarkOutput output) {
            if (((Boolean)event.f0).booleanValue()) {
                output.emitWatermark(new Watermark(((Long)event.f1).longValue()));
            }
        }

        public void onPeriodicEmit(WatermarkOutput output) {
        }
    }

    private static class PeriodicWatermarkGenerator
    implements WatermarkGenerator<Long>,
    Serializable {
        private long currentWatermark = Long.MIN_VALUE;

        private PeriodicWatermarkGenerator() {
        }

        public void onEvent(Long event, long eventTimestamp, WatermarkOutput output) {
            this.currentWatermark = eventTimestamp;
        }

        public void onPeriodicEmit(WatermarkOutput output) {
            long effectiveWatermark = this.currentWatermark == Long.MIN_VALUE ? Long.MIN_VALUE : this.currentWatermark - 1L;
            output.emitWatermark(new Watermark(effectiveWatermark));
        }
    }

    private static class TupleExtractor
    implements TimestampAssigner<Tuple2<Boolean, Long>> {
        private TupleExtractor() {
        }

        public long extractTimestamp(Tuple2<Boolean, Long> element, long recordTimestamp) {
            return (Long)element.f1;
        }
    }

    private static class LongExtractor
    implements TimestampAssigner<Long> {
        private LongExtractor() {
        }

        public long extractTimestamp(Long element, long recordTimestamp) {
            return element;
        }
    }
}

