package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.class */
public class PipelinedSubpartitionWithReadViewTest {
    private PipelinedSubpartition subpartition;
    private AwaitableBufferAvailablityListener availablityListener;
    private PipelinedSubpartitionView readView;

    @Before
    public void setup() throws IOException {
        this.subpartition = new PipelinedSubpartition(0, (ResultPartition) Mockito.mock(ResultPartition.class));
        this.availablityListener = new AwaitableBufferAvailablityListener();
        this.readView = this.subpartition.createReadView(this.availablityListener);
    }

    @After
    public void tearDown() {
        this.readView.releaseAllResources();
        this.subpartition.release();
    }

    @Test(expected = IllegalStateException.class)
    public void testAddTwoNonFinishedBuffer() {
        this.subpartition.add(BufferBuilderTestUtils.createBufferBuilder().createBufferConsumer());
        this.subpartition.add(BufferBuilderTestUtils.createBufferBuilder().createBufferConsumer());
        Assert.assertNull(this.readView.getNextBuffer());
    }

    @Test
    public void testAddEmptyNonFinishedBuffer() {
        Assert.assertEquals(0L, this.availablityListener.getNumNotifications());
        BufferBuilder createBufferBuilder = BufferBuilderTestUtils.createBufferBuilder();
        this.subpartition.add(createBufferBuilder.createBufferConsumer());
        Assert.assertEquals(0L, this.availablityListener.getNumNotifications());
        Assert.assertNull(this.readView.getNextBuffer());
        createBufferBuilder.finish();
        this.subpartition.add(BufferBuilderTestUtils.createBufferBuilder().createBufferConsumer());
        Assert.assertEquals(1L, this.availablityListener.getNumNotifications());
        Assert.assertNull(this.readView.getNextBuffer());
        Assert.assertEquals(1L, this.subpartition.getBuffersInBacklog());
    }

    @Test
    public void testAddNonEmptyNotFinishedBuffer() throws Exception {
        Assert.assertEquals(0L, this.availablityListener.getNumNotifications());
        BufferBuilder createBufferBuilder = BufferBuilderTestUtils.createBufferBuilder();
        createBufferBuilder.appendAndCommit(ByteBuffer.allocate(1024));
        this.subpartition.add(createBufferBuilder.createBufferConsumer());
        SubpartitionTestBase.assertNextBuffer(this.readView, 1024, false, 1, false, false);
        Assert.assertEquals(1L, this.subpartition.getBuffersInBacklog());
    }

    @Test
    public void testUnfinishedBufferBehindFinished() throws Exception {
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(1025));
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferBuilder(1024).createBufferConsumer());
        MatcherAssert.assertThat(Long.valueOf(this.availablityListener.getNumNotifications()), Matchers.greaterThan(0L));
        SubpartitionTestBase.assertNextBuffer(this.readView, 1025, false, 1, false, true);
        SubpartitionTestBase.assertNextBuffer(this.readView, 1024, false, 1, false, false);
        SubpartitionTestBase.assertNoNextBuffer(this.readView);
    }

    @Test
    public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(1025));
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferBuilder(1024).createBufferConsumer());
        long numNotifications = this.availablityListener.getNumNotifications();
        this.subpartition.flush();
        MatcherAssert.assertThat(Long.valueOf(numNotifications), Matchers.greaterThan(0L));
        Assert.assertEquals(numNotifications, this.availablityListener.getNumNotifications());
        SubpartitionTestBase.assertNextBuffer(this.readView, 1025, true, 1, false, true);
        SubpartitionTestBase.assertNextBuffer(this.readView, 1024, false, 1, false, false);
        SubpartitionTestBase.assertNoNextBuffer(this.readView);
    }

    @Test
    public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
        this.subpartition.flush();
        Assert.assertEquals(0L, this.availablityListener.getNumNotifications());
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(1025));
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferBuilder(1024).createBufferConsumer());
        SubpartitionTestBase.assertNextBuffer(this.readView, 1025, false, 1, false, true);
        long numNotifications = this.availablityListener.getNumNotifications();
        this.subpartition.flush();
        Assert.assertEquals(numNotifications + 1, this.availablityListener.getNumNotifications());
        this.subpartition.flush();
        Assert.assertEquals(numNotifications + 1, this.availablityListener.getNumNotifications());
        SubpartitionTestBase.assertNextBuffer(this.readView, 1024, false, 1, false, false);
        SubpartitionTestBase.assertNoNextBuffer(this.readView);
    }

    @Test
    public void testMultipleEmptyBuffers() throws Exception {
        Assert.assertEquals(0L, this.availablityListener.getNumNotifications());
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(0));
        Assert.assertEquals(1L, this.availablityListener.getNumNotifications());
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(0));
        Assert.assertEquals(2L, this.availablityListener.getNumNotifications());
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(0));
        Assert.assertEquals(2L, this.availablityListener.getNumNotifications());
        Assert.assertEquals(3L, this.subpartition.getBuffersInBacklog());
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(1024));
        Assert.assertEquals(2L, this.availablityListener.getNumNotifications());
        SubpartitionTestBase.assertNextBuffer(this.readView, 1024, false, 0, false, true);
    }

    @Test
    public void testEmptyFlush() {
        this.subpartition.flush();
        Assert.assertEquals(0L, this.availablityListener.getNumNotifications());
    }

    @Test
    public void testBasicPipelinedProduceConsumeLogic() throws Exception {
        Assert.assertFalse(this.readView.nextBufferIsEvent());
        SubpartitionTestBase.assertNoNextBuffer(this.readView);
        Assert.assertFalse(this.readView.nextBufferIsEvent());
        Assert.assertEquals(0L, this.availablityListener.getNumNotifications());
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(32768));
        Assert.assertFalse(this.readView.nextBufferIsEvent());
        Assert.assertEquals(1L, this.subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(1L, this.subpartition.getBuffersInBacklog());
        Assert.assertEquals(0L, this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals(1L, this.availablityListener.getNumNotifications());
        SubpartitionTestBase.assertNextBuffer(this.readView, 32768, false, this.subpartition.getBuffersInBacklog() - 1, false, true);
        Assert.assertEquals(32768L, this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals(0L, this.subpartition.getBuffersInBacklog());
        SubpartitionTestBase.assertNoNextBuffer(this.readView);
        Assert.assertEquals(0L, this.subpartition.getBuffersInBacklog());
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(32768));
        Assert.assertFalse(this.readView.nextBufferIsEvent());
        Assert.assertEquals(2L, this.subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(1L, this.subpartition.getBuffersInBacklog());
        Assert.assertEquals(32768L, this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals(2L, this.availablityListener.getNumNotifications());
        SubpartitionTestBase.assertNextBuffer(this.readView, 32768, false, this.subpartition.getBuffersInBacklog() - 1, false, true);
        Assert.assertEquals(65536L, this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals(0L, this.subpartition.getBuffersInBacklog());
        SubpartitionTestBase.assertNoNextBuffer(this.readView);
        Assert.assertEquals(0L, this.subpartition.getBuffersInBacklog());
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(32768));
        Assert.assertFalse(this.readView.nextBufferIsEvent());
        this.subpartition.add(BufferBuilderTestUtils.createEventBufferConsumer(32768));
        Assert.assertFalse(this.readView.nextBufferIsEvent());
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(32768));
        Assert.assertFalse(this.readView.nextBufferIsEvent());
        Assert.assertEquals(5L, this.subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(2L, this.subpartition.getBuffersInBacklog());
        Assert.assertEquals(65536L, this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals(4L, this.availablityListener.getNumNotifications());
        SubpartitionTestBase.assertNextBuffer(this.readView, 32768, true, this.subpartition.getBuffersInBacklog() - 1, true, true);
        Assert.assertEquals(98304L, this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals(1L, this.subpartition.getBuffersInBacklog());
        SubpartitionTestBase.assertNextEvent(this.readView, 32768, null, true, this.subpartition.getBuffersInBacklog(), false, true);
        Assert.assertEquals(131072L, this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals(1L, this.subpartition.getBuffersInBacklog());
        SubpartitionTestBase.assertNextBuffer(this.readView, 32768, false, this.subpartition.getBuffersInBacklog() - 1, false, true);
        Assert.assertEquals(163840L, this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals(0L, this.subpartition.getBuffersInBacklog());
        SubpartitionTestBase.assertNoNextBuffer(this.readView);
        Assert.assertEquals(0L, this.subpartition.getBuffersInBacklog());
        Assert.assertEquals(5L, this.subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(163840L, this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals(4L, this.availablityListener.getNumNotifications());
    }
}
