package ch.psi.bsread.message.commands;

import ch.psi.bsread.ConfigIReceiver;
import ch.psi.bsread.ReceiverConfig;
import ch.psi.bsread.ReceiverState;
import ch.psi.bsread.command.Command;
import ch.psi.bsread.common.helper.ByteBufferHelper;
import ch.psi.bsread.compression.Compression;
import ch.psi.bsread.configuration.Channel;
import ch.psi.bsread.message.DataHeader;
import ch.psi.bsread.message.MainHeader;
import ch.psi.bsread.message.Message;
import ch.psi.bsread.message.Value;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;

/* loaded from: input_file:ch/psi/bsread/message/commands/MainHeaderCommand.class */
/**
 * A {@link MainHeader} that is also a {@link Command}: processing it reads the rest of the
 * multipart message from the receiver's socket and turns it into a {@link Message}.
 *
 * <p>Processing steps, in order:
 * <ol>
 *   <li>Work out which of the requested channels are due for this pulse-id.</li>
 *   <li>Validate the {@code htype} of this header.</li>
 *   <li>Notify the main header handlers.</li>
 *   <li>Obtain the data header, either from the receiver state (same hash) or from the socket.</li>
 *   <li>Delegate extraction of the values to the configured message extractor and notify the
 *       value handlers.</li>
 * </ol>
 */
public class MainHeaderCommand extends MainHeader implements Command {
    private static final Logger LOGGER = LoggerFactory.getLogger(MainHeaderCommand.class);
    private static final long serialVersionUID = -2505074745338960088L;

    /**
     * Processes the message this main header belongs to.
     *
     * @param configIReceiver receiver providing configuration, state, socket and handlers
     * @param <V> value type of the extracted message
     * @return the message produced by the configured message extractor (whatever it returns,
     *         including {@code null}), or {@code null} when channels were requested but none of
     *         them is due for this pulse-id (the receiver is drained in that case)
     * @throws RuntimeException if {@code htype} has an unexpected prefix, if no further message
     *         part follows the main header, if the data header cannot be parsed, or if an
     *         {@link IOException} occurs while deserializing
     */
    @Override // ch.psi.bsread.command.Command
    public <V> Message<V> process(ConfigIReceiver<V> configIReceiver) {
        ReceiverConfig<V> receiverConfig = configIReceiver.getReceiverConfig();

        // null means "no restriction"; an empty set means "restricted, but nothing due now".
        HashSet<String> requestedNames = selectRequestedChannelNames(receiverConfig.getRequestedChannels());
        if (requestedNames != null && requestedNames.isEmpty()) {
            configIReceiver.drain();
            return null;
        }

        ReceiverState receiverState = configIReceiver.getReceiverState();
        ZMQ.Socket socket = configIReceiver.getSocket();
        LOGGER.debug("Receive pulse-id '{}' from '{}'.", getPulseId(), receiverConfig.getAddress());
        try {
            if (!getHtype().startsWith(MainHeader.HTYPE_VALUE_NO_VERSION)) {
                String message = String.format("Expect 'bsr_d-[version]' for 'htype' but was '%s'. Skip messge for '%s'", getHtype(), receiverConfig.getAddress());
                LOGGER.error(message);
                configIReceiver.drain();
                throw new RuntimeException(message);
            }

            dispatch(receiverConfig.isParallelHandlerProcessing(), configIReceiver.getMainHeaderHandlers(), this);

            if (!socket.hasReceiveMore()) {
                String message = String.format("There is no data header for '%s'. Skip complete message.", receiverConfig.getAddress());
                LOGGER.error(message);
                configIReceiver.drain();
                throw new RuntimeException(message);
            }

            boolean dataHeaderChanged = !getHash().equals(receiverState.getDataHeaderHash());
            DataHeader dataHeader;
            if (dataHeaderChanged) {
                dataHeader = readDataHeader(configIReceiver, receiverConfig, receiverState, socket);
            } else {
                dataHeader = receiverState.getDataHeader();
                // Same hash as the stored data header: take the next message part off the
                // socket and discard it instead of parsing it again.
                socket.base().recv(0);
            }

            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Receive message from '{}' with pulse '{}' and channels '{}'.", receiverConfig.getAddress(), getPulseId(),
                        dataHeader.getChannels().stream().map(channelConfig -> channelConfig.getName()).collect(Collectors.joining(", ")));
            }

            Message<V> message = receiverConfig.getMessageExtractor().extractMessage(configIReceiver, socket, this, requestedNames);
            if (message != null) {
                message.setDataHeaderChanged(dataHeaderChanged);
                Map<String, Value<V>> values = message.getValues();
                if (!values.isEmpty()) {
                    dispatch(receiverConfig.isParallelHandlerProcessing(), configIReceiver.getValueHandlers(), values);
                }
            }
            return message;
        } catch (IOException e) {
            throw new RuntimeException(String.format("Unable to deserialize message for '%s'", receiverConfig.getAddress()), e);
        }
    }

    /**
     * Collects the names of the requested channels that are due for the pulse-id of this header.
     *
     * @param requestedChannels the configured channel requests, may be {@code null} or empty
     * @return {@code null} when {@code requestedChannels} is {@code null} or empty (no
     *         restriction); otherwise the possibly empty set of channel names due for this pulse
     */
    private HashSet<String> selectRequestedChannelNames(Collection<Channel> requestedChannels) {
        if (requestedChannels == null || requestedChannels.isEmpty()) {
            return null;
        }
        HashSet<String> names = new HashSet<>();
        for (Channel channel : requestedChannels) {
            if (isRequestedChannel(getPulseId(), channel)) {
                names.add(channel.getName());
            }
        }
        return names;
    }

    /**
     * Reads the next message part from the socket as the data header, decompresses it if this
     * main header declares a data header compression, parses it, stores it together with the
     * hash of this main header in the receiver state and notifies the data header handlers.
     *
     * @return the freshly parsed data header
     * @throws IOException if reading the value fails for a reason other than a parse or mapping
     *         error
     * @throws RuntimeException if the bytes cannot be parsed or mapped to a {@link DataHeader}
     */
    private <V> DataHeader readDataHeader(ConfigIReceiver<V> configIReceiver, ReceiverConfig<V> receiverConfig,
            ReceiverState receiverState, ZMQ.Socket socket) throws IOException {
        byte[] headerBytes = socket.recv();
        Compression compression = getDataHeaderCompression();
        if (compression != null) {
            headerBytes = ByteBufferHelper.copyToByteArray(
                    compression.getCompressor().decompressDataHeader(ByteBuffer.wrap(headerBytes), receiverState.getDataHeaderAllocator()));
        }
        try {
            DataHeader dataHeader = receiverConfig.getObjectMapper().readValue(headerBytes, DataHeader.class);
            receiverState.setDataHeader(dataHeader);
            receiverState.setDataHeaderHash(getHash());
            dispatch(receiverConfig.isParallelHandlerProcessing(), configIReceiver.getDataHeaderHandlers(), dataHeader);
            return dataHeader;
        } catch (JsonParseException | JsonMappingException e) {
            String message = String.format("Could not parse DataHeader of '%s'.", receiverConfig.getAddress());
            LOGGER.error(message, e);
            // Log the (decompressed) payload to help diagnose the malformed header.
            LOGGER.info("DataHeader was '{}'", new String(headerBytes, StandardCharsets.UTF_8));
            throw new RuntimeException(message, e);
        }
    }

    /**
     * Passes {@code value} to every handler, via a parallel stream when {@code parallel} is
     * {@code true} and otherwise one after another in iteration order.
     */
    private static <T> void dispatch(boolean parallel, Collection<? extends Consumer<? super T>> handlers, T value) {
        if (parallel) {
            handlers.parallelStream().forEach(handler -> handler.accept(value));
        } else {
            for (Consumer<? super T> handler : handlers) {
                handler.accept(value);
            }
        }
    }

    /**
     * Tells whether {@code channel} is due for the given pulse-id, i.e. whether
     * {@code pulseId - offset} is a multiple of the channel's modulo.
     *
     * <p>NOTE(review): a modulo of {@code 0} raises {@link ArithmeticException}; presumably the
     * channel configuration guarantees a non-zero modulo - confirm against {@code Channel}.
     */
    private boolean isRequestedChannel(long pulseId, Channel channel) {
        return (pulseId - channel.getOffset()) % channel.getModulo() == 0;
    }
}
