package org.apache.flink.runtime.rest.handler.job;

import java.io.File;
import java.io.ObjectInputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.client.ClientUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.FlinkException;

/* loaded from: input_file:org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.class */
/**
 * REST handler for job submission requests.
 *
 * <p>The request names an uploaded file containing a Java-serialized {@link JobGraph}, plus
 * optional jar and artifact files. The handler deserializes the job graph, uploads the
 * accompanying files through a {@link BlobClient} and finally submits the job to the
 * {@link DispatcherGateway}.
 */
public final class JobSubmitHandler extends AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
    private static final String FILE_TYPE_JOB_GRAPH = "JobGraph";
    private static final String FILE_TYPE_JAR = "Jar";
    private static final String FILE_TYPE_ARTIFACT = "Artifact";

    /** Executor on which the uploaded job graph file is read and deserialized. */
    private final Executor executor;

    /** Configuration handed to every {@link BlobClient} created for file uploads. */
    private final Configuration configuration;

    /**
     * Signals that a file referenced by the request body is not among the uploaded files.
     * Reported to the client with {@link HttpResponseStatus#BAD_REQUEST}.
     */
    public static final class MissingFileException extends RestHandlerException {
        private static final long serialVersionUID = -7954810495610194965L;

        /**
         * @param fileType human-readable kind of the file (one of the {@code FILE_TYPE_*} constants)
         * @param fileName name under which the file was expected among the uploads
         */
        MissingFileException(String fileType, String fileName) {
            super(fileType + " file " + fileName + " could not be found on the server.", HttpResponseStatus.BAD_REQUEST);
        }
    }

    /**
     * Creates the handler. The first four arguments are forwarded unchanged to the
     * {@link AbstractRestHandler} constructor together with {@link JobSubmitHeaders#getInstance()}.
     *
     * @param localRestAddress forwarded to the superclass
     * @param leaderRetriever forwarded to the superclass
     * @param timeout forwarded to the superclass
     * @param responseHeaders forwarded to the superclass
     * @param executor executor used to deserialize the uploaded job graph asynchronously
     * @param configuration configuration used when creating {@link BlobClient} instances
     */
    public JobSubmitHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<? extends DispatcherGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, Configuration configuration) {
        super(localRestAddress, leaderRetriever, timeout, responseHeaders, JobSubmitHeaders.getInstance());
        this.executor = executor;
        this.configuration = configuration;
    }

    /**
     * Validates the request, loads the job graph, uploads the referenced files and submits the job.
     *
     * @param handlerRequest request carrying the body and the uploaded files
     * @param dispatcherGateway gateway used to look up the blob server port and submit the job
     * @return future completing with a response body holding the {@code /jobs/<jobId>} URL
     * @throws RestHandlerException with {@code BAD_REQUEST} if uploaded file names collide, the
     *     job graph file name is missing from the body, or a referenced file was not uploaded
     */
    @Override
    public CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> handlerRequest, @Nonnull DispatcherGateway dispatcherGateway) throws RestHandlerException {
        final Collection<File> uploadedFiles = handlerRequest.getUploadedFiles();
        // The merge function keeps the first file of a given name. Without it, toMap throws an
        // IllegalStateException on duplicate names and the size check below could never fire.
        final Map<String, Path> nameToFile = uploadedFiles.stream()
                .collect(Collectors.toMap(File::getName, Path::fromLocalFile, (first, duplicate) -> first));
        if (uploadedFiles.size() != nameToFile.size()) {
            throw new RestHandlerException(
                    String.format(
                            "The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
                            uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
                            nameToFile.size(),
                            uploadedFiles.size()),
                    HttpResponseStatus.BAD_REQUEST);
        }

        final JobSubmitRequestBody requestBody = handlerRequest.getRequestBody();
        if (requestBody.jobGraphFileName == null) {
            throw new RestHandlerException(
                    String.format("The %s field must not be omitted or be null.", JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
                    HttpResponseStatus.BAD_REQUEST);
        }

        final CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);
        final Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);
        final Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);

        return uploadJobGraphFiles(dispatcherGateway, jobGraphFuture, jarFiles, artifacts, this.configuration)
                .thenCompose(jobGraph -> dispatcherGateway.submitJob(jobGraph, this.timeout))
                // Combine with the loaded graph only to read its job id for the response URL.
                .thenCombine(jobGraphFuture, (acknowledge, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
    }

    /**
     * Asynchronously deserializes the uploaded job graph file on {@link #executor}.
     *
     * @param jobSubmitRequestBody request body naming the job graph file
     * @param nameToFile uploaded files keyed by file name
     * @return future holding the deserialized graph; it completes exceptionally with a
     *     {@link CompletionException} wrapping a {@code BAD_REQUEST} {@link RestHandlerException}
     *     if the file cannot be opened or deserialized
     * @throws MissingFileException if the named job graph file was not uploaded
     */
    private CompletableFuture<JobGraph> loadJobGraph(JobSubmitRequestBody jobSubmitRequestBody, Map<String, Path> nameToFile) throws MissingFileException {
        final Path jobGraphFile = getPathAndAssertUpload(jobSubmitRequestBody.jobGraphFileName, FILE_TYPE_JOB_GRAPH, nameToFile);
        return CompletableFuture.supplyAsync(() -> {
            // NOTE(review): this is Java-native deserialization of a client-uploaded file; if the
            // endpoint can be reached by untrusted clients, consider an ObjectInputFilter.
            // The raw stream is its own resource so it is closed even when the ObjectInputStream
            // constructor fails (e.g. on an invalid stream header).
            try (java.io.InputStream rawStream = jobGraphFile.getFileSystem().open(jobGraphFile);
                    ObjectInputStream objectStream = new ObjectInputStream(rawStream)) {
                return (JobGraph) objectStream.readObject();
            } catch (Exception e) {
                throw new CompletionException(new RestHandlerException("Failed to deserialize JobGraph.", HttpResponseStatus.BAD_REQUEST, e));
            }
        }, this.executor);
    }

    /**
     * Resolves the requested jar file names to the paths of the uploaded files.
     *
     * @param jarFileNames jar file names listed in the request body
     * @param nameToFile uploaded files keyed by file name
     * @return one path per requested jar, in iteration order of {@code jarFileNames}
     * @throws MissingFileException if any named jar was not uploaded
     */
    private static Collection<Path> getJarFilesToUpload(Collection<String> jarFileNames, Map<String, Path> nameToFile) throws MissingFileException {
        final Collection<Path> jarFiles = new ArrayList<>(jarFileNames.size());
        for (String jarFileName : jarFileNames) {
            jarFiles.add(new Path(getPathAndAssertUpload(jarFileName, FILE_TYPE_JAR, nameToFile).toString()));
        }
        return jarFiles;
    }

    /**
     * Resolves the requested artifacts to (entry name, uploaded file path) pairs.
     *
     * @param artifactEntries artifact descriptors listed in the request body
     * @param nameToFile uploaded files keyed by file name
     * @return one tuple per requested artifact, in iteration order of {@code artifactEntries}
     * @throws MissingFileException if any named artifact file was not uploaded
     */
    private static Collection<Tuple2<String, Path>> getArtifactFilesToUpload(Collection<JobSubmitRequestBody.DistributedCacheFile> artifactEntries, Map<String, Path> nameToFile) throws MissingFileException {
        final Collection<Tuple2<String, Path>> artifacts = new ArrayList<>(artifactEntries.size());
        for (JobSubmitRequestBody.DistributedCacheFile artifactEntry : artifactEntries) {
            final Path uploadedFile = getPathAndAssertUpload(artifactEntry.fileName, FILE_TYPE_ARTIFACT, nameToFile);
            artifacts.add(Tuple2.of(artifactEntry.entryName, new Path(uploadedFile.toString())));
        }
        return artifacts;
    }

    /**
     * Once both the job graph and the blob server port are available, uploads the jar and
     * artifact files via {@link ClientUtils#uploadJobGraphFiles}.
     *
     * @param dispatcherGateway gateway providing the blob server port and host name
     * @param jobGraphFuture future of the deserialized job graph
     * @param jarFiles jar files to upload
     * @param artifacts artifact files to upload, paired with their entry names
     * @param configuration configuration passed to each created {@link BlobClient}
     * @return future holding the job graph after the upload; it completes exceptionally with a
     *     {@link CompletionException} wrapping an {@code INTERNAL_SERVER_ERROR}
     *     {@link RestHandlerException} if the upload fails
     */
    private CompletableFuture<JobGraph> uploadJobGraphFiles(DispatcherGateway dispatcherGateway, CompletableFuture<JobGraph> jobGraphFuture, Collection<Path> jarFiles, Collection<Tuple2<String, Path>> artifacts, Configuration configuration) {
        return jobGraphFuture.thenCombine(dispatcherGateway.getBlobServerPort(this.timeout), (jobGraph, blobServerPort) -> {
            final InetSocketAddress blobServerAddress = new InetSocketAddress(dispatcherGateway.getHostname(), blobServerPort.intValue());
            try {
                ClientUtils.uploadJobGraphFiles(jobGraph, jarFiles, artifacts, () -> new BlobClient(blobServerAddress, configuration));
                return jobGraph;
            } catch (FlinkException e) {
                throw new CompletionException(new RestHandlerException("Could not upload job files.", HttpResponseStatus.INTERNAL_SERVER_ERROR, e));
            }
        });
    }

    /**
     * Looks up an uploaded file by name.
     *
     * @param fileName name of the file as referenced in the request body
     * @param fileType human-readable kind of the file, used only in the error message
     * @param nameToFile uploaded files keyed by file name
     * @return path of the uploaded file
     * @throws MissingFileException if no uploaded file has the given name
     */
    private static Path getPathAndAssertUpload(String fileName, String fileType, Map<String, Path> nameToFile) throws MissingFileException {
        final Path uploadedFile = nameToFile.get(fileName);
        if (uploadedFile == null) {
            throw new MissingFileException(fileType, fileName);
        }
        return uploadedFile;
    }
}
