package org.apache.flink.runtime.checkpoint;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.class */
/**
 * Tests the {@link CheckpointCoordinator} when it is created with
 * {@link ExternalizedCheckpointSettings#externalizeCheckpoints(boolean)}: every completed
 * checkpoint must expose an external pointer and metadata file, and that pointer must be
 * loadable again through {@link SavepointLoader}.
 */
public class CheckpointCoordinatorExternalizedCheckpointsTest {

    /** Provides a fresh directory that is handed to the coordinator as its checkpoint directory. */
    @Rule
    public final TemporaryFolder tmp = new TemporaryFolder();

    /**
     * Triggers three checkpoints in a row, acknowledges each one from both mocked tasks, and
     * verifies after every round that the latest completed checkpoint was externalized and can
     * be restored.
     */
    @Test
    public void testTriggerAndConfirmSimpleExternalizedCheckpoint() throws Exception {
        final JobID jobId = new JobID();
        final ExternalizedCheckpointSettings settings =
                ExternalizedCheckpointSettings.externalizeCheckpoints(false);
        final File checkpointDir = tmp.newFolder();

        // Two mocked execution vertices act as trigger, acknowledge and commit targets.
        final ExecutionAttemptID attemptId1 = new ExecutionAttemptID();
        final ExecutionAttemptID attemptId2 = new ExecutionAttemptID();
        final ExecutionVertex vertex1 = CheckpointCoordinatorTest.mockExecutionVertex(attemptId1);
        final ExecutionVertex vertex2 = CheckpointCoordinatorTest.mockExecutionVertex(attemptId2);

        final Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
        tasks.put(vertex1.getJobvertexId(), vertex1.getJobVertex());
        tasks.put(vertex2.getJobvertexId(), vertex2.getJobVertex());

        final CheckpointCoordinator coordinator = new CheckpointCoordinator(
                jobId,
                600000L,
                600000L,
                0L,
                Integer.MAX_VALUE,
                settings,
                new ExecutionVertex[] {vertex1, vertex2},
                new ExecutionVertex[] {vertex1, vertex2},
                new ExecutionVertex[] {vertex1, vertex2},
                new StandaloneCheckpointIDCounter(),
                new StandaloneCompletedCheckpointStore(1),
                checkpointDir.getAbsolutePath(),
                Executors.directExecutor(),
                SharedStateRegistry.DEFAULT_FACTORY);

        try {
            Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coordinator.getNumberOfRetainedSuccessfulCheckpoints());

            final ExecutionAttemptID[] attemptIds = {attemptId1, attemptId2};

            // Each round uses its own timestamp so the verification can tell the rounds apart.
            triggerAcknowledgeAndVerify(
                    coordinator, jobId, System.currentTimeMillis(), tasks, attemptIds, vertex1, vertex2);
            triggerAcknowledgeAndVerify(
                    coordinator, jobId, System.currentTimeMillis() + 7, tasks, attemptIds, vertex1, vertex2);
            triggerAcknowledgeAndVerify(
                    coordinator, jobId, System.currentTimeMillis() + 146, tasks, attemptIds, vertex1, vertex2);
        } finally {
            // Shut down even when an assertion above fails, so the coordinator is not left
            // behind for the remaining tests of the run.
            coordinator.shutdown(JobStatus.FINISHED);
        }
    }

    /**
     * Runs one checkpoint round: triggers a checkpoint with the given timestamp, acknowledges it
     * for every attempt id, then verifies the latest checkpoint in the coordinator's store.
     *
     * @param coordinator the coordinator under test
     * @param jobId job id used for the acknowledge messages and expected on the checkpoint
     * @param timestamp timestamp passed to the trigger call and expected on the checkpoint
     * @param tasks job vertices handed to the savepoint loader during restore verification
     * @param attemptIds execution attempts that acknowledge the checkpoint
     * @param vertices execution vertices whose operator states are compared after reloading
     */
    private static void triggerAcknowledgeAndVerify(
            CheckpointCoordinator coordinator,
            JobID jobId,
            long timestamp,
            Map<JobVertexID, ExecutionJobVertex> tasks,
            ExecutionAttemptID[] attemptIds,
            ExecutionVertex... vertices) throws Exception {

        coordinator.triggerCheckpoint(timestamp, false);

        // Fail with a readable message instead of a NoSuchElementException from the iterator.
        Assert.assertEquals(
                "Expected exactly one pending checkpoint after triggering",
                1L,
                coordinator.getNumberOfPendingCheckpoints());
        final long checkpointId = coordinator.getPendingCheckpoints().keySet().iterator().next();

        for (ExecutionAttemptID attemptId : attemptIds) {
            coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptId, checkpointId));
        }

        final CompletedCheckpoint latest = coordinator.getCheckpointStore().getLatestCheckpoint();
        verifyExternalizedCheckpoint(latest, jobId, checkpointId, timestamp);
        verifyExternalizedCheckpointRestore(latest, tasks, vertices);
    }

    /**
     * Asserts that the completed checkpoint carries the expected identity and that its
     * externalized metadata file exists on disk.
     *
     * @param completedCheckpoint checkpoint to inspect
     * @param jobID expected job id
     * @param checkpointId expected checkpoint id
     * @param timestamp expected checkpoint timestamp
     */
    private static void verifyExternalizedCheckpoint(
            CompletedCheckpoint completedCheckpoint, JobID jobID, long checkpointId, long timestamp) {
        Assert.assertEquals(jobID, completedCheckpoint.getJobId());
        Assert.assertEquals(checkpointId, completedCheckpoint.getCheckpointID());
        Assert.assertEquals(timestamp, completedCheckpoint.getTimestamp());
        Assert.assertNotNull(completedCheckpoint.getExternalPointer());
        Assert.assertNotNull(completedCheckpoint.getExternalizedMetadata());

        final File metadataFile =
                new File(completedCheckpoint.getExternalizedMetadata().getFilePath().getPath());
        Assert.assertTrue(metadataFile.exists());
    }

    /**
     * Loads the checkpoint again through its external pointer and asserts that, for every
     * operator of the given vertices, the reloaded operator state equals the original one.
     *
     * @param completedCheckpoint checkpoint whose external pointer is loaded
     * @param tasks job vertices passed to {@link SavepointLoader#loadAndValidateSavepoint}
     * @param executionVertexArr vertices whose operator ids are checked
     * @throws IOException if loading the externalized checkpoint fails
     */
    private static void verifyExternalizedCheckpointRestore(
            CompletedCheckpoint completedCheckpoint,
            Map<JobVertexID, ExecutionJobVertex> tasks,
            ExecutionVertex... executionVertexArr) throws IOException {

        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        final CompletedCheckpoint reloaded = SavepointLoader.loadAndValidateSavepoint(
                completedCheckpoint.getJobId(),
                tasks,
                completedCheckpoint.getExternalPointer(),
                classLoader,
                false);

        for (ExecutionVertex vertex : executionVertexArr) {
            for (OperatorID operatorId : vertex.getJobVertex().getOperatorIDs()) {
                Assert.assertEquals(
                        completedCheckpoint.getOperatorStates().get(operatorId),
                        reloaded.getOperatorStates().get(operatorId));
            }
        }
    }
}
