package com.adtsw.jchannels.messaging.queue;

import com.adtsw.jcommons.execution.BlockingThreadPoolExecutor;
import com.adtsw.jcommons.execution.ThreadPoolStats;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/adtsw/jchannels/messaging/queue/InMemoryMessageQueue.class */
public class InMemoryMessageQueue<I> implements MessageQueue<I> {
    private static final Logger logger = LogManager.getLogger(InMemoryMessageQueue.class);
    private final Map<String, List<MessageListener<I>>> listeners;
    private final Map<Integer, BlockingThreadPoolExecutor> executors;
    private final String queueName;
    private final int numPartitions;
    private final QueueFullAction queueFullAction;
    private final long timeout;
    private final TimeUnit unit;

    public InMemoryMessageQueue(String str, int i, int i2, QueueFullAction queueFullAction) {
        this(str, i, i2, queueFullAction, 1L, TimeUnit.SECONDS);
    }

    public InMemoryMessageQueue(String str, int i, int i2, QueueFullAction queueFullAction, long j, TimeUnit timeUnit) {
        this.listeners = new HashMap();
        this.queueFullAction = queueFullAction;
        this.executors = new HashMap();
        for (int i3 = 0; i3 < i; i3++) {
            this.executors.put(Integer.valueOf(i3), new BlockingThreadPoolExecutor(str + "-" + i3, i2, i2));
        }
        this.queueName = str;
        this.numPartitions = i;
        this.timeout = j;
        this.unit = timeUnit;
    }

    public InMemoryMessageQueue(String str, QueueFullAction queueFullAction) {
        this(str, 1, 10, queueFullAction);
    }

    public InMemoryMessageQueue(String str) {
        this(str, 1, 10, QueueFullAction.BLOCK);
    }

    @Override // com.adtsw.jchannels.messaging.queue.MessageQueue
    public void registerTopic(String str) {
        if (this.listeners.containsKey(str)) {
            return;
        }
        this.listeners.put(str, new ArrayList());
    }

    @Override // com.adtsw.jchannels.messaging.queue.MessageQueue
    public void addListener(String str, MessageListener<I> messageListener) {
        registerTopic(str);
        this.listeners.get(str).add(messageListener);
    }

    @Override // com.adtsw.jchannels.messaging.queue.MessageQueue
    public void removeListener(String str, MessageListener<I> messageListener) {
        this.listeners.get(str).remove(messageListener);
    }

    @Override // com.adtsw.jchannels.messaging.queue.MessageQueue
    public boolean pushMessage(String str, I i) {
        return pushMessage(str, i, 0);
    }

    @Override // com.adtsw.jchannels.messaging.queue.MessageQueue
    public boolean pushMessage(String str, I i, int i2) {
        boolean z = false;
        try {
            int max = Math.max(0, Math.min(i2, this.numPartitions));
            switch (this.queueFullAction) {
                case BLOCK:
                    this.executors.get(Integer.valueOf(max)).executeButBlockIfFull(() -> {
                        pushMessageToListeners(str, i);
                    });
                    z = true;
                    break;
                case REJECT:
                    z = this.executors.get(Integer.valueOf(max)).executeButRejectIfFull(() -> {
                        pushMessageToListeners(str, i);
                    });
                    break;
                case BLOCK_WITH_TIMEOUT:
                    z = this.executors.get(Integer.valueOf(max)).executeButBlockWithTimeoutIfFull(() -> {
                        pushMessageToListeners(str, i);
                    }, this.timeout, this.unit);
                    break;
            }
            return z;
        } catch (InterruptedException e) {
            logger.warn(this.queueName + " - Push message task interrupted " + e.getMessage());
            throw new RuntimeException(e);
        }
    }

    private void pushMessageToListeners(String str, I i) {
        List<MessageListener<I>> list = this.listeners.get(str);
        if (CollectionUtils.isNotEmpty(list)) {
            list.forEach(messageListener -> {
                messageListener.onMessage(i);
            });
        } else {
            logger.warn("Sending message to topic without listener " + str);
        }
    }

    @Override // com.adtsw.jchannels.messaging.queue.MessageQueue
    public MessageQueueStats getStats() {
        MessageQueueStats messageQueueStats = new MessageQueueStats();
        for (int i = 0; i < this.numPartitions; i++) {
            ThreadPoolStats stats = this.executors.get(Integer.valueOf(i)).getStats();
            messageQueueStats.add(this.queueName + "_" + i + "_semaphoreQueueLength", Integer.valueOf(stats.getSemaphoreQueueLength()));
            messageQueueStats.add(this.queueName + "_" + i + "_semaphorePermitsAvailable", Integer.valueOf(stats.getSemaphorePermitsAvailable()));
            messageQueueStats.add(this.queueName + "_" + i + "_workerPoolSize", Integer.valueOf(stats.getWorkerPoolSize()));
            messageQueueStats.add(this.queueName + "_" + i + "_taskQueueLength", Integer.valueOf(stats.getTaskQueueLength()));
            messageQueueStats.add(this.queueName + "_" + i + "_taskQueueRemainingCapacity", Integer.valueOf(stats.getTaskQueueRemainingCapacity()));
        }
        return messageQueueStats;
    }
}
