package org.apache.flink.streaming.connectors.kafka;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.descriptors.TestTableDescriptor;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.utils.TestDeserializationSchema;
import org.apache.flink.table.factories.utils.TestSerializationSchema;
import org.apache.flink.table.factories.utils.TestTableFormat;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.TableSourceUtil;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
import org.apache.flink.types.Row;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.class */
/**
 * Abstract base for tests of the Kafka table source and table sink factories.
 *
 * <p>Each test builds the <em>expected</em> connector through the hooks declared at the bottom of
 * this class, creates the <em>actual</em> connector from descriptor properties via
 * {@link TableFactoryService}, and asserts that both are equal. It then runs the actual connector
 * against a recording mock to check which Kafka consumer/producer class it registers.
 *
 * <p>Subclasses supply the Kafka version string and the matching connector classes.
 */
public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {

    private static final String TOPIC = "myTopic";
    private static final int PARTITION_0 = 0;
    private static final long OFFSET_0 = 100L;
    private static final int PARTITION_1 = 1;
    private static final long OFFSET_1 = 123L;
    private static final String FRUIT_NAME = "fruit-name";
    private static final String NAME = "name";
    private static final String COUNT = "count";
    private static final String TIME = "time";
    private static final String EVENT_TIME = "event-time";
    private static final String PROC_TIME = "proc-time";

    /** Kafka client properties handed to both the expected connector and the descriptor. */
    private static final Properties KAFKA_PROPERTIES = new Properties();

    static {
        KAFKA_PROPERTIES.setProperty("zookeeper.connect", "dummy");
        KAFKA_PROPERTIES.setProperty("group.id", "dummy");
        KAFKA_PROPERTIES.setProperty("bootstrap.servers", "dummy");
    }

    /** Partition-to-offset map passed to {@code startFromSpecificOffsets} on the descriptor. */
    private static final Map<Integer, Long> OFFSETS = new HashMap<>();

    static {
        OFFSETS.put(PARTITION_0, OFFSET_0);
        OFFSETS.put(PARTITION_1, OFFSET_1);
    }

    /**
     * Checks that the table source created from descriptor properties equals the expected
     * {@link KafkaTableSource}, and that it registers a source function of the expected consumer
     * class.
     */
    @Test
    @SuppressWarnings("unchecked") // the factory is looked up by its raw class
    public void testTableSource() {
        // --- expected table source, built directly through the subclass hook ---
        final TableSchema schema = TableSchema.builder()
                .field(FRUIT_NAME, Types.STRING())
                .field(COUNT, Types.DECIMAL())
                .field(EVENT_TIME, Types.SQL_TIMESTAMP())
                .field(PROC_TIME, Types.SQL_TIMESTAMP())
                .build();

        final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = Collections.singletonList(
                new RowtimeAttributeDescriptor(EVENT_TIME, new ExistingField(TIME), new AscendingTimestamps()));

        final Map<String, String> fieldMapping = new HashMap<>();
        fieldMapping.put(FRUIT_NAME, NAME);
        fieldMapping.put(NAME, NAME);
        fieldMapping.put(COUNT, COUNT);
        fieldMapping.put(TIME, TIME);

        final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
        specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
        specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);

        final DeserializationSchema<Row> deserializationSchema = new TestDeserializationSchema(
                TableSchema.builder()
                        .field(NAME, Types.STRING())
                        .field(COUNT, Types.DECIMAL())
                        .field(TIME, Types.SQL_TIMESTAMP())
                        .build()
                        .toRowType());

        final KafkaTableSource expected = getExpectedKafkaTableSource(
                schema,
                Optional.of(PROC_TIME),
                rowtimeAttributeDescriptors,
                fieldMapping,
                TOPIC,
                KAFKA_PROPERTIES,
                deserializationSchema,
                StartupMode.SPECIFIC_OFFSETS,
                specificOffsets);

        TableSourceUtil.validateTableSource(expected);

        // --- actual table source, created from descriptor properties by the factory ---
        final Map<String, String> propertiesMap = DescriptorProperties.toJavaMap(
                new TestTableDescriptor(
                        new Kafka()
                                .version(getKafkaVersion())
                                .topic(TOPIC)
                                .properties(KAFKA_PROPERTIES)
                                // NOTE(review): a sink option on a source descriptor; presumably
                                // checks that the source factory tolerates it - confirm.
                                .sinkPartitionerRoundRobin()
                                .startFromSpecificOffsets(OFFSETS))
                        .withFormat(new TestTableFormat())
                        .withSchema(
                                new Schema()
                                        .field(FRUIT_NAME, Types.STRING()).from(NAME)
                                        .field(COUNT, Types.DECIMAL())
                                        .field(EVENT_TIME, Types.SQL_TIMESTAMP()).rowtime(
                                                new Rowtime().timestampsFromField(TIME).watermarksPeriodicAscending())
                                        .field(PROC_TIME, Types.SQL_TIMESTAMP()).proctime())
                        .inAppendMode());

        // Held as Object: the raw factory does not return KafkaTableSource statically, and the
        // equality assertion should run before the downcast below.
        final Object actualSource = TableFactoryService.find(StreamTableSourceFactory.class, propertiesMap)
                .createStreamTableSource(propertiesMap);

        Assert.assertEquals(expected, actualSource);

        // --- the source must register the expected Kafka consumer ---
        final KafkaTableSource actualKafkaSource = (KafkaTableSource) actualSource;
        final StreamExecutionEnvironmentMock environmentMock = new StreamExecutionEnvironmentMock();
        actualKafkaSource.getDataStream(environmentMock);
        Assert.assertTrue(
                getExpectedFlinkKafkaConsumer().isAssignableFrom(environmentMock.sourceFunction.getClass()));
    }

    /**
     * Checks that the table sink created from descriptor properties equals the expected
     * {@link KafkaTableSink}, and that it attaches a sink function of the expected producer class.
     */
    @Test
    @SuppressWarnings("unchecked") // the factory is looked up by its raw class
    public void testTableSink() {
        // --- expected table sink, built directly through the subclass hook ---
        final TableSchema schema = TableSchema.builder()
                .field(FRUIT_NAME, Types.STRING())
                .field(COUNT, Types.DECIMAL())
                .field(EVENT_TIME, Types.SQL_TIMESTAMP())
                .build();

        final SerializationSchema<Row> serializationSchema = new TestSerializationSchema(schema.toRowType());

        final KafkaTableSink expected = getExpectedKafkaTableSink(
                schema,
                TOPIC,
                KAFKA_PROPERTIES,
                Optional.of(new FlinkFixedPartitioner<>()),
                serializationSchema);

        // --- actual table sink, created from descriptor properties by the factory ---
        final Map<String, String> propertiesMap = DescriptorProperties.toJavaMap(
                new TestTableDescriptor(
                        new Kafka()
                                .version(getKafkaVersion())
                                .topic(TOPIC)
                                .properties(KAFKA_PROPERTIES)
                                .sinkPartitionerFixed()
                                // NOTE(review): a source option on a sink descriptor; presumably
                                // checks that the sink factory tolerates it - confirm.
                                .startFromSpecificOffsets(OFFSETS))
                        .withFormat(new TestTableFormat())
                        .withSchema(
                                new Schema()
                                        .field(FRUIT_NAME, Types.STRING())
                                        .field(COUNT, Types.DECIMAL())
                                        .field(EVENT_TIME, Types.SQL_TIMESTAMP()))
                        .inAppendMode());

        // Held as Object for the same reason as in testTableSource().
        final Object actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
                .createStreamTableSink(propertiesMap);

        Assert.assertEquals(expected, actualSink);

        // --- the sink must attach the expected Kafka producer ---
        final KafkaTableSink actualKafkaSink = (KafkaTableSink) actualSink;
        final DataStreamMock streamMock =
                new DataStreamMock(new StreamExecutionEnvironmentMock(), schema.toRowType());
        actualKafkaSink.emitDataStream(streamMock);
        Assert.assertTrue(
                getExpectedFlinkKafkaProducer().isAssignableFrom(streamMock.sinkFunction.getClass()));
    }

    // --------------------------------------------------------------------------------------------
    // Mocks
    // --------------------------------------------------------------------------------------------

    /** Execution environment that records the last added source function and refuses to execute. */
    private static class StreamExecutionEnvironmentMock extends StreamExecutionEnvironment {

        /** Source function most recently passed to {@link #addSource(SourceFunction)}. */
        public SourceFunction<?> sourceFunction;

        @Override
        public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> sourceFunction) {
            this.sourceFunction = sourceFunction;
            return super.addSource(sourceFunction);
        }

        @Override
        public JobExecutionResult execute(String jobName) {
            throw new UnsupportedOperationException();
        }
    }

    /** Data stream that records the last sink function attached to it. */
    private static class DataStreamMock extends DataStream<Row> {

        /** Sink function most recently passed to {@link #addSink(SinkFunction)}. */
        public SinkFunction<?> sinkFunction;

        public DataStreamMock(StreamExecutionEnvironment environment, TypeInformation<Row> outType) {
            // Plain literals on purpose: these are the mock transformation's own name and its
            // third constructor argument (presumably the parallelism - confirm against
            // StreamTransformation). They are unrelated to the NAME field or Kafka PARTITION_1.
            super(environment, new StreamTransformationMock("name", outType, 1));
        }

        @Override
        public DataStreamSink<Row> addSink(SinkFunction<Row> sinkFunction) {
            this.sinkFunction = sinkFunction;
            return super.addSink(sinkFunction);
        }
    }

    /** Minimal transformation used only to back {@link DataStreamMock}. */
    private static class StreamTransformationMock extends StreamTransformation<Row> {

        public StreamTransformationMock(String name, TypeInformation<Row> outputType, int parallelism) {
            super(name, outputType, parallelism);
        }

        @Override
        public void setChainingStrategy(ChainingStrategy strategy) {
            // intentionally a no-op
        }

        @Override
        public Collection<StreamTransformation<?>> getTransitivePredecessors() {
            // NOTE(review): returns null rather than an empty collection; presumably never
            // consulted by the code paths these tests exercise - confirm before relying on it.
            return null;
        }
    }

    // --------------------------------------------------------------------------------------------
    // Hooks for version-specific subclasses
    // --------------------------------------------------------------------------------------------

    /** Returns the value passed to {@code Kafka#version} when building the descriptor. */
    protected abstract String getKafkaVersion();

    /** Returns the class the source function registered by the created table source must be assignable to. */
    protected abstract Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer();

    /** Returns the class the sink function attached by the created table sink must be assignable to. */
    protected abstract Class<?> getExpectedFlinkKafkaProducer();

    /**
     * Builds the table source that the factory-created source is compared against.
     *
     * @param schema table schema of the source
     * @param proctimeAttribute name of the processing-time field, if any
     * @param rowtimeAttributeDescriptors descriptors of the rowtime attributes
     * @param fieldMapping mapping between field names (the test maps each schema/format field
     *     name to a format field name)
     * @param topic Kafka topic
     * @param properties Kafka client properties
     * @param deserializationSchema schema used to deserialize records into rows
     * @param startupMode startup mode of the consumer
     * @param specificStartupOffsets offsets per topic partition (the test supplies these together
     *     with {@link StartupMode#SPECIFIC_OFFSETS})
     * @return the expected table source
     */
    protected abstract KafkaTableSource getExpectedKafkaTableSource(
            TableSchema schema,
            Optional<String> proctimeAttribute,
            List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
            Map<String, String> fieldMapping,
            String topic,
            Properties properties,
            DeserializationSchema<Row> deserializationSchema,
            StartupMode startupMode,
            Map<KafkaTopicPartition, Long> specificStartupOffsets);

    /**
     * Builds the table sink that the factory-created sink is compared against.
     *
     * @param schema table schema of the sink
     * @param topic Kafka topic
     * @param properties Kafka client properties
     * @param partitioner partitioner for written records, if any
     * @param serializationSchema schema used to serialize rows into records
     * @return the expected table sink
     */
    protected abstract KafkaTableSink getExpectedKafkaTableSink(
            TableSchema schema,
            String topic,
            Properties properties,
            Optional<FlinkKafkaPartitioner<Row>> partitioner,
            SerializationSchema<Row> serializationSchema);
}
