package org.apache.flink.statefun.flink.io.kafka;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.statefun.flink.io.spi.SourceProvider;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressSpec;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

/* loaded from: input_file:org/apache/flink/statefun/flink/io/kafka/KafkaSourceProvider.class */
public class KafkaSourceProvider implements SourceProvider {
    @Override // org.apache.flink.statefun.flink.io.spi.SourceProvider
    public <T> SourceFunction<T> forSpec(IngressSpec<T> ingressSpec) {
        KafkaIngressSpec<T> asKafkaSpec = asKafkaSpec(ingressSpec);
        FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(asKafkaSpec.topics(), deserializationSchemaFromSpec(asKafkaSpec), asKafkaSpec.properties());
        configureStartupPosition(flinkKafkaConsumer, asKafkaSpec.startupPosition());
        return flinkKafkaConsumer;
    }

    private static <T> KafkaIngressSpec<T> asKafkaSpec(IngressSpec<T> ingressSpec) {
        if (ingressSpec instanceof KafkaIngressSpec) {
            return (KafkaIngressSpec) ingressSpec;
        }
        if (ingressSpec == null) {
            throw new NullPointerException("Unable to translate a NULL spec");
        }
        throw new IllegalArgumentException(String.format("Wrong type %s", ingressSpec.type()));
    }

    private static <T> void configureStartupPosition(FlinkKafkaConsumer<T> flinkKafkaConsumer, KafkaIngressStartupPosition kafkaIngressStartupPosition) {
        if (kafkaIngressStartupPosition.isGroupOffsets()) {
            flinkKafkaConsumer.setStartFromGroupOffsets();
            return;
        }
        if (kafkaIngressStartupPosition.isEarliest()) {
            flinkKafkaConsumer.setStartFromEarliest();
            return;
        }
        if (kafkaIngressStartupPosition.isLatest()) {
            flinkKafkaConsumer.setStartFromLatest();
        } else if (kafkaIngressStartupPosition.isSpecificOffsets()) {
            flinkKafkaConsumer.setStartFromSpecificOffsets(convertKafkaTopicPartitionMap(kafkaIngressStartupPosition.asSpecificOffsets().specificOffsets()));
        } else {
            if (!kafkaIngressStartupPosition.isDate()) {
                throw new IllegalStateException("Safe guard; should not occur");
            }
            flinkKafkaConsumer.setStartFromTimestamp(kafkaIngressStartupPosition.asDate().epochMilli());
        }
    }

    private <T> KafkaDeserializationSchema<T> deserializationSchemaFromSpec(KafkaIngressSpec<T> kafkaIngressSpec) {
        return new KafkaDeserializationSchemaDelegate(kafkaIngressSpec.deserializer());
    }

    private static Map<KafkaTopicPartition, Long> convertKafkaTopicPartitionMap(Map<org.apache.flink.statefun.sdk.kafka.KafkaTopicPartition, Long> map) {
        HashMap hashMap = new HashMap(map.size());
        for (Map.Entry<org.apache.flink.statefun.sdk.kafka.KafkaTopicPartition, Long> entry : map.entrySet()) {
            hashMap.put(convertKafkaTopicPartition(entry.getKey()), entry.getValue());
        }
        return hashMap;
    }

    private static KafkaTopicPartition convertKafkaTopicPartition(org.apache.flink.statefun.sdk.kafka.KafkaTopicPartition kafkaTopicPartition) {
        return new KafkaTopicPartition(kafkaTopicPartition.topic(), kafkaTopicPartition.partition());
    }
}
