package org.apache.flink.streaming.connectors.kafka;

import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.connectors.kafka.testutils.TestPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.testutils.TestSourceContext;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsIn;
import org.hamcrest.collection.IsMapContaining;
import org.hamcrest.core.IsNot;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.class */
public class FlinkKafkaConsumerBaseTest extends TestLogger {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest$DummyFlinkKafkaConsumer.class */
    /**
     * Test double for {@link FlinkKafkaConsumerBase} whose collaborators are injected.
     *
     * <p>The fetcher (or a supplier of it), the partition discoverer and the auto-commit flag are
     * handed in through the constructors and returned verbatim from the corresponding factory
     * hooks. The super constructor always receives the single topic {@code "dummy-topic"}, a
     * {@code null} pattern and a mocked {@link KeyedDeserializationSchema}.
     *
     * @param <T> record type produced by the consumer
     */
    public static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
        private static final long serialVersionUID = 1;

        /** Source of the fetcher returned by {@code createFetcher}; may throw to simulate a failure. */
        private SupplierWithException<AbstractFetcher<T, ?>, Exception> testFetcherSupplier;

        /** Discoverer returned by {@code createPartitionDiscoverer}. */
        private AbstractPartitionDiscoverer testPartitionDiscoverer;

        /** Value reported by {@code getIsAutoCommitEnabled}. */
        private boolean isAutoCommitEnabled;

        /** Mocked fetcher and discoverer, auto-commit disabled. */
        DummyFlinkKafkaConsumer() {
            this(false);
        }

        /** Mocked fetcher and discoverer with the given auto-commit flag. */
        @SuppressWarnings("unchecked") // a mock of the raw AbstractFetcher type is passed on
        DummyFlinkKafkaConsumer(boolean autoCommitEnabled) {
            this(Mockito.mock(AbstractFetcher.class), Mockito.mock(AbstractPartitionDiscoverer.class), autoCommitEnabled);
        }

        /** Mocked fetcher, the given discoverer, auto-commit disabled. */
        @SuppressWarnings("unchecked") // a mock of the raw AbstractFetcher type is passed on
        DummyFlinkKafkaConsumer(AbstractPartitionDiscoverer partitionDiscoverer) {
            this(Mockito.mock(AbstractFetcher.class), partitionDiscoverer, false);
        }

        /** Fetcher supplier and discoverer given, auto-commit disabled. */
        DummyFlinkKafkaConsumer(
                SupplierWithException<AbstractFetcher<T, ?>, Exception> fetcherSupplier,
                AbstractPartitionDiscoverer partitionDiscoverer,
                long discoveryIntervalMillis) {
            this(fetcherSupplier, partitionDiscoverer, false, discoveryIntervalMillis);
        }

        /** Fixed fetcher; {@code Long.MIN_VALUE} is forwarded as the {@code long} super argument. */
        DummyFlinkKafkaConsumer(
                AbstractFetcher<T, ?> fetcher,
                AbstractPartitionDiscoverer partitionDiscoverer,
                boolean autoCommitEnabled) {
            this(fetcher, partitionDiscoverer, autoCommitEnabled, Long.MIN_VALUE);
        }

        /** Fixed fetcher, wrapped in a supplier that always returns that same instance. */
        DummyFlinkKafkaConsumer(
                AbstractFetcher<T, ?> fetcher,
                AbstractPartitionDiscoverer partitionDiscoverer,
                boolean autoCommitEnabled,
                long discoveryIntervalMillis) {
            this(() -> fetcher, partitionDiscoverer, autoCommitEnabled, discoveryIntervalMillis);
        }

        /**
         * Primary constructor that every other constructor funnels into.
         *
         * @param fetcherSupplier invoked on each {@code createFetcher} call
         * @param partitionDiscoverer returned from {@code createPartitionDiscoverer}
         * @param autoCommitEnabled returned from {@code getIsAutoCommitEnabled}
         * @param discoveryIntervalMillis forwarded unchanged to the super constructor (presumably
         *     the partition discovery interval - confirm against {@link FlinkKafkaConsumerBase})
         */
        DummyFlinkKafkaConsumer(
                SupplierWithException<AbstractFetcher<T, ?>, Exception> fetcherSupplier,
                AbstractPartitionDiscoverer partitionDiscoverer,
                boolean autoCommitEnabled,
                long discoveryIntervalMillis) {
            super(
                    Collections.singletonList("dummy-topic"),
                    (Pattern) null,
                    (KeyedDeserializationSchema) Mockito.mock(KeyedDeserializationSchema.class),
                    discoveryIntervalMillis,
                    false);
            testFetcherSupplier = fetcherSupplier;
            testPartitionDiscoverer = partitionDiscoverer;
            isAutoCommitEnabled = autoCommitEnabled;
        }

        /** Ignores every argument and asks the injected supplier for the fetcher. */
        protected AbstractFetcher<T, ?> createFetcher(
                SourceFunction.SourceContext<T> sourceContext,
                Map<KafkaTopicPartition, Long> partitionsWithOffsets,
                SerializedValue<AssignerWithPeriodicWatermarks<T>> periodicAssigner,
                SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedAssigner,
                StreamingRuntimeContext runtimeContext,
                OffsetCommitMode offsetCommitMode,
                MetricGroup metricGroup,
                boolean useMetrics) throws Exception {
            return testFetcherSupplier.get();
        }

        /** Ignores every argument and returns the injected discoverer. */
        protected AbstractPartitionDiscoverer createPartitionDiscoverer(
                KafkaTopicsDescriptor topicsDescriptor, int indexOfThisSubtask, int numParallelSubtasks) {
            return testPartitionDiscoverer;
        }

        protected boolean getIsAutoCommitEnabled() {
            return isAutoCommitEnabled;
        }

        /** Not supported by this test double. */
        protected Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(
                Collection<KafkaTopicPartition> partitions, long timestamp) {
            throw new UnsupportedOperationException();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest$DummyPartitionDiscoverer.class */
    /**
     * Partition discoverer stub that always reports the single topic {@code "foo"} with
     * partition 0 and remembers whether it has been woken up or closed. Once either flag is set,
     * the topic and partition lookups throw {@link AbstractPartitionDiscoverer.WakeupException}.
     */
    private static class DummyPartitionDiscoverer extends AbstractPartitionDiscoverer {
        private final List<String> allTopics = Collections.singletonList("foo");
        private final List<KafkaTopicPartition> allPartitions =
                Collections.singletonList(new KafkaTopicPartition("foo", 0));

        // volatile: the flags are written and read without any other synchronization
        private volatile boolean closed = false;
        private volatile boolean wakedUp = false;

        private DummyPartitionDiscoverer() {
            super(new KafkaTopicsDescriptor(Collections.singletonList("foo"), (Pattern) null), 0, 1);
        }

        /** Nothing to connect to. */
        protected void initializeConnections() {
        }

        /** Records the wake-up. */
        protected void wakeupConnections() {
            wakedUp = true;
        }

        /** Records the close. */
        protected void closeConnections() {
            closed = true;
        }

        protected List<String> getAllTopics() throws AbstractPartitionDiscoverer.WakeupException {
            checkState();
            return allTopics;
        }

        protected List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics)
                throws AbstractPartitionDiscoverer.WakeupException {
            checkState();
            return allPartitions;
        }

        /** Throws a {@code WakeupException} if this discoverer was woken up or closed. */
        private void checkState() throws AbstractPartitionDiscoverer.WakeupException {
            final boolean interrupted = wakedUp || closed;
            if (interrupted) {
                throw new AbstractPartitionDiscoverer.WakeupException();
            }
        }

        boolean isClosed() {
            return closed;
        }

        public boolean isWakedUp() {
            return wakedUp;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest$FailingPartitionDiscoverer.class */
    /**
     * Partition discoverer whose {@link #discoverPartitions()} always throws the exception given
     * at construction time. It also tracks whether its connections have been closed.
     */
    private static class FailingPartitionDiscoverer extends AbstractPartitionDiscoverer {
        private volatile boolean closed = false;
        private final RuntimeException failureCause;

        /**
         * @param cause exception thrown from every {@link #discoverPartitions()} call
         */
        public FailingPartitionDiscoverer(RuntimeException cause) {
            super(new KafkaTopicsDescriptor(Arrays.asList("foo"), (Pattern) null), 0, 1);
            failureCause = cause;
        }

        /** Resets the closed flag. */
        protected void initializeConnections() throws Exception {
            closed = false;
        }

        /** No-op. */
        protected void wakeupConnections() {
        }

        /** Marks this discoverer as closed. */
        protected void closeConnections() throws Exception {
            closed = true;
        }

        /** Returns {@code null}; this class overrides {@link #discoverPartitions()} to throw. */
        protected List<String> getAllTopics() throws AbstractPartitionDiscoverer.WakeupException {
            return null;
        }

        /** Returns {@code null}; this class overrides {@link #discoverPartitions()} to throw. */
        protected List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics)
                throws AbstractPartitionDiscoverer.WakeupException {
            return null;
        }

        /** Always fails with the configured exception. */
        public List<KafkaTopicPartition> discoverPartitions()
                throws AbstractPartitionDiscoverer.WakeupException, AbstractPartitionDiscoverer.ClosedException {
            throw failureCause;
        }

        public boolean isClosed() {
            return closed;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest$MockFetcher.class */
    /**
     * Fetcher stub driven entirely by the test.
     *
     * <p>{@link #runFetchLoop()} signals that it has started and then blocks until
     * {@link #cancel()} is called. {@link #snapshotCurrentState()} hands out the pre-canned
     * snapshots in the order they were given to the constructor. Offset commits are recorded
     * (last committed map plus a counter) and immediately reported as successful.
     */
    private static class MockFetcher<T> extends AbstractFetcher<T, Object> {
        /** Triggered as soon as the fetch loop has been entered. */
        private final OneShotLatch runLatch = new OneShotLatch();

        /** Triggered by {@link #cancel()} to release the fetch loop. */
        private final OneShotLatch stopLatch = new OneShotLatch();

        /** Snapshots still to be returned, head first. */
        private final ArrayDeque<HashMap<KafkaTopicPartition, Long>> stateSnapshotsToReturn = new ArrayDeque<>();

        private Map<KafkaTopicPartition, Long> lastCommittedOffsets;
        private int commitCount = 0;

        /**
         * @param stateSnapshots snapshots returned, one per call, by {@link #snapshotCurrentState()}
         */
        @SafeVarargs
        private MockFetcher(HashMap<KafkaTopicPartition, Long>... stateSnapshots) throws Exception {
            super(
                    new TestSourceContext(),
                    new HashMap(),
                    (SerializedValue) null,
                    (SerializedValue) null,
                    new TestProcessingTimeService(),
                    0L,
                    MockFetcher.class.getClassLoader(),
                    new UnregisteredMetricsGroup(),
                    false);
            for (HashMap<KafkaTopicPartition, Long> snapshot : stateSnapshots) {
                stateSnapshotsToReturn.add(snapshot);
            }
        }

        /** Remembers the offsets, bumps the commit counter and reports success to the callback. */
        protected void doCommitInternalOffsetsToKafka(
                Map<KafkaTopicPartition, Long> offsets, @Nonnull KafkaCommitCallback commitCallback) throws Exception {
            lastCommittedOffsets = offsets;
            commitCount++;
            commitCallback.onSuccess();
        }

        /** Announces the start of the loop, then waits for {@link #cancel()}. */
        public void runFetchLoop() throws Exception {
            runLatch.trigger();
            stopLatch.await();
        }

        /**
         * Returns the next pre-canned snapshot.
         *
         * @throws IllegalStateException presumably, via {@code Preconditions.checkState}, when no
         *     snapshot is left
         */
        public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
            final boolean snapshotAvailable = !stateSnapshotsToReturn.isEmpty();
            Preconditions.checkState(snapshotAvailable);
            return stateSnapshotsToReturn.poll();
        }

        /** Not supported by this stub. */
        protected Object createKafkaPartitionHandle(KafkaTopicPartition partition) {
            throw new UnsupportedOperationException();
        }

        /** Releases a blocked {@link #runFetchLoop()}. */
        public void cancel() {
            stopLatch.trigger();
        }

        /** Blocks until {@link #runFetchLoop()} has been entered. */
        public void waitUntilRun() throws InterruptedException {
            runLatch.await();
        }

        /** Returns the offsets of the most recent commit (or {@code null}) and forgets them. */
        public Map<KafkaTopicPartition, Long> getAndClearLastCommittedOffsets() {
            final Map<KafkaTopicPartition, Long> committed = lastCommittedOffsets;
            lastCommittedOffsets = null;
            return committed;
        }

        /** Number of commits recorded so far. */
        public int getCommitCount() {
            return commitCount;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest$MockFunctionInitializationContext.class */
    /**
     * {@link FunctionInitializationContext} that returns a fixed restored flag and operator state
     * store. Keyed state is not supported.
     */
    public static class MockFunctionInitializationContext implements FunctionInitializationContext {
        private final boolean isRestored;
        private final OperatorStateStore operatorStateStore;

        /**
         * @param restored value returned by {@link #isRestored()}
         * @param stateStore store returned by {@link #getOperatorStateStore()}
         */
        private MockFunctionInitializationContext(boolean restored, OperatorStateStore stateStore) {
            isRestored = restored;
            operatorStateStore = stateStore;
        }

        public boolean isRestored() {
            return isRestored;
        }

        public OperatorStateStore getOperatorStateStore() {
            return operatorStateStore;
        }

        /** Not supported by this mock. */
        public KeyedStateStore getKeyedStateStore() {
            throw new UnsupportedOperationException();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest$MockOperatorStateStore.class */
    /**
     * {@link OperatorStateStore} that serves one pre-built list state for every union-list-state
     * request and a fresh {@link TestingListState} for serializable list state. All other
     * operations are unsupported.
     */
    public static class MockOperatorStateStore implements OperatorStateStore {
        private final ListState<?> mockRestoredUnionListState;

        /**
         * @param restoredUnionListState state returned from every {@link #getUnionListState} call
         */
        private MockOperatorStateStore(ListState<?> restoredUnionListState) {
            mockRestoredUnionListState = restoredUnionListState;
        }

        /** Returns the injected state regardless of the descriptor. */
        @SuppressWarnings("unchecked") // the element type is whatever the test injected
        public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
            return (ListState<S>) mockRestoredUnionListState;
        }

        /** Returns a new, empty list state on every call. */
        public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception {
            return new TestingListState<>();
        }

        // ----- everything below is unsupported by this mock -----

        public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception {
            throw new UnsupportedOperationException();
        }

        public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception {
            throw new UnsupportedOperationException();
        }

        public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
            throw new UnsupportedOperationException();
        }

        public Set<String> getRegisteredStateNames() {
            throw new UnsupportedOperationException();
        }

        public Set<String> getRegisteredBroadcastStateNames() {
            throw new UnsupportedOperationException();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest$TestingFetcher.class */
    /**
     * Fetcher whose fetch loop sleeps in 10 ms steps until {@link #cancel()} is called. Offset
     * commits are ignored and partition handles are always {@code null}.
     */
    private static class TestingFetcher<T, KPH> extends AbstractFetcher<T, KPH> {
        /** Loop condition of {@link #runFetchLoop()}; cleared by {@link #cancel()}. */
        private volatile boolean isRunning = true;

        /** Forwards every argument unchanged to the {@link AbstractFetcher} constructor. */
        protected TestingFetcher(
                SourceFunction.SourceContext<T> sourceContext,
                Map<KafkaTopicPartition, Long> partitionsWithOffsets,
                SerializedValue<AssignerWithPeriodicWatermarks<T>> periodicAssigner,
                SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedAssigner,
                ProcessingTimeService processingTimeService,
                long autoWatermarkInterval,
                ClassLoader userCodeClassLoader,
                MetricGroup metricGroup,
                boolean useMetrics) throws Exception {
            super(
                    sourceContext,
                    partitionsWithOffsets,
                    periodicAssigner,
                    punctuatedAssigner,
                    processingTimeService,
                    autoWatermarkInterval,
                    userCodeClassLoader,
                    metricGroup,
                    useMetrics);
        }

        /** Idles until cancelled. */
        public void runFetchLoop() throws Exception {
            while (isRunning) {
                Thread.sleep(10L);
            }
        }

        /** Makes {@link #runFetchLoop()} return after its current sleep. */
        public void cancel() {
            isRunning = false;
        }

        /** Intentionally does nothing. */
        protected void doCommitInternalOffsetsToKafka(
                Map<KafkaTopicPartition, Long> offsets, @Nonnull KafkaCommitCallback commitCallback) throws Exception {
        }

        /** This fetcher creates no partition handles. */
        protected KPH createKafkaPartitionHandle(KafkaTopicPartition partition) {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest$TestingFlinkKafkaConsumer.class */
    /**
     * Consumer that pairs an injected partition discoverer with a {@link TestingFetcher} built
     * from the arguments of {@code createFetcher}. Auto-commit is always reported as disabled.
     */
    private static class TestingFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
        private static final long serialVersionUID = 935384661907656996L;

        private final AbstractPartitionDiscoverer partitionDiscoverer;

        /**
         * @param discoverer returned from {@code createPartitionDiscoverer}
         * @param discoveryIntervalMillis forwarded unchanged to the super constructor (presumably
         *     the partition discovery interval - confirm against {@link FlinkKafkaConsumerBase})
         */
        TestingFlinkKafkaConsumer(AbstractPartitionDiscoverer discoverer, long discoveryIntervalMillis) {
            super(
                    Collections.singletonList("dummy-topic"),
                    (Pattern) null,
                    (KeyedDeserializationSchema) Mockito.mock(KeyedDeserializationSchema.class),
                    discoveryIntervalMillis,
                    false);
            partitionDiscoverer = discoverer;
        }

        /** Builds a new {@link TestingFetcher}; the offset commit mode is not used. */
        protected AbstractFetcher<T, ?> createFetcher(
                SourceFunction.SourceContext<T> sourceContext,
                Map<KafkaTopicPartition, Long> partitionsWithOffsets,
                SerializedValue<AssignerWithPeriodicWatermarks<T>> periodicAssigner,
                SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedAssigner,
                StreamingRuntimeContext runtimeContext,
                OffsetCommitMode offsetCommitMode,
                MetricGroup metricGroup,
                boolean useMetrics) throws Exception {
            final ProcessingTimeService timeService = runtimeContext.getProcessingTimeService();
            final ClassLoader classLoader = getClass().getClassLoader();
            return new TestingFetcher<>(
                    sourceContext,
                    partitionsWithOffsets,
                    periodicAssigner,
                    punctuatedAssigner,
                    timeService,
                    0L,
                    classLoader,
                    metricGroup,
                    useMetrics);
        }

        /** Ignores every argument and returns the injected discoverer. */
        protected AbstractPartitionDiscoverer createPartitionDiscoverer(
                KafkaTopicsDescriptor topicsDescriptor, int indexOfThisSubtask, int numParallelSubtasks) {
            return partitionDiscoverer;
        }

        protected boolean getIsAutoCommitEnabled() {
            return false;
        }

        /** Not supported by this test consumer. */
        protected Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(
                Collection<KafkaTopicPartition> partitions, long timestamp) {
            throw new UnsupportedOperationException("fetchOffsetsWithTimestamp is not supported");
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest$TestingListState.class */
    /**
     * In-memory {@link ListState} backed by an {@link ArrayList}. It additionally records whether
     * {@link #clear()} has ever been called, which the tests use to detect that a snapshot
     * rewrote the state.
     *
     * @param <T> element type held by the state
     */
    private static final class TestingListState<T> implements ListState<T> {
        private final List<T> list = new ArrayList<>();
        private boolean clearCalled = false;

        private TestingListState() {
        }

        /** Removes all elements and remembers that a clear happened. */
        public void clear() {
            list.clear();
            clearCalled = true;
        }

        /**
         * Returns the live backing list (not a copy).
         *
         * <p>The decompiler had renamed this accessor to {@code m2get}, which left the
         * {@code get()} method of the state interface unimplemented; it is restored here.
         */
        public Iterable<T> get() throws Exception {
            return list;
        }

        /** Decompiler-generated alias of {@link #get()}, retained for the existing callers. */
        public Iterable<T> m2get() throws Exception {
            return get();
        }

        /**
         * Appends one element.
         *
         * @throws NullPointerException presumably, via {@code Preconditions.checkNotNull}, if
         *     {@code t} is {@code null}
         */
        public void add(T t) throws Exception {
            Preconditions.checkNotNull(t, "You cannot add null to a ListState.");
            list.add(t);
        }

        /** Returns the live backing list. */
        public List<T> getList() {
            return list;
        }

        /** Whether {@link #clear()} has been called at least once, directly or via {@link #update}. */
        boolean isClearCalled() {
            return clearCalled;
        }

        /** Replaces the content: clears first (setting the clear flag), then adds all values. */
        public void update(List<T> values) throws Exception {
            clear();
            addAll(values);
        }

        /**
         * Appends all values; a {@code null} list is ignored. Every element is null-checked before
         * anything is added, so a list containing {@code null} leaves the state untouched.
         */
        public void addAll(List<T> values) throws Exception {
            if (values == null) {
                return;
            }
            for (T value : values) {
                Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
            }
            list.addAll(values);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest$WakeupBeforeCloseTestingPartitionDiscoverer.class */
    /**
     * {@link DummyPartitionDiscoverer} that fails the test if its connections are closed before
     * it has been woken up.
     */
    private static class WakeupBeforeCloseTestingPartitionDiscoverer extends DummyPartitionDiscoverer {
        private WakeupBeforeCloseTestingPartitionDiscoverer() {
            super();
        }

        /** Asserts that a wake-up was recorded, then closes as the parent class does. */
        @Override
        protected void closeConnections() {
            Assert.assertTrue("Partition discoverer should have been waked up first before closing.", isWakedUp());
            super.closeConnections();
        }
    }

    /**
     * Checks the argument and state validation of {@code assignTimestampsAndWatermarks}: a
     * {@code null} assigner of either kind raises {@link NullPointerException}, and setting the
     * second kind of assigner after the first raises {@link IllegalStateException}.
     */
    @Test
    public void testEitherWatermarkExtractor() {
        // null periodic assigner is rejected
        try {
            new DummyFlinkKafkaConsumer<String>()
                    .assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<String>) null);
            Assert.fail();
        } catch (NullPointerException expected) {
            // expected
        }

        // null punctuated assigner is rejected
        try {
            new DummyFlinkKafkaConsumer<String>()
                    .assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks<String>) null);
            Assert.fail();
        } catch (NullPointerException expected) {
            // expected
        }

        @SuppressWarnings("unchecked")
        final AssignerWithPeriodicWatermarks<String> periodicAssigner =
                Mockito.mock(AssignerWithPeriodicWatermarks.class);
        @SuppressWarnings("unchecked")
        final AssignerWithPunctuatedWatermarks<String> punctuatedAssigner =
                Mockito.mock(AssignerWithPunctuatedWatermarks.class);

        // periodic first, punctuated second
        final DummyFlinkKafkaConsumer<String> periodicFirst = new DummyFlinkKafkaConsumer<>();
        periodicFirst.assignTimestampsAndWatermarks(periodicAssigner);
        try {
            periodicFirst.assignTimestampsAndWatermarks(punctuatedAssigner);
            Assert.fail();
        } catch (IllegalStateException expected) {
            // expected
        }

        // punctuated first, periodic second
        final DummyFlinkKafkaConsumer<String> punctuatedFirst = new DummyFlinkKafkaConsumer<>();
        punctuatedFirst.assignTimestampsAndWatermarks(punctuatedAssigner);
        try {
            punctuatedFirst.assignTimestampsAndWatermarks(periodicAssigner);
            Assert.fail();
        } catch (IllegalStateException expected) {
            // expected
        }
    }

    /**
     * Snapshots and completes a checkpoint on a consumer whose {@code run} method is never
     * called, and expects an empty list state and no offset commit on the fetcher.
     */
    @Test
    public void ignoreCheckpointWhenNotRunning() throws Exception {
        final MockFetcher<String> fetcher = new MockFetcher<>();
        final DummyFlinkKafkaConsumer<String> consumer =
                new DummyFlinkKafkaConsumer<>(fetcher, Mockito.mock(AbstractPartitionDiscoverer.class), false);
        final TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
        setupConsumer(consumer, false, listState, true, 0, 1);

        // the snapshot must leave the state empty
        consumer.snapshotState(new StateSnapshotContextSynchronousImpl(1L, 1L));
        Assert.assertFalse(listState.m2get().iterator().hasNext());

        // completing the checkpoint must not reach the fetcher
        consumer.notifyCheckpointComplete(1L);
        Assert.assertNull(fetcher.getAndClearLastCommittedOffsets());
        Assert.assertEquals(0L, fetcher.getCommitCount());
    }

    /**
     * Snapshots a restored consumer before {@code run} is called and verifies that the list state
     * was cleared during the snapshot and that two consecutive reads of it are consistent.
     */
    @Test
    public void checkRestoredCheckpointWhenFetcherNotReady() throws Exception {
        final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>();
        final TestingListState<Tuple2<KafkaTopicPartition, Long>> restoredListState = new TestingListState<>();
        setupConsumer(consumer, true, restoredListState, true, 0, 1);

        consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17L, 17L));

        // clear() having been called is the observable sign that the snapshot rewrote the state
        Assert.assertTrue(restoredListState.isClearCalled());

        final Set<Serializable> expected = new HashSet<>();
        for (Serializable element : restoredListState.m2get()) {
            expected.add(element);
        }

        int counter = 0;
        for (Serializable element : restoredListState.m2get()) {
            Assert.assertTrue(expected.contains(element));
            counter++;
        }
        Assert.assertEquals(expected.size(), counter);
    }

    /**
     * With the auto-commit flag set and the six-argument {@code setupConsumer} called with
     * {@code true} as fourth argument, the expected mode is {@code ON_CHECKPOINTS}.
     */
    @Test
    public void testConfigureOnCheckpointsCommitMode() throws Exception {
        final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(true);

        setupConsumer(consumer, false, null, true, 0, 1);

        final OffsetCommitMode actualMode = consumer.getOffsetCommitMode();
        Assert.assertEquals(OffsetCommitMode.ON_CHECKPOINTS, actualMode);
    }

    /**
     * With the auto-commit flag set and the single-argument {@code setupConsumer}, the expected
     * mode is {@code KAFKA_PERIODIC}.
     */
    @Test
    public void testConfigureAutoCommitMode() throws Exception {
        final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(true);

        setupConsumer(consumer);

        final OffsetCommitMode actualMode = consumer.getOffsetCommitMode();
        Assert.assertEquals(OffsetCommitMode.KAFKA_PERIODIC, actualMode);
    }

    /**
     * With commit-on-checkpoints switched off and the six-argument {@code setupConsumer} called
     * with {@code true} as fourth argument, the expected mode is {@code DISABLED}, even though the
     * auto-commit flag is set.
     */
    @Test
    public void testConfigureDisableOffsetCommitWithCheckpointing() throws Exception {
        final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(true);
        consumer.setCommitOffsetsOnCheckpoints(false);

        setupConsumer(consumer, false, null, true, 0, 1);

        final OffsetCommitMode actualMode = consumer.getOffsetCommitMode();
        Assert.assertEquals(OffsetCommitMode.DISABLED, actualMode);
    }

    /**
     * With the auto-commit flag cleared and the single-argument {@code setupConsumer}, the
     * expected mode is {@code DISABLED}.
     */
    @Test
    public void testConfigureDisableOffsetCommitWithoutCheckpointing() throws Exception {
        final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(false);

        setupConsumer(consumer);

        final OffsetCommitMode actualMode = consumer.getOffsetCommitMode();
        Assert.assertEquals(OffsetCommitMode.DISABLED, actualMode);
    }

    /**
     * Runs the consumer on a background thread and walks through three snapshots (checkpoints
     * 138, 140 and 141) with offset committing on checkpoints left at its default.
     *
     * <p>Each snapshot must write the fetcher's next state into the list state and register it as
     * pending under its checkpoint id. Completing a checkpoint must commit exactly that
     * checkpoint's offsets through the fetcher and drop it, together with any older pending entry,
     * from the pending map. Completing an unknown checkpoint (666) must not commit anything.
     *
     * <p>The consumer is cancelled in a {@code finally} block so that a failing assertion does not
     * leave the background thread blocked in the fetch loop.
     */
    @Test
    public void testSnapshotStateWithCommitOnCheckpointsEnabled() throws Exception {
        final HashMap<KafkaTopicPartition, Long> state1 = new HashMap<>();
        state1.put(new KafkaTopicPartition("abc", 13), 16768L);
        state1.put(new KafkaTopicPartition("def", 7), 987654321L);

        final HashMap<KafkaTopicPartition, Long> state2 = new HashMap<>();
        state2.put(new KafkaTopicPartition("abc", 13), 16770L);
        state2.put(new KafkaTopicPartition("def", 7), 987654329L);

        final HashMap<KafkaTopicPartition, Long> state3 = new HashMap<>();
        state3.put(new KafkaTopicPartition("abc", 13), 16780L);
        state3.put(new KafkaTopicPartition("def", 7), 987654377L);

        final MockFetcher<String> fetcher = new MockFetcher<>(state1, state2, state3);
        final DummyFlinkKafkaConsumer<String> consumer =
                new DummyFlinkKafkaConsumer<>(fetcher, Mockito.mock(AbstractPartitionDiscoverer.class), false);
        final TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
        setupConsumer(consumer, false, listState, true, 0, 1);

        final CheckedThread runThread = new CheckedThread() {
            public void go() throws Exception {
                consumer.run(new TestSourceContext<>());
            }
        };
        runThread.start();

        try {
            fetcher.waitUntilRun();
            Assert.assertEquals(0L, consumer.getPendingOffsetsToCommit().size());

            // checkpoint 138: state1 is snapshotted and becomes pending
            consumer.snapshotState(new StateSnapshotContextSynchronousImpl(138L, 138L));
            final Map<KafkaTopicPartition, Long> snapshot1 = new HashMap<>();
            for (Tuple2<KafkaTopicPartition, Long> entry : listState.m2get()) {
                snapshot1.put(entry.f0, entry.f1);
            }
            Assert.assertEquals(state1, snapshot1);
            Assert.assertEquals(1L, consumer.getPendingOffsetsToCommit().size());
            Assert.assertEquals(state1, consumer.getPendingOffsetsToCommit().get(138L));

            // checkpoint 140: state2 is snapshotted and becomes pending as well
            consumer.snapshotState(new StateSnapshotContextSynchronousImpl(140L, 140L));
            final Map<KafkaTopicPartition, Long> snapshot2 = new HashMap<>();
            for (Tuple2<KafkaTopicPartition, Long> entry : listState.m2get()) {
                snapshot2.put(entry.f0, entry.f1);
            }
            Assert.assertEquals(state2, snapshot2);
            Assert.assertEquals(2L, consumer.getPendingOffsetsToCommit().size());
            Assert.assertEquals(state2, consumer.getPendingOffsetsToCommit().get(140L));

            // completing 138 commits state1 and leaves only 140 pending
            consumer.notifyCheckpointComplete(138L);
            Assert.assertEquals(1L, consumer.getPendingOffsetsToCommit().size());
            Assert.assertTrue(consumer.getPendingOffsetsToCommit().containsKey(140L));
            Assert.assertEquals(state1, fetcher.getAndClearLastCommittedOffsets());
            Assert.assertEquals(1L, fetcher.getCommitCount());

            // checkpoint 141: state3 is snapshotted; 140 and 141 are pending
            consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141L, 141L));
            final Map<KafkaTopicPartition, Long> snapshot3 = new HashMap<>();
            for (Tuple2<KafkaTopicPartition, Long> entry : listState.m2get()) {
                snapshot3.put(entry.f0, entry.f1);
            }
            Assert.assertEquals(state3, snapshot3);
            Assert.assertEquals(2L, consumer.getPendingOffsetsToCommit().size());
            Assert.assertEquals(state3, consumer.getPendingOffsetsToCommit().get(141L));

            // completing 141 commits state3 and also discards the older pending 140
            consumer.notifyCheckpointComplete(141L);
            Assert.assertEquals(0L, consumer.getPendingOffsetsToCommit().size());
            Assert.assertEquals(state3, fetcher.getAndClearLastCommittedOffsets());
            Assert.assertEquals(2L, fetcher.getCommitCount());

            // completing an unknown checkpoint commits nothing
            consumer.notifyCheckpointComplete(666L);
            Assert.assertEquals(0L, consumer.getPendingOffsetsToCommit().size());
            Assert.assertNull(fetcher.getAndClearLastCommittedOffsets());
            Assert.assertEquals(2L, fetcher.getCommitCount());
        } finally {
            // always release the background thread, also when an assertion above failed
            consumer.cancel();
        }
        runThread.sync();
    }

    /**
     * Runs the consumer on a background thread and walks through three snapshots (checkpoints
     * 138, 140 and 141) after {@code setCommitOffsetsOnCheckpoints(false)}.
     *
     * <p>Each snapshot must still write the fetcher's next state into the list state, but nothing
     * may ever become pending and no checkpoint completion (138, 141 or the unknown 666) may
     * trigger a commit on the fetcher.
     *
     * <p>The consumer is cancelled in a {@code finally} block so that a failing assertion does not
     * leave the background thread blocked in the fetch loop.
     */
    @Test
    public void testSnapshotStateWithCommitOnCheckpointsDisabled() throws Exception {
        final HashMap<KafkaTopicPartition, Long> state1 = new HashMap<>();
        state1.put(new KafkaTopicPartition("abc", 13), 16768L);
        state1.put(new KafkaTopicPartition("def", 7), 987654321L);

        final HashMap<KafkaTopicPartition, Long> state2 = new HashMap<>();
        state2.put(new KafkaTopicPartition("abc", 13), 16770L);
        state2.put(new KafkaTopicPartition("def", 7), 987654329L);

        final HashMap<KafkaTopicPartition, Long> state3 = new HashMap<>();
        state3.put(new KafkaTopicPartition("abc", 13), 16780L);
        state3.put(new KafkaTopicPartition("def", 7), 987654377L);

        final MockFetcher<String> fetcher = new MockFetcher<>(state1, state2, state3);
        final DummyFlinkKafkaConsumer<String> consumer =
                new DummyFlinkKafkaConsumer<>(fetcher, Mockito.mock(AbstractPartitionDiscoverer.class), false);
        consumer.setCommitOffsetsOnCheckpoints(false);
        final TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
        setupConsumer(consumer, false, listState, true, 0, 1);

        final CheckedThread runThread = new CheckedThread() {
            public void go() throws Exception {
                consumer.run(new TestSourceContext<>());
            }
        };
        runThread.start();

        try {
            fetcher.waitUntilRun();
            Assert.assertEquals(0L, consumer.getPendingOffsetsToCommit().size());

            // checkpoint 138: state1 is snapshotted, nothing becomes pending
            consumer.snapshotState(new StateSnapshotContextSynchronousImpl(138L, 138L));
            final Map<KafkaTopicPartition, Long> snapshot1 = new HashMap<>();
            for (Tuple2<KafkaTopicPartition, Long> entry : listState.m2get()) {
                snapshot1.put(entry.f0, entry.f1);
            }
            Assert.assertEquals(state1, snapshot1);
            Assert.assertEquals(0L, consumer.getPendingOffsetsToCommit().size());

            // checkpoint 140: state2 is snapshotted, still nothing pending
            consumer.snapshotState(new StateSnapshotContextSynchronousImpl(140L, 140L));
            final Map<KafkaTopicPartition, Long> snapshot2 = new HashMap<>();
            for (Tuple2<KafkaTopicPartition, Long> entry : listState.m2get()) {
                snapshot2.put(entry.f0, entry.f1);
            }
            Assert.assertEquals(state2, snapshot2);
            Assert.assertEquals(0L, consumer.getPendingOffsetsToCommit().size());

            // completing 138 must not commit
            consumer.notifyCheckpointComplete(138L);
            Assert.assertEquals(0L, fetcher.getCommitCount());
            Assert.assertNull(fetcher.getAndClearLastCommittedOffsets());

            // checkpoint 141: state3 is snapshotted, still nothing pending
            consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141L, 141L));
            final Map<KafkaTopicPartition, Long> snapshot3 = new HashMap<>();
            for (Tuple2<KafkaTopicPartition, Long> entry : listState.m2get()) {
                snapshot3.put(entry.f0, entry.f1);
            }
            Assert.assertEquals(state3, snapshot3);
            Assert.assertEquals(0L, consumer.getPendingOffsetsToCommit().size());

            // completing 141 must not commit
            consumer.notifyCheckpointComplete(141L);
            Assert.assertEquals(0L, fetcher.getCommitCount());
            Assert.assertNull(fetcher.getAndClearLastCommittedOffsets());

            // completing an unknown checkpoint must not commit either
            consumer.notifyCheckpointComplete(666L);
            Assert.assertEquals(0L, fetcher.getCommitCount());
            Assert.assertNull(fetcher.getAndClearLastCommittedOffsets());
        } finally {
            // always release the background thread, also when an assertion above failed
            consumer.cancel();
        }
        runThread.sync();
    }

    /**
     * Runs the failing-consumer lifecycle with a {@code FailingPartitionDiscoverer} built around
     * a {@code RuntimeException} that wraps a {@code FlinkException}, then asserts that the
     * discoverer reports itself as closed.
     */
    @Test
    public void testClosePartitionDiscovererWhenOpenThrowException() throws Exception {
        final FlinkException rootCause = new FlinkException("Test partition discoverer exception");
        final RuntimeException expectedFailure = new RuntimeException(rootCause);
        final FailingPartitionDiscoverer discoverer = new FailingPartitionDiscoverer(expectedFailure);
        final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(discoverer);

        testFailingConsumerLifecycle(consumer, expectedFailure);

        Assert.assertTrue(
                "partitionDiscoverer should be closed when consumer is closed",
                discoverer.isClosed());
    }

    /**
     * Runs the failing-consumer lifecycle with a consumer whose fetcher supplier always throws a
     * {@code FlinkException}, then asserts that the partition discoverer reports itself as closed.
     */
    @Test
    public void testClosePartitionDiscovererWhenCreateKafkaFetcherFails() throws Exception {
        final FlinkException expectedFailure = new FlinkException("Create Kafka fetcher failure.");
        final DummyPartitionDiscoverer discoverer = new DummyPartitionDiscoverer();

        // The supplier never yields a fetcher; it fails with the expected exception instead.
        final DummyFlinkKafkaConsumer<String> consumer =
                new DummyFlinkKafkaConsumer<>(
                        () -> {
                            throw expectedFailure;
                        },
                        discoverer,
                        100L);

        testFailingConsumerLifecycle(consumer, expectedFailure);

        Assert.assertTrue(
                "partitionDiscoverer should be closed when consumer is closed",
                discoverer.isClosed());
    }

    /**
     * Runs the failing-consumer lifecycle with a mocked fetcher whose {@code runFetchLoop()}
     * throws a {@code FlinkException}, then asserts that the
     * {@code WakeupBeforeCloseTestingPartitionDiscoverer} reports itself as closed.
     */
    @Test
    public void testClosePartitionDiscovererWhenKafkaFetcherFails() throws Exception {
        final FlinkException expectedFailure = new FlinkException("Run Kafka fetcher failure.");
        final WakeupBeforeCloseTestingPartitionDiscoverer discoverer =
                new WakeupBeforeCloseTestingPartitionDiscoverer();

        // Mockito hands back a raw mock; the unchecked assignment is safe for a test double.
        @SuppressWarnings("unchecked")
        final AbstractFetcher<String, ?> failingFetcher = Mockito.mock(AbstractFetcher.class);
        Mockito.doThrow(expectedFailure).when(failingFetcher).runFetchLoop();

        final DummyFlinkKafkaConsumer<String> consumer =
                new DummyFlinkKafkaConsumer<>(() -> failingFetcher, discoverer, 100L);

        testFailingConsumerLifecycle(consumer, expectedFailure);

        Assert.assertTrue(
                "partitionDiscoverer should be closed when consumer is closed",
                discoverer.isClosed());
    }

    /**
     * Sets up and runs the given consumer, expecting the setup or the run to fail, and closes the
     * consumer once the failure has been verified.
     *
     * <p>The test fails if no exception is thrown, or if the thrown exception's cause chain does
     * not contain a throwable equal to {@code exc}. In both of those cases the consumer is not
     * closed, because the assertion error propagates first.
     *
     * @param flinkKafkaConsumerBase the consumer under test
     * @param exc the exception expected somewhere in the thrown exception's chain
     * @throws Exception if closing the consumer fails
     */
    private void testFailingConsumerLifecycle(FlinkKafkaConsumerBase<String> flinkKafkaConsumerBase, @Nonnull Exception exc) throws Exception {
        Exception thrown = null;
        try {
            setupConsumer(flinkKafkaConsumerBase);
            flinkKafkaConsumerBase.run(new TestSourceContext());
        } catch (Exception e) {
            thrown = e;
        }

        if (thrown == null) {
            Assert.fail("Exception should have been thrown from open / run method of FlinkKafkaConsumerBase.");
        }

        final boolean containsExpectedFailure =
                ExceptionUtils.findThrowable(thrown, candidate -> candidate.equals(exc)).isPresent();
        Assert.assertThat(containsExpectedFailure, Matchers.is(true));

        flinkKafkaConsumerBase.close();
    }

    /**
     * Runs the normal consumer lifecycle with a {@code TestingFlinkKafkaConsumer} and asserts that
     * its {@code DummyPartitionDiscoverer} reports itself as closed afterwards.
     */
    @Test
    public void testClosePartitionDiscovererWithCancellation() throws Exception {
        final DummyPartitionDiscoverer discoverer = new DummyPartitionDiscoverer();
        final FlinkKafkaConsumerBase<String> consumer = new TestingFlinkKafkaConsumer(discoverer, 100L);

        testNormalConsumerLifecycle(consumer);

        Assert.assertTrue(
                "partitionDiscoverer should be closed when consumer is closed",
                discoverer.isClosed());
    }

    /**
     * Sets up the consumer, starts its {@code run} method asynchronously, closes the consumer from
     * the calling thread and then waits for the asynchronous run to complete.
     *
     * @param flinkKafkaConsumerBase the consumer under test
     * @throws Exception if setup or close fails, or if the asynchronous run completed exceptionally
     */
    private void testNormalConsumerLifecycle(FlinkKafkaConsumerBase<String> flinkKafkaConsumerBase) throws Exception {
        setupConsumer(flinkKafkaConsumerBase);

        final Runnable runLoop =
                ThrowingRunnable.unchecked(() -> flinkKafkaConsumerBase.run(new TestSourceContext()));
        final CompletableFuture<Void> runCompletion = CompletableFuture.runAsync(runLoop);

        flinkKafkaConsumerBase.close();

        // Surfaces any failure of the asynchronous run as an exception here.
        runCompletion.get();
    }

    /**
     * Convenience overload that delegates to the six-argument {@code setupConsumer} with both
     * boolean flags {@code false}, no list state, and the integer arguments {@code 0} and
     * {@code 1}.
     *
     * @param flinkKafkaConsumerBase the consumer to set up
     * @throws Exception if the delegated setup fails
     */
    private void setupConsumer(FlinkKafkaConsumerBase<String> flinkKafkaConsumerBase) throws Exception {
        final ListState<Object> noListState = null;
        setupConsumer(flinkKafkaConsumerBase, false, noListState, false, 0, 1);
    }

    /** Rescaling from 5 to 8 consumers while the partition count grows from 2 to 30. */
    @Test
    public void testScaleUp() throws Exception {
        final int initialParallelism = 5;
        final int initialPartitionCount = 2;
        final int restoredParallelism = 8;
        final int restoredPartitionCount = 30;

        testRescaling(initialParallelism, initialPartitionCount, restoredParallelism, restoredPartitionCount);
    }

    /** Rescaling from 5 to 2 consumers while the partition count grows from 10 to 100. */
    @Test
    public void testScaleDown() throws Exception {
        final int initialParallelism = 5;
        final int initialPartitionCount = 10;
        final int restoredParallelism = 2;
        final int restoredPartitionCount = 100;

        testRescaling(initialParallelism, initialPartitionCount, restoredParallelism, restoredPartitionCount);
    }

    /**
     * Verifies partition assignment before and after a rescale of the consumer.
     *
     * <p>The test opens {@code i} consumers over {@code i2} partitions of {@code test-topic} and
     * checks that the partitions are split between them without overlap and without gaps. It then
     * snapshots every harness, repackages the snapshots into one state, and opens {@code i3}
     * consumers restored from that state over {@code i4} partitions, repeating the same checks
     * against the enlarged partition list.
     *
     * @param i number of consumer subtasks before the rescale
     * @param i2 number of topic partitions before the rescale
     * @param i3 number of consumer subtasks after the rescale
     * @param i4 number of topic partitions after the rescale; must not be smaller than {@code i2}
     * @throws Exception if a harness operation fails
     */
    @SuppressWarnings("unchecked")
    private void testRescaling(int i, int i2, int i3, int i4) throws Exception {
        final int initialParallelism = i;
        final int initialPartitionCount = i2;
        final int restoredParallelism = i3;
        final int restoredPartitionCount = i4;
        final String topic = "test-topic";

        Preconditions.checkArgument(restoredPartitionCount >= initialPartitionCount, "invalid test case for Kafka repartitioning; Kafka only allows increasing partitions.");

        // ---- before the rescale ----
        final List<KafkaTopicPartition> partitionsOnStartup =
                createTopicPartitionRange(topic, initialPartitionCount);

        final DummyFlinkKafkaConsumer<String>[] consumers =
                new DummyFlinkKafkaConsumer[initialParallelism];
        final AbstractStreamOperatorTestHarness<String>[] harnesses =
                new AbstractStreamOperatorTestHarness[initialParallelism];
        for (int subtask = 0; subtask < initialParallelism; subtask++) {
            consumers[subtask] =
                    createRescalingTestConsumer(topic, subtask, initialParallelism, partitionsOnStartup);
            harnesses[subtask] = createTestHarness(consumers[subtask], initialParallelism, subtask);
            harnesses[subtask].initializeState((OperatorSubtaskState) null);
            harnesses[subtask].open();
        }

        final Map<KafkaTopicPartition, Long> initialSubscriptions =
                collectDisjointSubscriptions(consumers);
        Assert.assertThat(initialSubscriptions.values(), Matchers.hasSize(initialPartitionCount));
        Assert.assertThat(
                partitionsOnStartup,
                Matchers.everyItem(IsIn.isIn(initialSubscriptions.keySet())));

        // ---- snapshot and merge the state of all subtasks ----
        final OperatorSubtaskState[] snapshots = new OperatorSubtaskState[initialParallelism];
        for (int subtask = 0; subtask < initialParallelism; subtask++) {
            snapshots[subtask] = harnesses[subtask].snapshot(0L, 0L);
        }
        final OperatorSubtaskState mergedState =
                AbstractStreamOperatorTestHarness.repackageState(snapshots);

        // ---- after the rescale ----
        final List<KafkaTopicPartition> partitionsAfterRestore =
                createTopicPartitionRange(topic, restoredPartitionCount);

        final DummyFlinkKafkaConsumer<String>[] restoredConsumers =
                new DummyFlinkKafkaConsumer[restoredParallelism];
        final AbstractStreamOperatorTestHarness<String>[] restoredHarnesses =
                new AbstractStreamOperatorTestHarness[restoredParallelism];
        for (int subtask = 0; subtask < restoredParallelism; subtask++) {
            restoredConsumers[subtask] =
                    createRescalingTestConsumer(topic, subtask, restoredParallelism, partitionsAfterRestore);
            restoredHarnesses[subtask] =
                    createTestHarness(restoredConsumers[subtask], restoredParallelism, subtask);
            restoredHarnesses[subtask].initializeState(mergedState);
            restoredHarnesses[subtask].open();
        }

        final Map<KafkaTopicPartition, Long> restoredSubscriptions =
                collectDisjointSubscriptions(restoredConsumers);
        Assert.assertThat(restoredSubscriptions.values(), Matchers.hasSize(restoredPartitionCount));
        // Check against the post-restore partition list so that partitions added after the
        // restore are covered too; the pre-restore list is a subset of it.
        Assert.assertThat(
                partitionsAfterRestore,
                Matchers.everyItem(IsIn.isIn(restoredSubscriptions.keySet())));
    }

    /** Builds the partitions {@code 0 .. count - 1} of the given topic, in ascending order. */
    private List<KafkaTopicPartition> createTopicPartitionRange(String topic, int count) {
        final List<KafkaTopicPartition> partitions = new ArrayList<>(count);
        for (int partition = 0; partition < count; partition++) {
            partitions.add(new KafkaTopicPartition(topic, partition));
        }
        return partitions;
    }

    /**
     * Creates a consumer for one subtask with a mocked fetcher and a {@code TestPartitionDiscoverer}
     * that always returns the given topic and partition list.
     */
    @SuppressWarnings("unchecked")
    private DummyFlinkKafkaConsumer<String> createRescalingTestConsumer(
            String topic, int subtaskIndex, int parallelism, List<KafkaTopicPartition> partitions) {
        final Pattern noTopicPattern = null;
        final TestPartitionDiscoverer discoverer =
                new TestPartitionDiscoverer(
                        new KafkaTopicsDescriptor(Collections.singletonList(topic), noTopicPattern),
                        subtaskIndex,
                        parallelism,
                        TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(
                                Collections.singletonList(topic)),
                        TestPartitionDiscoverer.createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(
                                partitions));
        return new DummyFlinkKafkaConsumer<>(Mockito.mock(AbstractFetcher.class), discoverer, false);
    }

    /**
     * Merges the subscribed partitions of all consumers into one map, asserting along the way that
     * no partition is subscribed by more than one consumer.
     */
    private Map<KafkaTopicPartition, Long> collectDisjointSubscriptions(
            DummyFlinkKafkaConsumer<String>[] consumers) {
        final Map<KafkaTopicPartition, Long> merged = new HashMap<>();
        for (DummyFlinkKafkaConsumer<String> consumer : consumers) {
            final Map<KafkaTopicPartition, Long> subscribed =
                    consumer.getSubscribedPartitionsToStartOffsets();
            for (KafkaTopicPartition partition : subscribed.keySet()) {
                Assert.assertThat(merged, IsNot.not(IsMapContaining.hasKey(partition)));
            }
            merged.putAll(subscribed);
        }
        return merged;
    }

    /**
     * Wraps the source function in a {@link StreamSource} operator and builds a test harness for
     * it, configured for event time.
     *
     * @param sourceFunction the source function to wrap
     * @param i third harness constructor argument; callers in this class pass the parallelism
     * @param i2 fourth harness constructor argument; callers in this class pass the subtask index
     * @return the configured harness; it is neither initialized nor opened here
     * @throws Exception if the harness cannot be created
     */
    private static <T> AbstractStreamOperatorTestHarness<T> createTestHarness(SourceFunction<T> sourceFunction, int i, int i2) throws Exception {
        // 16383; presumably the harness's max parallelism - confirm against the harness constructor.
        final int secondHarnessArgument = Short.MAX_VALUE / 2;

        final AbstractStreamOperatorTestHarness<T> harness =
                new AbstractStreamOperatorTestHarness<>(
                        new StreamSource<>(sourceFunction), secondHarnessArgument, i, i2);
        harness.setTimeCharacteristic(TimeCharacteristic.EventTime);
        return harness;
    }

    /**
     * Prepares a consumer for a test: installs a mock runtime context, initializes its state from
     * a mock initialization context backed by the given list state, and opens it.
     *
     * @param flinkKafkaConsumerBase the consumer to set up
     * @param z forwarded as the first argument of {@code MockFunctionInitializationContext}
     *     (presumably the "is restored" flag - confirm)
     * @param listState list state handed to the {@code MockOperatorStateStore}; may be null
     * @param z2 forwarded as the first argument of {@code MockStreamingRuntimeContext}
     *     (presumably "checkpointing enabled" - confirm)
     * @param i forwarded as the third argument of {@code MockStreamingRuntimeContext}
     *     (presumably the subtask index - confirm)
     * @param i2 forwarded as the second argument of {@code MockStreamingRuntimeContext}
     *     (presumably the parallelism - confirm)
     * @throws Exception if state initialization or opening the consumer fails
     */
    private static <T, S> void setupConsumer(FlinkKafkaConsumerBase<T> flinkKafkaConsumerBase, boolean z, ListState<S> listState, boolean z2, int i, int i2) throws Exception {
        final MockStreamingRuntimeContext runtimeContext = new MockStreamingRuntimeContext(z2, i2, i);
        flinkKafkaConsumerBase.setRuntimeContext(runtimeContext);

        final MockOperatorStateStore stateStore = new MockOperatorStateStore(listState);
        final MockFunctionInitializationContext initializationContext =
                new MockFunctionInitializationContext(z, stateStore);
        flinkKafkaConsumerBase.initializeState(initializationContext);

        flinkKafkaConsumerBase.open(new Configuration());
    }
}
