package net.cnri.microservices;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/cnri/microservices/MultithreadedKafkaConsumer.class */
public class MultithreadedKafkaConsumer {
    private static final Logger logger = LoggerFactory.getLogger(MultithreadedKafkaConsumer.class);
    private static final int MAX_LAG = 100000;
    private final ExecutorService exec;
    private final KafkaConsumer<String, String> consumer;
    private final ConcurrentMap<TopicPartition, HandledOffsetsTracker> offsets;
    private final ConcurrentMap<TopicPartition, OffsetAndMetadata> nextToCommit;
    private final Alerter alerter;
    private final ConcurrentCountingHashMap<TopicPartition> messagesBeingProcessed;
    private final ConcurrentMap<TopicPartition, CountDownLatch> topicsBeingRevoked;
    private final RebalanceListener rebalanceListener;
    private final ExecutorService taskRunner;
    private final StripedExecutorService stripedTaskRunner;
    private final CountDownLatch startLatch;
    private volatile boolean running;
    private boolean shutdown;
    private BiConsumer<String, Long> callback;
    private Function<String, Object> stripePicker;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:net/cnri/microservices/MultithreadedKafkaConsumer$KafkaCommittableTask.class */
    public class KafkaCommittableTask {
        private final ConsumerRecord<String, String> record;
        private final TopicPartition topicPartition;

        public KafkaCommittableTask(ConsumerRecord<String, String> consumerRecord) {
            this.record = consumerRecord;
            this.topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
        }

        public String getMessage() {
            return (String) this.record.value();
        }

        public long getTimestamp() {
            return this.record.timestamp();
        }

        public void lock() {
            MultithreadedKafkaConsumer.this.messagesBeingProcessed.incrementAndGet(this.topicPartition);
        }

        public void release() {
            CountDownLatch countDownLatch;
            if (MultithreadedKafkaConsumer.this.messagesBeingProcessed.decrementAndGet(this.topicPartition) != 0 || (countDownLatch = (CountDownLatch) MultithreadedKafkaConsumer.this.topicsBeingRevoked.get(this.topicPartition)) == null) {
                return;
            }
            countDownLatch.countDown();
        }

        public boolean isCancelled() {
            return (MultithreadedKafkaConsumer.this.running && !MultithreadedKafkaConsumer.this.topicsBeingRevoked.containsKey(this.topicPartition) && MultithreadedKafkaConsumer.this.offsets.containsKey(this.topicPartition)) ? false : true;
        }

        public void commit() {
            HandledOffsetsTracker handledOffsetsTracker = (HandledOffsetsTracker) MultithreadedKafkaConsumer.this.offsets.get(this.topicPartition);
            if (handledOffsetsTracker == null) {
                MultithreadedKafkaConsumer.logger.error("Message in revoked partition " + this.topicPartition + "!  Offset: " + this.record.offset() + " Message: " + ((String) this.record.value()));
                MultithreadedKafkaConsumer.this.alerter.alert("Message in revoked partition " + this.topicPartition + "!  Offset: " + this.record.offset() + " Message: " + ((String) this.record.value()));
                return;
            }
            long handle = handledOffsetsTracker.handle(this.record.offset());
            if (handle >= this.record.offset()) {
                MultithreadedKafkaConsumer.this.nextToCommit.put(this.topicPartition, new OffsetAndMetadata(handle + 1));
            } else if (this.record.offset() > handle + 100000) {
                MultithreadedKafkaConsumer.this.nextToCommit.put(this.topicPartition, new OffsetAndMetadata(handledOffsetsTracker.skipTo(handle + 50000) + 1));
            }
        }
    }

    /* loaded from: input_file:net/cnri/microservices/MultithreadedKafkaConsumer$RebalanceListener.class */
    /**
     * Keeps the per-partition offset trackers in step with partition assignment, and holds up
     * revocation until in-flight tasks of the affected partitions have been released.
     */
    class RebalanceListener implements ConsumerRebalanceListener {
        RebalanceListener() {
        }

        /**
         * Creates an offset tracker for every newly assigned partition that does not have one yet,
         * seeded with the consumer's current position minus one.
         *
         * @param collection the partitions now assigned to this consumer
         */
        public synchronized void onPartitionsAssigned(Collection<TopicPartition> collection) {
            for (TopicPartition assigned : collection) {
                long seedOffset = MultithreadedKafkaConsumer.this.consumer.position(assigned) - 1;
                HandledOffsetsTracker freshTracker =
                        new HandledOffsetsTracker(seedOffset, assigned.toString(), MultithreadedKafkaConsumer.this.alerter);
                MultithreadedKafkaConsumer.this.offsets.putIfAbsent(assigned, freshTracker);
            }
        }

        /**
         * Marks every revoked partition as being revoked, then, partition by partition, waits for
         * its in-flight count to drain before dropping its tracker and its revocation marker.
         *
         * @param collection the partitions being taken away from this consumer
         */
        public synchronized void onPartitionsRevoked(Collection<TopicPartition> collection) {
            // Flag all partitions first so tasks of any of them see the cancellation right away.
            for (TopicPartition revoked : collection) {
                MultithreadedKafkaConsumer.this.topicsBeingRevoked.put(revoked, new CountDownLatch(1));
            }
            for (TopicPartition revoked : collection) {
                boolean tasksInFlight = MultithreadedKafkaConsumer.this.messagesBeingProcessed.get(revoked) != 0;
                if (tasksInFlight) {
                    try {
                        MultithreadedKafkaConsumer.this.topicsBeingRevoked.get(revoked).await();
                    } catch (InterruptedException interrupted) {
                        // Preserve the interrupt and carry on with the cleanup below.
                        Thread.currentThread().interrupt();
                    }
                }
                MultithreadedKafkaConsumer.this.offsets.remove(revoked);
                MultithreadedKafkaConsumer.this.topicsBeingRevoked.remove(revoked);
            }
        }
    }

    /**
     * Creates a consumer subscribed by topic pattern that hands records to a plain executor.
     *
     * @param pattern topic pattern to subscribe to
     * @param str default for the {@code group.id} setting (used unless {@code map} supplies one)
     * @param map extra consumer settings; may be {@code null}
     * @param str2 default for the {@code bootstrap.servers} setting (used unless {@code map} supplies one)
     * @param alerter receiver of alerts; {@code null} selects a no-op alerter
     * @param executorService executor that runs the message callbacks
     */
    public MultithreadedKafkaConsumer(Pattern pattern, String str, Map<String, String> map, String str2, Alerter alerter, ExecutorService executorService) {
        this(pattern, null, str, map, str2, alerter, executorService, null);
    }

    /**
     * Creates a consumer subscribed to a fixed set of topics that hands records to a plain executor.
     *
     * @param collection topics to subscribe to
     * @param str default for the {@code group.id} setting (used unless {@code map} supplies one)
     * @param map extra consumer settings; may be {@code null}
     * @param str2 default for the {@code bootstrap.servers} setting (used unless {@code map} supplies one)
     * @param alerter receiver of alerts; {@code null} selects a no-op alerter
     * @param executorService executor that runs the message callbacks
     */
    public MultithreadedKafkaConsumer(Collection<String> collection, String str, Map<String, String> map, String str2, Alerter alerter, ExecutorService executorService) {
        this(null, collection, str, map, str2, alerter, executorService, null);
    }

    /**
     * Creates a consumer subscribed by topic pattern that hands records to a striped executor.
     *
     * @param pattern topic pattern to subscribe to
     * @param str default for the {@code group.id} setting (used unless {@code map} supplies one)
     * @param map extra consumer settings; may be {@code null}
     * @param str2 default for the {@code bootstrap.servers} setting (used unless {@code map} supplies one)
     * @param alerter receiver of alerts; {@code null} selects a no-op alerter
     * @param stripedExecutorService striped executor that runs the message callbacks
     */
    public MultithreadedKafkaConsumer(Pattern pattern, String str, Map<String, String> map, String str2, Alerter alerter, StripedExecutorService stripedExecutorService) {
        this(pattern, null, str, map, str2, alerter, null, stripedExecutorService);
    }

    /**
     * Creates a consumer subscribed to a fixed set of topics that hands records to a striped executor.
     *
     * @param collection topics to subscribe to
     * @param str default for the {@code group.id} setting (used unless {@code map} supplies one)
     * @param map extra consumer settings; may be {@code null}
     * @param str2 default for the {@code bootstrap.servers} setting (used unless {@code map} supplies one)
     * @param alerter receiver of alerts; {@code null} selects a no-op alerter
     * @param stripedExecutorService striped executor that runs the message callbacks
     */
    public MultithreadedKafkaConsumer(Collection<String> collection, String str, Map<String, String> map, String str2, Alerter alerter, StripedExecutorService stripedExecutorService) {
        this(null, collection, str, map, str2, alerter, null, stripedExecutorService);
    }

    /**
     * Shared constructor: builds the Kafka configuration, creates and subscribes the underlying
     * consumer, and stores the executor(s) used to run callbacks.
     *
     * <p>Settings given in {@code map} win; every other setting below is only filled in when absent.
     *
     * @param pattern topic pattern to subscribe to, or {@code null} to subscribe to {@code collection}
     * @param collection topics to subscribe to when {@code pattern} is {@code null}
     * @param str default {@code group.id}
     * @param map extra consumer settings; may be {@code null}
     * @param str2 default {@code bootstrap.servers}
     * @param alerter receiver of alerts; {@code null} selects a {@code NoOpAlerter}
     * @param executorService plain executor for callbacks, or {@code null}
     * @param stripedExecutorService striped executor for callbacks, or {@code null}
     */
    private MultithreadedKafkaConsumer(Pattern pattern, Collection<String> collection, String str, Map<String, String> map, String str2, Alerter alerter, ExecutorService executorService, StripedExecutorService stripedExecutorService) {
        // Bookkeeping shared between the polling thread and the worker threads.
        this.offsets = new ConcurrentHashMap<>();
        this.nextToCommit = new ConcurrentHashMap<>();
        this.messagesBeingProcessed = new ConcurrentCountingHashMap<>();
        this.topicsBeingRevoked = new ConcurrentHashMap<>();
        this.startLatch = new CountDownLatch(1);
        this.running = false;
        this.shutdown = false;

        // Caller-supplied settings first, then defaults for whatever is still missing.
        final String stringDeserializer = StringDeserializer.class.getName();
        Properties kafkaSettings = new Properties();
        if (map != null) {
            kafkaSettings.putAll(map);
        }
        kafkaSettings.putIfAbsent("bootstrap.servers", str2);
        kafkaSettings.putIfAbsent("group.id", str);
        kafkaSettings.putIfAbsent("key.deserializer", stringDeserializer);
        kafkaSettings.putIfAbsent("value.deserializer", stringDeserializer);
        kafkaSettings.putIfAbsent("enable.auto.commit", "false");
        kafkaSettings.putIfAbsent("auto.offset.reset", "earliest");
        kafkaSettings.putIfAbsent("metadata.max.age.ms", 5000);
        kafkaSettings.putIfAbsent("client.id", "consumer-" + UUID.randomUUID());

        this.consumer = new KafkaConsumer<>(kafkaSettings);
        this.rebalanceListener = new RebalanceListener();
        if (pattern == null) {
            this.consumer.subscribe(collection, this.rebalanceListener);
        } else {
            this.consumer.subscribe(pattern, this.rebalanceListener);
        }

        // Single thread that owns every interaction with the Kafka consumer.
        this.exec = Executors.newSingleThreadExecutor();
        this.alerter = (alerter != null) ? alerter : new NoOpAlerter();
        this.taskRunner = executorService;
        this.stripedTaskRunner = stripedExecutorService;
    }

    /**
     * Starts consuming with a callback that only needs the message text.
     *
     * @param consumer invoked with the value of each record
     * @throws IllegalStateException under the same conditions as the {@code BiConsumer} overload
     */
    public synchronized void start(Consumer<String> consumer) {
        // Adapt to the two-argument form, dropping the timestamp.
        BiConsumer<String, Long> timestampIgnoring = (message, timestamp) -> consumer.accept(message);
        start(timestampIgnoring);
    }

    /**
     * Starts consuming on the plain (non-striped) executor.
     *
     * @param biConsumer invoked with each record's value and timestamp
     * @throws IllegalStateException if already started, already shut down, or if this instance
     *         was built without a plain executor
     */
    public synchronized void start(BiConsumer<String, Long> biConsumer) {
        String refusal = null;
        if (this.running) {
            refusal = "already started";
        } else if (this.shutdown) {
            refusal = "already shut down";
        } else if (this.taskRunner == null) {
            refusal = "striping required";
        }
        if (refusal != null) {
            throw new IllegalStateException(refusal);
        }
        this.callback = biConsumer;
        completeStart();
    }

    /**
     * Starts striped consumption with a callback that only needs the message text.
     *
     * @param consumer invoked with the value of each record
     * @param function maps a message to the stripe it should run on
     * @throws IllegalStateException under the same conditions as the {@code BiConsumer} overload
     */
    public synchronized void start(Consumer<String> consumer, Function<String, Object> function) {
        // Adapt to the two-argument form, dropping the timestamp.
        BiConsumer<String, Long> timestampIgnoring = (message, timestamp) -> consumer.accept(message);
        start(timestampIgnoring, function);
    }

    /**
     * Starts consuming on the striped executor.
     *
     * @param biConsumer invoked with each record's value and timestamp
     * @param function maps a message to the stripe it should run on; when {@code null}, every
     *        message gets its own fresh stripe object
     * @throws IllegalStateException if already started, already shut down, or if this instance
     *         was built without a striped executor
     */
    public synchronized void start(BiConsumer<String, Long> biConsumer, Function<String, Object> function) {
        String refusal = null;
        if (this.running) {
            refusal = "already started";
        } else if (this.shutdown) {
            refusal = "already shut down";
        } else if (this.stripedTaskRunner == null) {
            refusal = "striping not supported";
        }
        if (refusal != null) {
            throw new IllegalStateException(refusal);
        }
        this.callback = biConsumer;
        this.stripePicker = function;
        completeStart();
    }

    /** Flips the running flag and launches the polling loop on the dedicated consumer thread. */
    private void completeStart() {
        this.running = true;
        Runnable pollingLoop = () -> runAndLogErrors();
        this.exec.execute(pollingLoop);
    }

    /**
     * Blocks the caller until the polling loop has completed its first poll. If the calling
     * thread is interrupted while waiting, the interrupt flag is restored and the method returns.
     */
    public void ensureStarted() {
        final long practicallyForever = Long.MAX_VALUE;
        try {
            this.startLatch.await(practicallyForever, TimeUnit.DAYS);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Entry point of the consumer thread: runs the polling loop and reports anything that
     * escapes it, both to the log and to the alerter.
     */
    private void runAndLogErrors() {
        try {
            run();
        } catch (WakeupException ignored) {
            // Deliberately silent: shutdown() calls consumer.wakeup(), so this is the expected
            // way for the loop to be interrupted.
        } catch (Throwable failure) {
            logger.error("Fatal error in MultithreadedKafkaConsumer", failure);
            String alertText = "Fatal error in MultithreadedKafkaConsumer: " + failure;
            this.alerter.alert(alertText);
        }
    }

    /**
     * Polling loop executed on the consumer thread.
     *
     * <p>Each iteration polls Kafka, hands every record to the configured executor, and then
     * sends any offsets the workers have made committable. The very first poll uses a zero
     * timeout and releases {@code startLatch} so that {@link #ensureStarted()} can return.
     *
     * <p>Back-pressure: when the executor rejects a record, the partition is rewound to that
     * record so it is fetched again by a later poll, the rest of the batch is abandoned (each
     * remaining partition is rewound to its first record of the batch), and the loop sleeps with
     * an increasing backoff before polling again. A successful submission resets the backoff.
     *
     * <p>Once {@code running} turns false the loop waits for in-flight tasks and commits one
     * last time.
     */
    private void run() {
        boolean firstPoll = true;
        long backoffMillis = 50;
        while (this.running) {
            ConsumerRecords<String, String> batch = this.consumer.poll(Duration.ofMillis(firstPoll ? 0L : 100L));
            if (firstPoll) {
                this.startLatch.countDown();
                firstPoll = false;
            }
            boolean rejected = false;
            for (TopicPartition partition : batch.partitions()) {
                if (!this.running) {
                    break;
                }
                for (ConsumerRecord<String, String> record : batch.records(partition)) {
                    if (!this.running) {
                        break;
                    }
                    if (rejected) {
                        // An earlier partition hit a rejection: give this whole partition back,
                        // starting at its first record of the batch.
                        this.consumer.seek(partition, record.offset());
                        break;
                    }
                    try {
                        KafkaCommittableTask task = new KafkaCommittableTask(record);
                        if (this.taskRunner != null) {
                            this.taskRunner.submit(() -> {
                                runTask(task);
                            });
                        } else if (this.stripedTaskRunner != null) {
                            this.stripedTaskRunner.submit(pickStripe(task), () -> {
                                runTask(task);
                            });
                        }
                        backoffMillis = 50;
                    } catch (RejectedExecutionException e) {
                        // Rewind to the rejected record itself and stop with this partition;
                        // continuing would re-seek past it and lose the record.
                        rejected = true;
                        this.consumer.seek(partition, record.offset());
                        break;
                    }
                }
            }
            if (rejected) {
                backoffMillis = sleepAndIncreaseBackoff(backoffMillis);
            }
            sendCommitsToKafka();
        }
        awaitLockedTasks();
        sendCommitsToKafka();
    }

    /**
     * Runs the user callback for one record on a worker thread.
     *
     * <p>The task is skipped if it is already cancelled. Otherwise it is registered as in flight,
     * re-checked for cancellation (its partition may have been revoked in between), passed to the
     * callback, and its offset is reported as handled. The in-flight registration is released
     * exactly once per {@code lock()}, in the {@code finally} block, whether the callback
     * returned, was skipped, or threw.
     *
     * <p>Nothing is rethrown: any failure is logged and sent to the alerter, and the record's
     * offset is then not reported as handled.
     *
     * @param kafkaCommittableTask the record to process
     */
    private void runTask(KafkaCommittableTask kafkaCommittableTask) {
        try {
            if (kafkaCommittableTask.isCancelled()) {
                return;
            }
            kafkaCommittableTask.lock();
            try {
                if (kafkaCommittableTask.isCancelled()) {
                    return;
                }
                this.callback.accept(kafkaCommittableTask.getMessage(), Long.valueOf(kafkaCommittableTask.getTimestamp()));
                kafkaCommittableTask.commit();
            } finally {
                // Single release point; releasing here and on the success path as well would
                // drive the in-flight counter negative and break the zero-detection that
                // revocation and shutdown wait on.
                kafkaCommittableTask.release();
            }
        } catch (Throwable th) {
            logger.error("Unexpected error in MultithreadedKafkaConsumer processing", th);
            this.alerter.alert("Unexpected error in MultithreadedKafkaConsumer processing " + th);
        }
    }

    /**
     * Chooses the stripe a task is submitted under.
     *
     * @param kafkaCommittableTask the task about to be submitted
     * @return the stripe picker's result for the task's message, or a brand-new object when no
     *         stripe picker was configured
     */
    private Object pickStripe(KafkaCommittableTask kafkaCommittableTask) {
        if (this.stripePicker != null) {
            return this.stripePicker.apply(kafkaCommittableTask.getMessage());
        }
        return new Object();
    }

    /**
     * Sleeps for the given backoff and computes the next one by doubling it, capped at
     * 120000 ms. Whenever the doubled value exceeds the cap, an error is logged and alerted.
     *
     * @param j current backoff in milliseconds
     * @return the backoff to use next time
     */
    private long sleepAndIncreaseBackoff(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
        final long ceiling = 120000;
        final long doubled = j * 2;
        if (doubled <= ceiling) {
            return doubled;
        }
        logger.error("Consumer queue at maximum backoff");
        this.alerter.alert("Consumer queue at maximum backoff");
        return ceiling;
    }

    /**
     * Drains the offsets that workers have queued in {@code nextToCommit} and commits them
     * synchronously in a single call. Does nothing when there is nothing queued.
     */
    private void sendCommitsToKafka() {
        if (this.nextToCommit.isEmpty()) {
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> batch = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> queued : this.nextToCommit.entrySet()) {
            TopicPartition partition = queued.getKey();
            OffsetAndMetadata offset = queued.getValue();
            // Conditional remove: a newer offset stored concurrently by a worker stays queued.
            this.nextToCommit.remove(partition, offset);
            batch.put(partition, offset);
        }
        if (!batch.isEmpty()) {
            this.consumer.commitSync(batch);
        }
    }

    /**
     * Waits for in-flight tasks to be released. Every partition known to the in-flight counter
     * first gets a latch registered in {@code topicsBeingRevoked}; then each partition whose
     * count is still positive is awaited. An interrupt while waiting restores the interrupt flag
     * and moves on to the next partition.
     */
    private void awaitLockedTasks() {
        Set<TopicPartition> trackedPartitions = this.messagesBeingProcessed.keySet();
        for (TopicPartition partition : trackedPartitions) {
            this.topicsBeingRevoked.put(partition, new CountDownLatch(1));
        }
        for (TopicPartition partition : trackedPartitions) {
            if (this.messagesBeingProcessed.get(partition) <= 0) {
                continue;
            }
            try {
                this.topicsBeingRevoked.get(partition).await();
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Stops the polling loop and closes the Kafka consumer, blocking until the consumer thread
     * has finished. Has no effect when the consumer is not running.
     *
     * <p>The close is queued on the consumer thread's executor rather than invoked directly, so
     * it executes after the polling loop has returned.
     */
    public synchronized void shutdown() {
        if (!this.running) {
            return;
        }
        this.shutdown = true;
        this.running = false;
        this.consumer.wakeup();
        this.exec.execute(this.consumer::close);
        this.exec.shutdown();
        try {
            this.exec.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
