package org.apache.flink.runtime.io.network;

import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
import org.apache.flink.runtime.iterative.event.TerminationEvent;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/TaskEventDispatcherTest.class */
class TaskEventDispatcherTest {

    /* loaded from: input_file:org/apache/flink/runtime/io/network/TaskEventDispatcherTest$OneShotEventListener.class */
    private static class OneShotEventListener implements EventListener<TaskEvent> {
        private final TaskEvent expected;
        boolean fired = false;

        OneShotEventListener(TaskEvent taskEvent) {
            this.expected = taskEvent;
        }

        public void onEvent(TaskEvent taskEvent) {
            Preconditions.checkState(!this.fired, "Should only fire once");
            this.fired = true;
            Preconditions.checkArgument(taskEvent == this.expected, "Fired on unexpected event: %s (expected: %s)", new Object[]{taskEvent, this.expected});
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/TaskEventDispatcherTest$ZeroShotEventListener.class */
    public static class ZeroShotEventListener implements EventListener<TaskEvent> {
        private ZeroShotEventListener() {
        }

        public void onEvent(TaskEvent taskEvent) {
            throw new IllegalStateException("Should never fire");
        }
    }

    TaskEventDispatcherTest() {
    }

    @Test
    void registerPartitionTwice() {
        ResultPartitionID resultPartitionID = new ResultPartitionID();
        TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
        taskEventDispatcher.registerPartition(resultPartitionID);
        Assertions.assertThatThrownBy(() -> {
            taskEventDispatcher.registerPartition(resultPartitionID);
        }).hasMessageContaining("already registered at task event dispatcher").isInstanceOf(IllegalStateException.class);
    }

    @Test
    void subscribeToEventNotRegistered() {
        TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
        Assertions.assertThatThrownBy(() -> {
            taskEventDispatcher.subscribeToEvent(new ResultPartitionID(), new ZeroShotEventListener(), TaskEvent.class);
        }).hasMessageContaining("not registered at task event dispatcher").isInstanceOf(IllegalStateException.class);
    }

    @Test
    void publishSubscribe() {
        ResultPartitionID resultPartitionID = new ResultPartitionID();
        ResultPartitionID resultPartitionID2 = new ResultPartitionID();
        TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
        AllWorkersDoneEvent allWorkersDoneEvent = new AllWorkersDoneEvent();
        TerminationEvent terminationEvent = new TerminationEvent();
        Assertions.assertThat(taskEventDispatcher.publish(resultPartitionID, allWorkersDoneEvent)).isFalse();
        taskEventDispatcher.registerPartition(resultPartitionID);
        taskEventDispatcher.registerPartition(resultPartitionID2);
        Assertions.assertThat(taskEventDispatcher.publish(resultPartitionID, allWorkersDoneEvent)).isTrue();
        OneShotEventListener oneShotEventListener = new OneShotEventListener(allWorkersDoneEvent);
        ZeroShotEventListener zeroShotEventListener = new ZeroShotEventListener();
        ZeroShotEventListener zeroShotEventListener2 = new ZeroShotEventListener();
        OneShotEventListener oneShotEventListener2 = new OneShotEventListener(terminationEvent);
        taskEventDispatcher.subscribeToEvent(resultPartitionID, oneShotEventListener, AllWorkersDoneEvent.class);
        taskEventDispatcher.subscribeToEvent(resultPartitionID2, zeroShotEventListener, AllWorkersDoneEvent.class);
        taskEventDispatcher.subscribeToEvent(resultPartitionID, zeroShotEventListener2, TaskEvent.class);
        taskEventDispatcher.subscribeToEvent(resultPartitionID, oneShotEventListener2, TerminationEvent.class);
        Assertions.assertThat(taskEventDispatcher.publish(resultPartitionID, allWorkersDoneEvent)).isTrue();
        Assertions.assertThat(oneShotEventListener.fired).withFailMessage("listener should have fired for AllWorkersDoneEvent", new Object[0]).isTrue();
        Assertions.assertThat(oneShotEventListener2.fired).withFailMessage("listener should not have fired for AllWorkersDoneEvent", new Object[0]).isFalse();
        Assertions.assertThat(taskEventDispatcher.publish(resultPartitionID, terminationEvent)).isTrue();
        Assertions.assertThat(oneShotEventListener2.fired).withFailMessage("listener should have fired for TerminationEvent", new Object[0]).isTrue();
    }

    @Test
    void unregisterPartition() {
        ResultPartitionID resultPartitionID = new ResultPartitionID();
        ResultPartitionID resultPartitionID2 = new ResultPartitionID();
        TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
        AllWorkersDoneEvent allWorkersDoneEvent = new AllWorkersDoneEvent();
        Assertions.assertThat(taskEventDispatcher.publish(resultPartitionID, allWorkersDoneEvent)).isFalse();
        taskEventDispatcher.registerPartition(resultPartitionID);
        taskEventDispatcher.registerPartition(resultPartitionID2);
        OneShotEventListener oneShotEventListener = new OneShotEventListener(allWorkersDoneEvent);
        ZeroShotEventListener zeroShotEventListener = new ZeroShotEventListener();
        OneShotEventListener oneShotEventListener2 = new OneShotEventListener(allWorkersDoneEvent);
        taskEventDispatcher.subscribeToEvent(resultPartitionID, oneShotEventListener, AllWorkersDoneEvent.class);
        taskEventDispatcher.subscribeToEvent(resultPartitionID2, zeroShotEventListener, AllWorkersDoneEvent.class);
        taskEventDispatcher.subscribeToEvent(resultPartitionID, oneShotEventListener2, AllWorkersDoneEvent.class);
        taskEventDispatcher.unregisterPartition(resultPartitionID2);
        Assertions.assertThat(taskEventDispatcher.publish(resultPartitionID, allWorkersDoneEvent)).isTrue();
        Assertions.assertThat(oneShotEventListener.fired).withFailMessage("listener should have fired for AllWorkersDoneEvent", new Object[0]).isTrue();
        Assertions.assertThat(oneShotEventListener2.fired).withFailMessage("listener should have fired for AllWorkersDoneEvent", new Object[0]).isTrue();
        Assertions.assertThat(taskEventDispatcher.publish(resultPartitionID2, allWorkersDoneEvent)).isFalse();
    }

    @Test
    void clearAll() throws Exception {
        ResultPartitionID resultPartitionID = new ResultPartitionID();
        TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
        taskEventDispatcher.registerPartition(resultPartitionID);
        taskEventDispatcher.subscribeToEvent(resultPartitionID, new ZeroShotEventListener(), AllWorkersDoneEvent.class);
        taskEventDispatcher.clearAll();
        Assertions.assertThat(taskEventDispatcher.publish(resultPartitionID, new AllWorkersDoneEvent())).isFalse();
    }
}
