package org.apache.beam.runners.flink;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.beam.runners.core.construction.PipelineResources;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.client.program.DetachedEnvironment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkRunner.class */
/**
 * A {@link PipelineRunner} that translates a Beam {@link Pipeline} into a Flink program and
 * executes it through a {@link FlinkPipelineExecutionEnvironment}.
 *
 * <p>Instances are created with {@link #fromOptions(PipelineOptions)}; the constructor is private.
 */
public class FlinkRunner extends PipelineRunner<PipelineResult> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkRunner.class);

    /** Options this runner was created with; handed to every execution environment it builds. */
    private final FlinkPipelineOptions options;

    /**
     * View transforms reported through {@link #recordViewUsesNonDeterministicKeyCoder(PTransform)}.
     * Consulted at the start of {@link #run(Pipeline)} to emit one warning that names them.
     */
    Set<PTransform<?, ?>> ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();

    /**
     * Builds a runner from the given options.
     *
     * <p>The options are validated as {@link FlinkPipelineOptions}. If no files to stage were
     * configured, the list is populated from the resources detected on this class's class loader.
     *
     * @param pipelineOptions the options to validate and use
     * @return a runner bound to the validated options
     * @throws IllegalArgumentException if a required value ({@code appName}) is missing
     */
    public static FlinkRunner fromOptions(PipelineOptions pipelineOptions) {
        FlinkPipelineOptions flinkOptions =
                PipelineOptionsValidator.validate(FlinkPipelineOptions.class, pipelineOptions);

        // Collect every missing required option so the error message can list them together.
        ArrayList<String> missing = new ArrayList<>();
        if (flinkOptions.getAppName() == null) {
            missing.add("appName");
        }
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing required values: " + Joiner.on(',').join(missing));
        }

        if (flinkOptions.getFilesToStage() == null) {
            flinkOptions.setFilesToStage(
                    PipelineResources.detectClassPathResourcesToStage(FlinkRunner.class.getClassLoader()));
            LOG.info(
                    "PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage {} files. Enable logging at DEBUG level to see which files will be staged.",
                    flinkOptions.getFilesToStage().size());
            LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage());
        }
        return new FlinkRunner(flinkOptions);
    }

    private FlinkRunner(FlinkPipelineOptions flinkPipelineOptions) {
        this.options = flinkPipelineOptions;
    }

    /**
     * Translates the pipeline to a Flink program, executes it and wraps the outcome.
     *
     * @param pipeline the pipeline to run
     * @return the result produced by {@link #createPipelineResult(JobExecutionResult, PipelineOptions)}
     * @throws RuntimeException wrapping any exception raised while executing the Flink program or
     *     while building the result
     */
    @Override
    public PipelineResult run(Pipeline pipeline) {
        logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
        MetricsEnvironment.setMetricsSupported(true);

        LOG.info("Executing pipeline using FlinkRunner.");
        FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(this.options);

        LOG.info("Translating pipeline to Flink program.");
        env.translate(pipeline);

        try {
            LOG.info("Starting execution of Flink program.");
            return createPipelineResult(env.executePipeline(), this.options);
        } catch (Exception e) {
            LOG.error("Pipeline execution failed", e);
            throw new RuntimeException("Pipeline execution failed", e);
        }
    }

    /**
     * Converts a Flink job result into a Beam {@link PipelineResult}.
     *
     * <p>A detached submission yields a {@link FlinkDetachedRunnerResult}. Otherwise the net
     * runtime and any accumulator values are logged, a {@link FlinkRunnerResult} is built from
     * them, and a {@link MetricsPusher} is started for that result.
     *
     * <p>NOTE: the decompiler reported that this method's access was widened from package-private;
     * the visibility found in this file is kept so existing callers keep compiling.
     *
     * @param jobExecutionResult the result returned by Flink
     * @param pipelineOptions options viewed as {@link MetricsOptions} to configure the pusher
     * @return the Beam-level result
     */
    public static PipelineResult createPipelineResult(JobExecutionResult jobExecutionResult, PipelineOptions pipelineOptions) {
        if (jobExecutionResult instanceof DetachedEnvironment.DetachedJobExecutionResult) {
            LOG.info("Pipeline submitted in Detached mode");
            return new FlinkDetachedRunnerResult();
        }

        LOG.info("Execution finished in {} msecs", jobExecutionResult.getNetRuntime());
        Map<String, Object> accumulators = jobExecutionResult.getAllAccumulatorResults();
        if (accumulators != null && !accumulators.isEmpty()) {
            LOG.info("Final accumulator values:");
            // Iterate the map already fetched (and null-checked) instead of asking Flink again.
            for (Map.Entry<String, Object> entry : accumulators.entrySet()) {
                LOG.info("{} : {}", entry.getKey(), entry.getValue());
            }
        }

        FlinkRunnerResult result = new FlinkRunnerResult(accumulators, jobExecutionResult.getNetRuntime());
        new MetricsPusher(result.getMetricsContainerStepMap(), pipelineOptions.as(MetricsOptions.class), result).start();
        return result;
    }

    /** Returns the options this runner was created with. */
    @VisibleForTesting
    public FlinkPipelineOptions getPipelineOptions() {
        return this.options;
    }

    @Override
    public String toString() {
        return "FlinkRunner#" + hashCode();
    }

    /**
     * Remembers a view transform so that the next {@link #run(Pipeline)} warns about it.
     *
     * @param pTransform the transform to record
     */
    void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> pTransform) {
        this.ptransformViewsWithNonDeterministicKeyCoders.add(pTransform);
    }

    /**
     * Logs one warning listing the full names of all recorded view transforms found in the
     * pipeline. Does nothing when no transform has been recorded.
     *
     * <p>Names are resolved by traversing the pipeline because only the hierarchy nodes expose
     * {@code getFullName()}; the recorded set holds the transforms themselves.
     */
    private void logWarningIfPCollectionViewHasNonDeterministicKeyCoder(Pipeline pipeline) {
        if (this.ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) {
            return;
        }

        // TreeSet keeps the reported names sorted and free of duplicates.
        final Set<String> viewNames = new TreeSet<>();
        pipeline.traverseTopologically(new Pipeline.PipelineVisitor.Defaults() {
            @Override
            public void visitPrimitiveTransform(TransformHierarchy.Node node) {
                if (FlinkRunner.this.ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
                    viewNames.add(node.getFullName());
                }
            }

            @Override
            public Pipeline.PipelineVisitor.CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
                if (FlinkRunner.this.ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
                    viewNames.add(node.getFullName());
                }
                // Always descend so nested transforms are checked as well.
                return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
            }
        });
        LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for {} because the key coder is not deterministic. Falling back to singleton implementation which may cause memory and/or performance problems. Future major versions of the Flink runner will require deterministic key coders.", viewNames);
    }

    /**
     * Builds the Flink {@link JobGraph} for the pipeline using a fresh execution environment
     * created from this runner's options.
     */
    @VisibleForTesting
    JobGraph getJobGraph(Pipeline pipeline) {
        return new FlinkPipelineExecutionEnvironment(this.options).getJobGraph(pipeline);
    }
}
