package org.apache.flink.test.optimizer.iterations;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.PreviewPlanEnvironment;
import org.apache.flink.optimizer.dag.TempMode;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plan.SourcePlanNode;
import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.class */
public class ConnectedComponentsCoGroupTest extends CompilerTestBase {
    private static final String VERTEX_SOURCE = "Vertices";
    private static final String ITERATION_NAME = "Connected Components Iteration";
    private static final String EDGES_SOURCE = "Edges";
    private static final String JOIN_NEIGHBORS_MATCH = "Join Candidate Id With Neighbor";
    private static final String MIN_ID_AND_UPDATE = "Min Id and Update";
    private static final String SINK = "Result";
    private static final boolean PRINT_PLAN = false;
    private final FieldList set0 = new FieldList(PRINT_PLAN);

    @FunctionAnnotation.ForwardedFieldsFirst({"f0->f0"})
    @FunctionAnnotation.ForwardedFieldsSecond({"f0->f0"})
    /* loaded from: input_file:org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest$DummyCoGroupFunction.class */
    public static class DummyCoGroupFunction implements CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        public void coGroup(Iterable<Tuple2<Long, Long>> iterable, Iterable<Tuple2<Long, Long>> iterable2, Collector<Tuple2<Long, Long>> collector) throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest$DummyJoinFunction.class */
    public static class DummyJoinFunction implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        public void join(Tuple2<Long, Long> tuple2, Tuple2<Long, Long> tuple22, Collector<Tuple2<Long, Long>> collector) throws Exception {
        }

        public /* bridge */ /* synthetic */ void join(Object obj, Object obj2, Collector collector) throws Exception {
            join((Tuple2<Long, Long>) obj, (Tuple2<Long, Long>) obj2, (Collector<Tuple2<Long, Long>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest$DummyMapFunction.class */
    public static class DummyMapFunction implements FlatMapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
        public void flatMap(Tuple1<Long> tuple1, Collector<Tuple2<Long, Long>> collector) throws Exception {
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Tuple1<Long>) obj, (Collector<Tuple2<Long, Long>>) collector);
        }
    }

    @Test
    public void testWorksetConnectedComponents() {
        Plan connectedComponentsCoGroupPlan = getConnectedComponentsCoGroupPlan();
        connectedComponentsCoGroupPlan.setExecutionConfig(new ExecutionConfig());
        OptimizedPlan compileNoStats = compileNoStats(connectedComponentsCoGroupPlan);
        CompilerTestBase.OptimizerPlanNodeResolver optimizerPlanNodeResolver = getOptimizerPlanNodeResolver(compileNoStats);
        SourcePlanNode node = optimizerPlanNodeResolver.getNode(VERTEX_SOURCE);
        SourcePlanNode node2 = optimizerPlanNodeResolver.getNode(EDGES_SOURCE);
        SinkPlanNode node3 = optimizerPlanNodeResolver.getNode(SINK);
        WorksetIterationPlanNode node4 = optimizerPlanNodeResolver.getNode(ITERATION_NAME);
        DualInputPlanNode node5 = optimizerPlanNodeResolver.getNode(JOIN_NEIGHBORS_MATCH);
        DualInputPlanNode node6 = optimizerPlanNodeResolver.getNode(MIN_ID_AND_UPDATE);
        Assert.assertEquals(DriverStrategy.NONE, node3.getDriverStrategy());
        Assert.assertEquals(DriverStrategy.NONE, node.getDriverStrategy());
        Assert.assertEquals(DriverStrategy.NONE, node2.getDriverStrategy());
        Assert.assertEquals(DriverStrategy.INNER_MERGE, node5.getDriverStrategy());
        Assert.assertEquals(this.set0, node5.getKeysForInput1());
        Assert.assertEquals(this.set0, node5.getKeysForInput2());
        Assert.assertEquals(DriverStrategy.CO_GROUP, node6.getDriverStrategy());
        Assert.assertEquals(this.set0, node6.getKeysForInput1());
        Assert.assertEquals(this.set0, node6.getKeysForInput2());
        Assert.assertEquals(ShipStrategyType.FORWARD, node3.getInput().getShipStrategy());
        Assert.assertEquals(ShipStrategyType.PARTITION_HASH, node4.getInitialSolutionSetInput().getShipStrategy());
        Assert.assertEquals(this.set0, node4.getInitialSolutionSetInput().getShipStrategyKeys());
        Assert.assertEquals(ShipStrategyType.PARTITION_HASH, node4.getInitialWorksetInput().getShipStrategy());
        Assert.assertEquals(this.set0, node4.getInitialWorksetInput().getShipStrategyKeys());
        Assert.assertEquals(ShipStrategyType.FORWARD, node5.getInput1().getShipStrategy());
        Assert.assertEquals(ShipStrategyType.PARTITION_HASH, node5.getInput2().getShipStrategy());
        Assert.assertEquals(this.set0, node5.getInput2().getShipStrategyKeys());
        Assert.assertTrue(node5.getInput2().getTempMode().isCached());
        Assert.assertEquals(ShipStrategyType.PARTITION_HASH, node6.getInput1().getShipStrategy());
        Assert.assertEquals(ShipStrategyType.FORWARD, node6.getInput2().getShipStrategy());
        Assert.assertEquals(LocalStrategy.NONE, node3.getInput().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.NONE, node4.getInitialSolutionSetInput().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.SORT, node4.getInitialWorksetInput().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.NONE, node5.getInput1().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.SORT, node5.getInput2().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.SORT, node6.getInput1().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.NONE, node6.getInput2().getLocalStrategy());
        Assert.assertTrue(TempMode.CACHED == node5.getInput2().getTempMode());
        new JobGraphGenerator().compileJobGraph(compileNoStats);
    }

    public static Plan getConnectedComponentsCoGroupPlan() {
        PreviewPlanEnvironment previewPlanEnvironment = new PreviewPlanEnvironment();
        previewPlanEnvironment.setAsContext();
        try {
            ConnectedComponentsWithCoGroup(new String[]{DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE, "100"});
        } catch (OptimizerPlanEnvironment.ProgramAbortException e) {
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail("ConnectedComponentsWithCoGroup failed with an exception");
        }
        return previewPlanEnvironment.getPlan();
    }

    public static void ConnectedComponentsWithCoGroup(String[] strArr) throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(Integer.parseInt(strArr[PRINT_PLAN]));
        Operator name = executionEnvironment.readCsvFile(strArr[1]).types(Long.class).name(VERTEX_SOURCE);
        Operator name2 = executionEnvironment.readCsvFile(strArr[2]).types(Long.class, Long.class).name(EDGES_SOURCE);
        FlatMapOperator flatMap = name.flatMap(new DummyMapFunction());
        DeltaIteration name3 = flatMap.iterateDelta(flatMap, Integer.parseInt(strArr[4]), new int[]{PRINT_PLAN}).name(ITERATION_NAME);
        Operator name4 = name3.getWorkset().join(name2).where(new int[]{PRINT_PLAN}).equalTo(new int[]{PRINT_PLAN}).with(new DummyJoinFunction()).name(JOIN_NEIGHBORS_MATCH).coGroup(name3.getSolutionSet()).where(new int[]{PRINT_PLAN}).equalTo(new int[]{PRINT_PLAN}).with(new DummyCoGroupFunction()).name(MIN_ID_AND_UPDATE);
        name3.closeWith(name4, name4).writeAsCsv(strArr[3]).name(SINK);
        executionEnvironment.execute();
    }
}
