/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.DummyPayload;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.scheduler.ExecutionSlotSharingGroup;
import org.apache.flink.runtime.scheduler.SharedSlot;
import org.apache.flink.runtime.scheduler.SharedSlotTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;

public class SharedSlotTest
extends TestLogger {
    private static final ExecutionVertexID EV1 = ExecutionGraphTestUtils.createRandomExecutionVertexId();
    private static final ExecutionVertexID EV2 = ExecutionGraphTestUtils.createRandomExecutionVertexId();
    private static final ExecutionSlotSharingGroup SG = SharedSlotTestingUtils.createExecutionSlotSharingGroup(EV1, EV2);
    private static final SlotRequestId PHYSICAL_SLOT_REQUEST_ID = new SlotRequestId();
    private static final ResourceProfile RP = ResourceProfile.newBuilder().setCpuCores(2.0).build();

    @Test
    public void testCreation() {
        SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().slotWillBeOccupiedIndefinitely().build();
        Assert.assertThat((Object)sharedSlot.getPhysicalSlotRequestId(), (Matcher)CoreMatchers.is((Object)PHYSICAL_SLOT_REQUEST_ID));
        Assert.assertThat((Object)sharedSlot.getPhysicalSlotResourceProfile(), (Matcher)CoreMatchers.is((Object)RP));
        Assert.assertThat((Object)sharedSlot.willOccupySlotIndefinitely(), (Matcher)CoreMatchers.is((Object)true));
        Assert.assertThat((Object)sharedSlot.isEmpty(), (Matcher)CoreMatchers.is((Object)true));
    }

    @Test
    public void testAssignAsPayloadToPhysicalSlot() {
        CompletableFuture<TestingPhysicalSlot> slotContextFuture = new CompletableFuture<TestingPhysicalSlot>();
        SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().withSlotContextFuture(slotContextFuture).build();
        TestingPhysicalSlot physicalSlot = new TestingPhysicalSlot(RP, new AllocationID());
        slotContextFuture.complete(physicalSlot);
        Assert.assertThat((Object)physicalSlot.getPayload(), (Matcher)CoreMatchers.is((Object)sharedSlot));
    }

    @Test
    public void testLogicalSlotAllocation() {
        CompletableFuture<TestingPhysicalSlot> slotContextFuture = new CompletableFuture<TestingPhysicalSlot>();
        CompletableFuture released = new CompletableFuture();
        SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().withSlotContextFuture(slotContextFuture).slotWillBeOccupiedIndefinitely().withExternalReleaseCallback(released::complete).build();
        CompletableFuture logicalSlotFuture = sharedSlot.allocateLogicalSlot(EV1);
        Assert.assertThat((Object)logicalSlotFuture.isDone(), (Matcher)CoreMatchers.is((Object)false));
        AllocationID allocationId = new AllocationID();
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
        slotContextFuture.complete(new TestingPhysicalSlot(allocationId, taskManagerLocation, 3, taskManagerGateway, RP));
        Assert.assertThat((Object)sharedSlot.isEmpty(), (Matcher)CoreMatchers.is((Object)false));
        Assert.assertThat((Object)released.isDone(), (Matcher)CoreMatchers.is((Object)false));
        Assert.assertThat((Object)logicalSlotFuture.isDone(), (Matcher)CoreMatchers.is((Object)true));
        LogicalSlot logicalSlot = (LogicalSlot)logicalSlotFuture.join();
        Assert.assertThat((Object)logicalSlot.getAllocationId(), (Matcher)CoreMatchers.is((Object)allocationId));
        Assert.assertThat((Object)logicalSlot.getTaskManagerLocation(), (Matcher)CoreMatchers.is((Object)((Object)taskManagerLocation)));
        Assert.assertThat((Object)logicalSlot.getTaskManagerGateway(), (Matcher)CoreMatchers.is((Object)taskManagerGateway));
        Assert.assertThat((Object)logicalSlot.getLocality(), (Matcher)CoreMatchers.is((Object)Locality.UNKNOWN));
    }

    @Test
    public void testLogicalSlotFailureDueToPhysicalSlotFailure() throws InterruptedException {
        CompletableFuture slotContextFuture = new CompletableFuture();
        CompletableFuture released = new CompletableFuture();
        SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().withSlotContextFuture(slotContextFuture).withExternalReleaseCallback(released::complete).build();
        CompletableFuture logicalSlotFuture = sharedSlot.allocateLogicalSlot(EV1);
        Throwable cause = new Throwable();
        slotContextFuture.completeExceptionally(cause);
        Assert.assertThat((Object)logicalSlotFuture.isCompletedExceptionally(), (Matcher)CoreMatchers.is((Object)true));
        try {
            logicalSlotFuture.get();
        }
        catch (ExecutionException e) {
            Assert.assertThat((Object)e.getCause(), (Matcher)CoreMatchers.is((Object)cause));
        }
        Assert.assertThat((Object)sharedSlot.isEmpty(), (Matcher)CoreMatchers.is((Object)true));
        Assert.assertThat((Object)released.isDone(), (Matcher)CoreMatchers.is((Object)true));
    }

    @Test
    public void testCancelCompletedLogicalSlotRequest() {
        CompletableFuture<TestingPhysicalSlot> slotContextFuture = new CompletableFuture<TestingPhysicalSlot>();
        CompletableFuture released = new CompletableFuture();
        SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().withSlotContextFuture(slotContextFuture).withExternalReleaseCallback(released::complete).build();
        CompletableFuture logicalSlotFuture = sharedSlot.allocateLogicalSlot(EV1);
        slotContextFuture.complete(new TestingPhysicalSlot(RP, new AllocationID()));
        sharedSlot.cancelLogicalSlotRequest(EV1, new Throwable());
        Assert.assertThat((Object)logicalSlotFuture.isCompletedExceptionally(), (Matcher)CoreMatchers.is((Object)false));
        Assert.assertThat((Object)sharedSlot.isEmpty(), (Matcher)CoreMatchers.is((Object)false));
        Assert.assertThat((Object)released.isDone(), (Matcher)CoreMatchers.is((Object)false));
    }

    @Test
    public void testCancelPendingLogicalSlotRequest() throws InterruptedException {
        CompletableFuture released = new CompletableFuture();
        SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().withExternalReleaseCallback(released::complete).build();
        CompletableFuture logicalSlotFuture = sharedSlot.allocateLogicalSlot(EV1);
        Throwable cause = new Throwable();
        sharedSlot.cancelLogicalSlotRequest(EV1, cause);
        Assert.assertThat((Object)logicalSlotFuture.isCompletedExceptionally(), (Matcher)CoreMatchers.is((Object)true));
        try {
            logicalSlotFuture.get();
        }
        catch (ExecutionException e) {
            Assert.assertThat((Object)e.getCause(), (Matcher)CoreMatchers.is((Object)cause));
        }
        Assert.assertThat((Object)sharedSlot.isEmpty(), (Matcher)CoreMatchers.is((Object)true));
        Assert.assertThat((Object)released.isDone(), (Matcher)CoreMatchers.is((Object)true));
    }

    @Test
    public void testReturnAllocatedLogicalSlot() {
        CompletableFuture<TestingPhysicalSlot> slotContextFuture = new CompletableFuture<TestingPhysicalSlot>();
        CompletableFuture released = new CompletableFuture();
        SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().withSlotContextFuture(slotContextFuture).withExternalReleaseCallback(released::complete).build();
        CompletableFuture logicalSlotFuture = sharedSlot.allocateLogicalSlot(EV1);
        slotContextFuture.complete(new TestingPhysicalSlot(RP, new AllocationID()));
        sharedSlot.returnLogicalSlot((LogicalSlot)logicalSlotFuture.join());
        Assert.assertThat((Object)sharedSlot.isEmpty(), (Matcher)CoreMatchers.is((Object)true));
        Assert.assertThat((Object)released.isDone(), (Matcher)CoreMatchers.is((Object)true));
    }

    @Test
    public void testReleaseIfPhysicalSlotRequestIsIncomplete() {
        CompletableFuture slotContextFuture = new CompletableFuture();
        CompletableFuture released = new CompletableFuture();
        SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().withSlotContextFuture(slotContextFuture).withExternalReleaseCallback(released::complete).build();
        sharedSlot.allocateLogicalSlot(EV1);
        try {
            sharedSlot.release(new Throwable());
            Assert.fail((String)"IllegalStateException is expected trying to release a shared slot with incomplete physical slot request");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
        Assert.assertThat((Object)sharedSlot.isEmpty(), (Matcher)CoreMatchers.is((Object)false));
        Assert.assertThat((Object)released.isDone(), (Matcher)CoreMatchers.is((Object)false));
    }

    @Test
    public void testReleaseIfPhysicalSlotIsAllocated() {
        CompletableFuture<TestingPhysicalSlot> slotContextFuture = CompletableFuture.completedFuture(new TestingPhysicalSlot(RP, new AllocationID()));
        CompletableFuture released = new CompletableFuture();
        SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().withSlotContextFuture(slotContextFuture).withExternalReleaseCallback(released::complete).build();
        LogicalSlot logicalSlot = (LogicalSlot)sharedSlot.allocateLogicalSlot(EV1).join();
        CompletableFuture terminalFuture = new CompletableFuture();
        logicalSlot.tryAssignPayload((LogicalSlot.Payload)new DummyPayload(terminalFuture));
        Assert.assertThat((Object)terminalFuture.isDone(), (Matcher)CoreMatchers.is((Object)false));
        sharedSlot.release(new Throwable());
        Assert.assertThat((Object)terminalFuture.isDone(), (Matcher)CoreMatchers.is((Object)true));
        Assert.assertThat((Object)sharedSlot.isEmpty(), (Matcher)CoreMatchers.is((Object)true));
        Assert.assertThat((Object)released.isDone(), (Matcher)CoreMatchers.is((Object)true));
    }

    @Test
    public void tesDuplicatedReturnLogicalSlotFails() {
        CompletableFuture<TestingPhysicalSlot> slotContextFuture = CompletableFuture.completedFuture(new TestingPhysicalSlot(RP, new AllocationID()));
        AtomicInteger released = new AtomicInteger(0);
        SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().withSlotContextFuture(slotContextFuture).withExternalReleaseCallback(g -> released.incrementAndGet()).build();
        LogicalSlot logicalSlot = (LogicalSlot)sharedSlot.allocateLogicalSlot(EV1).join();
        sharedSlot.returnLogicalSlot(logicalSlot);
        try {
            sharedSlot.returnLogicalSlot(logicalSlot);
            Assert.fail((String)"Duplicated 'returnLogicalSlot' call should fail with IllegalStateException");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }

    @Test
    public void testReleaseEmptyDoesNotCallAllocatorReleaseBack() {
        CompletableFuture<TestingPhysicalSlot> slotContextFuture = CompletableFuture.completedFuture(new TestingPhysicalSlot(RP, new AllocationID()));
        CompletableFuture<SharedSlot> sharedSlotReleaseFuture = new CompletableFuture<SharedSlot>();
        AtomicInteger released = new AtomicInteger(0);
        SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().withSlotContextFuture(slotContextFuture).withExternalReleaseCallback(g -> {
            ((SharedSlot)sharedSlotReleaseFuture.join()).release(new Throwable());
            released.incrementAndGet();
        }).build();
        sharedSlotReleaseFuture.complete(sharedSlot);
        LogicalSlot logicalSlot = (LogicalSlot)sharedSlot.allocateLogicalSlot(EV1).join();
        Assert.assertThat((Object)released.get(), (Matcher)CoreMatchers.is((Object)0));
        sharedSlot.returnLogicalSlot(logicalSlot);
        Assert.assertThat((Object)released.get(), (Matcher)CoreMatchers.is((Object)1));
        sharedSlot.release(new Throwable());
        Assert.assertThat((Object)released.get(), (Matcher)CoreMatchers.is((Object)1));
    }

    @Test
    public void testReturnLogicalSlotWhileReleasingDoesNotCauseConcurrentModificationException() {
        CompletableFuture<TestingPhysicalSlot> slotContextFuture = CompletableFuture.completedFuture(new TestingPhysicalSlot(RP, new AllocationID()));
        final SharedSlot sharedSlot = SharedSlotBuilder.newBuilder().withSlotContextFuture(slotContextFuture).build();
        LogicalSlot logicalSlot1 = (LogicalSlot)sharedSlot.allocateLogicalSlot(EV1).join();
        final LogicalSlot logicalSlot2 = (LogicalSlot)sharedSlot.allocateLogicalSlot(EV2).join();
        logicalSlot1.tryAssignPayload(new LogicalSlot.Payload(){

            public void fail(Throwable cause) {
                sharedSlot.returnLogicalSlot(logicalSlot2);
            }

            public CompletableFuture<?> getTerminalStateFuture() {
                return CompletableFuture.completedFuture(null);
            }
        });
        sharedSlot.release(new Throwable());
    }

    private static class SharedSlotBuilder {
        private CompletableFuture<PhysicalSlot> slotContextFuture = new CompletableFuture();
        private boolean slotWillBeOccupiedIndefinitely = false;
        private Consumer<ExecutionSlotSharingGroup> externalReleaseCallback = group -> {};

        private SharedSlotBuilder() {
        }

        private SharedSlotBuilder withSlotContextFuture(CompletableFuture<PhysicalSlot> slotContextFuture) {
            this.slotContextFuture = slotContextFuture;
            return this;
        }

        private SharedSlotBuilder slotWillBeOccupiedIndefinitely() {
            this.slotWillBeOccupiedIndefinitely = true;
            return this;
        }

        private SharedSlotBuilder withExternalReleaseCallback(Consumer<ExecutionSlotSharingGroup> releaseCallback) {
            this.externalReleaseCallback = releaseCallback;
            return this;
        }

        private SharedSlot build() {
            return new SharedSlot(PHYSICAL_SLOT_REQUEST_ID, RP, SG, this.slotContextFuture, this.slotWillBeOccupiedIndefinitely, this.externalReleaseCallback);
        }

        private static SharedSlotBuilder newBuilder() {
            return new SharedSlotBuilder();
        }
    }
}

