package org.apache.flink.test.iterative.nephele;

import java.io.BufferedReader;
import java.util.Collection;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
import org.apache.flink.api.java.record.functions.MapFunction;
import org.apache.flink.api.java.record.io.CsvInputFormat;
import org.apache.flink.api.java.record.io.CsvOutputFormat;
import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputFormatVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver;
import org.apache.flink.runtime.operators.CollectorMapDriver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.GroupReduceDriver;
import org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver;
import org.apache.flink.runtime.operators.chaining.ChainedCollectorMapDriver;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.test.recordJobs.graph.WorksetConnectedComponents;
import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.util.RecordAPITestBase;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/*
 * Integration test that runs Connected Components as hand-assembled workset-iteration JobGraphs.
 * The "testcase" configuration value (1-4) selects which of the four graph layouts is submitted.
 */
public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
    // Seed passed to the random edge generator in preSubmit().
    private static final long SEED = 3287269182979823L;
    // Number of vertices in the generated input graph.
    private static final int NUM_VERTICES = 1000;
    // Number of edges in the generated input graph.
    private static final int NUM_EDGES = 10000;
    // Iteration id written into the TaskConfig of every task that takes part in the iteration.
    private static final int ITERATION_ID = 1;
    // NOTE(review): not referenced by any code in this class.
    private static final long MEM_PER_CONSUMER = 3;
    // Number of task manager slots and number of parallel subtasks used for every job graph.
    private static final int parallelism = 4;
    // Relative memory fraction handed to each memory-consuming component (driver, sorter, back channel, ...).
    private static final double MEM_FRAC_PER_CONSUMER = 0.15d;
    // Temp file holding the generated vertices; set in preSubmit().
    protected String verticesPath;
    // Temp file holding the generated edges; set in preSubmit().
    protected String edgesPath;
    // Temp path the job writes its result to; set in preSubmit().
    protected String resultPath;

    /* loaded from: input_file:org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase$DummyMapper.class */
    public static final class DummyMapper extends MapFunction {
        private static final long serialVersionUID = 1;

        public void map(Record record, Collector<Record> collector) {
            collector.collect(record);
        }

        public /* bridge */ /* synthetic */ void map(Object obj, Collector collector) throws Exception {
            map((Record) obj, (Collector<Record>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase$IdDuplicator.class */
    public static final class IdDuplicator extends MapFunction {
        private static final long serialVersionUID = 1;

        public void map(Record record, Collector<Record> collector) throws Exception {
            record.setField(ConnectedComponentsNepheleITCase.ITERATION_ID, record.getField(0, LongValue.class));
            collector.collect(record);
        }

        public /* bridge */ /* synthetic */ void map(Object obj, Collector collector) throws Exception {
            map((Record) obj, (Collector<Record>) collector);
        }
    }

    /**
     * Creates one parameterized test instance.
     *
     * @param configuration the configuration of this run; its "testcase" entry selects the job graph
     */
    public ConnectedComponentsNepheleITCase(Configuration configuration) {
        super(configuration);
        // Request as many slots as the job graphs have parallel subtasks.
        this.setTaskManagerNumSlots(parallelism);
    }

    /**
     * Supplies the parameter sets for the parameterized runner: one configuration per job graph
     * variant, with the "testcase" entry set to 1, 2, 3 and 4 respectively.
     *
     * @return the parameter list built from the four configurations
     */
    @Parameterized.Parameters
    public static Collection<Object[]> getConfigurations() {
        // Number of job graph variants handled by getJobGraph().
        final int numTestCases = 4;
        Configuration[] configurations = new Configuration[numTestCases];
        for (int testCase = 1; testCase <= numTestCases; testCase++) {
            Configuration testConfig = new Configuration();
            testConfig.setInteger("testcase", testCase);
            configurations[testCase - 1] = testConfig;
        }
        return toParameterList(configurations);
    }

    /**
     * Generates the input data before the job is submitted: a vertices file, an edges file, and
     * the path the result will be written to.
     *
     * @throws Exception if the temp files cannot be created
     */
    @Override
    protected void preSubmit() throws Exception {
        String vertices = ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES);
        this.verticesPath = createTempFile("vertices.txt", vertices);

        String edges = ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED);
        this.edgesPath = createTempFile("edges.txt", edges);

        this.resultPath = getTempFilePath("results");
    }

    /**
     * Builds the job graph variant selected by the "testcase" entry of the test configuration.
     *
     * @return the job graph for test case 1, 2, 3 or 4
     * @throws RuntimeException if the configuration holds no or an unknown test case number
     * @throws Exception if building the job graph fails
     */
    @Override
    protected JobGraph getJobGraph() throws Exception {
        // Upper bound on the number of supersteps handed to the sync task.
        final int maxIterations = 100;

        // The case labels are test case numbers; they must match getConfigurations().
        switch (this.config.getInteger("testcase", 0)) {
            case 1:
                return createJobGraphUnifiedTails(this.verticesPath, this.edgesPath, this.resultPath, parallelism, maxIterations);
            case 2:
                return createJobGraphSeparateTails(this.verticesPath, this.edgesPath, this.resultPath, parallelism, maxIterations);
            case 3:
                return createJobGraphIntermediateWorksetUpdateAndSolutionSetTail(this.verticesPath, this.edgesPath, this.resultPath, parallelism, maxIterations);
            case 4:
                return createJobGraphSolutionSetUpdateAndWorksetTail(this.verticesPath, this.edgesPath, this.resultPath, parallelism, maxIterations);
            default:
                throw new RuntimeException("Broken test configuration");
        }
    }

    /**
     * Verifies the job output after execution: every result reader is checked by
     * {@code ConnectedComponentsData.checkOddEvenResult}.
     *
     * @throws Exception if the result cannot be read or the check fails
     */
    @Override
    protected void postSubmit() throws Exception {
        for (BufferedReader reader : getResultReader(this.resultPath)) {
            ConnectedComponentsData.checkOddEvenResult(reader);
        }
    }

    /**
     * Creates the vertices input vertex with a chained {@link IdDuplicator} that hash-partitions
     * its result to two outputs.
     *
     * @param jobGraph the job graph the vertex is added to
     * @param str path of the vertices file
     * @param i number of parallel subtasks
     * @param typeSerializerFactory serializer factory for the records
     * @param typeComparatorFactory comparator factory used for partitioning both outputs
     * @return the configured input vertex
     */
    private static InputFormatVertex createVerticesInput(JobGraph jobGraph, String str, int i, TypeSerializerFactory<?> typeSerializerFactory, TypeComparatorFactory<?> typeComparatorFactory) {
        InputFormatVertex verticesInput = JobGraphUtils.createInput(new CsvInputFormat(' ', new Class[]{LongValue.class}), str, "VerticesInput", jobGraph, i);

        // The input task itself only forwards into the chained mapper.
        TaskConfig inputConfig = new TaskConfig(verticesInput.getConfiguration());
        inputConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        inputConfig.setOutputSerializer(typeSerializerFactory);

        // Chained mapper: duplicates the id and ships the result hash-partitioned to outputs 0 and 1.
        TaskConfig chainedMapperConfig = new TaskConfig(new Configuration());
        chainedMapperConfig.setStubWrapper(new UserCodeClassWrapper(IdDuplicator.class));
        chainedMapperConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        chainedMapperConfig.setInputLocalStrategy(0, LocalStrategy.NONE);
        chainedMapperConfig.setInputSerializer(typeSerializerFactory, 0);
        chainedMapperConfig.setOutputSerializer(typeSerializerFactory);
        chainedMapperConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
        chainedMapperConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
        chainedMapperConfig.setOutputComparator(typeComparatorFactory, 0);
        chainedMapperConfig.setOutputComparator(typeComparatorFactory, 1);

        inputConfig.addChainedTask(ChainedCollectorMapDriver.class, chainedMapperConfig, "ID Duplicator");
        return verticesInput;
    }

    /**
     * Creates the edges input vertex, which reads two {@code LongValue} fields per line and
     * ships its records hash-partitioned.
     *
     * @param jobGraph the job graph the vertex is added to
     * @param str path of the edges file
     * @param i number of parallel subtasks
     * @param typeSerializerFactory serializer factory for the records
     * @param typeComparatorFactory comparator factory used for partitioning the output
     * @return the configured input vertex
     */
    private static InputFormatVertex createEdgesInput(JobGraph jobGraph, String str, int i, TypeSerializerFactory<?> typeSerializerFactory, TypeComparatorFactory<?> typeComparatorFactory) {
        Class[] edgeFieldTypes = new Class[]{LongValue.class, LongValue.class};
        InputFormatVertex edgesInput = JobGraphUtils.createInput(new CsvInputFormat(' ', edgeFieldTypes), str, "EdgesInput", jobGraph, i);

        TaskConfig edgesConfig = new TaskConfig(edgesInput.getConfiguration());
        edgesConfig.setOutputSerializer(typeSerializerFactory);
        edgesConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
        edgesConfig.setOutputComparator(typeComparatorFactory, 0);

        return edgesInput;
    }

    /**
     * Creates the iteration head task ("Join With Edges") and configures its three inputs, the
     * solution set, the back channel, its outputs, and the join driver.
     *
     * @param jobGraph the job graph the vertex is added to
     * @param i number of parallel subtasks
     * @param typeSerializerFactory serializer factory for all inputs, outputs and the solution set
     * @param typeComparatorFactory comparator factory for inputs, outputs, solution set and driver
     * @param typePairComparatorFactory pair comparator factory for the join driver
     * @return the configured head vertex
     */
    private static JobVertex createIterationHead(JobGraph jobGraph, int i, TypeSerializerFactory<?> typeSerializerFactory, TypeComparatorFactory<?> typeComparatorFactory, TypePairComparatorFactory<?, ?> typePairComparatorFactory) {
        // Input indices of the head; these are gate positions, unrelated to the iteration id.
        final int worksetInput = 0;
        final int cachedInput = 1;
        final int solutionSetInput = 2;

        JobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "Join With Edges (Iteration Head)", jobGraph, i);
        TaskConfig headConfig = new TaskConfig(head.getConfiguration());
        headConfig.setIterationId(ITERATION_ID);

        // Input 0: initial workset.
        headConfig.addInputToGroup(worksetInput);
        headConfig.setInputSerializer(typeSerializerFactory, worksetInput);
        headConfig.setInputComparator(typeComparatorFactory, worksetInput);
        headConfig.setInputLocalStrategy(worksetInput, LocalStrategy.NONE);
        headConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(worksetInput);

        // Input 1: cached across supersteps, with its own materialization memory.
        headConfig.addInputToGroup(cachedInput);
        headConfig.setInputSerializer(typeSerializerFactory, cachedInput);
        headConfig.setInputComparator(typeComparatorFactory, cachedInput);
        headConfig.setInputLocalStrategy(cachedInput, LocalStrategy.NONE);
        headConfig.setInputCached(cachedInput, true);
        headConfig.setRelativeInputMaterializationMemory(cachedInput, MEM_FRAC_PER_CONSUMER);

        // Input 2: initial solution set.
        headConfig.addInputToGroup(solutionSetInput);
        headConfig.setInputSerializer(typeSerializerFactory, solutionSetInput);
        headConfig.setInputComparator(typeComparatorFactory, solutionSetInput);
        headConfig.setInputLocalStrategy(solutionSetInput, LocalStrategy.NONE);
        headConfig.setIterationHeadSolutionSetInputIndex(solutionSetInput);

        // Solution set, back channel and their memory shares.
        headConfig.setSolutionSetSerializer(typeSerializerFactory);
        headConfig.setSolutionSetComparator(typeComparatorFactory);
        headConfig.setIsWorksetIteration();
        headConfig.setRelativeBackChannelMemory(MEM_FRAC_PER_CONSUMER);
        headConfig.setRelativeSolutionSetMemory(MEM_FRAC_PER_CONSUMER);

        // Regular output into the iteration, hash-partitioned.
        headConfig.setOutputSerializer(typeSerializerFactory);
        headConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
        headConfig.setOutputComparator(typeComparatorFactory, 0);

        // Final output, emitted by the head and simply forwarded.
        TaskConfig finalOutputConfig = new TaskConfig(new Configuration());
        finalOutputConfig.setOutputSerializer(typeSerializerFactory);
        finalOutputConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        headConfig.setIterationHeadFinalOutputConfig(finalOutputConfig);

        // Index of the output gate that is connected to the sync task.
        headConfig.setIterationHeadIndexOfSyncOutput(2);

        // Join driver: driver comparators for its inputs 0 and 1 plus the pair comparator.
        headConfig.setDriver(BuildSecondCachedMatchDriver.class);
        headConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        headConfig.setStubWrapper(new UserCodeClassWrapper(WorksetConnectedComponents.NeighborWithComponentIDJoin.class));
        headConfig.setDriverComparator(typeComparatorFactory, 0);
        headConfig.setDriverComparator(typeComparatorFactory, 1);
        headConfig.setDriverPairComparator(typePairComparatorFactory);
        headConfig.setRelativeMemoryDriver(MEM_FRAC_PER_CONSUMER);

        // Aggregator registered under the same name that createSync() uses for convergence.
        headConfig.addIterationAggregator("pact.runtime.workset-empty-aggregator", new LongSumAggregator());
        return head;
    }

    /**
     * Creates the intermediate task "Find Min Component-ID": a sorted group-reduce over its
     * single input that forwards its result.
     *
     * @param jobGraph the job graph the vertex is added to
     * @param i number of parallel subtasks
     * @param typeSerializerFactory serializer factory for input and output records
     * @param typeComparatorFactory comparator factory for sorting the input and for the driver
     * @return the configured intermediate vertex
     */
    private static JobVertex createIterationIntermediate(JobGraph jobGraph, int i, TypeSerializerFactory<?> typeSerializerFactory, TypeComparatorFactory<?> typeComparatorFactory) {
        final int input = 0;

        JobVertex minReducer = JobGraphUtils.createTask(IterationIntermediatePactTask.class, "Find Min Component-ID", jobGraph, i);
        TaskConfig reducerConfig = new TaskConfig(minReducer.getConfiguration());
        reducerConfig.setIterationId(ITERATION_ID);

        // Single input, sorted locally with a bounded memory share.
        reducerConfig.addInputToGroup(input);
        reducerConfig.setInputSerializer(typeSerializerFactory, input);
        reducerConfig.setInputComparator(typeComparatorFactory, input);
        reducerConfig.setInputLocalStrategy(input, LocalStrategy.SORT);
        reducerConfig.setRelativeMemoryInput(input, MEM_FRAC_PER_CONSUMER);
        reducerConfig.setFilehandlesInput(input, 64);
        reducerConfig.setSpillingThresholdInput(input, 0.85f);

        // Output is forwarded to the next task.
        reducerConfig.setOutputSerializer(typeSerializerFactory);
        reducerConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);

        // Group-reduce driver wrapping the minimum-component-id reduce function.
        reducerConfig.setDriver(GroupReduceDriver.class);
        reducerConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
        reducerConfig.setDriverComparator(typeComparatorFactory, 0);
        ReduceOperator.WrappingClassReduceFunction reduceFunction = new ReduceOperator.WrappingClassReduceFunction(WorksetConnectedComponents.MinimumComponentIDReduce.class);
        reducerConfig.setStubWrapper(new UserCodeObjectWrapper(reduceFunction));

        return minReducer;
    }

    /**
     * Creates the file output vertex "Final Output", which writes two {@code LongValue} fields
     * per record through a {@code CsvOutputFormat}.
     *
     * @param jobGraph the job graph the vertex is added to
     * @param str path the result is written to
     * @param i number of parallel subtasks
     * @param typeSerializerFactory serializer factory for the incoming records
     * @return the configured output vertex
     */
    private static OutputFormatVertex createOutput(JobGraph jobGraph, String str, int i, TypeSerializerFactory<?> typeSerializerFactory) {
        OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "Final Output", i);
        TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
        outputConfig.addInputToGroup(0);
        outputConfig.setInputSerializer(typeSerializerFactory, 0);
        outputConfig.setStubWrapper(new UserCodeClassWrapper(CsvOutputFormat.class));
        outputConfig.setStubParameter("flink.output.file", str);

        // Output format parameters: newline-separated records with two space-separated fields.
        Configuration formatParameters = outputConfig.getStubParameters();
        formatParameters.setString("output.record.delimiter", "\n");
        formatParameters.setString("output.record.field-delimiter", " ");
        formatParameters.setClass("output.record.type_0", LongValue.class);
        formatParameters.setInteger("output.record.position_0", 0);
        formatParameters.setClass("output.record.type_1", LongValue.class);
        // 1 is a record field position here; it is unrelated to the iteration id.
        formatParameters.setInteger("output.record.position_1", 1);
        formatParameters.setInteger("output.record.num-fields", 2);
        return output;
    }

    /**
     * Creates the sync task of the iteration and registers the aggregator and convergence
     * criterion that are keyed by the workset-empty aggregator name.
     *
     * @param jobGraph the job graph the vertex is added to
     * @param i number of parallel subtasks, passed on to {@code JobGraphUtils.createSync}
     * @param i2 number of iterations set on the sync task
     * @return the configured sync vertex
     */
    private static JobVertex createSync(JobGraph jobGraph, int i, int i2) {
        final String aggregatorName = "pact.runtime.workset-empty-aggregator";

        JobVertex syncVertex = JobGraphUtils.createSync(jobGraph, i);
        TaskConfig syncConfig = new TaskConfig(syncVertex.getConfiguration());
        syncConfig.setNumberOfIterations(i2);
        syncConfig.setIterationId(ITERATION_ID);
        syncConfig.addIterationAggregator(aggregatorName, new LongSumAggregator());
        syncConfig.setConvergenceCriterion(aggregatorName, new WorksetEmptyConvergenceCriterion());

        return syncVertex;
    }

    /**
     * Builds the job graph variant in which a single iteration tail updates both the workset and
     * the solution set.
     *
     * @param str path of the vertices file
     * @param str2 path of the edges file
     * @param str3 path the result is written to
     * @param i number of parallel subtasks of every vertex
     * @param i2 number of iterations set on the sync task
     * @return the assembled job graph
     */
    public JobGraph createJobGraphUnifiedTails(String str, String str2, String str3, int i, int i2) {
        // Serializer and comparators: a single key on record field 0 of type LongValue.
        RecordSerializerFactory serializer = RecordSerializerFactory.get();
        RecordComparatorFactory comparator = new RecordComparatorFactory(new int[]{0}, new Class[]{LongValue.class}, new boolean[]{true});
        RecordPairComparatorFactory pairComparator = RecordPairComparatorFactory.get();

        JobGraph jobGraph = new JobGraph("Connected Components (Unified Tails)");

        // Vertices shared by all variants.
        InputFormatVertex vertices = createVerticesInput(jobGraph, str, i, serializer, comparator);
        InputFormatVertex edges = createEdgesInput(jobGraph, str2, i, serializer, comparator);
        JobVertex head = createIterationHead(jobGraph, i, serializer, comparator, pairComparator);
        JobVertex intermediate = createIterationIntermediate(jobGraph, i, serializer, comparator);
        TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
        OutputFormatVertex output = createOutput(jobGraph, str3, i, serializer);
        JobVertex sync = createSync(jobGraph, i, i2);

        // Single tail: joins with the solution set and performs both updates.
        JobVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph, i);
        TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
        tailConfig.setIterationId(ITERATION_ID);
        tailConfig.setIsWorksetIteration();
        tailConfig.setIsWorksetUpdate();
        tailConfig.setIsSolutionSetUpdate();
        tailConfig.setIsSolutionSetUpdateWithoutReprobe();
        tailConfig.addInputToGroup(0);
        tailConfig.setInputSerializer(serializer, 0);
        tailConfig.setOutputSerializer(serializer);
        tailConfig.setDriver(JoinWithSolutionSetSecondDriver.class);
        tailConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        tailConfig.setDriverComparator(comparator, 0);
        tailConfig.setDriverPairComparator(pairComparator);
        tailConfig.setStubWrapper(new UserCodeClassWrapper(WorksetConnectedComponents.UpdateComponentIdMatch.class));

        // Head inputs in gate order 0, 1, 2: the vertices input is connected twice because it
        // feeds both input 0 (workset) and input 2 (solution set) of the head.
        JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);
        JobGraphUtils.connect(edges, head, DistributionPattern.ALL_TO_ALL);
        JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);

        // All-to-all gate: one event per sending subtask, hence i events until interrupt.
        JobGraphUtils.connect(head, intermediate, DistributionPattern.ALL_TO_ALL);
        intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, i);

        // Pointwise gate: a single event until interrupt.
        JobGraphUtils.connect(intermediate, tail, DistributionPattern.POINTWISE);
        tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);

        JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
        JobGraphUtils.connect(head, sync, DistributionPattern.POINTWISE);

        // All vertices share one slot sharing group; iteration tasks are co-located with the head.
        SlotSharingGroup sharingGroup = new SlotSharingGroup();
        vertices.setSlotSharingGroup(sharingGroup);
        edges.setSlotSharingGroup(sharingGroup);
        head.setSlotSharingGroup(sharingGroup);
        intermediate.setSlotSharingGroup(sharingGroup);
        tail.setSlotSharingGroup(sharingGroup);
        output.setSlotSharingGroup(sharingGroup);
        sync.setSlotSharingGroup(sharingGroup);
        intermediate.setStrictlyCoLocatedWith(head);
        tail.setStrictlyCoLocatedWith(head);
        return jobGraph;
    }

    /**
     * Builds the job graph variant with an intermediate solution set join followed by two
     * separate tails: one updating the solution set and one updating the workset.
     *
     * @param str path of the vertices file
     * @param str2 path of the edges file
     * @param str3 path the result is written to
     * @param i number of parallel subtasks of every vertex
     * @param i2 number of iterations set on the sync task
     * @return the assembled job graph
     */
    public JobGraph createJobGraphSeparateTails(String str, String str2, String str3, int i, int i2) {
        // Serializer and comparators: a single key on record field 0 of type LongValue.
        RecordSerializerFactory serializer = RecordSerializerFactory.get();
        RecordComparatorFactory comparator = new RecordComparatorFactory(new int[]{0}, new Class[]{LongValue.class}, new boolean[]{true});
        RecordPairComparatorFactory pairComparator = RecordPairComparatorFactory.get();

        JobGraph jobGraph = new JobGraph("Connected Components (Separate Tails)");

        // Vertices shared by all variants.
        InputFormatVertex vertices = createVerticesInput(jobGraph, str, i, serializer, comparator);
        InputFormatVertex edges = createEdgesInput(jobGraph, str2, i, serializer, comparator);
        JobVertex head = createIterationHead(jobGraph, i, serializer, comparator, pairComparator);
        // The solution set is updated by a task other than the workset tail, so the head is
        // flagged to wait for that update.
        new TaskConfig(head.getConfiguration()).setWaitForSolutionSetUpdate();
        JobVertex intermediate = createIterationIntermediate(jobGraph, i, serializer, comparator);
        TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
        OutputFormatVertex output = createOutput(jobGraph, str3, i, serializer);
        JobVertex sync = createSync(jobGraph, i, i2);

        // Intermediate join with the solution set; forwards its result to both tails.
        JobVertex solutionSetJoin = JobGraphUtils.createTask(IterationIntermediatePactTask.class, "Solution Set Join", jobGraph, i);
        TaskConfig joinConfig = new TaskConfig(solutionSetJoin.getConfiguration());
        joinConfig.setIterationId(ITERATION_ID);
        joinConfig.addInputToGroup(0);
        joinConfig.setInputSerializer(serializer, 0);
        joinConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        joinConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        joinConfig.setOutputComparator(comparator, 0);
        joinConfig.setOutputComparator(comparator, 1);
        joinConfig.setOutputSerializer(serializer);
        joinConfig.setDriver(JoinWithSolutionSetSecondDriver.class);
        joinConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        joinConfig.setDriverComparator(comparator, 0);
        joinConfig.setDriverPairComparator(pairComparator);
        joinConfig.setStubWrapper(new UserCodeClassWrapper(WorksetConnectedComponents.UpdateComponentIdMatch.class));

        // Solution set tail: pass-through mapper whose input is materialized asynchronously.
        JobVertex solutionSetTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationSolutionSetTail", jobGraph, i);
        TaskConfig solutionSetTailConfig = new TaskConfig(solutionSetTail.getConfiguration());
        solutionSetTailConfig.setIterationId(ITERATION_ID);
        solutionSetTailConfig.setIsSolutionSetUpdate();
        solutionSetTailConfig.setIsWorksetIteration();
        solutionSetTailConfig.addInputToGroup(0);
        solutionSetTailConfig.setInputSerializer(serializer, 0);
        solutionSetTailConfig.setInputAsynchronouslyMaterialized(0, true);
        solutionSetTailConfig.setRelativeInputMaterializationMemory(0, MEM_FRAC_PER_CONSUMER);
        solutionSetTailConfig.setOutputSerializer(serializer);
        solutionSetTailConfig.setDriver(CollectorMapDriver.class);
        solutionSetTailConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        solutionSetTailConfig.setStubWrapper(new UserCodeClassWrapper(DummyMapper.class));

        // Workset tail: pass-through mapper that performs the workset update.
        JobVertex worksetTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationWorksetTail", jobGraph, i);
        TaskConfig worksetTailConfig = new TaskConfig(worksetTail.getConfiguration());
        worksetTailConfig.setIterationId(ITERATION_ID);
        worksetTailConfig.setIsWorksetIteration();
        worksetTailConfig.setIsWorksetUpdate();
        worksetTailConfig.addInputToGroup(0);
        worksetTailConfig.setInputSerializer(serializer, 0);
        worksetTailConfig.setOutputSerializer(serializer);
        worksetTailConfig.setDriver(CollectorMapDriver.class);
        worksetTailConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        worksetTailConfig.setStubWrapper(new UserCodeClassWrapper(DummyMapper.class));

        // Head inputs in gate order 0, 1, 2: the vertices input is connected twice because it
        // feeds both input 0 (workset) and input 2 (solution set) of the head.
        JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);
        JobGraphUtils.connect(edges, head, DistributionPattern.ALL_TO_ALL);
        JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);

        // All-to-all gate: one event per sending subtask, hence i events until interrupt.
        JobGraphUtils.connect(head, intermediate, DistributionPattern.ALL_TO_ALL);
        intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, i);

        // Pointwise gates: a single event until interrupt.
        JobGraphUtils.connect(intermediate, solutionSetJoin, DistributionPattern.POINTWISE);
        joinConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
        JobGraphUtils.connect(solutionSetJoin, solutionSetTail, DistributionPattern.POINTWISE);
        solutionSetTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
        JobGraphUtils.connect(solutionSetJoin, worksetTail, DistributionPattern.POINTWISE);
        worksetTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);

        JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
        JobGraphUtils.connect(head, sync, DistributionPattern.POINTWISE);

        // All vertices share one slot sharing group; iteration tasks are co-located with the head.
        SlotSharingGroup sharingGroup = new SlotSharingGroup();
        vertices.setSlotSharingGroup(sharingGroup);
        edges.setSlotSharingGroup(sharingGroup);
        head.setSlotSharingGroup(sharingGroup);
        intermediate.setSlotSharingGroup(sharingGroup);
        solutionSetJoin.setSlotSharingGroup(sharingGroup);
        worksetTail.setSlotSharingGroup(sharingGroup);
        solutionSetTail.setSlotSharingGroup(sharingGroup);
        output.setSlotSharingGroup(sharingGroup);
        sync.setSlotSharingGroup(sharingGroup);
        intermediate.setStrictlyCoLocatedWith(head);
        solutionSetJoin.setStrictlyCoLocatedWith(head);
        worksetTail.setStrictlyCoLocatedWith(head);
        solutionSetTail.setStrictlyCoLocatedWith(head);
        return jobGraph;
    }

    /**
     * Builds the job graph variant in which an intermediate task performs the workset update and
     * a separate tail performs the solution set update.
     *
     * @param str path of the vertices file
     * @param str2 path of the edges file
     * @param str3 path the result is written to
     * @param i number of parallel subtasks of every vertex
     * @param i2 number of iterations set on the sync task
     * @return the assembled job graph
     */
    public JobGraph createJobGraphIntermediateWorksetUpdateAndSolutionSetTail(String str, String str2, String str3, int i, int i2) {
        // Serializer and comparators: a single key on record field 0 of type LongValue.
        RecordSerializerFactory serializer = RecordSerializerFactory.get();
        RecordComparatorFactory comparator = new RecordComparatorFactory(new int[]{0}, new Class[]{LongValue.class}, new boolean[]{true});
        RecordPairComparatorFactory pairComparator = RecordPairComparatorFactory.get();

        JobGraph jobGraph = new JobGraph("Connected Components (Intermediate Workset Update, Solution Set Tail)");

        // Vertices shared by all variants.
        InputFormatVertex vertices = createVerticesInput(jobGraph, str, i, serializer, comparator);
        InputFormatVertex edges = createEdgesInput(jobGraph, str2, i, serializer, comparator);
        JobVertex head = createIterationHead(jobGraph, i, serializer, comparator, pairComparator);
        // The solution set is updated by a separate tail, so the head is flagged to wait for it.
        new TaskConfig(head.getConfiguration()).setWaitForSolutionSetUpdate();
        JobVertex intermediate = createIterationIntermediate(jobGraph, i, serializer, comparator);
        TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
        OutputFormatVertex output = createOutput(jobGraph, str3, i, serializer);
        JobVertex sync = createSync(jobGraph, i, i2);

        // Intermediate task: joins with the solution set and performs the workset update.
        JobVertex worksetUpdate = JobGraphUtils.createTask(IterationIntermediatePactTask.class, "WorksetUpdate", jobGraph, i);
        TaskConfig worksetUpdateConfig = new TaskConfig(worksetUpdate.getConfiguration());
        worksetUpdateConfig.setIterationId(ITERATION_ID);
        worksetUpdateConfig.setIsWorksetIteration();
        worksetUpdateConfig.setIsWorksetUpdate();
        worksetUpdateConfig.addInputToGroup(0);
        worksetUpdateConfig.setInputSerializer(serializer, 0);
        worksetUpdateConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        worksetUpdateConfig.setOutputComparator(comparator, 0);
        worksetUpdateConfig.setOutputSerializer(serializer);
        worksetUpdateConfig.setDriver(JoinWithSolutionSetSecondDriver.class);
        worksetUpdateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        worksetUpdateConfig.setDriverComparator(comparator, 0);
        worksetUpdateConfig.setDriverPairComparator(pairComparator);
        worksetUpdateConfig.setStubWrapper(new UserCodeClassWrapper(WorksetConnectedComponents.UpdateComponentIdMatch.class));

        // Tail: pass-through mapper that performs the solution set update.
        JobVertex solutionSetTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationSolutionSetTail", jobGraph, i);
        TaskConfig solutionSetTailConfig = new TaskConfig(solutionSetTail.getConfiguration());
        solutionSetTailConfig.setIterationId(ITERATION_ID);
        solutionSetTailConfig.setIsSolutionSetUpdate();
        solutionSetTailConfig.setIsWorksetIteration();
        solutionSetTailConfig.addInputToGroup(0);
        solutionSetTailConfig.setInputSerializer(serializer, 0);
        solutionSetTailConfig.setOutputSerializer(serializer);
        solutionSetTailConfig.setDriver(CollectorMapDriver.class);
        solutionSetTailConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        solutionSetTailConfig.setStubWrapper(new UserCodeClassWrapper(DummyMapper.class));

        // Head inputs in gate order 0, 1, 2: the vertices input is connected twice because it
        // feeds both input 0 (workset) and input 2 (solution set) of the head.
        JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);
        JobGraphUtils.connect(edges, head, DistributionPattern.ALL_TO_ALL);
        JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);

        // All-to-all gate: one event per sending subtask, hence i events until interrupt.
        JobGraphUtils.connect(head, intermediate, DistributionPattern.ALL_TO_ALL);
        intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, i);

        // Pointwise gates: a single event until interrupt.
        JobGraphUtils.connect(intermediate, worksetUpdate, DistributionPattern.POINTWISE);
        worksetUpdateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
        JobGraphUtils.connect(worksetUpdate, solutionSetTail, DistributionPattern.POINTWISE);
        solutionSetTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);

        JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
        JobGraphUtils.connect(head, sync, DistributionPattern.POINTWISE);

        // All vertices share one slot sharing group; iteration tasks are co-located with the head.
        SlotSharingGroup sharingGroup = new SlotSharingGroup();
        vertices.setSlotSharingGroup(sharingGroup);
        edges.setSlotSharingGroup(sharingGroup);
        head.setSlotSharingGroup(sharingGroup);
        intermediate.setSlotSharingGroup(sharingGroup);
        worksetUpdate.setSlotSharingGroup(sharingGroup);
        solutionSetTail.setSlotSharingGroup(sharingGroup);
        output.setSlotSharingGroup(sharingGroup);
        sync.setSlotSharingGroup(sharingGroup);
        intermediate.setStrictlyCoLocatedWith(head);
        worksetUpdate.setStrictlyCoLocatedWith(head);
        solutionSetTail.setStrictlyCoLocatedWith(head);
        return jobGraph;
    }

    /**
     * Builds the job graph variant in which an intermediate task performs the solution set
     * update and a separate tail performs the workset update.
     *
     * @param str path of the vertices file
     * @param str2 path of the edges file
     * @param str3 path the result is written to
     * @param i number of parallel subtasks of every vertex
     * @param i2 number of iterations set on the sync task
     * @return the assembled job graph
     */
    public JobGraph createJobGraphSolutionSetUpdateAndWorksetTail(String str, String str2, String str3, int i, int i2) {
        // Serializer and comparators: a single key on record field 0 of type LongValue.
        RecordSerializerFactory serializer = RecordSerializerFactory.get();
        RecordComparatorFactory comparator = new RecordComparatorFactory(new int[]{0}, new Class[]{LongValue.class}, new boolean[]{true});
        RecordPairComparatorFactory pairComparator = RecordPairComparatorFactory.get();

        JobGraph jobGraph = new JobGraph("Connected Components (Intermediate Solution Set Update, Workset Tail)");

        // Vertices shared by all variants.
        InputFormatVertex vertices = createVerticesInput(jobGraph, str, i, serializer, comparator);
        InputFormatVertex edges = createEdgesInput(jobGraph, str2, i, serializer, comparator);
        JobVertex head = createIterationHead(jobGraph, i, serializer, comparator, pairComparator);
        JobVertex intermediate = createIterationIntermediate(jobGraph, i, serializer, comparator);
        TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
        OutputFormatVertex output = createOutput(jobGraph, str3, i, serializer);
        JobVertex sync = createSync(jobGraph, i, i2);

        // Intermediate task: joins with the solution set and updates it without reprobing.
        JobVertex solutionSetUpdate = JobGraphUtils.createTask(IterationIntermediatePactTask.class, "Solution Set Update", jobGraph, i);
        TaskConfig solutionSetUpdateConfig = new TaskConfig(solutionSetUpdate.getConfiguration());
        solutionSetUpdateConfig.setIterationId(ITERATION_ID);
        solutionSetUpdateConfig.setIsSolutionSetUpdate();
        solutionSetUpdateConfig.setIsSolutionSetUpdateWithoutReprobe();
        solutionSetUpdateConfig.addInputToGroup(0);
        solutionSetUpdateConfig.setInputSerializer(serializer, 0);
        solutionSetUpdateConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        solutionSetUpdateConfig.setOutputComparator(comparator, 0);
        solutionSetUpdateConfig.setOutputSerializer(serializer);
        solutionSetUpdateConfig.setDriver(JoinWithSolutionSetSecondDriver.class);
        solutionSetUpdateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        solutionSetUpdateConfig.setDriverComparator(comparator, 0);
        solutionSetUpdateConfig.setDriverPairComparator(pairComparator);
        solutionSetUpdateConfig.setStubWrapper(new UserCodeClassWrapper(WorksetConnectedComponents.UpdateComponentIdMatch.class));

        // Tail: pass-through mapper that performs the workset update.
        JobVertex worksetTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationWorksetTail", jobGraph, i);
        TaskConfig worksetTailConfig = new TaskConfig(worksetTail.getConfiguration());
        worksetTailConfig.setIterationId(ITERATION_ID);
        worksetTailConfig.setIsWorksetIteration();
        worksetTailConfig.setIsWorksetUpdate();
        worksetTailConfig.addInputToGroup(0);
        worksetTailConfig.setInputSerializer(serializer, 0);
        worksetTailConfig.setOutputSerializer(serializer);
        worksetTailConfig.setDriver(CollectorMapDriver.class);
        worksetTailConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        worksetTailConfig.setStubWrapper(new UserCodeClassWrapper(DummyMapper.class));

        // Head inputs in gate order 0, 1, 2: the vertices input is connected twice because it
        // feeds both input 0 (workset) and input 2 (solution set) of the head.
        JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);
        JobGraphUtils.connect(edges, head, DistributionPattern.ALL_TO_ALL);
        JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);

        // All-to-all gate: one event per sending subtask, hence i events until interrupt.
        JobGraphUtils.connect(head, intermediate, DistributionPattern.ALL_TO_ALL);
        intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, i);

        // Pointwise gates: a single event until interrupt.
        JobGraphUtils.connect(intermediate, solutionSetUpdate, DistributionPattern.POINTWISE);
        solutionSetUpdateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
        JobGraphUtils.connect(solutionSetUpdate, worksetTail, DistributionPattern.POINTWISE);
        worksetTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);

        JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
        JobGraphUtils.connect(head, sync, DistributionPattern.POINTWISE);

        // All vertices share one slot sharing group; iteration tasks are co-located with the head.
        SlotSharingGroup sharingGroup = new SlotSharingGroup();
        vertices.setSlotSharingGroup(sharingGroup);
        edges.setSlotSharingGroup(sharingGroup);
        head.setSlotSharingGroup(sharingGroup);
        intermediate.setSlotSharingGroup(sharingGroup);
        solutionSetUpdate.setSlotSharingGroup(sharingGroup);
        worksetTail.setSlotSharingGroup(sharingGroup);
        output.setSlotSharingGroup(sharingGroup);
        sync.setSlotSharingGroup(sharingGroup);
        intermediate.setStrictlyCoLocatedWith(head);
        solutionSetUpdate.setStrictlyCoLocatedWith(head);
        worksetTail.setStrictlyCoLocatedWith(head);
        return jobGraph;
    }
}
