package org.apache.flink.runtime.io.network.partition;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.io.network.util.TestProducerSource;
import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.class */
public class PipelinedSubpartitionTest extends SubpartitionTestBase {
    private static final ExecutorService executorService = Executors.newCachedThreadPool();

    @AfterClass
    public static void shutdownExecutorService() throws Exception {
        executorService.shutdownNow();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.io.network.partition.SubpartitionTestBase
    /* renamed from: createSubpartition, reason: merged with bridge method [inline-methods] */
    public PipelinedSubpartition mo84createSubpartition() {
        return new PipelinedSubpartition(0, (ResultPartition) Mockito.mock(ResultPartition.class));
    }

    @Test
    public void testIllegalReadViewRequest() throws Exception {
        PipelinedSubpartition mo84createSubpartition = mo84createSubpartition();
        Assert.assertNotNull(mo84createSubpartition.createReadView((BufferProvider) null, new BufferAvailabilityListener() { // from class: org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionTest.1
            public void notifyBuffersAvailable(long j) {
            }
        }));
        try {
            mo84createSubpartition.createReadView((BufferProvider) null, new BufferAvailabilityListener() { // from class: org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionTest.2
                public void notifyBuffersAvailable(long j) {
                }
            });
            Assert.fail("Did not throw expected exception after duplicate notifyNonEmpty view request.");
        } catch (IllegalStateException e) {
        }
    }

    @Test
    public void testBasicPipelinedProduceConsumeLogic() throws Exception {
        PipelinedSubpartition mo84createSubpartition = mo84createSubpartition();
        BufferAvailabilityListener bufferAvailabilityListener = (BufferAvailabilityListener) Mockito.mock(BufferAvailabilityListener.class);
        PipelinedSubpartitionView createReadView = mo84createSubpartition.createReadView((BufferProvider) null, bufferAvailabilityListener);
        Assert.assertNull(createReadView.getNextBuffer());
        ((BufferAvailabilityListener) Mockito.verify(bufferAvailabilityListener, Mockito.times(1))).notifyBuffersAvailable(Matchers.eq(0L));
        mo84createSubpartition.add(TestBufferFactory.createBuffer());
        ((BufferAvailabilityListener) Mockito.verify(bufferAvailabilityListener, Mockito.times(1))).notifyBuffersAvailable(Matchers.eq(1L));
        Assert.assertNotNull(createReadView.getNextBuffer());
        Assert.assertNull(createReadView.getNextBuffer());
        mo84createSubpartition.add(TestBufferFactory.createBuffer());
        ((BufferAvailabilityListener) Mockito.verify(bufferAvailabilityListener, Mockito.times(2))).notifyBuffersAvailable(Matchers.eq(1L));
    }

    @Test
    public void testConcurrentFastProduceAndFastConsume() throws Exception {
        testProduceConsume(false, false);
    }

    @Test
    public void testConcurrentFastProduceAndSlowConsume() throws Exception {
        testProduceConsume(false, true);
    }

    @Test
    public void testConcurrentSlowProduceAndFastConsume() throws Exception {
        testProduceConsume(true, false);
    }

    @Test
    public void testConcurrentSlowProduceAndSlowConsume() throws Exception {
        testProduceConsume(true, true);
    }

    @Test
    public void testIsReleasedChecksParent() throws Exception {
        PipelinedSubpartition pipelinedSubpartition = (PipelinedSubpartition) Mockito.mock(PipelinedSubpartition.class);
        PipelinedSubpartitionView pipelinedSubpartitionView = new PipelinedSubpartitionView(pipelinedSubpartition, (BufferAvailabilityListener) Mockito.mock(BufferAvailabilityListener.class));
        Assert.assertFalse(pipelinedSubpartitionView.isReleased());
        ((PipelinedSubpartition) Mockito.verify(pipelinedSubpartition, Mockito.times(1))).isReleased();
        Mockito.when(Boolean.valueOf(pipelinedSubpartition.isReleased())).thenReturn(true);
        Assert.assertTrue(pipelinedSubpartitionView.isReleased());
        ((PipelinedSubpartition) Mockito.verify(pipelinedSubpartition, Mockito.times(2))).isReleased();
    }

    private void testProduceConsume(boolean z, boolean z2) throws Exception {
        TestProducerSource testProducerSource = new TestProducerSource() { // from class: org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionTest.3
            private BufferProvider bufferProvider = new TestPooledBufferProvider(8);
            private int numberOfBuffers;

            @Override // org.apache.flink.runtime.io.network.util.TestProducerSource
            public BufferOrEvent getNextBufferOrEvent() throws Exception {
                if (this.numberOfBuffers == 128) {
                    return null;
                }
                Buffer requestBufferBlocking = this.bufferProvider.requestBufferBlocking();
                MemorySegment memorySegment = requestBufferBlocking.getMemorySegment();
                int size = this.numberOfBuffers * (memorySegment.size() / 4);
                for (int i = 0; i < memorySegment.size(); i += 4) {
                    memorySegment.putInt(i, size);
                    size++;
                }
                this.numberOfBuffers++;
                return new BufferOrEvent(requestBufferBlocking, 0);
            }
        };
        TestConsumerCallback testConsumerCallback = new TestConsumerCallback() { // from class: org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionTest.4
            private int numberOfBuffers;

            @Override // org.apache.flink.runtime.io.network.util.TestConsumerCallback
            public void onBuffer(Buffer buffer) {
                MemorySegment memorySegment = buffer.getMemorySegment();
                int size = this.numberOfBuffers * (memorySegment.size() / 4);
                for (int i = 0; i < memorySegment.size(); i += 4) {
                    Assert.assertEquals(size, memorySegment.getInt(i));
                    size++;
                }
                this.numberOfBuffers++;
                buffer.recycle();
            }

            @Override // org.apache.flink.runtime.io.network.util.TestConsumerCallback
            public void onEvent(AbstractEvent abstractEvent) {
            }
        };
        PipelinedSubpartition mo84createSubpartition = mo84createSubpartition();
        TestSubpartitionConsumer testSubpartitionConsumer = new TestSubpartitionConsumer(z2, testConsumerCallback);
        testSubpartitionConsumer.setSubpartitionView(mo84createSubpartition.createReadView((BufferProvider) null, testSubpartitionConsumer));
        Future submit = executorService.submit(new TestSubpartitionProducer(mo84createSubpartition, z, testProducerSource));
        Future submit2 = executorService.submit(testSubpartitionConsumer);
        submit.get();
        submit2.get();
    }
}
