package org.apache.flink.test.accumulators;

import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.java.record.functions.ReduceFunction;
import org.apache.flink.api.java.record.io.CsvOutputFormat;
import org.apache.flink.api.java.record.io.TextInputFormat;
import org.apache.flink.api.java.record.operators.BulkIteration;
import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.RecordAPITestBase;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/accumulators/AccumulatorIterativeITCase.class */
public class AccumulatorIterativeITCase extends RecordAPITestBase {
    private static final String INPUT = "1\n2\n3\n";
    private static final String EXPECTED = "6\n";
    private static final int NUM_ITERATIONS = 3;
    private static final int NUM_SUBTASKS = 1;
    protected String dataPath;
    protected String resultPath;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/test/accumulators/AccumulatorIterativeITCase$SumReducer.class */
    public static final class SumReducer extends ReduceFunction implements Serializable {
        private static final long serialVersionUID = 1;
        private IntCounter testCounter = new IntCounter();

        SumReducer() {
        }

        public void reduce(Iterator<Record> it, Collector<Record> collector) {
            int i = 0;
            while (it.hasNext()) {
                Integer valueOf = Integer.valueOf(Integer.parseInt(it.next().getField(0, StringValue.class).getValue()));
                i += valueOf.intValue();
                this.testCounter.add(valueOf);
            }
            collector.collect(new Record(new StringValue(Integer.toString(i))));
        }

        public void close() throws Exception {
            super.close();
            getRuntimeContext().addAccumulator("test", this.testCounter);
        }
    }

    public AccumulatorIterativeITCase(Configuration configuration) {
        super(configuration);
    }

    protected void preSubmit() throws Exception {
        this.dataPath = createTempFile("datapoints.txt", INPUT);
        this.resultPath = getTempFilePath("result");
    }

    protected void postSubmit() throws Exception {
        compareResultsByLinesInMemory(EXPECTED, this.resultPath);
        Assert.assertEquals(18, (Integer) getJobExecutionResult().getAccumulatorResult("test"));
    }

    protected Plan getTestJob() {
        return getTestPlanPlan(this.config.getInteger("IterationAllReducer#NoSubtasks", NUM_SUBTASKS), this.dataPath, this.resultPath);
    }

    @Parameterized.Parameters
    public static Collection<Object[]> getConfigurations() {
        Configuration configuration = new Configuration();
        configuration.setInteger("IterationAllReducer#NoSubtasks", NUM_SUBTASKS);
        return toParameterList(new Configuration[]{configuration});
    }

    static Plan getTestPlanPlan(int i, String str, String str2) {
        FileDataSource fileDataSource = new FileDataSource(TextInputFormat.class, str, "input");
        BulkIteration bulkIteration = new BulkIteration("Loop");
        bulkIteration.setInput(fileDataSource);
        bulkIteration.setMaximumNumberOfIterations(NUM_ITERATIONS);
        bulkIteration.setNextPartialSolution(ReduceOperator.builder(new SumReducer()).input(bulkIteration.getPartialSolution()).name("Compute sum (Reduce)").build());
        Plan plan = new Plan(new FileDataSink(new CsvOutputFormat("\n", " ", new Class[]{StringValue.class}), str2, bulkIteration, "Output"), "Iteration with AllReducer (keyless Reducer)");
        plan.setDefaultParallelism(i);
        return plan;
    }
}
