package org.apache.flink.streaming.connectors.kafka;

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.class */
/**
 * Shared test logic for {@link KafkaTableSource} implementations.
 *
 * <p>Concrete subclasses provide the table source under test together with the
 * consumer class and deserialization schema class that the test passes to the
 * Mockito matchers.
 */
public abstract class KafkaTableSourceTestBase {
    private static final String TOPIC = "testTopic";

    /** Column names of the row type handed to the table source. */
    private static final String[] FIELD_NAMES = {
        "mylong",
        "mystring",
        "myboolean",
        "mydouble",
        "missingField"
    };

    /** Column types, position-aligned with {@link #FIELD_NAMES}. */
    private static final TypeInformation<?>[] FIELD_TYPES = {
        BasicTypeInfo.LONG_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.BOOLEAN_TYPE_INFO,
        BasicTypeInfo.DOUBLE_TYPE_INFO,
        BasicTypeInfo.LONG_TYPE_INFO
    };

    /** Consumer properties shared by the created source and the verification. */
    private static final Properties PROPERTIES = createSourceProperties();

    /**
     * Avro specific record with one public field per entry of {@code FIELD_NAMES}.
     *
     * <p>The accessor methods are stubs: they return {@code null} or do nothing.
     * This class is not referenced elsewhere in this file; presumably it is used
     * by Avro-based subclasses (to be confirmed against them).
     */
    public static class AvroSpecificRecord extends SpecificRecordBase {
        // NOTE(review): the unusual name and the non-final modifier are kept as-is;
        // Avro presumably looks this field up by name via reflection - confirm
        // before renaming or tightening it.
        public static Schema SCHEMA$ = AvroTestUtils.createFlatAvroSchema(KafkaTableSourceTestBase.FIELD_NAMES, KafkaTableSourceTestBase.FIELD_TYPES);

        public Long mylong;
        public String mystring;
        public Boolean myboolean;
        public Double mydouble;
        public Long missingField;

        /** Stub: always returns {@code null}. */
        public Schema getSchema() {
            return null;
        }

        /** Stub: always returns {@code null}, whatever the field index. */
        public Object get(int i) {
            return null;
        }

        /** Stub: ignores the value. */
        public void put(int i, Object obj) {
            // intentionally empty
        }
    }

    /**
     * Creates the table source under test.
     *
     * @param str topic name
     * @param properties consumer properties
     * @param typeInformation row type produced by the source
     * @return the table source to test
     */
    protected abstract KafkaTableSource createTableSource(String str, Properties properties, TypeInformation<Row> typeInformation);

    /** @return the deserialization schema class passed to the matcher in the test */
    protected abstract Class<DeserializationSchema<Row>> getDeserializationSchema();

    /** @return the Kafka consumer class passed to the matcher in the test */
    protected abstract Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer();

    /**
     * Requests a data stream from a spied table source on a mocked environment, then
     * verifies that a source was added to the environment and that
     * {@code getKafkaConsumer} was called with the test topic and properties.
     */
    @Test
    public void testKafkaTableSource() {
        KafkaTableSource tableSource = Mockito.spy(createTableSource());
        StreamExecutionEnvironment env = Mockito.mock(StreamExecutionEnvironment.class);

        tableSource.getDataStream(env);

        // Matchers have to be created after verify(...) and inline in the verified
        // call, so only the verify(...) results are pulled out into locals.
        StreamExecutionEnvironment verifiedEnv = Mockito.verify(env);
        verifiedEnv.addSource(Matchers.any(getFlinkKafkaConsumer()));

        KafkaTableSource verifiedSource = Mockito.verify(tableSource);
        verifiedSource.getKafkaConsumer(
            Matchers.eq(TOPIC),
            Matchers.eq(PROPERTIES),
            Matchers.any(getDeserializationSchema()));
    }

    /** Builds the table source from the fixed topic, properties and row type. */
    private KafkaTableSource createTableSource() {
        TypeInformation<Row> rowType = Types.ROW(FIELD_NAMES, FIELD_TYPES);
        return createTableSource(TOPIC, PROPERTIES, rowType);
    }

    /** Builds the placeholder consumer properties used by the test. */
    private static Properties createSourceProperties() {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "dummy");
        props.setProperty("group.id", "dummy");
        return props;
    }
}
