package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.clock.Clock;
import org.apache.flink.runtime.util.clock.SystemClock;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.class */
public class SlotPoolRpcTest extends TestLogger {
    private static RpcService rpcService;
    private static final Time timeout = Time.seconds(10);
    private static final Time fastTimeout = Time.milliseconds(1);

    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest$TestingSlotPool.class */
    private static final class TestingSlotPool extends SlotPool {
        private volatile Consumer<SlotRequestId> releaseSlotConsumer;
        private volatile Consumer<SlotRequestId> timeoutPendingSlotRequestConsumer;

        public TestingSlotPool(RpcService rpcService, JobID jobID, Clock clock, Time time, Time time2) {
            super(rpcService, jobID, LocationPreferenceSchedulingStrategy.getInstance(), clock, time, time2);
            this.releaseSlotConsumer = null;
            this.timeoutPendingSlotRequestConsumer = null;
        }

        public void setReleaseSlotConsumer(Consumer<SlotRequestId> consumer) {
            this.releaseSlotConsumer = (Consumer) Preconditions.checkNotNull(consumer);
        }

        public void setTimeoutPendingSlotRequestConsumer(Consumer<SlotRequestId> consumer) {
            this.timeoutPendingSlotRequestConsumer = (Consumer) Preconditions.checkNotNull(consumer);
        }

        public CompletableFuture<Acknowledge> releaseSlot(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, @Nullable Throwable th) {
            Consumer<SlotRequestId> consumer = this.releaseSlotConsumer;
            CompletableFuture<Acknowledge> releaseSlot = super.releaseSlot(slotRequestId, slotSharingGroupId, th);
            if (consumer != null) {
                consumer.accept(slotRequestId);
            }
            return releaseSlot;
        }

        protected void timeoutPendingSlotRequest(SlotRequestId slotRequestId) {
            Consumer<SlotRequestId> consumer = this.timeoutPendingSlotRequestConsumer;
            if (consumer != null) {
                consumer.accept(slotRequestId);
            }
            super.timeoutPendingSlotRequest(slotRequestId);
        }

        CompletableFuture<Boolean> containsAllocatedSlot(AllocationID allocationID) {
            return callAsync(() -> {
                return Boolean.valueOf(getAllocatedSlots().contains(allocationID));
            }, SlotPoolRpcTest.timeout);
        }

        CompletableFuture<Boolean> containsAvailableSlot(AllocationID allocationID) {
            return callAsync(() -> {
                return Boolean.valueOf(getAvailableSlots().contains(allocationID));
            }, SlotPoolRpcTest.timeout);
        }

        CompletableFuture<Integer> getNumberOfPendingRequests() {
            return callAsync(() -> {
                return Integer.valueOf(getPendingRequests().size());
            }, SlotPoolRpcTest.timeout);
        }

        CompletableFuture<Integer> getNumberOfWaitingForResourceRequests() {
            return callAsync(() -> {
                return Integer.valueOf(getWaitingForResourceManager().size());
            }, SlotPoolRpcTest.timeout);
        }
    }

    @BeforeClass
    public static void setup() {
        rpcService = new AkkaRpcService(AkkaUtils.createLocalActorSystem(new Configuration()), Time.seconds(10L));
    }

    @AfterClass
    public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
        if (rpcService != null) {
            RpcUtils.terminateRpcService(rpcService, timeout);
            rpcService = null;
        }
    }

    @Test
    public void testSlotAllocationNoResourceManager() throws Exception {
        SlotPool slotPool = new SlotPool(rpcService, new JobID(), LocationPreferenceSchedulingStrategy.getInstance(), SystemClock.getInstance(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime());
        try {
            slotPool.start(JobMasterId.generate(), "foobar");
            try {
                slotPool.allocateSlot(new SlotRequestId(), new ScheduledUnit(SchedulerTestUtils.getDummyTask()), SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE), true, fastTimeout).get();
                Assert.fail("We expected an ExecutionException.");
            } catch (ExecutionException e) {
                Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(slotPool, timeout);
        }
    }

    @Test
    public void testCancelSlotAllocationWithoutResourceManager() throws Exception {
        TestingSlotPool testingSlotPool = new TestingSlotPool(rpcService, new JobID(), SystemClock.getInstance(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime());
        try {
            CompletableFuture completableFuture = new CompletableFuture();
            testingSlotPool.setTimeoutPendingSlotRequestConsumer(slotRequestId -> {
                completableFuture.complete(slotRequestId);
            });
            testingSlotPool.start(JobMasterId.generate(), "foobar");
            try {
                testingSlotPool.getSelfGateway(SlotPoolGateway.class).allocateSlot(new SlotRequestId(), new ScheduledUnit(SchedulerTestUtils.getDummyTask()), SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE), true, fastTimeout).get();
                Assert.fail("We expected a TimeoutException.");
            } catch (ExecutionException e) {
                Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
            }
            completableFuture.get();
            Assert.assertEquals(0L, testingSlotPool.getNumberOfWaitingForResourceRequests().get().intValue());
            RpcUtils.terminateRpcEndpoint(testingSlotPool, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(testingSlotPool, timeout);
            throw th;
        }
    }

    @Test
    public void testSlotAllocationTimeout() throws Exception {
        TestingSlotPool testingSlotPool = new TestingSlotPool(rpcService, new JobID(), SystemClock.getInstance(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime());
        try {
            testingSlotPool.start(JobMasterId.generate(), "foobar");
            SlotPoolGateway selfGateway = testingSlotPool.getSelfGateway(SlotPoolGateway.class);
            CompletableFuture completableFuture = new CompletableFuture();
            completableFuture.getClass();
            testingSlotPool.setTimeoutPendingSlotRequestConsumer((v1) -> {
                r1.complete(v1);
            });
            testingSlotPool.connectToResourceManager(new TestingResourceManagerGateway());
            try {
                selfGateway.allocateSlot(new SlotRequestId(), new DummyScheduledUnit(), SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE), true, fastTimeout).get();
                Assert.fail("We expected a TimeoutException.");
            } catch (ExecutionException e) {
                Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
            }
            completableFuture.get();
            Assert.assertEquals(0L, testingSlotPool.getNumberOfPendingRequests().get().intValue());
            RpcUtils.terminateRpcEndpoint(testingSlotPool, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(testingSlotPool, timeout);
            throw th;
        }
    }

    @Test
    public void testExtraSlotsAreKept() throws Exception {
        TestingSlotPool testingSlotPool = new TestingSlotPool(rpcService, new JobID(), SystemClock.getInstance(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime());
        try {
            testingSlotPool.start(JobMasterId.generate(), "foobar");
            SlotPoolGateway slotPoolGateway = (SlotPoolGateway) testingSlotPool.getSelfGateway(SlotPoolGateway.class);
            CompletableFuture completableFuture = new CompletableFuture();
            TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> {
                completableFuture.complete(slotRequest.getAllocationId());
            });
            CompletableFuture completableFuture2 = new CompletableFuture();
            completableFuture2.getClass();
            testingSlotPool.setTimeoutPendingSlotRequestConsumer((v1) -> {
                r1.complete(v1);
            });
            testingSlotPool.connectToResourceManager(testingResourceManagerGateway);
            try {
                slotPoolGateway.allocateSlot(new SlotRequestId(), new ScheduledUnit(SchedulerTestUtils.getDummyTask()), SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE), true, fastTimeout).get();
                Assert.fail("We expected a TimeoutException.");
            } catch (ExecutionException e) {
                Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
            }
            completableFuture2.get();
            Assert.assertEquals(0L, testingSlotPool.getNumberOfPendingRequests().get().intValue());
            AllocationID allocationID = (AllocationID) completableFuture.get();
            SlotOffer slotOffer = new SlotOffer(allocationID, 0, AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
            SimpleAckingTaskManagerGateway simpleAckingTaskManagerGateway = new SimpleAckingTaskManagerGateway();
            slotPoolGateway.registerTaskManager(localTaskManagerLocation.getResourceID()).get();
            Assert.assertTrue(((Boolean) slotPoolGateway.offerSlot(localTaskManagerLocation, simpleAckingTaskManagerGateway, slotOffer).get()).booleanValue());
            Assert.assertTrue(testingSlotPool.containsAvailableSlot(allocationID).get().booleanValue());
            RpcUtils.terminateRpcEndpoint(testingSlotPool, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(testingSlotPool, timeout);
            throw th;
        }
    }

    @Test
    public void testProviderAndOwnerSlotAllocationTimeout() throws Exception {
        TestingSlotPool testingSlotPool = new TestingSlotPool(rpcService, new JobID(), SystemClock.getInstance(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime());
        CompletableFuture completableFuture = new CompletableFuture();
        testingSlotPool.setReleaseSlotConsumer(slotRequestId -> {
            completableFuture.complete(slotRequestId);
        });
        try {
            testingSlotPool.start(JobMasterId.generate(), "foobar");
            testingSlotPool.connectToResourceManager(new TestingResourceManagerGateway());
            try {
                testingSlotPool.getSlotProvider().allocateSlot(new DummyScheduledUnit(), true, SlotProfile.noRequirements(), fastTimeout).get();
                Assert.fail("We expected a TimeoutException.");
            } catch (ExecutionException e) {
                Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
            }
            completableFuture.get();
            Assert.assertEquals(0L, testingSlotPool.getNumberOfPendingRequests().get().intValue());
            RpcUtils.terminateRpcEndpoint(testingSlotPool, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(testingSlotPool, timeout);
            throw th;
        }
    }
}
