/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.coordination;

import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl;
import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/**
 * Unit tests for {@link SubtaskGatewayImpl}.
 *
 * <p>Each test builds a gateway on top of an {@link EventReceivingTasks} receiver, drives it
 * through a combination of {@code markForCheckpoint}, {@code tryCloseGateway},
 * {@code sendEvent} and {@code openGatewayAndUnmarkAllCheckpoint}, and then checks which events
 * reached the receiver and in what state the returned futures are.
 */
public class SubtaskGatewayImplTest {

    /**
     * An event sent through a gateway that was never closed shows up at the receiver and the
     * returned future is done.
     */
    @Test
    public void eventsPassThroughOpenGateway() {
        EventReceivingTasks receiver = EventReceivingTasks.createForRunningTasks();
        SubtaskGatewayImpl gateway = createGateway(receiver, 11);

        TestOperatorEvent event = new TestOperatorEvent();
        CompletableFuture<?> future = gateway.sendEvent(event);

        Assertions.assertThat(receiver.events)
                .containsExactly(new EventReceivingTasks.EventWithSubtask(event, 11));
        Assertions.assertThat(future).isDone();
    }

    /** Closing the gateway for the checkpoint id it was marked with reports success. */
    @Test
    public void closingMarkedGateway() {
        EventReceivingTasks receiver = EventReceivingTasks.createForRunningTasks();
        SubtaskGatewayImpl gateway = createGateway(receiver, 11);

        gateway.markForCheckpoint(200L);
        boolean isClosed = gateway.tryCloseGateway(200L);

        Assertions.assertThat(isClosed).isTrue();
    }

    /** Closing a gateway that was never marked for any checkpoint reports failure. */
    @Test
    public void notClosingUnmarkedGateway() {
        EventReceivingTasks receiver = EventReceivingTasks.createForRunningTasks();
        SubtaskGatewayImpl gateway = createGateway(receiver, 11);

        boolean isClosed = gateway.tryCloseGateway(123L);

        Assertions.assertThat(isClosed).isFalse();
    }

    /** Closing with a checkpoint id different from the marked one reports failure. */
    @Test
    public void notClosingGatewayForOtherMark() {
        EventReceivingTasks receiver = EventReceivingTasks.createForRunningTasks();
        SubtaskGatewayImpl gateway = createGateway(receiver, 11);

        gateway.markForCheckpoint(100L);
        boolean isClosed = gateway.tryCloseGateway(123L);

        Assertions.assertThat(isClosed).isFalse();
    }

    /**
     * An event sent after the gateway was marked and closed does not reach the receiver, and its
     * future stays incomplete.
     */
    @Test
    public void eventsBlockedByClosedGateway() {
        EventReceivingTasks receiver = EventReceivingTasks.createForRunningTasks();
        SubtaskGatewayImpl gateway = createGateway(receiver, 1);

        gateway.markForCheckpoint(1L);
        gateway.tryCloseGateway(1L);

        CompletableFuture<?> future = gateway.sendEvent(new TestOperatorEvent());

        Assertions.assertThat(receiver.events).isEmpty();
        Assertions.assertThat(future).isNotDone();
    }

    /**
     * Events sent while two gateways are closed reach the receiver once the gateways are
     * reopened, in the order in which the gateways are reopened, and their futures are done.
     */
    @Test
    public void eventsReleasedAfterOpeningGateway() {
        EventReceivingTasks receiver = EventReceivingTasks.createForRunningTasks();
        SubtaskGatewayImpl gateway0 = createGateway(receiver, 0);
        SubtaskGatewayImpl gateway3 = createGateway(receiver, 3);
        // Deliberately not in subtask order: the expected event order below follows this list.
        List<SubtaskGatewayImpl> gateways = Arrays.asList(gateway3, gateway0);

        gateways.forEach(gateway -> gateway.markForCheckpoint(17L));
        gateways.forEach(gateway -> gateway.tryCloseGateway(17L));

        TestOperatorEvent event1 = new TestOperatorEvent();
        TestOperatorEvent event2 = new TestOperatorEvent();
        CompletableFuture<?> future1 = gateway3.sendEvent(event1);
        CompletableFuture<?> future2 = gateway0.sendEvent(event2);

        gateways.forEach(SubtaskGatewayImpl::openGatewayAndUnmarkAllCheckpoint);

        Assertions.assertThat(receiver.events)
                .containsExactly(
                        new EventReceivingTasks.EventWithSubtask(event1, 3),
                        new EventReceivingTasks.EventWithSubtask(event2, 0));
        Assertions.assertThat(future1).isDone();
        Assertions.assertThat(future2).isDone();
    }

    /**
     * With a receiver created via {@code createForRunningTasksFailingRpcs}, the future of an
     * event that was sent while the gateway was closed completes exceptionally after the gateway
     * is reopened.
     */
    @Test
    public void releasedEventsForwardSendFailures() {
        EventReceivingTasks receiver =
                EventReceivingTasks.createForRunningTasksFailingRpcs(new FlinkException("test"));
        SubtaskGatewayImpl gateway = createGateway(receiver, 10);

        gateway.markForCheckpoint(17L);
        gateway.tryCloseGateway(17L);

        CompletableFuture<?> future = gateway.sendEvent(new TestOperatorEvent());
        gateway.openGatewayAndUnmarkAllCheckpoint();

        Assertions.assertThat(future).isCompletedExceptionally();
    }

    /**
     * Builds a gateway for the single access object that {@code receiver} exposes for the given
     * subtask, using the main-thread executor adapter and a fresh futures tracker.
     *
     * @param receiver the receiver that supplies the subtask access and records sent events
     * @param subtaskIndex index of the subtask the gateway should target
     * @return a new gateway for that subtask
     */
    private static SubtaskGatewayImpl createGateway(EventReceivingTasks receiver, int subtaskIndex) {
        return new SubtaskGatewayImpl(
                getUniqueElement(receiver.getAccessesForSubtask(subtaskIndex)),
                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                new IncompleteFuturesTracker());
    }

    /**
     * Returns the only element of {@code collection}.
     *
     * @param collection the collection expected to hold exactly one element
     * @param <T> the element type
     * @return the single element
     * @throws IllegalStateException if the collection is empty or holds more than one element
     */
    private static <T> T getUniqueElement(Collection<T> collection) {
        Iterator<T> iterator = collection.iterator();
        // Fail with a descriptive state error instead of a bare NoSuchElementException.
        Preconditions.checkState(iterator.hasNext(), "Expected exactly one element but found none.");
        T element = iterator.next();
        Preconditions.checkState(
                !iterator.hasNext(), "Expected exactly one element but found more than one.");
        return element;
    }
}

