package com.adtsw.jchannels.messaging.queue;

import com.adtsw.jcommons.ds.BlockingThreadPoolExecutor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/adtsw/jchannels/messaging/queue/InMemoryMessageQueue.class */
public class InMemoryMessageQueue<I> implements MessageQueue<I> {
    private static final Logger logger = LogManager.getLogger(InMemoryMessageQueue.class);
    private final Map<String, List<MessageListener<I>>> listeners;
    private final BlockingThreadPoolExecutor executorService;
    private final String queueName;

    public InMemoryMessageQueue(String str, int i) {
        this.listeners = new HashMap();
        this.executorService = new BlockingThreadPoolExecutor(str, i, i);
        this.queueName = str;
    }

    public InMemoryMessageQueue(String str) {
        this(str, 10);
    }

    @Override // com.adtsw.jchannels.messaging.queue.MessageQueue
    public void registerTopic(String str) {
        if (this.listeners.containsKey(str)) {
            return;
        }
        this.listeners.put(str, new ArrayList());
    }

    @Override // com.adtsw.jchannels.messaging.queue.MessageQueue
    public void addListener(String str, MessageListener<I> messageListener) {
        registerTopic(str);
        this.listeners.get(str).add(messageListener);
    }

    @Override // com.adtsw.jchannels.messaging.queue.MessageQueue
    public void pushMessage(String str, I i) {
        try {
            this.executorService.executeButBlockIfFull(() -> {
                List<MessageListener<I>> list = this.listeners.get(str);
                if (CollectionUtils.isNotEmpty(list)) {
                    list.forEach(messageListener -> {
                        messageListener.onMessage(i);
                    });
                } else {
                    logger.warn("Sending message to topic without listener " + str);
                }
            });
        } catch (InterruptedException e) {
            logger.warn(this.queueName + " - Push message task interrupted " + e.getMessage());
        }
    }
}
