package org.apache.flink.runtime.resourcemanager;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.class */
public class JobLeaderIdServiceTest extends TestLogger {
    @Test(timeout = 10000)
    public void testAddingJob() throws Exception {
        JobID jobID = new JobID();
        JobMasterId generate = JobMasterId.generate();
        TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices();
        SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService((String) null, (UUID) null);
        testingHighAvailabilityServices.setJobMasterLeaderRetriever(jobID, settableLeaderRetrievalService);
        ScheduledExecutor scheduledExecutor = (ScheduledExecutor) Mockito.mock(ScheduledExecutor.class);
        Time milliseconds = Time.milliseconds(5000L);
        JobLeaderIdActions jobLeaderIdActions = (JobLeaderIdActions) Mockito.mock(JobLeaderIdActions.class);
        JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(testingHighAvailabilityServices, scheduledExecutor, milliseconds);
        jobLeaderIdService.start(jobLeaderIdActions);
        jobLeaderIdService.addJob(jobID);
        CompletableFuture leaderId = jobLeaderIdService.getLeaderId(jobID);
        settableLeaderRetrievalService.notifyListener("foobar", generate.toUUID());
        Assert.assertEquals(generate, leaderId.get());
        Assert.assertTrue(jobLeaderIdService.containsJob(jobID));
    }

    @Test(timeout = 10000)
    public void testRemovingJob() throws Exception {
        JobID jobID = new JobID();
        TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices();
        testingHighAvailabilityServices.setJobMasterLeaderRetriever(jobID, new SettableLeaderRetrievalService((String) null, (UUID) null));
        ScheduledExecutor scheduledExecutor = (ScheduledExecutor) Mockito.mock(ScheduledExecutor.class);
        Time milliseconds = Time.milliseconds(5000L);
        JobLeaderIdActions jobLeaderIdActions = (JobLeaderIdActions) Mockito.mock(JobLeaderIdActions.class);
        JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(testingHighAvailabilityServices, scheduledExecutor, milliseconds);
        jobLeaderIdService.start(jobLeaderIdActions);
        jobLeaderIdService.addJob(jobID);
        CompletableFuture leaderId = jobLeaderIdService.getLeaderId(jobID);
        jobLeaderIdService.removeJob(jobID);
        Assert.assertFalse(jobLeaderIdService.containsJob(jobID));
        try {
            leaderId.get();
            Assert.fail("The leader id future should be completed exceptionally.");
        } catch (ExecutionException e) {
        }
    }

    @Test
    public void testInitialJobTimeout() throws Exception {
        JobID jobID = new JobID();
        TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices();
        testingHighAvailabilityServices.setJobMasterLeaderRetriever(jobID, new SettableLeaderRetrievalService((String) null, (UUID) null));
        ScheduledExecutor scheduledExecutor = (ScheduledExecutor) Mockito.mock(ScheduledExecutor.class);
        Time milliseconds = Time.milliseconds(5000L);
        JobLeaderIdActions jobLeaderIdActions = (JobLeaderIdActions) Mockito.mock(JobLeaderIdActions.class);
        JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(testingHighAvailabilityServices, scheduledExecutor, milliseconds);
        jobLeaderIdService.start(jobLeaderIdActions);
        jobLeaderIdService.addJob(jobID);
        Assert.assertTrue(jobLeaderIdService.containsJob(jobID));
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Runnable.class);
        ((ScheduledExecutor) Mockito.verify(scheduledExecutor)).schedule((Runnable) forClass.capture(), Matchers.anyLong(), (TimeUnit) Matchers.any(TimeUnit.class));
        ((Runnable) forClass.getValue()).run();
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(UUID.class);
        ((JobLeaderIdActions) Mockito.verify(jobLeaderIdActions, Mockito.times(1))).notifyJobTimeout((JobID) Matchers.eq(jobID), (UUID) forClass2.capture());
        Assert.assertTrue(jobLeaderIdService.isValidTimeout(jobID, (UUID) forClass2.getValue()));
    }

    @Test(timeout = 10000)
    public void jobTimeoutAfterLostLeadership() throws Exception {
        JobID jobID = new JobID();
        JobMasterId generate = JobMasterId.generate();
        TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices();
        SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService((String) null, (UUID) null);
        testingHighAvailabilityServices.setJobMasterLeaderRetriever(jobID, settableLeaderRetrievalService);
        ScheduledFuture scheduledFuture = (ScheduledFuture) Mockito.mock(ScheduledFuture.class);
        final ArrayDeque arrayDeque = new ArrayDeque(Arrays.asList(scheduledFuture, (ScheduledFuture) Mockito.mock(ScheduledFuture.class)));
        ScheduledExecutor scheduledExecutor = (ScheduledExecutor) Mockito.mock(ScheduledExecutor.class);
        final AtomicReference atomicReference = new AtomicReference();
        ((ScheduledExecutor) Mockito.doAnswer(new Answer() { // from class: org.apache.flink.runtime.resourcemanager.JobLeaderIdServiceTest.1
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                atomicReference.set((Runnable) invocationOnMock.getArguments()[0]);
                return arrayDeque.poll();
            }
        }).when(scheduledExecutor)).schedule((Runnable) Matchers.any(Runnable.class), Matchers.anyLong(), (TimeUnit) Matchers.any(TimeUnit.class));
        Time milliseconds = Time.milliseconds(5000L);
        JobLeaderIdActions jobLeaderIdActions = (JobLeaderIdActions) Mockito.mock(JobLeaderIdActions.class);
        final AtomicReference atomicReference2 = new AtomicReference();
        ((JobLeaderIdActions) Mockito.doAnswer(new Answer() { // from class: org.apache.flink.runtime.resourcemanager.JobLeaderIdServiceTest.2
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                atomicReference2.set((UUID) invocationOnMock.getArguments()[1]);
                return null;
            }
        }).when(jobLeaderIdActions)).notifyJobTimeout((JobID) Matchers.eq(jobID), (UUID) Matchers.any(UUID.class));
        JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(testingHighAvailabilityServices, scheduledExecutor, milliseconds);
        jobLeaderIdService.start(jobLeaderIdActions);
        jobLeaderIdService.addJob(jobID);
        CompletableFuture leaderId = jobLeaderIdService.getLeaderId(jobID);
        settableLeaderRetrievalService.notifyListener("foobar", generate.toUUID());
        Assert.assertEquals(generate, leaderId.get());
        Assert.assertTrue(jobLeaderIdService.containsJob(jobID));
        ((ScheduledFuture) Mockito.verify(scheduledFuture, Mockito.times(1))).cancel(Matchers.anyBoolean());
        ((ScheduledExecutor) Mockito.verify(scheduledExecutor, Mockito.times(1))).schedule((Runnable) Matchers.any(Runnable.class), Matchers.anyLong(), (TimeUnit) Matchers.any(TimeUnit.class));
        Runnable runnable = (Runnable) atomicReference.get();
        Assert.assertNotNull(runnable);
        runnable.run();
        ((JobLeaderIdActions) Mockito.verify(jobLeaderIdActions, Mockito.times(1))).notifyJobTimeout((JobID) Matchers.eq(jobID), (UUID) Matchers.any(UUID.class));
        Assert.assertFalse(jobLeaderIdService.isValidTimeout(jobID, (UUID) atomicReference2.get()));
        settableLeaderRetrievalService.notifyListener("", (UUID) null);
        ((ScheduledExecutor) Mockito.verify(scheduledExecutor, Mockito.times(2))).schedule((Runnable) Matchers.any(Runnable.class), Matchers.anyLong(), (TimeUnit) Matchers.any(TimeUnit.class));
        Runnable runnable2 = (Runnable) atomicReference.get();
        Assert.assertNotNull(runnable2);
        runnable2.run();
        ((JobLeaderIdActions) Mockito.verify(jobLeaderIdActions, Mockito.times(2))).notifyJobTimeout((JobID) Matchers.eq(jobID), (UUID) Matchers.any(UUID.class));
        Assert.assertTrue(jobLeaderIdService.isValidTimeout(jobID, (UUID) atomicReference2.get()));
    }
}
