package org.apache.flink.runtime.source.coordinator;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.core.fs.AutoCloseableRegistry;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulatorTest;
import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/** Unit tests for watermark alignment handling in {@link SourceCoordinator}. */
class SourceCoordinatorAlignmentTest extends SourceCoordinatorTestBase {
    // Shared generator used by getRandomTimestamp(). It is created without a seed, so the
    // generated timestamps differ from run to run.
    private static final Random RANDOM = new Random();

    // Explicit package-private no-argument constructor; it performs no setup of its own.
    SourceCoordinatorAlignmentTest() {
    }

    /**
     * Verifies the alignment events sent to two subtasks of a single coordinator in group
     * {@code "group1"}.
     *
     * <p>The registry is managed with try-with-resources, so everything registered with it is
     * closed on every exit path, and a failure while closing is attached to the primary failure as
     * a suppressed exception instead of replacing it.
     *
     * @throws Exception if starting, driving, or closing the coordinator fails
     */
    @Test
    void testWatermarkAlignment() throws Exception {
        try (AutoCloseableRegistry closeableRegistry = new AutoCloseableRegistry()) {
            SourceCoordinator<?, ?> coordinator =
                    getAndStartNewSourceCoordinator(
                            new WatermarkAlignmentParams(1000L, "group1", Long.MAX_VALUE),
                            closeableRegistry);

            // Only subtask 0 has reported: the expected alignment value is 42 + 1000.
            reportWatermarkEvent(coordinator, 0, 42L);
            assertLatestWatermarkAlignmentEvent(0, 1042L);

            // Subtask 1 reports a higher watermark; both subtasks are still expected to see 1042.
            reportWatermarkEvent(coordinator, 1, 44L);
            assertLatestWatermarkAlignmentEvent(0, 1042L);
            assertLatestWatermarkAlignmentEvent(1, 1042L);

            // Subtask 0 jumps to 5000, so 44 is now the lowest report: expected value 44 + 1000.
            reportWatermarkEvent(coordinator, 0, 5000L);
            assertLatestWatermarkAlignmentEvent(0, 1044L);
            assertLatestWatermarkAlignmentEvent(1, 1044L);
        }
    }

    /**
     * Verifies the alignment events when subtasks report {@code Long.MAX_VALUE} and later report
     * regular watermarks again.
     *
     * <p>NOTE(review): per the test name, a reported {@code Long.MAX_VALUE} presumably stands for
     * an idle subtask - confirm against the coordinator implementation.
     *
     * <p>The registry is managed with try-with-resources, so everything registered with it is
     * closed on every exit path, and a failure while closing is attached to the primary failure as
     * a suppressed exception instead of replacing it.
     *
     * @throws Exception if starting, driving, or closing the coordinator fails
     */
    @Test
    void testWatermarkAlignmentWithIdleness() throws Exception {
        try (AutoCloseableRegistry closeableRegistry = new AutoCloseableRegistry()) {
            SourceCoordinator<?, ?> coordinator =
                    getAndStartNewSourceCoordinator(
                            new WatermarkAlignmentParams(1000L, "group1", Long.MAX_VALUE),
                            closeableRegistry);

            reportWatermarkEvent(coordinator, 0, 42L);
            assertLatestWatermarkAlignmentEvent(0, 1042L);

            reportWatermarkEvent(coordinator, 1, 44L);
            assertLatestWatermarkAlignmentEvent(0, 1042L);
            assertLatestWatermarkAlignmentEvent(1, 1042L);

            // Subtask 0 reports MAX_VALUE: the expected value now follows subtask 1 (44 + 1000).
            reportWatermarkEvent(coordinator, 0, Long.MAX_VALUE);
            assertLatestWatermarkAlignmentEvent(0, 1044L);
            assertLatestWatermarkAlignmentEvent(1, 1044L);

            // Subtask 0 reports a regular watermark again and becomes the lowest report.
            reportWatermarkEvent(coordinator, 0, 42L);
            assertLatestWatermarkAlignmentEvent(0, 1042L);
            assertLatestWatermarkAlignmentEvent(1, 1042L);

            // Both subtasks report MAX_VALUE: the expected alignment value is MAX_VALUE itself.
            reportWatermarkEvent(coordinator, 0, Long.MAX_VALUE);
            reportWatermarkEvent(coordinator, 1, Long.MAX_VALUE);
            assertLatestWatermarkAlignmentEvent(0, Long.MAX_VALUE);
            assertLatestWatermarkAlignmentEvent(1, Long.MAX_VALUE);

            // Subtask 0 reports 42 while subtask 1 is still at MAX_VALUE.
            reportWatermarkEvent(coordinator, 0, 42L);
            assertLatestWatermarkAlignmentEvent(0, 1042L);
            assertLatestWatermarkAlignmentEvent(1, 1042L);

            // Subtask 1 reports 46; subtask 0 at 42 remains the lowest report.
            reportWatermarkEvent(coordinator, 1, 46L);
            assertLatestWatermarkAlignmentEvent(0, 1042L);
            assertLatestWatermarkAlignmentEvent(1, 1042L);
        }
    }

    /**
     * Verifies that two coordinators in different groups ({@code "group1"} and {@code "group2"})
     * produce alignment events independently of each other.
     *
     * <p>Both coordinators are registered with the same registry, which is managed with
     * try-with-resources: everything registered is closed on every exit path, and a failure while
     * closing is attached to the primary failure as a suppressed exception instead of replacing
     * it.
     *
     * @throws Exception if starting, driving, or closing a coordinator fails
     */
    @Test
    void testWatermarkAlignmentWithTwoGroups() throws Exception {
        try (AutoCloseableRegistry closeableRegistry = new AutoCloseableRegistry()) {
            SourceCoordinator<?, ?> groupOneCoordinator =
                    getAndStartNewSourceCoordinator(
                            new WatermarkAlignmentParams(1000L, "group1", Long.MAX_VALUE),
                            closeableRegistry);
            SourceCoordinator<?, ?> groupTwoCoordinator =
                    getAndStartNewSourceCoordinator(
                            new WatermarkAlignmentParams(1000L, "group2", Long.MAX_VALUE),
                            closeableRegistry);

            reportWatermarkEvent(groupOneCoordinator, 0, 42L);
            assertLatestWatermarkAlignmentEvent(0, 1042L);

            // The report goes to the second coordinator: subtask 0's latest event is unchanged and
            // subtask 1 is expected to see 44 + 1000.
            reportWatermarkEvent(groupTwoCoordinator, 1, 44L);
            assertLatestWatermarkAlignmentEvent(0, 1042L);
            assertLatestWatermarkAlignmentEvent(1, 1044L);

            // The first coordinator advances to 5000: subtask 0 is expected to see 5000 + 1000,
            // while subtask 1's latest event stays at 1044.
            reportWatermarkEvent(groupOneCoordinator, 0, 5000L);
            assertLatestWatermarkAlignmentEvent(0, 6000L);
            assertLatestWatermarkAlignmentEvent(1, 1044L);
        }
    }

    /**
     * Checks that a coordinator which is closed without ever being started does not invoke
     * {@code announceCombinedWatermark()}, while a started coordinator invokes it at least twice.
     *
     * @throws Exception if creating, starting, or closing a coordinator fails
     */
    @Test
    void testAnnounceCombinedWatermarkWithoutStart() throws Exception {
        final WatermarkAlignmentParams alignmentParams =
                new WatermarkAlignmentParams(1000L, "group1", 1000L);
        final Source<Integer, MockSourceSplit, Set<MockSourceSplit>> mockSource =
                createMockSource();

        // First coordinator: constructed and closed, never started. Every announcement it makes
        // is counted so the final assertion can verify there were none.
        final AtomicInteger announcementsWithoutStart = new AtomicInteger(0);
        this.sourceCoordinator =
                new SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>>(
                        "TestOperator",
                        mockSource,
                        getNewSourceCoordinatorContext(),
                        new CoordinatorStoreImpl(),
                        alignmentParams,
                        null) {
                    void announceCombinedWatermark() {
                        announcementsWithoutStart.incrementAndGet();
                    }
                };
        this.sourceCoordinator.close();

        // Second coordinator: started normally. The latch opens after two announcements.
        final CountDownLatch twoAnnouncements = new CountDownLatch(2);
        this.sourceCoordinator =
                new SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>>(
                        "TestOperator",
                        mockSource,
                        getNewSourceCoordinatorContext(),
                        new CoordinatorStoreImpl(),
                        alignmentParams,
                        null) {
                    void announceCombinedWatermark() {
                        twoAnnouncements.countDown();
                    }
                };
        this.sourceCoordinator.start();
        setReaderTaskReady(this.sourceCoordinator, 0, 0);

        // Blocks until the started coordinator has announced twice.
        twoAnnouncements.await();
        Assertions.assertThat(announcementsWithoutStart.get()).isZero();
        this.sourceCoordinator.close();
    }

    /**
     * Checks that the job is not marked as failed when a watermark is reported by a subtask before
     * that subtask has been set ready.
     *
     * @throws Exception if creating or driving the coordinator fails
     */
    @Test
    void testSendWatermarkAlignmentEventFailed() throws Exception {
        final WatermarkAlignmentParams alignmentParams =
                new WatermarkAlignmentParams(1000L, "group1", 1000L);
        final Source<Integer, MockSourceSplit, Set<MockSourceSplit>> mockSource =
                createMockSource();

        // Opens once the first announcement attempt has finished, whether it returned normally or
        // threw a RuntimeException.
        final CountDownLatch firstAnnouncement = new CountDownLatch(1);
        this.sourceCoordinator =
                new SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>>(
                        "TestOperator",
                        mockSource,
                        getNewSourceCoordinatorContext(),
                        new CoordinatorStoreImpl(),
                        alignmentParams,
                        null) {
                    void announceCombinedWatermark() {
                        try {
                            super.announceCombinedWatermark();
                        } catch (RuntimeException failure) {
                            // Signal the test first, then let the failure propagate unchanged.
                            firstAnnouncement.countDown();
                            throw failure;
                        }
                        firstAnnouncement.countDown();
                    }
                };
        this.sourceCoordinator.start();

        // Register the reader and report a watermark before the reader task is set ready.
        this.sourceCoordinator.handleEventFromOperator(
                0, 0, new ReaderRegistrationEvent(0, createLocationFor(0, 0)));
        this.sourceCoordinator.handleEventFromOperator(0, 0, new ReportedWatermarkEvent(1000L));
        firstAnnouncement.await();

        setReaderTaskReady(this.sourceCoordinator, 0, 0);
        // NOTE(review): presumably waits until five events have been sent - confirm in the base
        // class.
        waitForSentEvents(5);
        Assertions.assertThat(this.operatorCoordinatorContext.isJobFailed()).isFalse();
    }

    /**
     * Walks a {@code WatermarkAggregator} through a fixed sequence of per-key updates and checks
     * after each one that the aggregated watermark equals the lowest watermark currently held by
     * any key.
     */
    @Test
    void testWatermarkAggregator() {
        // Parameterised instead of a raw type so that aggregate(...) calls are type-checked.
        SourceCoordinator.WatermarkAggregator<Integer> aggregator =
                new SourceCoordinator.WatermarkAggregator<>();

        // Fill three keys; key 0 holds the minimum throughout.
        aggregator.aggregate(0, new Watermark(10L));
        Assertions.assertThat(aggregator.getAggregatedWatermark().getTimestamp()).isEqualTo(10L);
        aggregator.aggregate(1, new Watermark(12L));
        Assertions.assertThat(aggregator.getAggregatedWatermark().getTimestamp()).isEqualTo(10L);
        aggregator.aggregate(2, new Watermark(13L));
        Assertions.assertThat(aggregator.getAggregatedWatermark().getTimestamp()).isEqualTo(10L);

        // Key 1 drops below the current minimum and becomes the new minimum.
        aggregator.aggregate(1, new Watermark(9L));
        Assertions.assertThat(aggregator.getAggregatedWatermark().getTimestamp()).isEqualTo(9L);

        // Key 1 jumps to MAX_VALUE; the minimum falls back to key 0.
        aggregator.aggregate(1, new Watermark(Long.MAX_VALUE));
        Assertions.assertThat(aggregator.getAggregatedWatermark().getTimestamp()).isEqualTo(10L);

        // Key 1 returns with a low value and becomes the minimum again.
        aggregator.aggregate(1, new Watermark(8L));
        Assertions.assertThat(aggregator.getAggregatedWatermark().getTimestamp()).isEqualTo(8L);

        // Raise the keys one by one; the minimum moves to whichever key is lowest.
        aggregator.aggregate(1, new Watermark(20L));
        Assertions.assertThat(aggregator.getAggregatedWatermark().getTimestamp()).isEqualTo(10L);
        aggregator.aggregate(0, new Watermark(23L));
        Assertions.assertThat(aggregator.getAggregatedWatermark().getTimestamp()).isEqualTo(13L);
        aggregator.aggregate(2, new Watermark(22L));
        Assertions.assertThat(aggregator.getAggregatedWatermark().getTimestamp()).isEqualTo(20L);
    }

    /**
     * Runs the randomized aggregator check for several key counts, each once with and once
     * without extreme ({@code Long.MAX_VALUE} / {@code Long.MIN_VALUE}) timestamps.
     */
    @Test
    void testWatermarkAggregatorRandomly() {
        // NOTE(review): the first key count comes from a constant of an unrelated test class;
        // it looks like it should be a plain local value - confirm and decouple.
        final int[] keyCountsForTwentyRounds = {
            HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS, 2000, 5000
        };
        for (int keyCount : keyCountsForTwentyRounds) {
            testWatermarkAggregatorRandomly(20, keyCount, true, true);
            testWatermarkAggregatorRandomly(20, keyCount, true, false);
        }

        // The largest key count runs for fewer rounds.
        testWatermarkAggregatorRandomly(10, 10000, true, true);
        testWatermarkAggregatorRandomly(10, 10000, true, false);
    }

    /**
     * Feeds random watermarks into a fresh {@code WatermarkAggregator} and optionally checks the
     * aggregated watermark after every single update.
     *
     * @param roundNumber number of passes over all keys
     * @param keyNumber number of distinct keys ({@code 0 .. keyNumber - 1}) updated in each pass
     * @param checkResult whether to track the latest timestamp per key and assert the aggregated
     *     watermark after each update
     * @param testSpecialWatermark forwarded to {@link #getRandomTimestamp(boolean)} to allow
     *     extreme timestamps
     */
    private void testWatermarkAggregatorRandomly(
            int roundNumber, int keyNumber, boolean checkResult, boolean testSpecialWatermark) {
        SourceCoordinator.WatermarkAggregator<Integer> aggregator =
                new SourceCoordinator.WatermarkAggregator<>();
        // Reference model: the most recent timestamp reported for each key.
        Map<Integer, Long> latestTimestampByKey = new HashMap<>();

        for (int round = 0; round < roundNumber; round++) {
            for (int key = 0; key < keyNumber; key++) {
                long timestamp = getRandomTimestamp(testSpecialWatermark);
                aggregator.aggregate(key, new Watermark(timestamp));
                if (checkResult) {
                    latestTimestampByKey.put(key, timestamp);
                    assertAggregatedWatermark(aggregator, latestTimestampByKey);
                }
            }
        }
    }

    /**
     * Produces a random timestamp for the randomized aggregator test.
     *
     * @param z when {@code true}, a call returns an extreme value ({@code Long.MAX_VALUE} or
     *     {@code Long.MIN_VALUE}, chosen by a random boolean) whenever {@code RANDOM.nextInt(100)}
     *     yields 0
     * @return an extreme value as described above, otherwise the current time truncated to a
     *     multiple of 1000 plus a random non-negative offset
     */
    private long getRandomTimestamp(boolean z) {
        if (z && RANDOM.nextInt(100) == 0) {
            if (RANDOM.nextBoolean()) {
                return Long.MAX_VALUE;
            }
            return Long.MIN_VALUE;
        }
        final long truncatedNow = (System.currentTimeMillis() / 1000) * 1000;
        return truncatedNow + RANDOM.nextInt(HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS);
    }

    /**
     * Asserts that the aggregator's current watermark equals the smallest timestamp in the given
     * reference map.
     *
     * @param watermarkAggregator aggregator under test
     * @param map latest timestamp per key; an empty map makes the lookup of the minimum throw
     *     {@link IllegalStateException}
     */
    private void assertAggregatedWatermark(SourceCoordinator.WatermarkAggregator<Integer> watermarkAggregator, Map<Integer, Long> map) {
        // Read the actual value first, then derive the expected minimum from the reference map.
        final long actualTimestamp = watermarkAggregator.getAggregatedWatermark().getTimestamp();
        final long expectedTimestamp =
                map.values().stream()
                        .min(Long::compareTo)
                        .orElseThrow(IllegalStateException::new);
        Assertions.assertThat(actualTimestamp).isEqualTo(expectedTimestamp);
    }

    /**
     * Creates a coordinator for the given alignment parameters, registers it with the registry,
     * starts it, and marks all of its reader tasks as ready.
     *
     * @param watermarkAlignmentParams alignment configuration for the new coordinator
     * @param autoCloseableRegistry registry the new coordinator is registered with as a closeable
     * @return the started coordinator
     * @throws Exception if registering or starting the coordinator fails
     */
    private SourceCoordinator<?, ?> getAndStartNewSourceCoordinator(WatermarkAlignmentParams watermarkAlignmentParams, AutoCloseableRegistry autoCloseableRegistry) throws Exception {
        final SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> coordinator =
                getNewSourceCoordinator(watermarkAlignmentParams);

        // Registered before start() so the registry already knows the coordinator if start fails.
        autoCloseableRegistry.registerCloseable(coordinator);

        coordinator.start();
        setAllReaderTasksReady(coordinator);
        return coordinator;
    }

    /**
     * Delivers a {@link ReportedWatermarkEvent} to the coordinator, waits for the coordinator to
     * process its pending actions, and then triggers a combined-watermark announcement.
     *
     * @param sourceCoordinator coordinator that receives the event
     * @param i first argument of {@code handleEventFromOperator}; callers pass the subtask index
     * @param j watermark value carried by the event
     */
    private void reportWatermarkEvent(SourceCoordinator<?, ?> sourceCoordinator, int i, long j) {
        final ReportedWatermarkEvent reportedWatermark = new ReportedWatermarkEvent(j);
        // The second argument is always 0 in this helper.
        sourceCoordinator.handleEventFromOperator(i, 0, reportedWatermark);

        // Let the coordinator finish handling the event before announcing.
        CoordinatorTestUtils.waitForCoordinatorToProcessActions(sourceCoordinator.getContext());
        sourceCoordinator.announceCombinedWatermark();
    }

    /**
     * Asserts that the most recent event sent to the given subtask is a
     * {@link WatermarkAlignmentEvent} carrying the expected value.
     *
     * @param i subtask whose sent events are inspected
     * @param j expected value of the latest alignment event
     */
    private void assertLatestWatermarkAlignmentEvent(int i, long j) {
        final List<OperatorEvent> eventsForSubtask = this.receivingTasks.getSentEventsForSubtask(i);
        Assertions.assertThat(eventsForSubtask).isNotEmpty();

        final OperatorEvent latestEvent = eventsForSubtask.get(eventsForSubtask.size() - 1);
        Assertions.assertThat(latestEvent).isEqualTo(new WatermarkAlignmentEvent(j));
    }
}
