package org.apache.beam.runners.flink;

import java.io.IOException;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.PipelineResources;
import org.apache.beam.runners.flink.FlinkJobServerDriver;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineJarCreator;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkJobInvoker.class */
/**
 * {@link JobInvoker} for the Flink runner: turns a portable pipeline and its serialized pipeline
 * options into a {@link JobInvocation}.
 */
public class FlinkJobInvoker extends JobInvoker {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class);

    /** Job-server configuration; supplies the Flink master URL and the Flink conf dir used below. */
    private final FlinkJobServerDriver.FlinkServerConfiguration serverConfig;

    /**
     * Static factory; equivalent to calling the constructor.
     *
     * @param flinkServerConfiguration job-server configuration the invoker reads from
     * @return a new invoker bound to {@code flinkServerConfiguration}
     */
    public static FlinkJobInvoker create(FlinkJobServerDriver.FlinkServerConfiguration flinkServerConfiguration) {
        return new FlinkJobInvoker(flinkServerConfiguration);
    }

    /**
     * Creates an invoker registered with the superclass under the name
     * {@code "flink-runner-job-invoker"}.
     *
     * @param flinkServerConfiguration job-server configuration the invoker reads from
     */
    public FlinkJobInvoker(FlinkJobServerDriver.FlinkServerConfiguration flinkServerConfiguration) {
        super("flink-runner-job-invoker");
        this.serverConfig = flinkServerConfiguration;
    }

    /**
     * Builds the {@link JobInvocation} for one submitted pipeline.
     *
     * <p>Steps: parse the options proto into {@link FlinkPipelineOptions}, derive an invocation id
     * of the form {@code <jobName>_<random UUID>}, replace an {@code AUTO} Flink master with the
     * server-configured URL, pick the pipeline runner, and delegate to
     * {@link #createJobInvocation}.
     *
     * @param pipeline the portable pipeline to run
     * @param struct pipeline options in proto {@link Struct} form
     * @param str passed through unchanged as the third argument of {@code JobInfo.create}
     *     (presumably the artifact retrieval token - confirm against {@code JobInfo}); may be null
     * @param listeningExecutorService executor handed to the created invocation
     * @return the invocation wrapping the pipeline, options and selected runner
     * @throws IOException declared for compatibility with the existing signature
     */
    protected JobInvocation invokeWithExecutor(RunnerApi.Pipeline pipeline, Struct struct, @Nullable String str, ListeningExecutorService listeningExecutorService) throws IOException {
        LOG.trace("Parsing pipeline options");
        FlinkPipelineOptions flinkPipelineOptions = PipelineOptionsTranslation.fromProto(struct).as(FlinkPipelineOptions.class);

        // String.format("%s") already applies toString() to the UUID.
        String invocationId = String.format("%s_%s", flinkPipelineOptions.getJobName(), UUID.randomUUID());

        // An AUTO master means the submitter left the choice to the job server.
        if (FlinkPipelineOptions.AUTO.equals(flinkPipelineOptions.getFlinkMaster())) {
            flinkPipelineOptions.setFlinkMaster(this.serverConfig.getFlinkMasterUrl());
        }

        String outputExecutablePath = flinkPipelineOptions.as(PortablePipelineOptions.class).getOutputExecutablePath();

        // Typed as the common PortablePipelineRunner interface: FlinkPipelineRunner is not a
        // PortablePipelineJarCreator, so the narrower declared type could not hold both branches.
        PortablePipelineRunner pipelineRunner;
        if (outputExecutablePath == null || outputExecutablePath.isEmpty()) {
            // No output executable path requested: use the Flink pipeline runner directly.
            pipelineRunner = new FlinkPipelineRunner(
                    flinkPipelineOptions,
                    this.serverConfig.getFlinkConfDir(),
                    PipelineResources.detectClassPathResourcesToStage(FlinkJobInvoker.class.getClassLoader()));
        } else {
            // An output executable path was requested: use the jar creator instead.
            pipelineRunner = new PortablePipelineJarCreator(FlinkPipelineRunner.class);
        }

        // NOTE(review): the runner option is cleared before the options are converted back to
        // proto in createJobInvocation - presumably so it is not carried into the job info; confirm.
        flinkPipelineOptions.setRunner(null);

        LOG.info("Invoking job {} with pipeline runner {}", invocationId, pipelineRunner);
        return createJobInvocation(invocationId, str, listeningExecutorService, pipeline, flinkPipelineOptions, pipelineRunner);
    }

    /**
     * Assembles the {@link JobInvocation} from its parts; kept as a separate method so it can be
     * overridden.
     *
     * @param str invocation id, used as the first argument of {@code JobInfo.create}
     * @param str2 passed as the third argument of {@code JobInfo.create}
     * @param listeningExecutorService executor handed to the invocation
     * @param pipeline the portable pipeline to run
     * @param flinkPipelineOptions options; supply the job name and are converted back to proto
     * @param portablePipelineRunner runner handed to the invocation
     * @return a new invocation built from the arguments
     */
    public JobInvocation createJobInvocation(String str, String str2, ListeningExecutorService listeningExecutorService, RunnerApi.Pipeline pipeline, FlinkPipelineOptions flinkPipelineOptions, PortablePipelineRunner portablePipelineRunner) {
        JobInfo jobInfo = JobInfo.create(
                str,
                flinkPipelineOptions.getJobName(),
                str2,
                PipelineOptionsTranslation.toProto(flinkPipelineOptions));
        return new JobInvocation(jobInfo, listeningExecutorService, pipeline, portablePipelineRunner);
    }
}
