package org.apache.flink.streaming.connectors.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.testutils.migration.MigrationVersion;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.class */
public class FlinkKafkaConsumerBaseMigrationTest {
    private final MigrationVersion flinkGenerateSavepointVersion = null;
    private static final HashMap<KafkaTopicPartition, Long> PARTITION_STATE = new HashMap<>();
    private static final List<String> TOPICS;
    private final MigrationVersion testMigrateVersion;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest$DummyFlinkKafkaConsumer.class */
    public static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
        private static final long serialVersionUID = 1;
        private final List<KafkaTopicPartition> partitions;
        private final AbstractFetcher<T, ?> fetcher;

        DummyFlinkKafkaConsumer(AbstractFetcher<T, ?> abstractFetcher, List<String> list, List<KafkaTopicPartition> list2, long j) {
            super(list, (Pattern) null, (KafkaDeserializationSchema) Mockito.mock(KafkaDeserializationSchema.class), j, false);
            this.fetcher = abstractFetcher;
            this.partitions = list2;
        }

        DummyFlinkKafkaConsumer(List<String> list, List<KafkaTopicPartition> list2, long j) {
            this((AbstractFetcher) Mockito.mock(AbstractFetcher.class), list, list2, j);
        }

        protected AbstractFetcher<T, ?> createFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> map, SerializedValue<WatermarkStrategy<T>> serializedValue, StreamingRuntimeContext streamingRuntimeContext, OffsetCommitMode offsetCommitMode, MetricGroup metricGroup, boolean z) throws Exception {
            return this.fetcher;
        }

        protected AbstractPartitionDiscoverer createPartitionDiscoverer(KafkaTopicsDescriptor kafkaTopicsDescriptor, int i, int i2) {
            AbstractPartitionDiscoverer abstractPartitionDiscoverer = (AbstractPartitionDiscoverer) Mockito.mock(AbstractPartitionDiscoverer.class);
            try {
                PowerMockito.when(abstractPartitionDiscoverer.discoverPartitions()).thenReturn(this.partitions);
            } catch (Exception e) {
            }
            PowerMockito.when(Boolean.valueOf(abstractPartitionDiscoverer.setAndCheckDiscoveredPartition((KafkaTopicPartition) Matchers.any(KafkaTopicPartition.class)))).thenReturn(true);
            return abstractPartitionDiscoverer;
        }

        protected boolean getIsAutoCommitEnabled() {
            return false;
        }

        protected Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(Collection<KafkaTopicPartition> collection, long j) {
            throw new UnsupportedOperationException();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest$DummySourceContext.class */
    private static abstract class DummySourceContext implements SourceFunction.SourceContext<String> {
        private final Object lock;

        private DummySourceContext() {
            this.lock = new Object();
        }

        public void collectWithTimestamp(String str, long j) {
        }

        public void emitWatermark(Watermark watermark) {
        }

        public Object getCheckpointLock() {
            return this.lock;
        }

        public void close() {
        }

        public void markAsTemporarilyIdle() {
        }
    }

    @Parameterized.Parameters(name = "Migration Savepoint: {0}")
    public static Collection<MigrationVersion> parameters() {
        return Arrays.asList(MigrationVersion.v1_4, MigrationVersion.v1_5, MigrationVersion.v1_6, MigrationVersion.v1_7, MigrationVersion.v1_8, MigrationVersion.v1_9);
    }

    public FlinkKafkaConsumerBaseMigrationTest(MigrationVersion migrationVersion) {
        this.testMigrateVersion = migrationVersion;
    }

    @Test
    @Ignore
    public void writeSnapshot() throws Exception {
        writeSnapshot("src/test/resources/kafka-consumer-migration-test-flink" + this.flinkGenerateSavepointVersion + "-snapshot", PARTITION_STATE);
        writeSnapshot("src/test/resources/kafka-consumer-migration-test-flink" + this.flinkGenerateSavepointVersion + "-empty-state-snapshot", new HashMap<>());
    }

    private void writeSnapshot(String str, HashMap<KafkaTopicPartition, Long> hashMap) throws Exception {
        OperatorSubtaskState snapshot;
        final OneShotLatch oneShotLatch = new OneShotLatch();
        AbstractFetcher abstractFetcher = (AbstractFetcher) Mockito.mock(AbstractFetcher.class);
        ((AbstractFetcher) PowerMockito.doAnswer(new Answer<Void>() { // from class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseMigrationTest.1
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m1answer(InvocationOnMock invocationOnMock) throws Throwable {
                oneShotLatch.trigger();
                return null;
            }
        }).when(abstractFetcher)).runFetchLoop();
        PowerMockito.when(abstractFetcher.snapshotCurrentState()).thenReturn(hashMap);
        final DummyFlinkKafkaConsumer dummyFlinkKafkaConsumer = new DummyFlinkKafkaConsumer(abstractFetcher, TOPICS, new ArrayList(PARTITION_STATE.keySet()), Long.MIN_VALUE);
        StreamSource streamSource = new StreamSource(dummyFlinkKafkaConsumer);
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness = new AbstractStreamOperatorTestHarness(streamSource, 1, 1, 0);
        abstractStreamOperatorTestHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        abstractStreamOperatorTestHarness.setup();
        abstractStreamOperatorTestHarness.open();
        final Throwable[] thArr = new Throwable[1];
        Thread thread = new Thread() { // from class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseMigrationTest.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    dummyFlinkKafkaConsumer.run(new DummySourceContext() { // from class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseMigrationTest.2.1
                        public void collect(String str2) {
                        }
                    });
                } catch (Throwable th) {
                    th.printStackTrace();
                    thArr[0] = th;
                }
            }
        };
        thread.start();
        if (!oneShotLatch.isTriggered()) {
            oneShotLatch.await();
        }
        synchronized (abstractStreamOperatorTestHarness.getCheckpointLock()) {
            snapshot = abstractStreamOperatorTestHarness.snapshot(0L, 0L);
        }
        OperatorSnapshotUtil.writeStateHandle(snapshot, str);
        streamSource.close();
        thread.join();
    }

    @Test
    public void testRestoreFromEmptyStateNoPartitions() throws Exception {
        DummyFlinkKafkaConsumer dummyFlinkKafkaConsumer = new DummyFlinkKafkaConsumer(Collections.singletonList("dummy-topic"), Collections.emptyList(), Long.MIN_VALUE);
        StreamSource streamSource = new StreamSource(dummyFlinkKafkaConsumer);
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness = new AbstractStreamOperatorTestHarness(streamSource, 1, 1, 0);
        abstractStreamOperatorTestHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        abstractStreamOperatorTestHarness.setup();
        abstractStreamOperatorTestHarness.initializeState(OperatorSnapshotUtil.getResourceFilename("kafka-consumer-migration-test-flink" + this.testMigrateVersion + "-empty-state-snapshot"));
        abstractStreamOperatorTestHarness.open();
        Assert.assertTrue(dummyFlinkKafkaConsumer.getSubscribedPartitionsToStartOffsets() != null);
        Assert.assertTrue(dummyFlinkKafkaConsumer.getSubscribedPartitionsToStartOffsets().isEmpty());
        Assert.assertTrue(dummyFlinkKafkaConsumer.getRestoredState().isEmpty());
        streamSource.close();
        streamSource.cancel();
    }

    @Test
    public void testRestoreFromEmptyStateWithPartitions() throws Exception {
        DummyFlinkKafkaConsumer dummyFlinkKafkaConsumer = new DummyFlinkKafkaConsumer(TOPICS, new ArrayList(PARTITION_STATE.keySet()), Long.MIN_VALUE);
        StreamSource streamSource = new StreamSource(dummyFlinkKafkaConsumer);
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness = new AbstractStreamOperatorTestHarness(streamSource, 1, 1, 0);
        abstractStreamOperatorTestHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        abstractStreamOperatorTestHarness.setup();
        abstractStreamOperatorTestHarness.initializeState(OperatorSnapshotUtil.getResourceFilename("kafka-consumer-migration-test-flink" + this.testMigrateVersion + "-empty-state-snapshot"));
        abstractStreamOperatorTestHarness.open();
        HashMap hashMap = new HashMap();
        Iterator<KafkaTopicPartition> it = PARTITION_STATE.keySet().iterator();
        while (it.hasNext()) {
            hashMap.put(it.next(), -915623761775L);
        }
        Assert.assertTrue(dummyFlinkKafkaConsumer.getSubscribedPartitionsToStartOffsets() != null);
        Assert.assertTrue(!dummyFlinkKafkaConsumer.getSubscribedPartitionsToStartOffsets().isEmpty());
        Assert.assertEquals(hashMap, dummyFlinkKafkaConsumer.getSubscribedPartitionsToStartOffsets());
        Assert.assertTrue(dummyFlinkKafkaConsumer.getRestoredState() != null);
        Assert.assertTrue(!dummyFlinkKafkaConsumer.getSubscribedPartitionsToStartOffsets().isEmpty());
        for (Map.Entry entry : hashMap.entrySet()) {
            Assert.assertEquals(entry.getValue(), dummyFlinkKafkaConsumer.getRestoredState().get(entry.getKey()));
        }
        streamSource.close();
        streamSource.cancel();
    }

    @Test
    public void testRestore() throws Exception {
        DummyFlinkKafkaConsumer dummyFlinkKafkaConsumer = new DummyFlinkKafkaConsumer(TOPICS, new ArrayList(PARTITION_STATE.keySet()), Long.MIN_VALUE);
        StreamSource streamSource = new StreamSource(dummyFlinkKafkaConsumer);
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness = new AbstractStreamOperatorTestHarness(streamSource, 1, 1, 0);
        abstractStreamOperatorTestHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        abstractStreamOperatorTestHarness.setup();
        abstractStreamOperatorTestHarness.initializeState(OperatorSnapshotUtil.getResourceFilename("kafka-consumer-migration-test-flink" + this.testMigrateVersion + "-snapshot"));
        abstractStreamOperatorTestHarness.open();
        Assert.assertTrue(dummyFlinkKafkaConsumer.getSubscribedPartitionsToStartOffsets() != null);
        Assert.assertTrue(!dummyFlinkKafkaConsumer.getSubscribedPartitionsToStartOffsets().isEmpty());
        Assert.assertEquals(PARTITION_STATE, dummyFlinkKafkaConsumer.getSubscribedPartitionsToStartOffsets());
        Assert.assertTrue(dummyFlinkKafkaConsumer.getRestoredState() != null);
        Assert.assertEquals(PARTITION_STATE, dummyFlinkKafkaConsumer.getRestoredState());
        streamSource.close();
        streamSource.cancel();
    }

    static {
        PARTITION_STATE.put(new KafkaTopicPartition("abc", 13), 16768L);
        PARTITION_STATE.put(new KafkaTopicPartition("def", 7), 987654321L);
        TOPICS = (List) new ArrayList(PARTITION_STATE.keySet()).stream().map(kafkaTopicPartition -> {
            return kafkaTopicPartition.getTopic();
        }).distinct().collect(Collectors.toList());
    }
}
