package org.apache.flink.runtime.query;

import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import java.net.ConnectException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
import org.apache.flink.runtime.query.netty.KvStateClient;
import org.apache.flink.runtime.query.netty.KvStateRequestStats;
import org.apache.flink.runtime.query.netty.KvStateServer;
import org.apache.flink.runtime.query.netty.UnknownKvStateID;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.HeapValueState;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.MathUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/query/QueryableStateClientTest.class */
/**
 * Tests for {@link QueryableStateClient}.
 *
 * <p>The tests cover three things: how often the location lookup service is consulted when a
 * query fails, an end-to-end round trip against real {@link KvStateServer} instances, and
 * lookups for the same state name under different job IDs.
 */
public class QueryableStateClientTest {

    /** Actor system shared by all tests; its dispatcher is handed to every client under test. */
    private static final ActorSystem testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());

    /** Upper bound for blocking on a query future. */
    private static final FiniteDuration timeout = new FiniteDuration(100, TimeUnit.SECONDS);

    /** Shuts down the shared actor system once all tests of this class have run. */
    @AfterClass
    public static void tearDown() throws Exception {
        if (testActorSystem != null) {
            testActorSystem.shutdown();
        }
    }

    /**
     * Verifies how many times the lookup service is queried for different failure modes.
     *
     * <p>For the failures {@link UnknownKvStateLocation}, {@link UnknownKvStateKeyGroupLocation},
     * {@link UnknownKvStateID} and {@link ConnectException} the test expects exactly two lookups
     * per state name; for an unrelated {@link RuntimeException} it expects exactly one.
     */
    @Test
    public void testForceLookupOnOutdatedLocation() throws Exception {
        KvStateLocationLookupService lookupService = Mockito.mock(KvStateLocationLookupService.class);
        KvStateClient networkClient = Mockito.mock(KvStateClient.class);
        QueryableStateClient client = new QueryableStateClient(lookupService, networkClient, testActorSystem.dispatcher());

        try {
            JobID jobId = new JobID();
            int numKeyGroups = 4;

            // --- Case 1: the lookup itself fails with UnknownKvStateLocation ---
            String luckyName = "lucky";
            Mockito.when(lookupService.getKvStateLookupInfo(Matchers.eq(jobId), Matchers.eq(luckyName)))
                    .thenReturn(Futures.<KvStateLocation>failed(new UnknownKvStateLocation(luckyName)));
            try {
                Await.result(client.getKvState(jobId, luckyName, 0, new byte[0]), timeout);
                Assert.fail("Did not throw expected UnknownKvStateLocation exception");
            } catch (UnknownKvStateLocation expected) {
                // Expected: the failure is surfaced to the caller.
            }
            Mockito.verify(lookupService, Mockito.times(2))
                    .getKvStateLookupInfo(Matchers.eq(jobId), Matchers.eq(luckyName));

            // --- Case 2: location is found, but no key group has been registered in it ---
            String unluckyName = "unlucky";
            KvStateLocation emptyLocation = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, unluckyName);
            Mockito.when(lookupService.getKvStateLookupInfo(Matchers.eq(jobId), Matchers.eq(unluckyName)))
                    .thenReturn(Futures.successful(emptyLocation));
            try {
                Await.result(client.getKvState(jobId, unluckyName, 0, new byte[0]), timeout);
                Assert.fail("Did not throw expected UnknownKvStateKeyGroupLocation exception");
            } catch (UnknownKvStateKeyGroupLocation expected) {
                // Expected: the failure is surfaced to the caller.
            }
            Mockito.verify(lookupService, Mockito.times(2))
                    .getKvStateLookupInfo(Matchers.eq(jobId), Matchers.eq(unluckyName));

            // --- Case 3: the network client reports an unknown KvStateID ---
            String waterName = "water";
            KvStateID waterId = new KvStateID();
            Future<byte[]> unknownIdFailure = Futures.failed(new UnknownKvStateID(waterId));
            KvStateServerAddress waterAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 12323);
            KvStateLocation waterLocation =
                    createFullyRegisteredLocation(jobId, waterName, numKeyGroups, waterId, waterAddress);
            Mockito.when(lookupService.getKvStateLookupInfo(Matchers.eq(jobId), Matchers.eq(waterName)))
                    .thenReturn(Futures.successful(waterLocation));
            Mockito.when(networkClient.getKvState(Matchers.eq(waterAddress), Matchers.eq(waterId), Matchers.any(byte[].class)))
                    .thenReturn(unknownIdFailure);
            try {
                Await.result(client.getKvState(jobId, waterName, 0, new byte[0]), timeout);
                Assert.fail("Did not throw expected UnknownKvStateID exception");
            } catch (UnknownKvStateID expected) {
                // Expected: the failure is surfaced to the caller.
            }
            Mockito.verify(lookupService, Mockito.times(2))
                    .getKvStateLookupInfo(Matchers.eq(jobId), Matchers.eq(waterName));

            // --- Case 4: the network client fails with a ConnectException ---
            String spaceName = "space";
            Future<byte[]> connectFailure = Futures.failed(new ConnectException());
            KvStateID spaceId = new KvStateID();
            KvStateServerAddress spaceAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 11123);
            KvStateLocation spaceLocation =
                    createFullyRegisteredLocation(jobId, spaceName, numKeyGroups, spaceId, spaceAddress);
            Mockito.when(lookupService.getKvStateLookupInfo(Matchers.eq(jobId), Matchers.eq(spaceName)))
                    .thenReturn(Futures.successful(spaceLocation));
            Mockito.when(networkClient.getKvState(Matchers.eq(spaceAddress), Matchers.eq(spaceId), Matchers.any(byte[].class)))
                    .thenReturn(connectFailure);
            try {
                Await.result(client.getKvState(jobId, spaceName, 0, new byte[0]), timeout);
                Assert.fail("Did not throw expected ConnectException exception");
            } catch (ConnectException expected) {
                // Expected: the failure is surfaced to the caller.
            }
            Mockito.verify(lookupService, Mockito.times(2))
                    .getKvStateLookupInfo(Matchers.eq(jobId), Matchers.eq(spaceName));

            // --- Case 5: an unrelated lookup failure; only a single lookup is expected ---
            String universeName = "universe";
            Mockito.when(lookupService.getKvStateLookupInfo(Matchers.eq(jobId), Matchers.eq(universeName)))
                    .thenReturn(Futures.<KvStateLocation>failed(new RuntimeException("Test exception")));
            // The returned future is deliberately not awaited; only the lookup count is checked.
            client.getKvState(jobId, universeName, 0, new byte[0]);
            Mockito.verify(lookupService, Mockito.times(1))
                    .getKvStateLookupInfo(Matchers.eq(jobId), Matchers.eq(universeName));
        } finally {
            // Single cleanup path: runs exactly once whether the test passed or failed.
            client.shutDown();
        }
    }

    /**
     * Starts two {@link KvStateServer} instances, stores 1024 integer values spread over them,
     * queries every key through a real {@link KvStateClient} and checks both the returned values
     * and the per-server request statistics.
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"}) // state backend classes are used as raw types here
    public void testIntegrationWithKvStateServer() throws Exception {
        final int numServers = 2;
        final int numKeys = 1024;
        final int valueOffset = 1337;
        final String registrationName = "choco";

        JobID jobId = new JobID();
        JobVertexID jobVertexId = new JobVertexID();

        KvStateServer[] servers = new KvStateServer[numServers];
        // Declared with the concrete type because the getNum*() accessors are read below.
        AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[numServers];

        QueryableStateClient client = null;
        KvStateClient networkClient = null;
        AtomicKvStateRequestStats networkClientStats = new AtomicKvStateRequestStats();

        AbstractKeyedStateBackend keyedStateBackend = new MemoryStateBackend().createKeyedStateBackend(
                new DummyEnvironment("test", 1, 0),
                new JobID(),
                "test_op",
                IntSerializer.INSTANCE,
                1,
                new KeyGroupRange(0, 0),
                new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));

        try {
            KvStateRegistry[] registries = new KvStateRegistry[numServers];
            KvStateID[] kvStateIds = new KvStateID[numServers];
            ArrayList<HeapValueState> kvStates = new ArrayList<HeapValueState>();

            // Start one server per key group and register one value state with each of them.
            for (int server = 0; server < numServers; server++) {
                registries[server] = new KvStateRegistry();
                serverStats[server] = new AtomicKvStateRequestStats();
                servers[server] = new KvStateServer(
                        InetAddress.getLocalHost(), 0, 1, 1, registries[server], serverStats[server]);
                servers[server].start();

                ValueStateDescriptor descriptor = new ValueStateDescriptor("any", IntSerializer.INSTANCE);
                RegisteredBackendStateMetaInfo metaInfo = new RegisteredBackendStateMetaInfo(
                        descriptor.getType(),
                        descriptor.getName(),
                        VoidNamespaceSerializer.INSTANCE,
                        IntSerializer.INSTANCE);
                HeapValueState kvState = new HeapValueState(
                        keyedStateBackend,
                        descriptor,
                        new StateTable(metaInfo, new KeyGroupRange(0, 1)),
                        IntSerializer.INSTANCE,
                        VoidNamespaceSerializer.INSTANCE);
                kvStates.add(kvState);

                kvStateIds[server] = registries[server].registerKvState(
                        jobId, new JobVertexID(), new KeyGroupRange(server, server), registrationName, kvState);
            }

            // Write value (1337 + key) for every key and count how many keys land on each server.
            int[] expectedRequests = new int[numServers];
            for (int key = 0; key < numKeys; key++) {
                int targetServer = MathUtils.murmurHash(key) % numServers;
                expectedRequests[targetServer]++;

                HeapValueState kvState = kvStates.get(targetServer);
                keyedStateBackend.setCurrentKey(Integer.valueOf(key));
                kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
                kvState.update(Integer.valueOf(valueOffset + key));
            }

            // Location that maps key group i to server i.
            KvStateLocation location = new KvStateLocation(jobId, jobVertexId, numServers, registrationName);
            for (int server = 0; server < numServers; server++) {
                location.registerKvState(
                        new KeyGroupRange(server, server), kvStateIds[server], servers[server].getAddress());
            }

            KvStateLocationLookupService lookupService = Mockito.mock(KvStateLocationLookupService.class);
            Mockito.when(lookupService.getKvStateLookupInfo(Matchers.eq(jobId), Matchers.eq(registrationName)))
                    .thenReturn(Futures.successful(location));

            networkClient = new KvStateClient(1, networkClientStats);
            client = new QueryableStateClient(lookupService, networkClient, testActorSystem.dispatcher());

            // Fire all queries first, then wait for the combined result.
            ArrayList<Future<byte[]>> pendingQueries = new ArrayList<Future<byte[]>>(numKeys);
            for (int key = 0; key < numKeys; key++) {
                byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace(
                        Integer.valueOf(key),
                        IntSerializer.INSTANCE,
                        VoidNamespace.INSTANCE,
                        VoidNamespaceSerializer.INSTANCE);
                pendingQueries.add(client.getKvState(jobId, registrationName, key, serializedKeyAndNamespace));
            }

            Iterable<byte[]> results =
                    Await.result(Futures.sequence(pendingQueries, testActorSystem.dispatcher()), timeout);

            // Results are checked in request order: the n-th result must hold 1337 + n.
            int key = 0;
            for (byte[] serializedValue : results) {
                Integer value = (Integer) KvStateRequestSerializer.deserializeValue(serializedValue, IntSerializer.INSTANCE);
                Assert.assertEquals(valueOffset + key, value.intValue());
                key++;
            }

            // Verify the per-server statistics, retrying a bounded number of times.
            // NOTE(review): presumably the servers update their counters slightly after the
            // client has received the responses, hence the retries - confirm.
            final int maxAttempts = 10;
            for (int server = 0; server < numServers; server++) {
                for (int attempt = 0; attempt < maxAttempts; attempt++) {
                    try {
                        Assert.assertEquals("Unexpected number of requests", expectedRequests[server], serverStats[server].getNumRequests());
                        Assert.assertEquals("Unexpected success requests", expectedRequests[server], serverStats[server].getNumSuccessful());
                        Assert.assertEquals("Unexpected failed requests", 0L, serverStats[server].getNumFailed());
                        break;
                    } catch (AssertionError mismatch) {
                        if (attempt == maxAttempts - 1) {
                            // Out of retries: report the last mismatch.
                            throw mismatch;
                        }
                        Thread.sleep(100);
                    }
                }
            }
        } finally {
            // Single cleanup path; every resource may still be null if setup failed early.
            if (client != null) {
                client.shutDown();
            }
            if (networkClient != null) {
                networkClient.shutDown();
            }
            for (KvStateServer server : servers) {
                if (server != null) {
                    server.shutDown();
                }
            }
        }
    }

    /**
     * Verifies that two queries for the same state name but different job IDs trigger exactly one
     * lookup per job ID.
     */
    @Test
    public void testLookupMultipleJobIds() throws Exception {
        String name = "unique-per-job";

        // The same location is handed out for every job ID and name.
        KvStateLocation location = new KvStateLocation(new JobID(), new JobVertexID(), 1, name);
        location.registerKvState(
                new KeyGroupRange(0, 0), new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 892));

        JobID firstJobId = new JobID();
        JobID secondJobId = new JobID();

        KvStateLocationLookupService lookupService = Mockito.mock(KvStateLocationLookupService.class);
        Mockito.when(lookupService.getKvStateLookupInfo(Matchers.any(JobID.class), Matchers.anyString()))
                .thenReturn(Futures.successful(location));

        KvStateClient networkClient = Mockito.mock(KvStateClient.class);
        Mockito.when(networkClient.getKvState(
                        Matchers.any(KvStateServerAddress.class), Matchers.any(KvStateID.class), Matchers.any(byte[].class)))
                .thenReturn(Futures.successful(new byte[0]));

        QueryableStateClient client = new QueryableStateClient(lookupService, networkClient, testActorSystem.dispatcher());
        try {
            client.getKvState(firstJobId, name, 0, new byte[0]);
            client.getKvState(secondJobId, name, 0, new byte[0]);

            Mockito.verify(lookupService, Mockito.times(1))
                    .getKvStateLookupInfo(Matchers.eq(firstJobId), Matchers.eq(name));
            Mockito.verify(lookupService, Mockito.times(1))
                    .getKvStateLookupInfo(Matchers.eq(secondJobId), Matchers.eq(name));
        } finally {
            // Release the client like the other tests in this class do.
            client.shutDown();
        }
    }

    /**
     * Creates a {@link KvStateLocation} in which every key group {@code 0..numKeyGroups-1} is
     * registered with the same state ID and server address.
     */
    private static KvStateLocation createFullyRegisteredLocation(
            JobID jobId,
            String registrationName,
            int numKeyGroups,
            KvStateID kvStateId,
            KvStateServerAddress serverAddress) throws Exception {
        KvStateLocation location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, registrationName);
        for (int keyGroup = 0; keyGroup < numKeyGroups; keyGroup++) {
            location.registerKvState(new KeyGroupRange(keyGroup, keyGroup), kvStateId, serverAddress);
        }
        return location;
    }
}
