package org.apache.flink.optimizer.dataexchange;

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.class */
/**
 * Checks which {@link DataExchangeMode} the optimizer assigns to every connection of a simple,
 * forward-connected program (text source, map, filter, keyed group-reduce, discarding sink)
 * for each {@link ExecutionMode}.
 */
public class DataExchangeModeForwardTest extends CompilerTestBase {

    /** With {@code PIPELINED_FORCED} every checked connection is expected to be pipelined. */
    @Test
    public void testPipelinedForced() {
        verifySimpleForwardPlan(
                ExecutionMode.PIPELINED_FORCED,
                DataExchangeMode.PIPELINED,
                DataExchangeMode.PIPELINED,
                DataExchangeMode.PIPELINED,
                DataExchangeMode.PIPELINED,
                DataExchangeMode.PIPELINED,
                DataExchangeMode.PIPELINED);
    }

    /** With {@code PIPELINED} every checked connection is expected to be pipelined. */
    @Test
    public void testPipelined() {
        verifySimpleForwardPlan(
                ExecutionMode.PIPELINED,
                DataExchangeMode.PIPELINED,
                DataExchangeMode.PIPELINED,
                DataExchangeMode.PIPELINED,
                DataExchangeMode.PIPELINED,
                DataExchangeMode.PIPELINED,
                DataExchangeMode.PIPELINED);
    }

    /**
     * With {@code BATCH} only the input of the node directly in front of the sink is expected
     * to be a batch exchange; all other checked connections stay pipelined.
     */
    @Test
    public void testBatch() {
        verifySimpleForwardPlan(
                ExecutionMode.BATCH,
                DataExchangeMode.PIPELINED,
                DataExchangeMode.PIPELINED,
                DataExchangeMode.PIPELINED,
                DataExchangeMode.PIPELINED,
                DataExchangeMode.BATCH,
                DataExchangeMode.PIPELINED);
    }

    /**
     * With {@code BATCH_FORCED} every checked connection is expected to be a batch exchange,
     * except the input of the node two steps in front of the sink, which stays pipelined.
     */
    @Test
    public void testBatchForced() {
        verifySimpleForwardPlan(
                ExecutionMode.BATCH_FORCED,
                DataExchangeMode.BATCH,
                DataExchangeMode.BATCH,
                DataExchangeMode.BATCH,
                DataExchangeMode.PIPELINED,
                DataExchangeMode.BATCH,
                DataExchangeMode.BATCH);
    }

    /**
     * Builds the test program, compiles it without statistics under the given execution mode,
     * and asserts the data exchange mode of the input connection of each node on the path
     * from the sink back towards the source.
     *
     * <p>The expected modes are listed in data-flow order, i.e. {@code toMap} belongs to the
     * node furthest away from the sink and {@code toSink} to the sink itself.
     *
     * <p>NOTE(review): the node roles used in the names below (key extractor, combiner) are
     * inferred from the program shape (key-selector grouping followed by a group-reduce);
     * confirm against the plan the optimizer actually produces.
     *
     * @param executionMode  execution mode set on the environment's config before compiling
     * @param toMap          expected mode for the input of the fifth node before the sink
     * @param toFilter       expected mode for the input of the fourth node before the sink
     * @param toKeyExtractor expected mode for the input of the third node before the sink
     * @param toCombiner     expected mode for the input of the second node before the sink
     * @param toReduce       expected mode for the input of the node directly before the sink
     * @param toSink         expected mode for the input of the sink
     */
    private void verifySimpleForwardPlan(
            ExecutionMode executionMode,
            DataExchangeMode toMap,
            DataExchangeMode toFilter,
            DataExchangeMode toKeyExtractor,
            DataExchangeMode toCombiner,
            DataExchangeMode toReduce,
            DataExchangeMode toSink) {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().setExecutionMode(executionMode);

            // The input path is only used to build the plan; the program is never executed.
            env.readTextFile("/never/accessed")
                    .map(new MapFunction<String, Integer>() {
                        @Override
                        public Integer map(String value) {
                            return 0;
                        }
                    })
                    .filter(new FilterFunction<Integer>() {
                        @Override
                        public boolean filter(Integer value) {
                            return false;
                        }
                    })
                    .groupBy(new IdentityKeyExtractor<Integer>())
                    .reduceGroup(new Top1GroupReducer<Integer>())
                    .output(new DiscardingOutputFormat<Integer>());

            SinkPlanNode sinkNode =
                    (SinkPlanNode) compileNoStats(env.createProgramPlan()).getDataSinks().iterator().next();

            // getPredecessor() is declared to return the general plan node type, so every step
            // back along the chain needs an explicit cast before getInput() can be used.
            SingleInputPlanNode reduceNode = (SingleInputPlanNode) sinkNode.getPredecessor();
            SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getPredecessor();
            SingleInputPlanNode keyExtractorNode = (SingleInputPlanNode) combineNode.getPredecessor();
            SingleInputPlanNode filterNode = (SingleInputPlanNode) keyExtractorNode.getPredecessor();
            SingleInputPlanNode mapNode = (SingleInputPlanNode) filterNode.getPredecessor();

            Assert.assertEquals(toMap, mapNode.getInput().getDataExchangeMode());
            Assert.assertEquals(toFilter, filterNode.getInput().getDataExchangeMode());
            Assert.assertEquals(toKeyExtractor, keyExtractorNode.getInput().getDataExchangeMode());
            Assert.assertEquals(toCombiner, combineNode.getInput().getDataExchangeMode());
            Assert.assertEquals(toReduce, reduceNode.getInput().getDataExchangeMode());
            Assert.assertEquals(toSink, sinkNode.getInput().getDataExchangeMode());
        } catch (Exception e) {
            // Still report a test failure, but keep the original exception as the cause so its
            // stack trace shows up in the test report (and the message is never just "null").
            AssertionError failure = new AssertionError(
                    "Unexpected exception while verifying the plan for " + executionMode + ": " + e);
            failure.initCause(e);
            throw failure;
        }
    }
}
