package net.cnri.microservices;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/cnri/microservices/KafkaMessageConsumer.class */
/**
 * {@link MessageConsumer} implementation backed by a Kafka {@link KafkaConsumer} with
 * {@code String} keys and values.
 *
 * <p>Subscription changes, the poll loop and the final {@code close()} are all executed on a
 * private single-thread executor. {@link #readAndDiscardAllMessages()} is the exception: it
 * uses the underlying consumer directly on the calling thread, so it must not be invoked while
 * the poll loop is running.
 *
 * <p>Offsets are committed manually ({@code enable.auto.commit} defaults to {@code false}) and
 * only after every record of a polled batch has been handed to the callback.
 */
public class KafkaMessageConsumer implements MessageConsumer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageConsumer.class);

    /** How long a single poll in the delivery loop may block. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100L);

    /** Subscription pattern matching every topic; used when draining. */
    private static final Pattern ALL_TOPICS = Pattern.compile(".*");

    private final ExecutorService exec;
    private final KafkaConsumer<String, String> consumer;
    private final String groupId;
    /** True while the poll loop is supposed to keep running; cleared by stop() and shutdown(). */
    private volatile boolean subscribed;
    private Consumer<String> callback;
    private String currentTopic;

    /**
     * Creates a consumer using default settings only.
     *
     * @param bootstrapServers value for {@code bootstrap.servers}
     * @param groupId value for {@code group.id}; may be {@code null} to leave it unset
     */
    public KafkaMessageConsumer(String bootstrapServers, String groupId) {
        this(bootstrapServers, groupId, null);
    }

    /**
     * Creates a consumer.
     *
     * @param bootstrapServers value for {@code bootstrap.servers} unless {@code config} sets it
     * @param groupId value for {@code group.id} unless {@code config} sets it; may be {@code null}
     * @param config extra Kafka consumer properties that take precedence over the defaults
     *     chosen by this class; may be {@code null}
     */
    public KafkaMessageConsumer(String bootstrapServers, String groupId, Map<String, String> config) {
        this.exec = Executors.newSingleThreadExecutor();
        this.consumer = constructConsumer(bootstrapServers, groupId, config);
        this.groupId = groupId;
    }

    /**
     * Builds the underlying Kafka consumer. Entries of {@code config} are applied first; every
     * default below is only added when the key is still absent.
     */
    private static KafkaConsumer<String, String> constructConsumer(
            String bootstrapServers, String groupId, Map<String, String> config) {
        Properties properties = new Properties();
        if (config != null) {
            properties.putAll(config);
        }
        properties.putIfAbsent("bootstrap.servers", bootstrapServers);
        if (groupId != null) {
            properties.putIfAbsent("group.id", groupId);
        }
        properties.putIfAbsent("key.deserializer", StringDeserializer.class.getName());
        properties.putIfAbsent("value.deserializer", StringDeserializer.class.getName());
        // Offsets are committed explicitly by the poll loop.
        properties.putIfAbsent("enable.auto.commit", "false");
        properties.putIfAbsent("auto.offset.reset", "earliest");
        properties.putIfAbsent("metadata.max.age.ms", 5000);
        properties.putIfAbsent("client.id", "consumer-" + UUID.randomUUID());
        return new KafkaConsumer<>(properties);
    }

    /**
     * Subscribes to {@code topic} and starts delivering each record value to {@code callback}
     * on the internal executor thread.
     *
     * @param topic the topic to subscribe to
     * @param callback receives the value of every consumed record
     * @throws IllegalStateException if the consumer is already started
     */
    @Override // net.cnri.microservices.MessageConsumer
    public void start(String topic, Consumer<String> callback) {
        if (this.subscribed) {
            throw new IllegalStateException("Consumer is already started");
        }
        logger.info("Subscribing to {}", topic);
        this.callback = callback;
        submitAndWait(() -> this.consumer.subscribe(Arrays.asList(topic)));
        this.subscribed = true;
        this.currentTopic = topic;
        this.exec.execute(this::runAndLogErrors);
    }

    /**
     * Stops the poll loop and unsubscribes. Blocks until the unsubscribe has run on the executor
     * thread, which can only happen after the poll loop has returned.
     */
    public void stop() {
        this.subscribed = false;
        submitAndWait(this.consumer::unsubscribe);
    }

    /**
     * Runs {@code task} on the executor thread and waits for it to finish. A failure of the task
     * is rethrown on the calling thread (wrapped in a {@link RuntimeException} when it is not
     * one already). If the wait is interrupted, the interrupt flag is restored and the method
     * returns without waiting further.
     */
    private void submitAndWait(Runnable task) {
        try {
            this.exec.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new RuntimeException(cause);
        }
    }

    /** Returns the topic passed to the most recent successful {@link #start} call, if any. */
    @Override // net.cnri.microservices.MessageConsumer
    public String getTopic() {
        return this.currentTopic;
    }

    /** Executor entry point: runs the poll loop and logs anything that escapes it. */
    private void runAndLogErrors() {
        try {
            run();
        } catch (WakeupException ignored) {
            // Expected: shutdown() calls consumer.wakeup() to break out of a blocking call.
        } catch (Throwable th) {
            logger.error("Fatal error in KafkaMessageConsumer", th);
        }
    }

    /**
     * Poll loop. Each non-empty batch is delivered to the callback in full and only then
     * committed. The no-argument {@code commitSync()} commits the positions of every partition
     * returned by the last poll, so committing earlier would mark records as consumed before
     * the callback has seen them.
     */
    private void run() {
        logger.info("Consumer {} starts, topic {}", this.groupId, this.currentTopic);
        while (this.subscribed) {
            ConsumerRecords<String, String> records = this.consumer.poll(POLL_TIMEOUT);
            if (records.isEmpty()) {
                continue;
            }
            for (ConsumerRecord<String, String> record : records) {
                if (!this.subscribed) {
                    // Stopped mid-batch: leave the batch uncommitted so that the undelivered
                    // records are redelivered rather than lost.
                    return;
                }
                this.callback.accept(record.value());
            }
            this.consumer.commitSync();
        }
    }

    /**
     * Subscribes to every topic, polls without waiting until a poll comes back empty, committing
     * after each non-empty poll, and finally unsubscribes.
     *
     * <p>Runs on the calling thread. NOTE(review): callers presumably invoke this only while the
     * consumer is not started - confirm, since the subscription is replaced here.
     */
    public void readAndDiscardAllMessages() {
        this.consumer.subscribe(ALL_TOPICS);
        try {
            while (!this.consumer.poll(Duration.ZERO).isEmpty()) {
                this.consumer.commitSync();
            }
        } finally {
            // Always drop the catch-all subscription, including when nothing was read or a
            // poll/commit failed.
            this.consumer.unsubscribe();
        }
    }

    /**
     * Stops the poll loop, closes the Kafka consumer on the executor thread and waits for the
     * executor to terminate. Calling it again after the executor has been shut down only waits
     * for termination.
     */
    @Override // net.cnri.microservices.MessageConsumer
    public synchronized void shutdown() {
        if (!this.exec.isShutdown()) {
            this.subscribed = false;
            // Interrupt a poll that may currently be blocking on the executor thread.
            this.consumer.wakeup();
            // close() is queued behind the poll loop so it runs on the consumer's own thread.
            this.exec.execute(this.consumer::close);
            this.exec.shutdown();
        }
        try {
            this.exec.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
