package org.apache.flink.runtime.state.heap.async;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/async/HeapListStateTest.class */
/**
 * Tests for the list state created by {@link AsyncHeapKeyedStateBackend}: adding and reading
 * values per key, clearing, and merging several namespaces into one.
 *
 * <p>NOTE(review): the state handles are kept as raw types because the generic parameter lists
 * of {@code HeapListState} / {@code AbstractHeapMergingState} are declared outside this file.
 */
public class HeapListStateTest extends HeapStateBackendTestBase {

    /** Name under which the list state is registered in both tests. */
    private static final String STATE_NAME = "my-state";

    /**
     * Verifies that values added under one key are only visible under that key, that
     * {@code get()} yields {@code null} for keys without values, and that clearing every key
     * leaves the state table empty.
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"}) // raw state handle, see class comment
    public void testAddAndGet() throws Exception {
        ListStateDescriptor<Long> descriptor = createDescriptor();
        AsyncHeapKeyedStateBackend<String> backend = createKeyedBackend();
        try {
            HeapListState state = backend.createListState(VoidNamespaceSerializer.INSTANCE, descriptor);
            state.setCurrentNamespace(VoidNamespace.INSTANCE);

            // Nothing has been added yet, so every key reads as null.
            backend.setCurrentKey("abc");
            Assert.assertNull(state.get());

            backend.setCurrentKey("def");
            Assert.assertNull(state.get());
            state.add(17L);
            state.add(11L);
            Assert.assertEquals(Arrays.asList(17L, 11L), state.get());

            // Values added under "def" must not leak into other keys.
            backend.setCurrentKey("abc");
            Assert.assertNull(state.get());

            backend.setCurrentKey("g");
            Assert.assertNull(state.get());
            state.add(1L);
            state.add(2L);

            backend.setCurrentKey("def");
            Assert.assertEquals(Arrays.asList(17L, 11L), state.get());
            state.clear();
            Assert.assertNull(state.get());

            backend.setCurrentKey("g");
            state.add(3L);
            state.add(2L);
            state.add(1L);

            backend.setCurrentKey("def");
            Assert.assertNull(state.get());

            // "g" keeps insertion order across both rounds of additions.
            backend.setCurrentKey("g");
            Assert.assertEquals(Arrays.asList(1L, 2L, 3L, 2L, 1L), state.get());
            state.clear();

            // Every key that held values has been cleared by now.
            Assert.assertTrue(state.getStateTable().isEmpty());
        } finally {
            releaseBackend(backend);
        }
    }

    /**
     * Verifies that merging namespaces 2 and 3 into namespace 1 yields the union of the values
     * for each key, that merging for a key without any values yields {@code null}, and that
     * clearing namespace 1 for every key afterwards leaves the state table empty.
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"}) // raw state handle, see class comment
    public void testMerging() throws Exception {
        ListStateDescriptor<Long> descriptor = createDescriptor();
        Set<Long> expected = new HashSet<>(Arrays.asList(11L, 22L, 33L, 44L, 55L));
        AsyncHeapKeyedStateBackend<String> backend = createKeyedBackend();
        try {
            AbstractHeapMergingState state = backend.createListState(IntSerializer.INSTANCE, descriptor);

            // "abc": values spread over namespaces 1, 2 and 3.
            backend.setCurrentKey("abc");
            state.setCurrentNamespace(1);
            state.add(33L);
            state.add(55L);
            state.setCurrentNamespace(2);
            state.add(22L);
            state.add(11L);
            state.setCurrentNamespace(3);
            state.add(44L);

            // "def": values in the merge target (1) and one source (3); nothing in 2.
            backend.setCurrentKey("def");
            state.setCurrentNamespace(1);
            state.add(11L);
            state.add(44L);
            state.setCurrentNamespace(3);
            state.add(22L);
            state.add(55L);
            state.add(33L);

            // "jkl": all values already in the merge target.
            backend.setCurrentKey("jkl");
            state.setCurrentNamespace(1);
            state.add(11L);
            state.add(22L);
            state.add(33L);
            state.add(44L);
            state.add(55L);

            // "mno": all values in a single source namespace.
            backend.setCurrentKey("mno");
            state.setCurrentNamespace(3);
            state.add(11L);
            state.add(22L);
            state.add(33L);
            state.add(44L);
            state.add(55L);

            backend.setCurrentKey("abc");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            validateResult((Iterable<Long>) state.get(), expected);

            backend.setCurrentKey("def");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            validateResult((Iterable<Long>) state.get(), expected);

            // "ghi" never received any value, so the merge result is null.
            backend.setCurrentKey("ghi");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            Assert.assertNull(state.get());

            backend.setCurrentKey("jkl");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            validateResult((Iterable<Long>) state.get(), expected);

            backend.setCurrentKey("mno");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            validateResult((Iterable<Long>) state.get(), expected);

            // Clear the merge target for every key; the table must then be empty.
            for (String key : Arrays.asList("abc", "def", "ghi", "jkl", "mno")) {
                backend.setCurrentKey(key);
                state.setCurrentNamespace(1);
                state.clear();
            }

            Assert.assertTrue(state.getStateTable().isEmpty());
        } finally {
            releaseBackend(backend);
        }
    }

    /** Builds the {@code Long} list state descriptor used by the tests, with its serializer initialized. */
    private static ListStateDescriptor<Long> createDescriptor() {
        ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>(STATE_NAME, Long.class);
        descriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        return descriptor;
    }

    /** Closes and then disposes the backend; {@code dispose()} runs even when {@code close()} throws. */
    private static void releaseBackend(AsyncHeapKeyedStateBackend<?> backend) throws Exception {
        try {
            backend.close();
        } finally {
            backend.dispose();
        }
    }

    /**
     * Asserts that {@code iterable} contains exactly the elements of {@code set}, each once,
     * in any order.
     *
     * @param iterable the values read from the state
     * @param set the expected elements
     */
    private static <T> void validateResult(Iterable<T> iterable, Set<T> set) {
        int count = 0;
        Set<T> seen = new HashSet<>();
        for (T value : iterable) {
            count++;
            Assert.assertTrue("unexpected element: " + value, set.contains(value));
            seen.add(value);
        }
        Assert.assertEquals(set.size(), count);
        // With the count matching, this additionally rules out duplicates hiding a missing element.
        Assert.assertEquals(set, seen);
    }
}
