package ch.psi.bsread.stream;

import ch.psi.bsread.Receiver;
import ch.psi.bsread.ReceiverConfig;
import ch.psi.bsread.common.concurrent.executor.CommonExecutors;
import ch.psi.bsread.configuration.Channel;
import ch.psi.bsread.converter.ValueConverter;
import ch.psi.bsread.impl.DirectByteBufferValueConverter;
import ch.psi.bsread.impl.StandardMessageExtractor;
import ch.psi.bsread.message.DataHeader;
import ch.psi.bsread.message.Message;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQException;
import zmq.msg.MsgAllocator;

/* loaded from: input_file:ch/psi/bsread/stream/MessageStreamer.class */
/**
 * Receives messages through one or more {@link Receiver}s and exposes them as a
 * {@link Stream} of {@link StreamSection}s.
 *
 * <p>For every requested stream split one {@link Receiver} is created and connected, and one
 * worker task is submitted to a dedicated fixed-size thread pool. Each worker repeatedly calls
 * {@link Receiver#receive()} and hands every message, together with the mapping function, to a
 * shared {@link AsyncTransferSpliterator}. When a worker's receiver returns {@code null} or
 * throws, that worker closes the whole streamer.
 *
 * @param <Value> value type of the received messages
 * @param <Mapped> element type produced by the mapping function
 */
public class MessageStreamer<Value, Mapped> implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageStreamer.class);

    /**
     * The only socket type allowed together with more than one stream split.
     * NOTE(review): the error message below calls this "push/pull"; 7 presumably equals
     * org.zeromq.ZMQ.PULL - confirm against the ZMQ constants.
     */
    private static final int PULL_SOCKET_TYPE = 7;

    /** High water mark applied to the receiver configuration. */
    private static final int HIGH_WATER_MARK = 10000;

    private List<Receiver<Value>> receivers;
    private ExecutorService executor;
    private List<Future<?>> executorFutures;
    private final AtomicBoolean isRunning;
    private Stream<StreamSection<Mapped>> stream;
    private AsyncTransferSpliterator<Mapped> spliterator;

    /** Delegates using a new {@link DirectByteBufferValueConverter} as value converter. */
    public MessageStreamer(int i, String str, Collection<Channel> collection, int i2, int i3, Function<Message<Value>, Mapped> function) {
        this(i, str, collection, i2, i3, new DirectByteBufferValueConverter(), function);
    }

    /** Delegates with a {@code null} message allocator. */
    public MessageStreamer(int i, String str, Collection<Channel> collection, int i2, int i3, ValueConverter valueConverter, Function<Message<Value>, Mapped> function) {
        this(i, str, collection, i2, i3, valueConverter, (MsgAllocator) null, function);
    }

    /** Delegates with {@link Integer#MAX_VALUE} as third spliterator argument and a {@code null} message allocator. */
    public MessageStreamer(int i, String str, Collection<Channel> collection, int i2, int i3, ValueConverter valueConverter, Function<Message<Value>, Mapped> function, Consumer<DataHeader> consumer) {
        this(i, str, collection, i2, i3, Integer.MAX_VALUE, valueConverter, (MsgAllocator) null, function, consumer);
    }

    /** Delegates with {@link Integer#MAX_VALUE} as third spliterator argument and a {@code null} message allocator. */
    public MessageStreamer(int i, String str, Collection<Channel> collection, int i2, int i3, ValueConverter valueConverter, Function<Message<Value>, Mapped> function, Consumer<DataHeader> consumer, Integer num) {
        this(i, str, collection, i2, i3, Integer.MAX_VALUE, valueConverter, (MsgAllocator) null, function, consumer, num);
    }

    /**
     * Delegates with an explicit number of stream splits ({@code i2}), {@link Integer#MAX_VALUE}
     * as third spliterator argument and a {@code null} message allocator.
     */
    public MessageStreamer(int i, String str, int i2, Collection<Channel> collection, int i3, int i4, ValueConverter valueConverter, Function<Message<Value>, Mapped> function, Consumer<DataHeader> consumer, Integer num) {
        this(i, str, i2, collection, i3, i4, Integer.MAX_VALUE, valueConverter, (MsgAllocator) null, function, consumer, num);
    }

    /** Delegates with {@link Integer#MAX_VALUE} as third spliterator argument. */
    public MessageStreamer(int i, String str, Collection<Channel> collection, int i2, int i3, ValueConverter valueConverter, MsgAllocator msgAllocator, Function<Message<Value>, Mapped> function) {
        this(i, str, collection, i2, i3, Integer.MAX_VALUE, valueConverter, msgAllocator, function);
    }

    /** Delegates with a {@code null} message allocator and no data header consumer. */
    public MessageStreamer(int i, String str, Collection<Channel> collection, int i2, int i3, int i4, ValueConverter valueConverter, Function<Message<Value>, Mapped> function) {
        this(i, str, collection, i2, i3, i4, valueConverter, (MsgAllocator) null, function, (Consumer<DataHeader>) null);
    }

    /** Delegates without a data header consumer. */
    public MessageStreamer(int i, String str, Collection<Channel> collection, int i2, int i3, int i4, ValueConverter valueConverter, MsgAllocator msgAllocator, Function<Message<Value>, Mapped> function) {
        this(i, str, collection, i2, i3, i4, valueConverter, msgAllocator, function, (Consumer<DataHeader>) null);
    }

    /** Delegates with a receive buffer size of {@code 0}, which leaves the receive buffer size unset. */
    public MessageStreamer(int i, String str, Collection<Channel> collection, int i2, int i3, int i4, ValueConverter valueConverter, MsgAllocator msgAllocator, Function<Message<Value>, Mapped> function, Consumer<DataHeader> consumer) {
        this(i, str, collection, i2, i3, i4, valueConverter, msgAllocator, function, consumer, 0);
    }

    /** Delegates with a single stream split (one receiver, one worker thread). */
    public MessageStreamer(int i, String str, Collection<Channel> collection, int i2, int i3, int i4, ValueConverter valueConverter, MsgAllocator msgAllocator, Function<Message<Value>, Mapped> function, Consumer<DataHeader> consumer, Integer num) {
        this(i, str, 1, collection, i2, i3, i4, valueConverter, msgAllocator, function, consumer, num);
    }

    /**
     * Creates the streamer, connects all receivers and starts one receive task per receiver.
     *
     * @param i socket type set on the receiver configuration; must be {@code 7} when
     *        {@code i2 > 1}
     * @param str first argument of the {@link ReceiverConfig} (presumably the stream address -
     *        confirm against {@code ReceiverConfig}); also used in the thread pool name
     * @param i2 number of stream splits, i.e. the number of receivers and worker threads
     * @param collection channels to request; when {@code null} no requested channels are set
     * @param i3 first argument forwarded to the {@link AsyncTransferSpliterator} constructor
     * @param i4 second argument forwarded to the {@link AsyncTransferSpliterator} constructor
     * @param i5 third argument forwarded to the {@link AsyncTransferSpliterator} constructor
     * @param valueConverter converter wrapped in a {@link StandardMessageExtractor}
     * @param msgAllocator allocator passed to the {@link ReceiverConfig}; the delegating
     *        constructors pass {@code null}
     * @param function mapping function handed to the spliterator with every received message
     * @param consumer optional data header handler; when non-null it is wrapped in a
     *        {@code SynchronizedDataHeaderConsumer} and registered on every receiver
     * @param num receive buffer size; only applied when non-null and greater than zero
     * @throws IllegalStateException if more than one stream split is requested with a socket
     *         type other than {@code 7}
     */
    public MessageStreamer(int i, String str, int i2, Collection<Channel> collection, int i3, int i4, int i5, ValueConverter valueConverter, MsgAllocator msgAllocator, Function<Message<Value>, Mapped> function, Consumer<DataHeader> consumer, Integer num) {
        this.receivers = new ArrayList<>();
        this.executorFutures = new ArrayList<>();
        this.isRunning = new AtomicBoolean(true);
        if (i2 > 1 && i != PULL_SOCKET_TYPE) {
            String format = String.format("Stream splits bigger than 1 ('%d') without using push/pull ('%d') will result in duplicates.", i2, i);
            LOGGER.error(format);
            throw new IllegalStateException(format);
        }
        this.executor = CommonExecutors.newFixedThreadPool(i2, "MessageStreamer for " + str);
        this.spliterator = new AsyncTransferSpliterator<>(i3, i4, i5);
        ReceiverConfig receiverConfig = new ReceiverConfig(str, false, true, new StandardMessageExtractor(valueConverter), msgAllocator);
        receiverConfig.setHighWaterMark(HIGH_WATER_MARK);
        receiverConfig.setSocketType(i);
        if (collection != null) {
            receiverConfig.setRequestedChannels(collection);
        }
        if (num != null && num > 0) {
            receiverConfig.setReceiveBufferSize(num);
        }
        // A single wrapper instance is shared by all receivers.
        SynchronizedDataHeaderConsumer synchronizedDataHeaderConsumer = consumer != null ? new SynchronizedDataHeaderConsumer(consumer) : null;
        for (int i6 = 0; i6 < i2; i6++) {
            Receiver<Value> receiver = new Receiver<>(receiverConfig);
            this.receivers.add(receiver);
            if (synchronizedDataHeaderConsumer != null) {
                receiver.addDataHeaderHandler(synchronizedDataHeaderConsumer);
            }
            receiver.connect();
            this.executorFutures.add(this.executor.submit(() -> receiveUntilClosed(receiver, function)));
        }
    }

    /**
     * Worker body: forwards messages from {@code receiver} to the spliterator until the receiver
     * returns {@code null} or an exception occurs, then closes this streamer.
     */
    private void receiveUntilClosed(Receiver<Value> receiver, Function<Message<Value>, Mapped> mapper) {
        // The try block encloses the whole loop so that any exception terminates the worker
        // instead of re-entering receive() on a broken or closed receiver.
        try {
            Message<Value> message;
            while ((message = receiver.receive()) != null) {
                this.spliterator.onAvailable(message, mapper);
            }
        } catch (ZMQException e) {
            LOGGER.debug("Close streamer since ZMQ stream closed.", e);
        } catch (Exception e) {
            LOGGER.error("Close streamer since Receiver encountered a problem.", e);
        }
        try {
            close();
        } catch (Exception e) {
            LOGGER.warn("Exception while closing streamer.", e);
        }
    }

    /**
     * Returns the sequential stream backed by this streamer's spliterator, creating it on first
     * use. Closing the returned stream also closes this streamer.
     *
     * <p>Must not be called for the first time after {@link #close()}, because closing discards
     * the spliterator the stream would be built on.
     *
     * @return the stream of sections; the same instance on repeated calls
     */
    public Stream<StreamSection<Mapped>> getStream() {
        if (this.stream == null) {
            // onClose returns the stream to use from now on, so keep its result.
            this.stream = StreamSupport.stream(this.spliterator, false).onClose(this::close);
        }
        return this.stream;
    }

    /**
     * Closes all receivers, cancels the worker tasks, shuts down the executor, notifies the
     * spliterator and closes the stream. Only the first call has an effect; later calls,
     * including the re-entrant one triggered by the stream's close handler, are no-ops.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (!this.isRunning.compareAndSet(true, false)) {
            return;
        }
        if (this.receivers != null) {
            for (Receiver<Value> receiver : this.receivers) {
                receiver.close();
            }
            this.receivers = null;
        }
        if (this.executorFutures != null) {
            for (Future<?> future : this.executorFutures) {
                future.cancel(true);
            }
            this.executorFutures = null;
        }
        if (this.executor != null) {
            this.executor.shutdown();
            this.executor = null;
        }
        if (this.spliterator != null) {
            this.spliterator.onClose();
            this.spliterator = null;
        }
        if (this.stream != null) {
            this.stream.close();
            this.stream = null;
        }
    }

    /**
     * Returns the spliterator's string representation, or a fixed marker once the streamer has
     * been closed and the spliterator released.
     */
    @Override
    public String toString() {
        AsyncTransferSpliterator<Mapped> current = this.spliterator;
        return current != null ? current.toString() : "MessageStreamer[closed]";
    }
}
