package ch.psi.bsread.impl;

import ch.psi.bsread.ConfigIReceiver;
import ch.psi.bsread.MessageExtractor;
import ch.psi.bsread.converter.ValueConverter;
import ch.psi.bsread.message.ChannelConfig;
import ch.psi.bsread.message.DataHeader;
import ch.psi.bsread.message.MainHeader;
import ch.psi.bsread.message.Message;
import ch.psi.bsread.message.Timestamp;
import ch.psi.bsread.message.Value;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;
import zmq.Msg;

/* loaded from: input_file:ch/psi/bsread/impl/AbstractMessageExtractor.class */
public abstract class AbstractMessageExtractor<V> implements MessageExtractor<V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractMessageExtractor.class.getName());
    private DataHeader dataHeader;
    private ValueConverter valueConverter;

    public AbstractMessageExtractor(ValueConverter valueConverter) {
        this.valueConverter = valueConverter;
    }

    @Override // ch.psi.bsread.MessageExtractor
    public Message<V> extractMessage(ConfigIReceiver<V> configIReceiver, ZMQ.Socket socket, MainHeader mainHeader, Set<String> set) {
        Timestamp globalTimestamp;
        Message<V> message = new Message<>();
        message.setMainHeader(mainHeader);
        message.setDataHeader(this.dataHeader);
        Map<String, Value<V>> values = message.getValues();
        Iterator<ChannelConfig> it = this.dataHeader.getChannels().iterator();
        while (it.hasNext() && socket.hasReceiveMore()) {
            ChannelConfig next = it.next();
            if (set == null || set.contains(next.getName())) {
                ByteOrder byteOrder = next.getByteOrder();
                if (!socket.hasReceiveMore()) {
                    String format = String.format("There is no data for channel '%s'.", next.getName());
                    LOGGER.error(format);
                    throw new RuntimeException(format);
                }
                ByteBuffer order = receiveMsg(socket).buf().order(byteOrder);
                if (!socket.hasReceiveMore()) {
                    String format2 = String.format("There is no timestamp for channel '%s'.", next.getName());
                    LOGGER.error(format2);
                    throw new RuntimeException(format2);
                }
                ByteBuffer buf = receiveMsg(socket).buf();
                if (order != null && order.remaining() > 0) {
                    if (buf == null || buf.remaining() != 16) {
                        globalTimestamp = mainHeader.getGlobalTimestamp();
                    } else {
                        ByteBuffer order2 = buf.order(byteOrder);
                        globalTimestamp = new Timestamp(order2.getLong(order2.position()), order2.getLong(order2.position() + 8));
                    }
                    values.put(next.getName(), this.valueConverter.getMessageValue(mainHeader, this.dataHeader, next, order, globalTimestamp));
                }
            } else {
                if (!socket.hasReceiveMore()) {
                    String format3 = String.format("There is no data for channel '%s'.", next.getName());
                    LOGGER.error(format3);
                    throw new RuntimeException(format3);
                }
                receiveMsg(socket);
                if (!socket.hasReceiveMore()) {
                    String format4 = String.format("There is no timestamp for channel '%s'.", next.getName());
                    LOGGER.error(format4);
                    throw new RuntimeException(format4);
                }
                receiveMsg(socket);
            }
        }
        if (it.hasNext()) {
            LOGGER.warn("'{}' provided less values '{}' than specified in DataHeader '{}'. Message will be ignored.", configIReceiver.getReceiverConfig().getAddress(), Integer.valueOf(values.size()), Integer.valueOf(this.dataHeader.getChannels().size()));
            message = null;
        } else if (socket.hasReceiveMore()) {
            LOGGER.warn("'{}' provided more values '{}' than specified in DataHeader '{}'. Message will be ignored.", configIReceiver.getReceiverConfig().getAddress(), Double.valueOf(values.size() + (configIReceiver.drain() / 2.0d)), Integer.valueOf(this.dataHeader.getChannels().size()));
            message = null;
        }
        return message;
    }

    @Override // java.util.function.Consumer
    public void accept(DataHeader dataHeader) {
        this.dataHeader = dataHeader;
    }

    public static Msg receiveMsg(ZMQ.Socket socket) {
        Msg recv = socket.base().recv(0);
        if (recv == null) {
            mayRaise(socket);
        }
        return recv;
    }

    private static void mayRaise(ZMQ.Socket socket) {
        int errno = socket.base().errno();
        if (errno != 0 && errno != 35) {
            throw new ZMQException(errno);
        }
    }
}
