package org.apache.flink.runtime.jobmanager;

import java.util.BitSet;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/LegacySlotCountExceedingParallelismTest.class */
public class LegacySlotCountExceedingParallelismTest extends TestLogger {
    private static final int NUMBER_OF_TMS = 2;
    private static final int NUMBER_OF_SLOTS_PER_TM = 2;
    private static final int PARALLELISM = 4;
    public static final String JOB_NAME = "SlotCountExceedingParallelismTest (no slot sharing, blocking results)";
    private static TestingCluster flink;

    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/LegacySlotCountExceedingParallelismTest$RoundRobinSubtaskIndexSender.class */
    public static class RoundRobinSubtaskIndexSender extends AbstractInvokable {
        public static final String CONFIG_KEY = "number-of-times-to-send";

        public RoundRobinSubtaskIndexSender(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            RecordWriter recordWriter = new RecordWriter(getEnvironment().getWriter(0));
            int integer = getTaskConfiguration().getInteger("number-of-times-to-send", 0);
            IntValue intValue = new IntValue(getEnvironment().getTaskInfo().getIndexOfThisSubtask());
            for (int i = 0; i < integer; i++) {
                try {
                    recordWriter.emit(intValue);
                } catch (Throwable th) {
                    recordWriter.clearBuffers();
                    throw th;
                }
            }
            recordWriter.flushAll();
            recordWriter.clearBuffers();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/LegacySlotCountExceedingParallelismTest$SubtaskIndexReceiver.class */
    public static class SubtaskIndexReceiver extends AbstractInvokable {
        public static final String CONFIG_KEY = "number-of-indexes-to-receive";

        public SubtaskIndexReceiver(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            RecordReader recordReader = new RecordReader(getEnvironment().getInputGate(0), IntValue.class, getEnvironment().getTaskManagerInfo().getTmpDirectories());
            try {
                int integer = getTaskConfiguration().getInteger("number-of-indexes-to-receive", 0);
                BitSet bitSet = new BitSet(integer);
                int i = 0;
                while (true) {
                    IntValue next = recordReader.next();
                    if (next == null) {
                        if (bitSet.cardinality() != integer) {
                            throw new IllegalStateException("Finished receive, but did not receive all expected subtask indexes.");
                        }
                        return;
                    }
                    i++;
                    if (i > integer) {
                        throw new IllegalStateException("Received more records than expected.");
                    }
                    int value = next.getValue();
                    if (bitSet.get(value)) {
                        throw new IllegalStateException("Received expected subtask index twice.");
                    }
                    bitSet.set(value, true);
                }
            } finally {
                recordReader.clearBuffers();
            }
        }
    }

    @BeforeClass
    public static void setUp() throws Exception {
        flink = TestingUtils.startTestingCluster(2, 2, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
    }

    @AfterClass
    public static void tearDown() throws Exception {
        if (flink != null) {
            flink.stop();
        }
    }

    @Test
    public void testNoSlotSharingAndBlockingResultSender() throws Exception {
        submitJobGraphAndWait(createTestJobGraph("SlotCountExceedingParallelismTest (no slot sharing, blocking results)", 8, PARALLELISM));
    }

    @Test
    public void testNoSlotSharingAndBlockingResultReceiver() throws Exception {
        submitJobGraphAndWait(createTestJobGraph("SlotCountExceedingParallelismTest (no slot sharing, blocking results)", PARALLELISM, 8));
    }

    @Test
    public void testNoSlotSharingAndBlockingResultBoth() throws Exception {
        submitJobGraphAndWait(createTestJobGraph("SlotCountExceedingParallelismTest (no slot sharing, blocking results)", 8, 8));
    }

    private void submitJobGraphAndWait(JobGraph jobGraph) throws JobExecutionException {
        flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
    }

    private JobGraph createTestJobGraph(String str, int i, int i2) {
        JobVertex jobVertex = new JobVertex("Sender");
        jobVertex.setInvokableClass(RoundRobinSubtaskIndexSender.class);
        jobVertex.getConfiguration().setInteger("number-of-times-to-send", i2);
        jobVertex.setParallelism(i);
        JobVertex jobVertex2 = new JobVertex("Receiver");
        jobVertex2.setInvokableClass(SubtaskIndexReceiver.class);
        jobVertex2.getConfiguration().setInteger("number-of-indexes-to-receive", i);
        jobVertex2.setParallelism(i2);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        JobGraph jobGraph = new JobGraph(str, new JobVertex[]{jobVertex, jobVertex2});
        jobGraph.setAllowQueuedScheduling(true);
        return jobGraph;
    }
}
