/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.core.testutils.BlockerSync;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.NoOpTaskExecutorBlobService;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorBuilder;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatServicesImpl;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcServiceExtension;
import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport;
import org.apache.flink.runtime.taskexecutor.JobLeaderService;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesBuilder;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutor;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
import org.apache.flink.testutils.TestFileUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.Reference;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests that the executions reported by a {@link TestingTaskExecutor} in its heartbeat payload to
 * the job master follow the lifecycle of a deployed task.
 */
class TaskExecutorExecutionDeploymentReconciliationTest {
    private static final Duration timeout = Duration.ofSeconds(10L);

    /** Address under which the testing job master gateway is registered with the RPC service. */
    private static final String JOB_MASTER_ADDRESS = "jm";

    private final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    private final SettableLeaderRetrievalService jobManagerLeaderRetriever =
            new SettableLeaderRetrievalService();
    private final SettableLeaderRetrievalService resourceManagerLeaderRetriever =
            new SettableLeaderRetrievalService();
    private final JobID jobId = new JobID();

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
            TestingUtils.defaultExecutorExtension();

    @RegisterExtension
    static final AllCallbackWrapper<TestingRpcServiceExtension> RPC_SERVICE_EXTENSION_WRAPPER =
            new AllCallbackWrapper<>(new TestingRpcServiceExtension());

    @RegisterExtension
    private final TestingFatalErrorHandlerExtension testingFatalErrorHandlerExtension =
            new TestingFatalErrorHandlerExtension();

    @TempDir
    private Path tempDir;

    /** Wires the leader retrievers for the resource manager and for this test's job into HA. */
    @BeforeEach
    void setup() {
        haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
        haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetriever);
    }

    /** Clears the gateways registered with the shared testing RPC service. */
    @AfterEach
    void shutdown() {
        RPC_SERVICE_EXTENSION_WRAPPER.getCustomExtension().getTestingRpcService().clearGateways();
    }

    /**
     * Sends three job-manager heartbeats to the task executor and checks the set of executions
     * that comes back in each heartbeat payload: empty before the task is submitted, containing
     * the task's execution attempt ID while the invokable is blocked, and empty again once the
     * job master has been told the task is {@link ExecutionState#FINISHED}.
     */
    @Test
    void testDeployedExecutionReporting() throws Exception {
        final OneShotLatch slotOfferLatch = new OneShotLatch();
        final BlockingQueue<Set<ExecutionAttemptID>> deployedExecutionsQueue =
                new ArrayBlockingQueue<>(3);
        final CompletableFuture<Void> taskFinishedFuture = new CompletableFuture<>();
        final ResourceID jobManagerResourceId = ResourceID.generate();
        final TestingJobMasterGateway jobMasterGateway =
                setupJobManagerGateway(
                        slotOfferLatch,
                        deployedExecutionsQueue,
                        taskFinishedFuture,
                        jobManagerResourceId);

        final CompletableFuture<SlotReport> initialSlotReportFuture = new CompletableFuture<>();
        final TestingResourceManagerGateway testingResourceManagerGateway =
                setupResourceManagerGateway(initialSlotReportFuture);

        final TaskExecutorLocalStateStoresManager localStateStoresManager =
                new TaskExecutorLocalStateStoresManager(
                        false,
                        false,
                        Reference.owned(new File[] {TempDirUtils.newFolder(tempDir)}),
                        Executors.directExecutor());

        final TaskManagerServices taskManagerServices =
                new TaskManagerServicesBuilder()
                        .setTaskSlotTable(
                                TaskSlotUtils.createTaskSlotTable(
                                        1, timeout, EXECUTOR_EXTENSION.getExecutor()))
                        .setShuffleEnvironment(new NettyShuffleEnvironmentBuilder().build())
                        .setTaskStateManager(localStateStoresManager)
                        .build();

        final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices);

        try {
            taskExecutor.start();
            taskExecutor.waitUntilStarted();

            final TaskExecutorGateway taskExecutorGateway =
                    taskExecutor.getSelfGateway(TaskExecutorGateway.class);
            final TaskDeploymentDescriptor taskDeploymentDescriptor =
                    createTaskDeploymentDescriptor(jobId);

            connectComponentsAndRequestSlot(
                    jobMasterGateway,
                    testingResourceManagerGateway,
                    taskExecutorGateway,
                    taskManagerServices.getJobLeaderService(),
                    initialSlotReportFuture,
                    taskDeploymentDescriptor.getAllocationId());

            // Must be installed before the task is submitted: TestingInvokable#invoke reads it.
            TestingInvokable.sync = new BlockerSync();

            slotOfferLatch.await();

            final AllocatedSlotReport slotAllocationReport =
                    new AllocatedSlotReport(
                            jobId,
                            Collections.singleton(
                                    new AllocatedSlotInfo(
                                            0, taskDeploymentDescriptor.getAllocationId())));

            // Nothing has been submitted yet, so the report must be empty.
            taskExecutorGateway.heartbeatFromJobManager(jobManagerResourceId, slotAllocationReport);
            Assertions.assertThat(deployedExecutionsQueue.take()).isEmpty();

            taskExecutorGateway
                    .submitTask(
                            taskDeploymentDescriptor, jobMasterGateway.getFencingToken(), timeout)
                    .get();

            // Wait until the invokable is parked inside invoke(); the execution must be reported.
            TestingInvokable.sync.awaitBlocker();

            taskExecutorGateway.heartbeatFromJobManager(jobManagerResourceId, slotAllocationReport);
            Assertions.assertThat(deployedExecutionsQueue.take())
                    .contains(taskDeploymentDescriptor.getExecutionAttemptId());

            // Let the invokable return and wait for the FINISHED state update.
            TestingInvokable.sync.releaseBlocker();
            taskFinishedFuture.get();

            taskExecutorGateway.heartbeatFromJobManager(jobManagerResourceId, slotAllocationReport);
            Assertions.assertThat(deployedExecutionsQueue.take()).isEmpty();
        } finally {
            // Terminate the endpoint on both the success and the failure path.
            RpcUtils.terminateRpcEndpoint(taskExecutor);
        }
    }

    /**
     * Builds a {@link TestingTaskExecutor} on top of the shared testing RPC service, this test's
     * HA services and fatal error handler, and the given task manager services.
     *
     * @param taskManagerServices services the task executor is constructed with
     * @return the (not yet started) task executor
     * @throws IOException declared by the helpers used to assemble the configuration
     */
    private TestingTaskExecutor createTestingTaskExecutor(TaskManagerServices taskManagerServices)
            throws IOException {
        final Configuration configuration = new Configuration();
        return new TestingTaskExecutor(
                RPC_SERVICE_EXTENSION_WRAPPER.getCustomExtension().getTestingRpcService(),
                TaskManagerConfiguration.fromConfiguration(
                        configuration,
                        TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(
                                configuration),
                        InetAddress.getLoopbackAddress().getHostAddress(),
                        TestFileUtils.createTempDir()),
                haServices,
                taskManagerServices,
                ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
                new HeartbeatServicesImpl(1000L, 30000L),
                UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                null,
                NoOpTaskExecutorBlobService.INSTANCE,
                testingFatalErrorHandlerExtension.getTestingFatalErrorHandler(),
                new TestingTaskExecutorPartitionTracker(),
                new DelegationTokenReceiverRepository(configuration, null));
    }

    /**
     * Creates a deployment descriptor for the given job that runs {@link TestingInvokable}.
     *
     * @param jobId job the task belongs to
     * @return the task deployment descriptor
     * @throws IOException declared by the descriptor builder
     */
    private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(JobID jobId)
            throws IOException {
        return TaskDeploymentDescriptorBuilder.newBuilder(jobId, TestingInvokable.class).build();
    }

    /**
     * Creates a job master gateway whose callbacks feed the synchronization primitives of the test.
     *
     * @param slotOfferLatch triggered when slots are offered; all offered slots are accepted
     * @param deployedExecutionsFuture queue (despite the name) that receives the set of executions
     *     carried by every task manager heartbeat
     * @param taskFinishedFuture completed when a task state update reports
     *     {@link ExecutionState#FINISHED}
     * @param jobManagerResourceId resource ID returned in the task manager registration response
     * @return the configured gateway
     */
    private static TestingJobMasterGateway setupJobManagerGateway(
            OneShotLatch slotOfferLatch,
            BlockingQueue<Set<ExecutionAttemptID>> deployedExecutionsFuture,
            CompletableFuture<Void> taskFinishedFuture,
            ResourceID jobManagerResourceId) {
        return new TestingJobMasterGatewayBuilder()
                .setRegisterTaskManagerFunction(
                        (ignoredJobId, ignoredTaskManagerRegistrationInformation) ->
                                CompletableFuture.completedFuture(
                                        new JMTMRegistrationSuccess(jobManagerResourceId)))
                .setOfferSlotsFunction(
                        (resourceID, slotOffers) -> {
                            slotOfferLatch.trigger();
                            return CompletableFuture.completedFuture(slotOffers);
                        })
                .setTaskManagerHeartbeatFunction(
                        (resourceID, taskExecutorToJobManagerHeartbeatPayload) -> {
                            ExecutionDeploymentReport executionDeploymentReport =
                                    taskExecutorToJobManagerHeartbeatPayload
                                            .getExecutionDeploymentReport();
                            deployedExecutionsFuture.add(executionDeploymentReport.getExecutions());
                            return FutureUtils.completedVoidFuture();
                        })
                .setUpdateTaskExecutionStateFunction(
                        taskExecutionState -> {
                            if (taskExecutionState.getExecutionState() == ExecutionState.FINISHED) {
                                taskFinishedFuture.complete(null);
                            }
                            return CompletableFuture.completedFuture(Acknowledge.get());
                        })
                .build();
    }

    /**
     * Creates a resource manager gateway that acknowledges task executor registration and hands
     * the slot report it is sent to the given future.
     *
     * @param initialSlotReportFuture completed with the slot report passed to the gateway's
     *     send-slot-report function
     * @return the configured gateway
     */
    private static TestingResourceManagerGateway setupResourceManagerGateway(
            CompletableFuture<SlotReport> initialSlotReportFuture) {
        final TestingResourceManagerGateway testingResourceManagerGateway =
                new TestingResourceManagerGateway();
        testingResourceManagerGateway.setSendSlotReportFunction(
                resourceIDInstanceIDSlotReportTuple3 -> {
                    initialSlotReportFuture.complete(resourceIDInstanceIDSlotReportTuple3.f2);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                });
        testingResourceManagerGateway.setRegisterTaskExecutorFunction(
                input ->
                        CompletableFuture.completedFuture(
                                new TaskExecutorRegistrationSuccess(
                                        new InstanceID(),
                                        testingResourceManagerGateway.getOwnResourceId(),
                                        new ClusterInformation("blobServerHost", 55555),
                                        null)));
        return testingResourceManagerGateway;
    }

    /**
     * Registers both gateways with the testing RPC service, announces them as leaders, waits for
     * the slot report sent to the resource manager and requests one of the reported slots for this
     * test's job.
     *
     * @param jobMasterGateway job master gateway to register under {@link #JOB_MASTER_ADDRESS}
     * @param resourceManagerGateway resource manager gateway to register under its own address
     * @param taskExecutorGateway gateway of the task executor the slot is requested from
     * @param jobLeaderService job leader service the job is added to
     * @param initialSlotReportFuture future supplying the slot report to pick a slot from
     * @param allocationId allocation ID to request the slot with
     * @throws Exception if waiting for the slot report or the slot request fails
     */
    private void connectComponentsAndRequestSlot(
            JobMasterGateway jobMasterGateway,
            ResourceManagerGateway resourceManagerGateway,
            TaskExecutorGateway taskExecutorGateway,
            JobLeaderService jobLeaderService,
            CompletableFuture<SlotReport> initialSlotReportFuture,
            AllocationID allocationId)
            throws Exception {
        RPC_SERVICE_EXTENSION_WRAPPER
                .getCustomExtension()
                .getTestingRpcService()
                .registerGateway(JOB_MASTER_ADDRESS, jobMasterGateway);
        RPC_SERVICE_EXTENSION_WRAPPER
                .getCustomExtension()
                .getTestingRpcService()
                .registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);

        // Inform the task manager about the job leader.
        jobLeaderService.addJob(jobId, JOB_MASTER_ADDRESS);
        jobManagerLeaderRetriever.notifyListener(JOB_MASTER_ADDRESS, UUID.randomUUID());
        resourceManagerLeaderRetriever.notifyListener(
                resourceManagerGateway.getAddress(),
                resourceManagerGateway.getFencingToken().toUUID());

        final Optional<SlotStatus> slotStatusOptional =
                StreamSupport.stream(initialSlotReportFuture.get().spliterator(), false).findAny();

        Assertions.assertThat(slotStatusOptional).isPresent();

        taskExecutorGateway
                .requestSlot(
                        slotStatusOptional.get().getSlotID(),
                        jobId,
                        allocationId,
                        ResourceProfile.ZERO,
                        JOB_MASTER_ADDRESS,
                        resourceManagerGateway.getFencingToken(),
                        timeout)
                .get();
    }

    /**
     * Test invokable whose {@link #invoke()} blocks on the statically installed {@link #sync}.
     */
    public static class TestingInvokable extends AbstractInvokable {
        /** Installed by the test before the task is submitted. */
        static BlockerSync sync;

        public TestingInvokable(Environment environment) {
            super(environment);
        }

        @Override
        public void invoke() throws Exception {
            sync.block();
        }
    }
}

