/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class SourceOperatorSplitWatermarkAlignmentTest {
    SourceOperatorSplitWatermarkAlignmentTest() {
    }

    @Test
    void testSplitWatermarkAlignment() throws Exception {
        MockSourceReader sourceReader = new MockSourceReader(MockSourceReader.WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true);
        TestingSourceOperator operator = new TestingSourceOperator(sourceReader, WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new TestWatermarkGenerator()).withTimestampAssigner((SerializableTimestampAssigner & Serializable)(r, l) -> r.intValue()).withWatermarkAlignment("group-1", Duration.ofMillis(1L)), (ProcessingTimeService)new TestProcessingTimeService(), (OperatorEventGateway)new MockOperatorEventGateway(), 1, 5, true);
        Environment env = this.getTestingEnvironment();
        operator.setup((StreamTask)new SourceOperatorStreamTask(env), new MockStreamConfig(new Configuration(), 1), new MockOutput(new ArrayList()));
        operator.initializeState((StreamTaskStateInitializer)new StreamTaskStateInitializerImpl(env, (StateBackend)new MemoryStateBackend()));
        operator.open();
        MockSourceSplit split1 = new MockSourceSplit(0, 0, 10);
        MockSourceSplit split2 = new MockSourceSplit(1, 10, 20);
        split1.addRecord(5);
        split1.addRecord(11);
        split2.addRecord(3);
        split2.addRecord(12);
        operator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Arrays.asList(split1, split2), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        CollectingDataOutput dataOutput = new CollectingDataOutput();
        operator.emitNext(dataOutput);
        operator.handleOperatorEvent((OperatorEvent)new WatermarkAlignmentEvent(4L));
        Assertions.assertThat((Collection)sourceReader.getPausedSplits()).containsExactly((Object[])new String[]{"0"});
        operator.handleOperatorEvent((OperatorEvent)new WatermarkAlignmentEvent(5L));
        Assertions.assertThat((Collection)sourceReader.getPausedSplits()).isEmpty();
        operator.emitNext(dataOutput);
        operator.emitNext(dataOutput);
        Assertions.assertThat((Collection)sourceReader.getPausedSplits()).containsExactly((Object[])new String[]{"0"});
        operator.emitNext(dataOutput);
        Assertions.assertThat((Collection)sourceReader.getPausedSplits()).containsExactly((Object[])new String[]{"0", "1"});
    }

    @Test
    void testBackpressureAndIdleness() throws Exception {
        int i;
        long idleTimeout = 100L;
        MockSourceReader sourceReader = new MockSourceReader(MockSourceReader.WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true);
        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        SourceOperator<Integer, MockSourceSplit> operator = this.createAndOpenSourceOperatorWithIdleness(sourceReader, processingTimeService, idleTimeout);
        MockSourceSplit split0 = new MockSourceSplit(0, 0, 10).addRecord(42).addRecord(44);
        MockSourceSplit split1 = new MockSourceSplit(1, 10, 20);
        operator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Arrays.asList(split0, split1), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        CollectingDataOutput dataOutput = new CollectingDataOutput();
        operator.emitNext(dataOutput);
        TaskIOMetricGroup taskIOMetricGroup = operator.getContainingTask().getEnvironment().getMetricGroup().getIOMetricGroup();
        taskIOMetricGroup.getHardBackPressuredTimePerSecond().markStart();
        for (i = 0; i < 10; ++i) {
            processingTimeService.advance(idleTimeout);
            operator.emitNext(dataOutput);
        }
        Assertions.assertThat(dataOutput.getEvents()).doesNotContain(new Object[]{WatermarkStatus.IDLE});
        Assertions.assertThat(dataOutput.getEvents()).doNotHave((Condition)new AnyWatermark());
        taskIOMetricGroup.getHardBackPressuredTimePerSecond().markEnd();
        taskIOMetricGroup.getSoftBackPressuredTimePerSecond().markStart();
        for (i = 0; i < 10; ++i) {
            processingTimeService.advance(idleTimeout);
        }
        Assertions.assertThat(dataOutput.getEvents()).doesNotContain(new Object[]{WatermarkStatus.IDLE});
        Assertions.assertThat(dataOutput.getEvents()).doNotHave((Condition)new AnyWatermark());
        taskIOMetricGroup.getSoftBackPressuredTimePerSecond().markEnd();
        for (i = 0; i < 10; ++i) {
            processingTimeService.advance(idleTimeout);
        }
        Assertions.assertThat(dataOutput.getEvents()).contains(new Object[]{WatermarkStatus.IDLE});
        Assertions.assertThat(dataOutput.getEvents()).doNotHave((Condition)new AnyWatermark());
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testSingleSplitWatermarkAlignmentAndIdleness(boolean usePerSplitOutputs) throws Exception {
        long idleTimeout = 100L;
        MockSourceReader sourceReader = new MockSourceReader(MockSourceReader.WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, usePerSplitOutputs);
        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        processingTimeService.setCurrentTime(1L);
        SourceOperator<Integer, MockSourceSplit> operator = this.createAndOpenSourceOperatorWithIdleness(sourceReader, processingTimeService, idleTimeout);
        MockSourceSplit split0 = new MockSourceSplit(0, 0, 10);
        int maxAllowedWatermark = 4;
        int maxEmittedWatermark = maxAllowedWatermark + 1;
        split0.addRecord(1).addRecord(1).addRecord(1).addRecord(1).addRecord(maxEmittedWatermark).addRecord(maxEmittedWatermark + 100);
        operator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Collections.singletonList(split0), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        CollectingDataOutput dataOutput = new CollectingDataOutput();
        operator.handleOperatorEvent((OperatorEvent)new WatermarkAlignmentEvent((long)maxAllowedWatermark));
        for (int i = 0; i < 10; ++i) {
            operator.emitNext(dataOutput);
            processingTimeService.advance(idleTimeout);
        }
        Assertions.assertThat(dataOutput.getEvents()).doesNotContain(new Object[]{WatermarkStatus.IDLE});
    }

    @Test
    void testMultiSplitWatermarkAlignmentAndIdleness() throws Exception {
        long idleTimeout = 100L;
        MockSourceReader sourceReader = new MockSourceReader(MockSourceReader.WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true);
        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        SourceOperator<Integer, MockSourceSplit> operator = this.createAndOpenSourceOperatorWithIdleness(sourceReader, processingTimeService, idleTimeout);
        MockSourceSplit split0 = new MockSourceSplit(0, 0, 10);
        MockSourceSplit split1 = new MockSourceSplit(1, 10, 20);
        int maxAllowedWatermark = 4;
        int maxEmittedWatermark = maxAllowedWatermark + 1;
        split0.addRecord(maxEmittedWatermark).addRecord(maxEmittedWatermark + 100);
        split1.addRecord(3).addRecord(3).addRecord(3).addRecord(3).addRecord(3).addRecord(3).addRecord(3);
        split1.addRecord(maxEmittedWatermark + 100);
        operator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Arrays.asList(split0, split1), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        CollectingDataOutput dataOutput = new CollectingDataOutput();
        operator.emitNext(dataOutput);
        operator.handleOperatorEvent((OperatorEvent)new WatermarkAlignmentEvent((long)maxAllowedWatermark));
        Assertions.assertThat((Collection)sourceReader.getPausedSplits()).containsExactly((Object[])new String[]{"0"});
        while (operator.isAvailable()) {
            processingTimeService.advance(idleTimeout - 1L);
            operator.emitNext(dataOutput);
        }
        Assertions.assertThat((Collection)sourceReader.getPausedSplits()).containsExactly((Object[])new String[]{"0", "1"});
        Assertions.assertThat(dataOutput.getEvents()).doNotHave((Condition)new WatermarkAbove(maxEmittedWatermark));
    }

    @Test
    void testSplitWatermarkAlignmentWithFinishedSplit() throws Exception {
        long idleTimeout = 100L;
        MockSourceReader sourceReader = new MockSourceReader(MockSourceReader.WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true);
        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        SourceOperator<Integer, MockSourceSplit> operator = this.createAndOpenSourceOperatorWithIdleness(sourceReader, processingTimeService, idleTimeout);
        MockSourceSplit split0 = new MockSourceSplit(0, 0, 1);
        MockSourceSplit split1 = new MockSourceSplit(1, 10, 20);
        int maxAllowedWatermark = 4;
        int maxEmittedWatermark = maxAllowedWatermark + 1;
        split0.addRecord(maxEmittedWatermark);
        split1.addRecord(3);
        operator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Arrays.asList(split0, split1), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        CollectingDataOutput dataOutput = new CollectingDataOutput();
        while (operator.emitNext(dataOutput) == DataInputStatus.MORE_AVAILABLE) {
        }
        operator.handleOperatorEvent((OperatorEvent)new WatermarkAlignmentEvent((long)maxAllowedWatermark));
        Assertions.assertThat((Collection)sourceReader.getPausedSplits()).isEmpty();
    }

    private SourceOperator<Integer, MockSourceSplit> createAndOpenSourceOperatorWithIdleness(MockSourceReader sourceReader, TestProcessingTimeService processingTimeService, long idleTimeout) throws Exception {
        TestingSourceOperator<Integer> operator = new TestingSourceOperator<Integer>((SourceReader<Integer, MockSourceSplit>)sourceReader, (WatermarkStrategy<Integer>)WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new TestWatermarkGenerator()).withTimestampAssigner((SerializableTimestampAssigner & Serializable)(r, l) -> r.intValue()).withWatermarkAlignment("group-1", Duration.ofMillis(1L)).withIdleness(Duration.ofMillis(idleTimeout)), (ProcessingTimeService)processingTimeService, (OperatorEventGateway)new MockOperatorEventGateway(), 1, 5, true);
        Environment env = this.getTestingEnvironment();
        operator.setup((StreamTask)new SourceOperatorStreamTask(env), new MockStreamConfig(new Configuration(), 1), new MockOutput(new ArrayList()));
        operator.initializeState((StreamTaskStateInitializer)new StreamTaskStateInitializerImpl(env, (StateBackend)new MemoryStateBackend()));
        operator.open();
        return operator;
    }

    private Environment getTestingEnvironment() {
        return new StreamMockEnvironment(new Configuration(), new Configuration(), new ExecutionConfig(), 1L, new MockInputSplitProvider(), 1, (TaskStateManager)new TestTaskStateManager());
    }

    public static class AnyWatermark
    extends Condition<Object> {
        public AnyWatermark() {
            super(event -> event instanceof Watermark, "any watermark", new Object[0]);
        }
    }

    public static class WatermarkAbove
    extends Condition<Object> {
        public WatermarkAbove(int maxEmittedWatermark) {
            super(event -> {
                if (!(event instanceof Watermark)) {
                    return false;
                }
                Watermark w = (Watermark)event;
                return w.getTimestamp() > (long)maxEmittedWatermark;
            }, "watermark value of greater than %d", new Object[]{maxEmittedWatermark});
        }
    }

    private static class TestWatermarkGenerator
    implements WatermarkGenerator<Integer> {
        private long maxWatermark = Long.MIN_VALUE;

        private TestWatermarkGenerator() {
        }

        public void onEvent(Integer event, long eventTimestamp, WatermarkOutput output) {
            if (eventTimestamp > this.maxWatermark) {
                this.maxWatermark = eventTimestamp;
                output.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(this.maxWatermark));
            }
        }

        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(this.maxWatermark));
        }
    }
}

