package org.apache.flink.streaming.connectors.kafka;

import java.util.HashMap;
import java.util.Properties;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.json.JsonSchemaConverter;
import org.apache.flink.streaming.connectors.kafka.KafkaJsonTableSource;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.descriptors.TestTableSourceDescriptor;
import org.apache.flink.table.sources.TableSourceFactoryService;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.class */
public abstract class KafkaJsonTableSourceFactoryTestBase {
    private static final String JSON_SCHEMA = "{  'title': 'Fruit',  'type': 'object',  'properties': {    'name': {      'type': 'string'    },    'count': {      'type': 'integer'    },    'time': {      'description': 'row time',      'type': 'string',      'format': 'date-time'    }  },  'required': ['name', 'count', 'time']}";
    private static final String TOPIC = "test-topic";

    protected abstract String version();

    protected abstract KafkaJsonTableSource.Builder builder();

    @Test
    public void testTableSourceFromJsonSchema() {
        testTableSource(new Json().jsonSchema(JSON_SCHEMA).failOnMissingField(true));
    }

    @Test
    public void testTableSourceDerivedSchema() {
        testTableSource(new Json().deriveSchema().failOnMissingField(true));
    }

    private void testTableSource(FormatDescriptor formatDescriptor) {
        HashMap hashMap = new HashMap();
        hashMap.put("name", "name");
        hashMap.put("fruit-name", "name");
        hashMap.put("count", "count");
        hashMap.put("time", "time");
        Properties properties = new Properties();
        properties.put("group.id", "test-group");
        properties.put("bootstrap.servers", "localhost:1234");
        HashMap hashMap2 = new HashMap();
        hashMap2.put(new KafkaTopicPartition(TOPIC, 0), 100L);
        hashMap2.put(new KafkaTopicPartition(TOPIC, 1), 123L);
        KafkaTableSource build = builder().forJsonSchema(TableSchema.fromTypeInfo(JsonSchemaConverter.convert(JSON_SCHEMA))).failOnMissingField(true).withTableToJsonMapping(hashMap).withKafkaProperties(properties).forTopic(TOPIC).fromSpecificOffsets(hashMap2).withSchema(TableSchema.builder().field("fruit-name", Types.STRING).field("count", Types.BIG_INT).field("event-time", Types.SQL_TIMESTAMP).field("proc-time", Types.SQL_TIMESTAMP).build()).withProctimeAttribute("proc-time").withRowtimeAttribute("event-time", new ExistingField("time"), PreserveWatermarks.INSTANCE()).build();
        HashMap hashMap3 = new HashMap();
        hashMap3.put(0, 100L);
        hashMap3.put(1, 123L);
        Assert.assertEquals(build, TableSourceFactoryService.findAndCreateTableSource(new TestTableSourceDescriptor(new Kafka().version(version()).topic(TOPIC).properties(properties).startFromSpecificOffsets(hashMap3)).addFormat(formatDescriptor).addSchema(new Schema().field("fruit-name", Types.STRING).from("name").field("count", Types.BIG_INT).field("event-time", Types.SQL_TIMESTAMP).rowtime(new Rowtime().timestampsFromField("time").watermarksFromSource()).field("proc-time", Types.SQL_TIMESTAMP).proctime())));
    }
}
