package org.apache.flink.streaming.connectors.kafka;

import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.hamcrest.collection.IsIn;
import org.hamcrest.collection.IsMapContaining;
import org.hamcrest.core.IsNot;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.class */
public class FlinkKafkaConsumerBaseTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest$DummyFlinkKafkaConsumer.class */
    public static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
        private static final long serialVersionUID = 1;
        boolean isAutoCommitEnabled;
        private List<KafkaTopicPartition> mockFetchedPartitions;

        public DummyFlinkKafkaConsumer() {
            this(Collections.emptyList());
        }

        public DummyFlinkKafkaConsumer(List<KafkaTopicPartition> list) {
            super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema) Mockito.mock(KeyedDeserializationSchema.class));
            this.isAutoCommitEnabled = false;
            this.mockFetchedPartitions = (List) Preconditions.checkNotNull(list);
        }

        protected AbstractFetcher<T, ?> createFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> map, SerializedValue<AssignerWithPeriodicWatermarks<T>> serializedValue, SerializedValue<AssignerWithPunctuatedWatermarks<T>> serializedValue2, StreamingRuntimeContext streamingRuntimeContext, OffsetCommitMode offsetCommitMode) throws Exception {
            return (AbstractFetcher) Mockito.mock(AbstractFetcher.class);
        }

        protected List<KafkaTopicPartition> getKafkaPartitions(List<String> list) {
            return this.mockFetchedPartitions;
        }

        protected boolean getIsAutoCommitEnabled() {
            return this.isAutoCommitEnabled;
        }

        public void setIsAutoCommitEnabled(boolean z) {
            this.isAutoCommitEnabled = z;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest$TestingListState.class */
    public static final class TestingListState<T> implements ListState<T> {
        private final List<T> list;
        private boolean clearCalled;

        private TestingListState() {
            this.list = new ArrayList();
            this.clearCalled = false;
        }

        public void clear() {
            this.list.clear();
            this.clearCalled = true;
        }

        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public Iterable<T> m8get() throws Exception {
            return this.list;
        }

        public void add(T t) throws Exception {
            this.list.add(t);
        }

        public List<T> getList() {
            return this.list;
        }

        public boolean isClearCalled() {
            return this.clearCalled;
        }
    }

    @Test
    public void testEitherWatermarkExtractor() {
        try {
            new DummyFlinkKafkaConsumer().assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks) null);
            Assert.fail();
        } catch (NullPointerException e) {
        }
        try {
            new DummyFlinkKafkaConsumer().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks) null);
            Assert.fail();
        } catch (NullPointerException e2) {
        }
        AssignerWithPeriodicWatermarks assignerWithPeriodicWatermarks = (AssignerWithPeriodicWatermarks) Mockito.mock(AssignerWithPeriodicWatermarks.class);
        AssignerWithPunctuatedWatermarks assignerWithPunctuatedWatermarks = (AssignerWithPunctuatedWatermarks) Mockito.mock(AssignerWithPunctuatedWatermarks.class);
        DummyFlinkKafkaConsumer dummyFlinkKafkaConsumer = new DummyFlinkKafkaConsumer();
        dummyFlinkKafkaConsumer.assignTimestampsAndWatermarks(assignerWithPeriodicWatermarks);
        try {
            dummyFlinkKafkaConsumer.assignTimestampsAndWatermarks(assignerWithPunctuatedWatermarks);
            Assert.fail();
        } catch (IllegalStateException e3) {
        }
        DummyFlinkKafkaConsumer dummyFlinkKafkaConsumer2 = new DummyFlinkKafkaConsumer();
        dummyFlinkKafkaConsumer2.assignTimestampsAndWatermarks(assignerWithPunctuatedWatermarks);
        try {
            dummyFlinkKafkaConsumer2.assignTimestampsAndWatermarks(assignerWithPeriodicWatermarks);
            Assert.fail();
        } catch (IllegalStateException e4) {
        }
    }

    @Test
    public void ignoreCheckpointWhenNotRunning() throws Exception {
        FlinkKafkaConsumerBase consumer = getConsumer((AbstractFetcher) Mockito.mock(AbstractFetcher.class), new LinkedMap(), false);
        OperatorStateStore operatorStateStore = (OperatorStateStore) Mockito.mock(OperatorStateStore.class);
        TestingListState testingListState = new TestingListState();
        Mockito.when(operatorStateStore.getListState((ListStateDescriptor) Matchers.any(ListStateDescriptor.class))).thenReturn(testingListState);
        consumer.snapshotState(new StateSnapshotContextSynchronousImpl(1L, 1L));
        Assert.assertFalse(testingListState.m8get().iterator().hasNext());
        consumer.notifyCheckpointComplete(66L);
    }

    @Test
    public void checkRestoredCheckpointWhenFetcherNotReady() throws Exception {
        OperatorStateStore operatorStateStore = (OperatorStateStore) Mockito.mock(OperatorStateStore.class);
        TestingListState testingListState = new TestingListState();
        testingListState.add(Tuple2.of(new KafkaTopicPartition("abc", 13), 16768L));
        testingListState.add(Tuple2.of(new KafkaTopicPartition("def", 7), 987654321L));
        FlinkKafkaConsumerBase consumer = getConsumer(null, new LinkedMap(), true);
        Mockito.when(operatorStateStore.getSerializableListState((String) Matchers.any(String.class))).thenReturn(testingListState);
        StateInitializationContext stateInitializationContext = (StateInitializationContext) Mockito.mock(StateInitializationContext.class);
        Mockito.when(stateInitializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
        Mockito.when(Boolean.valueOf(stateInitializationContext.isRestored())).thenReturn(true);
        consumer.initializeState(stateInitializationContext);
        consumer.open(new Configuration());
        consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17L, 17L));
        Assert.assertTrue(testingListState.isClearCalled());
        HashSet hashSet = new HashSet();
        Iterator it = testingListState.m8get().iterator();
        while (it.hasNext()) {
            hashSet.add((Serializable) it.next());
        }
        int i = 0;
        Iterator it2 = testingListState.m8get().iterator();
        while (it2.hasNext()) {
            Assert.assertTrue(hashSet.contains((Serializable) it2.next()));
            i++;
        }
        Assert.assertEquals(hashSet.size(), i);
    }

    @Test
    public void checkRestoredNullCheckpointWhenFetcherNotReady() throws Exception {
        FlinkKafkaConsumerBase consumer = getConsumer(null, new LinkedMap(), true);
        OperatorStateStore operatorStateStore = (OperatorStateStore) Mockito.mock(OperatorStateStore.class);
        TestingListState testingListState = new TestingListState();
        Mockito.when(operatorStateStore.getSerializableListState((String) Matchers.any(String.class))).thenReturn(testingListState);
        StateInitializationContext stateInitializationContext = (StateInitializationContext) Mockito.mock(StateInitializationContext.class);
        Mockito.when(stateInitializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
        Mockito.when(Boolean.valueOf(stateInitializationContext.isRestored())).thenReturn(false);
        consumer.initializeState(stateInitializationContext);
        consumer.open(new Configuration());
        consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17L, 17L));
        Assert.assertFalse(testingListState.m8get().iterator().hasNext());
    }

    @Test
    public void checkUseFetcherWhenNoCheckpoint() throws Exception {
        FlinkKafkaConsumerBase consumer = getConsumer(null, new LinkedMap(), true);
        ArrayList arrayList = new ArrayList(1);
        arrayList.add(new KafkaTopicPartition("test", 0));
        consumer.setSubscribedPartitions(arrayList);
        StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) Mockito.mock(StreamingRuntimeContext.class);
        Mockito.when(streamingRuntimeContext.getMetricGroup()).thenReturn(Mockito.mock(MetricGroup.class));
        consumer.setRuntimeContext(streamingRuntimeContext);
        OperatorStateStore operatorStateStore = (OperatorStateStore) Mockito.mock(OperatorStateStore.class);
        Mockito.when(operatorStateStore.getSerializableListState((String) Matchers.any(String.class))).thenReturn(new TestingListState());
        StateInitializationContext stateInitializationContext = (StateInitializationContext) Mockito.mock(StateInitializationContext.class);
        Mockito.when(stateInitializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
        Mockito.when(Boolean.valueOf(stateInitializationContext.isRestored())).thenReturn(false);
        consumer.initializeState(stateInitializationContext);
        consumer.run((SourceFunction.SourceContext) Mockito.mock(SourceFunction.SourceContext.class));
    }

    @Test
    public void testConfigureOnCheckpointsCommitMode() {
        DummyFlinkKafkaConsumer dummyFlinkKafkaConsumer = new DummyFlinkKafkaConsumer();
        dummyFlinkKafkaConsumer.setIsAutoCommitEnabled(true);
        StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) Mockito.mock(StreamingRuntimeContext.class);
        Mockito.when(Integer.valueOf(streamingRuntimeContext.getIndexOfThisSubtask())).thenReturn(0);
        Mockito.when(Integer.valueOf(streamingRuntimeContext.getNumberOfParallelSubtasks())).thenReturn(1);
        Mockito.when(Boolean.valueOf(streamingRuntimeContext.isCheckpointingEnabled())).thenReturn(true);
        dummyFlinkKafkaConsumer.setRuntimeContext(streamingRuntimeContext);
        dummyFlinkKafkaConsumer.open(new Configuration());
        Assert.assertEquals(OffsetCommitMode.ON_CHECKPOINTS, dummyFlinkKafkaConsumer.getOffsetCommitMode());
    }

    @Test
    public void testConfigureAutoCommitMode() {
        DummyFlinkKafkaConsumer dummyFlinkKafkaConsumer = new DummyFlinkKafkaConsumer();
        dummyFlinkKafkaConsumer.setIsAutoCommitEnabled(true);
        StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) Mockito.mock(StreamingRuntimeContext.class);
        Mockito.when(Integer.valueOf(streamingRuntimeContext.getIndexOfThisSubtask())).thenReturn(0);
        Mockito.when(Integer.valueOf(streamingRuntimeContext.getNumberOfParallelSubtasks())).thenReturn(1);
        Mockito.when(Boolean.valueOf(streamingRuntimeContext.isCheckpointingEnabled())).thenReturn(false);
        dummyFlinkKafkaConsumer.setRuntimeContext(streamingRuntimeContext);
        dummyFlinkKafkaConsumer.open(new Configuration());
        Assert.assertEquals(OffsetCommitMode.KAFKA_PERIODIC, dummyFlinkKafkaConsumer.getOffsetCommitMode());
    }

    @Test
    public void testConfigureDisableOffsetCommitWithCheckpointing() {
        DummyFlinkKafkaConsumer dummyFlinkKafkaConsumer = new DummyFlinkKafkaConsumer();
        dummyFlinkKafkaConsumer.setIsAutoCommitEnabled(true);
        StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) Mockito.mock(StreamingRuntimeContext.class);
        Mockito.when(Integer.valueOf(streamingRuntimeContext.getIndexOfThisSubtask())).thenReturn(0);
        Mockito.when(Integer.valueOf(streamingRuntimeContext.getNumberOfParallelSubtasks())).thenReturn(1);
        Mockito.when(Boolean.valueOf(streamingRuntimeContext.isCheckpointingEnabled())).thenReturn(true);
        dummyFlinkKafkaConsumer.setRuntimeContext(streamingRuntimeContext);
        dummyFlinkKafkaConsumer.setCommitOffsetsOnCheckpoints(false);
        dummyFlinkKafkaConsumer.open(new Configuration());
        Assert.assertEquals(OffsetCommitMode.DISABLED, dummyFlinkKafkaConsumer.getOffsetCommitMode());
    }

    @Test
    public void testConfigureDisableOffsetCommitWithoutCheckpointing() {
        DummyFlinkKafkaConsumer dummyFlinkKafkaConsumer = new DummyFlinkKafkaConsumer();
        dummyFlinkKafkaConsumer.setIsAutoCommitEnabled(false);
        StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) Mockito.mock(StreamingRuntimeContext.class);
        Mockito.when(Integer.valueOf(streamingRuntimeContext.getIndexOfThisSubtask())).thenReturn(0);
        Mockito.when(Integer.valueOf(streamingRuntimeContext.getNumberOfParallelSubtasks())).thenReturn(1);
        Mockito.when(Boolean.valueOf(streamingRuntimeContext.isCheckpointingEnabled())).thenReturn(false);
        dummyFlinkKafkaConsumer.setRuntimeContext(streamingRuntimeContext);
        dummyFlinkKafkaConsumer.open(new Configuration());
        Assert.assertEquals(OffsetCommitMode.DISABLED, dummyFlinkKafkaConsumer.getOffsetCommitMode());
    }

    @Test
    public void testSnapshotStateWithCommitOnCheckpointsEnabled() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(new KafkaTopicPartition("abc", 13), 16768L);
        hashMap.put(new KafkaTopicPartition("def", 7), 987654321L);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(new KafkaTopicPartition("abc", 13), 16770L);
        hashMap2.put(new KafkaTopicPartition("def", 7), 987654329L);
        HashMap hashMap3 = new HashMap();
        hashMap3.put(new KafkaTopicPartition("abc", 13), 16780L);
        hashMap3.put(new KafkaTopicPartition("def", 7), 987654377L);
        AbstractFetcher abstractFetcher = (AbstractFetcher) Mockito.mock(AbstractFetcher.class);
        Mockito.when(abstractFetcher.snapshotCurrentState()).thenReturn(hashMap, new HashMap[]{hashMap2, hashMap3});
        LinkedMap linkedMap = new LinkedMap();
        FlinkKafkaConsumerBase consumer = getConsumer(abstractFetcher, linkedMap, true);
        StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) Mockito.mock(StreamingRuntimeContext.class);
        Mockito.when(Boolean.valueOf(streamingRuntimeContext.isCheckpointingEnabled())).thenReturn(true);
        consumer.setRuntimeContext(streamingRuntimeContext);
        Assert.assertEquals(0L, linkedMap.size());
        OperatorStateStore operatorStateStore = (OperatorStateStore) Mockito.mock(OperatorStateStore.class);
        TestingListState testingListState = new TestingListState();
        Mockito.when(operatorStateStore.getSerializableListState((String) Matchers.any(String.class))).thenReturn(testingListState);
        StateInitializationContext stateInitializationContext = (StateInitializationContext) Mockito.mock(StateInitializationContext.class);
        Mockito.when(stateInitializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
        Mockito.when(Boolean.valueOf(stateInitializationContext.isRestored())).thenReturn(false, new Boolean[]{true, true, true});
        consumer.initializeState(stateInitializationContext);
        consumer.open(new Configuration());
        consumer.snapshotState(new StateSnapshotContextSynchronousImpl(138L, 138L));
        HashMap hashMap4 = new HashMap();
        for (Tuple2 tuple2 : testingListState.m8get()) {
            hashMap4.put(tuple2.f0, tuple2.f1);
        }
        Assert.assertEquals(hashMap, hashMap4);
        Assert.assertEquals(1L, linkedMap.size());
        Assert.assertEquals(hashMap, linkedMap.get(138L));
        consumer.snapshotState(new StateSnapshotContextSynchronousImpl(140L, 140L));
        HashMap hashMap5 = new HashMap();
        for (Tuple2 tuple22 : testingListState.m8get()) {
            hashMap5.put(tuple22.f0, tuple22.f1);
        }
        Assert.assertEquals(hashMap2, hashMap5);
        Assert.assertEquals(2L, linkedMap.size());
        Assert.assertEquals(hashMap2, linkedMap.get(140L));
        consumer.notifyCheckpointComplete(138L);
        Assert.assertEquals(1L, linkedMap.size());
        Assert.assertTrue(linkedMap.containsKey(140L));
        consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141L, 141L));
        HashMap hashMap6 = new HashMap();
        for (Tuple2 tuple23 : testingListState.m8get()) {
            hashMap6.put(tuple23.f0, tuple23.f1);
        }
        Assert.assertEquals(hashMap3, hashMap6);
        Assert.assertEquals(2L, linkedMap.size());
        Assert.assertEquals(hashMap3, linkedMap.get(141L));
        consumer.notifyCheckpointComplete(141L);
        Assert.assertEquals(0L, linkedMap.size());
        consumer.notifyCheckpointComplete(666L);
        Assert.assertEquals(0L, linkedMap.size());
        OperatorStateStore operatorStateStore2 = (OperatorStateStore) Mockito.mock(OperatorStateStore.class);
        TestingListState testingListState2 = new TestingListState();
        Mockito.when(operatorStateStore2.getListState((ListStateDescriptor) Matchers.any(ListStateDescriptor.class))).thenReturn(testingListState2);
        for (int i = 100; i < 600; i++) {
            consumer.snapshotState(new StateSnapshotContextSynchronousImpl(i, i));
            testingListState2.clear();
        }
        Assert.assertEquals(100L, linkedMap.size());
        consumer.notifyCheckpointComplete(598L);
        Assert.assertEquals(1L, linkedMap.size());
        consumer.notifyCheckpointComplete(590L);
        consumer.notifyCheckpointComplete(599L);
        Assert.assertEquals(0L, linkedMap.size());
    }

    @Test
    public void testSnapshotStateWithCommitOnCheckpointsDisabled() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(new KafkaTopicPartition("abc", 13), 16768L);
        hashMap.put(new KafkaTopicPartition("def", 7), 987654321L);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(new KafkaTopicPartition("abc", 13), 16770L);
        hashMap2.put(new KafkaTopicPartition("def", 7), 987654329L);
        HashMap hashMap3 = new HashMap();
        hashMap3.put(new KafkaTopicPartition("abc", 13), 16780L);
        hashMap3.put(new KafkaTopicPartition("def", 7), 987654377L);
        AbstractFetcher abstractFetcher = (AbstractFetcher) Mockito.mock(AbstractFetcher.class);
        Mockito.when(abstractFetcher.snapshotCurrentState()).thenReturn(hashMap, new HashMap[]{hashMap2, hashMap3});
        FlinkKafkaConsumerBase consumer = getConsumer(abstractFetcher, new LinkedMap(), true);
        StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) Mockito.mock(StreamingRuntimeContext.class);
        Mockito.when(Boolean.valueOf(streamingRuntimeContext.isCheckpointingEnabled())).thenReturn(true);
        consumer.setRuntimeContext(streamingRuntimeContext);
        consumer.setCommitOffsetsOnCheckpoints(false);
        Assert.assertEquals(0L, r0.size());
        OperatorStateStore operatorStateStore = (OperatorStateStore) Mockito.mock(OperatorStateStore.class);
        TestingListState testingListState = new TestingListState();
        Mockito.when(operatorStateStore.getSerializableListState((String) Matchers.any(String.class))).thenReturn(testingListState);
        StateInitializationContext stateInitializationContext = (StateInitializationContext) Mockito.mock(StateInitializationContext.class);
        Mockito.when(stateInitializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
        Mockito.when(Boolean.valueOf(stateInitializationContext.isRestored())).thenReturn(false, new Boolean[]{true, true, true});
        consumer.initializeState(stateInitializationContext);
        consumer.open(new Configuration());
        consumer.snapshotState(new StateSnapshotContextSynchronousImpl(138L, 138L));
        HashMap hashMap4 = new HashMap();
        for (Tuple2 tuple2 : testingListState.m8get()) {
            hashMap4.put(tuple2.f0, tuple2.f1);
        }
        Assert.assertEquals(hashMap, hashMap4);
        Assert.assertEquals(0L, r0.size());
        consumer.snapshotState(new StateSnapshotContextSynchronousImpl(140L, 140L));
        HashMap hashMap5 = new HashMap();
        for (Tuple2 tuple22 : testingListState.m8get()) {
            hashMap5.put(tuple22.f0, tuple22.f1);
        }
        Assert.assertEquals(hashMap2, hashMap5);
        Assert.assertEquals(0L, r0.size());
        consumer.notifyCheckpointComplete(138L);
        ((AbstractFetcher) Mockito.verify(abstractFetcher, Mockito.never())).commitInternalOffsetsToKafka(Matchers.anyMap(), (KafkaCommitCallback) Matchers.any(KafkaCommitCallback.class));
        consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141L, 141L));
        HashMap hashMap6 = new HashMap();
        for (Tuple2 tuple23 : testingListState.m8get()) {
            hashMap6.put(tuple23.f0, tuple23.f1);
        }
        Assert.assertEquals(hashMap3, hashMap6);
        Assert.assertEquals(0L, r0.size());
        consumer.notifyCheckpointComplete(141L);
        ((AbstractFetcher) Mockito.verify(abstractFetcher, Mockito.never())).commitInternalOffsetsToKafka(Matchers.anyMap(), (KafkaCommitCallback) Matchers.any(KafkaCommitCallback.class));
        consumer.notifyCheckpointComplete(666L);
        ((AbstractFetcher) Mockito.verify(abstractFetcher, Mockito.never())).commitInternalOffsetsToKafka(Matchers.anyMap(), (KafkaCommitCallback) Matchers.any(KafkaCommitCallback.class));
        OperatorStateStore operatorStateStore2 = (OperatorStateStore) Mockito.mock(OperatorStateStore.class);
        TestingListState testingListState2 = new TestingListState();
        Mockito.when(operatorStateStore2.getListState((ListStateDescriptor) Matchers.any(ListStateDescriptor.class))).thenReturn(testingListState2);
        for (int i = 100; i < 600; i++) {
            consumer.snapshotState(new StateSnapshotContextSynchronousImpl(i, i));
            testingListState2.clear();
        }
        Assert.assertEquals(0L, r0.size());
        consumer.notifyCheckpointComplete(598L);
        ((AbstractFetcher) Mockito.verify(abstractFetcher, Mockito.never())).commitInternalOffsetsToKafka(Matchers.anyMap(), (KafkaCommitCallback) Matchers.any(KafkaCommitCallback.class));
        consumer.notifyCheckpointComplete(590L);
        ((AbstractFetcher) Mockito.verify(abstractFetcher, Mockito.never())).commitInternalOffsetsToKafka(Matchers.anyMap(), (KafkaCommitCallback) Matchers.any(KafkaCommitCallback.class));
        consumer.notifyCheckpointComplete(599L);
        ((AbstractFetcher) Mockito.verify(abstractFetcher, Mockito.never())).commitInternalOffsetsToKafka(Matchers.anyMap(), (KafkaCommitCallback) Matchers.any(KafkaCommitCallback.class));
    }

    @Test
    public void testScaleUp() throws Exception {
        testRescaling(5, 2, 15, 1000);
    }

    @Test
    public void testScaleDown() throws Exception {
        testRescaling(5, 10, 2, 100);
    }

    private void testRescaling(int i, int i2, int i3, int i4) throws Exception {
        Preconditions.checkArgument(i4 >= i2, "invalid test case for Kafka repartitioning; Kafka only allows increasing partitions.");
        ArrayList arrayList = new ArrayList();
        for (int i5 = 0; i5 < i2; i5++) {
            arrayList.add(new KafkaTopicPartition("test-topic", i5));
        }
        SourceFunction[] sourceFunctionArr = new DummyFlinkKafkaConsumer[i];
        AbstractStreamOperatorTestHarness[] abstractStreamOperatorTestHarnessArr = new AbstractStreamOperatorTestHarness[i];
        for (int i6 = 0; i6 < i; i6++) {
            sourceFunctionArr[i6] = new DummyFlinkKafkaConsumer(arrayList);
            abstractStreamOperatorTestHarnessArr[i6] = createTestHarness(sourceFunctionArr[i6], i, i6);
            abstractStreamOperatorTestHarnessArr[i6].initializeState((OperatorStateHandles) null);
            abstractStreamOperatorTestHarnessArr[i6].open();
        }
        HashMap hashMap = new HashMap();
        for (int i7 = 0; i7 < i; i7++) {
            Map subscribedPartitionsToStartOffsets = sourceFunctionArr[i7].getSubscribedPartitionsToStartOffsets();
            Iterator it = subscribedPartitionsToStartOffsets.keySet().iterator();
            while (it.hasNext()) {
                Assert.assertThat(hashMap, IsNot.not(IsMapContaining.hasKey((KafkaTopicPartition) it.next())));
            }
            hashMap.putAll(subscribedPartitionsToStartOffsets);
        }
        Assert.assertThat(hashMap.values(), org.hamcrest.Matchers.hasSize(i2));
        Assert.assertThat(arrayList, org.hamcrest.Matchers.everyItem(IsIn.isIn(hashMap.keySet())));
        OperatorStateHandles[] operatorStateHandlesArr = new OperatorStateHandles[i];
        for (int i8 = 0; i8 < i; i8++) {
            operatorStateHandlesArr[i8] = abstractStreamOperatorTestHarnessArr[i8].snapshot(0L, 0L);
        }
        OperatorStateHandles repackageState = AbstractStreamOperatorTestHarness.repackageState(operatorStateHandlesArr);
        ArrayList arrayList2 = new ArrayList();
        for (int i9 = 0; i9 < i4; i9++) {
            arrayList2.add(new KafkaTopicPartition("test-topic", i9));
        }
        SourceFunction[] sourceFunctionArr2 = new DummyFlinkKafkaConsumer[i3];
        AbstractStreamOperatorTestHarness[] abstractStreamOperatorTestHarnessArr2 = new AbstractStreamOperatorTestHarness[i3];
        for (int i10 = 0; i10 < i3; i10++) {
            sourceFunctionArr2[i10] = new DummyFlinkKafkaConsumer(arrayList2);
            abstractStreamOperatorTestHarnessArr2[i10] = createTestHarness(sourceFunctionArr2[i10], i3, i10);
            abstractStreamOperatorTestHarnessArr2[i10].initializeState(repackageState);
            abstractStreamOperatorTestHarnessArr2[i10].open();
        }
        HashMap hashMap2 = new HashMap();
        for (int i11 = 0; i11 < i3; i11++) {
            Map subscribedPartitionsToStartOffsets2 = sourceFunctionArr2[i11].getSubscribedPartitionsToStartOffsets();
            Iterator it2 = subscribedPartitionsToStartOffsets2.keySet().iterator();
            while (it2.hasNext()) {
                Assert.assertThat(hashMap2, IsNot.not(IsMapContaining.hasKey((KafkaTopicPartition) it2.next())));
            }
            hashMap2.putAll(subscribedPartitionsToStartOffsets2);
        }
        Assert.assertThat(hashMap2.values(), org.hamcrest.Matchers.hasSize(i2));
        Assert.assertThat(arrayList, org.hamcrest.Matchers.everyItem(IsIn.isIn(hashMap2.keySet())));
    }

    @Test
    public void testRestoredStateInsensitiveToMissingPartitions() throws Exception {
        List<KafkaTopicPartition> asList = Arrays.asList(new KafkaTopicPartition("test-topic", 0), new KafkaTopicPartition("test-topic", 1), new KafkaTopicPartition("test-topic", 2));
        testRestoredStateInsensitiveToFetchedPartitions(asList, asList.subList(0, 2));
    }

    @Test
    public void testRestoredStateInsensitiveToNewPartitions() throws Exception {
        testRestoredStateInsensitiveToFetchedPartitions(Arrays.asList(new KafkaTopicPartition("test-topic", 0), new KafkaTopicPartition("test-topic", 1), new KafkaTopicPartition("test-topic", 2)), Arrays.asList(new KafkaTopicPartition("test-topic", 0), new KafkaTopicPartition("test-topic", 1), new KafkaTopicPartition("test-topic", 2), new KafkaTopicPartition("test-topic", 3), new KafkaTopicPartition("test-topic", 4)));
    }

    @Test
    public void testRestoredStateInsensitiveToDifferentPartitionOrdering() throws Exception {
        testRestoredStateInsensitiveToFetchedPartitions(Arrays.asList(new KafkaTopicPartition("test-topic", 0), new KafkaTopicPartition("test-topic", 1), new KafkaTopicPartition("test-topic", 2)), Arrays.asList(new KafkaTopicPartition("test-topic", 0), new KafkaTopicPartition("test-topic", 2), new KafkaTopicPartition("test-topic", 1)));
    }

    private void testRestoredStateInsensitiveToFetchedPartitions(List<KafkaTopicPartition> list, List<KafkaTopicPartition> list2) throws Exception {
        RuntimeContext runtimeContext = (StreamingRuntimeContext) Mockito.mock(StreamingRuntimeContext.class);
        Mockito.when(Boolean.valueOf(runtimeContext.isCheckpointingEnabled())).thenReturn(true);
        Mockito.when(Integer.valueOf(runtimeContext.getNumberOfParallelSubtasks())).thenReturn(1);
        Mockito.when(Integer.valueOf(runtimeContext.getIndexOfThisSubtask())).thenReturn(0);
        DummyFlinkKafkaConsumer dummyFlinkKafkaConsumer = new DummyFlinkKafkaConsumer(list);
        dummyFlinkKafkaConsumer.setRuntimeContext(runtimeContext);
        TestingListState testingListState = new TestingListState();
        OperatorStateStore operatorStateStore = (OperatorStateStore) Mockito.mock(OperatorStateStore.class);
        Mockito.when(operatorStateStore.getSerializableListState((String) Matchers.any(String.class))).thenReturn(testingListState);
        StateInitializationContext stateInitializationContext = (StateInitializationContext) Mockito.mock(StateInitializationContext.class);
        Mockito.when(stateInitializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
        Mockito.when(Boolean.valueOf(stateInitializationContext.isRestored())).thenReturn(false);
        dummyFlinkKafkaConsumer.initializeState(stateInitializationContext);
        dummyFlinkKafkaConsumer.open(new Configuration());
        dummyFlinkKafkaConsumer.snapshotState(new StateSnapshotContextSynchronousImpl(141L, 141L));
        HashMap hashMap = new HashMap();
        for (Tuple2 tuple2 : testingListState.m8get()) {
            hashMap.put(tuple2.f0, tuple2.f1);
        }
        DummyFlinkKafkaConsumer dummyFlinkKafkaConsumer2 = new DummyFlinkKafkaConsumer(list2);
        dummyFlinkKafkaConsumer2.setRuntimeContext(runtimeContext);
        StateInitializationContext stateInitializationContext2 = (StateInitializationContext) Mockito.mock(StateInitializationContext.class);
        Mockito.when(stateInitializationContext2.getOperatorStateStore()).thenReturn(operatorStateStore);
        Mockito.when(Boolean.valueOf(stateInitializationContext2.isRestored())).thenReturn(true);
        dummyFlinkKafkaConsumer2.initializeState(stateInitializationContext2);
        dummyFlinkKafkaConsumer2.open(new Configuration());
        dummyFlinkKafkaConsumer2.snapshotState(new StateSnapshotContextSynchronousImpl(141L, 141L));
        HashMap hashMap2 = new HashMap();
        for (Tuple2 tuple22 : testingListState.m8get()) {
            hashMap2.put(tuple22.f0, tuple22.f1);
        }
        Assert.assertEquals(hashMap, hashMap2);
    }

    private static <T> FlinkKafkaConsumerBase<T> getConsumer(AbstractFetcher<T, ?> abstractFetcher, LinkedMap linkedMap, boolean z) throws Exception {
        DummyFlinkKafkaConsumer dummyFlinkKafkaConsumer = new DummyFlinkKafkaConsumer();
        StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) Mockito.mock(StreamingRuntimeContext.class);
        Mockito.when(Boolean.valueOf(streamingRuntimeContext.isCheckpointingEnabled())).thenReturn(true);
        dummyFlinkKafkaConsumer.setRuntimeContext(streamingRuntimeContext);
        Field declaredField = FlinkKafkaConsumerBase.class.getDeclaredField("kafkaFetcher");
        declaredField.setAccessible(true);
        declaredField.set(dummyFlinkKafkaConsumer, abstractFetcher);
        Field declaredField2 = FlinkKafkaConsumerBase.class.getDeclaredField("pendingOffsetsToCommit");
        declaredField2.setAccessible(true);
        declaredField2.set(dummyFlinkKafkaConsumer, linkedMap);
        Field declaredField3 = FlinkKafkaConsumerBase.class.getDeclaredField("running");
        declaredField3.setAccessible(true);
        declaredField3.set(dummyFlinkKafkaConsumer, Boolean.valueOf(z));
        return dummyFlinkKafkaConsumer;
    }

    private static <T> AbstractStreamOperatorTestHarness<T> createTestHarness(SourceFunction<T> sourceFunction, int i, int i2) throws Exception {
        AbstractStreamOperatorTestHarness<T> abstractStreamOperatorTestHarness = new AbstractStreamOperatorTestHarness<>(new StreamSource(sourceFunction), 16383, i, i2);
        abstractStreamOperatorTestHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
        return abstractStreamOperatorTestHarness;
    }
}
