/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatServicesImpl;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.io.network.partition.AbstractPartitionTrackerTest;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.TaskManagerRegistrationInformation;
import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.TestingUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class JobMasterPartitionReleaseTest {
    @TempDir
    private static File temporaryFolder;
    private static final Duration testingTimeout;
    private static TestingRpcService rpcService;
    private TestingFatalErrorHandler testingFatalErrorHandler;

    JobMasterPartitionReleaseTest() {
    }

    @BeforeAll
    private static void setupClass() {
        rpcService = new TestingRpcService();
    }

    @BeforeEach
    private void setup() throws IOException {
        this.testingFatalErrorHandler = new TestingFatalErrorHandler();
    }

    @AfterEach
    private void teardown() throws Exception {
        if (this.testingFatalErrorHandler != null) {
            this.testingFatalErrorHandler.rethrowError();
        }
        rpcService.clearGateways();
    }

    @AfterAll
    private static void teardownClass() {
        if (rpcService != null) {
            rpcService.closeAsync();
            rpcService = null;
        }
    }

    @Test
    void testPartitionTableCleanupOnDisconnect() throws Exception {
        CompletableFuture disconnectTaskExecutorFuture = new CompletableFuture();
        TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setDisconnectJobManagerConsumer((jobID, throwable) -> disconnectTaskExecutorFuture.complete(jobID)).createTestingTaskExecutorGateway();
        try (TestSetup testSetup = new TestSetup(rpcService, this.testingFatalErrorHandler, testingTaskExecutorGateway);){
            JobMasterGateway jobMasterGateway = (JobMasterGateway)testSetup.jobMaster.getSelfGateway(JobMasterGateway.class);
            jobMasterGateway.disconnectTaskManager(testSetup.getTaskExecutorResourceID(), new Exception("test"));
            disconnectTaskExecutorFuture.get();
            FlinkAssertions.assertThatFuture(testSetup.getStopTrackingPartitionsTargetResourceId()).eventuallySucceeds().isEqualTo((Object)testSetup.getTaskExecutorResourceID());
        }
    }

    @Test
    void testPartitionReleaseOrPromotionOnJobSuccess() throws Exception {
        this.testPartitionReleaseOrPromotionOnJobTermination(TestSetup::getPartitionsForReleaseOrPromote, ExecutionState.FINISHED);
    }

    @Test
    void testPartitionReleaseOrPromotionOnJobFailure() throws Exception {
        this.testPartitionReleaseOrPromotionOnJobTermination(TestSetup::getPartitionsForRelease, ExecutionState.FAILED);
    }

    private void testPartitionReleaseOrPromotionOnJobTermination(Function<TestSetup, CompletableFuture<Collection<ResultPartitionID>>> callSelector, ExecutionState finalExecutionState) throws Exception {
        CompletableFuture taskDeploymentDescriptorFuture = new CompletableFuture();
        TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setSubmitTaskConsumer((tdd, ignored) -> {
            taskDeploymentDescriptorFuture.complete(tdd);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).createTestingTaskExecutorGateway();
        try (TestSetup testSetup = new TestSetup(rpcService, this.testingFatalErrorHandler, testingTaskExecutorGateway);){
            ResultPartitionID partitionID0 = new ResultPartitionID();
            ResultPartitionID partitionID1 = new ResultPartitionID();
            testSetup.getPartitionTracker().setGetAllTrackedPartitionsSupplier(() -> {
                ResultPartitionDeploymentDescriptor partitionDesc0 = AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(partitionID0, true);
                ResultPartitionDeploymentDescriptor partitionDesc1 = AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(partitionID1, false);
                return Arrays.asList(partitionDesc0, partitionDesc1);
            });
            JobMasterGateway jobMasterGateway = testSetup.getJobMasterGateway();
            TaskDeploymentDescriptor taskDeploymentDescriptor = (TaskDeploymentDescriptor)taskDeploymentDescriptorFuture.get();
            jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(taskDeploymentDescriptor.getExecutionAttemptId(), finalExecutionState));
            Assertions.assertThat(callSelector.apply(testSetup).get()).containsExactlyInAnyOrder((Object[])new ResultPartitionID[]{partitionID0, partitionID1});
        }
    }

    static {
        testingTimeout = Duration.ofSeconds(10L);
    }

    private static class TestSetup
    implements AutoCloseable {
        private final LocalUnresolvedTaskManagerLocation localTaskManagerUnresolvedLocation = new LocalUnresolvedTaskManagerLocation();
        private final CompletableFuture<ResourceID> taskExecutorIdForStopTracking = new CompletableFuture();
        private final CompletableFuture<Collection<ResultPartitionID>> partitionsForRelease = new CompletableFuture();
        private final CompletableFuture<Collection<ResultPartitionID>> clusterPartitionsForPromote = new CompletableFuture();
        private final JobMaster jobMaster;
        private final TestingJobMasterPartitionTracker partitionTracker;

        public TestSetup(TestingRpcService rpcService, FatalErrorHandler fatalErrorHandler, TaskExecutorGateway taskExecutorGateway) throws Exception {
            TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
            haServices.setCheckpointRecoveryFactory((CheckpointRecoveryFactory)new StandaloneCheckpointRecoveryFactory());
            haServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService(null, null));
            this.partitionTracker = new TestingJobMasterPartitionTracker();
            this.partitionTracker.setStopTrackingAllPartitionsConsumer(this.taskExecutorIdForStopTracking::complete);
            this.partitionTracker.setStopTrackingAndReleasePartitionsConsumer(this.partitionsForRelease::complete);
            this.partitionTracker.setStopTrackingAndPromotePartitionsConsumer(this.clusterPartitionsForPromote::complete);
            Configuration configuration = new Configuration();
            configuration.set(BlobServerOptions.STORAGE_DIRECTORY, (Object)temporaryFolder.getAbsolutePath());
            HeartbeatServicesImpl heartbeatServices = new HeartbeatServicesImpl(1000L, 5000000L);
            JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
            this.jobMaster = new JobMasterBuilder(jobGraph, rpcService).withConfiguration(configuration).withHighAvailabilityServices(haServices).withFatalErrorHandler(fatalErrorHandler).withHeartbeatServices((HeartbeatServices)heartbeatServices).withPartitionTrackerFactory(ignored -> this.partitionTracker).createJobMaster();
            this.jobMaster.start();
            this.registerTaskExecutorAtJobMaster(rpcService, this.getJobMasterGateway(), jobGraph.getJobID(), taskExecutorGateway);
        }

        private void registerTaskExecutorAtJobMaster(TestingRpcService rpcService, JobMasterGateway jobMasterGateway, JobID jobId, TaskExecutorGateway taskExecutorGateway) throws ExecutionException, InterruptedException {
            rpcService.registerGateway(taskExecutorGateway.getAddress(), (RpcGateway)taskExecutorGateway);
            jobMasterGateway.registerTaskManager(jobId, TaskManagerRegistrationInformation.create((String)taskExecutorGateway.getAddress(), (UnresolvedTaskManagerLocation)this.localTaskManagerUnresolvedLocation, (UUID)TestingUtils.zeroUUID()), testingTimeout).get();
            Set<SlotOffer> slotOffers = Collections.singleton(new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY));
            jobMasterGateway.offerSlots(this.localTaskManagerUnresolvedLocation.getResourceID(), slotOffers, testingTimeout).get();
        }

        public TestingJobMasterPartitionTracker getPartitionTracker() {
            return this.partitionTracker;
        }

        public JobMasterGateway getJobMasterGateway() {
            return (JobMasterGateway)this.jobMaster.getSelfGateway(JobMasterGateway.class);
        }

        public ResourceID getTaskExecutorResourceID() {
            return this.localTaskManagerUnresolvedLocation.getResourceID();
        }

        public CompletableFuture<ResourceID> getStopTrackingPartitionsTargetResourceId() {
            return this.taskExecutorIdForStopTracking;
        }

        public CompletableFuture<Collection<ResultPartitionID>> getPartitionsForRelease() {
            return this.partitionsForRelease;
        }

        public CompletableFuture<Collection<ResultPartitionID>> getPartitionsForReleaseOrPromote() {
            return this.partitionsForRelease.thenCombine(this.clusterPartitionsForPromote, (resultPartitionIds, resultPartitionIds2) -> {
                HashSet res = new HashSet();
                res.addAll(resultPartitionIds);
                res.addAll(resultPartitionIds2);
                return res;
            });
        }

        @Override
        public void close() throws Exception {
            if (this.jobMaster != null) {
                RpcUtils.terminateRpcEndpoint((RpcEndpoint[])new RpcEndpoint[]{this.jobMaster});
            }
        }
    }
}

