package org.apache.flink.optimizer;

import java.util.Iterator;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.io.ReplicatingInputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.io.TupleCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/optimizer/ReplicatingDataSourceTest.class */
public class ReplicatingDataSourceTest extends CompilerTestBase {

    /* loaded from: input_file:org/apache/flink/optimizer/ReplicatingDataSourceTest$IdFlatMap.class */
    public static class IdFlatMap<T> implements FlatMapFunction<T, T> {
        public void flatMap(T t, Collector<T> collector) throws Exception {
            collector.collect(t);
        }
    }

    /* loaded from: input_file:org/apache/flink/optimizer/ReplicatingDataSourceTest$IdMap.class */
    public static class IdMap<T> implements MapFunction<T, T> {
        public T map(T t) throws Exception {
            return t;
        }
    }

    /* loaded from: input_file:org/apache/flink/optimizer/ReplicatingDataSourceTest$IdPMap.class */
    public static class IdPMap<T> implements MapPartitionFunction<T, T> {
        public void mapPartition(Iterable<T> iterable, Collector<T> collector) throws Exception {
            Iterator<T> it = iterable.iterator();
            while (it.hasNext()) {
                collector.collect(it.next());
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/optimizer/ReplicatingDataSourceTest$LastReduce.class */
    public static class LastReduce<T> implements ReduceFunction<T> {
        public T reduce(T t, T t2) throws Exception {
            return t2;
        }
    }

    /* loaded from: input_file:org/apache/flink/optimizer/ReplicatingDataSourceTest$NoFilter.class */
    public static class NoFilter<T> implements FilterFunction<T> {
        public boolean filter(T t) throws Exception {
            return false;
        }
    }

    @Test
    public void checkJoinWithReplicatedSourceInput() {
        LocalEnvironment createLocalEnvironment = ExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setParallelism(8);
        createLocalEnvironment.createInput(new ReplicatingInputFormat(new TupleCsvInputFormat(new Path("/some/path"), TupleTypeInfo.getBasicTupleTypeInfo(new Class[]{String.class}))), new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO})).join(createLocalEnvironment.readCsvFile("/some/otherpath").types(String.class)).where(new String[]{"*"}).equalTo(new String[]{"*"}).writeAsText("/some/newpath");
        DualInputPlanNode predecessor = ((SinkPlanNode) compileNoStats(createLocalEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getPredecessor();
        ShipStrategyType shipStrategy = predecessor.getInput1().getShipStrategy();
        ShipStrategyType shipStrategy2 = predecessor.getInput2().getShipStrategy();
        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, shipStrategy);
        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, shipStrategy2);
    }

    @Test
    public void checkJoinWithReplicatedSourceInputBehindMap() {
        LocalEnvironment createLocalEnvironment = ExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setParallelism(8);
        createLocalEnvironment.createInput(new ReplicatingInputFormat(new TupleCsvInputFormat(new Path("/some/path"), TupleTypeInfo.getBasicTupleTypeInfo(new Class[]{String.class}))), new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO})).map(new IdMap()).join(createLocalEnvironment.readCsvFile("/some/otherpath").types(String.class)).where(new String[]{"*"}).equalTo(new String[]{"*"}).writeAsText("/some/newpath");
        DualInputPlanNode predecessor = ((SinkPlanNode) compileNoStats(createLocalEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getPredecessor();
        ShipStrategyType shipStrategy = predecessor.getInput1().getShipStrategy();
        ShipStrategyType shipStrategy2 = predecessor.getInput2().getShipStrategy();
        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, shipStrategy);
        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, shipStrategy2);
    }

    @Test
    public void checkJoinWithReplicatedSourceInputBehindFilter() {
        LocalEnvironment createLocalEnvironment = ExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setParallelism(8);
        createLocalEnvironment.createInput(new ReplicatingInputFormat(new TupleCsvInputFormat(new Path("/some/path"), TupleTypeInfo.getBasicTupleTypeInfo(new Class[]{String.class}))), new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO})).filter(new NoFilter()).join(createLocalEnvironment.readCsvFile("/some/otherpath").types(String.class)).where(new String[]{"*"}).equalTo(new String[]{"*"}).writeAsText("/some/newpath");
        DualInputPlanNode predecessor = ((SinkPlanNode) compileNoStats(createLocalEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getPredecessor();
        ShipStrategyType shipStrategy = predecessor.getInput1().getShipStrategy();
        ShipStrategyType shipStrategy2 = predecessor.getInput2().getShipStrategy();
        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, shipStrategy);
        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, shipStrategy2);
    }

    @Test
    public void checkJoinWithReplicatedSourceInputBehindFlatMap() {
        LocalEnvironment createLocalEnvironment = ExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setParallelism(8);
        createLocalEnvironment.createInput(new ReplicatingInputFormat(new TupleCsvInputFormat(new Path("/some/path"), TupleTypeInfo.getBasicTupleTypeInfo(new Class[]{String.class}))), new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO})).flatMap(new IdFlatMap()).join(createLocalEnvironment.readCsvFile("/some/otherpath").types(String.class)).where(new String[]{"*"}).equalTo(new String[]{"*"}).writeAsText("/some/newpath");
        DualInputPlanNode predecessor = ((SinkPlanNode) compileNoStats(createLocalEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getPredecessor();
        ShipStrategyType shipStrategy = predecessor.getInput1().getShipStrategy();
        ShipStrategyType shipStrategy2 = predecessor.getInput2().getShipStrategy();
        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, shipStrategy);
        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, shipStrategy2);
    }

    @Test
    public void checkJoinWithReplicatedSourceInputBehindMapPartition() {
        LocalEnvironment createLocalEnvironment = ExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setParallelism(8);
        createLocalEnvironment.createInput(new ReplicatingInputFormat(new TupleCsvInputFormat(new Path("/some/path"), TupleTypeInfo.getBasicTupleTypeInfo(new Class[]{String.class}))), new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO})).mapPartition(new IdPMap()).join(createLocalEnvironment.readCsvFile("/some/otherpath").types(String.class)).where(new String[]{"*"}).equalTo(new String[]{"*"}).writeAsText("/some/newpath");
        DualInputPlanNode predecessor = ((SinkPlanNode) compileNoStats(createLocalEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getPredecessor();
        ShipStrategyType shipStrategy = predecessor.getInput1().getShipStrategy();
        ShipStrategyType shipStrategy2 = predecessor.getInput2().getShipStrategy();
        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, shipStrategy);
        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, shipStrategy2);
    }

    @Test
    public void checkJoinWithReplicatedSourceInputBehindMultiMaps() {
        LocalEnvironment createLocalEnvironment = ExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setParallelism(8);
        createLocalEnvironment.createInput(new ReplicatingInputFormat(new TupleCsvInputFormat(new Path("/some/path"), TupleTypeInfo.getBasicTupleTypeInfo(new Class[]{String.class}))), new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO})).filter(new NoFilter()).mapPartition(new IdPMap()).flatMap(new IdFlatMap()).map(new IdMap()).join(createLocalEnvironment.readCsvFile("/some/otherpath").types(String.class)).where(new String[]{"*"}).equalTo(new String[]{"*"}).writeAsText("/some/newpath");
        DualInputPlanNode predecessor = ((SinkPlanNode) compileNoStats(createLocalEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getPredecessor();
        ShipStrategyType shipStrategy = predecessor.getInput1().getShipStrategy();
        ShipStrategyType shipStrategy2 = predecessor.getInput2().getShipStrategy();
        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, shipStrategy);
        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, shipStrategy2);
    }

    @Test
    public void checkCrossWithReplicatedSourceInput() {
        LocalEnvironment createLocalEnvironment = ExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setParallelism(8);
        createLocalEnvironment.createInput(new ReplicatingInputFormat(new TupleCsvInputFormat(new Path("/some/path"), TupleTypeInfo.getBasicTupleTypeInfo(new Class[]{String.class}))), new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO})).cross(createLocalEnvironment.readCsvFile("/some/otherpath").types(String.class)).writeAsText("/some/newpath");
        DualInputPlanNode predecessor = ((SinkPlanNode) compileNoStats(createLocalEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getPredecessor();
        ShipStrategyType shipStrategy = predecessor.getInput1().getShipStrategy();
        ShipStrategyType shipStrategy2 = predecessor.getInput2().getShipStrategy();
        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, shipStrategy);
        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, shipStrategy2);
    }

    @Test
    public void checkCrossWithReplicatedSourceInputBehindMap() {
        LocalEnvironment createLocalEnvironment = ExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setParallelism(8);
        createLocalEnvironment.createInput(new ReplicatingInputFormat(new TupleCsvInputFormat(new Path("/some/path"), TupleTypeInfo.getBasicTupleTypeInfo(new Class[]{String.class}))), new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO})).map(new IdMap()).filter(new NoFilter()).cross(createLocalEnvironment.readCsvFile("/some/otherpath").types(String.class)).writeAsText("/some/newpath");
        DualInputPlanNode predecessor = ((SinkPlanNode) compileNoStats(createLocalEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getPredecessor();
        ShipStrategyType shipStrategy = predecessor.getInput1().getShipStrategy();
        ShipStrategyType shipStrategy2 = predecessor.getInput2().getShipStrategy();
        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, shipStrategy);
        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, shipStrategy2);
    }

    @Test(expected = CompilerException.class)
    public void checkJoinWithReplicatedSourceInputChangingparallelism() {
        LocalEnvironment createLocalEnvironment = ExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setParallelism(8);
        createLocalEnvironment.createInput(new ReplicatingInputFormat(new TupleCsvInputFormat(new Path("/some/path"), TupleTypeInfo.getBasicTupleTypeInfo(new Class[]{String.class}))), new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO})).join(createLocalEnvironment.readCsvFile("/some/otherpath").types(String.class)).where(new String[]{"*"}).equalTo(new String[]{"*"}).setParallelism(10).writeAsText("/some/newpath");
        compileNoStats(createLocalEnvironment.createProgramPlan());
    }

    @Test(expected = CompilerException.class)
    public void checkJoinWithReplicatedSourceInputBehindMapChangingparallelism() {
        LocalEnvironment createLocalEnvironment = ExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setParallelism(8);
        createLocalEnvironment.createInput(new ReplicatingInputFormat(new TupleCsvInputFormat(new Path("/some/path"), TupleTypeInfo.getBasicTupleTypeInfo(new Class[]{String.class}))), new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO})).map(new IdMap()).setParallelism(9).join(createLocalEnvironment.readCsvFile("/some/otherpath").types(String.class)).where(new String[]{"*"}).equalTo(new String[]{"*"}).writeAsText("/some/newpath");
        compileNoStats(createLocalEnvironment.createProgramPlan());
    }

    @Test(expected = CompilerException.class)
    public void checkJoinWithReplicatedSourceInputBehindReduce() {
        LocalEnvironment createLocalEnvironment = ExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setParallelism(8);
        createLocalEnvironment.createInput(new ReplicatingInputFormat(new TupleCsvInputFormat(new Path("/some/path"), TupleTypeInfo.getBasicTupleTypeInfo(new Class[]{String.class}))), new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO})).reduce(new LastReduce()).join(createLocalEnvironment.readCsvFile("/some/otherpath").types(String.class)).where(new String[]{"*"}).equalTo(new String[]{"*"}).writeAsText("/some/newpath");
        compileNoStats(createLocalEnvironment.createProgramPlan());
    }

    @Test(expected = CompilerException.class)
    public void checkJoinWithReplicatedSourceInputBehindRebalance() {
        LocalEnvironment createLocalEnvironment = ExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setParallelism(8);
        createLocalEnvironment.createInput(new ReplicatingInputFormat(new TupleCsvInputFormat(new Path("/some/path"), TupleTypeInfo.getBasicTupleTypeInfo(new Class[]{String.class}))), new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO})).rebalance().join(createLocalEnvironment.readCsvFile("/some/otherpath").types(String.class)).where(new String[]{"*"}).equalTo(new String[]{"*"}).writeAsText("/some/newpath");
        compileNoStats(createLocalEnvironment.createProgramPlan());
    }
}
