package ch.psi.bsread;

import ch.psi.bsread.command.Command;
import ch.psi.bsread.common.helper.ByteBufferHelper;
import ch.psi.bsread.compression.Compression;
import ch.psi.bsread.message.DataHeader;
import ch.psi.bsread.message.MainHeader;
import ch.psi.bsread.message.Type;
import ch.psi.bsread.monitors.Monitor;
import ch.psi.bsread.monitors.MonitorConfig;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;

/* loaded from: input_file:ch/psi/bsread/Sender.class */
/**
 * Publishes bsread messages over a ZeroMQ socket.
 *
 * <p>Each message produced by {@link #send()} consists of a JSON-serialized main header frame,
 * a (possibly compressed) data header frame and, for every registered channel, one value frame
 * followed by one timestamp frame. The data header and its MD5 hash are regenerated whenever a
 * channel is added or removed.
 *
 * <p>This class performs no synchronization of its own; NOTE(review): it looks like it is meant
 * to be driven from a single thread - confirm against callers.
 */
public class Sender {
    private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);

    /**
     * Pause applied after connecting and after closing the socket.
     * NOTE(review): presumably gives ZeroMQ time to settle the connection - confirm.
     */
    private static final long SETTLE_TIME_MILLIS = 100L;

    private final SenderConfig senderConfig;
    private final MainHeader mainHeader;
    private final List<DataChannel<?>> channels = new ArrayList<>();
    private ZMQ.Socket socket;
    private byte[] dataHeaderBytes;
    private String dataHeaderMD5 = "";
    private long sentMessages = 0L;

    /** Creates a sender using a default {@link SenderConfig}. */
    public Sender() {
        this(new SenderConfig());
    }

    /**
     * Creates a sender using the given configuration.
     *
     * <p>The main header instance is obtained from the configured main header supplier when one
     * is set; otherwise a new {@link MainHeader} is created.
     *
     * @param senderConfig the configuration to use
     */
    public Sender(SenderConfig senderConfig) {
        this.senderConfig = senderConfig;
        this.mainHeader = senderConfig.getMainHeaderSupplier() != null
                ? senderConfig.getMainHeaderSupplier().get()
                : new MainHeader();
    }

    /**
     * Creates and configures the socket, starts the configured monitor (if any), connects the
     * socket to the configured address and then pauses briefly.
     */
    public void connect() {
        ZMQ.Context context = this.senderConfig.getContext();
        this.socket = context.socket(this.senderConfig.getSocketType());
        this.socket.setSndHWM(this.senderConfig.getHighWaterMark());
        this.socket.setLinger(this.senderConfig.getLinger());
        this.socket.setSendBufferSize(this.senderConfig.getSendBufferSize());

        Monitor monitor = this.senderConfig.getMonitor();
        if (monitor != null) {
            monitor.start(new MonitorConfig(
                    context,
                    this.socket,
                    this.senderConfig.getObjectMapper(),
                    this.senderConfig.getSocketType(),
                    this.senderConfig.isBlockingSend()));
        }

        Utils.connect(this.socket, this.senderConfig.getAddress(), this.senderConfig.getSocketType());
        pause();
    }

    /**
     * Stops the configured monitor (if any), passing it the number of messages sent so far,
     * closes the socket if one was created and then pauses briefly.
     */
    public void close() {
        Monitor monitor = this.senderConfig.getMonitor();
        if (monitor != null) {
            monitor.stop(this.sentMessages);
        }
        if (this.socket != null) {
            this.socket.close();
        }
        pause();
    }

    /**
     * Sends one message for the next pulse id, provided at least one registered channel is due
     * for that pulse. If no channel is due, nothing is sent.
     *
     * <p>Channels that are not due for the pulse still contribute two empty frames so that the
     * number of frames per message stays constant.
     *
     * @throws IllegalStateException if the main header cannot be serialized
     */
    public void send() {
        long pulseId = this.senderConfig.getPulseIdProvider().getNextPulseId();
        if (!isAnyChannelDue(pulseId)) {
            return;
        }

        this.mainHeader.setPulseId(pulseId);
        this.mainHeader.setGlobalTimestamp(this.senderConfig.getGlobalTimeProvider().getTime(pulseId));
        this.mainHeader.setHash(this.dataHeaderMD5);
        this.mainHeader.setDataHeaderCompression(this.senderConfig.getDataHeaderCompression());

        int blockingFlag = this.senderConfig.getBlockingFlag();
        int moreFlag = blockingFlag | ZMQ.SNDMORE;
        try {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Send message for pulse '{}' and channels '{}'.",
                        this.mainHeader.getPulseId(),
                        this.channels.stream()
                                .map(channel -> channel.getConfig().getName())
                                .collect(Collectors.joining(", ")));
            }

            this.socket.send(this.senderConfig.getObjectMapper().writeValueAsBytes(this.mainHeader), moreFlag);
            this.socket.send(this.dataHeaderBytes, moreFlag);

            int channelCount = this.channels.size();
            for (int index = 0; index < channelCount; index++) {
                DataChannel<?> channel = this.channels.get(index);
                ByteOrder byteOrder = channel.getConfig().getByteOrder();
                // Only the very last frame of the message is sent without the "more" flag.
                int lastFrameFlag = index + 1 < channelCount ? moreFlag : blockingFlag;

                if (isSendNeeded(pulseId, channel)) {
                    ByteBuffer valueBytes = this.senderConfig.getByteConverter().getBytes(
                            channel.getValue(pulseId),
                            channel.getConfig().getType(),
                            byteOrder,
                            this.senderConfig.getValueAllocator());
                    ByteBuffer compressedValue = channel.getConfig().getCompression().getCompressor().compressData(
                            valueBytes,
                            valueBytes.position(),
                            valueBytes.remaining(),
                            0,
                            this.senderConfig.getCompressedValueAllocator(),
                            channel.getConfig().getType().getBytes());
                    this.socket.sendByteBuffer(compressedValue, moreFlag);

                    ByteBuffer timeBytes = this.senderConfig.getByteConverter().getBytes(
                            channel.getTime(pulseId).getAsLongArray(),
                            Type.Int64,
                            byteOrder,
                            this.senderConfig.getValueAllocator());
                    this.socket.sendByteBuffer(timeBytes, lastFrameFlag);
                } else {
                    // Channel is not due for this pulse: send empty value and timestamp frames.
                    this.socket.send((byte[]) null, moreFlag);
                    this.socket.send((byte[]) null, lastFrameFlag);
                }
            }
            this.sentMessages++;
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Unable to serialize message", e);
        }
    }

    /**
     * Serializes the given command with the configured object mapper and sends it as a single
     * frame using the non-blocking send flag.
     *
     * @param command the command to send
     * @throws RuntimeException if the command cannot be serialized
     */
    public void sendCommand(Command command) {
        try {
            this.socket.send(this.senderConfig.getObjectMapper().writeValueAsBytes(command), ZMQ.NOBLOCK);
        } catch (JsonProcessingException e) {
            LOGGER.error("Could not send command.", e);
            throw new RuntimeException("Could not send command.", e);
        }
    }

    /**
     * Registers a channel and regenerates the data header.
     *
     * @param dataChannel the channel to add
     */
    public void addSource(DataChannel<?> dataChannel) {
        this.channels.add(dataChannel);
        generateDataHeader();
    }

    /**
     * Unregisters a channel and regenerates the data header.
     *
     * @param dataChannel the channel to remove
     */
    public void removeSource(DataChannel<?> dataChannel) {
        this.channels.remove(dataChannel);
        generateDataHeader();
    }

    /**
     * Returns the registered channels.
     *
     * @return an unmodifiable view of the registered channels
     */
    public List<DataChannel<?>> getChannels() {
        return Collections.unmodifiableList(this.channels);
    }

    /**
     * Returns the configuration this sender was created with.
     *
     * @return the sender configuration
     */
    public SenderConfig getSenderConfig() {
        return this.senderConfig;
    }

    /** Returns whether at least one registered channel is due for the given pulse id. */
    private boolean isAnyChannelDue(long pulseId) {
        return this.channels.stream().anyMatch(channel -> isSendNeeded(pulseId, channel));
    }

    /**
     * Returns whether the channel is due for the given pulse id, i.e. whether the pulse id minus
     * the channel's offset is a multiple of the channel's modulo.
     */
    private boolean isSendNeeded(long pulseId, DataChannel<?> dataChannel) {
        long offset = dataChannel.getConfig().getOffset();
        long modulo = dataChannel.getConfig().getModulo();
        return (pulseId - offset) % modulo == 0;
    }

    /**
     * Rebuilds the serialized data header from the configs of all registered channels, compresses
     * it when a data header compression other than {@code Compression.none} is configured, and
     * recomputes the MD5 hash over the resulting bytes.
     */
    private void generateDataHeader() {
        DataHeader dataHeader = new DataHeader();
        for (DataChannel<?> channel : this.channels) {
            dataHeader.addChannel(channel.getConfig());
        }
        try {
            this.dataHeaderBytes = this.senderConfig.getObjectMapper().writeValueAsBytes(dataHeader);
            if (!Compression.none.equals(this.senderConfig.getDataHeaderCompression())) {
                ByteBuffer compressedHeader = this.senderConfig.getDataHeaderCompression().getCompressor()
                        .compressDataHeader(
                                ByteBuffer.wrap(this.dataHeaderBytes),
                                this.senderConfig.getCompressedValueAllocator());
                this.dataHeaderBytes = ByteBufferHelper.copyToByteArray(compressedHeader);
            }
            // The hash is computed over the bytes that are actually sent (i.e. after compression).
            this.dataHeaderMD5 = Utils.computeMD5(this.dataHeaderBytes);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Unable to generate data header", e);
        }
    }

    /**
     * Sleeps for {@link #SETTLE_TIME_MILLIS}. If the sleep is interrupted, the interruption is
     * logged and the thread's interrupt status is restored so callers can still observe it.
     */
    private static void pause() {
        try {
            TimeUnit.MILLISECONDS.sleep(SETTLE_TIME_MILLIS);
        } catch (InterruptedException e) {
            LOGGER.error("Interrupted while sleeping.");
            Thread.currentThread().interrupt();
        }
    }
}
