package ch.psi.bsread.sync;

import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ch/psi/bsread/sync/MessageSynchronizerCompleteLatestLockFree.class */
/**
 * Message synchronizer that buffers incoming messages per pulse id and hands them out, one pulse
 * at a time, as a map of channel name to message.
 *
 * <p>{@link #addMessage(Object)} stores a message under its pulse id and channel name;
 * {@link #nextMessage()} removes the oldest buffered pulse once it may be released. A pulse is
 * released when it is not newer than the oldest pulse recorded as complete, or when it is evicted
 * because it exceeded the send timeout or the buffer holds more pulses than
 * {@code maxNumberOfMessagesToKeep}. Incomplete pulses are only returned when
 * {@code sendIncompleteMessages} is set; otherwise they are dropped.
 *
 * <p>State such as {@code sortedMap}, {@code completePulseIds}, {@code channelConfigs},
 * {@code isRunning} and {@code messageSendTimeoutMillis} is inherited from
 * {@link AbstractMessageSynchronizerLockFree}.
 *
 * @param <Msg> type of the messages being synchronized
 */
public class MessageSynchronizerCompleteLatestLockFree<Msg> extends AbstractMessageSynchronizerLockFree<Msg> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageSynchronizerCompleteLatestLockFree.class);

    /** Buffer size above which the oldest pulse is evicted; {@code Integer.MAX_VALUE} disables the check. */
    private final int maxNumberOfMessagesToKeep;
    /** Whether pulses that lack messages from some expected channels are still returned. */
    private final boolean sendIncompleteMessages;
    /** Pulse ids for which consumers have already been unparked, so each id triggers at most one unpark. */
    private final NavigableMap<Long, Boolean> wakeupPulseIds = new ConcurrentSkipListMap<>();
    /** Extracts the channel name from a message. */
    private final Function<Msg, String> channelNameProvider;
    /** Extracts the pulse id from a message. */
    private final ToLongFunction<Msg> pulseIdProvider;

    /**
     * Creates a synchronizer limited by buffer size only (the value {@code Long.MAX_VALUE} is
     * passed in place of {@code j}, which disables the timeout checks in this class).
     *
     * @param i maximum number of pulses to keep buffered
     * @param z whether incomplete pulses are returned instead of dropped
     * @param z2 forwarded to the full constructor, which currently does not use it
     * @param collection channel configurations, passed to the superclass
     * @param function extracts the channel name from a message
     * @param toLongFunction extracts the pulse id from a message
     */
    public MessageSynchronizerCompleteLatestLockFree(int i, boolean z, boolean z2, Collection<? extends SyncChannel> collection, Function<Msg, String> function, ToLongFunction<Msg> toLongFunction) {
        this(i, Long.MAX_VALUE, z, z2, collection, function, toLongFunction);
    }

    /**
     * Creates a synchronizer without a buffer size limit ({@code Integer.MAX_VALUE} is used for
     * {@code i}, which disables the size checks in this class).
     *
     * @param j passed to the superclass as its first constructor argument (presumably the
     *     message send timeout in milliseconds; confirm against the superclass)
     * @param z whether incomplete pulses are returned instead of dropped
     * @param z2 forwarded to the full constructor, which currently does not use it
     * @param collection channel configurations, passed to the superclass
     * @param function extracts the channel name from a message
     * @param toLongFunction extracts the pulse id from a message
     */
    public MessageSynchronizerCompleteLatestLockFree(long j, boolean z, boolean z2, Collection<? extends SyncChannel> collection, Function<Msg, String> function, ToLongFunction<Msg> toLongFunction) {
        this(Integer.MAX_VALUE, j, z, z2, collection, function, toLongFunction);
    }

    /**
     * Creates a synchronizer.
     *
     * @param i maximum number of pulses to keep buffered; {@code Integer.MAX_VALUE} disables the
     *     size-based eviction
     * @param j passed to the superclass as its first constructor argument (presumably the
     *     message send timeout in milliseconds; confirm against the superclass)
     * @param z whether incomplete pulses are returned instead of dropped
     * @param z2 currently not used by this constructor
     * @param collection channel configurations, passed to the superclass
     * @param function extracts the channel name from a message
     * @param toLongFunction extracts the pulse id from a message
     */
    public MessageSynchronizerCompleteLatestLockFree(int i, long j, boolean z, boolean z2, Collection<? extends SyncChannel> collection, Function<Msg, String> function, ToLongFunction<Msg> toLongFunction) {
        super(j, collection);
        this.maxNumberOfMessagesToKeep = i;
        this.sendIncompleteMessages = z;
        this.channelNameProvider = function;
        this.pulseIdProvider = toLongFunction;
    }

    /**
     * Buffers a message under its pulse id and channel name and then checks whether waiting
     * consumers need to be woken up.
     *
     * <p>The message is only stored when its pulse id is greater than the last sent or deleted
     * pulse id, its channel is part of the configuration, and {@code isRequestedPulseId} accepts
     * the pulse id for that channel; otherwise it is dropped with a debug log entry. When the
     * synchronizer is no longer running the call only logs a warning.
     *
     * @param msg the message to add
     */
    @Override // ch.psi.bsread.sync.MessageSynchronizer
    public void addMessage(Msg msg) {
        if (!this.isRunning.get()) {
            LOGGER.warn("'{}' stopped running.", getClass());
            return;
        }
        onFirstMessage();

        final long pulseId = this.pulseIdProvider.applyAsLong(msg);
        final String channelName = this.channelNameProvider.apply(msg);
        updateSmallestEverReceivedPulseId(pulseId);
        final long lastSentOrDeleted = this.lastSentOrDeletedPulseId.get();
        final long now = System.currentTimeMillis();

        if (pulseId > lastSentOrDeleted) {
            SyncChannel channel = this.channelConfigs.get(channelName);
            if (channel == null) {
                LOGGER.debug("Received message from channel '{}' but that channel is not part of the configuration.", channelName);
            } else if (isRequestedPulseId(pulseId, channel)) {
                this.sortedMap
                        .computeIfAbsent(Long.valueOf(pulseId), key -> new TimedMessages<>(now, this.channelConfigs.size()))
                        .getMessagesMap()
                        .put(channelName, msg);
            } else {
                LOGGER.debug("Drop message of pulse '{}' from channel '{}' that does not match modulo/offset '{}'", pulseId, channelName, channel);
            }
        } else {
            LOGGER.debug("Drop message of pulse '{}' from channel '{}' since it is smaller than the last send/deleted pulse '{}'", pulseId, channelName, lastSentOrDeleted);
        }

        // Runs even when the message was dropped, so timeout/size eviction of the head is still noticed.
        checkForCompleteMessages(pulseId, now);
    }

    /**
     * Marks the given pulse as complete when all expected channels are present and wakes up
     * consumers when the oldest buffered pulse timed out, overflows the buffer, or is complete.
     *
     * @param pulseId pulse id of the message that was just processed
     * @param now timestamp (milliseconds) taken when that message was processed
     */
    private void checkForCompleteMessages(long pulseId, long now) {
        Map.Entry<Long, TimedMessages<Msg>> head = this.sortedMap.firstEntry();
        final Long pulseKey = Long.valueOf(pulseId);
        TimedMessages<Msg> current = this.sortedMap.get(pulseKey);

        // The head is skipped when it is the current pulse; only the completeness check below applies to it.
        if (head != null && pulseId == head.getKey().longValue()) {
            head = null;
        }
        if (current != null && current.availableChannels() >= getNumberOfExpectedChannels(pulseId)) {
            onComplete(pulseKey);
        }
        if (this.messageSendTimeoutMillis < Long.MAX_VALUE && head != null
                && now - head.getValue().getSubmitTime() >= this.messageSendTimeoutMillis) {
            onWakup(head.getKey());
            head = null;
        }
        if (this.maxNumberOfMessagesToKeep < Integer.MAX_VALUE && head != null
                && this.sortedMap.size() > this.maxNumberOfMessagesToKeep) {
            onWakup(head.getKey());
            head = null;
        }
        if (head != null && head.getValue().availableChannels() >= getNumberOfExpectedChannels(head.getKey().longValue())) {
            onWakup(head.getKey());
        }
    }

    /**
     * Records the pulse id as complete and, the first time it is recorded, triggers a wakeup.
     *
     * @param l pulse id that became complete
     */
    protected void onComplete(Long l) {
        if (this.completePulseIds.putIfAbsent(l, Boolean.TRUE) == null) {
            onWakup(l);
        }
    }

    /**
     * Calls {@code unparkAll()} the first time the given pulse id is registered for wakeup.
     *
     * @param l pulse id that caused the wakeup
     */
    private void onWakup(Long l) {
        if (this.wakeupPulseIds.putIfAbsent(l, Boolean.TRUE) == null) {
            unparkAll();
        }
    }

    /**
     * Returns the messages of the next released pulse, looping (with {@code parkIfNeeded} between
     * passes) until one is available or the synchronizer stops running.
     *
     * @return map of channel name to message for the released pulse, or {@code null} when the
     *     synchronizer stopped running before a pulse could be released
     */
    @Override // ch.psi.bsread.sync.MessageSynchronizer
    public Map<String, Msg> nextMessage() {
        Map<String, Msg> result = null;
        while (this.isRunning.get() && result == null) {
            if (!this.sendIncompleteMessages) {
                // Incomplete pulses will never be sent: discard everything older than the oldest complete pulse.
                Map.Entry<Long, Boolean> oldestComplete = this.completePulseIds.firstEntry();
                if (oldestComplete != null) {
                    clearHead(this.sortedMap, oldestComplete.getKey(), false);
                }
            }

            final long now = System.currentTimeMillis();
            // Second argument of parkIfNeeded; set when a pass ended without a result although the
            // buffer state changed or is ahead of the complete ids (presumably requests an
            // immediate retry instead of parking - confirm against the superclass).
            boolean retry = false;

            Map.Entry<Long, TimedMessages<Msg>> head = this.sortedMap.firstEntry();
            if (head != null) {
                final long headPulseId = head.getKey().longValue();
                final int expectedChannels = getNumberOfExpectedChannels(headPulseId);

                if (isEvictionDue(head, now)) {
                    if (removeHeadAndClearBookkeeping(headPulseId)) {
                        result = pickDeliverableMessages(head, expectedChannels, true);
                        retry = result == null;
                    }
                } else {
                    Map.Entry<Long, Boolean> oldestComplete = this.completePulseIds.firstEntry();
                    if (oldestComplete != null) {
                        if (headPulseId > oldestComplete.getKey().longValue()) {
                            retry = true;
                        } else if (removeHeadAndClearBookkeeping(headPulseId)) {
                            result = pickDeliverableMessages(head, expectedChannels, false);
                            retry = result == null;
                        }
                    }
                }
            }
            parkIfNeeded(result, retry);
        }
        return result;
    }

    /** Tells whether the oldest buffered pulse exceeded the send timeout or the buffer exceeds its size limit. */
    private boolean isEvictionDue(Map.Entry<Long, TimedMessages<Msg>> head, long now) {
        boolean timedOut = this.messageSendTimeoutMillis < Long.MAX_VALUE
                && now - head.getValue().getSubmitTime() >= this.messageSendTimeoutMillis;
        return timedOut
                || (this.maxNumberOfMessagesToKeep < Integer.MAX_VALUE
                        && this.sortedMap.size() > this.maxNumberOfMessagesToKeep);
    }

    /**
     * Advances the last sent/deleted pulse id and removes the pulse from the buffer; when this call
     * performed the removal, also clears all bookkeeping maps up to and including that pulse id.
     *
     * @return {@code true} when this call removed the pulse, {@code false} when it was already gone
     */
    private boolean removeHeadAndClearBookkeeping(long pulseId) {
        final Long pulseKey = Long.valueOf(pulseId);
        updateLastSentOrDeletedPulseId(pulseId);
        if (this.sortedMap.remove(pulseKey) == null) {
            return false;
        }
        clearHead(this.wakeupPulseIds, pulseKey, true);
        clearHead(this.completePulseIds, pulseKey, true);
        clearHead(this.sortedMap, pulseKey, true);
        return true;
    }

    /**
     * Decides whether the messages of an already removed pulse are handed out: complete pulses
     * always, incomplete ones only when {@code sendIncompleteMessages} is set.
     *
     * @param evicted whether the pulse was removed due to timeout/buffer size (only affects logging)
     * @return the pulse's message map, or {@code null} when the pulse is dropped
     */
    private Map<String, Msg> pickDeliverableMessages(Map.Entry<Long, TimedMessages<Msg>> head, int expectedChannels, boolean evicted) {
        final TimedMessages<Msg> messages = head.getValue();
        if (messages.availableChannels() >= expectedChannels) {
            if (evicted) {
                LOGGER.debug("Send complete pulse '{}' due to eviction.", head.getKey());
            }
            return messages.getMessagesMap();
        }
        if (this.sendIncompleteMessages) {
            if (evicted) {
                LOGGER.debug("Send incomplete pulse '{}' due to eviction.", head.getKey());
            } else {
                LOGGER.debug("Send incomplete pulse '{}'.", head.getKey());
            }
            return messages.getMessagesMap();
        }
        if (evicted) {
            LOGGER.debug("Drop messages for pulse '{}' due to eviction. Requested number of channels '{}' but got only '{}'.", head.getKey(), Integer.valueOf(expectedChannels), Integer.valueOf(messages.getMessagesMap().size()));
        } else {
            LOGGER.debug("Drop messages for pulse '{}'. Requested number of channels '{}' but got only '{}'.", head.getKey(), Integer.valueOf(expectedChannels), Integer.valueOf(messages.getMessagesMap().size()));
        }
        return null;
    }
}
