package org.apache.flink.runtime.clusterframework;

import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.TestingSerialRpcService;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingResourceManager;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import scala.Option;

/* loaded from: input_file:org/apache/flink/runtime/clusterframework/ResourceManagerTest.class */
public class ResourceManagerTest extends TestLogger {
    private static ActorSystem system;
    private static ActorGateway fakeJobManager;
    private static ActorGateway resourceManager;
    private static Configuration config = new Configuration();
    private TestingHighAvailabilityServices highAvailabilityServices;
    private TestingLeaderRetrievalService jobManagerLeaderRetrievalService;

    @BeforeClass
    public static void setup() {
        system = AkkaUtils.createLocalActorSystem(config);
    }

    @AfterClass
    public static void teardown() {
        JavaTestKit.shutdownActorSystem(system);
    }

    @Before
    public void setupTest() {
        this.jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService();
        this.highAvailabilityServices = new TestingHighAvailabilityServices();
        this.highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, this.jobManagerLeaderRetrievalService);
    }

    @After
    public void teardownTest() throws Exception {
        if (this.jobManagerLeaderRetrievalService != null) {
            this.jobManagerLeaderRetrievalService.stop();
            this.jobManagerLeaderRetrievalService = null;
        }
        if (this.highAvailabilityServices != null) {
            this.highAvailabilityServices.closeAndCleanupAllData();
            this.highAvailabilityServices = null;
        }
    }

    @Test
    public void testJobManagerRegistrationAndReconciliation() {
        new JavaTestKit(system) { // from class: org.apache.flink.runtime.clusterframework.ResourceManagerTest.1
            {
                new JavaTestKit.Within(duration("10 seconds")) { // from class: org.apache.flink.runtime.clusterframework.ResourceManagerTest.1.1
                    protected void run() {
                        ActorGateway unused = ResourceManagerTest.fakeJobManager = TestingUtils.createForwardingActor(ResourceManagerTest.system, getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID, Option.empty());
                        ResourceManagerTest.this.jobManagerLeaderRetrievalService.notifyListener(ResourceManagerTest.fakeJobManager.path(), HighAvailabilityServices.DEFAULT_LEADER_ID);
                        ActorGateway unused2 = ResourceManagerTest.resourceManager = TestingUtils.createResourceManager(ResourceManagerTest.system, ResourceManagerTest.config, ResourceManagerTest.this.highAvailabilityServices);
                        expectMsgClass(RegisterResourceManager.class);
                        ArrayList arrayList = new ArrayList();
                        arrayList.add(ResourceID.generate());
                        arrayList.add(ResourceID.generate());
                        arrayList.add(ResourceID.generate());
                        ResourceManagerTest.resourceManager.tell(new RegisterResourceManagerSuccessful(ResourceManagerTest.fakeJobManager.actor(), arrayList), ResourceManagerTest.fakeJobManager);
                        ResourceManagerTest.resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), ResourceManagerTest.fakeJobManager);
                        TestingResourceManager.GetRegisteredResourcesReply getRegisteredResourcesReply = (TestingResourceManager.GetRegisteredResourcesReply) expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
                        Iterator it = arrayList.iterator();
                        while (it.hasNext()) {
                            if (!getRegisteredResourcesReply.resources.contains((ResourceID) it.next())) {
                                Assert.fail("Expected to find all resources that were provided during registration.");
                            }
                        }
                    }
                };
            }
        };
    }

    @Test
    public void testDelayedJobManagerRegistration() {
        new JavaTestKit(system) { // from class: org.apache.flink.runtime.clusterframework.ResourceManagerTest.2
            {
                new JavaTestKit.Within(duration("10 seconds")) { // from class: org.apache.flink.runtime.clusterframework.ResourceManagerTest.2.1
                    protected void run() {
                        Configuration clone = ResourceManagerTest.config.clone();
                        clone.setString("akka.lookup.timeout", "1 s");
                        ActorGateway unused = ResourceManagerTest.fakeJobManager = TestingUtils.createForwardingActor(ResourceManagerTest.system, getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID, Option.empty());
                        ResourceManagerTest.this.jobManagerLeaderRetrievalService.notifyListener(ResourceManagerTest.fakeJobManager.path(), HighAvailabilityServices.DEFAULT_LEADER_ID);
                        ActorGateway unused2 = ResourceManagerTest.resourceManager = TestingUtils.createResourceManager(ResourceManagerTest.system, clone, ResourceManagerTest.this.highAvailabilityServices);
                        getLastSender().tell(new JobManagerMessages.LeaderSessionMessage((UUID) null, new Object()), ResourceManagerTest.fakeJobManager.actor());
                        expectMsgClass(RegisterResourceManager.class);
                        expectMsgClass(RegisterResourceManager.class);
                    }
                };
            }
        };
    }

    @Test
    public void testTriggerReconnect() {
        new JavaTestKit(system) { // from class: org.apache.flink.runtime.clusterframework.ResourceManagerTest.3
            {
                new JavaTestKit.Within(duration("10 seconds")) { // from class: org.apache.flink.runtime.clusterframework.ResourceManagerTest.3.1
                    protected void run() {
                        Configuration clone = ResourceManagerTest.config.clone();
                        clone.setString("akka.lookup.timeout", "99999 s");
                        ActorGateway unused = ResourceManagerTest.fakeJobManager = TestingUtils.createForwardingActor(ResourceManagerTest.system, getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID, Option.empty());
                        ResourceManagerTest.this.jobManagerLeaderRetrievalService.notifyListener(ResourceManagerTest.fakeJobManager.path(), HighAvailabilityServices.DEFAULT_LEADER_ID);
                        ActorGateway unused2 = ResourceManagerTest.resourceManager = TestingUtils.createResourceManager(ResourceManagerTest.system, clone, ResourceManagerTest.this.highAvailabilityServices);
                        ResourceManagerTest.resourceManager.tell(new RegisterResourceManagerSuccessful(ResourceManagerTest.fakeJobManager.actor(), Collections.emptyList()), ResourceManagerTest.fakeJobManager);
                        ResourceManagerTest.resourceManager.tell(new TriggerRegistrationAtJobManager(ResourceManagerTest.fakeJobManager.actor()), ResourceManagerTest.fakeJobManager);
                        expectMsgClass(RegisterResourceManager.class);
                    }
                };
            }
        };
    }

    @Test
    public void testTaskManagerRegistration() {
        new JavaTestKit(system) { // from class: org.apache.flink.runtime.clusterframework.ResourceManagerTest.4
            {
                new JavaTestKit.Within(duration("10 seconds")) { // from class: org.apache.flink.runtime.clusterframework.ResourceManagerTest.4.1
                    protected void run() {
                        ActorGateway unused = ResourceManagerTest.fakeJobManager = TestingUtils.createForwardingActor(ResourceManagerTest.system, getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID, Option.empty());
                        ResourceManagerTest.this.jobManagerLeaderRetrievalService.notifyListener(ResourceManagerTest.fakeJobManager.path(), HighAvailabilityServices.DEFAULT_LEADER_ID);
                        ActorGateway unused2 = ResourceManagerTest.resourceManager = TestingUtils.createResourceManager(ResourceManagerTest.system, ResourceManagerTest.config, ResourceManagerTest.this.highAvailabilityServices);
                        expectMsgClass(RegisterResourceManager.class);
                        ResourceManagerTest.resourceManager.tell(new RegisterResourceManagerSuccessful(ResourceManagerTest.fakeJobManager.actor(), Collections.emptyList()), ResourceManagerTest.fakeJobManager);
                        ResourceID generate = ResourceID.generate();
                        ResourceManagerTest.resourceManager.tell(new NotifyResourceStarted(generate), ResourceManagerTest.fakeJobManager);
                        expectMsgClass(Acknowledge.class);
                        ResourceManagerTest.resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), ResourceManagerTest.fakeJobManager);
                        Assert.assertEquals(1L, ((TestingResourceManager.GetRegisteredResourcesReply) expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class)).resources.size());
                        ResourceManagerTest.resourceManager.tell(new NotifyResourceStarted(generate), ResourceManagerTest.fakeJobManager);
                        expectMsgClass(Acknowledge.class);
                        ResourceManagerTest.resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), ResourceManagerTest.fakeJobManager);
                        Assert.assertEquals(1L, ((TestingResourceManager.GetRegisteredResourcesReply) expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class)).resources.size());
                        ResourceManagerTest.resourceManager.tell(new NotifyResourceStarted((ResourceID) null), ResourceManagerTest.fakeJobManager);
                        expectMsgClass(Acknowledge.class);
                        ResourceManagerTest.resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), ResourceManagerTest.fakeJobManager);
                        Assert.assertEquals(1L, ((TestingResourceManager.GetRegisteredResourcesReply) expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class)).resources.size());
                    }
                };
            }
        };
    }

    @Test
    public void testResourceRemoval() {
        new JavaTestKit(system) { // from class: org.apache.flink.runtime.clusterframework.ResourceManagerTest.5
            {
                new JavaTestKit.Within(duration("10 seconds")) { // from class: org.apache.flink.runtime.clusterframework.ResourceManagerTest.5.1
                    protected void run() {
                        ActorGateway unused = ResourceManagerTest.fakeJobManager = TestingUtils.createForwardingActor(ResourceManagerTest.system, getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID, Option.empty());
                        ResourceManagerTest.this.jobManagerLeaderRetrievalService.notifyListener(ResourceManagerTest.fakeJobManager.path(), HighAvailabilityServices.DEFAULT_LEADER_ID);
                        ActorGateway unused2 = ResourceManagerTest.resourceManager = TestingUtils.createResourceManager(ResourceManagerTest.system, ResourceManagerTest.config, ResourceManagerTest.this.highAvailabilityServices);
                        expectMsgClass(RegisterResourceManager.class);
                        ResourceManagerTest.resourceManager.tell(new RegisterResourceManagerSuccessful(ResourceManagerTest.fakeJobManager.actor(), Collections.emptyList()), ResourceManagerTest.fakeJobManager);
                        ResourceID generate = ResourceID.generate();
                        ResourceManagerTest.resourceManager.tell(new RemoveResource(generate), ResourceManagerTest.fakeJobManager);
                        ResourceManagerTest.resourceManager.tell(new NotifyResourceStarted(generate), ResourceManagerTest.fakeJobManager);
                        expectMsgClass(Acknowledge.class);
                        ResourceManagerTest.resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), ResourceManagerTest.fakeJobManager);
                        TestingResourceManager.GetRegisteredResourcesReply getRegisteredResourcesReply = (TestingResourceManager.GetRegisteredResourcesReply) expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
                        Assert.assertEquals(1L, getRegisteredResourcesReply.resources.size());
                        Assert.assertTrue(getRegisteredResourcesReply.resources.contains(generate));
                        ResourceManagerTest.resourceManager.tell(new RemoveResource(generate), ResourceManagerTest.fakeJobManager);
                        ResourceManagerTest.resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), ResourceManagerTest.fakeJobManager);
                        Assert.assertEquals(0L, ((TestingResourceManager.GetRegisteredResourcesReply) expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class)).resources.size());
                    }
                };
            }
        };
    }

    @Test
    public void testResourceFailureNotification() {
        new JavaTestKit(system) { // from class: org.apache.flink.runtime.clusterframework.ResourceManagerTest.6
            {
                new JavaTestKit.Within(duration("10 seconds")) { // from class: org.apache.flink.runtime.clusterframework.ResourceManagerTest.6.1
                    protected void run() {
                        ActorGateway unused = ResourceManagerTest.fakeJobManager = TestingUtils.createForwardingActor(ResourceManagerTest.system, getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID, Option.empty());
                        ResourceManagerTest.this.jobManagerLeaderRetrievalService.notifyListener(ResourceManagerTest.fakeJobManager.path(), HighAvailabilityServices.DEFAULT_LEADER_ID);
                        ActorGateway unused2 = ResourceManagerTest.resourceManager = TestingUtils.createResourceManager(ResourceManagerTest.system, ResourceManagerTest.config, ResourceManagerTest.this.highAvailabilityServices);
                        expectMsgClass(RegisterResourceManager.class);
                        ResourceManagerTest.resourceManager.tell(new RegisterResourceManagerSuccessful(ResourceManagerTest.fakeJobManager.actor(), Collections.emptyList()), ResourceManagerTest.fakeJobManager);
                        ResourceID generate = ResourceID.generate();
                        ResourceID generate2 = ResourceID.generate();
                        ResourceManagerTest.resourceManager.tell(new NotifyResourceStarted(generate), ResourceManagerTest.fakeJobManager);
                        expectMsgClass(Acknowledge.class);
                        ResourceManagerTest.resourceManager.tell(new NotifyResourceStarted(generate2), ResourceManagerTest.fakeJobManager);
                        expectMsgClass(Acknowledge.class);
                        ResourceManagerTest.resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), ResourceManagerTest.fakeJobManager);
                        TestingResourceManager.GetRegisteredResourcesReply getRegisteredResourcesReply = (TestingResourceManager.GetRegisteredResourcesReply) expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
                        Assert.assertEquals(2L, getRegisteredResourcesReply.resources.size());
                        Assert.assertTrue(getRegisteredResourcesReply.resources.contains(generate));
                        Assert.assertTrue(getRegisteredResourcesReply.resources.contains(generate2));
                        ResourceManagerTest.resourceManager.tell(new TestingResourceManager.FailResource(generate), ResourceManagerTest.fakeJobManager);
                        ResourceManagerTest.resourceManager.tell(new TestingResourceManager.FailResource(generate2), ResourceManagerTest.fakeJobManager);
                        ResourceRemoved resourceRemoved = (ResourceRemoved) expectMsgClass(ResourceRemoved.class);
                        ResourceRemoved resourceRemoved2 = (ResourceRemoved) expectMsgClass(ResourceRemoved.class);
                        Assert.assertEquals(generate, resourceRemoved.resourceId());
                        Assert.assertEquals(generate2, resourceRemoved2.resourceId());
                    }
                };
            }
        };
    }

    @Test
    public void testHeartbeatTimeoutWithTaskExecutor() throws Exception {
        ResourceID resourceID = new ResourceID("tm");
        ResourceID generate = ResourceID.generate();
        RpcGateway rpcGateway = (TaskExecutorGateway) Mockito.mock(TaskExecutorGateway.class);
        TestingSerialRpcService testingSerialRpcService = new TestingSerialRpcService();
        testingSerialRpcService.registerGateway("tm", rpcGateway);
        ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
        TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService();
        TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices();
        testingHighAvailabilityServices.setResourceManagerLeaderElectionService(testingLeaderElectionService);
        ScheduledExecutor scheduledExecutor = (ScheduledExecutor) Mockito.mock(ScheduledExecutor.class);
        try {
            StandaloneResourceManager standaloneResourceManager = new StandaloneResourceManager(testingSerialRpcService, "resourcemanager", generate, resourceManagerConfiguration, testingHighAvailabilityServices, new TestingHeartbeatServices(1L, 5L, scheduledExecutor), new SlotManager(testingSerialRpcService.getScheduledExecutor(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime()), (MetricRegistry) Mockito.mock(MetricRegistry.class), (JobLeaderIdService) Mockito.mock(JobLeaderIdService.class), new TestingFatalErrorHandler());
            standaloneResourceManager.start();
            UUID randomUUID = UUID.randomUUID();
            testingLeaderElectionService.isLeader(randomUUID);
            Assert.assertTrue(((RegistrationResponse) standaloneResourceManager.registerTaskExecutor(randomUUID, "tm", resourceID, new SlotReport()).get(5L, TimeUnit.SECONDS)) instanceof TaskExecutorRegistrationSuccess);
            ArgumentCaptor forClass = ArgumentCaptor.forClass(Runnable.class);
            ((ScheduledExecutor) Mockito.verify(scheduledExecutor, Mockito.times(2))).scheduleAtFixedRate((Runnable) forClass.capture(), Matchers.eq(0L), Matchers.eq(1L), (TimeUnit) Matchers.eq(TimeUnit.MILLISECONDS));
            List allValues = forClass.getAllValues();
            ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Runnable.class);
            ((ScheduledExecutor) Mockito.verify(scheduledExecutor)).schedule((Runnable) forClass2.capture(), Matchers.eq(5L), (TimeUnit) Matchers.eq(TimeUnit.MILLISECONDS));
            Runnable runnable = (Runnable) forClass2.getValue();
            Iterator it = allValues.iterator();
            while (it.hasNext()) {
                ((Runnable) it.next()).run();
            }
            ((TaskExecutorGateway) Mockito.verify(rpcGateway, Mockito.times(1))).heartbeatFromResourceManager((ResourceID) Matchers.eq(generate));
            runnable.run();
            ((TaskExecutorGateway) Mockito.verify(rpcGateway)).disconnectResourceManager((Exception) Matchers.any(TimeoutException.class));
            testingSerialRpcService.stopService();
        } catch (Throwable th) {
            testingSerialRpcService.stopService();
            throw th;
        }
    }

    @Test
    public void testHeartbeatTimeoutWithJobManager() throws Exception {
        ResourceID resourceID = new ResourceID("jm");
        ResourceID generate = ResourceID.generate();
        UUID randomUUID = UUID.randomUUID();
        UUID randomUUID2 = UUID.randomUUID();
        JobID jobID = new JobID();
        RpcGateway rpcGateway = (JobMasterGateway) Mockito.mock(JobMasterGateway.class);
        TestingSerialRpcService testingSerialRpcService = new TestingSerialRpcService();
        testingSerialRpcService.registerGateway("jm", rpcGateway);
        ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
        TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService();
        TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService("jm", randomUUID2);
        TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices();
        testingHighAvailabilityServices.setResourceManagerLeaderElectionService(testingLeaderElectionService);
        testingHighAvailabilityServices.setJobMasterLeaderRetriever(jobID, testingLeaderRetrievalService);
        ScheduledExecutor scheduledExecutor = (ScheduledExecutor) Mockito.mock(ScheduledExecutor.class);
        try {
            StandaloneResourceManager standaloneResourceManager = new StandaloneResourceManager(testingSerialRpcService, "resourcemanager", generate, resourceManagerConfiguration, testingHighAvailabilityServices, new TestingHeartbeatServices(1L, 5L, scheduledExecutor), new SlotManager(TestingUtils.defaultScheduledExecutor(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime()), (MetricRegistry) Mockito.mock(MetricRegistry.class), new JobLeaderIdService(testingHighAvailabilityServices, testingSerialRpcService.getScheduledExecutor(), Time.minutes(5L)), new TestingFatalErrorHandler());
            standaloneResourceManager.start();
            testingLeaderElectionService.isLeader(randomUUID);
            Assert.assertTrue(((RegistrationResponse) standaloneResourceManager.registerJobManager(randomUUID, randomUUID2, resourceID, "jm", jobID).get(5L, TimeUnit.SECONDS)) instanceof JobMasterRegistrationSuccess);
            ArgumentCaptor forClass = ArgumentCaptor.forClass(Runnable.class);
            ((ScheduledExecutor) Mockito.verify(scheduledExecutor, Mockito.times(2))).scheduleAtFixedRate((Runnable) forClass.capture(), Matchers.eq(0L), Matchers.eq(1L), (TimeUnit) Matchers.eq(TimeUnit.MILLISECONDS));
            List allValues = forClass.getAllValues();
            ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Runnable.class);
            ((ScheduledExecutor) Mockito.verify(scheduledExecutor)).schedule((Runnable) forClass2.capture(), Matchers.eq(5L), (TimeUnit) Matchers.eq(TimeUnit.MILLISECONDS));
            Runnable runnable = (Runnable) forClass2.getValue();
            Iterator it = allValues.iterator();
            while (it.hasNext()) {
                ((Runnable) it.next()).run();
            }
            ((JobMasterGateway) Mockito.verify(rpcGateway, Mockito.times(1))).heartbeatFromResourceManager((ResourceID) Matchers.eq(generate));
            runnable.run();
            ((JobMasterGateway) Mockito.verify(rpcGateway)).disconnectResourceManager((UUID) Matchers.eq(randomUUID2), (UUID) Matchers.eq(randomUUID), (Exception) Matchers.any(TimeoutException.class));
            testingSerialRpcService.stopService();
        } catch (Throwable th) {
            testingSerialRpcService.stopService();
            throw th;
        }
    }
}
