package ch.psi.bsread.sync;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ch/psi/bsread/sync/MessageSynchronizerCompleteAllLocking.class */
public class MessageSynchronizerCompleteAllLocking<Msg> extends AbstractMessageSynchronizer<Msg> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) MessageSynchronizerCompleteAllLocking.class);
    private final int maxNumberOfMessagesToKeep;
    private final long messageSendTimeoutMillis;
    private final boolean sendIncompleteMessages;
    private final AtomicBoolean isRunning;
    private final ReentrantLock lock;
    private final Condition condition;
    private final Queue<Map<String, Msg>> queue;
    private final ConcurrentSkipListMap<Long, TimedMessages<Msg>> sortedMap;
    private final Function<Msg, String> channelNameProvider;
    private final ToLongFunction<Msg> pulseIdProvider;
    private final boolean sendFirstComplete;

    public MessageSynchronizerCompleteAllLocking(int i, boolean z, boolean z2, Collection<? extends SyncChannel> collection, Function<Msg, String> function, ToLongFunction<Msg> toLongFunction) {
        this(i, Long.MAX_VALUE, z, z2, collection, function, toLongFunction);
    }

    public MessageSynchronizerCompleteAllLocking(long j, boolean z, boolean z2, Collection<? extends SyncChannel> collection, Function<Msg, String> function, ToLongFunction<Msg> toLongFunction) {
        this(Integer.MAX_VALUE, j, z, z2, collection, function, toLongFunction);
    }

    public MessageSynchronizerCompleteAllLocking(int i, long j, boolean z, boolean z2, Collection<? extends SyncChannel> collection, Function<Msg, String> function, ToLongFunction<Msg> toLongFunction) {
        super(collection);
        this.isRunning = new AtomicBoolean(true);
        this.lock = new ReentrantLock();
        this.condition = this.lock.newCondition();
        this.queue = new ArrayDeque();
        this.sortedMap = new ConcurrentSkipListMap<>();
        this.maxNumberOfMessagesToKeep = i;
        this.messageSendTimeoutMillis = j;
        this.sendIncompleteMessages = z;
        this.channelNameProvider = function;
        this.pulseIdProvider = toLongFunction;
        this.sendFirstComplete = z2;
    }

    @Override // ch.psi.bsread.sync.MessageSynchronizer
    public void addMessage(Msg msg) {
        if (!this.isRunning.get()) {
            LOGGER.warn("'{}' stopped running.", getClass());
            return;
        }
        onFirstMessage();
        long applyAsLong = this.pulseIdProvider.applyAsLong(msg);
        String apply = this.channelNameProvider.apply(msg);
        updateSmallestEverReceivedPulseId(applyAsLong);
        long j = this.lastSentOrDeletedPulseId.get();
        long currentTimeMillis = System.currentTimeMillis();
        if (applyAsLong > j) {
            SyncChannel syncChannel = this.channelConfigs.get(apply);
            if (syncChannel == null) {
                LOGGER.debug("Received message from channel '{}' but that channel is not part of the configuration.", apply);
            } else if (isRequestedPulseId(applyAsLong, syncChannel)) {
                Map<String, Msg> messagesMap = this.sortedMap.computeIfAbsent(Long.valueOf(applyAsLong), l -> {
                    return new TimedMessages(currentTimeMillis, this.channelConfigs.size());
                }).getMessagesMap();
                messagesMap.put(apply, msg);
                if ((j == Long.MIN_VALUE && this.sendFirstComplete && messagesMap.size() >= getNumberOfExpectedChannels(applyAsLong)) || applyAsLong <= this.lastSentOrDeletedPulseId.get()) {
                    updateLastSentOrDeletedPulseId(applyAsLong - 1);
                    Map.Entry<Long, TimedMessages<Msg>> firstEntry = this.sortedMap.firstEntry();
                    while (true) {
                        Map.Entry<Long, TimedMessages<Msg>> entry = firstEntry;
                        if (entry == null || entry.getKey().longValue() >= applyAsLong) {
                            break;
                        }
                        LOGGER.info("Drop message of pulse '{}' from channel '{}' as there is a later complete start.", entry.getKey(), apply);
                        this.sortedMap.remove(entry.getKey());
                        firstEntry = this.sortedMap.firstEntry();
                    }
                }
            } else {
                LOGGER.debug("Drop message of pulse '{}' from channel '{}' that does not match modulo/offset '{}'", Long.valueOf(applyAsLong), apply, syncChannel);
            }
        } else {
            LOGGER.debug("Drop message of pulse '{}' from channel '{}' since it is smaller than the last send/deleted pulse '{}'", Long.valueOf(applyAsLong), apply, Long.valueOf(j));
        }
        checkForCompleteMessages(currentTimeMillis);
    }

    private void checkForCompleteMessages(long j) {
        if (this.maxNumberOfMessagesToKeep < Integer.MAX_VALUE) {
            Map.Entry<Long, TimedMessages<Msg>> firstEntry = this.sortedMap.firstEntry();
            while (true) {
                Map.Entry<Long, TimedMessages<Msg>> entry = firstEntry;
                if (entry == null || this.sortedMap.size() <= this.maxNumberOfMessagesToKeep) {
                    break;
                } else if (!handleIncompleteMessages(entry)) {
                    return;
                } else {
                    firstEntry = this.sortedMap.firstEntry();
                }
            }
        }
        if (this.messageSendTimeoutMillis < Long.MAX_VALUE) {
            Map.Entry<Long, TimedMessages<Msg>> firstEntry2 = this.sortedMap.firstEntry();
            Map.Entry<Long, TimedMessages<Msg>> lastEntry = this.sortedMap.lastEntry();
            while (firstEntry2 != null && lastEntry != null && lastEntry.getValue().getSubmitTime() - firstEntry2.getValue().getSubmitTime() >= this.messageSendTimeoutMillis) {
                if (!handleIncompleteMessages(firstEntry2)) {
                    return;
                } else {
                    firstEntry2 = this.sortedMap.firstEntry();
                }
            }
        }
        Map.Entry<Long, TimedMessages<Msg>> firstEntry3 = this.sortedMap.firstEntry();
        while (true) {
            Map.Entry<Long, TimedMessages<Msg>> entry2 = firstEntry3;
            if (entry2 == null || entry2.getValue().availableChannels() < getNumberOfExpectedChannels(entry2.getKey().longValue())) {
                return;
            }
            if (isPulseIdMissing(entry2.getKey().longValue())) {
                LOGGER.debug("Keep pulse '{}' since there are pulses missing.", entry2.getKey());
                firstEntry3 = null;
            } else {
                ReentrantLock reentrantLock = this.lock;
                reentrantLock.lock();
                try {
                    TimedMessages<Msg> remove = this.sortedMap.remove(entry2.getKey());
                    if (remove != null) {
                        if (!this.queue.offer(remove.getMessagesMap())) {
                            LOGGER.warn("Had to drop messages for pulse '{}' because capacity constrained queue seems to be full.", entry2.getKey());
                        }
                        updateLastSentOrDeletedPulseId(entry2.getKey().longValue());
                        this.condition.signalAll();
                    }
                    firstEntry3 = this.sortedMap.firstEntry();
                } finally {
                    reentrantLock.unlock();
                }
            }
        }
    }

    private boolean handleIncompleteMessages(Map.Entry<Long, TimedMessages<Msg>> entry) {
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            TimedMessages<Msg> remove = this.sortedMap.remove(entry.getKey());
            if (remove == null) {
                reentrantLock.unlock();
                return false;
            }
            long numberOfExpectedChannels = getNumberOfExpectedChannels(entry.getKey().longValue());
            if (entry.getValue().availableChannels() >= numberOfExpectedChannels) {
                LOGGER.debug("Send complete pulse '{}' due to eviction.", entry.getKey());
                if (!this.queue.offer(remove.getMessagesMap())) {
                    LOGGER.warn("Had to drop messages for pulse '{}' because capacity constrained queue seems to be full.", entry.getKey());
                }
                updateLastSentOrDeletedPulseId(entry.getKey().longValue());
                this.condition.signalAll();
            } else if (this.sendIncompleteMessages) {
                LOGGER.debug("Send incomplete pulse '{}' due to eviction.", entry.getKey());
                if (!this.queue.offer(remove.getMessagesMap())) {
                    LOGGER.warn("Had to drop messages for pulse '{}' because capacity constrained queue seems to be full.", entry.getKey());
                }
                updateLastSentOrDeletedPulseId(entry.getKey().longValue());
                this.condition.signalAll();
            } else {
                LOGGER.debug("Drop messages for pulse '{}' due to size eviction. Requested number of channels '{}' but got only '{}'.", entry.getKey(), Long.valueOf(numberOfExpectedChannels), Integer.valueOf(entry.getValue().getMessagesMap().size()));
                updateLastSentOrDeletedPulseId(entry.getKey().longValue());
            }
            return true;
        } finally {
            reentrantLock.unlock();
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.isRunning.compareAndSet(true, false)) {
            ReentrantLock reentrantLock = this.lock;
            reentrantLock.lock();
            try {
                this.condition.signalAll();
            } finally {
                reentrantLock.unlock();
            }
        }
    }

    @Override // ch.psi.bsread.sync.MessageSynchronizer
    public Map<String, Msg> nextMessage() {
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            Map<String, Msg> poll = this.queue.poll();
            while (this.isRunning.get() && poll == null) {
                long j = -1;
                if (this.messageSendTimeoutMillis < Long.MAX_VALUE) {
                    Map.Entry<Long, TimedMessages<Msg>> firstEntry = this.sortedMap.firstEntry();
                    j = TimeUnit.MILLISECONDS.toNanos(firstEntry != null ? (firstEntry.getValue().getSubmitTime() + this.messageSendTimeoutMillis) - System.currentTimeMillis() : this.messageSendTimeoutMillis);
                }
                if (j >= 0) {
                    try {
                        this.condition.await(j, TimeUnit.NANOSECONDS);
                        checkForCompleteMessages(System.currentTimeMillis());
                    } catch (InterruptedException e) {
                        LOGGER.error("Interrupted while waiting!", (Throwable) e);
                    }
                } else {
                    this.condition.await();
                }
                poll = this.queue.poll();
            }
            return poll;
        } finally {
            reentrantLock.unlock();
        }
    }

    @Override // ch.psi.bsread.sync.AbstractMessageSynchronizer
    public int getBufferSize() {
        return this.sortedMap.size();
    }
}
