package org.apache.flink.runtime.rest.handler.legacy.backpressure;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.apache.commons.math3.optimization.direct.CMAESOptimizer;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImpl.class */
/**
 * Back pressure statistics tracker.
 *
 * <p>Statistics are derived from stack trace samples of the tasks of an
 * {@link ExecutionJobVertex}: a sampled stack trace counts as "back pressured" when one of its
 * frames is {@link #EXPECTED_CLASS_NAME}#{@link #EXPECTED_METHOD_NAME}. The per-subtask ratio of
 * such samples to all samples is stored in a cache keyed by the job vertex.
 *
 * <p>The mutable state ({@code pendingStats}, {@code shutDown}) is only accessed while holding
 * {@code lock}.
 */
public class BackPressureStatsTrackerImpl implements BackPressureStatsTracker {
    private static final Logger LOG = LoggerFactory.getLogger(BackPressureStatsTrackerImpl.class);

    /** Maximum stack trace depth passed to the coordinator when triggering a sample. */
    static final int MAX_STACK_TRACE_DEPTH = 3;

    /** Class name of the stack frame that marks a sample as back pressured. */
    static final String EXPECTED_CLASS_NAME = "org.apache.flink.runtime.io.network.buffer.LocalBufferPool";

    /** Method name of the stack frame that marks a sample as back pressured. */
    static final String EXPECTED_METHOD_NAME = "requestBufferBuilderBlocking";

    /** Coordinator used to trigger the stack trace samples. */
    private final StackTraceSampleCoordinator coordinator;

    /** Completed stats per job vertex; entries expire {@code cleanUpInterval} ms after last access. */
    private final Cache<ExecutionJobVertex, OperatorBackPressureStats> operatorStatsCache;

    /** Expiry (in milliseconds) applied to entries of {@link #operatorStatsCache}. */
    private final int cleanUpInterval;

    /** Number of stack trace samples requested per trigger. */
    private final int numSamples;

    /** Age (in milliseconds) at or after which cached stats trigger a new sample when requested. */
    private final int backPressureStatsRefreshInterval;

    /** Delay between two consecutive stack trace samples, as passed to the coordinator. */
    private final Time delayBetweenSamples;

    /** Set once by {@link #shutDown()}; guarded by {@link #lock}. */
    private boolean shutDown;

    /** Lock guarding {@link #pendingStats} and {@link #shutDown}. */
    private final Object lock = new Object();

    /** Job vertices for which a sample has been triggered but not yet completed. */
    private final Set<ExecutionJobVertex> pendingStats = new HashSet<>();

    /**
     * Callback invoked when a stack trace sample for a job vertex completes (successfully or
     * exceptionally). Stores the resulting stats in the cache and clears the pending marker.
     */
    public class StackTraceSampleCompletionCallback implements BiFunction<StackTraceSample, Throwable, Void> {
        private final ExecutionJobVertex vertex;

        public StackTraceSampleCompletionCallback(ExecutionJobVertex executionJobVertex) {
            this.vertex = executionJobVertex;
        }

        /**
         * Handles the outcome of a sample.
         *
         * @param stackTraceSample the completed sample, or {@code null} if sampling failed
         * @param th the failure cause, only logged when {@code stackTraceSample} is {@code null}
         * @return always {@code null}
         */
        @Override // java.util.function.BiFunction
        public Void apply(StackTraceSample stackTraceSample, Throwable th) {
            synchronized (lock) {
                try {
                    if (shutDown) {
                        return null;
                    }
                    JobStatus state = vertex.getGraph().getState();
                    if (state.isGloballyTerminalState()) {
                        LOG.debug("Ignoring sample, because job is in state {}.", state);
                    } else if (stackTraceSample != null) {
                        operatorStatsCache.put(vertex, createStatsFromSample(stackTraceSample));
                    } else {
                        LOG.debug("Failed to gather stack trace sample.", th);
                    }
                } catch (Throwable t) {
                    // Never let a failure escape the callback; just record it.
                    LOG.error("Error during stats completion.", t);
                } finally {
                    // Always clear the pending marker so that a new sample can be triggered.
                    pendingStats.remove(vertex);
                }
                return null;
            }
        }

        /**
         * Converts a stack trace sample into per-subtask back pressure ratios.
         *
         * <p>The result array is indexed by parallel subtask index and has one slot per sampled
         * execution attempt. If a sampled attempt no longer matches the current attempt of any
         * task vertex, no subtask index is known for it and the lookup below fails with an
         * exception, which {@link #apply} logs.
         */
        private OperatorBackPressureStats createStatsFromSample(StackTraceSample stackTraceSample) {
            Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTraces = stackTraceSample.getStackTraces();

            // Map execution attempt -> subtask index for all attempts that are part of the sample.
            Map<ExecutionAttemptID, Integer> subtaskIndexByAttempt =
                    Maps.newHashMapWithExpectedSize(stackTraces.size());
            Set<ExecutionAttemptID> sampledAttempts = stackTraces.keySet();
            for (ExecutionVertex executionVertex : vertex.getTaskVertices()) {
                ExecutionAttemptID attemptId = executionVertex.getCurrentExecutionAttempt().getAttemptId();
                if (sampledAttempts.contains(attemptId)) {
                    subtaskIndexByAttempt.put(attemptId, executionVertex.getParallelSubtaskIndex());
                } else {
                    LOG.debug("Outdated sample. A task, which is part of the sample has been reset.");
                }
            }

            double[] backPressureRatio = new double[stackTraces.size()];
            for (Map.Entry<ExecutionAttemptID, List<StackTraceElement[]>> entry : stackTraces.entrySet()) {
                List<StackTraceElement[]> taskTraces = entry.getValue();
                int backPressureSamples = 0;
                for (StackTraceElement[] trace : taskTraces) {
                    if (containsBackPressureFrame(trace)) {
                        backPressureSamples++;
                    }
                }

                int subtaskIndex = subtaskIndexByAttempt.get(entry.getKey());
                int size = taskTraces.size();
                // Floating point division: the ratio must be fractional, not truncated to 0 or 1.
                backPressureRatio[subtaskIndex] = size > 0 ? (double) backPressureSamples / size : 0.0;
            }
            return new OperatorBackPressureStats(
                    stackTraceSample.getSampleId(), stackTraceSample.getEndTime(), backPressureRatio);
        }
    }

    /**
     * Creates a back pressure statistics tracker.
     *
     * @param stackTraceSampleCoordinator coordinator used to trigger samples; must not be null
     * @param cleanUpInterval expiry of cached stats after last access, in milliseconds; {@code >= 0}
     * @param numSamples number of stack trace samples per trigger; {@code >= 1}
     * @param backPressureStatsRefreshInterval age in milliseconds at or after which cached stats
     *     trigger a new sample when requested; {@code >= 0}
     * @param delayBetweenSamples delay between two samples; must not be null
     */
    public BackPressureStatsTrackerImpl(
            StackTraceSampleCoordinator stackTraceSampleCoordinator,
            int cleanUpInterval,
            int numSamples,
            int backPressureStatsRefreshInterval,
            Time delayBetweenSamples) {
        this.coordinator = Preconditions.checkNotNull(stackTraceSampleCoordinator, "Stack trace sample coordinator");
        Preconditions.checkArgument(cleanUpInterval >= 0, "Clean up interval");
        this.cleanUpInterval = cleanUpInterval;
        Preconditions.checkArgument(numSamples >= 1, "Number of samples");
        this.numSamples = numSamples;
        Preconditions.checkArgument(
                backPressureStatsRefreshInterval >= 0,
                "backPressureStatsRefreshInterval must be greater than or equal to 0");
        this.backPressureStatsRefreshInterval = backPressureStatsRefreshInterval;
        this.delayBetweenSamples = Preconditions.checkNotNull(delayBetweenSamples, "Delay between samples");
        this.operatorStatsCache = CacheBuilder.newBuilder()
                .concurrencyLevel(1)
                .expireAfterAccess(cleanUpInterval, TimeUnit.MILLISECONDS)
                .build();
    }

    /** Returns the clean up interval (in milliseconds) this tracker was created with. */
    public long getCleanUpInterval() {
        return this.cleanUpInterval;
    }

    /**
     * Returns the cached back pressure stats for the given job vertex, if any.
     *
     * <p>If no stats are cached, or the cached stats are at least
     * {@code backPressureStatsRefreshInterval} milliseconds old, a new stack trace sample is
     * triggered. The (possibly stale or absent) cached value is returned either way.
     */
    @Override // org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker
    public Optional<OperatorBackPressureStats> getOperatorBackPressureStats(ExecutionJobVertex executionJobVertex) {
        synchronized (this.lock) {
            OperatorBackPressureStats cached = this.operatorStatsCache.getIfPresent(executionJobVertex);
            if (cached == null
                    || this.backPressureStatsRefreshInterval <= System.currentTimeMillis() - cached.getEndTimestamp()) {
                triggerStackTraceSampleInternal(executionJobVertex);
            }
            return Optional.ofNullable(cached);
        }
    }

    /**
     * Triggers a stack trace sample for the given job vertex. Must be called while holding
     * {@link #lock}.
     *
     * @return {@code true} if a sample was triggered; {@code false} if the tracker is shut down,
     *     a sample is already pending for the vertex, the job is in a globally terminal state, or
     *     the graph has no future executor
     */
    private boolean triggerStackTraceSampleInternal(ExecutionJobVertex executionJobVertex) {
        assert Thread.holdsLock(this.lock);

        if (this.shutDown || this.pendingStats.contains(executionJobVertex)) {
            return false;
        }
        if (executionJobVertex.getGraph().getState().isGloballyTerminalState()) {
            return false;
        }
        Executor futureExecutor = executionJobVertex.getGraph().getFutureExecutor();
        if (futureExecutor == null) {
            return false;
        }

        this.pendingStats.add(executionJobVertex);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(executionJobVertex.getTaskVertices()));
        }
        this.coordinator
                .triggerStackTraceSample(
                        executionJobVertex.getTaskVertices(),
                        this.numSamples,
                        this.delayBetweenSamples,
                        MAX_STACK_TRACE_DEPTH)
                .handleAsync(new StackTraceSampleCompletionCallback(executionJobVertex), futureExecutor);
        return true;
    }

    /**
     * Triggers a stack trace sample for the given job vertex.
     *
     * @return {@code true} if a sample was triggered, {@code false} otherwise
     * @deprecated marked deprecated in the original class; the visible alternative is
     *     {@link #getOperatorBackPressureStats(ExecutionJobVertex)}, which triggers samples itself.
     */
    @Deprecated
    public boolean triggerStackTraceSample(ExecutionJobVertex executionJobVertex) {
        synchronized (this.lock) {
            return triggerStackTraceSampleInternal(executionJobVertex);
        }
    }

    /** Runs the pending maintenance of the operator stats cache. */
    @Override // org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker
    public void cleanUpOperatorStatsCache() {
        this.operatorStatsCache.cleanUp();
    }

    /**
     * Shuts the tracker down: clears the cache and the pending set. Subsequent calls are no-ops,
     * and no further samples are triggered or stored afterwards.
     */
    @Override // org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker
    public void shutDown() {
        synchronized (this.lock) {
            if (!this.shutDown) {
                this.operatorStatsCache.invalidateAll();
                this.pendingStats.clear();
                this.shutDown = true;
            }
        }
    }

    /** Discards all cached operator stats. */
    void invalidateOperatorStatsCache() {
        this.operatorStatsCache.invalidateAll();
    }

    /**
     * Returns whether the given stack trace contains the frame that indicates back pressure.
     * The trace is scanned from its last element towards its first.
     */
    private static boolean containsBackPressureFrame(StackTraceElement[] trace) {
        for (int idx = trace.length - 1; idx >= 0; idx--) {
            StackTraceElement frame = trace[idx];
            if (frame.getClassName().equals(EXPECTED_CLASS_NAME)
                    && frame.getMethodName().equals(EXPECTED_METHOD_NAME)) {
                return true;
            }
        }
        return false;
    }
}
