/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatServicesImpl;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentState;
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.TaskManagerRegistrationInformation;
import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.TestingRpcServiceExtension;
import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
import org.apache.flink.testutils.TestingUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class JobMasterExecutionDeploymentReconciliationTest {
    private static final Duration testingTimeout = Duration.ofSeconds(10L);
    private final HeartbeatServices heartbeatServices = new HeartbeatServicesImpl(Integer.MAX_VALUE, Integer.MAX_VALUE);
    private final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    private final SettableLeaderRetrievalService resourceManagerLeaderRetriever = new SettableLeaderRetrievalService();
    public static final TestingRpcServiceExtension TESTING_RPC_SERVICE_EXTENSION = new TestingRpcServiceExtension();
    @RegisterExtension
    private static final AllCallbackWrapper<TestingRpcServiceExtension> RPC_SERVICE_EXTENSION_WRAPPER = new AllCallbackWrapper((CustomExtension)TESTING_RPC_SERVICE_EXTENSION);
    @RegisterExtension
    private final TestingFatalErrorHandlerExtension testingFatalErrorHandlerExtension = new TestingFatalErrorHandlerExtension();

    JobMasterExecutionDeploymentReconciliationTest() {
    }

    @BeforeEach
    private void setup() {
        this.haServices.setResourceManagerLeaderRetriever(this.resourceManagerLeaderRetriever);
        this.haServices.setResourceManagerLeaderElection(new TestingLeaderElection());
        this.haServices.setCheckpointRecoveryFactory((CheckpointRecoveryFactory)new StandaloneCheckpointRecoveryFactory());
    }

    @Test
    void testExecutionDeploymentReconciliation() throws Exception {
        JobMasterBuilder.TestingOnCompletionActions onCompletionActions = new JobMasterBuilder.TestingOnCompletionActions();
        TestingExecutionDeploymentTrackerWrapper deploymentTrackerWrapper = new TestingExecutionDeploymentTrackerWrapper();
        JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        try (JobMaster jobMaster = this.createAndStartJobMaster(onCompletionActions, deploymentTrackerWrapper, jobGraph);){
            JobMasterGateway jobMasterGateway = (JobMasterGateway)jobMaster.getSelfGateway(JobMasterGateway.class);
            TESTING_RPC_SERVICE_EXTENSION.getTestingRpcService().registerGateway(jobMasterGateway.getAddress(), (RpcGateway)jobMasterGateway);
            CompletableFuture<ExecutionAttemptID> taskCancellationFuture = new CompletableFuture<ExecutionAttemptID>();
            TaskExecutorGateway taskExecutorGateway = this.createTaskExecutorGateway(taskCancellationFuture);
            LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
            this.registerTaskExecutorAndOfferSlots(jobMasterGateway, jobGraph.getJobID(), taskExecutorGateway, localUnresolvedTaskManagerLocation);
            ExecutionAttemptID deployedExecution = deploymentTrackerWrapper.getTaskDeploymentFuture().get();
            FlinkAssertions.assertThatFuture(taskCancellationFuture).isNotDone();
            ExecutionAttemptID unknownDeployment = ExecutionGraphTestUtils.createExecutionAttemptId();
            jobMasterGateway.heartbeatFromTaskManager(localUnresolvedTaskManagerLocation.getResourceID(), new TaskExecutorToJobManagerHeartbeatPayload(new AccumulatorReport(Collections.emptyList()), new ExecutionDeploymentReport(Collections.singleton(unknownDeployment))));
            FlinkAssertions.assertThatFuture(taskCancellationFuture).eventuallySucceeds().isEqualTo((Object)unknownDeployment);
            FlinkAssertions.assertThatFuture(deploymentTrackerWrapper.getStopFuture()).eventuallySucceeds().isEqualTo((Object)deployedExecution);
            Assertions.assertThat((Comparable)onCompletionActions.getJobReachedGloballyTerminalStateFuture().get().getArchivedExecutionGraph().getState()).isEqualTo((Object)JobStatus.FAILED);
        }
    }

    @Test
    void testExecutionDeploymentReconciliationForPendingExecution() throws Exception {
        TestingExecutionDeploymentTrackerWrapper deploymentTrackerWrapper = new TestingExecutionDeploymentTrackerWrapper();
        JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        try (JobMaster jobMaster = this.createAndStartJobMaster(deploymentTrackerWrapper, jobGraph);){
            JobMasterGateway jobMasterGateway = (JobMasterGateway)jobMaster.getSelfGateway(JobMasterGateway.class);
            TESTING_RPC_SERVICE_EXTENSION.getTestingRpcService().registerGateway(jobMasterGateway.getAddress(), (RpcGateway)jobMasterGateway);
            CompletableFuture<ExecutionAttemptID> taskSubmissionFuture = new CompletableFuture<ExecutionAttemptID>();
            CompletableFuture<ExecutionAttemptID> taskCancellationFuture = new CompletableFuture<ExecutionAttemptID>();
            CompletableFuture<Acknowledge> taskSubmissionAcknowledgeFuture = new CompletableFuture<Acknowledge>();
            TaskExecutorGateway taskExecutorGateway = this.createTaskExecutorGateway(taskCancellationFuture, taskSubmissionFuture, taskSubmissionAcknowledgeFuture);
            LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
            this.registerTaskExecutorAndOfferSlots(jobMasterGateway, jobGraph.getJobID(), taskExecutorGateway, localUnresolvedTaskManagerLocation);
            ExecutionAttemptID pendingExecutionId = taskSubmissionFuture.get();
            jobMasterGateway.heartbeatFromTaskManager(localUnresolvedTaskManagerLocation.getResourceID(), new TaskExecutorToJobManagerHeartbeatPayload(new AccumulatorReport(Collections.emptyList()), new ExecutionDeploymentReport(Collections.singleton(pendingExecutionId))));
            taskSubmissionAcknowledgeFuture.complete(Acknowledge.get());
            deploymentTrackerWrapper.getTaskDeploymentFuture().get();
            FlinkAssertions.assertThatFuture(taskCancellationFuture).isNotDone();
        }
    }

    private JobMaster createAndStartJobMaster(ExecutionDeploymentTracker executionDeploymentTracker, JobGraph jobGraph) throws Exception {
        return this.createAndStartJobMaster(new JobMasterBuilder.TestingOnCompletionActions(), executionDeploymentTracker, jobGraph);
    }

    private JobMaster createAndStartJobMaster(OnCompletionActions onCompletionActions, ExecutionDeploymentTracker executionDeploymentTracker, JobGraph jobGraph) throws Exception {
        JobMaster jobMaster = new JobMasterBuilder(jobGraph, TESTING_RPC_SERVICE_EXTENSION.getTestingRpcService()).withFatalErrorHandler(this.testingFatalErrorHandlerExtension.getTestingFatalErrorHandler()).withHighAvailabilityServices(this.haServices).withHeartbeatServices(this.heartbeatServices).withExecutionDeploymentTracker(executionDeploymentTracker).withOnCompletionActions(onCompletionActions).createJobMaster();
        jobMaster.start();
        return jobMaster;
    }

    private TaskExecutorGateway createTaskExecutorGateway(CompletableFuture<ExecutionAttemptID> taskCancellationFuture) {
        return this.createTaskExecutorGateway(taskCancellationFuture, new CompletableFuture<ExecutionAttemptID>(), CompletableFuture.completedFuture(Acknowledge.get()));
    }

    private TaskExecutorGateway createTaskExecutorGateway(CompletableFuture<ExecutionAttemptID> taskCancellationFuture, CompletableFuture<ExecutionAttemptID> taskSubmissionFuture, CompletableFuture<Acknowledge> taskSubmissionResponse) {
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setAddress(UUID.randomUUID().toString()).setCancelTaskFunction(executionAttemptId -> {
            taskCancellationFuture.complete((ExecutionAttemptID)executionAttemptId);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).setSubmitTaskConsumer((tdd, ignored) -> {
            taskSubmissionFuture.complete(tdd.getExecutionAttemptId());
            return taskSubmissionResponse;
        }).createTestingTaskExecutorGateway();
        TESTING_RPC_SERVICE_EXTENSION.getTestingRpcService().registerGateway(taskExecutorGateway.getAddress(), (RpcGateway)taskExecutorGateway);
        return taskExecutorGateway;
    }

    private void registerTaskExecutorAndOfferSlots(JobMasterGateway jobMasterGateway, JobID jobId, TaskExecutorGateway taskExecutorGateway, UnresolvedTaskManagerLocation taskManagerLocation) throws ExecutionException, InterruptedException {
        jobMasterGateway.registerTaskManager(jobId, TaskManagerRegistrationInformation.create((String)taskExecutorGateway.getAddress(), (UnresolvedTaskManagerLocation)taskManagerLocation, (UUID)TestingUtils.zeroUUID()), testingTimeout).get();
        Set<SlotOffer> slotOffers = Collections.singleton(new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY));
        jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), slotOffers, testingTimeout).get();
    }

    private static class TestingExecutionDeploymentTrackerWrapper
    implements ExecutionDeploymentTracker {
        private final ExecutionDeploymentTracker originalTracker;
        private final CompletableFuture<ExecutionAttemptID> taskDeploymentFuture;
        private final CompletableFuture<ExecutionAttemptID> stopFuture;

        private TestingExecutionDeploymentTrackerWrapper() {
            this((ExecutionDeploymentTracker)new DefaultExecutionDeploymentTracker());
        }

        private TestingExecutionDeploymentTrackerWrapper(ExecutionDeploymentTracker originalTracker) {
            this.originalTracker = originalTracker;
            this.taskDeploymentFuture = new CompletableFuture();
            this.stopFuture = new CompletableFuture();
        }

        public void startTrackingPendingDeploymentOf(ExecutionAttemptID executionAttemptId, ResourceID host) {
            this.originalTracker.startTrackingPendingDeploymentOf(executionAttemptId, host);
        }

        public void completeDeploymentOf(ExecutionAttemptID executionAttemptId) {
            this.originalTracker.completeDeploymentOf(executionAttemptId);
            this.taskDeploymentFuture.complete(executionAttemptId);
        }

        public void stopTrackingDeploymentOf(ExecutionAttemptID executionAttemptId) {
            this.originalTracker.stopTrackingDeploymentOf(executionAttemptId);
            this.stopFuture.complete(executionAttemptId);
        }

        public Map<ExecutionAttemptID, ExecutionDeploymentState> getExecutionsOn(ResourceID host) {
            return this.originalTracker.getExecutionsOn(host);
        }

        public CompletableFuture<ExecutionAttemptID> getTaskDeploymentFuture() {
            return this.taskDeploymentFuture;
        }

        public CompletableFuture<ExecutionAttemptID> getStopFuture() {
            return this.stopFuture;
        }
    }
}

