/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.core.fs.AutoCloseableRegistry;
import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils;
import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorTestBase;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for the watermark-alignment behaviour of {@link SourceCoordinator} and for its nested
 * {@code WatermarkAggregator}.
 *
 * <p>The alignment tests feed {@link ReportedWatermarkEvent}s into a coordinator and inspect the
 * most recent {@link WatermarkAlignmentEvent} recorded for each subtask. The aggregator tests
 * exercise {@code WatermarkAggregator} directly.
 */
class SourceCoordinatorAlignmentTest extends SourceCoordinatorTestBase {

    /** Shared generator for the randomized aggregator test; unseeded, so runs differ. */
    private static final Random RANDOM = new Random();

    /**
     * Two subtasks of a single alignment group report watermarks. After every report the latest
     * alignment event of each subtask is expected to carry the smallest reported watermark plus
     * the configured drift of 1000.
     */
    @Test
    void testWatermarkAlignment() throws Exception {
        try (AutoCloseableRegistry closeableRegistry = new AutoCloseableRegistry()) {
            SourceCoordinator<?, ?> sourceCoordinator1 =
                    getAndStartNewSourceCoordinator(
                            new WatermarkAlignmentParams(1000L, "group1", Long.MAX_VALUE),
                            closeableRegistry);

            int subtask0 = 0;
            int subtask1 = 1;

            reportWatermarkEvent(sourceCoordinator1, subtask0, 42L);
            assertLatestWatermarkAlignmentEvent(subtask0, 1042L);

            // 42 is still the smallest reported value, so nothing changes.
            reportWatermarkEvent(sourceCoordinator1, subtask1, 44L);
            assertLatestWatermarkAlignmentEvent(subtask0, 1042L);
            assertLatestWatermarkAlignmentEvent(subtask1, 1042L);

            // Subtask 0 jumps ahead; subtask 1 (44) now determines the expected value.
            reportWatermarkEvent(sourceCoordinator1, subtask0, 5000L);
            assertLatestWatermarkAlignmentEvent(subtask0, 1044L);
            assertLatestWatermarkAlignmentEvent(subtask1, 1044L);
        }
    }

    /**
     * Same setup as {@link #testWatermarkAlignment()}, but subtasks additionally report {@code
     * Long.MAX_VALUE} (which, judging by the test name, stands for an idle reader). The
     * assertions expect such a report not to hold back the other subtask, expect {@code
     * Long.MAX_VALUE} to be announced once both subtasks have reported it, and expect regular
     * values to take effect again afterwards.
     */
    @Test
    void testWatermarkAlignmentWithIdleness() throws Exception {
        try (AutoCloseableRegistry closeableRegistry = new AutoCloseableRegistry()) {
            SourceCoordinator<?, ?> sourceCoordinator1 =
                    getAndStartNewSourceCoordinator(
                            new WatermarkAlignmentParams(1000L, "group1", Long.MAX_VALUE),
                            closeableRegistry);

            int subtask0 = 0;
            int subtask1 = 1;

            reportWatermarkEvent(sourceCoordinator1, subtask0, 42L);
            assertLatestWatermarkAlignmentEvent(subtask0, 1042L);

            reportWatermarkEvent(sourceCoordinator1, subtask1, 44L);
            assertLatestWatermarkAlignmentEvent(subtask0, 1042L);
            assertLatestWatermarkAlignmentEvent(subtask1, 1042L);

            // Subtask 0 reports Long.MAX_VALUE: only subtask 1 (44) is expected to count.
            reportWatermarkEvent(sourceCoordinator1, subtask0, Long.MAX_VALUE);
            assertLatestWatermarkAlignmentEvent(subtask0, 1044L);
            assertLatestWatermarkAlignmentEvent(subtask1, 1044L);

            // Subtask 0 reports a regular value again.
            reportWatermarkEvent(sourceCoordinator1, subtask0, 42L);
            assertLatestWatermarkAlignmentEvent(subtask0, 1042L);
            assertLatestWatermarkAlignmentEvent(subtask1, 1042L);

            // Both subtasks report Long.MAX_VALUE.
            reportWatermarkEvent(sourceCoordinator1, subtask0, Long.MAX_VALUE);
            reportWatermarkEvent(sourceCoordinator1, subtask1, Long.MAX_VALUE);
            assertLatestWatermarkAlignmentEvent(subtask0, Long.MAX_VALUE);
            assertLatestWatermarkAlignmentEvent(subtask1, Long.MAX_VALUE);

            // A single regular report is expected to be enough to leave that state.
            reportWatermarkEvent(sourceCoordinator1, subtask0, 42L);
            assertLatestWatermarkAlignmentEvent(subtask0, 1042L);
            assertLatestWatermarkAlignmentEvent(subtask1, 1042L);

            reportWatermarkEvent(sourceCoordinator1, subtask1, 46L);
            assertLatestWatermarkAlignmentEvent(subtask0, 1042L);
            assertLatestWatermarkAlignmentEvent(subtask1, 1042L);
        }
    }

    /**
     * Two coordinators configured with different group names ("group1" and "group2") receive
     * reports from different subtasks. The assertions expect each subtask's alignment event to
     * depend only on the watermark reported to its own coordinator.
     */
    @Test
    void testWatermarkAlignmentWithTwoGroups() throws Exception {
        try (AutoCloseableRegistry closeableRegistry = new AutoCloseableRegistry()) {
            long maxDrift = 1000L;
            SourceCoordinator<?, ?> sourceCoordinator1 =
                    getAndStartNewSourceCoordinator(
                            new WatermarkAlignmentParams(maxDrift, "group1", Long.MAX_VALUE),
                            closeableRegistry);
            SourceCoordinator<?, ?> sourceCoordinator2 =
                    getAndStartNewSourceCoordinator(
                            new WatermarkAlignmentParams(maxDrift, "group2", Long.MAX_VALUE),
                            closeableRegistry);

            int subtask0 = 0;
            int subtask1 = 1;

            reportWatermarkEvent(sourceCoordinator1, subtask0, 42L);
            assertLatestWatermarkAlignmentEvent(subtask0, 1042L);

            reportWatermarkEvent(sourceCoordinator2, subtask1, 44L);
            assertLatestWatermarkAlignmentEvent(subtask0, 1042L);
            assertLatestWatermarkAlignmentEvent(subtask1, 1044L);

            // Advancing the first coordinator must leave the second one's subtask untouched.
            reportWatermarkEvent(sourceCoordinator1, subtask0, 5000L);
            assertLatestWatermarkAlignmentEvent(subtask0, 6000L);
            assertLatestWatermarkAlignmentEvent(subtask1, 1044L);
        }
    }

    /**
     * Creates a coordinator that is closed without ever being started, then starts a second one
     * and waits until the second one's {@code announceCombinedWatermark()} has run twice. At that
     * point the override of the never-started coordinator must not have been invoked at all.
     */
    @Test
    void testAnnounceCombinedWatermarkWithoutStart() throws Exception {
        long maxDrift = 1000L;
        WatermarkAlignmentParams params =
                new WatermarkAlignmentParams(maxDrift, "group1", maxDrift);
        Source<Integer, MockSourceSplit, Set<MockSourceSplit>> mockSource = createMockSource();

        // First coordinator: counts announcements, is closed without start().
        final AtomicInteger counter1 = new AtomicInteger(0);
        sourceCoordinator =
                new SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>>(
                        new JobID(),
                        "TestOperator",
                        mockSource,
                        getNewSourceCoordinatorContext(),
                        new CoordinatorStoreImpl(),
                        params,
                        null) {
                    @Override
                    void announceCombinedWatermark() {
                        counter1.incrementAndGet();
                    }
                };
        sourceCoordinator.close();

        // Second coordinator: releases the latch after two announcements.
        final CountDownLatch latch = new CountDownLatch(2);
        sourceCoordinator =
                new SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>>(
                        new JobID(),
                        "TestOperator",
                        mockSource,
                        getNewSourceCoordinatorContext(),
                        new CoordinatorStoreImpl(),
                        params,
                        null) {
                    @Override
                    void announceCombinedWatermark() {
                        latch.countDown();
                    }
                };
        sourceCoordinator.start();
        try {
            setReaderTaskReady(sourceCoordinator, 0, 0);
            latch.await();
            Assertions.assertThat(counter1.get()).isZero();
        } finally {
            // Close the started coordinator even when the assertion above fails.
            sourceCoordinator.close();
        }
    }

    /**
     * Reports a watermark for a subtask before that subtask's task is marked ready, waits for the
     * first {@code announceCombinedWatermark()} call to finish (whether or not it threw), then
     * marks the task ready, waits for five sent events and asserts that the job was not failed.
     */
    @Test
    void testSendWatermarkAlignmentEventFailed() throws Exception {
        long maxDrift = 1000L;
        WatermarkAlignmentParams params =
                new WatermarkAlignmentParams(maxDrift, "group1", maxDrift);
        Source<Integer, MockSourceSplit, Set<MockSourceSplit>> mockSource = createMockSource();

        final CountDownLatch latch = new CountDownLatch(1);
        sourceCoordinator =
                new SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>>(
                        new JobID(),
                        "TestOperator",
                        mockSource,
                        getNewSourceCoordinatorContext(),
                        new CoordinatorStoreImpl(),
                        params,
                        null) {
                    @Override
                    void announceCombinedWatermark() {
                        try {
                            super.announceCombinedWatermark();
                        } finally {
                            // Release the waiting test thread no matter how the call ended;
                            // any exception still propagates to the caller unchanged.
                            latch.countDown();
                        }
                    }
                };
        // NOTE(review): this coordinator is never closed in this method - confirm that the base
        // class's teardown releases it.
        sourceCoordinator.start();

        int subtask = 0;
        int attemptNumber = 0;
        sourceCoordinator.handleEventFromOperator(
                subtask,
                attemptNumber,
                new ReaderRegistrationEvent(subtask, createLocationFor(subtask, attemptNumber)));
        sourceCoordinator.handleEventFromOperator(
                subtask, attemptNumber, new ReportedWatermarkEvent(1000L));

        // Wait until the first announcement attempt has completed.
        latch.await();

        setReaderTaskReady(sourceCoordinator, subtask, attemptNumber);
        waitForSentEvents(5);

        Assertions.assertThat(operatorCoordinatorContext.isJobFailed()).isFalse();
    }

    /**
     * Feeds a fixed sequence of per-key watermarks into a {@code WatermarkAggregator} and checks
     * after each step that the aggregated watermark equals the minimum over the latest value of
     * every key seen so far.
     */
    @Test
    void testWatermarkAggregator() {
        SourceCoordinator.WatermarkAggregator<Integer> combinedWatermark =
                new SourceCoordinator.WatermarkAggregator<>();

        combinedWatermark.aggregate(0, new Watermark(10L));
        Assertions.assertThat(combinedWatermark.getAggregatedWatermark().getTimestamp())
                .isEqualTo(10L);

        combinedWatermark.aggregate(1, new Watermark(12L));
        Assertions.assertThat(combinedWatermark.getAggregatedWatermark().getTimestamp())
                .isEqualTo(10L);

        combinedWatermark.aggregate(2, new Watermark(13L));
        Assertions.assertThat(combinedWatermark.getAggregatedWatermark().getTimestamp())
                .isEqualTo(10L);

        // Key 1 moves below the current minimum.
        combinedWatermark.aggregate(1, new Watermark(9L));
        Assertions.assertThat(combinedWatermark.getAggregatedWatermark().getTimestamp())
                .isEqualTo(9L);

        // Key 1 moves to Long.MAX_VALUE; key 0 (10) becomes the minimum again.
        combinedWatermark.aggregate(1, new Watermark(Long.MAX_VALUE));
        Assertions.assertThat(combinedWatermark.getAggregatedWatermark().getTimestamp())
                .isEqualTo(10L);

        combinedWatermark.aggregate(1, new Watermark(8L));
        Assertions.assertThat(combinedWatermark.getAggregatedWatermark().getTimestamp())
                .isEqualTo(8L);

        combinedWatermark.aggregate(1, new Watermark(20L));
        Assertions.assertThat(combinedWatermark.getAggregatedWatermark().getTimestamp())
                .isEqualTo(10L);

        // Key 0 advances; key 2 (13) is now the smallest.
        combinedWatermark.aggregate(0, new Watermark(23L));
        Assertions.assertThat(combinedWatermark.getAggregatedWatermark().getTimestamp())
                .isEqualTo(13L);

        // Key 2 advances; key 1 (20) is now the smallest.
        combinedWatermark.aggregate(2, new Watermark(22L));
        Assertions.assertThat(combinedWatermark.getAggregatedWatermark().getTimestamp())
                .isEqualTo(20L);
    }

    /**
     * Runs the randomized aggregator check for several round/key counts, each once with and once
     * without the extreme {@code Long.MAX_VALUE}/{@code Long.MIN_VALUE} timestamps.
     */
    @Test
    void testWatermarkAggregatorRandomly() {
        testWatermarkAggregatorRandomly(20, 1000, true, true);
        testWatermarkAggregatorRandomly(20, 1000, true, false);
        testWatermarkAggregatorRandomly(20, 2000, true, true);
        testWatermarkAggregatorRandomly(20, 2000, true, false);
        testWatermarkAggregatorRandomly(20, 5000, true, true);
        testWatermarkAggregatorRandomly(20, 5000, true, false);
        testWatermarkAggregatorRandomly(10, 10000, true, true);
        testWatermarkAggregatorRandomly(10, 10000, true, false);
    }

    /**
     * Aggregates {@code roundNumber * keyNumber} random watermarks, revisiting keys {@code 0} to
     * {@code keyNumber - 1} in every round.
     *
     * @param roundNumber number of passes over all keys
     * @param keyNumber number of distinct keys
     * @param checkResult whether to verify the aggregated watermark after every update against a
     *     minimum recomputed from scratch
     * @param testSourceIdle whether {@link #getRandomTimestamp(boolean)} may produce {@code
     *     Long.MAX_VALUE} or {@code Long.MIN_VALUE}
     */
    private void testWatermarkAggregatorRandomly(
            int roundNumber, int keyNumber, boolean checkResult, boolean testSourceIdle) {
        SourceCoordinator.WatermarkAggregator<Integer> combinedWatermark =
                new SourceCoordinator.WatermarkAggregator<>();
        // Reference model: latest timestamp per key, maintained independently of the aggregator.
        Map<Integer, Long> latestWatermarks = new HashMap<>();

        for (int round = 0; round < roundNumber; round++) {
            for (int key = 0; key < keyNumber; key++) {
                long timestamp = getRandomTimestamp(testSourceIdle);
                combinedWatermark.aggregate(key, new Watermark(timestamp));
                if (checkResult) {
                    latestWatermarks.put(key, timestamp);
                    assertAggregatedWatermark(combinedWatermark, latestWatermarks);
                }
            }
        }
    }

    /**
     * Produces a timestamp for the randomized test.
     *
     * @param testSourceIdle when {@code true}, roughly one call in a hundred returns {@code
     *     Long.MAX_VALUE} or {@code Long.MIN_VALUE} (chosen at random) instead of a regular value
     * @return either one of the two extreme values, or the current wall-clock time truncated to
     *     a whole second plus a random offset in {@code [0, 1000)} milliseconds
     */
    private long getRandomTimestamp(boolean testSourceIdle) {
        if (testSourceIdle && RANDOM.nextInt(100) == 0) {
            return RANDOM.nextBoolean() ? Long.MAX_VALUE : Long.MIN_VALUE;
        }
        long currentSecondStart = System.currentTimeMillis() / 1000L * 1000L;
        return currentSecondStart + RANDOM.nextInt(1000);
    }

    /**
     * Asserts that the aggregator's current result equals the minimum of {@code
     * latestWatermarks}.
     *
     * @param combinedWatermark aggregator under test
     * @param latestWatermarks latest timestamp per key
     * @throws IllegalStateException if {@code latestWatermarks} is empty
     */
    private void assertAggregatedWatermark(
            SourceCoordinator.WatermarkAggregator<Integer> combinedWatermark,
            Map<Integer, Long> latestWatermarks) {
        long expectAggregatedWatermark =
                latestWatermarks.values().stream()
                        .min(Long::compare)
                        .orElseThrow(IllegalStateException::new);
        Assertions.assertThat(combinedWatermark.getAggregatedWatermark().getTimestamp())
                .isEqualTo(expectAggregatedWatermark);
    }

    /**
     * Builds a coordinator for the given alignment parameters, registers it with {@code
     * closeableRegistry} so that it is closed together with the registry, starts it and marks
     * all reader tasks as ready.
     *
     * @param watermarkAlignmentParams alignment configuration for the new coordinator
     * @param closeableRegistry registry that takes over closing the coordinator
     * @return the started coordinator
     */
    private SourceCoordinator<?, ?> getAndStartNewSourceCoordinator(
            WatermarkAlignmentParams watermarkAlignmentParams,
            AutoCloseableRegistry closeableRegistry)
            throws Exception {
        SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> sourceCoordinator =
                getNewSourceCoordinator(watermarkAlignmentParams);
        // Register before start() so the coordinator is closed even if start-up fails.
        closeableRegistry.registerCloseable(sourceCoordinator);
        sourceCoordinator.start();
        setAllReaderTasksReady(sourceCoordinator);
        return sourceCoordinator;
    }

    /**
     * Delivers a {@link ReportedWatermarkEvent} for {@code subtask} (attempt 0) to the
     * coordinator, waits until the coordinator has processed its pending actions and then
     * explicitly triggers {@code announceCombinedWatermark()}.
     *
     * @param sourceCoordinator1 coordinator that receives the report
     * @param subtask index of the reporting subtask
     * @param watermark reported watermark value
     */
    private void reportWatermarkEvent(
            SourceCoordinator<?, ?> sourceCoordinator1, int subtask, long watermark) {
        sourceCoordinator1.handleEventFromOperator(
                subtask, 0, new ReportedWatermarkEvent(watermark));
        CoordinatorTestUtils.waitForCoordinatorToProcessActions(sourceCoordinator1.getContext());
        sourceCoordinator1.announceCombinedWatermark();
    }

    /**
     * Asserts that at least one event was sent to {@code subtask} and that the most recent one
     * equals a {@link WatermarkAlignmentEvent} carrying {@code expectedWatermark}.
     *
     * @param subtask index of the subtask whose sent events are inspected
     * @param expectedWatermark value expected in the latest alignment event
     */
    private void assertLatestWatermarkAlignmentEvent(int subtask, long expectedWatermark) {
        List<OperatorEvent> events = receivingTasks.getSentEventsForSubtask(subtask);
        Assertions.assertThat(events).isNotEmpty();
        OperatorEvent latestEvent = events.get(events.size() - 1);
        Assertions.assertThat(latestEvent)
                .isEqualTo(new WatermarkAlignmentEvent(expectedWatermark));
    }
}

