/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.resourcemanager.DefaultJobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdActions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class DefaultJobLeaderIdServiceTest
extends TestLogger {
    @Test(timeout=10000L)
    public void testAddingJob() throws Exception {
        JobID jobId = new JobID();
        String address = "foobar";
        JobMasterId leaderId = JobMasterId.generate();
        TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
        SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService(null, null);
        highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
        ScheduledExecutor scheduledExecutor = (ScheduledExecutor)Mockito.mock(ScheduledExecutor.class);
        Time timeout = Time.milliseconds((long)5000L);
        JobLeaderIdActions jobLeaderIdActions = (JobLeaderIdActions)Mockito.mock(JobLeaderIdActions.class);
        DefaultJobLeaderIdService jobLeaderIdService = new DefaultJobLeaderIdService((HighAvailabilityServices)highAvailabilityServices, scheduledExecutor, timeout);
        jobLeaderIdService.start(jobLeaderIdActions);
        jobLeaderIdService.addJob(jobId);
        CompletableFuture leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
        leaderRetrievalService.notifyListener("foobar", leaderId.toUUID());
        Assert.assertEquals((Object)leaderId, leaderIdFuture.get());
        Assert.assertTrue((boolean)jobLeaderIdService.containsJob(jobId));
    }

    @Test(timeout=10000L)
    public void testRemovingJob() throws Exception {
        JobID jobId = new JobID();
        TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
        SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService(null, null);
        highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
        ScheduledExecutor scheduledExecutor = (ScheduledExecutor)Mockito.mock(ScheduledExecutor.class);
        Time timeout = Time.milliseconds((long)5000L);
        JobLeaderIdActions jobLeaderIdActions = (JobLeaderIdActions)Mockito.mock(JobLeaderIdActions.class);
        DefaultJobLeaderIdService jobLeaderIdService = new DefaultJobLeaderIdService((HighAvailabilityServices)highAvailabilityServices, scheduledExecutor, timeout);
        jobLeaderIdService.start(jobLeaderIdActions);
        jobLeaderIdService.addJob(jobId);
        CompletableFuture leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
        jobLeaderIdService.removeJob(jobId);
        Assert.assertFalse((boolean)jobLeaderIdService.containsJob(jobId));
        try {
            leaderIdFuture.get();
            Assert.fail((String)"The leader id future should be completed exceptionally.");
        }
        catch (ExecutionException executionException) {
            // empty catch block
        }
    }

    @Test
    public void testInitialJobTimeout() throws Exception {
        JobID jobId = new JobID();
        TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
        SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService(null, null);
        highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
        ScheduledExecutor scheduledExecutor = (ScheduledExecutor)Mockito.mock(ScheduledExecutor.class);
        Time timeout = Time.milliseconds((long)5000L);
        JobLeaderIdActions jobLeaderIdActions = (JobLeaderIdActions)Mockito.mock(JobLeaderIdActions.class);
        DefaultJobLeaderIdService jobLeaderIdService = new DefaultJobLeaderIdService((HighAvailabilityServices)highAvailabilityServices, scheduledExecutor, timeout);
        jobLeaderIdService.start(jobLeaderIdActions);
        jobLeaderIdService.addJob(jobId);
        Assert.assertTrue((boolean)jobLeaderIdService.containsJob(jobId));
        ArgumentCaptor runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
        ((ScheduledExecutor)Mockito.verify((Object)scheduledExecutor)).schedule((Runnable)runnableArgumentCaptor.capture(), Matchers.anyLong(), (TimeUnit)((Object)Matchers.any(TimeUnit.class)));
        Runnable timeoutRunnable = (Runnable)runnableArgumentCaptor.getValue();
        timeoutRunnable.run();
        ArgumentCaptor timeoutIdArgumentCaptor = ArgumentCaptor.forClass(UUID.class);
        ((JobLeaderIdActions)Mockito.verify((Object)jobLeaderIdActions, (VerificationMode)Mockito.times((int)1))).notifyJobTimeout((JobID)Matchers.eq((Object)jobId), (UUID)timeoutIdArgumentCaptor.capture());
        Assert.assertTrue((boolean)jobLeaderIdService.isValidTimeout(jobId, (UUID)timeoutIdArgumentCaptor.getValue()));
    }

    @Test(timeout=10000L)
    public void jobTimeoutAfterLostLeadership() throws Exception {
        JobID jobId = new JobID();
        String address = "foobar";
        JobMasterId leaderId = JobMasterId.generate();
        TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
        SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService(null, null);
        highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
        ScheduledFuture timeout1 = (ScheduledFuture)Mockito.mock(ScheduledFuture.class);
        ScheduledFuture timeout2 = (ScheduledFuture)Mockito.mock(ScheduledFuture.class);
        final ArrayDeque<ScheduledFuture> timeoutQueue = new ArrayDeque<ScheduledFuture>(Arrays.asList(timeout1, timeout2));
        ScheduledExecutor scheduledExecutor = (ScheduledExecutor)Mockito.mock(ScheduledExecutor.class);
        final AtomicReference lastRunnable = new AtomicReference();
        ((ScheduledExecutor)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) throws Throwable {
                lastRunnable.set((Runnable)invocation.getArguments()[0]);
                return timeoutQueue.poll();
            }
        }).when((Object)scheduledExecutor)).schedule((Runnable)Matchers.any(Runnable.class), Matchers.anyLong(), (TimeUnit)((Object)Matchers.any(TimeUnit.class)));
        Time timeout = Time.milliseconds((long)5000L);
        JobLeaderIdActions jobLeaderIdActions = (JobLeaderIdActions)Mockito.mock(JobLeaderIdActions.class);
        final AtomicReference lastTimeoutId = new AtomicReference();
        ((JobLeaderIdActions)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) throws Throwable {
                lastTimeoutId.set((UUID)invocation.getArguments()[1]);
                return null;
            }
        }).when((Object)jobLeaderIdActions)).notifyJobTimeout((JobID)Matchers.eq((Object)jobId), (UUID)Matchers.any(UUID.class));
        DefaultJobLeaderIdService jobLeaderIdService = new DefaultJobLeaderIdService((HighAvailabilityServices)highAvailabilityServices, scheduledExecutor, timeout);
        jobLeaderIdService.start(jobLeaderIdActions);
        jobLeaderIdService.addJob(jobId);
        CompletableFuture leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
        leaderRetrievalService.notifyListener("foobar", leaderId.toUUID());
        Assert.assertEquals((Object)leaderId, leaderIdFuture.get());
        Assert.assertTrue((boolean)jobLeaderIdService.containsJob(jobId));
        ((ScheduledFuture)Mockito.verify((Object)timeout1, (VerificationMode)Mockito.times((int)1))).cancel(Matchers.anyBoolean());
        ((ScheduledExecutor)Mockito.verify((Object)scheduledExecutor, (VerificationMode)Mockito.times((int)1))).schedule((Runnable)Matchers.any(Runnable.class), Matchers.anyLong(), (TimeUnit)((Object)Matchers.any(TimeUnit.class)));
        Runnable runnable = (Runnable)lastRunnable.get();
        Assert.assertNotNull((Object)runnable);
        runnable.run();
        ((JobLeaderIdActions)Mockito.verify((Object)jobLeaderIdActions, (VerificationMode)Mockito.times((int)1))).notifyJobTimeout((JobID)Matchers.eq((Object)jobId), (UUID)Matchers.any(UUID.class));
        Assert.assertFalse((boolean)jobLeaderIdService.isValidTimeout(jobId, (UUID)lastTimeoutId.get()));
        leaderRetrievalService.notifyListener("", null);
        ((ScheduledExecutor)Mockito.verify((Object)scheduledExecutor, (VerificationMode)Mockito.times((int)2))).schedule((Runnable)Matchers.any(Runnable.class), Matchers.anyLong(), (TimeUnit)((Object)Matchers.any(TimeUnit.class)));
        runnable = (Runnable)lastRunnable.get();
        Assert.assertNotNull((Object)runnable);
        runnable.run();
        ((JobLeaderIdActions)Mockito.verify((Object)jobLeaderIdActions, (VerificationMode)Mockito.times((int)2))).notifyJobTimeout((JobID)Matchers.eq((Object)jobId), (UUID)Matchers.any(UUID.class));
        Assert.assertTrue((boolean)jobLeaderIdService.isValidTimeout(jobId, (UUID)lastTimeoutId.get()));
    }

    @Test(timeout=10000L)
    public void testLeaderFutureWaitsForValidLeader() throws Exception {
        JobID jobId = new JobID();
        TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
        SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService(null, null);
        highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
        DefaultJobLeaderIdService jobLeaderIdService = new DefaultJobLeaderIdService((HighAvailabilityServices)highAvailabilityServices, (ScheduledExecutor)new ManuallyTriggeredScheduledExecutor(), Time.milliseconds((long)5000L));
        jobLeaderIdService.start((JobLeaderIdActions)new NoOpJobLeaderIdActions());
        jobLeaderIdService.addJob(jobId);
        leaderRetrievalService.notifyListener("foo", UUID.randomUUID());
        leaderRetrievalService.notifyListener(null, null);
        CompletableFuture leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
        Assert.assertThat((Object)leaderIdFuture.isDone(), (Matcher)CoreMatchers.is((Object)false));
        UUID newLeaderId = UUID.randomUUID();
        leaderRetrievalService.notifyListener("foo", newLeaderId);
        Assert.assertThat(leaderIdFuture.get(), (Matcher)CoreMatchers.is((Object)JobMasterId.fromUuidOrNull((UUID)newLeaderId)));
    }

    @Test
    public void testIsStarted() throws Exception {
        JobID jobId = new JobID();
        TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
        SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService(null, null);
        highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
        ScheduledExecutor scheduledExecutor = (ScheduledExecutor)Mockito.mock(ScheduledExecutor.class);
        Time timeout = Time.milliseconds((long)5000L);
        JobLeaderIdActions jobLeaderIdActions = (JobLeaderIdActions)Mockito.mock(JobLeaderIdActions.class);
        DefaultJobLeaderIdService jobLeaderIdService = new DefaultJobLeaderIdService((HighAvailabilityServices)highAvailabilityServices, scheduledExecutor, timeout);
        Assert.assertFalse((boolean)jobLeaderIdService.isStarted());
        jobLeaderIdService.start(jobLeaderIdActions);
        Assert.assertTrue((boolean)jobLeaderIdService.isStarted());
        jobLeaderIdService.stop();
        Assert.assertFalse((boolean)jobLeaderIdService.isStarted());
    }

    private static class NoOpJobLeaderIdActions
    implements JobLeaderIdActions {
        private NoOpJobLeaderIdActions() {
        }

        public void jobLeaderLostLeadership(JobID jobId, JobMasterId oldJobMasterId) {
        }

        public void notifyJobTimeout(JobID jobId, UUID timeoutId) {
        }

        public void handleError(Throwable error) {
        }
    }
}

