/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.ArrayList;
import java.util.Optional;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.MailboxWatermarkProcessor;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.streaming.util.CollectorOutput;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class MailboxWatermarkProcessorTest {
    MailboxWatermarkProcessorTest() {
    }

    @Test
    void testEmitWatermarkInsideMailbox() throws Exception {
        int priority = 42;
        ArrayList<StreamElement> emittedElements = new ArrayList<StreamElement>();
        TaskMailboxImpl mailbox = new TaskMailboxImpl();
        NoOpInternalTimeServiceManager timerService = new NoOpInternalTimeServiceManager();
        MailboxWatermarkProcessor watermarkProcessor = new MailboxWatermarkProcessor(new CollectorOutput(emittedElements), (MailboxExecutor)new MailboxExecutorImpl((TaskMailbox)mailbox, priority, StreamTaskActionExecutor.IMMEDIATE), (InternalTimeServiceManager)timerService);
        ArrayList<Watermark> expectedOutput = new ArrayList<Watermark>();
        watermarkProcessor.emitWatermarkInsideMailbox(new Watermark(1L));
        watermarkProcessor.emitWatermarkInsideMailbox(new Watermark(2L));
        watermarkProcessor.emitWatermarkInsideMailbox(new Watermark(3L));
        expectedOutput.add(new Watermark(1L));
        expectedOutput.add(new Watermark(2L));
        expectedOutput.add(new Watermark(3L));
        Assertions.assertThat(emittedElements).containsExactlyElementsOf(expectedOutput);
        mailbox.put(new Mail(() -> {}, -1, "checkpoint mail", new Object[0]));
        watermarkProcessor.emitWatermarkInsideMailbox(new Watermark(4L));
        watermarkProcessor.emitWatermarkInsideMailbox(new Watermark(5L));
        Assertions.assertThat(emittedElements).containsExactlyElementsOf(expectedOutput);
        Assertions.assertThat((Optional)mailbox.tryTake(priority)).isEqualTo(Optional.empty());
        Assertions.assertThat(emittedElements).containsExactlyElementsOf(expectedOutput);
        while (mailbox.hasMail()) {
            mailbox.take(-1).run();
        }
        expectedOutput.add(new Watermark(5L));
        Assertions.assertThat(emittedElements).containsExactlyElementsOf(expectedOutput);
    }

    private static class NoOpInternalTimeServiceManager
    implements InternalTimeServiceManager<Object> {
        private NoOpInternalTimeServiceManager() {
        }

        public <N> InternalTimerService<N> getInternalTimerService(String name, TypeSerializer<Object> keySerializer, TypeSerializer<N> namespaceSerializer, Triggerable<Object, N> triggerable) {
            throw new UnsupportedOperationException();
        }

        public void advanceWatermark(Watermark watermark) throws Exception {
            throw new UnsupportedOperationException();
        }

        public boolean tryAdvanceWatermark(Watermark watermark, InternalTimeServiceManager.ShouldStopAdvancingFn shouldStopAdvancingFn) throws Exception {
            return !shouldStopAdvancingFn.test();
        }

        public void snapshotToRawKeyedState(KeyedStateCheckpointOutputStream stateCheckpointOutputStream, String operatorName) throws Exception {
            throw new UnsupportedOperationException();
        }
    }
}

