package org.apache.flink.runtime.io.network.partition;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.class */
public class SpillableSubpartitionTest extends SubpartitionTestBase {
    private static final ExecutorService executorService = Executors.newCachedThreadPool();
    private static final IOManager ioManager = new IOManagerAsync();

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest$AwaitableBufferAvailablityListener.class */
    private static class AwaitableBufferAvailablityListener implements BufferAvailabilityListener {
        private long numNotifiedBuffers;

        private AwaitableBufferAvailablityListener() {
        }

        public void notifyBuffersAvailable(long j) {
            this.numNotifiedBuffers += j;
        }

        long getNumNotifiedBuffers() {
            return this.numNotifiedBuffers;
        }

        void awaitNotifications(long j, long j2) throws InterruptedException {
            long currentTimeMillis = System.currentTimeMillis() + j2;
            while (this.numNotifiedBuffers < j && System.currentTimeMillis() < currentTimeMillis) {
                Thread.sleep(1L);
            }
        }
    }

    @AfterClass
    public static void shutdown() {
        executorService.shutdownNow();
        ioManager.shutdown();
    }

    @Override // org.apache.flink.runtime.io.network.partition.SubpartitionTestBase
    /* renamed from: createSubpartition */
    ResultSubpartition mo71createSubpartition() {
        return new SpillableSubpartition(0, (ResultPartition) Mockito.mock(ResultPartition.class), ioManager);
    }

    @Test
    public void testConcurrentFinishAndReleaseMemory() throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        AsynchronousBufferFileWriter asynchronousBufferFileWriter = (AsynchronousBufferFileWriter) Mockito.mock(AsynchronousBufferFileWriter.class);
        ((AsynchronousBufferFileWriter) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.flink.runtime.io.network.partition.SpillableSubpartitionTest.1
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m73answer(InvocationOnMock invocationOnMock) throws Throwable {
                countDownLatch2.countDown();
                countDownLatch.await();
                return null;
            }
        }).when(asynchronousBufferFileWriter)).close();
        IOManager iOManager = (IOManager) Mockito.mock(IOManager.class);
        Mockito.when(iOManager.createBufferFileWriter((FileIOChannel.ID) Matchers.any(FileIOChannel.ID.class))).thenReturn(asynchronousBufferFileWriter);
        final SpillableSubpartition spillableSubpartition = new SpillableSubpartition(0, (ResultPartition) Mockito.mock(ResultPartition.class), iOManager);
        Assert.assertEquals(0L, spillableSubpartition.releaseMemory());
        Future submit = Executors.newSingleThreadExecutor().submit(new Callable<Void>() { // from class: org.apache.flink.runtime.io.network.partition.SpillableSubpartitionTest.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Void call() throws Exception {
                spillableSubpartition.finish();
                return null;
            }
        });
        countDownLatch2.await();
        spillableSubpartition.releaseMemory();
        countDownLatch.countDown();
        submit.get();
    }

    @Test
    public void testReleasePartitionAndGetNext() throws Exception {
        SpillableSubpartition spillableSubpartition = new SpillableSubpartition(0, (ResultPartition) Mockito.mock(ResultPartition.class), ioManager);
        spillableSubpartition.finish();
        ResultSubpartitionView resultSubpartitionView = (ResultSubpartitionView) Mockito.spy(spillableSubpartition.createReadView(new TestInfiniteBufferProvider(), new BufferAvailabilityListener() { // from class: org.apache.flink.runtime.io.network.partition.SpillableSubpartitionTest.3
            public void notifyBuffersAvailable(long j) {
            }
        }));
        ((ResultSubpartitionView) Mockito.doNothing().when(resultSubpartitionView)).releaseAllResources();
        spillableSubpartition.release();
        Assert.assertNull(resultSubpartitionView.getNextBuffer());
    }

    @Test
    public void testConsumeSpilledPartition() throws Exception {
        SpillableSubpartition spillableSubpartition = new SpillableSubpartition(0, (ResultPartition) Mockito.mock(ResultPartition.class), ioManager);
        Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
        buffer.retain();
        buffer.retain();
        spillableSubpartition.add(buffer);
        spillableSubpartition.add(buffer);
        spillableSubpartition.add(buffer);
        Assert.assertEquals(3L, spillableSubpartition.releaseMemory());
        spillableSubpartition.finish();
        BufferAvailabilityListener bufferAvailabilityListener = (BufferAvailabilityListener) Mockito.mock(BufferAvailabilityListener.class);
        SpilledSubpartitionView createReadView = spillableSubpartition.createReadView(new TestInfiniteBufferProvider(), bufferAvailabilityListener);
        ((BufferAvailabilityListener) Mockito.verify(bufferAvailabilityListener, Mockito.times(1))).notifyBuffersAvailable(Matchers.eq(4L));
        Buffer nextBuffer = createReadView.getNextBuffer();
        Assert.assertNotNull(nextBuffer);
        nextBuffer.recycle();
        Buffer nextBuffer2 = createReadView.getNextBuffer();
        Assert.assertNotNull(nextBuffer2);
        nextBuffer2.recycle();
        Buffer nextBuffer3 = createReadView.getNextBuffer();
        Assert.assertNotNull(nextBuffer3);
        nextBuffer3.recycle();
        Buffer nextBuffer4 = createReadView.getNextBuffer();
        Assert.assertNotNull(nextBuffer4);
        Assert.assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(nextBuffer4, ClassLoader.getSystemClassLoader()).getClass());
        nextBuffer4.recycle();
    }

    @Test
    public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception {
        SpillableSubpartition spillableSubpartition = new SpillableSubpartition(0, (ResultPartition) Mockito.mock(ResultPartition.class), ioManager);
        Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
        buffer.retain();
        buffer.retain();
        spillableSubpartition.add(buffer);
        spillableSubpartition.add(buffer);
        spillableSubpartition.add(buffer);
        spillableSubpartition.finish();
        AwaitableBufferAvailablityListener awaitableBufferAvailablityListener = new AwaitableBufferAvailablityListener();
        SpillableSubpartitionView createReadView = spillableSubpartition.createReadView(new TestInfiniteBufferProvider(), awaitableBufferAvailablityListener);
        Assert.assertEquals(1L, awaitableBufferAvailablityListener.getNumNotifiedBuffers());
        Buffer nextBuffer = createReadView.getNextBuffer();
        Assert.assertNotNull(nextBuffer);
        nextBuffer.recycle();
        Assert.assertEquals(2L, awaitableBufferAvailablityListener.getNumNotifiedBuffers());
        Assert.assertEquals(2L, spillableSubpartition.releaseMemory());
        awaitableBufferAvailablityListener.awaitNotifications(4L, 30000L);
        Assert.assertEquals(4L, awaitableBufferAvailablityListener.getNumNotifiedBuffers());
        Buffer nextBuffer2 = createReadView.getNextBuffer();
        Assert.assertNotNull(nextBuffer2);
        nextBuffer2.recycle();
        Buffer nextBuffer3 = createReadView.getNextBuffer();
        Assert.assertNotNull(nextBuffer3);
        nextBuffer3.recycle();
        Buffer nextBuffer4 = createReadView.getNextBuffer();
        Assert.assertNotNull(nextBuffer4);
        Assert.assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(nextBuffer4, ClassLoader.getSystemClassLoader()).getClass());
        nextBuffer4.recycle();
    }
}
