package org.apache.flink.runtime.scheduler;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfileTestingUtils;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.TestingPayload;
import org.apache.flink.runtime.jobmaster.slotpool.DummyPayload;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulk;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
import org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.BiConsumerWithException;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.class */
public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
    /** Allocation timeout (100 ms) passed to every allocator built by {@code AllocationContext.Builder#build()}. */
    private static final Time ALLOCATION_TIMEOUT = Time.milliseconds(100);
    /**
     * Resource profile returned for every vertex by the lookup function wired into the allocator under test.
     * NOTE(review): the two arguments are presumably CPU cores and memory size - confirm against ResourceProfile.
     */
    private static final ResourceProfile RESOURCE_PROFILE = ResourceProfile.fromResources(3.0d, 5);
    // Randomly generated execution vertex ids shared by all test cases in this class.
    private static final ExecutionVertexID EV1 = ExecutionGraphTestUtils.createRandomExecutionVertexId();
    private static final ExecutionVertexID EV2 = ExecutionGraphTestUtils.createRandomExecutionVertexId();
    private static final ExecutionVertexID EV3 = ExecutionGraphTestUtils.createRandomExecutionVertexId();
    private static final ExecutionVertexID EV4 = ExecutionGraphTestUtils.createRandomExecutionVertexId();

    /**
     * Test fixture that bundles a {@link SlotSharingExecutionSlotAllocator} together with the testing
     * collaborators it was constructed from, so that a test can drive the allocator and afterwards
     * inspect the slot provider, the slot sharing strategy and the slot profile retriever factory.
     */
    public static class AllocationContext {
        private final TestingPhysicalSlotProvider slotProvider;
        private final TestingSlotSharingStrategy slotSharingStrategy;
        private final SlotSharingExecutionSlotAllocator allocator;
        private final TestingSharedSlotProfileRetrieverFactory slotProfileRetrieverFactory;

        /**
         * Fluent builder for {@link AllocationContext}. Unless overridden, the context uses a
         * {@link TestingPhysicalSlotRequestBulkChecker}, a slot provider created with
         * {@code createWithInfiniteSlotCreation()} and {@code slotWillBeOccupiedIndefinitely = false}.
         */
        public static class Builder {
            // Keyed by the vertex arrays themselves. Arrays hash by identity, so every
            // addGroup/addGroupAndResource call contributes its own, separate entry.
            private final Map<ExecutionVertexID[], ResourceProfile> groups = new HashMap<>();
            private boolean slotWillBeOccupiedIndefinitely = false;
            private PhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker();
            private TestingPhysicalSlotProvider physicalSlotProvider =
                    TestingPhysicalSlotProvider.createWithInfiniteSlotCreation();

            private Builder() {
            }

            /**
             * Registers a slot sharing group made of the given vertices with
             * {@link ResourceProfile#UNKNOWN} as its resource profile.
             *
             * @param executionVertexIds vertices that share one slot
             * @return this builder
             */
            public Builder addGroup(ExecutionVertexID... executionVertexIds) {
                this.groups.put(executionVertexIds, ResourceProfile.UNKNOWN);
                return this;
            }

            /**
             * Registers a slot sharing group made of the given vertices with an explicit resource profile.
             *
             * @param resourceProfile resource profile to set on the group
             * @param executionVertexIds vertices that share one slot
             * @return this builder
             */
            public Builder addGroupAndResource(ResourceProfile resourceProfile, ExecutionVertexID... executionVertexIds) {
                this.groups.put(executionVertexIds, resourceProfile);
                return this;
            }

            /**
             * Sets the flag that is forwarded to the allocator's constructor.
             *
             * @param slotWillBeOccupiedIndefinitely value to forward
             * @return this builder
             */
            public Builder setSlotWillBeOccupiedIndefinitely(boolean slotWillBeOccupiedIndefinitely) {
                this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
                return this;
            }

            /**
             * Replaces the default bulk checker.
             *
             * @param bulkChecker bulk checker handed to the allocator
             * @return this builder
             */
            public Builder withBulkChecker(PhysicalSlotRequestBulkChecker bulkChecker) {
                this.bulkChecker = bulkChecker;
                return this;
            }

            /**
             * Replaces the default physical slot provider.
             *
             * @param physicalSlotProvider slot provider handed to the allocator and exposed by the context
             * @return this builder
             */
            public Builder withPhysicalSlotProvider(TestingPhysicalSlotProvider physicalSlotProvider) {
                this.physicalSlotProvider = physicalSlotProvider;
                return this;
            }

            /**
             * Creates the allocator from the configured collaborators and wraps everything in a context.
             * The allocator resolves every vertex to {@code RESOURCE_PROFILE} and uses {@code ALLOCATION_TIMEOUT}.
             *
             * @return the assembled context
             */
            public AllocationContext build() {
                TestingSharedSlotProfileRetrieverFactory retrieverFactory = new TestingSharedSlotProfileRetrieverFactory();
                TestingSlotSharingStrategy sharingStrategy =
                        TestingSlotSharingStrategy.createWithGroupsAndResources(this.groups);
                SlotSharingExecutionSlotAllocator slotAllocator = new SlotSharingExecutionSlotAllocator(
                        this.physicalSlotProvider,
                        this.slotWillBeOccupiedIndefinitely,
                        sharingStrategy,
                        retrieverFactory,
                        this.bulkChecker,
                        SlotSharingExecutionSlotAllocatorTest.ALLOCATION_TIMEOUT,
                        executionVertexId -> SlotSharingExecutionSlotAllocatorTest.RESOURCE_PROFILE);
                return new AllocationContext(this.physicalSlotProvider, sharingStrategy, slotAllocator, retrieverFactory);
            }
        }

        private AllocationContext(
                TestingPhysicalSlotProvider slotProvider,
                TestingSlotSharingStrategy slotSharingStrategy,
                SlotSharingExecutionSlotAllocator allocator,
                TestingSharedSlotProfileRetrieverFactory slotProfileRetrieverFactory) {
            this.slotProvider = slotProvider;
            this.slotSharingStrategy = slotSharingStrategy;
            this.allocator = allocator;
            this.slotProfileRetrieverFactory = slotProfileRetrieverFactory;
        }

        /** @return the allocator under test */
        public SlotSharingExecutionSlotAllocator getAllocator() {
            return this.allocator;
        }

        /**
         * Convenience wrapper around the allocator's {@code allocateSlotsFor}.
         *
         * @param executionVertexIds vertices to allocate slots for
         * @return the assignments returned by the allocator
         */
        public List<SlotExecutionVertexAssignment> allocateSlotsFor(ExecutionVertexID... executionVertexIds) {
            return this.allocator.allocateSlotsFor(Arrays.asList(executionVertexIds));
        }

        /** @return the slot sharing strategy the allocator was built with */
        public TestingSlotSharingStrategy getSlotSharingStrategy() {
            return this.slotSharingStrategy;
        }

        /** @return the physical slot provider the allocator was built with */
        public TestingPhysicalSlotProvider getSlotProvider() {
            return this.slotProvider;
        }

        /** @return the slot profile retriever factory the allocator was built with */
        public TestingSharedSlotProfileRetrieverFactory getSlotProfileRetrieverFactory() {
            return this.slotProfileRetrieverFactory;
        }

        private static Builder newBuilder() {
            return new Builder();
        }

        // Decompiler-generated accessor for the private newBuilder(); kept under this name
        // because the test methods of the enclosing class obtain their builder through it.
        static /* synthetic */ Builder access$000() {
            return newBuilder();
        }
    }

    /**
     * Recording {@link SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory}: remembers every
     * bulk it was asked to create a retriever for and every group those retrievers were queried with.
     * The retrievers it hands out answer with {@code SlotProfileTestingUtils.noLocality(resourceProfile)}.
     */
    public static class TestingSharedSlotProfileRetrieverFactory implements SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory {
        private final List<Set<ExecutionVertexID>> askedBulks = new ArrayList<>();
        private final List<ExecutionSlotSharingGroup> askedGroups = new ArrayList<>();

        private TestingSharedSlotProfileRetrieverFactory() {
        }

        /**
         * Records the bulk and returns a retriever that records each group it is asked about.
         *
         * @param bulk vertices of the bulk the retriever is created for
         * @return a recording retriever producing no-locality slot profiles
         */
        public SharedSlotProfileRetriever createFromBulk(Set<ExecutionVertexID> bulk) {
            this.askedBulks.add(bulk);
            return (executionSlotSharingGroup, resourceProfile) -> {
                this.askedGroups.add(executionSlotSharingGroup);
                return SlotProfileTestingUtils.noLocality(resourceProfile);
            };
        }

        /** @return read-only view of the bulks passed to {@link #createFromBulk(Set)}, in call order */
        public List<Set<ExecutionVertexID>> getAskedBulks() {
            return Collections.unmodifiableList(this.askedBulks);
        }

        /** @return read-only view of the groups the created retrievers were queried with, in call order */
        public List<ExecutionSlotSharingGroup> getAskedGroups() {
            return Collections.unmodifiableList(this.askedGroups);
        }
    }

    /**
     * {@link SlotSharingStrategy} backed by a fixed vertex-to-group map that is built up front from
     * the groups declared by a test.
     */
    public static class TestingSlotSharingStrategy implements SlotSharingStrategy {
        private final Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroups;

        private TestingSlotSharingStrategy(Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroups) {
            this.executionSlotSharingGroups = executionSlotSharingGroups;
        }

        /**
         * @param executionVertexID vertex to look up
         * @return the group registered for the vertex, or {@code null} if the vertex is unknown
         */
        public ExecutionSlotSharingGroup getExecutionSlotSharingGroup(ExecutionVertexID executionVertexID) {
            return this.executionSlotSharingGroups.get(executionVertexID);
        }

        /** @return a fresh set containing every registered group */
        public Set<ExecutionSlotSharingGroup> getExecutionSlotSharingGroups() {
            return new HashSet<>(this.executionSlotSharingGroups.values());
        }

        /**
         * Creates one {@link ExecutionSlotSharingGroup} per map entry, assigns it the entry's resource
         * profile and registers every vertex of the entry's array under that group.
         *
         * @param groupsWithResources vertex arrays mapped to the resource profile of their group
         * @return a strategy that resolves each listed vertex to its group
         */
        public static TestingSlotSharingStrategy createWithGroupsAndResources(Map<ExecutionVertexID[], ResourceProfile> groupsWithResources) {
            Map<ExecutionVertexID, ExecutionSlotSharingGroup> groupByVertex = new HashMap<>();
            for (Map.Entry<ExecutionVertexID[], ResourceProfile> entry : groupsWithResources.entrySet()) {
                ExecutionSlotSharingGroup group = new ExecutionSlotSharingGroup();
                group.setResourceProfile(entry.getValue());
                for (ExecutionVertexID vertexId : entry.getKey()) {
                    group.addVertex(vertexId);
                    groupByVertex.put(vertexId, group);
                }
            }
            return new TestingSlotSharingStrategy(groupByVertex);
        }
    }

    /**
     * Allocating EV1 and EV2 (one shared group) must ask the retriever factory for exactly one bulk
     * containing both vertices, and the resulting retriever must be queried with that group.
     */
    @Test
    public void testSlotProfileRequestAskedBulkAndGroup() {
        AllocationContext context = AllocationContext.access$000().addGroup(EV1, EV2).build();
        ExecutionSlotSharingGroup expectedGroup = context.getSlotSharingStrategy().getExecutionSlotSharingGroup(EV1);

        context.allocateSlotsFor(EV1, EV2);

        // The element type must be kept: with a raw List, get(0) is an Object and cannot be
        // matched against an Iterable matcher.
        List<Set<ExecutionVertexID>> askedBulks = context.getSlotProfileRetrieverFactory().getAskedBulks();
        Assert.assertThat(askedBulks, Matchers.hasSize(1));
        Assert.assertThat(askedBulks.get(0), Matchers.containsInAnyOrder(EV1, EV2));
        Assert.assertThat(
                context.getSlotProfileRetrieverFactory().getAskedGroups(),
                Matchers.containsInAnyOrder(expectedGroup));
    }

    /**
     * With a group of three vertices, allocating two of them must issue a physical slot request
     * whose resource profile equals {@code RESOURCE_PROFILE} multiplied by three.
     */
    @Test
    public void testSlotRequestProfile() {
        AllocationContext context = AllocationContext.access$000().addGroup(EV1, EV2, EV3).build();
        ResourceProfile expectedProfile = RESOURCE_PROFILE.multiply(3);

        context.allocateSlotsFor(EV1, EV2);

        Optional<PhysicalSlotRequest> firstRequest =
                context.getSlotProvider().getRequests().values().stream().findFirst();
        Assert.assertThat(firstRequest.isPresent(), CoreMatchers.is(true));
        // Presence was asserted on the line above, so get() cannot fail here.
        PhysicalSlotRequest request = firstRequest.get();
        Assert.assertThat(
                request.getSlotProfile().getPhysicalSlotResourceProfile(),
                CoreMatchers.is(expectedProfile));
    }

    /**
     * Allocating all vertices of two groups must return an assignment for each vertex and issue
     * exactly two physical slot requests.
     */
    @Test
    public void testAllocatePhysicalSlotForNewSharedSlot() {
        AllocationContext context = AllocationContext.access$000()
                .addGroup(EV1, EV2)
                .addGroup(EV3, EV4)
                .build();

        List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(EV1, EV2, EV3, EV4);
        List<ExecutionVertexID> assignedIds = getAssignIds(assignments);

        Assert.assertThat(assignedIds, Matchers.containsInAnyOrder(EV1, EV2, EV3, EV4));
        Assert.assertThat(context.getSlotProvider().getRequests().keySet(), Matchers.hasSize(2));
    }

    /**
     * Allocating EV2 after EV1 (same group) must return an assignment for EV2 while the number of
     * physical slot requests stays at one.
     */
    @Test
    public void testAllocateLogicalSlotFromAvailableSharedSlot() {
        AllocationContext context = AllocationContext.access$000().addGroup(EV1, EV2).build();
        context.allocateSlotsFor(EV1);

        List<SlotExecutionVertexAssignment> secondAllocation = context.allocateSlotsFor(EV2);
        List<ExecutionVertexID> assignedIds = getAssignIds(secondAllocation);

        Assert.assertThat(assignedIds, Matchers.containsInAnyOrder(EV2));
        Assert.assertThat(context.getSlotProvider().getRequests().keySet(), Matchers.hasSize(1));
    }

    /**
     * Allocating the same vertex twice must yield futures that complete with the very same
     * logical slot instance.
     */
    @Test
    public void testDuplicateAllocationDoesNotRecreateLogicalSlotFuture() throws ExecutionException, InterruptedException {
        AllocationContext context = AllocationContext.access$000().addGroup(EV1).build();

        LogicalSlot firstSlot = context.allocateSlotsFor(EV1).get(0).getLogicalSlotFuture().get();
        LogicalSlot secondSlot = context.allocateSlotsFor(EV1).get(0).getLogicalSlotFuture().get();

        // Identity, not equality: both allocations must resolve to the same object.
        Assert.assertThat(firstSlot == secondSlot, CoreMatchers.is(true));
    }

    /**
     * Failing the pending physical slot response must fail the logical slot future, and a
     * subsequent allocation of the same vertex must issue a second physical slot request.
     */
    @Test
    public void testFailedPhysicalSlotRequestFailsLogicalSlotFuturesAndRemovesSharedSlot() {
        AllocationContext context = AllocationContext.access$000()
                .addGroup(EV1)
                .withPhysicalSlotProvider(TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation())
                .build();
        CompletableFuture<?> logicalSlotFuture = context.allocateSlotsFor(EV1).get(0).getLogicalSlotFuture();
        SlotRequestId requestId = context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();

        // Nothing has answered the physical request yet, so the logical future is still pending.
        Assert.assertThat(logicalSlotFuture.isDone(), CoreMatchers.is(false));

        context.getSlotProvider().getResponses().get(requestId).completeExceptionally(new Throwable());
        Assert.assertThat(logicalSlotFuture.isCompletedExceptionally(), CoreMatchers.is(true));

        context.allocateSlotsFor(EV1);
        Assert.assertThat(context.getSlotProvider().getRequests().keySet(), Matchers.hasSize(2));
    }

    /** Runs the shared "occupied indefinitely" scenario with the flag switched off. */
    @Test
    public void testSlotWillBeOccupiedIndefinitelyFalse() throws ExecutionException, InterruptedException {
        final boolean occupiedIndefinitely = false;
        testSlotWillBeOccupiedIndefinitely(occupiedIndefinitely);
    }

    /** Runs the shared "occupied indefinitely" scenario with the flag switched on. */
    @Test
    public void testSlotWillBeOccupiedIndefinitelyTrue() throws ExecutionException, InterruptedException {
        final boolean occupiedIndefinitely = true;
        testSlotWillBeOccupiedIndefinitely(occupiedIndefinitely);
    }

    /**
     * Builds an allocator with the given flag, allocates EV1 and checks that both the physical
     * slot request and the payload assigned to the resulting physical slot report the same flag.
     *
     * @param slotWillBeOccupiedIndefinitely flag value configured on the allocator and expected back
     */
    private static void testSlotWillBeOccupiedIndefinitely(boolean slotWillBeOccupiedIndefinitely) throws ExecutionException, InterruptedException {
        AllocationContext context = AllocationContext.access$000()
                .addGroup(EV1)
                .setSlotWillBeOccupiedIndefinitely(slotWillBeOccupiedIndefinitely)
                .build();
        context.allocateSlotsFor(EV1);

        PhysicalSlotRequest request = context.getSlotProvider().getFirstRequestOrFail();
        Assert.assertThat(
                request.willSlotBeOccupiedIndefinitely(),
                CoreMatchers.is(slotWillBeOccupiedIndefinitely));

        TestingPhysicalSlot physicalSlot =
                context.getSlotProvider().getResponses().get(request.getSlotRequestId()).get();
        Assert.assertThat(physicalSlot.getPayload(), Matchers.notNullValue());
        Assert.assertThat(
                physicalSlot.getPayload().willOccupySlotIndefinitely(),
                CoreMatchers.is(slotWillBeOccupiedIndefinitely));
    }

    /**
     * Runs the cancellation/release scenario with immediately fulfilled physical slots, using
     * "release the completed logical slot" as the action, and expects the physical slot request to
     * be cancelled and a new request to be issued afterwards.
     */
    @Test
    public void testReturningLogicalSlotsRemovesSharedSlot() throws Exception {
        BiConsumerWithException<AllocationContext, SlotExecutionVertexAssignment, Exception> releaseSlot =
                (context, assignment) -> {
                    LogicalSlot slot = assignment.getLogicalSlotFuture().get();
                    slot.releaseSlot((Throwable) null);
                };
        testLogicalSlotRequestCancellationOrRelease(false, true, releaseSlot);
    }

    /**
     * Runs the cancellation/release scenario with physical slots that are not fulfilled
     * immediately, using "cancel the vertex through the allocator" as the action. Each cancelled
     * logical slot future must fail with a {@link CancellationException} as cause, and the
     * physical slot request is expected to be cancelled.
     */
    @Test
    public void testLogicalSlotCancelsPhysicalSlotRequestAndRemovesSharedSlot() throws Exception {
        BiConsumerWithException<AllocationContext, SlotExecutionVertexAssignment, Exception> cancelPending =
                (context, assignment) -> {
                    context.getAllocator().cancel(assignment.getExecutionVertexId());
                    try {
                        assignment.getLogicalSlotFuture().get();
                        Assert.fail("The logical future must finish with the cancellation exception");
                    } catch (InterruptedException | ExecutionException e) {
                        Assert.assertThat(e.getCause(), Matchers.instanceOf(CancellationException.class));
                    }
                };
        testLogicalSlotRequestCancellationOrRelease(true, true, cancelPending);
    }

    /**
     * Runs the cancellation/release scenario with immediately fulfilled physical slots, using
     * "cancel the vertex through the allocator" as the action. The logical slot future must still
     * be retrievable, and the physical slot request is expected not to be cancelled.
     */
    @Test
    public void testCompletedLogicalSlotCancelationDoesNotCancelPhysicalSlotRequestAndDoesNotRemoveSharedSlot() throws Exception {
        BiConsumerWithException<AllocationContext, SlotExecutionVertexAssignment, Exception> cancelCompleted =
                (context, assignment) -> {
                    context.getAllocator().cancel(assignment.getExecutionVertexId());
                    // Must not throw: get() would surface a failed or cancelled future.
                    assignment.getLogicalSlotFuture().get();
                };
        testLogicalSlotRequestCancellationOrRelease(false, false, cancelCompleted);
    }

    /**
     * Shared scenario for the cancellation and release tests.
     *
     * <p>EV1, EV2 and EV3 form one group. EV1 and EV2 are allocated, the action is applied to the
     * first assignment, both vertices are allocated again (still one physical request) and the
     * action is applied to every new assignment. Finally the method checks whether the first
     * physical slot request was cancelled and whether allocating EV3 issues a new request.
     *
     * @param completePhysicalSlotFutureManually if {@code true}, use a slot provider created with
     *     {@code createWithoutImmediatePhysicalSlotCreation()} instead of the builder default
     * @param cancelsPhysicalSlotRequestAndRemovesSharedSlot whether the first physical slot request
     *     is expected to be cancelled; if so, allocating EV3 must raise the request count to two
     * @param cancelOrReleaseAction action applied to the assignments
     * @throws Exception if the action throws
     */
    private static void testLogicalSlotRequestCancellationOrRelease(
            boolean completePhysicalSlotFutureManually,
            boolean cancelsPhysicalSlotRequestAndRemovesSharedSlot,
            BiConsumerWithException<AllocationContext, SlotExecutionVertexAssignment, Exception> cancelOrReleaseAction) throws Exception {
        AllocationContext.Builder contextBuilder = AllocationContext.access$000().addGroup(EV1, EV2, EV3);
        if (completePhysicalSlotFutureManually) {
            contextBuilder.withPhysicalSlotProvider(TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation());
        }
        AllocationContext context = contextBuilder.build();

        // The lists are typed so that their elements can be handed to the action without a cast.
        List<SlotExecutionVertexAssignment> firstAssignments = context.allocateSlotsFor(EV1, EV2);
        Assert.assertThat(context.getSlotProvider().getRequests().keySet(), Matchers.hasSize(1));
        cancelOrReleaseAction.accept(context, firstAssignments.get(0));

        List<SlotExecutionVertexAssignment> secondAssignments = context.allocateSlotsFor(EV1, EV2);
        Assert.assertThat(context.getSlotProvider().getRequests().keySet(), Matchers.hasSize(1));
        for (SlotExecutionVertexAssignment assignment : secondAssignments) {
            cancelOrReleaseAction.accept(context, assignment);
        }

        SlotRequestId firstRequestId = context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();
        Assert.assertThat(
                context.getSlotProvider().getCancellations().containsKey(firstRequestId),
                CoreMatchers.is(cancelsPhysicalSlotRequestAndRemovesSharedSlot));

        context.allocateSlotsFor(EV3);
        int expectedRequestCount = cancelsPhysicalSlotRequestAndRemovesSharedSlot ? 2 : 1;
        Assert.assertThat(context.getSlotProvider().getRequests().keySet(), Matchers.hasSize(expectedRequestCount));
    }

    /**
     * Releasing the payload of the physical slot must bring every logical slot payload into its
     * terminal state and cancel the physical slot request; allocating the same vertices again must
     * then issue a second physical slot request.
     */
    @Test
    public void testPhysicalSlotReleaseLogicalSlots() throws ExecutionException, InterruptedException {
        AllocationContext context = AllocationContext.access$000().addGroup(EV1, EV2).build();

        // One payload per assignment, attached as soon as the logical slot becomes available.
        // The element type is kept so that the predicates below can call TestingPayload methods.
        List<TestingPayload> payloads = context.allocateSlotsFor(EV1, EV2).stream()
                .map(assignment -> {
                    TestingPayload payload = new TestingPayload();
                    assignment.getLogicalSlotFuture().thenAccept(logicalSlot -> logicalSlot.tryAssignPayload(payload));
                    return payload;
                })
                .collect(Collectors.toList());
        SlotRequestId requestId = context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();
        TestingPhysicalSlot physicalSlot = context.getSlotProvider().getFirstResponseOrFail().get();

        Assert.assertThat(
                payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()),
                CoreMatchers.is(false));
        Assert.assertThat(physicalSlot.getPayload(), Matchers.notNullValue());

        physicalSlot.getPayload().release(new Throwable());

        Assert.assertThat(
                payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()),
                CoreMatchers.is(true));
        Assert.assertThat(
                context.getSlotProvider().getCancellations().containsKey(requestId),
                CoreMatchers.is(true));

        context.allocateSlotsFor(EV1, EV2);
        Assert.assertThat(context.getSlotProvider().getRequests().keySet(), Matchers.hasSize(2));
    }

    /**
     * Allocating EV1 and EV3 (two groups, no slot fulfilled yet) must hand the bulk checker a bulk
     * with two pending requests, one for {@code RESOURCE_PROFILE * 2} and one for
     * {@code RESOURCE_PROFILE}, no fulfilled requests, and the configured allocation timeout.
     */
    @Test
    public void testSchedulePendingRequestBulkTimeoutCheck() {
        TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker();
        AllocationContext context = createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);

        context.allocateSlotsFor(EV1, EV3);

        PhysicalSlotRequestBulk bulk = bulkChecker.getBulk();
        Assert.assertThat(bulk.getPendingRequests(), Matchers.hasSize(2));
        Assert.assertThat(
                bulk.getPendingRequests(),
                Matchers.containsInAnyOrder(RESOURCE_PROFILE.multiply(2), RESOURCE_PROFILE));
        Assert.assertThat(bulk.getAllocationIdsOfFulfilledRequests(), Matchers.hasSize(0));
        Assert.assertThat(bulkChecker.getTimeout(), CoreMatchers.is(ALLOCATION_TIMEOUT));
    }

    /**
     * After one of the two physical slot requests is fulfilled, the bulk must list only the other
     * request's profile as pending and the fulfilled slot's allocation id as fulfilled.
     */
    @Test
    public void testRequestFulfilledInBulk() {
        TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker();
        AllocationContext context = createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
        context.allocateSlotsFor(EV1, EV3);

        AllocationID fulfilledAllocationId = new AllocationID();
        ResourceProfile pendingProfile =
                fulfilOneOfTwoSlotRequestsAndGetPendingProfile(context, fulfilledAllocationId);

        PhysicalSlotRequestBulk bulk = bulkChecker.getBulk();
        Assert.assertThat(bulk.getPendingRequests(), Matchers.hasSize(1));
        Assert.assertThat(bulk.getPendingRequests(), Matchers.containsInAnyOrder(pendingProfile));
        Assert.assertThat(bulk.getAllocationIdsOfFulfilledRequests(), Matchers.hasSize(1));
        Assert.assertThat(
                bulk.getAllocationIdsOfFulfilledRequests(),
                Matchers.containsInAnyOrder(fulfilledAllocationId));
    }

    /**
     * Cancelling a bulk in which one of two requests was fulfilled: exactly one of the two logical
     * slot futures of the bulk must be failed. After the other slot is released and EV1/EV3 are
     * allocated again, three physical slot requests must have been issued in total, and the EV2
     * future obtained outside the cancelled bulk must not be failed.
     */
    @Test
    public void testRequestBulkCancel() {
        TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker();
        AllocationContext context = createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);

        List<SlotExecutionVertexAssignment> bulkAssignments = context.allocateSlotsFor(EV1, EV3);
        fulfilOneOfTwoSlotRequestsAndGetPendingProfile(context, new AllocationID());
        // Grab the bulk before the next allocation, which is not part of it.
        PhysicalSlotRequestBulk firstBulk = bulkChecker.getBulk();
        List<SlotExecutionVertexAssignment> ev2Assignments = context.allocateSlotsFor(EV2);

        firstBulk.cancel(new Throwable());

        CompletableFuture<LogicalSlot> firstFuture = bulkAssignments.get(0).getLogicalSlotFuture();
        boolean firstFailed = firstFuture.isCompletedExceptionally();
        CompletableFuture<LogicalSlot> secondFuture = bulkAssignments.get(1).getLogicalSlotFuture();
        boolean secondFailed = secondFuture.isCompletedExceptionally();

        // Release whichever slot was not failed by the cancellation.
        CompletableFuture<LogicalSlot> survivingFuture = firstFailed ? secondFuture : firstFuture;
        releaseLogicalSlot(survivingFuture.join());

        context.allocateSlotsFor(EV1, EV3);

        Assert.assertThat(context.getSlotProvider().getRequests().values(), Matchers.hasSize(3));
        Assert.assertThat(firstFailed != secondFailed, CoreMatchers.is(true));
        Assert.assertThat(
                ev2Assignments.get(0).getLogicalSlotFuture().isCompletedExceptionally(),
                CoreMatchers.is(false));
    }

    /**
     * Assigns a {@link DummyPayload} backed by an already completed future to the slot and then
     * releases the slot with a fresh {@link Throwable} as cause.
     *
     * @param logicalSlot slot to release
     */
    private static void releaseLogicalSlot(LogicalSlot logicalSlot) {
        DummyPayload payload = new DummyPayload(CompletableFuture.completedFuture(null));
        logicalSlot.tryAssignPayload(payload);

        Throwable releaseCause = new Throwable();
        logicalSlot.releaseSlot(releaseCause);
    }

    /** Failing the first physical slot request must leave the bulk without pending requests. */
    @Test
    public void testBulkClearIfPhysicalSlotRequestFails() {
        TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker();
        AllocationContext context = createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
        context.allocateSlotsFor(EV1, EV3);

        SlotRequestId firstRequestId = context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();
        context.getSlotProvider().getResultForRequestId(firstRequestId).completeExceptionally(new Throwable());

        PhysicalSlotRequestBulk bulk = bulkChecker.getBulk();
        Assert.assertThat(bulk.getPendingRequests(), Matchers.hasSize(0));
    }

    /**
     * With a slot provider whose physical slot creation fails, every logical slot future must be
     * failed, the bulk must have no pending requests, and the set of cancelled request ids must
     * equal the set of issued request ids.
     */
    @Test
    public void failLogicalSlotsIfPhysicalSlotIsFailed() {
        TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker();
        AllocationContext context = AllocationContext.access$000()
                .addGroup(EV1, EV2)
                .withBulkChecker(bulkChecker)
                .withPhysicalSlotProvider(
                        TestingPhysicalSlotProvider.createWithFailingPhysicalSlotCreation(new FlinkException("test failure")))
                .build();

        List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(EV1, EV2);
        for (SlotExecutionVertexAssignment assignment : assignments) {
            Assert.assertTrue(assignment.getLogicalSlotFuture().isCompletedExceptionally());
        }

        Assert.assertThat(bulkChecker.getBulk().getPendingRequests(), CoreMatchers.is(Matchers.empty()));
        Assert.assertThat(
                context.getSlotProvider().getCancellations().keySet(),
                CoreMatchers.is(context.getSlotProvider().getRequests().keySet()));
    }

    /**
     * When groups carry an explicit resource profile, the physical slot requests must use exactly
     * those profiles: allocating one vertex from each of two groups yields two requests, one per
     * group profile.
     */
    @Test
    public void testSlotRequestProfileFromExecutionSlotSharingGroup() {
        ResourceProfile firstGroupProfile = ResourceProfile.fromResources(1.0d, 10);
        ResourceProfile secondGroupProfile = ResourceProfile.fromResources(2.0d, 20);
        AllocationContext context = AllocationContext.access$000()
                .addGroupAndResource(firstGroupProfile, EV1, EV3)
                .addGroupAndResource(secondGroupProfile, EV2, EV4)
                .build();

        context.allocateSlotsFor(EV1, EV2);

        Assert.assertThat(context.getSlotProvider().getRequests().values().size(), CoreMatchers.is(2));

        List<ResourceProfile> requestedProfiles = context.getSlotProvider().getRequests().values().stream()
                .map(request -> request.getSlotProfile().getPhysicalSlotResourceProfile())
                .collect(Collectors.toList());
        Assert.assertThat(
                requestedProfiles,
                Matchers.containsInAnyOrder(firstGroupProfile, secondGroupProfile));
    }

    /**
     * Extracts the execution vertex id of every assignment, preserving iteration order.
     *
     * @param collection assignments to read the ids from
     * @return the ids in the collection's iteration order
     */
    private static List<ExecutionVertexID> getAssignIds(Collection<SlotExecutionVertexAssignment> collection) {
        return collection.stream()
                .map(SlotExecutionVertexAssignment::getExecutionVertexId)
                .collect(Collectors.toList());
    }

    /**
     * Builds a context with the groups {EV1, EV2} and {EV3}, the given bulk checker, and a slot
     * provider created with {@code createWithoutImmediatePhysicalSlotCreation()}.
     *
     * @param physicalSlotRequestBulkChecker bulk checker handed to the allocator
     * @return the assembled context
     */
    private static AllocationContext createBulkCheckerContextWithEv12GroupAndEv3Group(PhysicalSlotRequestBulkChecker physicalSlotRequestBulkChecker) {
        AllocationContext.Builder contextBuilder = AllocationContext.access$000();
        contextBuilder.addGroup(EV1, EV2);
        contextBuilder.addGroup(EV3);
        contextBuilder.withBulkChecker(physicalSlotRequestBulkChecker);
        contextBuilder.withPhysicalSlotProvider(TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation());
        return contextBuilder.build();
    }

    /**
     * Asserts that exactly two physical slot requests exist, completes the first one (in key-set
     * iteration order) with a slot carrying the given allocation id, and returns the physical slot
     * resource profile of the other, still unanswered request.
     *
     * @param allocationContext context whose slot provider holds the two requests
     * @param allocationID allocation id of the slot used to fulfil the first request
     * @return resource profile of the request that was not fulfilled
     */
    private static ResourceProfile fulfilOneOfTwoSlotRequestsAndGetPendingProfile(AllocationContext allocationContext, AllocationID allocationID) {
        Map<SlotRequestId, PhysicalSlotRequest> requests = allocationContext.getSlotProvider().getRequests();
        List<SlotRequestId> requestIds = new ArrayList<>(requests.keySet());
        Assert.assertThat(requestIds, Matchers.hasSize(2));

        SlotRequestId fulfilledRequestId = requestIds.get(0);
        SlotRequestId pendingRequestId = requestIds.get(1);

        allocationContext.getSlotProvider()
                .getResultForRequestId(fulfilledRequestId)
                .complete(TestingPhysicalSlot.builder().withAllocationID(allocationID).build());

        PhysicalSlotRequest pendingRequest = requests.get(pendingRequestId);
        return pendingRequest.getSlotProfile().getPhysicalSlotResourceProfile();
    }
}
