package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.operators.sort.ExternalSortLargeRecordsITCase;
import org.apache.flink.runtime.state.ArrayListSerializer;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.heap.CopyOnWriteStateTable;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.class */
public class CopyOnWriteStateTableTest extends TestLogger {

    /* loaded from: input_file:org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest$MockInternalKeyContext.class */
    static class MockInternalKeyContext<T> implements InternalKeyContext<T> {
        private T key;
        private final TypeSerializer<T> serializer;
        private final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);

        public MockInternalKeyContext(TypeSerializer<T> typeSerializer) {
            this.serializer = typeSerializer;
        }

        public void setKey(T t) {
            this.key = t;
        }

        public T getCurrentKey() {
            return this.key;
        }

        public int getCurrentKeyGroupIndex() {
            return 0;
        }

        public int getNumberOfKeyGroups() {
            return 1;
        }

        public KeyGroupRange getKeyGroupRange() {
            return this.keyGroupRange;
        }

        public TypeSerializer<T> getKeySerializer() {
            return this.serializer;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest$TestDuplicateSerializer.class */
    static class TestDuplicateSerializer extends TypeSerializer<Integer> {
        private static final long serialVersionUID = 1;
        private static final Integer ZERO = 0;
        private boolean disabled = false;

        public boolean isImmutableType() {
            return true;
        }

        public TypeSerializer<Integer> duplicate() {
            return new TestDuplicateSerializer();
        }

        /* renamed from: createInstance, reason: merged with bridge method [inline-methods] */
        public Integer m323createInstance() {
            return ZERO;
        }

        public Integer copy(Integer num) {
            return num;
        }

        public Integer copy(Integer num, Integer num2) {
            return num;
        }

        public int getLength() {
            return 4;
        }

        public void serialize(Integer num, DataOutputView dataOutputView) throws IOException {
            Assert.assertFalse(this.disabled);
            dataOutputView.writeInt(num.intValue());
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public Integer m322deserialize(DataInputView dataInputView) throws IOException {
            Assert.assertFalse(this.disabled);
            return Integer.valueOf(dataInputView.readInt());
        }

        public Integer deserialize(Integer num, DataInputView dataInputView) throws IOException {
            Assert.assertFalse(this.disabled);
            return m322deserialize(dataInputView);
        }

        public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
            Assert.assertFalse(this.disabled);
            dataOutputView.writeInt(dataInputView.readInt());
        }

        public boolean equals(Object obj) {
            return obj instanceof TestDuplicateSerializer;
        }

        public boolean canEqual(Object obj) {
            return obj instanceof TestDuplicateSerializer;
        }

        public int hashCode() {
            return getClass().hashCode();
        }

        public void disable() {
            this.disabled = true;
        }

        public TypeSerializerConfigSnapshot snapshotConfiguration() {
            throw new UnsupportedOperationException();
        }

        public CompatibilityResult<Integer> ensureCompatibility(TypeSerializerConfigSnapshot typeSerializerConfigSnapshot) {
            throw new UnsupportedOperationException();
        }
    }

    @Test
    public void testPutGetRemoveContainsTransform() throws Exception {
        CopyOnWriteStateTable copyOnWriteStateTable = new CopyOnWriteStateTable(new MockInternalKeyContext(IntSerializer.INSTANCE), new RegisteredKeyedBackendStateMetaInfo(StateDescriptor.Type.UNKNOWN, "test", IntSerializer.INSTANCE, new ArrayListSerializer(IntSerializer.INSTANCE)));
        ArrayList arrayList = new ArrayList();
        arrayList.add(41);
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(42);
        ArrayList arrayList3 = new ArrayList();
        arrayList3.add(43);
        Assert.assertNull(copyOnWriteStateTable.putAndGetOld(1, 1, arrayList));
        Assert.assertEquals(arrayList, copyOnWriteStateTable.get(1, 1));
        Assert.assertEquals(1L, copyOnWriteStateTable.size());
        Assert.assertNull(copyOnWriteStateTable.putAndGetOld(2, 1, arrayList2));
        Assert.assertEquals(arrayList2, copyOnWriteStateTable.get(2, 1));
        Assert.assertEquals(2L, copyOnWriteStateTable.size());
        Assert.assertNull(copyOnWriteStateTable.putAndGetOld(1, 2, arrayList3));
        Assert.assertEquals(arrayList3, copyOnWriteStateTable.get(1, 2));
        Assert.assertEquals(3L, copyOnWriteStateTable.size());
        Assert.assertTrue(copyOnWriteStateTable.containsKey(2, 1));
        Assert.assertFalse(copyOnWriteStateTable.containsKey(3, 1));
        Assert.assertFalse(copyOnWriteStateTable.containsKey(2, 3));
        copyOnWriteStateTable.put(2, 1, (Object) null);
        Assert.assertTrue(copyOnWriteStateTable.containsKey(2, 1));
        Assert.assertEquals(3L, copyOnWriteStateTable.size());
        Assert.assertNull(copyOnWriteStateTable.get(2, 1));
        copyOnWriteStateTable.put(2, 1, arrayList2);
        Assert.assertEquals(3L, copyOnWriteStateTable.size());
        Assert.assertEquals(arrayList2, copyOnWriteStateTable.removeAndGetOld(2, 1));
        Assert.assertFalse(copyOnWriteStateTable.containsKey(2, 1));
        Assert.assertEquals(2L, copyOnWriteStateTable.size());
        copyOnWriteStateTable.remove(1, 2);
        Assert.assertFalse(copyOnWriteStateTable.containsKey(1, 2));
        Assert.assertEquals(1L, copyOnWriteStateTable.size());
        Assert.assertNull(copyOnWriteStateTable.removeAndGetOld(4, 2));
        Assert.assertEquals(1L, copyOnWriteStateTable.size());
        StateTransformationFunction<ArrayList<Integer>, Integer> stateTransformationFunction = new StateTransformationFunction<ArrayList<Integer>, Integer>() { // from class: org.apache.flink.runtime.state.heap.CopyOnWriteStateTableTest.1
            public ArrayList<Integer> apply(ArrayList<Integer> arrayList4, Integer num) throws Exception {
                arrayList4.add(num);
                return arrayList4;
            }
        };
        copyOnWriteStateTable.transform(1, 1, 4711, stateTransformationFunction);
        Assert.assertEquals((ArrayList) stateTransformationFunction.apply(arrayList, 4711), copyOnWriteStateTable.get(1, 1));
    }

    @Test
    public void testIncrementalRehash() {
        CopyOnWriteStateTable copyOnWriteStateTable = new CopyOnWriteStateTable(new MockInternalKeyContext(IntSerializer.INSTANCE), new RegisteredKeyedBackendStateMetaInfo(StateDescriptor.Type.UNKNOWN, "test", IntSerializer.INSTANCE, new ArrayListSerializer(IntSerializer.INSTANCE)));
        int i = 0;
        int i2 = 0;
        while (!copyOnWriteStateTable.isRehashing()) {
            int i3 = i;
            i++;
            copyOnWriteStateTable.put(Integer.valueOf(i3), 0, new ArrayList());
            if (i % 8 == 0) {
                int i4 = i2;
                i2++;
                copyOnWriteStateTable.remove(Integer.valueOf(i4), 0);
            }
        }
        Assert.assertEquals(i - i2, copyOnWriteStateTable.size());
        while (copyOnWriteStateTable.isRehashing()) {
            int i5 = i;
            i++;
            copyOnWriteStateTable.put(Integer.valueOf(i5), 0, new ArrayList());
            if (i % 8 == 0) {
                int i6 = i2;
                i2++;
                copyOnWriteStateTable.remove(Integer.valueOf(i6), 0);
            }
        }
        Assert.assertEquals(i - i2, copyOnWriteStateTable.size());
        for (int i7 = 0; i7 < i; i7++) {
            if (i7 < i2) {
                Assert.assertFalse(copyOnWriteStateTable.containsKey(Integer.valueOf(i7), 0));
            } else {
                Assert.assertTrue(copyOnWriteStateTable.containsKey(Integer.valueOf(i7), 0));
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test
    public void testRandomModificationsAndCopyOnWriteIsolation() throws Exception {
        CopyOnWriteStateTable copyOnWriteStateTable = new CopyOnWriteStateTable(new MockInternalKeyContext(IntSerializer.INSTANCE), new RegisteredKeyedBackendStateMetaInfo(StateDescriptor.Type.UNKNOWN, "test", IntSerializer.INSTANCE, new ArrayListSerializer(IntSerializer.INSTANCE)));
        HashMap hashMap = new HashMap();
        Random random = new Random(42L);
        CopyOnWriteStateTable.StateTableEntry[] stateTableEntryArr = null;
        int i = 0;
        Tuple3<Integer, Integer, ArrayList<Integer>>[] tuple3Arr = null;
        int i2 = 0;
        int i3 = 0;
        int i4 = 0;
        StateTransformationFunction<ArrayList<Integer>, Integer> stateTransformationFunction = new StateTransformationFunction<ArrayList<Integer>, Integer>() { // from class: org.apache.flink.runtime.state.heap.CopyOnWriteStateTableTest.2
            public ArrayList<Integer> apply(ArrayList<Integer> arrayList, Integer num) throws Exception {
                if (arrayList == null) {
                    arrayList = new ArrayList<>();
                }
                arrayList.add(num);
                return arrayList;
            }
        };
        for (int i5 = 0; i5 < 10000000; i5++) {
            int nextInt = random.nextInt(20);
            int nextInt2 = random.nextInt(4);
            Tuple2 tuple2 = new Tuple2(Integer.valueOf(nextInt), Integer.valueOf(nextInt2));
            int nextInt3 = random.nextInt(7);
            ArrayList arrayList = null;
            ArrayList arrayList2 = null;
            switch (nextInt3) {
                case ExternalSortLargeRecordsITCase.SmallOrMediumOrLargeValue.SMALL_SIZE /* 0 */:
                case 1:
                    arrayList = (ArrayList) copyOnWriteStateTable.get(Integer.valueOf(nextInt), Integer.valueOf(nextInt2));
                    arrayList2 = (ArrayList) hashMap.get(tuple2);
                    if (null == arrayList) {
                        arrayList = new ArrayList();
                        copyOnWriteStateTable.put(Integer.valueOf(nextInt), Integer.valueOf(nextInt2), arrayList);
                        arrayList2 = new ArrayList();
                        hashMap.put(tuple2, arrayList2);
                        break;
                    }
                    break;
                case 2:
                    copyOnWriteStateTable.put(Integer.valueOf(nextInt), Integer.valueOf(nextInt2), new ArrayList());
                    hashMap.put(tuple2, new ArrayList());
                    break;
                case 3:
                    arrayList = (ArrayList) copyOnWriteStateTable.putAndGetOld(Integer.valueOf(nextInt), Integer.valueOf(nextInt2), new ArrayList());
                    arrayList2 = (ArrayList) hashMap.put(tuple2, new ArrayList());
                    break;
                case 4:
                    copyOnWriteStateTable.remove(Integer.valueOf(nextInt), Integer.valueOf(nextInt2));
                    hashMap.remove(tuple2);
                    break;
                case 5:
                    arrayList = (ArrayList) copyOnWriteStateTable.removeAndGetOld(Integer.valueOf(nextInt), Integer.valueOf(nextInt2));
                    arrayList2 = (ArrayList) hashMap.remove(tuple2);
                    break;
                case 6:
                    int nextInt4 = random.nextInt(1000);
                    copyOnWriteStateTable.transform(Integer.valueOf(nextInt), Integer.valueOf(nextInt2), Integer.valueOf(nextInt4), stateTransformationFunction);
                    hashMap.put(tuple2, stateTransformationFunction.apply(hashMap.remove(tuple2), Integer.valueOf(nextInt4)));
                    break;
                default:
                    Assert.fail("Unknown op-code " + nextInt3);
                    break;
            }
            Assert.assertEquals(hashMap.size(), copyOnWriteStateTable.size());
            if (arrayList != null) {
                if (!random.nextBoolean() || arrayList.isEmpty()) {
                    arrayList.add(Integer.valueOf(i2));
                    arrayList2.add(Integer.valueOf(i2));
                    i2++;
                } else {
                    arrayList.remove(arrayList.size() - 1);
                    arrayList2.remove(arrayList2.size() - 1);
                }
            }
            Assert.assertEquals(arrayList2, arrayList);
            if (i5 > 0 && i5 % 500 == 0) {
                if (stateTableEntryArr != null) {
                    deepCheck(tuple3Arr, convert(stateTableEntryArr, i));
                    if (i5 % 1000 == 0) {
                        i3++;
                        copyOnWriteStateTable.snapshotTableArrays();
                        copyOnWriteStateTable.releaseSnapshot(i3);
                    }
                    if (i5 % 5000 == 0) {
                        stateTableEntryArr = null;
                        tuple3Arr = null;
                        i = 0;
                        copyOnWriteStateTable.releaseSnapshot(i4);
                    }
                } else {
                    i3++;
                    i4 = i3;
                    stateTableEntryArr = copyOnWriteStateTable.snapshotTableArrays();
                    i = copyOnWriteStateTable.size();
                    tuple3Arr = manualDeepDump(hashMap);
                }
            }
        }
    }

    @Test
    public void testCopyOnWriteContracts() {
        CopyOnWriteStateTable copyOnWriteStateTable = new CopyOnWriteStateTable(new MockInternalKeyContext(IntSerializer.INSTANCE), new RegisteredKeyedBackendStateMetaInfo(StateDescriptor.Type.UNKNOWN, "test", IntSerializer.INSTANCE, new ArrayListSerializer(IntSerializer.INSTANCE)));
        ArrayList arrayList = new ArrayList(1);
        ArrayList arrayList2 = new ArrayList(1);
        ArrayList arrayList3 = new ArrayList(1);
        ArrayList arrayList4 = new ArrayList(1);
        ArrayList arrayList5 = new ArrayList(1);
        arrayList.add(1);
        arrayList2.add(2);
        arrayList3.add(3);
        arrayList4.add(4);
        arrayList5.add(5);
        copyOnWriteStateTable.put(1, 1, arrayList);
        copyOnWriteStateTable.put(2, 1, arrayList2);
        copyOnWriteStateTable.put(4, 1, arrayList4);
        copyOnWriteStateTable.put(5, 1, arrayList5);
        Assert.assertTrue(copyOnWriteStateTable.get(1, 1) == arrayList);
        CopyOnWriteStateTableSnapshot createSnapshot = copyOnWriteStateTable.createSnapshot();
        ArrayList arrayList6 = (ArrayList) copyOnWriteStateTable.get(1, 1);
        Assert.assertFalse(arrayList6 == arrayList);
        Assert.assertEquals(arrayList, arrayList6);
        copyOnWriteStateTable.put(3, 1, arrayList3);
        Assert.assertTrue(arrayList6 == copyOnWriteStateTable.get(1, 1));
        CopyOnWriteStateTableSnapshot createSnapshot2 = copyOnWriteStateTable.createSnapshot();
        Assert.assertFalse(arrayList6 == copyOnWriteStateTable.get(1, 1));
        Assert.assertEquals(arrayList6, copyOnWriteStateTable.get(1, 1));
        copyOnWriteStateTable.releaseSnapshot(createSnapshot2);
        Assert.assertTrue(arrayList3 == copyOnWriteStateTable.get(3, 1));
        Assert.assertFalse(arrayList4 == copyOnWriteStateTable.get(4, 1));
        copyOnWriteStateTable.releaseSnapshot(createSnapshot);
        Assert.assertTrue(arrayList5 == copyOnWriteStateTable.get(5, 1));
    }

    @Test
    public void testSerializerDuplicationInSnapshot() throws IOException {
        TestDuplicateSerializer testDuplicateSerializer = new TestDuplicateSerializer();
        TestDuplicateSerializer testDuplicateSerializer2 = new TestDuplicateSerializer();
        final TestDuplicateSerializer testDuplicateSerializer3 = new TestDuplicateSerializer();
        RegisteredKeyedBackendStateMetaInfo registeredKeyedBackendStateMetaInfo = new RegisteredKeyedBackendStateMetaInfo(StateDescriptor.Type.VALUE, "test", testDuplicateSerializer, testDuplicateSerializer2);
        final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
        CopyOnWriteStateTable copyOnWriteStateTable = new CopyOnWriteStateTable(new InternalKeyContext<Integer>() { // from class: org.apache.flink.runtime.state.heap.CopyOnWriteStateTableTest.3
            /* renamed from: getCurrentKey, reason: merged with bridge method [inline-methods] */
            public Integer m320getCurrentKey() {
                return 0;
            }

            public int getCurrentKeyGroupIndex() {
                return 0;
            }

            public int getNumberOfKeyGroups() {
                return 1;
            }

            public KeyGroupRange getKeyGroupRange() {
                return keyGroupRange;
            }

            public TypeSerializer<Integer> getKeySerializer() {
                return testDuplicateSerializer3;
            }
        }, registeredKeyedBackendStateMetaInfo);
        copyOnWriteStateTable.put(0, 0, 0, 0);
        copyOnWriteStateTable.put(1, 0, 0, 1);
        copyOnWriteStateTable.put(2, 0, 1, 2);
        CopyOnWriteStateTableSnapshot createSnapshot = copyOnWriteStateTable.createSnapshot();
        try {
            testDuplicateSerializer.disable();
            testDuplicateSerializer3.disable();
            testDuplicateSerializer2.disable();
            createSnapshot.writeMappingsInKeyGroup(new DataOutputViewStreamWrapper(new ByteArrayOutputStreamWithPos(1024)), 0);
            copyOnWriteStateTable.releaseSnapshot(createSnapshot);
        } catch (Throwable th) {
            copyOnWriteStateTable.releaseSnapshot(createSnapshot);
            throw th;
        }
    }

    private static <K, N, S> Tuple3<K, N, S>[] convert(CopyOnWriteStateTable.StateTableEntry<K, N, S>[] stateTableEntryArr, int i) {
        Tuple3<K, N, S>[] tuple3Arr = new Tuple3[i];
        int i2 = 0;
        for (CopyOnWriteStateTable.StateTableEntry<K, N, S> stateTableEntry : stateTableEntryArr) {
            while (true) {
                CopyOnWriteStateTable.StateTableEntry<K, N, S> stateTableEntry2 = stateTableEntry;
                if (null != stateTableEntry2) {
                    int i3 = i2;
                    i2++;
                    tuple3Arr[i3] = new Tuple3<>(stateTableEntry2.getKey(), stateTableEntry2.getNamespace(), stateTableEntry2.getState());
                    stateTableEntry = stateTableEntry2.next;
                }
            }
        }
        Assert.assertEquals(i, i2);
        return tuple3Arr;
    }

    private Tuple3<Integer, Integer, ArrayList<Integer>>[] manualDeepDump(HashMap<Tuple2<Integer, Integer>, ArrayList<Integer>> hashMap) {
        Tuple3<Integer, Integer, ArrayList<Integer>>[] tuple3Arr = new Tuple3[hashMap.size()];
        int i = 0;
        for (Map.Entry<Tuple2<Integer, Integer>, ArrayList<Integer>> entry : hashMap.entrySet()) {
            int i2 = i;
            i++;
            tuple3Arr[i2] = new Tuple3<>((Integer) entry.getKey().f0, (Integer) entry.getKey().f1, new ArrayList(entry.getValue()));
        }
        return tuple3Arr;
    }

    private void deepCheck(Tuple3<Integer, Integer, ArrayList<Integer>>[] tuple3Arr, Tuple3<Integer, Integer, ArrayList<Integer>>[] tuple3Arr2) {
        if (tuple3Arr == tuple3Arr2) {
            return;
        }
        Assert.assertEquals(tuple3Arr.length, tuple3Arr2.length);
        Comparator<Tuple3<Integer, Integer, ArrayList<Integer>>> comparator = new Comparator<Tuple3<Integer, Integer, ArrayList<Integer>>>() { // from class: org.apache.flink.runtime.state.heap.CopyOnWriteStateTableTest.4
            @Override // java.util.Comparator
            public int compare(Tuple3<Integer, Integer, ArrayList<Integer>> tuple3, Tuple3<Integer, Integer, ArrayList<Integer>> tuple32) {
                int intValue = ((Integer) tuple3.f1).intValue() - ((Integer) tuple32.f1).intValue();
                return intValue != 0 ? intValue : ((Integer) tuple3.f0).intValue() - ((Integer) tuple32.f0).intValue();
            }
        };
        Arrays.sort(tuple3Arr, comparator);
        Arrays.sort(tuple3Arr2, comparator);
        for (int i = 0; i < tuple3Arr.length; i++) {
            Tuple3<Integer, Integer, ArrayList<Integer>> tuple3 = tuple3Arr[i];
            Tuple3<Integer, Integer, ArrayList<Integer>> tuple32 = tuple3Arr2[i];
            Assert.assertEquals(tuple3.f0, tuple32.f0);
            Assert.assertEquals(tuple3.f1, tuple32.f1);
            Assert.assertEquals(tuple3.f2, tuple32.f2);
        }
    }
}
