/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.source;

import org.apache.flink.api.common.eventtime.NoWatermarksGenerator;
import org.apache.flink.api.common.eventtime.RecordTimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
import org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks;
import org.apache.flink.streaming.api.operators.source.WatermarkToDataOutput;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class SourceOutputWithWatermarksTest {
    private static <E> SourceOutputWithWatermarks<E> createWithSameOutputs(PushingAsyncDataInput.DataOutput<E> recordsAndWatermarksOutput, TimestampAssigner<E> timestampAssigner, WatermarkGenerator<E> watermarkGenerator) {
        WatermarkToDataOutput watermarkOutput = new WatermarkToDataOutput(recordsAndWatermarksOutput);
        return new SourceOutputWithWatermarks(recordsAndWatermarksOutput, (WatermarkOutput)watermarkOutput, (WatermarkOutput)watermarkOutput, timestampAssigner, watermarkGenerator);
    }

    @Test
    public void testNoTimestampValue() {
        CollectingDataOutput dataOutput = new CollectingDataOutput();
        SourceOutputWithWatermarks out = SourceOutputWithWatermarksTest.createWithSameOutputs(dataOutput, new RecordTimestampAssigner(), new NoWatermarksGenerator());
        out.collect((Object)17);
        Object event = dataOutput.events.get(0);
        Assert.assertThat((Object)event, (Matcher)Matchers.instanceOf(StreamRecord.class));
        Assert.assertEquals((long)Long.MIN_VALUE, (long)((StreamRecord)event).getTimestamp());
    }

    @Test
    public void eventsAreBeforeWatermarks() {
        CollectingDataOutput dataOutput = new CollectingDataOutput();
        SourceOutputWithWatermarks out = SourceOutputWithWatermarksTest.createWithSameOutputs(dataOutput, new RecordTimestampAssigner(), new TestWatermarkGenerator());
        out.collect((Object)42, 12345L);
        Assert.assertThat(dataOutput.events, (Matcher)Matchers.contains((Object[])new Object[]{new StreamRecord((Object)42, 12345L), new Watermark(12345L)}));
    }

    private static final class TestWatermarkGenerator<T>
    implements WatermarkGenerator<T> {
        private long lastTimestamp;

        private TestWatermarkGenerator() {
        }

        public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
            this.lastTimestamp = eventTimestamp;
            output.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(eventTimestamp));
        }

        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(this.lastTimestamp));
        }
    }
}

