package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.class */
public class PartitionRequestClientTest {
    @Test
    public void testRetriggerPartitionRequest() throws Exception {
        long currentTimeMillis = System.currentTimeMillis() + 30000;
        ChannelHandler partitionRequestClientHandler = new PartitionRequestClientHandler();
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelHandler[]{partitionRequestClientHandler});
        PartitionRequestClient partitionRequestClient = new PartitionRequestClient(embeddedChannel, partitionRequestClientHandler, (ConnectionID) Mockito.mock(ConnectionID.class), (PartitionRequestClientFactory) Mockito.mock(PartitionRequestClientFactory.class));
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        SingleInputGate createSingleInputGate = PartitionRequestClientHandlerTest.createSingleInputGate();
        RemoteInputChannel createRemoteInputChannel = PartitionRequestClientHandlerTest.createRemoteInputChannel(createSingleInputGate, partitionRequestClient, 1, 2);
        try {
            createSingleInputGate.setBufferPool(networkBufferPool.createBufferPool(6, 6));
            createSingleInputGate.assignExclusiveSegments(networkBufferPool, 2);
            createRemoteInputChannel.requestSubpartition(0);
            Assert.assertTrue(embeddedChannel.isWritable());
            Object readOutbound = embeddedChannel.readOutbound();
            Assert.assertThat(readOutbound, Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals(createRemoteInputChannel.getInputChannelId(), ((NettyMessage.PartitionRequest) readOutbound).receiverId);
            Assert.assertEquals(2L, ((NettyMessage.PartitionRequest) readOutbound).credit);
            createSingleInputGate.retriggerPartitionRequest(createRemoteInputChannel.getPartitionId().getPartitionId());
            runAllScheduledPendingTasks(embeddedChannel, currentTimeMillis);
            Object readOutbound2 = embeddedChannel.readOutbound();
            Assert.assertThat(readOutbound2, Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals(createRemoteInputChannel.getInputChannelId(), ((NettyMessage.PartitionRequest) readOutbound2).receiverId);
            Assert.assertEquals(2L, ((NettyMessage.PartitionRequest) readOutbound2).credit);
            createSingleInputGate.retriggerPartitionRequest(createRemoteInputChannel.getPartitionId().getPartitionId());
            runAllScheduledPendingTasks(embeddedChannel, currentTimeMillis);
            Object readOutbound3 = embeddedChannel.readOutbound();
            Assert.assertThat(readOutbound3, Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals(createRemoteInputChannel.getInputChannelId(), ((NettyMessage.PartitionRequest) readOutbound3).receiverId);
            Assert.assertEquals(2L, ((NettyMessage.PartitionRequest) readOutbound3).credit);
            Assert.assertNull(embeddedChannel.readOutbound());
            createSingleInputGate.releaseAllResources();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        } catch (Throwable th) {
            createSingleInputGate.releaseAllResources();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
            throw th;
        }
    }

    @Test
    public void testDoublePartitionRequest() throws Exception {
        ChannelHandler partitionRequestClientHandler = new PartitionRequestClientHandler();
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelHandler[]{partitionRequestClientHandler});
        PartitionRequestClient partitionRequestClient = new PartitionRequestClient(embeddedChannel, partitionRequestClientHandler, (ConnectionID) Mockito.mock(ConnectionID.class), (PartitionRequestClientFactory) Mockito.mock(PartitionRequestClientFactory.class));
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        SingleInputGate createSingleInputGate = PartitionRequestClientHandlerTest.createSingleInputGate();
        RemoteInputChannel createRemoteInputChannel = PartitionRequestClientHandlerTest.createRemoteInputChannel(createSingleInputGate, partitionRequestClient);
        try {
            createSingleInputGate.setBufferPool(networkBufferPool.createBufferPool(6, 6));
            createSingleInputGate.assignExclusiveSegments(networkBufferPool, 2);
            createRemoteInputChannel.requestSubpartition(0);
            Assert.assertTrue(embeddedChannel.isWritable());
            Object readOutbound = embeddedChannel.readOutbound();
            Assert.assertThat(readOutbound, Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals(createRemoteInputChannel.getInputChannelId(), ((NettyMessage.PartitionRequest) readOutbound).receiverId);
            Assert.assertEquals(2L, ((NettyMessage.PartitionRequest) readOutbound).credit);
            Assert.assertNull(embeddedChannel.readOutbound());
            createSingleInputGate.releaseAllResources();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        } catch (Throwable th) {
            createSingleInputGate.releaseAllResources();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
            throw th;
        }
    }

    void runAllScheduledPendingTasks(EmbeddedChannel embeddedChannel, long j) throws InterruptedException {
        while (embeddedChannel.runScheduledPendingTasks() != -1 && System.currentTimeMillis() < j) {
            Thread.sleep(1L);
        }
    }
}
