package org.apache.flink.runtime.executiongraph;

import java.util.Iterator;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.class */
/**
 * Tests for {@link ExecutionGraph#suspend(Throwable)}.
 *
 * <p>Each test drives an execution graph into a particular {@link JobStatus}, suspends it, and
 * then checks the resulting job status, the state of the execution vertices and the number of
 * cancel RPC calls that reached the (spied) {@link TaskManagerGateway}.
 */
public class ExecutionGraphSuspendTest {

    /** Parallelism of the single job vertex used by the gateway-based tests. */
    private static final int PARALLELISM = 10;

    /**
     * Suspending a graph that is still in CREATED must move it to SUSPENDED, put all vertices
     * into CANCELED and must not trigger any cancel RPC.
     */
    @Test
    public void testSuspendedOutOfCreated() throws Exception {
        TaskManagerGateway gateway = Mockito.spy(new SimpleAckingTaskManagerGateway());
        ExecutionGraph eg = createExecutionGraph(gateway, PARALLELISM);

        Assert.assertEquals(JobStatus.CREATED, eg.getState());

        eg.suspend(new Exception("suspend"));

        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
        validateAllVerticesInState(eg, ExecutionState.CANCELED);
        validateCancelRpcCalls(gateway, 0);

        ensureCannotLeaveSuspendedState(eg, gateway);
    }

    /**
     * Suspending a scheduled graph whose vertices are DEPLOYING must move it to SUSPENDED, put
     * all vertices into CANCELING and send one cancel RPC per vertex.
     */
    @Test
    public void testSuspendedOutOfDeploying() throws Exception {
        TaskManagerGateway gateway = Mockito.spy(new SimpleAckingTaskManagerGateway());
        ExecutionGraph eg = createExecutionGraph(gateway, PARALLELISM);

        eg.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, eg.getState());
        validateAllVerticesInState(eg, ExecutionState.DEPLOYING);

        eg.suspend(new Exception("suspend"));

        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
        validateAllVerticesInState(eg, ExecutionState.CANCELING);
        validateCancelRpcCalls(gateway, PARALLELISM);

        ensureCannotLeaveSuspendedState(eg, gateway);
    }

    /**
     * Suspending a graph whose vertices are all RUNNING must move it to SUSPENDED, put all
     * vertices into CANCELING and send one cancel RPC per vertex.
     */
    @Test
    public void testSuspendedOutOfRunning() throws Exception {
        TaskManagerGateway gateway = Mockito.spy(new SimpleAckingTaskManagerGateway());
        ExecutionGraph eg = createExecutionGraph(gateway, PARALLELISM);

        eg.scheduleForExecution();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
        Assert.assertEquals(JobStatus.RUNNING, eg.getState());
        validateAllVerticesInState(eg, ExecutionState.RUNNING);

        eg.suspend(new Exception("suspend"));

        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
        validateAllVerticesInState(eg, ExecutionState.CANCELING);
        validateCancelRpcCalls(gateway, PARALLELISM);

        ensureCannotLeaveSuspendedState(eg, gateway);
    }

    /**
     * Suspending a graph that is FAILING (global failure, cancellation not yet completed) must
     * move it to SUSPENDED, and it must stay there after the cancellations complete.
     */
    @Test
    public void testSuspendedOutOfFailing() throws Exception {
        TaskManagerGateway gateway = Mockito.spy(new SimpleAckingTaskManagerGateway());
        ExecutionGraph eg = createExecutionGraph(gateway, PARALLELISM);

        eg.scheduleForExecution();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);

        eg.failGlobal(new Exception("fail global"));
        Assert.assertEquals(JobStatus.FAILING, eg.getState());
        validateCancelRpcCalls(gateway, PARALLELISM);

        eg.suspend(new Exception("suspend"));
        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());

        ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);

        ensureCannotLeaveSuspendedState(eg, gateway);
    }

    /**
     * Suspending a graph that already reached FAILED must leave it in FAILED and must not send
     * any further cancel RPCs.
     */
    @Test
    public void testSuspendedOutOfFailed() throws Exception {
        TaskManagerGateway gateway = Mockito.spy(new SimpleAckingTaskManagerGateway());
        ExecutionGraph eg = createExecutionGraph(gateway, PARALLELISM);

        eg.scheduleForExecution();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);

        eg.failGlobal(new Exception("fail global"));
        Assert.assertEquals(JobStatus.FAILING, eg.getState());
        validateCancelRpcCalls(gateway, PARALLELISM);

        ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
        Assert.assertEquals(JobStatus.FAILED, eg.getState());

        eg.suspend(new Exception("suspend"));

        // FAILED must be retained, and the cancel-call count must be unchanged
        Assert.assertEquals(JobStatus.FAILED, eg.getState());
        validateCancelRpcCalls(gateway, PARALLELISM);
    }

    /**
     * Suspending a graph that is CANCELLING must move it to SUSPENDED, and it must stay there
     * after the cancellations complete.
     */
    @Test
    public void testSuspendedOutOfCanceling() throws Exception {
        TaskManagerGateway gateway = Mockito.spy(new SimpleAckingTaskManagerGateway());
        ExecutionGraph eg = createExecutionGraph(gateway, PARALLELISM);

        eg.scheduleForExecution();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);

        eg.cancel();
        Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
        validateCancelRpcCalls(gateway, PARALLELISM);

        eg.suspend(new Exception("suspend"));
        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());

        ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);

        ensureCannotLeaveSuspendedState(eg, gateway);
    }

    /**
     * Suspending a graph that already reached CANCELED must leave it in CANCELED and must not
     * send any further cancel RPCs.
     */
    @Test
    public void testSuspendedOutOfCanceled() throws Exception {
        TaskManagerGateway gateway = Mockito.spy(new SimpleAckingTaskManagerGateway());
        ExecutionGraph eg = createExecutionGraph(gateway, PARALLELISM);

        eg.scheduleForExecution();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);

        eg.cancel();
        Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
        validateCancelRpcCalls(gateway, PARALLELISM);

        ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
        Assert.assertEquals(JobStatus.CANCELED, eg.getState());

        eg.suspend(new Exception("suspend"));

        // CANCELED must be retained, and the cancel-call count must be unchanged
        Assert.assertEquals(JobStatus.CANCELED, eg.getState());
        validateCancelRpcCalls(gateway, PARALLELISM);
    }

    /**
     * Suspending a graph that is RESTARTING must move it to SUSPENDED and record the exception
     * passed to {@code suspend} as the failure cause.
     */
    @Test
    public void testSuspendWhileRestarting() throws Exception {
        // NOTE(review): the 10 is the InfiniteDelayRestartStrategy constructor argument
        // (presumably the maximum number of restart attempts), not the parallelism.
        ExecutionGraph eg =
                ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(10));

        eg.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, eg.getState());
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);

        eg.failGlobal(new Exception("test"));
        Assert.assertEquals(JobStatus.FAILING, eg.getState());

        ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
        Assert.assertEquals(JobStatus.RESTARTING, eg.getState());

        Exception suspendCause = new Exception("Suspended");
        eg.suspend(suspendCause);

        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
        Assert.assertEquals(suspendCause, eg.getFailureCause());
    }

    /**
     * Asserts that a suspended graph stays SUSPENDED when it is failed, canceled or suspended
     * again, that none of these calls touches the gateway, and that every vertex is still on
     * execution attempt number 0.
     *
     * @param eg the execution graph, expected to be in SUSPENDED on entry
     * @param gateway the Mockito spy whose recorded interactions are reset and then checked
     */
    private static void ensureCannotLeaveSuspendedState(ExecutionGraph eg, TaskManagerGateway gateway) {
        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());

        // forget interactions recorded so far so that only the calls below are checked
        Mockito.reset(gateway);

        eg.failGlobal(new Exception("fail"));
        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
        Mockito.verifyNoMoreInteractions(gateway);

        eg.cancel();
        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
        Mockito.verifyNoMoreInteractions(gateway);

        eg.suspend(new Exception("suspend again"));
        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
        Mockito.verifyNoMoreInteractions(gateway);

        for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
            Assert.assertEquals(0, vertex.getCurrentExecutionAttempt().getAttemptNumber());
        }
    }

    /**
     * Asserts that the current execution attempt of every vertex in the graph is in the given
     * state.
     *
     * @param eg the execution graph whose vertices are inspected
     * @param expected the state every current execution attempt must be in
     */
    private static void validateAllVerticesInState(ExecutionGraph eg, ExecutionState expected) {
        for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
            Assert.assertEquals(expected, vertex.getCurrentExecutionAttempt().getState());
        }
    }

    /**
     * Verifies that {@code cancelTask} was invoked on the gateway exactly the given number of
     * times, regardless of the arguments.
     *
     * @param gateway the Mockito spy to verify
     * @param expectedCalls the exact number of expected {@code cancelTask} invocations
     */
    private static void validateCancelRpcCalls(TaskManagerGateway gateway, int expectedCalls) {
        Mockito.verify(gateway, Mockito.times(expectedCalls))
                .cancelTask(Matchers.any(ExecutionAttemptID.class), Matchers.any(Time.class));
    }

    /**
     * Builds an execution graph with a single {@link NoOpInvokable} job vertex of the given
     * parallelism, a {@link SimpleSlotProvider} with that many slots backed by the given
     * gateway, and a {@code FixedDelayRestartStrategy(0, 0L)}.
     *
     * @param gateway the gateway handed to the slot provider
     * @param parallelism the vertex parallelism, also used as the slot count
     * @return the newly created execution graph
     * @throws Exception if the test utility fails to create the graph
     */
    private static ExecutionGraph createExecutionGraph(TaskManagerGateway gateway, int parallelism) throws Exception {
        JobID jobId = new JobID();

        JobVertex vertex = new JobVertex("vertex");
        vertex.setInvokableClass(NoOpInvokable.class);
        vertex.setParallelism(parallelism);

        SimpleSlotProvider slotProvider = new SimpleSlotProvider(jobId, parallelism, gateway);

        return ExecutionGraphTestUtils.createSimpleTestGraph(
                jobId, slotProvider, new FixedDelayRestartStrategy(0, 0L), vertex);
    }
}
