package org.apache.flink.runtime.jobmanager.scheduler;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.class */
public class SchedulerSlotSharingTest {
    /**
     * Schedules subtasks of a single vertex through one slot sharing group on two instances
     * (each created via {@code getRandomInstance(2)}). Checks that a fifth concurrent subtask is
     * rejected, that capacity freed by a release can be reused, that the final four slots are
     * split evenly between the two instances, and the scheduler's assignment counters.
     */
    @Test
    public void scheduleSingleVertexType() {
        try {
            JobVertexID vertexId = new JobVertexID();
            SlotSharingGroup sharingGroup = new SlotSharingGroup(new JobVertexID[]{vertexId});

            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            Instance firstInstance = SchedulerTestUtils.getRandomInstance(2);
            Instance secondInstance = SchedulerTestUtils.getRandomInstance(2);
            scheduler.newInstanceAvailable(firstInstance);
            scheduler.newInstanceAvailable(secondInstance);

            // Four subtasks of the vertex; each must land in its own slot.
            SimpleSlot s1 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexId, 0, 8), sharingGroup), false).get();
            SimpleSlot s2 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexId, 1, 8), sharingGroup), false).get();
            SimpleSlot s3 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexId, 2, 8), sharingGroup), false).get();
            SimpleSlot s4 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexId, 3, 8), sharingGroup), false).get();
            Assert.assertNotNull(s1);
            Assert.assertNotNull(s2);
            Assert.assertNotNull(s3);
            Assert.assertNotNull(s4);
            Assert.assertTrue(SchedulerTestUtils.areAllDistinct(s1, s2, s3, s4));

            // A fifth subtask must be rejected while the first four are still held.
            try {
                scheduler.allocateSlot(
                        new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexId, 4, 8), sharingGroup), false).get();
                Assert.fail("Scheduler accepted too many tasks at the same time");
            } catch (NoResourceAvailableException expected) {
                // expected: the request must be refused
            } catch (Exception unexpected) {
                Assert.fail("Wrong exception.");
            }

            // Releasing one slot makes room for one more subtask.
            s3.releaseSlot();
            SimpleSlot s5 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexId, 4, 8), sharingGroup), false).get();
            Assert.assertNotNull(s5);

            // Release the remaining original slots and schedule three further subtasks.
            s1.releaseSlot();
            s2.releaseSlot();
            s4.releaseSlot();
            SimpleSlot s6 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexId, 5, 8), sharingGroup), false).get();
            SimpleSlot s7 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexId, 6, 8), sharingGroup), false).get();
            SimpleSlot s8 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(vertexId, 7, 8), sharingGroup), false).get();
            Assert.assertNotNull(s6);
            Assert.assertNotNull(s7);
            Assert.assertNotNull(s8);

            // +1 for each slot on the first instance, -1 otherwise; zero means an even split.
            int balance = 0;
            for (SimpleSlot held : new SimpleSlot[]{s5, s6, s7, s8}) {
                balance += held.getTaskManagerID().equals(firstInstance.getTaskManagerID()) ? 1 : -1;
            }
            Assert.assertEquals(0L, balance);

            // Give everything back, in allocation order.
            for (SimpleSlot held : new SimpleSlot[]{s5, s6, s7, s8}) {
                held.releaseSlot();
            }

            // All slots are free again and all eight allocations were unconstrained.
            Assert.assertEquals(4L, scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals(0L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(8L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Schedules subtasks of two vertices that share slots through one sharing group on two
     * instances (each created via {@code getRandomInstance(2)}). Checks the per-vertex capacity
     * limit, the sharing group's slot bookkeeping after partial releases, and the scheduler's
     * assignment counters once everything has been released.
     */
    @Test
    public void allocateSlotWithSharing() {
        try {
            JobVertexID firstVertex = new JobVertexID();
            JobVertexID secondVertex = new JobVertexID();
            SlotSharingGroup sharingGroup = new SlotSharingGroup(new JobVertexID[]{firstVertex, secondVertex});

            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));

            // Four subtasks of the first vertex; each must land in its own slot.
            SimpleSlot a1 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(firstVertex, 0, 5), sharingGroup), false).get();
            SimpleSlot a2 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(firstVertex, 1, 5), sharingGroup), false).get();
            SimpleSlot a3 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(firstVertex, 2, 5), sharingGroup), false).get();
            SimpleSlot a4 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(firstVertex, 3, 5), sharingGroup), false).get();
            Assert.assertNotNull(a1);
            Assert.assertNotNull(a2);
            Assert.assertNotNull(a3);
            Assert.assertNotNull(a4);
            Assert.assertTrue(SchedulerTestUtils.areAllDistinct(a1, a2, a3, a4));

            // A fifth subtask of the first vertex must be rejected.
            try {
                scheduler.allocateSlot(
                        new ScheduledUnit(SchedulerTestUtils.getTestVertex(firstVertex, 4, 5), sharingGroup), false).get();
                Assert.fail("Scheduler accepted too many tasks at the same time");
            } catch (NoResourceAvailableException expected) {
                // expected: the request must be refused
            } catch (Exception unexpected) {
                Assert.fail("Wrong exception.");
            }

            // Four subtasks of the second vertex are accepted by the same sharing group.
            SimpleSlot b1 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(secondVertex, 0, 5), sharingGroup), false).get();
            SimpleSlot b2 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(secondVertex, 1, 5), sharingGroup), false).get();
            SimpleSlot b3 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(secondVertex, 2, 5), sharingGroup), false).get();
            SimpleSlot b4 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(secondVertex, 3, 5), sharingGroup), false).get();
            Assert.assertNotNull(b1);
            Assert.assertNotNull(b2);
            Assert.assertNotNull(b3);
            Assert.assertNotNull(b4);

            // A fifth subtask of the second vertex must be rejected as well.
            try {
                scheduler.allocateSlot(
                        new ScheduledUnit(SchedulerTestUtils.getTestVertex(secondVertex, 4, 5), sharingGroup), false).get();
                Assert.fail("Scheduler accepted too many tasks at the same time");
            } catch (NoResourceAvailableException expected) {
                // expected: the request must be refused
            } catch (Exception unexpected) {
                Assert.fail("Wrong exception.");
            }

            // Free two sub-slots of the first vertex; the group keeps its four slots.
            a1.releaseSlot();
            a4.releaseSlot();
            Assert.assertEquals(4L, sharingGroup.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(2L, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(firstVertex));
            Assert.assertEquals(0L, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(secondVertex));

            // The second vertex still has no free capacity, so this must be rejected.
            // The specific handler has to come before the generic one: otherwise the expected
            // rejection would be reported as "Wrong exception." (and the later clause would be
            // unreachable).
            try {
                scheduler.allocateSlot(
                        new ScheduledUnit(SchedulerTestUtils.getTestVertex(secondVertex, 4, 5), sharingGroup), false).get();
                Assert.fail("Scheduler accepted too many tasks at the same time");
            } catch (NoResourceAvailableException expected) {
                // expected: the request must be refused
            } catch (Exception unexpected) {
                Assert.fail("Wrong exception.");
            }

            // The first vertex can use one of the two sub-slots it freed.
            SimpleSlot a5 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(firstVertex, 4, 5), sharingGroup), false).get();
            Assert.assertNotNull(a5);
            Assert.assertEquals(4L, sharingGroup.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(1L, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(firstVertex));
            Assert.assertEquals(0L, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(secondVertex));

            // Freeing a sub-slot of the second vertex lets it schedule one more subtask.
            b2.releaseSlot();
            SimpleSlot b5 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(secondVertex, 4, 5), sharingGroup), false).get();
            Assert.assertNotNull(b5);

            // Release everything that is still held.
            a2.releaseSlot();
            a3.releaseSlot();
            a5.releaseSlot();
            b1.releaseSlot();
            b3.releaseSlot();
            b4.releaseSlot();
            b5.releaseSlot();

            // The group holds no slots, the scheduler has all four back, and all ten
            // successful allocations were unconstrained.
            Assert.assertEquals(0L, sharingGroup.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(4L, scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals(0L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(10L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Fills a sharing group with subtasks of one vertex, releases them all so the group holds no
     * slots, then fills it again with subtasks of a second vertex. The sharing group's slot
     * bookkeeping is asserted after every phase, followed by the scheduler's counters.
     */
    @Test
    public void allocateSlotWithIntermediateTotallyEmptySharingGroup() {
        try {
            JobVertexID firstVertex = new JobVertexID();
            JobVertexID secondVertex = new JobVertexID();
            SlotSharingGroup sharingGroup = new SlotSharingGroup(new JobVertexID[]{firstVertex, secondVertex});

            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));

            // Phase 1: four subtasks of the first vertex.
            SimpleSlot a1 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(firstVertex, 0, 4), sharingGroup), false).get();
            SimpleSlot a2 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(firstVertex, 1, 4), sharingGroup), false).get();
            SimpleSlot a3 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(firstVertex, 2, 4), sharingGroup), false).get();
            SimpleSlot a4 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(firstVertex, 3, 4), sharingGroup), false).get();
            Assert.assertEquals(4L, sharingGroup.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(0L, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(firstVertex));
            Assert.assertEquals(4L, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(secondVertex));

            // Phase 2: release all of them; the group must end up without any slots.
            for (SimpleSlot held : new SimpleSlot[]{a1, a2, a3, a4}) {
                held.releaseSlot();
            }
            Assert.assertEquals(0L, sharingGroup.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(0L, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(firstVertex));
            Assert.assertEquals(0L, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(secondVertex));

            // Phase 3: four subtasks of the second vertex into the emptied group.
            SimpleSlot b1 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(secondVertex, 0, 4), sharingGroup), false).get();
            SimpleSlot b2 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(secondVertex, 1, 4), sharingGroup), false).get();
            SimpleSlot b3 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(secondVertex, 2, 4), sharingGroup), false).get();
            SimpleSlot b4 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(secondVertex, 3, 4), sharingGroup), false).get();
            Assert.assertEquals(4L, sharingGroup.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(4L, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(firstVertex));
            Assert.assertEquals(0L, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(secondVertex));

            // Phase 4: release again and verify that nothing is left behind.
            for (SimpleSlot held : new SimpleSlot[]{b1, b2, b3, b4}) {
                held.releaseSlot();
            }
            Assert.assertEquals(0L, sharingGroup.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(0L, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(firstVertex));
            Assert.assertEquals(0L, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(secondVertex));

            // Scheduler bookkeeping: all slots free, eight unconstrained allocations in total.
            Assert.assertEquals(4L, scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals(0L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(8L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Schedules subtasks of three vertices into one sharing group, then releases every subtask of
     * the second vertex and schedules new ones for it while the other two vertices still hold
     * their sub-slots. Finally checks when the scheduler reports its slots as available again and
     * the assignment counters. (The method name's spelling is kept unchanged.)
     */
    @Test
    public void allocateSlotWithTemprarilyEmptyVertexGroup() {
        try {
            JobVertexID firstVertex = new JobVertexID();
            JobVertexID secondVertex = new JobVertexID();
            JobVertexID thirdVertex = new JobVertexID();
            SlotSharingGroup sharingGroup =
                    new SlotSharingGroup(new JobVertexID[]{firstVertex, secondVertex, thirdVertex});

            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));

            // Four subtasks of the first vertex.
            SimpleSlot a1 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(firstVertex, 0, 4), sharingGroup), false).get();
            SimpleSlot a2 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(firstVertex, 1, 4), sharingGroup), false).get();
            SimpleSlot a3 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(firstVertex, 2, 4), sharingGroup), false).get();
            SimpleSlot a4 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(firstVertex, 3, 4), sharingGroup), false).get();
            Assert.assertNotNull(a1);
            Assert.assertNotNull(a2);
            Assert.assertNotNull(a3);
            Assert.assertNotNull(a4);
            Assert.assertTrue(SchedulerTestUtils.areAllDistinct(a1, a2, a3, a4));

            // Four subtasks of the second vertex.
            SimpleSlot b1 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(secondVertex, 0, 7), sharingGroup), false).get();
            SimpleSlot b2 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(secondVertex, 1, 7), sharingGroup), false).get();
            SimpleSlot b3 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(secondVertex, 2, 7), sharingGroup), false).get();
            SimpleSlot b4 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(secondVertex, 3, 7), sharingGroup), false).get();
            Assert.assertNotNull(b1);
            Assert.assertNotNull(b2);
            Assert.assertNotNull(b3);
            Assert.assertNotNull(b4);
            Assert.assertTrue(SchedulerTestUtils.areAllDistinct(b1, b2, b3, b4));

            // Four subtasks of the third vertex.
            SimpleSlot c1 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(thirdVertex, 0, 4), sharingGroup), false).get();
            SimpleSlot c2 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(thirdVertex, 1, 4), sharingGroup), false).get();
            SimpleSlot c3 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(thirdVertex, 2, 4), sharingGroup), false).get();
            SimpleSlot c4 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(thirdVertex, 3, 4), sharingGroup), false).get();
            Assert.assertNotNull(c1);
            Assert.assertNotNull(c2);
            Assert.assertNotNull(c3);
            Assert.assertNotNull(c4);
            Assert.assertTrue(SchedulerTestUtils.areAllDistinct(c1, c2, c3, c4));

            // One more subtask of the first vertex must be rejected.
            try {
                scheduler.allocateSlot(
                        new ScheduledUnit(SchedulerTestUtils.getTestVertex(firstVertex, 4, 5), sharingGroup), false).get();
                Assert.fail("Scheduler accepted too many tasks at the same time");
            } catch (NoResourceAvailableException expected) {
                // expected: the request must be refused
            } catch (Exception unexpected) {
                Assert.fail("Wrong exception.");
            }

            // Empty the second vertex's sub-slots, then schedule three new subtasks for it.
            for (SimpleSlot held : new SimpleSlot[]{b1, b2, b3, b4}) {
                held.releaseSlot();
            }
            SimpleSlot b5 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(secondVertex, 5, 7), sharingGroup), false).get();
            SimpleSlot b6 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(secondVertex, 6, 7), sharingGroup), false).get();
            SimpleSlot b7 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(secondVertex, 7, 7), sharingGroup), false).get();
            Assert.assertNotNull(b5);
            Assert.assertNotNull(b6);
            Assert.assertNotNull(b7);

            // Release the first and second vertex; the third vertex's subtasks are still held,
            // and the scheduler is expected to report no available slot yet.
            for (SimpleSlot held : new SimpleSlot[]{a1, a2, a3, a4, b5, b6, b7}) {
                held.releaseSlot();
            }
            Assert.assertEquals(0L, scheduler.getNumberOfAvailableSlots());

            // Releasing the third vertex as well returns all four slots.
            for (SimpleSlot held : new SimpleSlot[]{c1, c2, c3, c4}) {
                held.releaseSlot();
            }
            Assert.assertEquals(4L, scheduler.getNumberOfAvailableSlots());

            // Fifteen successful allocations, all unconstrained.
            Assert.assertEquals(0L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(15L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Uses a single instance created via {@code getRandomInstance(2)}: schedules one subtask of
     * the first vertex and two of the second into a sharing group, releases the two of the second
     * vertex, and then schedules a task without a sharing group. The sharing group's bookkeeping
     * is asserted before and after, followed by the scheduler's assignment counters.
     */
    @Test
    public void allocateSlotWithTemporarilyEmptyVertexGroup2() {
        try {
            JobVertexID firstVertex = new JobVertexID();
            JobVertexID secondVertex = new JobVertexID();
            JobVertexID ungroupedVertex = new JobVertexID();
            SlotSharingGroup sharingGroup = new SlotSharingGroup(new JobVertexID[]{firstVertex, secondVertex});

            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));

            // One subtask of the first vertex, two of the second.
            SimpleSlot firstSub0 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(firstVertex, 0, 2), sharingGroup), false).get();
            SimpleSlot secondSub0 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(secondVertex, 0, 2), sharingGroup), false).get();
            SimpleSlot secondSub1 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(secondVertex, 1, 2), sharingGroup), false).get();
            Assert.assertNotNull(firstSub0);
            Assert.assertNotNull(secondSub0);
            Assert.assertNotNull(secondSub1);

            Assert.assertEquals(2L, sharingGroup.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(1L, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(firstVertex));
            Assert.assertEquals(0L, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(secondVertex));

            // Release both subtasks of the second vertex.
            secondSub0.releaseSlot();
            secondSub1.releaseSlot();

            // A task scheduled without a sharing group must now get a slot.
            SimpleSlot ungrouped = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(ungroupedVertex, 0, 1)), false).get();
            Assert.assertNotNull(ungrouped);

            // The sharing group is expected to have shrunk to the one slot still in use.
            Assert.assertEquals(1L, sharingGroup.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(0L, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(firstVertex));
            Assert.assertEquals(1L, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(secondVertex));

            // Four successful allocations, all unconstrained.
            Assert.assertEquals(0L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(4L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Mixes tasks scheduled through a sharing group (two vertices) with tasks scheduled without
     * any sharing group (vertices A, B and C) on two instances created via
     * {@code getRandomInstance(3)} and {@code getRandomInstance(2)}. Verifies that requests are
     * rejected when nothing is free, that releases on either side let the other side schedule
     * more work, and the scheduler's counters at the end.
     */
    @Test
    public void scheduleMixedSharingAndNonSharing() {
        try {
            JobVertexID sharedVertex1 = new JobVertexID();
            JobVertexID sharedVertex2 = new JobVertexID();
            JobVertexID soloVertexA = new JobVertexID();
            JobVertexID soloVertexB = new JobVertexID();
            JobVertexID soloVertexC = new JobVertexID();
            SlotSharingGroup sharingGroup = new SlotSharingGroup(new JobVertexID[]{sharedVertex1, sharedVertex2});

            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(3));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));

            // Two tasks of vertex A without a sharing group.
            SimpleSlot soloA0 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(soloVertexA, 0, 2)), false).get();
            SimpleSlot soloA1 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(soloVertexA, 1, 2)), false).get();
            Assert.assertNotNull(soloA0);
            Assert.assertNotNull(soloA1);

            // Two subtasks of each shared vertex through the sharing group.
            SimpleSlot shared1Sub0 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(sharedVertex1, 0, 4), sharingGroup), false).get();
            SimpleSlot shared1Sub1 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(sharedVertex1, 1, 4), sharingGroup), false).get();
            SimpleSlot shared2Sub0 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(sharedVertex2, 0, 4), sharingGroup), false).get();
            SimpleSlot shared2Sub1 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(sharedVertex2, 1, 4), sharingGroup), false).get();
            Assert.assertNotNull(shared1Sub0);
            Assert.assertNotNull(shared1Sub1);
            Assert.assertNotNull(shared2Sub0);
            Assert.assertNotNull(shared2Sub1);

            // One task of vertex B without a sharing group.
            SimpleSlot soloB1 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(soloVertexB, 1, 3)), false).get();
            Assert.assertNotNull(soloB1);

            // From here on, every further request must be rejected: both shared vertices ...
            try {
                scheduler.allocateSlot(
                        new ScheduledUnit(SchedulerTestUtils.getTestVertex(sharedVertex1, 2, 4), sharingGroup), false).get();
                Assert.fail("Scheduler accepted too many tasks at the same time");
            } catch (NoResourceAvailableException expected) {
                // expected: the request must be refused
            } catch (Exception unexpected) {
                Assert.fail("Wrong exception.");
            }
            try {
                scheduler.allocateSlot(
                        new ScheduledUnit(SchedulerTestUtils.getTestVertex(sharedVertex2, 2, 4), sharingGroup), false).get();
                Assert.fail("Scheduler accepted too many tasks at the same time");
            } catch (NoResourceAvailableException expected) {
                // expected: the request must be refused
            } catch (Exception unexpected) {
                Assert.fail("Wrong exception.");
            }
            // ... and tasks without a sharing group (these two attempts do not call get()).
            try {
                scheduler.allocateSlot(
                        new ScheduledUnit(SchedulerTestUtils.getTestVertex(soloVertexB, 0, 3)), false);
                Assert.fail("Scheduler accepted too many tasks at the same time");
            } catch (NoResourceAvailableException expected) {
                // expected: the request must be refused
            } catch (Exception unexpected) {
                Assert.fail("Wrong exception.");
            }
            try {
                scheduler.allocateSlot(
                        new ScheduledUnit(SchedulerTestUtils.getTestVertex(soloVertexC, 0, 1)), false);
                Assert.fail("Scheduler accepted too many tasks at the same time");
            } catch (NoResourceAvailableException expected) {
                // expected: the request must be refused
            } catch (Exception unexpected) {
                Assert.fail("Wrong exception.");
            }

            // Releasing one non-shared task lets the sharing group take one more subtask per vertex.
            soloA0.releaseSlot();
            SimpleSlot shared1Sub2 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(sharedVertex1, 2, 4), sharingGroup), false).get();
            SimpleSlot shared2Sub3 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(sharedVertex2, 3, 4), sharingGroup), false).get();
            Assert.assertNotNull(shared1Sub2);
            Assert.assertNotNull(shared2Sub3);

            // After these three releases the scheduler is expected to report one free slot.
            shared1Sub0.releaseSlot();
            shared1Sub1.releaseSlot();
            shared2Sub0.releaseSlot();
            Assert.assertEquals(1L, scheduler.getNumberOfAvailableSlots());

            // That slot goes to another task of vertex B.
            SimpleSlot soloB0 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(soloVertexB, 0, 3)), false).get();
            Assert.assertNotNull(soloB0);

            // Release the last of the initial shared subtasks and schedule the third task of B.
            shared2Sub1.releaseSlot();
            SimpleSlot soloB2 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(soloVertexB, 2, 3)), false).get();
            Assert.assertNotNull(soloB2);

            // Releasing the remaining task of A lets the sharing group grow once more.
            soloA1.releaseSlot();
            SimpleSlot shared1Sub3 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(sharedVertex1, 3, 4), sharingGroup), false).get();
            SimpleSlot shared2Sub2 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(sharedVertex2, 2, 4), sharingGroup), false).get();
            Assert.assertNotNull(shared1Sub3);
            Assert.assertNotNull(shared2Sub2);

            // Release every shared subtask that is still held.
            shared1Sub2.releaseSlot();
            shared2Sub3.releaseSlot();
            shared1Sub3.releaseSlot();
            shared2Sub2.releaseSlot();

            // Two tasks of vertex C without a sharing group.
            SimpleSlot soloC1 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(soloVertexC, 1, 2)), false).get();
            SimpleSlot soloC0 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(soloVertexC, 0, 2)), false).get();
            Assert.assertNotNull(soloC1);
            Assert.assertNotNull(soloC0);

            // Release all remaining non-shared tasks.
            soloB0.releaseSlot();
            soloB1.releaseSlot();
            soloB2.releaseSlot();
            soloC1.releaseSlot();
            soloC0.releaseSlot();

            // All five slots are free again; fifteen successful allocations, all unconstrained.
            Assert.assertEquals(5L, scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals(0L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(15L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Schedules two vertices of one sharing group with location preferences: for each vertex,
     * subtask 0 is given the first instance's location and subtask 1 the second instance's.
     * Checks that the group uses two slots, that each instance keeps one slot available, and that
     * the scheduler counts all four assignments as localized.
     */
    @Test
    public void testLocalizedAssignment1() {
        try {
            JobVertexID firstVertex = new JobVertexID();
            JobVertexID secondVertex = new JobVertexID();
            SlotSharingGroup sharingGroup = new SlotSharingGroup(new JobVertexID[]{firstVertex, secondVertex});

            Instance firstInstance = SchedulerTestUtils.getRandomInstance(2);
            Instance secondInstance = SchedulerTestUtils.getRandomInstance(2);
            TaskManagerLocation firstLocation = firstInstance.getTaskManagerLocation();
            TaskManagerLocation secondLocation = secondInstance.getTaskManagerLocation();

            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(firstInstance);
            scheduler.newInstanceAvailable(secondInstance);

            // First vertex: one subtask per location.
            SimpleSlot firstOnLoc1 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(
                            SchedulerTestUtils.getTestVertexWithLocation(firstVertex, 0, 2, firstLocation),
                            sharingGroup),
                    false).get();
            SimpleSlot firstOnLoc2 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(
                            SchedulerTestUtils.getTestVertexWithLocation(firstVertex, 1, 2, secondLocation),
                            sharingGroup),
                    false).get();
            Assert.assertNotNull(firstOnLoc1);
            Assert.assertNotNull(firstOnLoc2);

            Assert.assertEquals(2L, sharingGroup.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(1L, firstInstance.getNumberOfAvailableSlots());
            Assert.assertEquals(1L, secondInstance.getNumberOfAvailableSlots());

            // Second vertex: again one subtask per location; no additional slots may be taken.
            SimpleSlot secondOnLoc1 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(
                            SchedulerTestUtils.getTestVertexWithLocation(secondVertex, 0, 2, firstLocation),
                            sharingGroup),
                    false).get();
            SimpleSlot secondOnLoc2 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(
                            SchedulerTestUtils.getTestVertexWithLocation(secondVertex, 1, 2, secondLocation),
                            sharingGroup),
                    false).get();
            Assert.assertNotNull(secondOnLoc1);
            Assert.assertNotNull(secondOnLoc2);

            Assert.assertEquals(2L, sharingGroup.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(1L, firstInstance.getNumberOfAvailableSlots());
            Assert.assertEquals(1L, secondInstance.getNumberOfAvailableSlots());

            // All four assignments are expected to be counted as localized.
            Assert.assertEquals(4L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Allocates both subtasks of one vertex using the first instance's location and both
     * subtasks of a second vertex using the second instance's location, all within a single
     * slot sharing group. Checks the slot bookkeeping after each pair and expects all four
     * requests to be counted as localized assignments.
     */
    @Test
    public void testLocalizedAssignment2() {
        try {
            JobVertexID vertexA = new JobVertexID();
            JobVertexID vertexB = new JobVertexID();
            SlotSharingGroup group = new SlotSharingGroup(new JobVertexID[]{vertexA, vertexB});

            Instance instanceA = SchedulerTestUtils.getRandomInstance(2);
            Instance instanceB = SchedulerTestUtils.getRandomInstance(2);
            TaskManagerLocation locationA = instanceA.getTaskManagerLocation();
            TaskManagerLocation locationB = instanceB.getTaskManagerLocation();

            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(instanceA);
            scheduler.newInstanceAvailable(instanceB);

            // First vertex: both subtasks are requested with the first instance's location.
            ScheduledUnit unitA0 = new ScheduledUnit(
                    SchedulerTestUtils.getTestVertexWithLocation(vertexA, 0, 2, locationA), group);
            SimpleSlot slotA0 = (SimpleSlot) scheduler.allocateSlot(unitA0, false).get();
            ScheduledUnit unitA1 = new ScheduledUnit(
                    SchedulerTestUtils.getTestVertexWithLocation(vertexA, 1, 2, locationA), group);
            SimpleSlot slotA1 = (SimpleSlot) scheduler.allocateSlot(unitA1, false).get();

            Assert.assertNotNull(slotA0);
            Assert.assertNotNull(slotA1);
            Assert.assertEquals(2L, group.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(0L, instanceA.getNumberOfAvailableSlots());
            Assert.assertEquals(2L, instanceB.getNumberOfAvailableSlots());

            // Second vertex: both subtasks are requested with the second instance's location.
            ScheduledUnit unitB0 = new ScheduledUnit(
                    SchedulerTestUtils.getTestVertexWithLocation(vertexB, 0, 2, locationB), group);
            SimpleSlot slotB0 = (SimpleSlot) scheduler.allocateSlot(unitB0, false).get();
            ScheduledUnit unitB1 = new ScheduledUnit(
                    SchedulerTestUtils.getTestVertexWithLocation(vertexB, 1, 2, locationB), group);
            SimpleSlot slotB1 = (SimpleSlot) scheduler.allocateSlot(unitB1, false).get();

            Assert.assertNotNull(slotB0);
            Assert.assertNotNull(slotB1);
            Assert.assertEquals(4L, group.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(0L, instanceA.getNumberOfAvailableSlots());
            Assert.assertEquals(0L, instanceB.getNumberOfAvailableSlots());

            // Every request is expected to have been counted as localized.
            Assert.assertEquals(4L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Requests six slots in one slot sharing group, all with the first instance's location:
     * two subtasks of a vertex with parallelism 2 and four subtasks of a vertex with
     * parallelism 4. Each of the two instances offers only two slots.
     *
     * <p>Expects the first four slots to land on the first instance (localized) and the last
     * two to land on the second instance (non-localized).
     */
    @Test
    public void testLocalizedAssignment3() {
        try {
            JobVertexID jobVertexID = new JobVertexID();
            JobVertexID jobVertexID2 = new JobVertexID();
            SlotSharingGroup slotSharingGroup = new SlotSharingGroup(new JobVertexID[]{jobVertexID, jobVertexID2});
            Instance randomInstance = SchedulerTestUtils.getRandomInstance(2);
            Instance randomInstance2 = SchedulerTestUtils.getRandomInstance(2);
            TaskManagerLocation taskManagerLocation = randomInstance.getTaskManagerLocation();
            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(randomInstance);
            scheduler.newInstanceAvailable(randomInstance2);

            // Both subtasks of the first vertex (parallelism 2).
            SimpleSlot simpleSlot = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID, 0, 2, taskManagerLocation), slotSharingGroup), false).get();
            SimpleSlot simpleSlot2 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID, 1, 2, taskManagerLocation), slotSharingGroup), false).get();

            // All four subtasks of the second vertex (parallelism 4). Subtask indices must lie
            // in 0..3; the previous indices 3 and 4 skipped subtask 2 and used an index that is
            // out of range for the declared parallelism.
            SimpleSlot simpleSlot3 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID2, 0, 4, taskManagerLocation), slotSharingGroup), false).get();
            SimpleSlot simpleSlot4 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID2, 1, 4, taskManagerLocation), slotSharingGroup), false).get();
            SimpleSlot simpleSlot5 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID2, 2, 4, taskManagerLocation), slotSharingGroup), false).get();
            SimpleSlot simpleSlot6 = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jobVertexID2, 3, 4, taskManagerLocation), slotSharingGroup), false).get();

            Assert.assertNotNull(simpleSlot);
            Assert.assertNotNull(simpleSlot2);
            Assert.assertNotNull(simpleSlot3);
            Assert.assertNotNull(simpleSlot4);
            Assert.assertNotNull(simpleSlot5);
            Assert.assertNotNull(simpleSlot6);

            Assert.assertEquals(4L, slotSharingGroup.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals(0L, randomInstance.getNumberOfAvailableSlots());
            Assert.assertEquals(0L, randomInstance2.getNumberOfAvailableSlots());

            // The first four slots are expected on the first instance, the last two on the second.
            Assert.assertEquals(randomInstance.getTaskManagerID(), simpleSlot.getTaskManagerID());
            Assert.assertEquals(randomInstance.getTaskManagerID(), simpleSlot2.getTaskManagerID());
            Assert.assertEquals(randomInstance.getTaskManagerID(), simpleSlot3.getTaskManagerID());
            Assert.assertEquals(randomInstance.getTaskManagerID(), simpleSlot4.getTaskManagerID());
            Assert.assertEquals(randomInstance2.getTaskManagerID(), simpleSlot5.getTaskManagerID());
            Assert.assertEquals(randomInstance2.getTaskManagerID(), simpleSlot6.getTaskManagerID());

            Assert.assertEquals(4L, scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals(2L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(0L, scheduler.getNumberOfUnconstrainedAssignments());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Sequentially allocates and releases slots for four vertices that share one slot sharing
     * group, against a scheduler with a single four-slot instance. The test has no explicit
     * assertions; it fails only if an allocation or release throws.
     */
    @Test
    public void testSequentialAllocateAndRelease() {
        try {
            JobVertexID v1 = new JobVertexID();
            JobVertexID v2 = new JobVertexID();
            JobVertexID v3 = new JobVertexID();
            JobVertexID v4 = new JobVertexID();
            SlotSharingGroup group = new SlotSharingGroup(new JobVertexID[]{v1, v2, v3, v4});

            Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(4));

            // First half of vertices 1 and 2, then the single subtask of vertex 3,
            // then the second half of vertices 1 and 2.
            SimpleSlot v1Sub0 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v1, 0, 4), group), false).get();
            SimpleSlot v1Sub1 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v1, 1, 4), group), false).get();
            SimpleSlot v2Sub0 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v2, 0, 4), group), false).get();
            SimpleSlot v2Sub1 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v2, 1, 4), group), false).get();
            SimpleSlot v3Sub0 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v3, 0, 1), group), false).get();
            SimpleSlot v1Sub2 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v1, 2, 4), group), false).get();
            SimpleSlot v1Sub3 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v1, 3, 4), group), false).get();
            SimpleSlot v2Sub2 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v2, 2, 4), group), false).get();
            SimpleSlot v2Sub3 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v2, 3, 4), group), false).get();

            // Release everything held for vertices 1 and 2; the vertex 3 slot stays allocated.
            v1Sub0.releaseSlot();
            v1Sub1.releaseSlot();
            v1Sub2.releaseSlot();
            v1Sub3.releaseSlot();
            v2Sub0.releaseSlot();
            v2Sub1.releaseSlot();
            v2Sub2.releaseSlot();
            v2Sub3.releaseSlot();

            // Allocate all four subtasks of vertex 4 while the vertex 3 slot is still held.
            SimpleSlot v4Sub0 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v4, 0, 4), group), false).get();
            SimpleSlot v4Sub1 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v4, 1, 4), group), false).get();
            SimpleSlot v4Sub2 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v4, 2, 4), group), false).get();
            SimpleSlot v4Sub3 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v4, 3, 4), group), false).get();

            // Finally release the remaining slots.
            v3Sub0.releaseSlot();
            v4Sub0.releaseSlot();
            v4Sub1.releaseSlot();
            v4Sub2.releaseSlot();
            v4Sub3.releaseSlot();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Runs 50 rounds of concurrent slot allocation and release against a scheduler with a
     * single four-slot instance, using four vertices that share one slot sharing group.
     *
     * <p>Each round starts four "starter" tasks. Every starter submits a task for vertex 1,
     * which submits a task for vertex 2, which submits the task for vertex 3. Only the first
     * vertex 3 task does any work (guarded by a compare-and-set flag); it submits four tasks
     * for vertex 4. That gives 4 + 4 + 1 + 4 = 13 allocate/release cycles per round, which is
     * what the main thread waits for.
     *
     * <p>The thread pool is shut down in a {@code finally} block so its worker threads do not
     * outlive the test, whether it passes or fails.
     */
    @Test
    public void testConcurrentAllocateAndRelease() {
        final ExecutorService executor = Executors.newFixedThreadPool(20);
        // 4 (vertex 1) + 4 (vertex 2) + 1 (vertex 3) + 4 (vertex 4) allocate/release cycles.
        final int expectedCompletions = 13;
        try {
            for (int run = 0; run < 50; run++) {
                final JobVertexID jid1 = new JobVertexID();
                final JobVertexID jid2 = new JobVertexID();
                final JobVertexID jid3 = new JobVertexID();
                final JobVertexID jid4 = new JobVertexID();
                final SlotSharingGroup sharingGroup = new SlotSharingGroup(new JobVertexID[]{jid1, jid2, jid3, jid4});

                final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
                scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(4));

                // Subtask index generators for vertices 1, 2 and 4.
                final AtomicInteger nextIndex1 = new AtomicInteger();
                final AtomicInteger nextIndex2 = new AtomicInteger();
                final AtomicInteger nextIndex4 = new AtomicInteger();
                // Ensures that only one vertex 3 task actually allocates.
                final AtomicBoolean vertex3Started = new AtomicBoolean();
                final Random random = new Random();
                // Set by any worker that hits an unexpected throwable.
                final AtomicBoolean failed = new AtomicBoolean(false);
                // Counts finished allocate/release cycles; also serves as the wait/notify monitor.
                final AtomicInteger completed = new AtomicInteger();

                final Runnable deploy4 = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            SimpleSlot slot = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid4, nextIndex4.getAndIncrement(), 4), sharingGroup), false).get();
                            CommonTestUtils.sleepUninterruptibly(random.nextInt(5));
                            slot.releaseSlot();
                            if (completed.incrementAndGet() == expectedCompletions) {
                                synchronized (completed) {
                                    completed.notifyAll();
                                }
                            }
                        } catch (Throwable t) {
                            t.printStackTrace();
                            failed.set(true);
                        }
                    }
                };

                final Runnable deploy3 = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Submitted four times, but only the first invocation does the work.
                            if (vertex3Started.compareAndSet(false, true)) {
                                SimpleSlot slot = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid3, 0, 1), sharingGroup), false).get();
                                CommonTestUtils.sleepUninterruptibly(5L);
                                executor.execute(deploy4);
                                executor.execute(deploy4);
                                executor.execute(deploy4);
                                executor.execute(deploy4);
                                slot.releaseSlot();
                                if (completed.incrementAndGet() == expectedCompletions) {
                                    synchronized (completed) {
                                        completed.notifyAll();
                                    }
                                }
                            }
                        } catch (Throwable t) {
                            t.printStackTrace();
                            failed.set(true);
                        }
                    }
                };

                final Runnable deploy2 = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            SimpleSlot slot = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid2, nextIndex2.getAndIncrement(), 4), sharingGroup), false).get();
                            CommonTestUtils.sleepUninterruptibly(random.nextInt(5));
                            executor.execute(deploy3);
                            CommonTestUtils.sleepUninterruptibly(random.nextInt(5));
                            slot.releaseSlot();
                            if (completed.incrementAndGet() == expectedCompletions) {
                                synchronized (completed) {
                                    completed.notifyAll();
                                }
                            }
                        } catch (Throwable t) {
                            t.printStackTrace();
                            failed.set(true);
                        }
                    }
                };

                final Runnable deploy1 = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            SimpleSlot slot = (SimpleSlot) scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid1, nextIndex1.getAndIncrement(), 4), sharingGroup), false).get();
                            CommonTestUtils.sleepUninterruptibly(random.nextInt(5));
                            executor.execute(deploy2);
                            CommonTestUtils.sleepUninterruptibly(random.nextInt(5));
                            slot.releaseSlot();
                            if (completed.incrementAndGet() == expectedCompletions) {
                                synchronized (completed) {
                                    completed.notifyAll();
                                }
                            }
                        } catch (Throwable t) {
                            t.printStackTrace();
                            failed.set(true);
                        }
                    }
                };

                // Staggers the start of the vertex 1 tasks by a small random delay.
                Runnable starter = new Runnable() {
                    @Override
                    public void run() {
                        CommonTestUtils.sleepUninterruptibly(random.nextInt(10));
                        executor.execute(deploy1);
                    }
                };

                executor.execute(starter);
                executor.execute(starter);
                executor.execute(starter);
                executor.execute(starter);

                // Wait until all cycles finished or a worker failed. The timed wait makes the
                // loop re-check the failure flag, which is set without a notification.
                synchronized (completed) {
                    while (!failed.get() && completed.get() < expectedCompletions) {
                        completed.wait(1000L);
                    }
                }
                Assert.assertFalse("Thread failed", failed.get());

                // Poll until the scheduler reports all four slots as available again.
                while (scheduler.getNumberOfAvailableSlots() < 4) {
                    CommonTestUtils.sleepUninterruptibly(5L);
                }

                Assert.assertEquals(1L, scheduler.getNumberOfAvailableInstances());
                Assert.assertEquals(1L, scheduler.getNumberOfInstancesWithAvailableSlots());
                Assert.assertEquals(4L, scheduler.getNumberOfAvailableSlots());
                Assert.assertEquals(expectedCompletions, scheduler.getNumberOfUnconstrainedAssignments());
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        } finally {
            // Previously the pool was never shut down, leaking its 20 worker threads.
            executor.shutdownNow();
        }
    }

    /**
     * Schedules vertices of differing parallelism (1, 1, 5 and 4) through one slot sharing
     * group on a single four-slot instance. Checks that the first two tasks share a parent
     * slot, that the fifth subtask of the parallelism-5 vertex is rejected with a
     * {@link NoResourceAvailableException} once four are allocated, and that all four slots
     * are available again after everything is released.
     */
    @Test
    public void testDopIncreases() {
        try {
            JobVertexID v1 = new JobVertexID();
            JobVertexID v2 = new JobVertexID();
            JobVertexID v3 = new JobVertexID();
            JobVertexID v4 = new JobVertexID();
            SlotSharingGroup group = new SlotSharingGroup(new JobVertexID[]{v1, v2, v3, v4});

            Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(4));

            // The two parallelism-1 vertices must end up under the same parent slot.
            SimpleSlot v1Sub0 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v1, 0, 1), group), false).get();
            SimpleSlot v2Sub0 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v2, 0, 1), group), false).get();
            Assert.assertTrue(v1Sub0.getParent() == v2Sub0.getParent());
            Assert.assertEquals(3L, scheduler.getNumberOfAvailableSlots());

            // First two subtasks of each of the wider vertices.
            SimpleSlot v3Sub0 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v3, 0, 5), group), false).get();
            SimpleSlot v3Sub1 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v3, 1, 5), group), false).get();
            SimpleSlot v4Sub0 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v4, 0, 4), group), false).get();
            SimpleSlot v4Sub1 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v4, 1, 4), group), false).get();

            v1Sub0.releaseSlot();
            v2Sub0.releaseSlot();

            // Two more subtasks of each of the wider vertices.
            SimpleSlot v3Sub2 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v3, 2, 5), group), false).get();
            SimpleSlot v3Sub3 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v3, 3, 5), group), false).get();
            SimpleSlot v4Sub2 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v4, 2, 4), group), false).get();
            SimpleSlot v4Sub3 = (SimpleSlot) scheduler.allocateSlot(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(v4, 3, 4), group), false).get();

            // A fifth subtask of vertex 3 must be rejected.
            try {
                scheduler.allocateSlot(
                        new ScheduledUnit(SchedulerTestUtils.getTestVertex(v3, 4, 5), group), false);
                Assert.fail("should throw an exception");
            } catch (NoResourceAvailableException expected) {
                // Intentionally empty: this exception is the expected outcome.
            }
            Assert.assertEquals(0L, scheduler.getNumberOfAvailableSlots());

            // Release everything and verify that all four slots come back.
            v3Sub0.releaseSlot();
            v3Sub1.releaseSlot();
            v3Sub2.releaseSlot();
            v3Sub3.releaseSlot();
            v4Sub0.releaseSlot();
            v4Sub1.releaseSlot();
            v4Sub2.releaseSlot();
            v4Sub3.releaseSlot();
            Assert.assertEquals(4L, scheduler.getNumberOfAvailableSlots());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }
}
