package org.apache.flink.streaming.connectors.kafka;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

@Deprecated
/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBuilderTestBase.class */
public abstract class KafkaTableSourceBuilderTestBase {
    private static final String TOPIC = "testTopic";
    static final String[] FIELD_NAMES = {"field1", "field2", "time1", "time2", "field3"};
    static final TypeInformation[] FIELD_TYPES = {Types.LONG(), Types.STRING(), Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP(), Types.DOUBLE()};
    private static final TableSchema SCHEMA = new TableSchema(FIELD_NAMES, FIELD_TYPES);
    private static final Properties PROPS = createSourceProperties();

    @Test
    public void testKafkaConsumer() {
        KafkaTableSource.Builder builder = getBuilder();
        configureBuilder(builder);
        KafkaTableSource kafkaTableSource = (KafkaTableSource) Mockito.spy(builder.build());
        StreamExecutionEnvironment streamExecutionEnvironment = (StreamExecutionEnvironment) Mockito.mock(StreamExecutionEnvironment.class);
        Mockito.when(streamExecutionEnvironment.addSource((SourceFunction) Matchers.any(SourceFunction.class))).thenReturn(Mockito.mock(DataStreamSource.class));
        kafkaTableSource.getDataStream(streamExecutionEnvironment);
        ((StreamExecutionEnvironment) Mockito.verify(streamExecutionEnvironment)).addSource((SourceFunction) Matchers.any(getFlinkKafkaConsumer()));
        ((KafkaTableSource) Mockito.verify(kafkaTableSource)).getKafkaConsumer((String) Matchers.eq(TOPIC), (Properties) Matchers.eq(PROPS), (DeserializationSchema) Matchers.any(getDeserializationSchema()));
    }

    @Test
    public void testTableSchema() {
        KafkaTableSource.Builder builder = getBuilder();
        configureBuilder(builder);
        TableSchema tableSchema = builder.build().getTableSchema();
        Assert.assertNotNull(tableSchema);
        Assert.assertEquals(5L, tableSchema.getColumnNames().length);
        Assert.assertEquals("field1", tableSchema.getColumnNames()[0]);
        Assert.assertEquals("field2", tableSchema.getColumnNames()[1]);
        Assert.assertEquals("time1", tableSchema.getColumnNames()[2]);
        Assert.assertEquals("time2", tableSchema.getColumnNames()[3]);
        Assert.assertEquals("field3", tableSchema.getColumnNames()[4]);
        Assert.assertEquals(Types.LONG(), tableSchema.getTypes()[0]);
        Assert.assertEquals(Types.STRING(), tableSchema.getTypes()[1]);
        Assert.assertEquals(Types.SQL_TIMESTAMP(), tableSchema.getTypes()[2]);
        Assert.assertEquals(Types.SQL_TIMESTAMP(), tableSchema.getTypes()[3]);
        Assert.assertEquals(Types.DOUBLE(), tableSchema.getTypes()[4]);
    }

    @Test
    public void testNoTimeAttributes() {
        KafkaTableSource.Builder builder = getBuilder();
        configureBuilder(builder);
        KafkaTableSource build = builder.build();
        Assert.assertNull(build.getProctimeAttribute());
        Assert.assertNotNull(build.getRowtimeAttributeDescriptors());
        Assert.assertTrue(build.getRowtimeAttributeDescriptors().isEmpty());
    }

    @Test
    public void testProctimeAttribute() {
        KafkaTableSource.Builder builder = getBuilder();
        configureBuilder(builder);
        builder.withProctimeAttribute("time1");
        KafkaTableSource build = builder.build();
        Assert.assertEquals(build.getProctimeAttribute(), "time1");
        Assert.assertNotNull(build.getRowtimeAttributeDescriptors());
        Assert.assertTrue(build.getRowtimeAttributeDescriptors().isEmpty());
    }

    @Test
    public void testRowtimeAttribute() {
        KafkaTableSource.Builder builder = getBuilder();
        configureBuilder(builder);
        builder.withRowtimeAttribute("time2", new ExistingField("time2"), new AscendingTimestamps());
        KafkaTableSource build = builder.build();
        Assert.assertNull(build.getProctimeAttribute());
        List rowtimeAttributeDescriptors = build.getRowtimeAttributeDescriptors();
        Assert.assertNotNull(rowtimeAttributeDescriptors);
        Assert.assertEquals(1L, rowtimeAttributeDescriptors.size());
        RowtimeAttributeDescriptor rowtimeAttributeDescriptor = (RowtimeAttributeDescriptor) rowtimeAttributeDescriptors.get(0);
        Assert.assertEquals("time2", rowtimeAttributeDescriptor.getAttributeName());
        Assert.assertTrue(rowtimeAttributeDescriptor.getTimestampExtractor() instanceof ExistingField);
        Assert.assertEquals(1L, rowtimeAttributeDescriptor.getTimestampExtractor().getArgumentFields().length);
        Assert.assertEquals("time2", rowtimeAttributeDescriptor.getTimestampExtractor().getArgumentFields()[0]);
        Assert.assertTrue(rowtimeAttributeDescriptor.getWatermarkStrategy() instanceof AscendingTimestamps);
    }

    @Test
    public void testRowtimeAttribute2() {
        KafkaTableSource.Builder builder = getBuilder();
        configureBuilder(builder);
        try {
            builder.withKafkaTimestampAsRowtimeAttribute("time2", new AscendingTimestamps());
            KafkaTableSource build = builder.build();
            Assert.assertNull(build.getProctimeAttribute());
            List rowtimeAttributeDescriptors = build.getRowtimeAttributeDescriptors();
            Assert.assertNotNull(rowtimeAttributeDescriptors);
            Assert.assertEquals(1L, rowtimeAttributeDescriptors.size());
            RowtimeAttributeDescriptor rowtimeAttributeDescriptor = (RowtimeAttributeDescriptor) rowtimeAttributeDescriptors.get(0);
            Assert.assertEquals("time2", rowtimeAttributeDescriptor.getAttributeName());
            Assert.assertTrue(rowtimeAttributeDescriptor.getTimestampExtractor() instanceof StreamRecordTimestamp);
            Assert.assertTrue(rowtimeAttributeDescriptor.getTimestampExtractor().getArgumentFields().length == 0);
            Assert.assertTrue(rowtimeAttributeDescriptor.getWatermarkStrategy() instanceof AscendingTimestamps);
        } catch (Exception e) {
            if (builder.supportsKafkaTimestamps()) {
                Assert.fail();
            }
        }
    }

    @Test
    public void testConsumerOffsets() {
        KafkaTableSource.Builder builder = getBuilder();
        configureBuilder(builder);
        KafkaTableSource kafkaTableSource = (KafkaTableSource) Mockito.spy(builder.build());
        Mockito.when(kafkaTableSource.createKafkaConsumer(TOPIC, PROPS, (DeserializationSchema) null)).thenReturn(Mockito.mock(getFlinkKafkaConsumer()));
        ((FlinkKafkaConsumerBase) Mockito.verify(kafkaTableSource.getKafkaConsumer(TOPIC, PROPS, (DeserializationSchema) null))).setStartFromGroupOffsets();
        builder.fromEarliest();
        KafkaTableSource kafkaTableSource2 = (KafkaTableSource) Mockito.spy(builder.build());
        Mockito.when(kafkaTableSource2.createKafkaConsumer(TOPIC, PROPS, (DeserializationSchema) null)).thenReturn(Mockito.mock(getFlinkKafkaConsumer()));
        ((FlinkKafkaConsumerBase) Mockito.verify(kafkaTableSource2.getKafkaConsumer(TOPIC, PROPS, (DeserializationSchema) null))).setStartFromEarliest();
        builder.fromLatest();
        KafkaTableSource kafkaTableSource3 = (KafkaTableSource) Mockito.spy(builder.build());
        Mockito.when(kafkaTableSource3.createKafkaConsumer(TOPIC, PROPS, (DeserializationSchema) null)).thenReturn(Mockito.mock(getFlinkKafkaConsumer()));
        ((FlinkKafkaConsumerBase) Mockito.verify(kafkaTableSource3.getKafkaConsumer(TOPIC, PROPS, (DeserializationSchema) null))).setStartFromLatest();
        builder.fromGroupOffsets();
        KafkaTableSource kafkaTableSource4 = (KafkaTableSource) Mockito.spy(builder.build());
        Mockito.when(kafkaTableSource4.createKafkaConsumer(TOPIC, PROPS, (DeserializationSchema) null)).thenReturn(Mockito.mock(getFlinkKafkaConsumer()));
        ((FlinkKafkaConsumerBase) Mockito.verify(kafkaTableSource4.getKafkaConsumer(TOPIC, PROPS, (DeserializationSchema) null))).setStartFromGroupOffsets();
        builder.fromSpecificOffsets((Map) Mockito.mock(Map.class));
        KafkaTableSource kafkaTableSource5 = (KafkaTableSource) Mockito.spy(builder.build());
        Mockito.when(kafkaTableSource5.createKafkaConsumer(TOPIC, PROPS, (DeserializationSchema) null)).thenReturn(Mockito.mock(getFlinkKafkaConsumer()));
        ((FlinkKafkaConsumerBase) Mockito.verify(kafkaTableSource5.getKafkaConsumer(TOPIC, PROPS, (DeserializationSchema) null))).setStartFromSpecificOffsets((Map) Matchers.any(Map.class));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract KafkaTableSource.Builder getBuilder();

    protected abstract Class<DeserializationSchema<Row>> getDeserializationSchema();

    protected abstract Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer();

    /* JADX INFO: Access modifiers changed from: protected */
    public void configureBuilder(KafkaTableSource.Builder builder) {
        builder.forTopic(TOPIC).withKafkaProperties(PROPS).withSchema(SCHEMA);
    }

    private static Properties createSourceProperties() {
        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", "dummy");
        properties.setProperty("group.id", "dummy");
        return properties;
    }
}
