/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.DefaultJobTable;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.JobTable;
import org.apache.flink.runtime.taskexecutor.NoOpPartitionProducerStateChecker;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.TestingJobServices;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.NoOpCheckpointResponder;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link DefaultJobTable}.
 *
 * <p>Every test runs against a fresh table created in {@link #setup()} and closed again in
 * {@link #teardown()}.
 */
class DefaultJobTableTest {

    /** Supplier handing out a default-configured {@link TestingJobServices} instance per call. */
    private static final SupplierWithException<JobTable.JobServices, RuntimeException>
            DEFAULT_JOB_SERVICES_SUPPLIER = () -> TestingJobServices.newBuilder().build();

    /** Job id shared by all tests of a single test instance. */
    private final JobID jobId = new JobID();

    /** Table under test; re-created before every test. */
    private DefaultJobTable jobTable;

    @BeforeEach
    void setup() {
        jobTable = DefaultJobTable.create();
    }

    @AfterEach
    void teardown() {
        if (jobTable != null) {
            jobTable.close();
        }
    }

    /** Requesting an unknown job id creates a job that can afterwards be looked up. */
    @Test
    void getOrCreateJob_NoRegisteredJob_WillCreateNewJob() {
        JobTable.Job createdJob = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);

        Assertions.assertThat(createdJob.getJobId()).isEqualTo(jobId);
        Assertions.assertThat(jobTable.getJob(jobId)).isPresent();
    }

    /** Requesting the same job id twice yields the very same job instance. */
    @Test
    void getOrCreateJob_RegisteredJob_WillReturnRegisteredJob() {
        JobTable.Job firstJob = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);
        JobTable.Job secondJob = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);

        Assertions.assertThat(secondJob).isSameAs(firstJob);
    }

    /** Closing a job runs the close runnable of its job services. */
    @Test
    void closeJob_WillCloseJobServices() throws InterruptedException {
        OneShotLatch jobServicesClosedLatch = new OneShotLatch();
        TestingJobServices jobServices =
                TestingJobServices.newBuilder()
                        .setCloseRunnable(jobServicesClosedLatch::trigger)
                        .build();
        JobTable.Job job = jobTable.getOrCreateJob(jobId, () -> jobServices);

        job.close();

        // Blocks until the close runnable has triggered the latch.
        jobServicesClosedLatch.await();
    }

    /** A closed job can no longer be looked up in the table. */
    @Test
    void closeJob_WillRemoveItFromJobTable() {
        JobTable.Job job = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);

        job.close();

        Assertions.assertThat(jobTable.getJob(jobId)).isNotPresent();
    }

    /** Connecting an unconnected job makes the connection retrievable by job id and resource id. */
    @Test
    void connectJob_NotConnected_Succeeds() {
        JobTable.Job job = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);
        ResourceID resourceId = ResourceID.generate();

        JobTable.Connection connection = connectJob(job, resourceId);

        Assertions.assertThat(connection.getJobId()).isEqualTo(jobId);
        Assertions.assertThat(connection.getResourceId()).isEqualTo(resourceId);
        Assertions.assertThat(jobTable.getConnection(jobId)).isPresent();
        Assertions.assertThat(jobTable.getConnection(resourceId)).isPresent();
    }

    /**
     * Connects {@code job} under {@code resourceId}, using testing/no-op implementations for all
     * remaining collaborators.
     *
     * @param job job to connect
     * @param resourceId resource id passed to {@code connect}
     * @return the connection returned by {@code job.connect(...)}
     */
    private JobTable.Connection connectJob(JobTable.Job job, ResourceID resourceId) {
        return job.connect(
                resourceId,
                new TestingJobMasterGatewayBuilder().build(),
                new NoOpTaskManagerActions(),
                NoOpCheckpointResponder.INSTANCE,
                new TestGlobalAggregateManager(),
                new NoOpPartitionProducerStateChecker());
    }

    /** Connecting a job that is already connected throws an {@link IllegalStateException}. */
    @Test
    void connectJob_Connected_Fails() {
        JobTable.Job job = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);
        connectJob(job, ResourceID.generate());

        Assertions.assertThatThrownBy(() -> connectJob(job, ResourceID.generate()))
                .isInstanceOf(IllegalStateException.class);
    }

    /** Disconnecting removes the connection from both the job-id and the resource-id lookup. */
    @Test
    void disconnectConnection_RemovesConnection() {
        JobTable.Job job = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);
        ResourceID resourceId = ResourceID.generate();
        JobTable.Connection connection = connectJob(job, resourceId);

        connection.disconnect();

        Assertions.assertThat(jobTable.getConnection(jobId)).isNotPresent();
        Assertions.assertThat(jobTable.getConnection(resourceId)).isNotPresent();
    }

    /** Calling {@code asConnection()} on a closed job throws an {@link IllegalStateException}. */
    @Test
    void access_AfterBeingClosed_WillFail() {
        JobTable.Job job = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);

        job.close();

        Assertions.assertThatThrownBy(job::asConnection)
                .isInstanceOf(IllegalStateException.class);
    }

    /** Connecting a closed job throws an {@link IllegalStateException}. */
    @Test
    void connectJob_AfterBeingClosed_WillFail() {
        JobTable.Job job = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);

        job.close();

        Assertions.assertThatThrownBy(() -> connectJob(job, ResourceID.generate()))
                .isInstanceOf(IllegalStateException.class);
    }

    /** Reading the job manager gateway of a disconnected connection throws. */
    @Test
    void accessJobManagerGateway_AfterBeingDisconnected_WillFail() {
        JobTable.Job job = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);
        JobTable.Connection connection = connectJob(job, ResourceID.generate());

        connection.disconnect();

        Assertions.assertThatThrownBy(connection::getJobManagerGateway)
                .isInstanceOf(IllegalStateException.class);
    }

    /** Closing the table closes the services of every registered job and empties the table. */
    @Test
    void close_WillCloseAllRegisteredJobs() throws InterruptedException {
        // One count per registered job; each job's close runnable counts down once.
        CountDownLatch jobServicesClosedLatch = new CountDownLatch(2);
        TestingJobServices jobServices1 =
                TestingJobServices.newBuilder()
                        .setCloseRunnable(jobServicesClosedLatch::countDown)
                        .build();
        TestingJobServices jobServices2 =
                TestingJobServices.newBuilder()
                        .setCloseRunnable(jobServicesClosedLatch::countDown)
                        .build();
        jobTable.getOrCreateJob(jobId, () -> jobServices1);
        jobTable.getOrCreateJob(new JobID(), () -> jobServices2);

        // Note: teardown() will call close() on this table a second time.
        jobTable.close();

        jobServicesClosedLatch.await();
        Assertions.assertThat(jobTable.isEmpty()).isTrue();
    }
}

