package org.apache.flink.streaming.api.operators.source;

import org.apache.flink.api.common.eventtime.NoWatermarksGenerator;
import org.apache.flink.api.common.eventtime.RecordTimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarksTest.class */
class SourceOutputWithWatermarksTest {

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarksTest$TestWatermarkGenerator.class */
    private static final class TestWatermarkGenerator<T> implements WatermarkGenerator<T> {
        private long lastTimestamp;

        private TestWatermarkGenerator() {
        }

        public void onEvent(T t, long j, WatermarkOutput watermarkOutput) {
            this.lastTimestamp = j;
            watermarkOutput.emitWatermark(new Watermark(j));
        }

        public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
            watermarkOutput.emitWatermark(new Watermark(this.lastTimestamp));
        }
    }

    SourceOutputWithWatermarksTest() {
    }

    private static <E> SourceOutputWithWatermarks<E> createWithSameOutputs(PushingAsyncDataInput.DataOutput<E> dataOutput, TimestampAssigner<E> timestampAssigner, WatermarkGenerator<E> watermarkGenerator) {
        WatermarkToDataOutput watermarkToDataOutput = new WatermarkToDataOutput(dataOutput);
        return new SourceOutputWithWatermarks<>(dataOutput, watermarkToDataOutput, watermarkToDataOutput, timestampAssigner, watermarkGenerator);
    }

    @Test
    void testNoTimestampValue() {
        CollectingDataOutput collectingDataOutput = new CollectingDataOutput();
        createWithSameOutputs(collectingDataOutput, new RecordTimestampAssigner(), new NoWatermarksGenerator()).collect(17);
        Object obj = collectingDataOutput.events.get(0);
        Assertions.assertThat(obj).isInstanceOf(StreamRecord.class);
        Assertions.assertThat(((StreamRecord) obj).getTimestamp()).isEqualTo(Long.MIN_VALUE);
    }

    @Test
    void eventsAreBeforeWatermarks() {
        CollectingDataOutput collectingDataOutput = new CollectingDataOutput();
        createWithSameOutputs(collectingDataOutput, new RecordTimestampAssigner(), new TestWatermarkGenerator()).collect(42, 12345L);
        Assertions.assertThat(collectingDataOutput.events).contains(new Object[]{new StreamRecord(42, 12345L), new org.apache.flink.streaming.api.watermark.Watermark(12345L)});
    }
}
