package org.apache.flink.yarn;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.io.File;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
import org.apache.flink.runtime.clusterframework.messages.ShutdownClusterAfterJob;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.YarnMessages;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/yarn/YarnClusterClient.class */
public class YarnClusterClient extends ClusterClient {
    private static final Logger LOG = LoggerFactory.getLogger(YarnClusterClient.class);
    private static final int POLLING_THREAD_INTERVAL_MS = 1000;
    private YarnClient yarnClient;
    private Thread clientShutdownHook;
    private PollingThread pollingRunner;
    private final AbstractYarnClusterDescriptor clusterDescriptor;
    private final LazApplicationClientLoader applicationClient;
    private final FiniteDuration akkaDuration;
    private final ApplicationReport appReport;
    private final ApplicationId appId;
    private final String trackingURL;
    private boolean isConnected;
    private final boolean newlyCreatedCluster;
    private AtomicBoolean hasBeenShutDown;

    /* loaded from: input_file:org/apache/flink/yarn/YarnClusterClient$ClientShutdownHook.class */
    private class ClientShutdownHook extends Thread {
        private ClientShutdownHook() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            YarnClusterClient.LOG.info("Shutting down YarnClusterClient from the client shutdown hook");
            try {
                YarnClusterClient.this.shutdown();
            } catch (Throwable th) {
                YarnClusterClient.LOG.warn("Could not properly shut down the yarn cluster client.", th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/yarn/YarnClusterClient$LazApplicationClientLoader.class */
    public static class LazApplicationClientLoader {
        private final Configuration flinkConfig;
        private final ClusterClient.LazyActorSystemLoader actorSystemLoader;
        private final HighAvailabilityServices highAvailabilityServices;
        private ActorRef applicationClient;

        private LazApplicationClientLoader(Configuration configuration, ClusterClient.LazyActorSystemLoader lazyActorSystemLoader, HighAvailabilityServices highAvailabilityServices) {
            this.flinkConfig = (Configuration) Preconditions.checkNotNull(configuration, "flinkConfig");
            this.actorSystemLoader = (ClusterClient.LazyActorSystemLoader) Preconditions.checkNotNull(lazyActorSystemLoader, "actorSystemLoader");
            this.highAvailabilityServices = (HighAvailabilityServices) Preconditions.checkNotNull(highAvailabilityServices, "highAvailabilityServices");
        }

        public ActorRef get() throws FlinkException {
            if (this.applicationClient == null) {
                YarnClusterClient.LOG.info("Start application client.");
                try {
                    try {
                        this.applicationClient = this.actorSystemLoader.get().actorOf(Props.create(ApplicationClient.class, new Object[]{this.flinkConfig, this.highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID)}), "applicationClient");
                    } catch (Exception e) {
                        throw new FlinkException("Could not start the ApplicationClient.", e);
                    }
                } catch (FlinkException e2) {
                    throw new FlinkException("Could not start the ClusterClient's ActorSystem.", e2);
                }
            }
            return this.applicationClient;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/yarn/YarnClusterClient$PollingThread.class */
    public static class PollingThread extends Thread {
        private YarnClient yarnClient;
        private ApplicationId appId;
        private ApplicationReport lastReport;
        AtomicBoolean running = new AtomicBoolean(true);
        private final Object lock = new Object();

        public PollingThread(YarnClient yarnClient, ApplicationId applicationId) {
            this.yarnClient = yarnClient;
            this.appId = applicationId;
        }

        public void stopRunner() {
            if (!this.running.get()) {
                YarnClusterClient.LOG.warn("Polling thread was already stopped");
            }
            this.running.set(false);
        }

        public ApplicationReport getLastReport() {
            ApplicationReport applicationReport;
            synchronized (this.lock) {
                applicationReport = this.lastReport;
            }
            return applicationReport;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.running.get() && this.yarnClient.isInState(Service.STATE.STARTED)) {
                try {
                    ApplicationReport applicationReport = this.yarnClient.getApplicationReport(this.appId);
                    synchronized (this.lock) {
                        this.lastReport = applicationReport;
                    }
                } catch (Exception e) {
                    YarnClusterClient.LOG.warn("Error while getting application report", e);
                }
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e2) {
                    YarnClusterClient.LOG.error("Polling thread got interrupted", e2);
                    Thread.currentThread().interrupt();
                    stopRunner();
                }
            }
            if (!this.running.get() || this.yarnClient.isInState(Service.STATE.STARTED)) {
                return;
            }
            YarnClusterClient.LOG.warn("YARN client is unexpected in state " + this.yarnClient.getServiceState());
        }
    }

    public YarnClusterClient(AbstractYarnClusterDescriptor abstractYarnClusterDescriptor, YarnClient yarnClient, ApplicationReport applicationReport, Configuration configuration, boolean z) throws Exception {
        super(configuration);
        this.clientShutdownHook = new ClientShutdownHook();
        this.isConnected = true;
        this.hasBeenShutDown = new AtomicBoolean(false);
        this.akkaDuration = AkkaUtils.getTimeout(configuration);
        this.clusterDescriptor = abstractYarnClusterDescriptor;
        this.yarnClient = yarnClient;
        this.appReport = applicationReport;
        this.appId = applicationReport.getApplicationId();
        this.trackingURL = applicationReport.getTrackingUrl();
        this.newlyCreatedCluster = z;
        this.applicationClient = new LazApplicationClientLoader(configuration, this.actorSystemLoader, this.highAvailabilityServices);
        this.pollingRunner = new PollingThread(yarnClient, this.appId);
        this.pollingRunner.setDaemon(true);
        this.pollingRunner.start();
        Runtime.getRuntime().addShutdownHook(this.clientShutdownHook);
    }

    public void disconnect() {
        if (this.hasBeenShutDown.getAndSet(true)) {
            return;
        }
        if (!this.isConnected) {
            throw new IllegalStateException("Can not disconnect from an unconnected cluster.");
        }
        LOG.info("Disconnecting YarnClusterClient from ApplicationMaster");
        try {
            Runtime.getRuntime().removeShutdownHook(this.clientShutdownHook);
        } catch (IllegalStateException e) {
        }
        try {
            this.pollingRunner.stopRunner();
            this.pollingRunner.join(1000L);
        } catch (InterruptedException e2) {
            LOG.warn("Shutdown of the polling runner was interrupted", e2);
            Thread.currentThread().interrupt();
        }
        this.isConnected = false;
    }

    private void stopAfterJob(JobID jobID) {
        Preconditions.checkNotNull(jobID, "The job id must not be null");
        try {
            Await.ready(getJobManagerGateway().ask(new ShutdownClusterAfterJob(jobID), this.akkaDuration), this.akkaDuration);
        } catch (Exception e) {
            throw new RuntimeException("Unable to tell application master to stop once the specified job has been finised", e);
        }
    }

    public Configuration getFlinkConfiguration() {
        return this.flinkConfig;
    }

    public int getMaxSlots() {
        int taskManagerCount = this.clusterDescriptor.getTaskManagerCount() * this.clusterDescriptor.getTaskManagerSlots();
        if (taskManagerCount > 0) {
            return taskManagerCount;
        }
        return -1;
    }

    public boolean hasUserJarsInClassPath(List<URL> list) {
        return this.clusterDescriptor.hasUserJarFiles(list);
    }

    protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        if (!isDetached()) {
            return super.run(jobGraph, classLoader);
        }
        if (this.newlyCreatedCluster) {
            stopAfterJob(jobGraph.getJobID());
        }
        return super.runDetached(jobGraph, classLoader);
    }

    public String getWebInterfaceURL() {
        return !this.trackingURL.startsWith("http://") ? "http://" + this.trackingURL : this.trackingURL;
    }

    public String getClusterIdentifier() {
        return "Yarn cluster with application id " + this.appReport.getApplicationId();
    }

    public GetClusterStatusResponse getClusterStatus() {
        if (!this.isConnected) {
            throw new IllegalStateException("The cluster is not connected to the cluster.");
        }
        if (hasBeenShutdown()) {
            throw new IllegalStateException("The cluster has already been shutdown.");
        }
        try {
            return (GetClusterStatusResponse) Await.result(getJobManagerGateway().ask(GetClusterStatus.getInstance(), this.akkaDuration), this.akkaDuration);
        } catch (Exception e) {
            throw new RuntimeException("Unable to get ClusterClient status from Application Client", e);
        }
    }

    public ApplicationStatus getApplicationStatus() {
        if (!this.isConnected) {
            throw new IllegalStateException("The cluster has been connected to the ApplicationMaster.");
        }
        ApplicationReport applicationReport = null;
        if (this.pollingRunner == null) {
            LOG.warn("YarnClusterClient.getApplicationStatus() has been called on an uninitialized cluster.The system might be in an erroneous state");
        } else {
            applicationReport = this.pollingRunner.getLastReport();
        }
        if (applicationReport == null) {
            LOG.warn("YarnClusterClient.getApplicationStatus() has been called on a cluster that didn't receive a status so far.The system might be in an erroneous state");
            return ApplicationStatus.UNKNOWN;
        }
        YarnApplicationState yarnApplicationState = applicationReport.getYarnApplicationState();
        ApplicationStatus applicationStatus = (yarnApplicationState == YarnApplicationState.FAILED || yarnApplicationState == YarnApplicationState.KILLED) ? ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
        if (applicationStatus != ApplicationStatus.SUCCEEDED) {
            LOG.warn("YARN reported application state {}", yarnApplicationState);
            LOG.warn("Diagnostics: {}", applicationReport.getDiagnostics());
        }
        return applicationStatus;
    }

    public List<String> getNewMessages() {
        ArrayList arrayList;
        if (hasBeenShutdown()) {
            throw new RuntimeException("The YarnClusterClient has already been stopped");
        }
        if (!this.isConnected) {
            throw new IllegalStateException("The cluster has been connected to the ApplicationMaster.");
        }
        arrayList = new ArrayList();
        while (true) {
            try {
                Object result = Await.result(Patterns.ask(this.applicationClient.get(), YarnMessages.getLocalGetYarnMessage(), new Timeout(this.akkaDuration)), this.akkaDuration);
                if (!(result instanceof Option)) {
                    throw new RuntimeException("LocalGetYarnMessage requires a response of type Option. Instead the response is of type " + result.getClass() + ".");
                }
                Option option = (Option) result;
                LOG.debug("Received message option {}", option);
                if (option.isEmpty()) {
                    break;
                }
                Object obj = option.get();
                if (obj instanceof InfoMessage) {
                    InfoMessage infoMessage = (InfoMessage) obj;
                    arrayList.add("[" + infoMessage.date() + "] " + infoMessage.message());
                } else {
                    LOG.warn("LocalGetYarnMessage returned unexpected type: " + option);
                }
            } catch (Exception e) {
                LOG.warn("Error retrieving the YARN messages locally", e);
            }
        }
        return arrayList;
    }

    public void finalizeCluster() {
        if (isDetached() || !this.newlyCreatedCluster) {
            disconnect();
        } else {
            shutdownCluster();
        }
    }

    public void shutdownCluster() {
        if (this.hasBeenShutDown.getAndSet(true)) {
            return;
        }
        if (!this.isConnected) {
            throw new IllegalStateException("The cluster has been not been connected to the ApplicationMaster.");
        }
        try {
            Runtime.getRuntime().removeShutdownHook(this.clientShutdownHook);
        } catch (IllegalStateException e) {
        }
        LOG.info("Sending shutdown request to the Application Master");
        try {
            Await.ready(Patterns.ask(this.applicationClient.get(), new YarnMessages.LocalStopYarnSession(getApplicationStatus(), "Flink YARN Client requested shutdown"), new Timeout(this.akkaDuration)), this.akkaDuration);
        } catch (Exception e2) {
            LOG.warn("Error while stopping YARN cluster.", e2);
        }
        try {
            File yarnPropertiesLocation = FlinkYarnSessionCli.getYarnPropertiesLocation(this.flinkConfig);
            if (yarnPropertiesLocation.isFile()) {
                if (yarnPropertiesLocation.delete()) {
                    LOG.info("Deleted Yarn properties file at {}", yarnPropertiesLocation.getAbsoluteFile().toString());
                } else {
                    LOG.warn("Couldn't delete Yarn properties file at {}", yarnPropertiesLocation.getAbsoluteFile().toString());
                }
            }
        } catch (Exception e3) {
            LOG.warn("Exception while deleting the JobManager address file", e3);
        }
        try {
            this.pollingRunner.stopRunner();
            this.pollingRunner.join(1000L);
        } catch (InterruptedException e4) {
            LOG.warn("Shutdown of the polling runner was interrupted", e4);
            Thread.currentThread().interrupt();
        }
        try {
            ApplicationReport applicationReport = this.yarnClient.getApplicationReport(this.appId);
            LOG.info("Application " + this.appId + " finished with state " + applicationReport.getYarnApplicationState() + " and final state " + applicationReport.getFinalApplicationStatus() + " at " + applicationReport.getFinishTime());
            if (applicationReport.getYarnApplicationState() == YarnApplicationState.FAILED || applicationReport.getYarnApplicationState() == YarnApplicationState.KILLED) {
                LOG.warn("Application failed. Diagnostics " + applicationReport.getDiagnostics());
                LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve the full application log using this command:" + System.lineSeparator() + "\tyarn logs -applicationId " + applicationReport.getApplicationId() + System.lineSeparator() + "(It sometimes takes a few seconds until the logs are aggregated)");
            }
        } catch (Exception e5) {
            LOG.warn("Couldn't get final report", e5);
        }
        LOG.info("YARN Client is shutting down");
        this.yarnClient.stop();
        this.yarnClient = null;
    }

    public boolean hasBeenShutdown() {
        return this.hasBeenShutDown.get();
    }

    public boolean isDetached() {
        return super.isDetached() || this.clusterDescriptor.isDetachedMode();
    }

    public void waitForClusterToBeReady() {
        logAndSysout("Waiting until all TaskManagers have connected");
        GetClusterStatusResponse getClusterStatusResponse = null;
        while (true) {
            GetClusterStatusResponse getClusterStatusResponse2 = getClusterStatusResponse;
            GetClusterStatusResponse clusterStatus = getClusterStatus();
            if (clusterStatus != null && !clusterStatus.equals(getClusterStatusResponse2)) {
                logAndSysout("TaskManager status (" + clusterStatus.numRegisteredTaskManagers() + "/" + this.clusterDescriptor.getTaskManagerCount() + ")");
                if (clusterStatus.numRegisteredTaskManagers() >= this.clusterDescriptor.getTaskManagerCount()) {
                    logAndSysout("All TaskManagers are connected");
                    return;
                }
            } else if (getClusterStatusResponse2 == null) {
                logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
            }
            try {
                Thread.sleep(250L);
                getClusterStatusResponse = clusterStatus;
            } catch (InterruptedException e) {
                throw new RuntimeException("Interrupted while waiting for TaskManagers", e);
            }
        }
    }

    public ApplicationId getApplicationId() {
        return this.appId;
    }
}
