package org.apache.flink.runtime.zookeeper;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.shaded.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.org.apache.curator.framework.api.BackgroundCallback;
import org.apache.flink.shaded.org.apache.curator.framework.api.CuratorEvent;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.class */
public class ZooKeeperStateHandleStoreITCase extends TestLogger {
    /**
     * ZooKeeper test environment shared by all tests of this class. {@link #cleanUp()} calls
     * {@code deleteAll()} on it before every test and {@link #tearDown()} shuts it down once
     * after the last test.
     *
     * <p>NOTE(review): the constructor argument {@code 1} is presumably the number of ZooKeeper
     * servers to start - confirm against ZooKeeperTestEnvironment.
     */
    private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase$LongRetrievableStateHandle.class */
    /**
     * In-memory {@link RetrievableStateHandle} that wraps a single {@link Long} and counts how
     * often {@link #discardState()} has been called, so tests can verify discard behaviour.
     */
    public static class LongRetrievableStateHandle implements RetrievableStateHandle<Long> {
        private static final long serialVersionUID = -3555329254423838912L;

        /** The wrapped state value. */
        private final Long state;

        /** Number of {@link #discardState()} invocations seen so far. */
        private int numberOfDiscardCalls;

        /**
         * Creates a handle for the given value.
         *
         * @param l the state value returned by {@link #retrieveState()}
         */
        public LongRetrievableStateHandle(Long l) {
            this.state = l;
        }

        /**
         * Returns the wrapped state. This implements the {@code retrieveState()} method of
         * {@link RetrievableStateHandle}, which the decompiler had renamed away.
         *
         * @return the value passed to the constructor
         */
        public Long retrieveState() throws Exception {
            return this.state;
        }

        /**
         * Decompiler-generated alias of {@link #retrieveState()}, kept only so that existing
         * callers of the mangled name keep compiling.
         *
         * @return the value passed to the constructor
         */
        public Long m395retrieveState() throws Exception {
            return retrieveState();
        }

        /** Records the discard call; there are no resources to release. */
        public void discardState() throws Exception {
            this.numberOfDiscardCalls++;
        }

        /**
         * Reports the size of the state.
         *
         * @return always {@code 0}
         */
        public long getStateSize() {
            return 0L;
        }

        /**
         * Returns how often {@link #discardState()} has been called on this handle.
         *
         * @return the number of discard calls
         */
        public int getNumberOfDiscardCalls() {
            return this.numberOfDiscardCalls;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase$LongStateStorage.class */
    /**
     * {@link RetrievableStateStorageHelper} that wraps every stored value in a
     * {@link LongRetrievableStateHandle} and remembers the created handles in call order so that
     * tests can inspect them afterwards.
     */
    private static class LongStateStorage implements RetrievableStateStorageHelper<Long> {

        /** Every handle created by {@link #store(Long)}, in creation order. */
        private final List<LongRetrievableStateHandle> stateHandles = new ArrayList<>();

        /**
         * Wraps the value in a new handle, records the handle and returns it.
         *
         * @param l the state value to wrap
         * @return the newly created handle
         */
        public RetrievableStateHandle<Long> store(Long l) throws Exception {
            LongRetrievableStateHandle stateHandle = new LongRetrievableStateHandle(l);
            this.stateHandles.add(stateHandle);
            return stateHandle;
        }

        /**
         * Returns the live list of handles created so far.
         *
         * @return the recorded handles, oldest first
         */
        List<LongRetrievableStateHandle> getStateHandles() {
            return this.stateHandles;
        }
    }

    /** Shuts down the shared ZooKeeper test environment once all tests of the class have run. */
    @AfterClass
    public static void tearDown() throws Exception {
        // The environment is a static final field initialised with `new`, so it can never be
        // null here; the former null check was dead code.
        ZooKeeper.shutdown();
    }

    /**
     * Runs before every test and calls {@code deleteAll()} on the shared environment.
     *
     * <p>NOTE(review): presumably this removes all nodes so each test starts from an empty
     * ZooKeeper root - confirm against ZooKeeperTestEnvironment.
     */
    @Before
    public void cleanUp() throws Exception {
        ZooKeeper.deleteAll();
    }

    /**
     * Tests that a state added without an explicit create mode shows up in the store, is backed
     * by a node without an ephemeral owner, and that the handle serialized into ZooKeeper
     * yields the original state.
     */
    @Test
    public void testAdd() throws Exception {
        final String pathInZooKeeper = "/testAdd";
        // Boxed on purpose: assertEquals(long, Long) is ambiguous between the (long, long) and
        // (Object, Object) overloads of org.junit.Assert.
        final Long state = 1239712317L;

        ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
                ZooKeeper.getClient(), new LongStateStorage(), Executors.directExecutor());

        store.add(pathInZooKeeper, state);

        // The store sees exactly the added entry.
        Assert.assertEquals(1, store.getAll().size());
        Assert.assertEquals(state, store.get(pathInZooKeeper).retrieveState());

        // The node exists and has no ephemeral owner.
        Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
        Assert.assertNotNull(stat);
        Assert.assertEquals(0L, stat.getEphemeralOwner());

        // The node data deserializes to a handle carrying the same state.
        @SuppressWarnings("unchecked")
        RetrievableStateHandle<Long> storedHandle =
                (RetrievableStateHandle<Long>) InstantiationUtil.deserializeObject(
                        ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
                        ClassLoader.getSystemClassLoader());
        Assert.assertEquals(state, storedHandle.retrieveState());
    }

    /**
     * Tests adding one state per {@link CreateMode} value: after each add the store size grows
     * by one, the storage helper has produced a handle with the expected state, the node's
     * ephemeral owner matches the mode, and the serialized handle yields the expected state.
     */
    @Test
    public void testAddWithCreateMode() throws Exception {
        final String nodePrefix = "testAddWithCreateMode";

        LongStateStorage longStateStorage = new LongStateStorage();
        ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
                ZooKeeper.getClient(), longStateStorage, Executors.directExecutor());

        Long state = 3457347234L;

        CreateMode[] modes = CreateMode.values();
        for (int i = 0; i < modes.length; i++) {
            CreateMode mode = modes[i];
            // Shift the state per iteration so the handles of different modes are distinct.
            state += i;

            String pathInZooKeeper = "/" + nodePrefix + mode.name();
            store.add(pathInZooKeeper, state, mode);

            if (mode.isSequential()) {
                // The node name of a sequential mode differs from the requested path, so find
                // the actual child by its prefix.
                for (String child : ZooKeeper.getClient().getChildren().forPath("/")) {
                    if (child.startsWith(nodePrefix + mode.name())) {
                        pathInZooKeeper = "/" + child;
                        break;
                    }
                }
            }

            // One more entry in the store, and the i-th handle carries the state just added.
            Assert.assertEquals(i + 1, store.getAll().size());
            Assert.assertEquals(state, longStateStorage.getStateHandles().get(i).retrieveState());

            // The ephemeral owner is set exactly for ephemeral modes.
            Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
            Assert.assertNotNull(stat);
            if (mode.isEphemeral()) {
                Assert.assertTrue(stat.getEphemeralOwner() != 0);
            } else {
                Assert.assertEquals(0L, stat.getEphemeralOwner());
            }

            // The node data deserializes to a handle carrying the same state.
            @SuppressWarnings("unchecked")
            RetrievableStateHandle<Long> storedHandle =
                    (RetrievableStateHandle<Long>) InstantiationUtil.deserializeObject(
                            ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
                            ClassLoader.getSystemClassLoader());
            Assert.assertEquals(state, storedHandle.retrieveState());
        }
    }

    /**
     * Tests that adding a state under a path that already exists in ZooKeeper fails with an
     * exception.
     *
     * <p>NOTE(review): {@code expected = Exception.class} is broad - a failure of the setup
     * {@code create()} call would satisfy it as well.
     */
    @Test(expected = Exception.class)
    public void testAddAlreadyExistingPath() throws Exception {
        final String pathInZooKeeper = "/testAddAlreadyExistingPath";

        ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
                ZooKeeper.getClient(), new LongStateStorage(), Executors.directExecutor());

        // Occupy the path directly through the client before going through the store.
        ZooKeeper.getClient().create().forPath(pathInZooKeeper);

        store.add(pathInZooKeeper, 1L);
    }

    /**
     * Tests that the state handle created for an add is discarded exactly once when creating
     * the ZooKeeper node fails.
     */
    @Test
    public void testAddDiscardStateHandleAfterFailure() throws Exception {
        final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure";
        // Boxed on purpose: assertEquals(long, Long) is ambiguous in org.junit.Assert.
        final Long state = 81282227L;

        LongStateStorage longStateStorage = new LongStateStorage();

        // Spy on the real client and make every create() call fail.
        CuratorFramework client = Mockito.spy(ZooKeeper.getClient());
        Mockito.when(client.create()).thenThrow(new RuntimeException("Expected test Exception."));

        ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
                client, longStateStorage, Executors.directExecutor());

        try {
            store.add(pathInZooKeeper, state);
            Assert.fail("Did not throw expected exception");
        } catch (Exception ignored) {
            // Expected: the injected failure has to surface to the caller.
        }

        // The handle was created with the right state and then discarded once.
        Assert.assertEquals(1, longStateStorage.getStateHandles().size());
        Assert.assertEquals(state, longStateStorage.getStateHandles().get(0).retrieveState());
        Assert.assertEquals(1, longStateStorage.getStateHandles().get(0).getNumberOfDiscardCalls());
    }

    /**
     * Tests that replacing the state of an existing path creates a second handle and that the
     * node afterwards holds the serialized handle of the replacement state.
     */
    @Test
    public void testReplace() throws Exception {
        final String pathInZooKeeper = "/testReplace";
        // Boxed on purpose: assertEquals(long, Long) is ambiguous in org.junit.Assert.
        final Long initialState = 30968470898L;
        final Long replaceState = 88383776661L;

        LongStateStorage longStateStorage = new LongStateStorage();
        ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
                ZooKeeper.getClient(), longStateStorage, Executors.directExecutor());

        store.add(pathInZooKeeper, initialState);
        store.replace(pathInZooKeeper, 0, replaceState);

        // One handle per call, in call order.
        Assert.assertEquals(2, longStateStorage.getStateHandles().size());
        Assert.assertEquals(initialState, longStateStorage.getStateHandles().get(0).retrieveState());
        Assert.assertEquals(replaceState, longStateStorage.getStateHandles().get(1).retrieveState());

        // The node still exists and has no ephemeral owner.
        Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
        Assert.assertNotNull(stat);
        Assert.assertEquals(0L, stat.getEphemeralOwner());

        // The node data now deserializes to the replacement state.
        @SuppressWarnings("unchecked")
        RetrievableStateHandle<Long> storedHandle =
                (RetrievableStateHandle<Long>) InstantiationUtil.deserializeObject(
                        ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
                        ClassLoader.getSystemClassLoader());
        Assert.assertEquals(replaceState, storedHandle.retrieveState());
    }

    /** Tests that replacing the state of a path that was never added fails with an exception. */
    @Test(expected = Exception.class)
    public void testReplaceNonExistingPath() throws Exception {
        ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
                ZooKeeper.getClient(), new LongStateStorage(), Executors.directExecutor());

        store.replace("/testReplaceNonExistingPath", 0, 1L);
    }

    /**
     * Tests that the handle created for a replacement is discarded exactly once when writing to
     * ZooKeeper fails, and that the node keeps the initial state.
     */
    @Test
    public void testReplaceDiscardStateHandleAfterFailure() throws Exception {
        final String pathInZooKeeper = "/testReplaceDiscardStateHandleAfterFailure";
        // Boxed on purpose: assertEquals(long, Long) is ambiguous in org.junit.Assert.
        final Long initialState = 30968470898L;
        final Long replaceState = 88383776661L;

        LongStateStorage longStateStorage = new LongStateStorage();

        // Spy on the real client and make every setData() call fail.
        CuratorFramework client = Mockito.spy(ZooKeeper.getClient());
        Mockito.when(client.setData()).thenThrow(new RuntimeException("Expected test Exception."));

        ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
                client, longStateStorage, Executors.directExecutor());

        // The add goes through create(), which is not stubbed, so it succeeds.
        store.add(pathInZooKeeper, initialState);

        try {
            store.replace(pathInZooKeeper, 0, replaceState);
            Assert.fail("Did not throw expected exception");
        } catch (Exception ignored) {
            // Expected: the injected failure has to surface to the caller.
        }

        // Both handles were created; only the replacement handle was discarded.
        Assert.assertEquals(2, longStateStorage.getStateHandles().size());
        Assert.assertEquals(initialState, longStateStorage.getStateHandles().get(0).retrieveState());
        Assert.assertEquals(replaceState, longStateStorage.getStateHandles().get(1).retrieveState());
        Assert.assertEquals(1, longStateStorage.getStateHandles().get(1).getNumberOfDiscardCalls());

        // Read through the real client: the node still holds the initial state.
        @SuppressWarnings("unchecked")
        RetrievableStateHandle<Long> storedHandle =
                (RetrievableStateHandle<Long>) InstantiationUtil.deserializeObject(
                        ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
                        ClassLoader.getSystemClassLoader());
        Assert.assertEquals(initialState, storedHandle.retrieveState());
    }

    /**
     * Tests that {@code exists} returns {@code -1} for an unknown path and a non-negative value
     * once a state has been added, and that {@code get} returns a handle with the added state.
     */
    @Test
    public void testGetAndExists() throws Exception {
        final String pathInZooKeeper = "/testGetAndExists";
        // Boxed on purpose: with a typed store, assertEquals(long, Long) would be ambiguous.
        final Long state = 311222268470898L;

        ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
                ZooKeeper.getClient(), new LongStateStorage(), Executors.directExecutor());

        Assert.assertEquals(-1, store.exists(pathInZooKeeper));

        store.add(pathInZooKeeper, state);

        Assert.assertEquals(state, store.get(pathInZooKeeper).retrieveState());
        Assert.assertTrue(store.exists(pathInZooKeeper) >= 0);
    }

    /** Tests that getting the state of a path that was never added fails with an exception. */
    @Test(expected = Exception.class)
    public void testGetNonExistingPath() throws Exception {
        ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
                ZooKeeper.getClient(), new LongStateStorage(), Executors.directExecutor());

        store.get("/testGetNonExistingPath");
    }

    /**
     * Tests that {@code getAll} returns exactly the states that were added, independent of
     * order.
     */
    @Test
    public void testGetAll() throws Exception {
        final String pathInZooKeeper = "/testGetAll";

        ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
                ZooKeeper.getClient(), new LongStateStorage(), Executors.directExecutor());

        final HashSet<Long> expected = new HashSet<>();
        expected.add(311222268470898L);
        expected.add(132812888L);
        expected.add(27255442L);
        expected.add(11122233124L);

        // All states share one path prefix; the sequential mode keeps the nodes distinct.
        for (Long state : expected) {
            store.add(pathInZooKeeper, state, CreateMode.PERSISTENT_SEQUENTIAL);
        }

        // Tick off every returned state; remove() returning false flags an unexpected or
        // duplicated entry.
        for (Tuple2<RetrievableStateHandle<Long>, String> entry : store.getAll()) {
            Assert.assertTrue(expected.remove(entry.f0.retrieveState()));
        }

        // Nothing left over means every added state was returned.
        Assert.assertEquals(0, expected.size());
    }

    /**
     * Tests that {@code getAllSortedByName} returns the states of sequentially created nodes in
     * the order in which they were added.
     */
    @Test
    public void testGetAllSortedByName() throws Exception {
        final String pathInZooKeeper = "/testGetAllSortedByName";

        ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
                ZooKeeper.getClient(), new LongStateStorage(), Executors.directExecutor());

        final Long[] expected = {311222268470898L, 132812888L, 27255442L, 11122233124L};

        for (Long state : expected) {
            store.add(pathInZooKeeper, state, CreateMode.PERSISTENT_SEQUENTIAL);
        }

        List<Tuple2<RetrievableStateHandle<Long>, String>> actual = store.getAllSortedByName();
        Assert.assertEquals(expected.length, actual.size());

        // The i-th returned entry has to carry the i-th added state.
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i], actual.get(i).f0.retrieveState());
        }
    }

    /** Tests that removing an added state leaves no children under the ZooKeeper root. */
    @Test
    public void testRemove() throws Exception {
        final String pathInZooKeeper = "/testRemove";

        ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
                ZooKeeper.getClient(), new LongStateStorage(), Executors.directExecutor());

        store.add(pathInZooKeeper, 27255442L);
        store.remove(pathInZooKeeper);

        Assert.assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());
    }

    /**
     * Tests that removing a state with a {@link BackgroundCallback} deletes the node and
     * invokes the callback exactly once.
     */
    @Test
    public void testRemoveWithCallback() throws Exception {
        final String pathInZooKeeper = "/testRemoveWithCallback";

        ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
                ZooKeeper.getClient(), new LongStateStorage(), Executors.directExecutor());

        store.add(pathInZooKeeper, 27255442L);

        // Released by the mocked callback so the test can wait for its invocation.
        final CountDownLatch sync = new CountDownLatch(1);
        BackgroundCallback callback = Mockito.mock(BackgroundCallback.class);
        Mockito.doAnswer(new Answer<Void>() {
            // Has to be named `answer` to implement Answer; the decompiler-mangled name left
            // the interface method unimplemented.
            public Void answer(InvocationOnMock invocation) throws Throwable {
                sync.countDown();
                return null;
            }
        }).when(callback).processResult(
                Matchers.eq(ZooKeeper.getClient()), Matchers.any(CuratorEvent.class));

        store.remove(pathInZooKeeper, callback);

        // The node is gone.
        Assert.assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());

        // Wait for the callback, then check it ran exactly once.
        sync.await();
        Mockito.verify(callback, Mockito.times(1)).processResult(
                Matchers.eq(ZooKeeper.getClient()), Matchers.any(CuratorEvent.class));
    }

    /**
     * Tests that {@code removeAndDiscardState} leaves no children under the ZooKeeper root.
     *
     * <p>NOTE(review): only the node removal is asserted; the discard call on the state handle
     * is not checked here.
     */
    @Test
    public void testRemoveAndDiscardState() throws Exception {
        final String pathInZooKeeper = "/testDiscard";

        ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
                ZooKeeper.getClient(), new LongStateStorage(), Executors.directExecutor());

        store.add(pathInZooKeeper, 27255442L);
        store.removeAndDiscardState(pathInZooKeeper);

        Assert.assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());
    }

    /**
     * Tests that {@code removeAndDiscardAllState} leaves no children under the ZooKeeper root
     * after several states have been added.
     */
    @Test
    public void testRemoveAndDiscardAllState() throws Exception {
        final String pathInZooKeeper = "/testDiscardAll";

        ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
                ZooKeeper.getClient(), new LongStateStorage(), Executors.directExecutor());

        final HashSet<Long> states = new HashSet<>();
        states.add(311222268470898L);
        states.add(132812888L);
        states.add(27255442L);
        states.add(11122233124L);

        // All states share one path prefix; the sequential mode keeps the nodes distinct.
        for (Long state : states) {
            store.add(pathInZooKeeper, state, CreateMode.PERSISTENT_SEQUENTIAL);
        }

        store.removeAndDiscardAllState();

        Assert.assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());
    }

    /**
     * Tests that {@code getAll} and {@code getAllSortedByName} skip a node whose data cannot be
     * deserialized and still return the remaining states.
     */
    @Test
    public void testCorruptedData() throws Exception {
        ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
                ZooKeeper.getClient(), new LongStateStorage(), Executors.directExecutor());

        final HashSet<Long> input = new HashSet<>();
        input.add(1L);
        input.add(2L);
        input.add(3L);

        // Each state is stored under a path equal to its value.
        for (Long state : input) {
            store.add("/" + state, state);
        }

        // Overwrite one node with two zero bytes, which is not a serialized state handle.
        ZooKeeper.getClient().setData().forPath("/2", new byte[2]);

        // Everything except the corrupted entry is expected back.
        final HashSet<Long> expected = new HashSet<>(input);
        expected.remove(2L);

        final HashSet<Long> actual = new HashSet<>(expected.size());
        for (Tuple2<RetrievableStateHandle<Long>, String> entry : store.getAll()) {
            actual.add(entry.f0.retrieveState());
        }
        Assert.assertEquals(expected, actual);

        // The sorted variant has to skip the corrupted entry as well.
        actual.clear();
        for (Tuple2<RetrievableStateHandle<Long>, String> entry : store.getAllSortedByName()) {
            actual.add(entry.f0.retrieveState());
        }
        Assert.assertEquals(expected, actual);
    }
}
