package org.apache.flink.runtime.leaderelection;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingManualHighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.Tasks$BlockingOnceReceiver$;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.class */
public class LeaderChangeJobRecoveryTest extends TestLogger {
    private static FiniteDuration timeout = FiniteDuration.apply(30, TimeUnit.SECONDS);
    private JobID jobId;
    private TestingManualHighAvailabilityServices highAvailabilityServices;
    private int numTMs = 1;
    private int numSlotsPerTM = 1;
    private int parallelism = this.numTMs * this.numSlotsPerTM;
    private TestingCluster cluster = null;
    private JobGraph job = createBlockingJob(this.parallelism);

    @Before
    public void before() throws TimeoutException, InterruptedException {
        this.jobId = HighAvailabilityServices.DEFAULT_JOB_ID;
        Tasks$BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
        Configuration configuration = new Configuration();
        configuration.setInteger("local.number-jobmanager", 1);
        configuration.setInteger("local.number-taskmanager", this.numTMs);
        configuration.setInteger("taskmanager.numberOfTaskSlots", this.numSlotsPerTM);
        configuration.setString("restart-strategy", "fixeddelay");
        configuration.setInteger("restart-strategy.fixed-delay.attempts", 9999);
        configuration.setString("restart-strategy.fixed-delay.delay", "100 milli");
        this.highAvailabilityServices = new TestingManualHighAvailabilityServices();
        this.cluster = new TestingCluster(configuration, this.highAvailabilityServices, true, false);
        this.cluster.start(false);
        this.cluster.waitForActorsToBeAlive();
    }

    @Test
    public void testNotRestartedWhenLosingLeadership() throws Exception {
        UUID randomUUID = UUID.randomUUID();
        this.highAvailabilityServices.grantLeadership(this.jobId, 0, randomUUID);
        this.highAvailabilityServices.notifyRetrievers(this.jobId, 0, randomUUID);
        this.cluster.waitForTaskManagersToBeRegistered(timeout);
        this.cluster.submitJobDetached(this.job);
        ActorGateway leaderGateway = this.cluster.getLeaderGateway(timeout);
        Await.ready(leaderGateway.ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(this.job.getJobID()), timeout), timeout);
        TestingJobManagerMessages.ResponseExecutionGraph responseExecutionGraph = (TestingJobManagerMessages.ResponseExecutionGraph) Await.result(leaderGateway.ask(new TestingJobManagerMessages.RequestExecutionGraph(this.job.getJobID()), timeout), timeout);
        Assert.assertTrue(responseExecutionGraph instanceof TestingJobManagerMessages.ExecutionGraphFound);
        ExecutionGraph executionGraph = ((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph();
        this.highAvailabilityServices.revokeLeadership(this.jobId);
        executionGraph.getTerminationFuture().get(30L, TimeUnit.SECONDS);
    }

    public JobGraph createBlockingJob(int i) {
        Tasks$BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
        JobVertex jobVertex = new JobVertex("sender");
        JobVertex jobVertex2 = new JobVertex("receiver");
        jobVertex.setInvokableClass(Tasks.Sender.class);
        jobVertex2.setInvokableClass(Tasks.BlockingOnceReceiver.class);
        jobVertex.setParallelism(i);
        jobVertex2.setParallelism(i);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        jobVertex.setSlotSharingGroup(slotSharingGroup);
        jobVertex2.setSlotSharingGroup(slotSharingGroup);
        return new JobGraph("Blocking test job", new JobVertex[]{jobVertex, jobVertex2});
    }
}
