package org.apache.beam.runners.flink;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.flink.CreateStreamingFlinkView;
import org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.testing.RegexMatcher;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.RemoteEnvironment;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.hamcrest.BaseMatcher;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.Every;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.class */
public class FlinkPipelineExecutionEnvironmentTest implements Serializable {

    @Rule
    public transient TemporaryFolder tmpFolder = new TemporaryFolder();

    @Test
    public void shouldRecognizeAndTranslateStreamingPipeline() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(TestFlinkRunner.class);
        as.setFlinkMaster("[auto]");
        FlinkPipelineExecutionEnvironment flinkPipelineExecutionEnvironment = new FlinkPipelineExecutionEnvironment(as);
        Pipeline create = Pipeline.create();
        create.apply(GenerateSequence.from(0L).withRate(1L, Duration.standardSeconds(1L))).apply(ParDo.of(new DoFn<Long, String>() { // from class: org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironmentTest.1
            @DoFn.ProcessElement
            public void processElement(DoFn<Long, String>.ProcessContext processContext) throws Exception {
                processContext.output(Long.toString(((Long) processContext.element()).longValue()));
            }
        })).apply(Window.into(FixedWindows.of(Duration.standardHours(1L)))).apply(TextIO.write().withNumShards(1).withWindowedWrites().to("/dummy/path"));
        flinkPipelineExecutionEnvironment.translate(create);
    }

    @Test
    public void shouldPrepareFilesToStageWhenFlinkMasterIsSetExplicitly() throws IOException {
        FlinkPipelineOptions testPreparingResourcesToStage = testPreparingResourcesToStage("localhost:8081", false);
        MatcherAssert.assertThat(Integer.valueOf(testPreparingResourcesToStage.getFilesToStage().size()), CoreMatchers.is(2));
        MatcherAssert.assertThat((String) testPreparingResourcesToStage.getFilesToStage().get(0), RegexMatcher.matches(".*\\.jar"));
    }

    @Test
    public void shouldFailWhenFileDoesNotExistAndFlinkMasterIsSetExplicitly() {
        Assert.assertThrows("To-be-staged file does not exist: ", IllegalStateException.class, () -> {
            testPreparingResourcesToStage("localhost:8081", true);
        });
    }

    @Test
    public void shouldNotPrepareFilesToStageWhenFlinkMasterIsSetToAuto() throws IOException {
        FlinkPipelineOptions testPreparingResourcesToStage = testPreparingResourcesToStage("[auto]");
        MatcherAssert.assertThat(Integer.valueOf(testPreparingResourcesToStage.getFilesToStage().size()), CoreMatchers.is(3));
        MatcherAssert.assertThat(testPreparingResourcesToStage.getFilesToStage(), Every.everyItem(CoreMatchers.not(RegexMatcher.matches(".*\\.jar"))));
    }

    @Test
    public void shouldNotPrepareFilesToStagewhenFlinkMasterIsSetToCollection() throws IOException {
        FlinkPipelineOptions testPreparingResourcesToStage = testPreparingResourcesToStage("[collection]");
        MatcherAssert.assertThat(Integer.valueOf(testPreparingResourcesToStage.getFilesToStage().size()), CoreMatchers.is(3));
        MatcherAssert.assertThat(testPreparingResourcesToStage.getFilesToStage(), Every.everyItem(CoreMatchers.not(RegexMatcher.matches(".*\\.jar"))));
    }

    @Test
    public void shouldNotPrepareFilesToStageWhenFlinkMasterIsSetToLocal() throws IOException {
        FlinkPipelineOptions testPreparingResourcesToStage = testPreparingResourcesToStage("[local]");
        MatcherAssert.assertThat(Integer.valueOf(testPreparingResourcesToStage.getFilesToStage().size()), CoreMatchers.is(3));
        MatcherAssert.assertThat(testPreparingResourcesToStage.getFilesToStage(), Every.everyItem(CoreMatchers.not(RegexMatcher.matches(".*\\.jar"))));
    }

    @Test
    public void shouldUseDefaultTempLocationIfNoneSet() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(TestFlinkRunner.class);
        as.setFlinkMaster("clusterAddress");
        new FlinkPipelineExecutionEnvironment(as).translate(Pipeline.create(as));
        MatcherAssert.assertThat(as.getFilesToStage(), Matchers.hasItem(CoreMatchers.startsWith(System.getProperty("java.io.tmpdir"))));
    }

    @Test
    public void shouldUsePreparedFilesOnRemoteEnvironment() throws Exception {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(TestFlinkRunner.class);
        as.setFlinkMaster("clusterAddress");
        FlinkPipelineExecutionEnvironment flinkPipelineExecutionEnvironment = new FlinkPipelineExecutionEnvironment(as);
        flinkPipelineExecutionEnvironment.translate(Pipeline.create(as));
        ExecutionEnvironment batchExecutionEnvironment = flinkPipelineExecutionEnvironment.getBatchExecutionEnvironment();
        MatcherAssert.assertThat(batchExecutionEnvironment, CoreMatchers.instanceOf(RemoteEnvironment.class));
        MatcherAssert.assertThat((List) Whitebox.getInternalState(batchExecutionEnvironment, "jarFiles"), CoreMatchers.is(convertFilesToURLs(as.getFilesToStage())));
    }

    @Test
    public void shouldUsePreparedFilesOnRemoteStreamEnvironment() throws Exception {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(TestFlinkRunner.class);
        as.setFlinkMaster("clusterAddress");
        as.setStreaming(true);
        FlinkPipelineExecutionEnvironment flinkPipelineExecutionEnvironment = new FlinkPipelineExecutionEnvironment(as);
        flinkPipelineExecutionEnvironment.translate(Pipeline.create(as));
        StreamExecutionEnvironment streamExecutionEnvironment = flinkPipelineExecutionEnvironment.getStreamExecutionEnvironment();
        MatcherAssert.assertThat(streamExecutionEnvironment, CoreMatchers.instanceOf(RemoteStreamEnvironment.class));
        MatcherAssert.assertThat((List) Whitebox.getInternalState(streamExecutionEnvironment, "jarFiles"), CoreMatchers.is(convertFilesToURLs(as.getFilesToStage())));
    }

    @Test
    public void shouldUseTransformOverrides() {
        for (boolean z : new boolean[]{true, false}) {
            FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
            as.setStreaming(z);
            as.setRunner(FlinkRunner.class);
            FlinkPipelineExecutionEnvironment flinkPipelineExecutionEnvironment = new FlinkPipelineExecutionEnvironment(as);
            Pipeline pipeline = (Pipeline) Mockito.spy(Pipeline.create(as));
            flinkPipelineExecutionEnvironment.translate(pipeline);
            ArgumentCaptor forClass = ArgumentCaptor.forClass(ImmutableList.class);
            ((Pipeline) Mockito.verify(pipeline)).replaceAll((List) forClass.capture());
            ImmutableList immutableList = (ImmutableList) forClass.getValue();
            MatcherAssert.assertThat(Boolean.valueOf(immutableList.isEmpty()), CoreMatchers.is(false));
            MatcherAssert.assertThat(Integer.valueOf(immutableList.size()), CoreMatchers.is(Integer.valueOf(FlinkTransformOverrides.getDefaultOverrides(as).size())));
        }
    }

    @Test
    public void shouldProvideParallelismToTransformOverrides() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setStreaming(true);
        as.setRunner(FlinkRunner.class);
        FlinkPipelineExecutionEnvironment flinkPipelineExecutionEnvironment = new FlinkPipelineExecutionEnvironment(as);
        Pipeline create = Pipeline.create(as);
        create.apply(Create.of("test", new String[0])).apply(TextIO.write().to("/tmp"));
        Pipeline pipeline = (Pipeline) Mockito.spy(create);
        flinkPipelineExecutionEnvironment.translate(pipeline);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(ImmutableList.class);
        ((Pipeline) Mockito.verify(pipeline)).replaceAll((List) forClass.capture());
        MatcherAssert.assertThat((ImmutableList) forClass.getValue(), Matchers.hasItem(new BaseMatcher<PTransformOverride>() { // from class: org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironmentTest.2
            public void describeTo(Description description) {
            }

            public boolean matches(Object obj) {
                if (!(obj instanceof PTransformOverride)) {
                    return false;
                }
                FlinkStreamingPipelineTranslator.StreamingShardedWriteFactory overrideFactory = ((PTransformOverride) obj).getOverrideFactory();
                return (overrideFactory instanceof FlinkStreamingPipelineTranslator.StreamingShardedWriteFactory) && overrideFactory.options.getParallelism().intValue() > 0;
            }
        }));
    }

    @Test
    public void shouldUseStreamingTransformOverridesWithUnboundedSources() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(FlinkRunner.class);
        FlinkPipelineExecutionEnvironment flinkPipelineExecutionEnvironment = new FlinkPipelineExecutionEnvironment(as);
        Pipeline pipeline = (Pipeline) Mockito.spy(Pipeline.create(as));
        pipeline.apply(GenerateSequence.from(0L));
        flinkPipelineExecutionEnvironment.translate(pipeline);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(ImmutableList.class);
        ((Pipeline) Mockito.verify(pipeline)).replaceAll((List) forClass.capture());
        MatcherAssert.assertThat((ImmutableList) forClass.getValue(), Matchers.hasItem(PTransformOverride.of(PTransformMatchers.urnEqualTo(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN), CreateStreamingFlinkView.Factory.INSTANCE)));
    }

    @Test
    public void testTranslationModeOverrideWithUnboundedSources() {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(FlinkRunner.class);
        as.setStreaming(false);
        FlinkPipelineExecutionEnvironment flinkPipelineExecutionEnvironment = new FlinkPipelineExecutionEnvironment(as);
        Pipeline create = Pipeline.create(as);
        create.apply(GenerateSequence.from(0L));
        flinkPipelineExecutionEnvironment.translate(create);
        MatcherAssert.assertThat(Boolean.valueOf(as.isStreaming()), Matchers.is(true));
    }

    @Test
    public void testTranslationModeNoOverrideWithoutUnboundedSources() {
        for (boolean z : new boolean[]{true, false}) {
            FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
            as.setRunner(FlinkRunner.class);
            as.setStreaming(z);
            FlinkPipelineExecutionEnvironment flinkPipelineExecutionEnvironment = new FlinkPipelineExecutionEnvironment(as);
            Pipeline create = Pipeline.create(as);
            create.apply(GenerateSequence.from(0L).to(10L));
            flinkPipelineExecutionEnvironment.translate(create);
            MatcherAssert.assertThat(Boolean.valueOf(as.isStreaming()), Matchers.is(Boolean.valueOf(z)));
        }
    }

    @Test
    public void shouldLogWarningWhenCheckpointingIsDisabled() {
        Pipeline create = Pipeline.create();
        create.getOptions().setRunner(TestFlinkRunner.class);
        create.apply(GenerateSequence.from(0L)).apply(ParDo.of(new DoFn<Long, Void>() { // from class: org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironmentTest.3
            @DoFn.ProcessElement
            public void processElement(DoFn<Long, Void>.ProcessContext processContext) {
                throw new RuntimeException("Failing here is ok.");
            }
        }));
        PrintStream printStream = System.err;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        PrintStream printStream2 = new PrintStream(byteArrayOutputStream);
        try {
            System.setErr(printStream2);
            create.run();
            Assert.fail("Should have failed");
            System.setErr(printStream);
        } catch (Exception e) {
            System.setErr(printStream);
        } catch (Throwable th) {
            System.setErr(printStream);
            throw th;
        }
        printStream2.flush();
        MatcherAssert.assertThat(new String(byteArrayOutputStream.toByteArray(), Charsets.UTF_8), CoreMatchers.containsString("UnboundedSources present which rely on checkpointing, but checkpointing is disabled."));
    }

    private FlinkPipelineOptions testPreparingResourcesToStage(String str) throws IOException {
        return testPreparingResourcesToStage(str, true);
    }

    private FlinkPipelineOptions testPreparingResourcesToStage(String str, boolean z) throws IOException {
        Pipeline create = Pipeline.create();
        String absolutePath = this.tmpFolder.newFolder().getAbsolutePath();
        ArrayList arrayList = new ArrayList();
        File newFolder = this.tmpFolder.newFolder();
        newFolder.createNewFile();
        arrayList.add(newFolder.getAbsolutePath());
        arrayList.add(this.tmpFolder.newFile().getAbsolutePath());
        if (z) {
            arrayList.add("/path/to/not/existing/dir");
        }
        FlinkPipelineOptions pipelineOptions = setPipelineOptions(str, absolutePath, arrayList);
        new FlinkPipelineExecutionEnvironment(pipelineOptions).translate(create);
        return pipelineOptions;
    }

    private FlinkPipelineOptions setPipelineOptions(String str, String str2, List<String> list) {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setRunner(TestFlinkRunner.class);
        as.setFlinkMaster(str);
        as.setTempLocation(str2);
        as.setFilesToStage(list);
        return as;
    }

    private static List<URL> convertFilesToURLs(List<String> list) {
        return (List) list.stream().map(str -> {
            try {
                return new File(str).getAbsoluteFile().toURI().toURL();
            } catch (MalformedURLException e) {
                throw new RuntimeException("Failed to convert to URL", e);
            }
        }).collect(Collectors.toList());
    }
}
