package ch.psi.bsread.sync;

import java.util.Collection;
import java.util.Map;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import jnr.constants.platform.darwin.RLIM;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ch/psi/bsread/sync/MessageSynchronizerCompleteAllLockFree.class */
/**
 * Message synchronizer that groups incoming messages by pulse id and hands out, per pulse id, a
 * map from channel name to message.
 *
 * <p>Messages are buffered in the inherited {@code sortedMap}, keyed by pulse id. The entry with
 * the smallest pulse id (the "head") is handed out by {@link #nextMessage()} once it contains
 * messages for the expected number of channels, or is evicted once it is older than the send
 * timeout or the buffer holds more pulse ids than {@code maxNumberOfMessagesToKeep}.
 *
 * <p>NOTE(review): the fields {@code isRunning}, {@code channelConfigs}, {@code sortedMap},
 * {@code completePulseIds}, {@code lastSentOrDeletedPulseId}, {@code messageSendTimeoutMillis}
 * and the helpers {@code onFirstMessage}, {@code updateSmallestEverReceivedPulseId},
 * {@code updateLastSentOrDeletedPulseId}, {@code isRequestedPulseId},
 * {@code getNumberOfExpectedChannels}, {@code isPulseIdMissing}, {@code clearHead},
 * {@code unparkAll} and {@code parkIfNeeded} are inherited from
 * {@code AbstractMessageSynchronizerLockFree}; their exact semantics are not visible here.
 *
 * @param <Msg> the message type being synchronized
 */
public class MessageSynchronizerCompleteAllLockFree<Msg> extends AbstractMessageSynchronizerLockFree<Msg> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageSynchronizerCompleteAllLockFree.class);

    /**
     * Value of the last sent/deleted pulse id that is treated as "nothing sent or deleted yet".
     * Assumes the superclass initialises {@code lastSentOrDeletedPulseId} to this value - TODO confirm.
     */
    private static final long INITIAL_LAST_SENT_OR_DELETE_PULSEID = Long.MIN_VALUE;

    /** Send timeout value that disables timeout based eviction. */
    private static final long NO_SEND_TIMEOUT = Long.MAX_VALUE;

    /** Buffer size limit that disables size based eviction. */
    private static final int UNLIMITED_MESSAGES = Integer.MAX_VALUE;

    private final int maxNumberOfMessagesToKeep;
    private final boolean sendIncompleteMessages;
    private final Function<Msg, String> channelNameProvider;
    private final ToLongFunction<Msg> pulseIdProvider;
    private final boolean sendFirstComplete;

    /**
     * Creates a synchronizer limited by buffer size only (no send timeout).
     *
     * @param maxNumberOfMessagesToKeep number of buffered pulse ids above which the head is evicted
     * @param sendIncompleteMessages whether an evicted, incomplete head is handed out instead of dropped
     * @param sendFirstComplete whether buffering starts at the first pulse id that is complete
     * @param channels the channel configurations, forwarded to the superclass
     * @param channelNameProvider extracts the channel name from a message
     * @param pulseIdProvider extracts the pulse id from a message
     */
    public MessageSynchronizerCompleteAllLockFree(int maxNumberOfMessagesToKeep, boolean sendIncompleteMessages, boolean sendFirstComplete, Collection<? extends SyncChannel> channels, Function<Msg, String> channelNameProvider, ToLongFunction<Msg> pulseIdProvider) {
        this(maxNumberOfMessagesToKeep, NO_SEND_TIMEOUT, sendIncompleteMessages, sendFirstComplete, channels, channelNameProvider, pulseIdProvider);
    }

    /**
     * Creates a synchronizer limited by send timeout only (no buffer size limit).
     *
     * @param sendTimeoutMillis forwarded to the superclass; presumably becomes
     *        {@code messageSendTimeoutMillis} - TODO confirm
     * @param sendIncompleteMessages whether an evicted, incomplete head is handed out instead of dropped
     * @param sendFirstComplete whether buffering starts at the first pulse id that is complete
     * @param channels the channel configurations, forwarded to the superclass
     * @param channelNameProvider extracts the channel name from a message
     * @param pulseIdProvider extracts the pulse id from a message
     */
    public MessageSynchronizerCompleteAllLockFree(long sendTimeoutMillis, boolean sendIncompleteMessages, boolean sendFirstComplete, Collection<? extends SyncChannel> channels, Function<Msg, String> channelNameProvider, ToLongFunction<Msg> pulseIdProvider) {
        this(UNLIMITED_MESSAGES, sendTimeoutMillis, sendIncompleteMessages, sendFirstComplete, channels, channelNameProvider, pulseIdProvider);
    }

    /**
     * Creates a synchronizer limited by both buffer size and send timeout.
     *
     * @param maxNumberOfMessagesToKeep number of buffered pulse ids above which the head is evicted;
     *        {@code Integer.MAX_VALUE} disables the limit
     * @param sendTimeoutMillis forwarded to the superclass; presumably becomes
     *        {@code messageSendTimeoutMillis} - TODO confirm. {@code Long.MAX_VALUE} disables the
     *        timeout check
     * @param sendIncompleteMessages whether an evicted, incomplete head is handed out instead of dropped
     * @param sendFirstComplete whether buffering starts at the first pulse id that is complete
     * @param channels the channel configurations, forwarded to the superclass
     * @param channelNameProvider extracts the channel name from a message
     * @param pulseIdProvider extracts the pulse id from a message
     */
    public MessageSynchronizerCompleteAllLockFree(int maxNumberOfMessagesToKeep, long sendTimeoutMillis, boolean sendIncompleteMessages, boolean sendFirstComplete, Collection<? extends SyncChannel> channels, Function<Msg, String> channelNameProvider, ToLongFunction<Msg> pulseIdProvider) {
        super(sendTimeoutMillis, channels);
        this.maxNumberOfMessagesToKeep = maxNumberOfMessagesToKeep;
        this.sendIncompleteMessages = sendIncompleteMessages;
        this.channelNameProvider = channelNameProvider;
        this.pulseIdProvider = pulseIdProvider;
        this.sendFirstComplete = sendFirstComplete;
    }

    /**
     * Buffers {@code msg} under its pulse id and channel name, then checks whether the head of the
     * buffer can be signalled as ready.
     *
     * <p>The message is dropped (with a debug log) when its pulse id is not greater than the last
     * sent/deleted pulse id, when its channel is not configured, or when
     * {@code isRequestedPulseId} rejects the pulse id for that channel. Nothing is done besides a
     * warning when the synchronizer is no longer running.
     *
     * @param msg the message to add
     */
    @Override // ch.psi.bsread.sync.MessageSynchronizer
    public void addMessage(Msg msg) {
        if (!this.isRunning.get()) {
            LOGGER.warn("'{}' stopped running.", getClass());
            return;
        }
        onFirstMessage();
        final long pulseId = this.pulseIdProvider.applyAsLong(msg);
        final String channelName = this.channelNameProvider.apply(msg);
        updateSmallestEverReceivedPulseId(pulseId);
        // Snapshot; the live value is read again below because it may have changed in between.
        final long lastPulseId = this.lastSentOrDeletedPulseId.get();
        final long nowMillis = System.currentTimeMillis();

        if (pulseId <= lastPulseId) {
            LOGGER.debug("Drop message of pulse '{}' from channel '{}' since it is smaller than the last send/deleted pulse '{}'", pulseId, channelName, lastPulseId);
        } else {
            final SyncChannel channelConfig = this.channelConfigs.get(channelName);
            if (channelConfig == null) {
                LOGGER.debug("Received message from channel '{}' but that channel is not part of the configuration.", channelName);
            } else if (!isRequestedPulseId(pulseId, channelConfig)) {
                LOGGER.debug("Drop message of pulse '{}' from channel '{}' that does not match modulo/offset '{}'.", pulseId, channelName, channelConfig);
            } else {
                final Map<String, Msg> pulseMessages = this.sortedMap
                        .computeIfAbsent(pulseId, key -> new TimedMessages<Msg>(nowMillis, this.channelConfigs.size()))
                        .getMessagesMap();
                pulseMessages.put(channelName, msg);

                // Nothing was sent/deleted when this call started, a complete start is requested,
                // and this pulse id now has messages for the expected number of channels.
                final boolean firstCompleteStart = lastPulseId == INITIAL_LAST_SENT_OR_DELETE_PULSEID
                        && this.sendFirstComplete
                        && pulseMessages.size() >= getNumberOfExpectedChannels(pulseId);
                if (firstCompleteStart || pulseId <= this.lastSentOrDeletedPulseId.get()) {
                    // pulseId > lastPulseId >= Long.MIN_VALUE here, so pulseId - 1 cannot overflow.
                    updateLastSentOrDeletedPulseId(pulseId - 1);
                    dropEntriesBefore(pulseId, channelName);
                }
            }
        }
        checkForCompleteMessages(nowMillis);
    }

    /** Removes every buffered entry whose pulse id is smaller than {@code pulseId}, logging each removal. */
    private void dropEntriesBefore(long pulseId, String channelName) {
        Map.Entry<Long, TimedMessages<Msg>> head = this.sortedMap.firstEntry();
        while (head != null && head.getKey().longValue() < pulseId) {
            LOGGER.info("Drop message of pulse '{}' from channel '{}' as there is a later complete start.", head.getKey(), channelName);
            this.sortedMap.remove(head.getKey());
            head = this.sortedMap.firstEntry();
        }
    }

    /**
     * Returns whether {@code head} has to be evicted: either the send timeout is enabled and the
     * entry was submitted at least that long before {@code nowMillis}, or the size limit is
     * enabled and the buffer holds more pulse ids than allowed.
     */
    private boolean isEvictionDue(Map.Entry<Long, TimedMessages<Msg>> head, long nowMillis) {
        final boolean timedOut = this.messageSendTimeoutMillis < NO_SEND_TIMEOUT
                && nowMillis - head.getValue().getSubmitTime() >= this.messageSendTimeoutMillis;
        return timedOut
                || (this.maxNumberOfMessagesToKeep < UNLIMITED_MESSAGES && this.sortedMap.size() > this.maxNumberOfMessagesToKeep);
    }

    /**
     * Marks the head pulse id as ready when it is due for eviction, or when it has messages for
     * the expected number of channels and {@code isPulseIdMissing} does not report a gap.
     *
     * @param nowMillis the time used for the timeout check, in milliseconds
     */
    private void checkForCompleteMessages(long nowMillis) {
        final Map.Entry<Long, TimedMessages<Msg>> head = this.sortedMap.firstEntry();
        if (head == null) {
            return;
        }
        if (isEvictionDue(head, nowMillis)) {
            onComplete(head.getKey());
            return;
        }
        final long pulseId = head.getKey().longValue();
        if (head.getValue().availableChannels() >= getNumberOfExpectedChannels(pulseId) && !isPulseIdMissing(pulseId)) {
            onComplete(head.getKey());
        }
    }

    /** Records {@code pulseId} in {@code completePulseIds} and calls {@code unparkAll} if it was not recorded before. */
    private void onComplete(Long pulseId) {
        if (this.completePulseIds.putIfAbsent(pulseId, Boolean.TRUE) == null) {
            unparkAll();
        }
    }

    /**
     * Loops until a message map can be handed out or the synchronizer stops running.
     *
     * <p>On each iteration the head of the buffer is inspected. If it is due for eviction it is
     * removed and handed out when complete, or when incomplete and {@code sendIncompleteMessages}
     * is set; otherwise it is dropped. If it is not due for eviction it is removed and handed out
     * only when it has messages for the expected number of channels, {@code isPulseIdMissing}
     * does not report a gap, and it is still the first key of the buffer.
     *
     * @return the channel-name to message map of the handed out pulse id, or {@code null} if the
     *         synchronizer stopped running before one was available
     */
    @Override // ch.psi.bsread.sync.MessageSynchronizer
    public Map<String, Msg> nextMessage() {
        Map<String, Msg> result = null;
        while (this.isRunning.get() && result == null) {
            final Map.Entry<Long, TimedMessages<Msg>> head = this.sortedMap.firstEntry();
            final long nowMillis = System.currentTimeMillis();
            // Forwarded to parkIfNeeded; presumably requests a re-check instead of parking - TODO confirm.
            boolean recheck = false;
            if (head != null) {
                final Long pulseIdKey = head.getKey();
                final long pulseId = pulseIdKey.longValue();
                final TimedMessages<Msg> messages = head.getValue();
                final int expectedChannels = getNumberOfExpectedChannels(pulseId);
                if (isEvictionDue(head, nowMillis)) {
                    updateLastSentOrDeletedPulseId(pulseId);
                    // Only the caller that actually removed the entry proceeds with it.
                    if (this.sortedMap.remove(pulseIdKey) != null) {
                        clearHead(this.completePulseIds, pulseIdKey, true);
                        clearHead(this.sortedMap, pulseIdKey, true);
                        if (messages.availableChannels() >= expectedChannels) {
                            LOGGER.debug("Send complete pulse '{}' due to eviction.", pulseIdKey);
                            result = messages.getMessagesMap();
                        } else if (this.sendIncompleteMessages) {
                            LOGGER.debug("Send incomplete pulse '{}' due to eviction.", pulseIdKey);
                            result = messages.getMessagesMap();
                        } else {
                            LOGGER.debug("Drop messages for pulse '{}' due to eviction. Requested number of channels '{}' but got only '{}'.", pulseIdKey, expectedChannels, messages.getMessagesMap().size());
                            recheck = true;
                        }
                    }
                } else if (messages.availableChannels() >= expectedChannels && !isPulseIdMissing(pulseId)) {
                    // The head may have changed since it was read above; only send if it is still first.
                    if (pulseId == this.sortedMap.firstKey().longValue()) {
                        updateLastSentOrDeletedPulseId(pulseId);
                        if (this.sortedMap.remove(pulseIdKey) != null) {
                            clearHead(this.completePulseIds, pulseIdKey, true);
                            clearHead(this.sortedMap, pulseIdKey, true);
                            result = messages.getMessagesMap();
                        }
                    } else {
                        recheck = true;
                    }
                }
            }
            parkIfNeeded(result, recheck);
        }
        return result;
    }
}
