package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraph.class */
public class ExecutionGraph implements AccessExecutionGraph {
    /** Atomic updater for the volatile {@link #state} field. */
    private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");
    /** Atomic updater for the volatile {@link #globalModVersion} field. */
    private static final AtomicLongFieldUpdater<ExecutionGraph> GLOBAL_VERSION_UPDATER = AtomicLongFieldUpdater.newUpdater(ExecutionGraph.class, "globalModVersion");
    /** Logger shared by this class (package-visible). */
    static final Logger LOG = LoggerFactory.getLogger((Class<?>) ExecutionGraph.class);
    /** Monitor held while leaving the RESTARTING state (see cancel() and restart()). */
    private final Object progressLock;
    /** Static job description: id, name, configuration, required jars and classpaths. */
    private final JobInformation jobInformation;
    /** Result of BlobWriter.serializeAndTryOffload for the job information: the serialized value or a blob key. */
    private final Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey;
    /** Executor on which the scheduling-completion callback runs. */
    private final ScheduledExecutorService futureExecutor;
    /** Executor handed to the CheckpointCoordinator. */
    private final Executor ioExecutor;
    /** Starts as true; cleared by attachJobGraph when an input vertex is not stoppable. */
    private boolean isStoppable;
    /** All attached job vertices, keyed by their id. */
    private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks;
    /** Job vertices in the order in which attachJobGraph added them. */
    private final List<ExecutionJobVertex> verticesInCreationOrder;
    /** All intermediate results produced by attached vertices, keyed by data set id. */
    private final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults;
    /** Executions keyed by attempt id; cleared when the graph restarts. */
    private final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions;
    /** Registered job status listeners (copy-on-write list). */
    private final List<JobStatusListener> jobStatusListeners;
    /** Registered execution status listeners (copy-on-write list). */
    private final List<ExecutionStatusListener> executionListeners;
    /** Failover strategy created by the factory passed to the constructor. */
    private final FailoverStrategy failoverStrategy;
    /** Timestamps indexed by JobStatus.ordinal(); the CREATED slot is set in the constructor. */
    private final long[] stateTimestamps;
    /** Timeout passed to every ExecutionJobVertex created by attachJobGraph. */
    private final Time rpcTimeout;
    /** Timeout used for slot allocation in eager scheduling. */
    private final Time allocationTimeout;
    /** Restart strategy supplied by the caller; not null-checked in the constructor. */
    private final RestartStrategy restartStrategy;
    /** Provider from which slots are requested when scheduling. */
    private final SlotProvider slotProvider;
    /** Class loader for user code. */
    private final ClassLoader userClassLoader;
    /** KvState location registry, constructed over the unmodifiable view of {@link #tasks}. */
    private final KvStateLocationRegistry kvStateLocationRegistry;
    /** Blob writer used to offload the serialized job information. */
    private final BlobWriter blobWriter;
    /** Sum of the parallelism of all attached job vertices. */
    private int numVerticesTotal;
    /** Whether scheduling requests may be queued; defaults to false. */
    private boolean allowQueuedScheduling;
    /** Scheduling mode; defaults to LAZY_FROM_SOURCES. */
    private ScheduleMode scheduleMode;
    /** Counter of finished vertices -- NOTE(review): its use is outside this part of the file. */
    private final AtomicInteger verticesFinished;
    /** Current job status; starts as CREATED. */
    private volatile JobStatus state;
    /** Future replaced with a fresh instance each time attachJobGraph runs. */
    private volatile CompletableFuture<JobStatus> terminationFuture;
    /** Global modification version; starts at 1, so (value - 1) is the number of full restarts. */
    private volatile long globalModVersion;
    /** Cause of the job failure, if any. */
    private volatile Throwable failureCause;
    /** Failure information exposed through getFailureInfo(). */
    private volatile ErrorInfo failureInfo;

    /** Future of the scheduling operation in progress; null until scheduleForExecution stores one. */
    @Nullable
    private volatile CompletableFuture<Void> schedulingFuture;
    /** Checkpoint coordinator; null until enableCheckpointing is called. */
    private CheckpointCoordinator checkpointCoordinator;
    /** Checkpoint statistics tracker; null until enableCheckpointing is called. */
    private CheckpointStatsTracker checkpointStatsTracker;
    /** JSON plan as set through setJsonPlan. */
    private String jsonPlan;

    /**
     * Test-only constructor that builds a {@link JobInformation} from the given id, name,
     * serialized execution config and job configuration, with empty jar and classpath lists.
     *
     * @throws IOException propagated from the constructor this one delegates to
     */
    @VisibleForTesting
    ExecutionGraph(ScheduledExecutorService scheduledExecutorService, Executor executor, JobID jobID, String str, Configuration configuration, SerializedValue<ExecutionConfig> serializedValue, Time time, RestartStrategy restartStrategy, SlotProvider slotProvider) throws IOException {
        this(new JobInformation(jobID, str, serializedValue, configuration, Collections.emptyList(), Collections.emptyList()), scheduledExecutorService, executor, time, restartStrategy, slotProvider);
    }

    /**
     * Test-only constructor that uses a {@link RestartAllStrategy.Factory} as the failover
     * strategy factory.
     *
     * @throws IOException propagated from the constructor this one delegates to
     */
    @VisibleForTesting
    ExecutionGraph(JobInformation jobInformation, ScheduledExecutorService scheduledExecutorService, Executor executor, Time time, RestartStrategy restartStrategy, SlotProvider slotProvider) throws IOException {
        this(jobInformation, scheduledExecutorService, executor, time, restartStrategy, new RestartAllStrategy.Factory(), slotProvider);
    }

    /**
     * Test-only constructor that uses this class's own class loader as the user class loader,
     * the {@link VoidBlobWriter} singleton as blob writer, and {@code time} as both the RPC
     * timeout and the allocation timeout.
     *
     * @throws IOException propagated from the constructor this one delegates to
     */
    @VisibleForTesting
    ExecutionGraph(JobInformation jobInformation, ScheduledExecutorService scheduledExecutorService, Executor executor, Time time, RestartStrategy restartStrategy, FailoverStrategy.Factory factory, SlotProvider slotProvider) throws IOException {
        this(jobInformation, scheduledExecutorService, executor, time, restartStrategy, factory, slotProvider, ExecutionGraph.class.getClassLoader(), VoidBlobWriter.getInstance(), time);
    }

    /**
     * Creates an empty execution graph in state {@link JobStatus#CREATED}.
     *
     * @param jobInformation static job description; must not be null
     * @param scheduledExecutorService stored as the future executor; must not be null
     * @param executor stored as the I/O executor; must not be null
     * @param time RPC timeout; must not be null
     * @param restartStrategy restart strategy; stored without a null check
     * @param factory factory that creates the failover strategy for this graph
     * @param slotProvider provider of slots for scheduling; must not be null
     * @param classLoader user code class loader; must not be null
     * @param blobWriter blob writer used to offload the serialized job information; must not be null
     * @param time2 slot allocation timeout; must not be null
     * @throws IOException declared for {@code BlobWriter.serializeAndTryOffload}
     */
    public ExecutionGraph(JobInformation jobInformation, ScheduledExecutorService scheduledExecutorService, Executor executor, Time time, RestartStrategy restartStrategy, FailoverStrategy.Factory factory, SlotProvider slotProvider, ClassLoader classLoader, BlobWriter blobWriter, Time time2) throws IOException {
        // Defaults that the original source most likely expressed as field initializers.
        this.progressLock = new Object();
        this.isStoppable = true;
        this.allowQueuedScheduling = false;
        this.scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
        this.state = JobStatus.CREATED;
        // NOTE(review): this null check is repeated when futureExecutor is assigned below.
        Preconditions.checkNotNull(scheduledExecutorService);
        this.jobInformation = (JobInformation) Preconditions.checkNotNull(jobInformation);
        this.blobWriter = (BlobWriter) Preconditions.checkNotNull(blobWriter);
        this.jobInformationOrBlobKey = BlobWriter.serializeAndTryOffload(jobInformation, jobInformation.getJobId(), blobWriter);
        this.futureExecutor = (ScheduledExecutorService) Preconditions.checkNotNull(scheduledExecutorService);
        this.ioExecutor = (Executor) Preconditions.checkNotNull(executor);
        this.slotProvider = (SlotProvider) Preconditions.checkNotNull(slotProvider, "scheduler");
        this.userClassLoader = (ClassLoader) Preconditions.checkNotNull(classLoader, "userClassLoader");
        this.tasks = new ConcurrentHashMap<>(16);
        this.intermediateResults = new ConcurrentHashMap<>(16);
        this.verticesInCreationOrder = new ArrayList(16);
        this.currentExecutions = new ConcurrentHashMap<>(16);
        this.jobStatusListeners = new CopyOnWriteArrayList();
        this.executionListeners = new CopyOnWriteArrayList();
        // One timestamp slot per JobStatus constant; only CREATED is known at this point.
        this.stateTimestamps = new long[JobStatus.values().length];
        this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
        this.rpcTimeout = (Time) Preconditions.checkNotNull(time);
        this.allocationTimeout = (Time) Preconditions.checkNotNull(time2);
        this.restartStrategy = restartStrategy;
        // getAllVertices() wraps this.tasks, so tasks must already be assigned here.
        this.kvStateLocationRegistry = new KvStateLocationRegistry(jobInformation.getJobId(), getAllVertices());
        this.verticesFinished = new AtomicInteger();
        this.globalModVersion = 1L;
        // The factory receives this partially constructed graph; keep this after the field assignments above.
        this.failoverStrategy = (FailoverStrategy) Preconditions.checkNotNull(factory.create(this), "null failover strategy");
        this.schedulingFuture = null;
        LOG.info("Job recovers via failover strategy: {}", this.failoverStrategy.getStrategyName());
    }

    /** Returns how many job vertices have been attached to this graph so far. */
    public int getNumberOfExecutionJobVertices() {
        return verticesInCreationOrder.size();
    }

    /** Returns whether scheduling requests are allowed to be queued. */
    public boolean isQueuedSchedulingAllowed() {
        return allowQueuedScheduling;
    }

    /**
     * Sets whether scheduling requests are allowed to be queued.
     *
     * @param z the new value of the flag
     */
    public void setQueuedSchedulingAllowed(boolean z) {
        allowQueuedScheduling = z;
    }

    /**
     * Sets the mode that {@code scheduleForExecution()} switches on.
     *
     * @param scheduleMode the new schedule mode; stored without a null check
     */
    public void setScheduleMode(ScheduleMode scheduleMode) {
        this.scheduleMode = scheduleMode;
    }

    /** Returns the currently configured schedule mode. */
    public ScheduleMode getScheduleMode() {
        return scheduleMode;
    }

    /** Returns the slot allocation timeout given at construction time. */
    public Time getAllocationTimeout() {
        return allocationTimeout;
    }

    /** Always returns {@code false} for this implementation. */
    @Override // org.apache.flink.runtime.executiongraph.AccessExecutionGraph
    public boolean isArchived() {
        return false;
    }

    /**
     * Creates and installs the {@link CheckpointCoordinator} for this graph. May only be called
     * once, while the job is still in state {@code CREATED}.
     *
     * @param j checkpoint interval in milliseconds; must be at least 10
     * @param j2 checkpoint timeout in milliseconds; must be at least 10
     * @param j3 passed through to the coordinator (NOTE(review): presumably the minimum pause between checkpoints)
     * @param i passed through to the coordinator (NOTE(review): presumably the maximum number of concurrent checkpoints)
     * @param checkpointRetentionPolicy retention policy passed through to the coordinator
     * @param list first vertex group handed to the coordinator; all vertices must belong to this graph
     * @param list2 second vertex group handed to the coordinator; all vertices must belong to this graph
     * @param list3 third vertex group handed to the coordinator; all vertices must belong to this graph
     * @param list4 master hooks to register; a hook the coordinator rejects is logged, not failed
     * @param checkpointIDCounter checkpoint id counter passed through to the coordinator
     * @param completedCheckpointStore completed checkpoint store passed through to the coordinator
     * @param stateBackend state backend passed through to the coordinator
     * @param checkpointStatsTracker statistics tracker; must not be null
     * @throws IllegalArgumentException if the interval or timeout is below 10 ms, or a vertex
     *     belongs to another graph
     * @throws IllegalStateException if the job is not in CREATED state or checkpointing is
     *     already enabled
     */
    public void enableCheckpointing(long j, long j2, long j3, int i, CheckpointRetentionPolicy checkpointRetentionPolicy, List<ExecutionJobVertex> list, List<ExecutionJobVertex> list2, List<ExecutionJobVertex> list3, List<MasterTriggerRestoreHook<?>> list4, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, StateBackend stateBackend, CheckpointStatsTracker checkpointStatsTracker) {
        Preconditions.checkArgument(j >= 10, "checkpoint interval must not be below 10ms");
        Preconditions.checkArgument(j2 >= 10, "checkpoint timeout must not be below 10ms");
        Preconditions.checkState(state == JobStatus.CREATED, "Job must be in CREATED state");
        Preconditions.checkState(checkpointCoordinator == null, "checkpointing already enabled");

        final ExecutionVertex[] firstVertexGroup = collectExecutionVertices(list);
        final ExecutionVertex[] secondVertexGroup = collectExecutionVertices(list2);
        final ExecutionVertex[] thirdVertexGroup = collectExecutionVertices(list3);

        this.checkpointStatsTracker = Preconditions.checkNotNull(checkpointStatsTracker, "CheckpointStatsTracker");

        final CheckpointCoordinator coordinator = new CheckpointCoordinator(
                jobInformation.getJobId(),
                j,
                j2,
                j3,
                i,
                checkpointRetentionPolicy,
                firstVertexGroup,
                secondVertexGroup,
                thirdVertexGroup,
                checkpointIDCounter,
                completedCheckpointStore,
                stateBackend,
                ioExecutor,
                SharedStateRegistry.DEFAULT_FACTORY);
        // Publish the coordinator before registering hooks, as the original did.
        checkpointCoordinator = coordinator;

        for (MasterTriggerRestoreHook<?> hook : list4) {
            final boolean registered = coordinator.addMasterHook(hook);
            if (!registered) {
                LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier());
            }
        }

        coordinator.setCheckpointStatsTracker(this.checkpointStatsTracker);

        // An interval of Long.MAX_VALUE skips registering the activator/deactivator listener.
        if (j == Long.MAX_VALUE) {
            return;
        }
        registerJobStatusListener(coordinator.createActivatorDeactivator());
    }

    /** Returns the checkpoint coordinator, or {@code null} if checkpointing was never enabled. */
    @Nullable
    public CheckpointCoordinator getCheckpointCoordinator() {
        return checkpointCoordinator;
    }

    /** Returns the KvState location registry created for this job. */
    public KvStateLocationRegistry getKvStateLocationRegistry() {
        return kvStateLocationRegistry;
    }

    /** Returns the restart strategy passed to the constructor (may be {@code null}; it is not validated there). */
    public RestartStrategy getRestartStrategy() {
        return restartStrategy;
    }

    /**
     * Returns the checkpointing configuration reported by the stats tracker, or {@code null}
     * when no tracker has been installed (checkpointing not enabled).
     */
    @Override
    public CheckpointCoordinatorConfiguration getCheckpointCoordinatorConfiguration() {
        if (checkpointStatsTracker == null) {
            return null;
        }
        return checkpointStatsTracker.getJobCheckpointingConfiguration();
    }

    /**
     * Returns a fresh snapshot from the checkpoint stats tracker, or {@code null} when no
     * tracker has been installed (checkpointing not enabled).
     */
    @Override
    public CheckpointStatsSnapshot getCheckpointStatsSnapshot() {
        if (checkpointStatsTracker == null) {
            return null;
        }
        return checkpointStatsTracker.createSnapshot();
    }

    /**
     * Flattens the task vertices of the given job vertices into one array.
     *
     * <p>For a single job vertex its own task-vertex array is returned directly (no copy);
     * otherwise a new array is built in iteration order.
     *
     * @param list job vertices, all of which must belong to this graph
     * @return the task vertices of all given job vertices
     * @throws IllegalArgumentException if any job vertex belongs to a different graph
     */
    private ExecutionVertex[] collectExecutionVertices(List<ExecutionJobVertex> list) {
        if (list.size() == 1) {
            // Fast path: hand out the vertex's own array instead of copying it.
            final ExecutionJobVertex onlyVertex = list.get(0);
            if (onlyVertex.getGraph() == this) {
                return onlyVertex.getTaskVertices();
            }
            throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph");
        }

        final List<ExecutionVertex> collected = new ArrayList<>();
        for (ExecutionJobVertex jobVertex : list) {
            if (jobVertex.getGraph() != this) {
                throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph");
            }
            Collections.addAll(collected, jobVertex.getTaskVertices());
        }
        return collected.toArray(new ExecutionVertex[0]);
    }

    /** Returns the blob keys of the jar files required by the job, as recorded in the job information. */
    public Collection<PermanentBlobKey> getRequiredJarFiles() {
        return jobInformation.getRequiredJarFileBlobKeys();
    }

    /** Returns the classpath URLs required by the job, as recorded in the job information. */
    public Collection<URL> getRequiredClasspaths() {
        return jobInformation.getRequiredClasspathURLs();
    }

    /**
     * Stores the JSON plan of the job.
     *
     * @param str the JSON plan to store
     */
    public void setJsonPlan(String str) {
        jsonPlan = str;
    }

    /** Returns the JSON plan previously stored via {@code setJsonPlan}, or {@code null} if none was set. */
    @Override
    public String getJsonPlan() {
        return jsonPlan;
    }

    /** Returns the slot provider given at construction time. */
    public SlotProvider getSlotProvider() {
        return slotProvider;
    }

    /** Returns the job information as produced at construction: the serialized value or the key of the offloaded blob. */
    public Either<SerializedValue<JobInformation>, PermanentBlobKey> getJobInformationOrBlobKey() {
        return jobInformationOrBlobKey;
    }

    /** Returns the id of the job, taken from the job information. */
    @Override
    public JobID getJobID() {
        return jobInformation.getJobId();
    }

    /** Returns the name of the job, taken from the job information. */
    @Override
    public String getJobName() {
        return jobInformation.getJobName();
    }

    /** Returns {@code true} unless an attached input vertex was found not to be stoppable. */
    @Override
    public boolean isStoppable() {
        return isStoppable;
    }

    /** Returns the job configuration, taken from the job information. */
    public Configuration getJobConfiguration() {
        return jobInformation.getJobConfiguration();
    }

    /** Returns the user code class loader given at construction time. */
    public ClassLoader getUserClassLoader() {
        return userClassLoader;
    }

    /** Returns the current job status (a volatile read). */
    @Override
    public JobStatus getState() {
        return state;
    }

    /** Returns the recorded failure cause, or {@code null} if none has been recorded. */
    public Throwable getFailureCause() {
        return failureCause;
    }

    /** Returns the recorded failure information, or {@code null} if none has been recorded. */
    @Override
    public ErrorInfo getFailureInfo() {
        return failureInfo;
    }

    /**
     * Returns the number of full restarts, derived from the global modification version, which
     * starts at 1.
     */
    public long getNumberOfFullRestarts() {
        final long currentVersion = globalModVersion;
        return currentVersion - 1;
    }

    /**
     * Looks up an attached job vertex by id.
     *
     * @param jobVertexID id of the vertex to look up
     * @return the matching vertex, or {@code null} if none is attached under that id
     */
    @Override
    public ExecutionJobVertex getJobVertex(JobVertexID jobVertexID) {
        return tasks.get(jobVertexID);
    }

    /** Returns a read-only live view of all attached job vertices, keyed by id. */
    @Override
    public Map<JobVertexID, ExecutionJobVertex> getAllVertices() {
        return Collections.unmodifiableMap(tasks);
    }

    /**
     * Returns an iterable over the job vertices in the order they were attached.
     *
     * <p>The vertex count is captured when this method is called, so iterators created from
     * the result do not see vertices attached afterwards. Removal is not supported.
     */
    @Override
    public Iterable<ExecutionJobVertex> getVerticesTopologically() {
        // Freeze the bound now; every iterator from this iterable walks indices [0, numElements).
        final int numElements = verticesInCreationOrder.size();

        return () -> new Iterator<ExecutionJobVertex>() {
            private int pos = 0;

            @Override
            public boolean hasNext() {
                return pos < numElements;
            }

            @Override
            public ExecutionJobVertex next() {
                if (pos >= numElements) {
                    throw new NoSuchElementException();
                }
                final ExecutionJobVertex vertex = verticesInCreationOrder.get(pos);
                pos++;
                return vertex;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    /** Returns the sum of the parallelism of all attached job vertices. */
    public int getTotalNumberOfVertices() {
        return numVerticesTotal;
    }

    /** Returns a read-only live view of all intermediate results, keyed by data set id. */
    public Map<IntermediateDataSetID, IntermediateResult> getAllIntermediateResults() {
        return Collections.unmodifiableMap(intermediateResults);
    }

    /**
     * Returns an iterable over all execution vertices, backed by an {@code AllVerticesIterator}
     * over the job vertices in attachment order. Each call to {@code iterator()} starts a
     * fresh traversal.
     */
    @Override
    public Iterable<ExecutionVertex> getAllExecutionVertices() {
        return () -> new AllVerticesIterator(getVerticesTopologically().iterator());
    }

    /**
     * Returns the timestamp stored for the given status.
     *
     * @param jobStatus status whose timestamp is requested
     * @return the stored timestamp; 0 if the slot for that status was never written
     */
    @Override
    public long getStatusTimestamp(JobStatus jobStatus) {
        final int slot = jobStatus.ordinal();
        return stateTimestamps[slot];
    }

    /** Returns the blob writer given at construction time. */
    public final BlobWriter getBlobWriter() {
        return blobWriter;
    }

    /** Returns the scheduled executor given at construction time, typed as a plain {@link Executor}. */
    public Executor getFutureExecutor() {
        return futureExecutor;
    }

    /**
     * Merges the user accumulators of the current execution attempt of every execution vertex
     * into one map. Attempts that report no accumulators ({@code null}) are skipped.
     *
     * @return a new map from accumulator name to the merged accumulator or merge failure
     */
    public Map<String, OptionalFailure<Accumulator<?, ?>>> aggregateUserAccumulators() {
        final Map<String, OptionalFailure<Accumulator<?, ?>>> merged = new HashMap<>();

        for (ExecutionVertex vertex : getAllExecutionVertices()) {
            final Map<String, Accumulator<?, ?>> attemptAccumulators =
                    vertex.getCurrentExecutionAttempt().getUserAccumulators();
            if (attemptAccumulators == null) {
                continue;
            }
            AccumulatorHelper.mergeInto(merged, attemptAccumulators);
        }

        return merged;
    }

    /**
     * Aggregates the user accumulators and serializes each value.
     *
     * @return a map from accumulator name to its serialized value (or serialized failure)
     */
    @Override
    public Map<String, SerializedValue<OptionalFailure<Object>>> getAccumulatorsSerialized() {
        final Map<String, OptionalFailure<Accumulator<?, ?>>> aggregated = aggregateUserAccumulators();
        return aggregated
                .entrySet()
                .stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        entry -> serializeAccumulator(entry.getKey(), entry.getValue())));
    }

    /**
     * Serializes one accumulator result.
     *
     * <p>A failed accumulator is serialized as a failure carrying its cause; otherwise the
     * accumulator's local value is serialized. If serialization itself throws an
     * {@link IOException}, the error is logged and that exception is serialized as the failure.
     *
     * @param str accumulator name, used only in the log message
     * @param optionalFailure accumulator or the failure recorded for it
     * @return the serialized value or failure
     * @throws RuntimeException if even the serialization exception cannot be serialized
     */
    public static SerializedValue<OptionalFailure<Object>> serializeAccumulator(String str, OptionalFailure<Accumulator<?, ?>> optionalFailure) {
        try {
            if (optionalFailure.isFailure()) {
                return new SerializedValue<>(OptionalFailure.ofFailure(optionalFailure.getFailureCause()));
            }
            return new SerializedValue<>(OptionalFailure.of(optionalFailure.getUnchecked().getLocalValue()));
        } catch (IOException serializationError) {
            LOG.error("Could not serialize accumulator " + str + '.', serializationError);
            try {
                // Fall back to shipping the serialization error in place of the value.
                return new SerializedValue<>(OptionalFailure.ofFailure(serializationError));
            } catch (IOException fallbackError) {
                throw new RuntimeException("It should never happen that we cannot serialize the accumulator serialization exception.", fallbackError);
            }
        }
    }

    /** Aggregates the user accumulators and returns their stringified form. */
    @Override
    public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() {
        final Map<String, OptionalFailure<Accumulator<?, ?>>> aggregated = aggregateUserAccumulators();
        return StringifiedAccumulatorResult.stringifyAccumulatorResults(aggregated);
    }

    /**
     * Attaches the given job vertices to this graph, creating one {@link ExecutionJobVertex}
     * per {@link JobVertex} and registering its produced intermediate results.
     *
     * <p>Vertices are processed in list order; each one is connected to the intermediate
     * results registered so far, so producers must precede their consumers. After all vertices
     * are attached, a fresh termination future is installed and the failover strategy is
     * notified of the new vertices.
     *
     * @param list job vertices to attach, topologically sorted
     * @throws JobException if a job vertex id or intermediate data set id is already registered;
     *     vertices attached before the conflict remain attached
     */
    public void attachJobGraph(List<JobVertex> list) throws JobException {
        LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} vertices and {} intermediate results.", list.size(), this.tasks.size(), this.intermediateResults.size());

        final List<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(list.size());
        final long createTimestamp = System.currentTimeMillis();

        for (JobVertex jobVertex : list) {
            // A single non-stoppable input vertex makes the whole job non-stoppable.
            if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
                this.isStoppable = false;
            }

            final ExecutionJobVertex newVertex = new ExecutionJobVertex(this, jobVertex, 1, this.rpcTimeout, this.globalModVersion, createTimestamp);
            newVertex.connectToPredecessors(this.intermediateResults);

            final ExecutionJobVertex previousVertex = this.tasks.putIfAbsent(jobVertex.getID(), newVertex);
            if (previousVertex != null) {
                // Arguments follow the labels: the already-registered vertex is "previous".
                throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]", jobVertex.getID(), previousVertex, newVertex));
            }

            for (IntermediateResult newResult : newVertex.getProducedDataSets()) {
                final IntermediateResult previousResult = this.intermediateResults.putIfAbsent(newResult.getId(), newResult);
                if (previousResult != null) {
                    throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]", newResult.getId(), previousResult, newResult));
                }
            }

            this.verticesInCreationOrder.add(newVertex);
            this.numVerticesTotal += newVertex.getParallelism();
            newExecJobVertices.add(newVertex);
        }

        this.terminationFuture = new CompletableFuture<>();
        this.failoverStrategy.notifyNewVertices(newExecJobVertices);
    }

    /**
     * Moves the job from {@code CREATED} to {@code RUNNING} and starts scheduling according
     * to the configured schedule mode.
     *
     * <p>If the job state or the global modification version changed while scheduling was
     * being set up, the new scheduling future is cancelled. Otherwise it is stored, and any
     * failure other than a cancellation triggers {@code failGlobal}.
     *
     * @throws IllegalStateException if the job is not in state {@code CREATED}
     * @throws JobException if the schedule mode is neither lazy-from-sources nor eager
     */
    public void scheduleForExecution() throws JobException {
        // Capture the version before the state transition so a concurrent change is detected below.
        final long versionAtStart = globalModVersion;

        if (!transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
            throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
        }

        final CompletableFuture<Void> newSchedulingFuture;
        switch (scheduleMode) {
            case LAZY_FROM_SOURCES:
                newSchedulingFuture = scheduleLazy(slotProvider);
                break;
            case EAGER:
                newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
                break;
            default:
                throw new JobException("Schedule mode is invalid.");
        }

        final boolean stillCurrent = state == JobStatus.RUNNING && versionAtStart == globalModVersion;
        if (!stillCurrent) {
            newSchedulingFuture.cancel(false);
            return;
        }

        schedulingFuture = newSchedulingFuture;
        newSchedulingFuture.whenCompleteAsync(
                (ignored, failure) -> {
                    // Cancellation is an expected outcome, not a job failure.
                    if (failure != null && !(failure instanceof CancellationException)) {
                        failGlobal(ExceptionUtils.stripCompletionException(failure));
                    }
                },
                futureExecutor);
    }

    /**
     * Schedules only the input (source) vertices of the job.
     *
     * @param slotProvider provider from which the slots are requested
     * @return a future that completes once all source-vertex scheduling futures have completed
     */
    private CompletableFuture<Void> scheduleLazy(SlotProvider slotProvider) {
        final List<CompletableFuture<Void>> sourceSchedulingFutures = new ArrayList<>(numVerticesTotal);

        for (ExecutionJobVertex jobVertex : verticesInCreationOrder) {
            if (!jobVertex.getJobVertex().isInputVertex()) {
                continue;
            }
            final CompletableFuture<Void> vertexFuture =
                    jobVertex.scheduleAll(slotProvider, allowQueuedScheduling, LocationPreferenceConstraint.ALL);
            sourceSchedulingFutures.add(vertexFuture);
        }

        return FutureUtils.waitForAll(sourceSchedulingFutures);
    }

    /**
     * Allocates slots for all executions of all job vertices and deploys them once every
     * allocation has succeeded.
     *
     * <p>If the combined allocation fails with a {@link TimeoutException}, the failure is
     * replaced by a {@link NoResourceAvailableException} that reports how many slots were
     * required and how many had been allocated.
     *
     * @param slotProvider provider from which the slots are requested
     * @param time timeout reported in the error message. NOTE(review): the allocation itself
     *     uses the {@code allocationTimeout} field; the caller in this file passes that same field
     * @return a future that completes after all executions were deployed, or completes
     *     exceptionally with the (stripped) allocation or deployment failure
     * @throws IllegalStateException if the job is not in state {@code RUNNING}
     */
    private CompletableFuture<Void> scheduleEager(SlotProvider slotProvider, Time time) {
        Preconditions.checkState(this.state == JobStatus.RUNNING, "job is not running currently");

        final boolean queued = this.allowQueuedScheduling;
        final List<CompletableFuture<Execution>> allocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());

        for (ExecutionJobVertex jobVertex : getVerticesTopologically()) {
            allocationFutures.addAll(jobVertex.allocateResourcesForAll(slotProvider, queued, LocationPreferenceConstraint.ALL, this.allocationTimeout));
        }

        final FutureUtils.ConjunctFuture<Collection<Execution>> allAllocations = FutureUtils.combineAll(allocationFutures);

        return allAllocations
                .thenAccept(executionsToDeploy -> {
                    for (Execution execution : executionsToDeploy) {
                        try {
                            execution.deploy();
                        } catch (Throwable deployFailure) {
                            throw new CompletionException(new FlinkException(String.format("Could not deploy execution %s.", execution), deployFailure));
                        }
                    }
                })
                .exceptionally(failure -> {
                    final Throwable stripped = ExceptionUtils.stripCompletionException(failure);
                    // Distinct name: a lambda body must not redeclare its own parameter.
                    final Throwable reported;
                    if (stripped instanceof TimeoutException) {
                        reported = new NoResourceAvailableException("Could not allocate all requires slots within timeout of " + time + ". Slots required: " + allAllocations.getNumFuturesTotal() + ", slots allocated: " + allAllocations.getNumFuturesCompleted());
                    } else {
                        reported = stripped;
                    }
                    throw new CompletionException(reported);
                });
    }

    /**
     * Cancels the job. The action depends on the state observed on each loop iteration:
     *
     * <ul>
     *   <li>RUNNING or CREATED: move to CANCELLING, cancel the pending scheduling future and
     *       all job vertices, then finish once every vertex cancellation has completed;
     *   <li>FAILING: only move to CANCELLING;
     *   <li>RESTARTING: move directly to CANCELED under the progress lock;
     *   <li>any other state: do nothing.
     * </ul>
     *
     * The loop retries whenever a state transition attempt returns false.
     */
    public void cancel() {
        while (true) {
            // Re-read the state on every iteration.
            JobStatus jobStatus = this.state;
            if (jobStatus == JobStatus.RUNNING || jobStatus == JobStatus.CREATED) {
                if (transitionState(jobStatus, JobStatus.CANCELLING)) {
                    // The value returned here is handed to allVerticesInTerminalState below.
                    long incrementGlobalModVersion = incrementGlobalModVersion();
                    // Copy the volatile field once so the null check and the call see the same future.
                    CompletableFuture<Void> completableFuture = this.schedulingFuture;
                    if (completableFuture != null) {
                        completableFuture.cancel(false);
                    }
                    ArrayList arrayList = new ArrayList(this.verticesInCreationOrder.size());
                    Iterator<ExecutionJobVertex> it = this.verticesInCreationOrder.iterator();
                    while (it.hasNext()) {
                        arrayList.add(it.next().cancelWithFuture());
                    }
                    FutureUtils.waitForAll(arrayList).whenComplete((r11, th) -> {
                        if (th != null) {
                            // At least one vertex could not be cancelled: fail the job instead.
                            transitionState(JobStatus.CANCELLING, JobStatus.FAILED, new FlinkException("Could not cancel job " + getJobName() + " because not all execution job vertices could be cancelled.", th));
                        } else {
                            allVerticesInTerminalState(incrementGlobalModVersion);
                        }
                    });
                    return;
                }
            } else if (jobStatus == JobStatus.FAILING) {
                // No vertex cancellation is started here; only the state changes.
                if (transitionState(jobStatus, JobStatus.CANCELLING)) {
                    return;
                }
            } else {
                if (jobStatus != JobStatus.RESTARTING) {
                    // Any other state: nothing to cancel.
                    return;
                }
                // restart() takes the same lock before it inspects the state.
                synchronized (this.progressLock) {
                    if (transitionState(jobStatus, JobStatus.CANCELED)) {
                        onTerminalState(JobStatus.CANCELED);
                        LOG.info("Canceled during restart.");
                        return;
                    }
                }
            }
        }
    }

    /**
     * Stops the job by calling {@code stop()} on every execution vertex that has no inputs.
     *
     * @throws StoppingException if the job is not stoppable
     */
    public void stop() throws StoppingException {
        if (!isStoppable) {
            throw new StoppingException("This job is not stoppable.");
        }

        for (ExecutionVertex vertex : getAllExecutionVertices()) {
            // Only vertices without inputs are stopped.
            if (vertex.getNumberOfInputs() != 0) {
                continue;
            }
            vertex.stop();
        }
    }

    /**
     * Suspends the job: moves it to SUSPENDING, cancels the pending scheduling future and all
     * job vertices, and finishes once every vertex cancellation has completed.
     *
     * <p>Does nothing if the job is already in a terminal state or already SUSPENDING.
     *
     * @param th the reason for the suspension; passed to the state transition and recorded
     *     through {@code initFailureCause}
     */
    public void suspend(Throwable th) {
        JobStatus jobStatus;
        // Retry until the transition to SUSPENDING succeeds or suspension is no longer applicable.
        do {
            jobStatus = this.state;
            if (jobStatus.isTerminalState() || jobStatus == JobStatus.SUSPENDING) {
                return;
            }
        } while (!transitionState(jobStatus, JobStatus.SUSPENDING, th));
        initFailureCause(th);
        incrementGlobalModVersion();
        // Copy the volatile field once so the null check and the call see the same future.
        CompletableFuture<Void> completableFuture = this.schedulingFuture;
        if (completableFuture != null) {
            completableFuture.cancel(false);
        }
        ArrayList arrayList = new ArrayList(this.verticesInCreationOrder.size());
        Iterator<ExecutionJobVertex> it = this.verticesInCreationOrder.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().cancelWithFuture());
        }
        FutureUtils.waitForAll(arrayList).whenComplete((r5, th2) -> {
            // A cancellation failure is only logged; the suspension still completes.
            if (th2 != null) {
                LOG.debug("Flink could not properly clean up resource after suspension.", th2);
            }
            // NOTE(review): -1 presumably matches no real global mod version, so no restart follows -- confirm in allVerticesInTerminalState.
            allVerticesInTerminalState(-1L);
            LOG.info("Job {} has been suspended.", getJobID());
        });
    }

    /**
     * Fails the whole job: moves it to FAILING, cancels the pending scheduling future and all
     * job vertices, and finishes once every vertex cancellation has completed.
     *
     * <p>Does nothing if the job is already FAILING, SUSPENDING, SUSPENDED or in a globally
     * terminal state.
     *
     * @param th the failure cause; passed to the state transition and recorded through
     *     {@code initFailureCause}
     */
    public void failGlobal(Throwable th) {
        JobStatus jobStatus;
        // Retry until the transition to FAILING succeeds or failing is no longer applicable.
        do {
            jobStatus = this.state;
            if (jobStatus == JobStatus.FAILING || jobStatus == JobStatus.SUSPENDING || jobStatus == JobStatus.SUSPENDED || jobStatus.isGloballyTerminalState()) {
                return;
            }
        } while (!transitionState(jobStatus, JobStatus.FAILING, th));
        initFailureCause(th);
        // The value returned here is handed to allVerticesInTerminalState below.
        long incrementGlobalModVersion = incrementGlobalModVersion();
        // Copy the volatile field once so the null check and the call see the same future.
        CompletableFuture<Void> completableFuture = this.schedulingFuture;
        if (completableFuture != null) {
            completableFuture.cancel(false);
        }
        ArrayList arrayList = new ArrayList(this.verticesInCreationOrder.size());
        Iterator<ExecutionJobVertex> it = this.verticesInCreationOrder.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().cancelWithFuture());
        }
        FutureUtils.waitForAll(arrayList).whenComplete((r11, th2) -> {
            if (th2 != null) {
                // Cancellation itself failed: go straight from FAILING to FAILED.
                transitionState(JobStatus.FAILING, JobStatus.FAILED, new FlinkException("Could not cancel all execution job vertices properly.", th2));
            } else {
                allVerticesInTerminalState(incrementGlobalModVersion);
            }
        });
    }

    /**
     * Restarts the job from state {@code RESTARTING}: resets all job vertices (and the
     * constraints of their co-location groups) for a new execution, clears the state timestamps
     * except the one for {@code RESTARTING}, switches to {@code CREATED}, restores the latest
     * checkpointed state if a checkpoint coordinator is present, and schedules the job again.
     *
     * <p>The restart is silently aborted when the global modification version no longer matches
     * {@code j}, or when the job has meanwhile been canceled, failed or suspended. Any error
     * raised during the restart is logged and escalated through {@link #failGlobal(Throwable)}.
     *
     * @param j the global modification version this restart was requested for
     */
    public void restart(long j) {
        try {
            synchronized (this.progressLock) {
                if (this.globalModVersion != j) {
                    LOG.info("Concurrent full restart subsumed this restart.");
                    return;
                }

                final JobStatus current = this.state;
                if (current == JobStatus.CANCELED) {
                    LOG.info("Canceled job during restart. Aborting restart.");
                    return;
                }
                if (current == JobStatus.FAILED) {
                    LOG.info("Failed job during restart. Aborting restart.");
                    return;
                }
                if (current == JobStatus.SUSPENDING || current == JobStatus.SUSPENDED) {
                    LOG.info("Suspended job during restart. Aborting restart.");
                    return;
                }
                if (current != JobStatus.RESTARTING) {
                    throw new IllegalStateException("Can only restart job from state restarting.");
                }

                this.currentExecutions.clear();

                // Tracks the co-location groups already reset, so each group is reset only once
                // even when several job vertices share it.
                final HashSet<CoLocationGroup> resetGroups = new HashSet<>();
                final long resetTimestamp = System.currentTimeMillis();
                for (ExecutionJobVertex jobVertex : this.verticesInCreationOrder) {
                    final CoLocationGroup group = jobVertex.getCoLocationGroup();
                    if (group != null && resetGroups.add(group)) {
                        group.resetConstraints();
                    }
                    jobVertex.resetForNewExecution(resetTimestamp, j);
                }

                // Wipe every recorded state timestamp except the one of the RESTARTING state.
                final int restartingIndex = JobStatus.RESTARTING.ordinal();
                for (int i = 0; i < this.stateTimestamps.length; i++) {
                    if (i != restartingIndex) {
                        this.stateTimestamps[i] = 0;
                    }
                }

                transitionState(JobStatus.RESTARTING, JobStatus.CREATED);

                if (this.checkpointCoordinator != null) {
                    this.checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false);
                }
                scheduleForExecution();
            }
        } catch (Throwable th) {
            LOG.warn("Failed to restart the job.", th);
            failGlobal(th);
        }
    }

    /**
     * Asks the checkpoint coordinator, if one is set, to restore the latest checkpointed state
     * for all vertices of this graph. Runs while holding the progress lock.
     *
     * @param z  forwarded unchanged as the second argument of the coordinator's restore call
     * @param z2 forwarded unchanged as the third argument of the coordinator's restore call
     * @throws Exception if the coordinator's restore call fails
     */
    public void restoreLatestCheckpointedState(boolean z, boolean z2) throws Exception {
        synchronized (this.progressLock) {
            if (this.checkpointCoordinator == null) {
                // No checkpoint coordinator set on this graph: nothing to restore.
                return;
            }
            this.checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), z, z2);
        }
    }

    /**
     * Deserializes the job's execution config with the user class loader and returns its
     * archived form.
     *
     * @return the archived execution config, or {@code null} if the deserialized config is
     *     {@code null} or deserialization fails (the failure is logged)
     */
    @Override // org.apache.flink.runtime.executiongraph.AccessExecutionGraph
    public ArchivedExecutionConfig getArchivedExecutionConfig() {
        try {
            final ExecutionConfig executionConfig =
                    this.jobInformation.getSerializedExecutionConfig().deserializeValue(this.userClassLoader);
            return executionConfig == null ? null : executionConfig.archive();
        } catch (IOException | ClassNotFoundException failure) {
            LOG.error("Couldn't create ArchivedExecutionConfig for job {} ", getJobID(), failure);
            return null;
        }
    }

    /**
     * Returns the future that {@code onTerminalState} completes with the job's terminal status.
     *
     * @return the termination future of this graph
     */
    public CompletableFuture<JobStatus> getTerminationFuture() {
        return this.terminationFuture;
    }

    /**
     * Blocks until the termination future completes and returns its value.
     *
     * @return the job status the termination future was completed with
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws RuntimeException wrapping the {@link ExecutionException} if the future completed
     *     exceptionally
     */
    @VisibleForTesting
    public JobStatus waitUntilTerminal() throws InterruptedException {
        final JobStatus terminalStatus;
        try {
            terminalStatus = this.terminationFuture.get();
        } catch (ExecutionException failure) {
            throw new RuntimeException(failure);
        }
        return terminalStatus;
    }

    /**
     * Returns the failover strategy held by this graph.
     *
     * @return the failover strategy
     */
    public FailoverStrategy getFailoverStrategy() {
        return this.failoverStrategy;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the current global modification version of this graph.
     *
     * @return the current value of the global modification version
     */
    public long getGlobalModVersion() {
        return this.globalModVersion;
    }

    /**
     * Attempts a state transition that carries no error cause.
     *
     * @param jobStatus  the state the job is expected to be in
     * @param jobStatus2 the state to switch to
     * @return {@code true} if the transition happened, {@code false} if the current state did
     *     not match {@code jobStatus}
     */
    private boolean transitionState(JobStatus jobStatus, JobStatus jobStatus2) {
        final Throwable noCause = null;
        return transitionState(jobStatus, jobStatus2, noCause);
    }

    /**
     * Atomically switches the job state from {@code jobStatus} to {@code jobStatus2}. On
     * success the transition is logged, the timestamp of the new state is recorded and the job
     * status listeners are notified.
     *
     * @param jobStatus  the state the job is expected to be in
     * @param jobStatus2 the state to switch to
     * @param th         the error associated with the transition, may be {@code null}
     * @return {@code true} if the compare-and-set succeeded, {@code false} otherwise
     * @throws IllegalStateException if {@code jobStatus} is a terminal state
     */
    private boolean transitionState(JobStatus jobStatus, JobStatus jobStatus2, Throwable th) {
        if (jobStatus.isTerminalState()) {
            final String message = "Job is trying to leave terminal state " + jobStatus;
            LOG.error(message);
            throw new IllegalStateException(message);
        }

        final boolean switched = STATE_UPDATER.compareAndSet(this, jobStatus, jobStatus2);
        if (switched) {
            LOG.info("Job {} ({}) switched from state {} to {}.", getJobName(), getJobID(), jobStatus, jobStatus2, th);
            this.stateTimestamps[jobStatus2.ordinal()] = System.currentTimeMillis();
            notifyJobStatusChange(jobStatus2, th);
        }
        return switched;
    }

    /**
     * Atomically increments the global modification version.
     *
     * @return the version after the increment
     */
    private long incrementGlobalModVersion() {
        final long updatedVersion = GLOBAL_VERSION_UPDATER.incrementAndGet(this);
        return updatedVersion;
    }

    /**
     * Records the given throwable as the failure cause and creates the matching failure info
     * stamped with the current time.
     *
     * @param th the failure cause to record
     */
    private void initFailureCause(Throwable th) {
        this.failureCause = th;
        final long failureTimestamp = System.currentTimeMillis();
        this.failureInfo = new ErrorInfo(th, failureTimestamp);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Counts one more finished vertex. When the count reaches the total number of vertices while
     * the job is {@code RUNNING}, every job vertex is finalized on the master and the job is
     * switched to {@code FINISHED}. A non-fatal error during finalization triggers a global
     * failure.
     */
    public void vertexFinished() {
        final boolean allVerticesFinished = this.verticesFinished.incrementAndGet() == this.numVerticesTotal;
        if (!allVerticesFinished || this.state != JobStatus.RUNNING) {
            return;
        }
        try {
            for (ExecutionJobVertex jobVertex : this.verticesInCreationOrder) {
                jobVertex.getJobVertex().finalizeOnMaster(getUserClassLoader());
            }
            final boolean finished = transitionState(JobStatus.RUNNING, JobStatus.FINISHED);
            if (finished) {
                onTerminalState(JobStatus.FINISHED);
            }
        } catch (Throwable failure) {
            ExceptionUtils.rethrowIfFatalError(failure);
            failGlobal(new Exception("Failed to finalize execution on master", failure));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Decrements the finished-vertex counter by one, undoing a previous
     * {@link #vertexFinished()} count.
     */
    public void vertexUnFinished() {
        // The previous counter value returned by the call is deliberately ignored.
        this.verticesFinished.getAndDecrement();
    }

    /**
     * Completes the job-level state transition once all vertices have terminated. The current
     * state is re-read in a loop until one of the branches finishes the transition:
     * {@code CANCELLING} becomes {@code CANCELED}, {@code FAILING} is handed to
     * {@link #tryRestartOrFail(long)}, and {@code SUSPENDING} becomes {@code SUSPENDED}.
     *
     * @param j the global modification version passed on to {@link #tryRestartOrFail(long)}
     */
    private void allVerticesInTerminalState(long j) {
        for (;;) {
            final JobStatus current = this.state;

            if (current == JobStatus.RUNNING) {
                // Unexpected state here: fail globally, then re-read the state on the next pass.
                failGlobal(new Exception("ExecutionGraph went into allVerticesInTerminalState() from RUNNING"));
                continue;
            }

            if (current == JobStatus.CANCELLING) {
                if (transitionState(current, JobStatus.CANCELED)) {
                    onTerminalState(JobStatus.CANCELED);
                    return;
                }
                // Transition lost a race with a concurrent state change; check again.
                continue;
            }

            if (current == JobStatus.FAILING) {
                if (tryRestartOrFail(j)) {
                    return;
                }
                continue;
            }

            if (current == JobStatus.SUSPENDING) {
                if (transitionState(current, JobStatus.SUSPENDED)) {
                    onTerminalState(JobStatus.SUSPENDED);
                    return;
                }
                continue;
            }

            if (current.isGloballyTerminalState()) {
                LOG.warn("Job has entered globally terminal state without waiting for all job vertices to reach final state.");
            } else {
                failGlobal(new Exception("ExecutionGraph went into final state from state " + current));
            }
            return;
        }
    }

    /**
     * Tries to either restart the job or move it to {@code FAILED}. A restart is attempted only
     * if the failure cause is not a {@code SuppressRestartsException} and the restart strategy
     * reports that it can restart.
     *
     * @param j the global modification version handed to the restart callback
     * @return {@code true} if the job was switched to {@code RESTARTING} or {@code FAILED};
     *     {@code false} if the job was in neither {@code FAILING} nor {@code RESTARTING}, or
     *     if the state transition did not succeed
     */
    private boolean tryRestartOrFail(long j) {
        final JobStatus observed = this.state;
        if (observed != JobStatus.FAILING && observed != JobStatus.RESTARTING) {
            return false;
        }

        final Throwable cause = this.failureCause;
        synchronized (this.progressLock) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Try to restart or fail the job {} ({}) if no longer possible.", getJobName(), getJobID(), cause);
            } else {
                LOG.info("Try to restart or fail the job {} ({}) if no longer possible.", getJobName(), getJobID());
            }

            final boolean restartsNotSuppressed = !(cause instanceof SuppressRestartsException);
            final boolean strategyCanRestart = this.restartStrategy.canRestart();
            final boolean restartable = restartsNotSuppressed && strategyCanRestart;

            if (restartable) {
                if (!transitionState(observed, JobStatus.RESTARTING)) {
                    // The state changed concurrently; this attempt cannot be completed.
                    return false;
                }
                LOG.info("Restarting the job {} ({}).", getJobName(), getJobID());
                this.restartStrategy.restart(new ExecutionGraphRestartCallback(this, j), new ScheduledExecutorServiceAdapter(this.futureExecutor));
                return true;
            }

            if (!transitionState(observed, JobStatus.FAILED, cause)) {
                // The state changed concurrently; this attempt cannot be completed.
                return false;
            }
            LOG.info("Could not restart the job {} ({}) because {}.", getJobName(), getJobID(), StringUtils.concatenateWithAnd(restartsNotSuppressed ? null : "a type of SuppressRestartsException was thrown", strategyCanRestart ? null : "the restart strategy prevented it"), cause);
            onTerminalState(JobStatus.FAILED);
            return true;
        }
    }

    /**
     * Performs the clean-up for a terminal job state: detaches and shuts down the checkpoint
     * coordinator (if any) and, in all cases, completes the termination future with the status.
     *
     * @param jobStatus the terminal status the job has reached
     */
    private void onTerminalState(JobStatus jobStatus) {
        try {
            // Clear the field first, then shut down the instance that was taken from it.
            final CheckpointCoordinator coordinator = this.checkpointCoordinator;
            this.checkpointCoordinator = null;
            if (coordinator != null) {
                coordinator.shutdown(jobStatus);
            }
        } catch (Exception failure) {
            LOG.error("Error while cleaning up after execution", (Throwable) failure);
        } finally {
            this.terminationFuture.complete(jobStatus);
        }
    }

    /**
     * Applies a task state update to the registered execution it refers to.
     *
     * @param taskExecutionState the reported task state
     * @return {@code false} if no execution is registered under the reported ID, if an
     *     unexpected state was reported, or if processing the update failed; for
     *     {@code RUNNING} the result of {@code switchToRunning()}; {@code true} otherwise
     */
    public boolean updateState(TaskExecutionState taskExecutionState) {
        final Execution attempt = this.currentExecutions.get(taskExecutionState.getID());
        if (attempt == null) {
            return false;
        }
        try {
            switch (taskExecutionState.getExecutionState()) {
                case RUNNING:
                    return attempt.switchToRunning();
                case FINISHED: {
                    final Map<String, Accumulator<?, ?>> accumulators = deserializeAccumulators(taskExecutionState);
                    attempt.markFinished(accumulators, taskExecutionState.getIOMetrics());
                    return true;
                }
                case CANCELED: {
                    final Map<String, Accumulator<?, ?>> accumulators = deserializeAccumulators(taskExecutionState);
                    attempt.cancelingComplete(accumulators, taskExecutionState.getIOMetrics());
                    return true;
                }
                case FAILED: {
                    final Throwable error = taskExecutionState.getError(this.userClassLoader);
                    final Map<String, Accumulator<?, ?>> accumulators = deserializeAccumulators(taskExecutionState);
                    attempt.markFailed(error, accumulators, taskExecutionState.getIOMetrics());
                    return true;
                }
                default:
                    // Any other reported state is treated as illegal and fails the execution.
                    attempt.fail(new Exception("TaskManager sent illegal state update: " + taskExecutionState.getExecutionState()));
                    return false;
            }
        } catch (Throwable failure) {
            ExceptionUtils.rethrowIfFatalErrorOrOOM(failure);
            failGlobal(failure);
            return false;
        }
    }

    /**
     * Deserializes the user accumulators carried by a task state update, using the user class
     * loader.
     *
     * @param taskExecutionState the state update that may carry an accumulator snapshot
     * @return the deserialized accumulators, or {@code null} if the update carries no snapshot
     *     or deserialization fails (the failure is logged)
     */
    private Map<String, Accumulator<?, ?>> deserializeAccumulators(TaskExecutionState taskExecutionState) {
        final AccumulatorSnapshot snapshot = taskExecutionState.getAccumulators();
        if (snapshot != null) {
            try {
                return snapshot.deserializeUserAccumulators(this.userClassLoader);
            } catch (Throwable failure) {
                LOG.error("Failed to deserialize final accumulator results.", failure);
            }
        }
        return null;
    }

    /**
     * Looks up the execution that produces the given result partition and asks its vertex to
     * schedule or update the partition's consumers.
     *
     * @param resultPartitionID identifies the partition; its producer ID is used for the lookup
     * @throws ExecutionGraphException if no execution is registered under the producer ID, or
     *     if the execution found has no vertex assigned
     */
    public void scheduleOrUpdateConsumers(ResultPartitionID resultPartitionID) throws ExecutionGraphException {
        final Execution producer = this.currentExecutions.get(resultPartitionID.getProducerId());
        if (producer == null) {
            // Report the producer (execution attempt) ID that was actually looked up, not the
            // partition ID, so the message matches the failed lookup.
            throw new ExecutionGraphException("Cannot find execution for execution Id " + resultPartitionID.getProducerId() + '.');
        }
        if (producer.getVertex() == null) {
            throw new ExecutionGraphException("Execution with execution Id " + resultPartitionID.getProducerId() + " has no vertex assigned.");
        }
        producer.getVertex().scheduleOrUpdateConsumers(resultPartitionID);
    }

    /**
     * Returns a read-only view of the currently registered executions, keyed by attempt ID.
     *
     * @return an unmodifiable view backed by the internal execution map
     */
    public Map<ExecutionAttemptID, Execution> getRegisteredExecutions() {
        final Map<ExecutionAttemptID, Execution> readOnlyView = Collections.unmodifiableMap(this.currentExecutions);
        return readOnlyView;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers the execution under its attempt ID. If the ID is already taken, the existing
     * entry is kept and a global failure is triggered.
     *
     * @param execution the execution to register
     */
    public void registerExecution(Execution execution) {
        final Execution previous = this.currentExecutions.putIfAbsent(execution.getAttemptId(), execution);
        if (previous == null) {
            return;
        }
        failGlobal(new Exception("Trying to register execution " + execution + " for already used ID " + execution.getAttemptId()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes the entry registered under the execution's attempt ID. If a different execution
     * instance was registered under that ID, a global failure is triggered.
     *
     * @param execution the execution to de-register
     */
    public void deregisterExecution(Execution execution) {
        final Execution removed = this.currentExecutions.remove(execution.getAttemptId());
        // Identity comparison: only a different instance under the same ID is an error.
        if (removed != null && removed != execution) {
            failGlobal(new Exception("De-registering execution " + execution + " failed. Found for same ID execution " + removed));
        }
    }

    /**
     * Deserializes the user accumulators of the snapshot and stores them on the execution the
     * snapshot belongs to. Snapshots for unknown executions are only logged at debug level; any
     * exception is logged and swallowed.
     *
     * @param accumulatorSnapshot the accumulator snapshot to apply
     */
    public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
        try {
            final Map<String, Accumulator<?, ?>> userAccumulators = accumulatorSnapshot.deserializeUserAccumulators(this.userClassLoader);
            final ExecutionAttemptID attemptId = accumulatorSnapshot.getExecutionAttemptID();
            final Execution target = this.currentExecutions.get(attemptId);
            if (target == null) {
                LOG.debug("Received accumulator result for unknown execution {}.", attemptId);
            } else {
                target.setAccumulators(userAccumulators);
            }
        } catch (Exception failure) {
            LOG.error("Cannot update accumulators for job {}.", getJobID(), failure);
        }
    }

    /**
     * Adds a listener for job status changes; a {@code null} listener is ignored.
     *
     * @param jobStatusListener the listener to add, may be {@code null}
     */
    public void registerJobStatusListener(JobStatusListener jobStatusListener) {
        if (jobStatusListener == null) {
            return;
        }
        this.jobStatusListeners.add(jobStatusListener);
    }

    /**
     * Adds a listener for execution status changes; a {@code null} listener is ignored.
     *
     * @param executionStatusListener the listener to add, may be {@code null}
     */
    public void registerExecutionListener(ExecutionStatusListener executionStatusListener) {
        if (executionStatusListener == null) {
            return;
        }
        this.executionListeners.add(executionStatusListener);
    }

    /**
     * Notifies every registered job status listener of the new status. An error thrown by one
     * listener is logged and does not prevent the remaining listeners from being notified.
     *
     * @param jobStatus the new job status
     * @param th        the error associated with the change, may be {@code null}
     */
    private void notifyJobStatusChange(JobStatus jobStatus, Throwable th) {
        if (this.jobStatusListeners.isEmpty()) {
            return;
        }
        final long timestamp = System.currentTimeMillis();
        final SerializedThrowable serializedError;
        if (th == null) {
            serializedError = null;
        } else {
            serializedError = new SerializedThrowable(th);
        }
        for (JobStatusListener listener : this.jobStatusListeners) {
            try {
                listener.jobStatusChanges(getJobID(), jobStatus, timestamp, serializedError);
            } catch (Throwable failure) {
                LOG.warn("Error while notifying JobStatusListener", failure);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Notifies the registered execution listeners of an execution state change. For a change to
     * {@code FAILED}, and only if the execution's global modification version matches the
     * graph's, it also fails the pending checkpoints of that attempt and informs the failover
     * strategy.
     *
     * @param execution      the execution whose state changed
     * @param executionState the new state of the execution
     * @param th             the error associated with the change, may be {@code null}
     */
    public void notifyExecutionChange(Execution execution, ExecutionState executionState, Throwable th) {
        if (!this.executionListeners.isEmpty()) {
            final ExecutionJobVertex jobVertex = execution.getVertex().getJobVertex();
            final String errorText = th == null ? null : ExceptionUtils.stringifyException(th);
            final long timestamp = System.currentTimeMillis();
            for (ExecutionStatusListener listener : this.executionListeners) {
                try {
                    listener.executionStatusChanged(getJobID(), jobVertex.getJobVertexId(), jobVertex.getJobVertex().getName(), jobVertex.getParallelism(), execution.getParallelSubtaskIndex(), execution.getAttemptId(), executionState, timestamp, errorText);
                } catch (Throwable failure) {
                    LOG.warn("Error while notifying ExecutionStatusListener", failure);
                }
            }
        }

        if (executionState != ExecutionState.FAILED) {
            return;
        }

        final Throwable cause;
        if (th != null) {
            cause = th;
        } else {
            cause = new FlinkException("Unknown Error (missing cause)");
        }
        // The returned timestamp is not used; the call is kept as in the original code.
        execution.getStateTimestamp(ExecutionState.FAILED);

        // Failures from executions of a different global modification version are ignored here.
        if (execution.getGlobalModVersion() != this.globalModVersion) {
            return;
        }
        try {
            if (this.checkpointCoordinator != null) {
                this.checkpointCoordinator.failUnacknowledgedPendingCheckpointsFor(execution.getAttemptId(), cause);
            }
            this.failoverStrategy.onTaskFailure(execution, cause);
        } catch (Throwable failure) {
            LOG.warn("Error in failover strategy - falling back to global restart", failure);
            failGlobal(cause);
        }
    }
}
