package ch.psi.bsread;

import ch.psi.bsread.message.DataHeader;
import ch.psi.bsread.message.MainHeader;
import ch.psi.bsread.message.Value;
import ch.psi.bsread.monitors.ConnectionCounterMonitor;
import ch.psi.bsread.monitors.MonitorConfig;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;

/* loaded from: input_file:ch/psi/bsread/Receiver.class */
/**
 * Receiver that owns a ZMQ socket configured from a {@link ReceiverConfig} and keeps the handler
 * registries (main header, data header, value, connection count, connection idle and connection
 * inactive) for it.
 *
 * <p>The instance registers itself as an {@link IntConsumer} on its
 * {@link ConnectionCounterMonitor}; connection-count and idle/inactive changes are first recorded
 * in atomic "update" slots and later dispatched to the registered handlers by the
 * {@code handleConnection...Changes()} methods.
 *
 * <p>NOTE(review): the body of {@link #receive()} is not available in this source (decompilation
 * failed), so how and when the header/value handlers are invoked cannot be confirmed from here.
 * The handler collections are plain, unsynchronized collections.
 *
 * @param <V> value type carried by the received messages
 */
public class Receiver<V> implements ConfigIReceiver<V>, IntConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

    /** True between a successful {@link #connect()} entry and {@link #close()}/{@link #cleanup()}. */
    private final AtomicBoolean isRunning;
    /** Guards {@link #cleanup()} so that it runs at most once per connection. */
    private final AtomicBoolean isCleaned;
    private ZMQ.Socket socket;
    private final List<Consumer<MainHeader>> mainHeaderHandlers;
    private final List<Consumer<DataHeader>> dataHeaderHandlers;
    private final List<Consumer<Map<String, Value<V>>>> valueHandlers;
    private final Set<IntConsumer> connectionCountHandlers;
    /** Pending connection count; {@code Integer.MIN_VALUE} means "no pending update". */
    private final AtomicInteger connectionCountUpdate;
    /** Connection count that was last passed through {@link #handleConnectionCountChanges()}. */
    private final AtomicInteger connectionCountPrev;
    /** Set while {@link #reconnect()} is in progress; {@link #accept(int)} ignores updates then. */
    private final AtomicBoolean reconnecting;
    private final Set<Consumer<Boolean>> connectionIdleHandlers;
    /** Pending idle flag; {@code null} means "no pending update". */
    private final AtomicReference<Boolean> connectionIdleUpdate;
    private final AtomicReference<Boolean> connectionIdlePrev;
    private final Set<Consumer<Boolean>> connectionInactiveHandlers;
    /** Pending inactive flag; {@code null} means "no pending update". */
    private final AtomicReference<Boolean> connectionInactiveUpdate;
    private final AtomicReference<Boolean> connectionInactivePrev;
    private ReceiverConfig<V> receiverConfig;
    private volatile ReceiverState receiverState;
    private ConnectionCounterMonitor connectionMonitor;
    /** Completed by {@link #cleanup()}; {@link #close()} waits on it. Null until first connect. */
    private CompletableFuture<Void> mainLoopExitSync;
    private volatile Thread receivingThread;

    /** Creates a receiver using a default-constructed {@link ReceiverConfig}. */
    public Receiver() {
        this(new ReceiverConfig<>());
    }

    /**
     * Creates a receiver for the given configuration and registers the configuration's message
     * extractor as the first data header handler.
     *
     * @param receiverConfig configuration used for the socket setup and handler processing
     */
    public Receiver(ReceiverConfig<V> receiverConfig) {
        this.isRunning = new AtomicBoolean();
        this.isCleaned = new AtomicBoolean();
        this.mainHeaderHandlers = new ArrayList<>();
        this.dataHeaderHandlers = new ArrayList<>();
        this.valueHandlers = new ArrayList<>();
        this.connectionCountHandlers = new LinkedHashSet<>();
        this.connectionCountUpdate = new AtomicInteger(Integer.MIN_VALUE);
        this.connectionCountPrev = new AtomicInteger(this.connectionCountUpdate.get());
        this.reconnecting = new AtomicBoolean(false);
        this.connectionIdleHandlers = new LinkedHashSet<>();
        this.connectionIdleUpdate = new AtomicReference<>(Boolean.TRUE);
        // "prev" starts as the negation of the pending value so the first dispatch counts as a change.
        this.connectionIdlePrev = new AtomicReference<>(Boolean.valueOf(!this.connectionIdleUpdate.get().booleanValue()));
        this.connectionInactiveHandlers = new LinkedHashSet<>();
        this.connectionInactiveUpdate = new AtomicReference<>(Boolean.TRUE);
        // Derived from the inactive slot (previously read the idle slot by mistake; same initial value).
        this.connectionInactivePrev = new AtomicReference<>(Boolean.valueOf(!this.connectionInactiveUpdate.get().booleanValue()));
        this.receiverState = new ReceiverState();
        this.receiverConfig = receiverConfig;
        this.dataHeaderHandlers.add(this.receiverConfig.getMessageExtractor());
    }

    /**
     * Opens and configures the socket and connects it to the configured address. Does nothing if
     * the receiver is already running.
     *
     * <p>The running flag is set before the socket is configured and is not reset if configuration
     * or connecting throws; in that case {@link #close()} still releases the socket.
     */
    @Override // ch.psi.bsread.IReceiver
    public void connect() {
        if (!this.isRunning.compareAndSet(false, true)) {
            return;
        }
        this.receivingThread = null;
        this.receiverState = new ReceiverState();
        this.isCleaned.set(false);
        this.mainLoopExitSync = new CompletableFuture<>();

        this.socket = this.receiverConfig.getContext().socket(this.receiverConfig.getSocketType());
        this.socket.setRcvHWM(this.receiverConfig.getHighWaterMark());
        this.socket.setLinger(this.receiverConfig.getLinger());
        this.socket.setReceiveTimeOut(this.receiverConfig.getReceiveTimeout());
        if (this.receiverConfig.getReceiveBufferSize() > 0) {
            this.socket.setReceiveBufferSize(this.receiverConfig.getReceiveBufferSize());
        }
        if (this.receiverConfig.getMsgAllocator() != null) {
            // NOTE(review): 1003 is presumably the message-allocator socket option id of the ZMQ
            // implementation in use - confirm and replace with the named constant if one exists.
            this.socket.base().setSocketOpt(1003, this.receiverConfig.getMsgAllocator());
        }

        // The connection monitor is only started when somebody listens for connection counts.
        if (!this.connectionCountHandlers.isEmpty()) {
            if (this.connectionMonitor == null) {
                this.connectionMonitor = new ConnectionCounterMonitor();
                this.connectionMonitor.addHandler(this);
            }
            this.connectionMonitor.start(new MonitorConfig(this.receiverConfig.getContext(), this.socket, this.receiverConfig.getObjectMapper(), this.receiverConfig.getSocketType(), false, false, UUID.randomUUID().toString()));
        }

        Utils.connect(this.socket, this.receiverConfig.getAddress(), this.receiverConfig.getSocketType());
        LOGGER.info("Receiver '{}' connected", this.receiverConfig.getAddress());
    }

    /**
     * Stops the receiver. Does nothing if it is not running.
     *
     * <p>When called from the receiving thread, cleanup is done immediately. Otherwise this waits
     * for the exit future (completed by {@link #cleanup()}) for
     * {@code max(1000, 1.5 * receiveTimeout)} milliseconds and, if that fails, performs the cleanup
     * in the calling thread. If the wait is interrupted, the interrupt flag is restored.
     */
    @Override // ch.psi.bsread.IReceiver, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (!this.isRunning.compareAndSet(true, false)) {
            return;
        }
        LOGGER.info("Receiver '{}' stopping...", this.receiverConfig.getAddress());
        if (Thread.currentThread().equals(this.receivingThread)) {
            cleanup();
            return;
        }
        CompletableFuture<Void> exitSync = this.mainLoopExitSync;
        if (exitSync == null) {
            // Nothing to wait for; release whatever has been set up so far.
            cleanup();
            return;
        }
        long timeoutMillis = (long) Math.max(1000.0d, 1.5d * this.receiverConfig.getReceiveTimeout());
        try {
            exitSync.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Do not swallow the interruption; let callers further up observe it.
                Thread.currentThread().interrupt();
            }
            LOGGER.warn("Could not stop '{}' within timelimits. Do cleanup in closing thread (this might lead to inconsistent state but still better than no cleanup).", this.receiverConfig.getAddress());
            cleanup();
        }
    }

    /**
     * Releases the connection resources: stops the connection monitor, closes the socket and
     * completes the exit future. Runs at most once per connection.
     *
     * @return {@code true} if this call performed the cleanup, {@code false} if it had already
     *         been done
     */
    protected boolean cleanup() {
        if (!this.isCleaned.compareAndSet(false, true)) {
            return false;
        }
        this.isRunning.set(false);
        if (this.connectionMonitor != null) {
            this.connectionMonitor.disableUpdate();
            this.connectionMonitor.stop();
        }
        if (this.socket != null) {
            this.socket.close();
            this.socket = null;
        }
        // Null when cleanup() is reached before any connect() (e.g. via reconnect()).
        if (this.mainLoopExitSync != null) {
            this.mainLoopExitSync.complete(null);
        }
        this.receivingThread = null;
        LOGGER.info("Receiver '{}' stopped.", this.receiverConfig.getAddress());
        return true;
    }

    /**
     * Cleans up the current connection and connects again. If the cleanup had already been done
     * by someone else, no new connection is opened and the receiver is marked as not running.
     * Afterwards the calling thread is recorded as the receiving thread.
     *
     * <p>Connection-count updates arriving through {@link #accept(int)} are ignored while this
     * method runs; the flag is reset even if reconnecting throws.
     */
    protected void reconnect() {
        this.reconnecting.set(true);
        try {
            if (cleanup()) {
                connect();
            } else {
                this.isRunning.set(false);
            }
        } finally {
            this.reconnecting.set(false);
        }
        this.receivingThread = Thread.currentThread();
    }

    /**
     * NOTE(review): the original implementation could not be decompiled; this stub always throws
     * {@link UnsupportedOperationException}. Restore the body from the original sources.
     */
    /* JADX WARN: Removed duplicated region for block: B:48:0x01c9  */
    @Override // ch.psi.bsread.IReceiver
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public ch.psi.bsread.message.Message<V> receive() throws java.lang.RuntimeException {
        /*
            Method dump skipped, instructions count: 577
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: ch.psi.bsread.Receiver.receive():ch.psi.bsread.message.Message");
    }

    /**
     * Reads and discards the remaining parts of the current multipart message.
     *
     * @return the number of discarded parts; {@code 0} if there is no open socket
     */
    @Override // ch.psi.bsread.ConfigIReceiver
    public int drain() {
        ZMQ.Socket current = this.socket;
        if (current == null) {
            return 0;
        }
        int discarded = 0;
        while (current.hasReceiveMore()) {
            current.base().recv(0);
            discarded++;
        }
        return discarded;
    }

    /**
     * Records a new connection count as pending update (this instance is registered as handler on
     * the connection monitor). Ignored while a reconnect is in progress. A count of zero or less
     * additionally resets the receiver state and flags the connection as idle and inactive.
     *
     * @param i the new connection count
     */
    @Override // java.util.function.IntConsumer
    public void accept(int i) {
        if (this.reconnecting.get()) {
            return;
        }
        this.connectionCountUpdate.set(i);
        if (i <= 0) {
            this.receiverState = new ReceiverState();
            setConnectionIdle(true);
            setConnectionInactive(true);
        }
    }

    /**
     * Consumes the pending connection count, if any, and notifies the connection count handlers
     * when it differs from the previously dispatched count.
     */
    protected void handleConnectionCountChanges() {
        final int count = this.connectionCountUpdate.getAndSet(Integer.MIN_VALUE);
        if (count == Integer.MIN_VALUE) {
            return;
        }
        // getAndSet also stores the new count as "previous" for the next comparison.
        if (this.connectionCountPrev.getAndSet(count) == count) {
            return;
        }
        if (this.receiverConfig.isParallelHandlerProcessing()) {
            getConnectionCountHandlers().parallelStream().forEach(handler -> handler.accept(count));
        } else {
            getConnectionCountHandlers().forEach(handler -> handler.accept(count));
        }
    }

    /**
     * Records the idle flag as pending update without notifying handlers.
     *
     * @param z the new idle flag
     */
    public void setConnectionIdle(boolean z) {
        this.connectionIdleUpdate.set(Boolean.valueOf(z));
    }

    /**
     * Records the idle flag and immediately dispatches it if it changed.
     *
     * @param z the new idle flag
     */
    protected void handleConnectionIdleChanges(boolean z) {
        setConnectionIdle(z);
        handleConnectionIdleChanges();
    }

    /**
     * Consumes the pending idle flag, if any, and notifies the connection idle handlers when it
     * differs from the previously dispatched flag.
     */
    protected void handleConnectionIdleChanges() {
        Boolean pending = this.connectionIdleUpdate.getAndSet(null);
        if (pending == null || this.connectionIdlePrev.getAndSet(pending).booleanValue() == pending.booleanValue()) {
            return;
        }
        notifyFlagHandlers(getConnectionIdleHandlers(), pending);
    }

    /**
     * Records the inactive flag as pending update without notifying handlers.
     *
     * @param z the new inactive flag
     */
    public void setConnectionInactive(boolean z) {
        this.connectionInactiveUpdate.set(Boolean.valueOf(z));
    }

    /**
     * Records the inactive flag and immediately dispatches it if it changed.
     *
     * @param z the new inactive flag
     */
    protected void handleConnectionInactiveChanges(boolean z) {
        setConnectionInactive(z);
        handleConnectionInactiveChanges();
    }

    /**
     * Consumes the pending inactive flag, if any, and notifies the connection inactive handlers
     * when it differs from the previously dispatched flag.
     */
    protected void handleConnectionInactiveChanges() {
        Boolean pending = this.connectionInactiveUpdate.getAndSet(null);
        if (pending == null || this.connectionInactivePrev.getAndSet(pending).booleanValue() == pending.booleanValue()) {
            return;
        }
        notifyFlagHandlers(getConnectionInactiveHandlers(), pending);
    }

    /** Passes {@code value} to every handler, via a parallel stream if the configuration asks for it. */
    private void notifyFlagHandlers(Collection<Consumer<Boolean>> handlers, Boolean value) {
        if (this.receiverConfig.isParallelHandlerProcessing()) {
            handlers.parallelStream().forEach(handler -> handler.accept(value));
        } else {
            handlers.forEach(handler -> handler.accept(value));
        }
    }

    /** @return the current socket, or {@code null} if none is open */
    @Override // ch.psi.bsread.ConfigIReceiver
    public ZMQ.Socket getSocket() {
        return this.socket;
    }

    /** @return the configuration this receiver was created with */
    @Override // ch.psi.bsread.ConfigIReceiver
    public ReceiverConfig<V> getReceiverConfig() {
        return this.receiverConfig;
    }

    /** @return the current receiver state (replaced on connect and when the connection count drops to zero or less) */
    @Override // ch.psi.bsread.ConfigIReceiver
    public ReceiverState getReceiverState() {
        return this.receiverState;
    }

    /** @return the live (not copied) collection of value handlers */
    @Override // ch.psi.bsread.IReceiver
    public Collection<Consumer<Map<String, Value<V>>>> getValueHandlers() {
        return this.valueHandlers;
    }

    /** Registers a value handler. */
    public void addValueHandler(Consumer<Map<String, Value<V>>> consumer) {
        this.valueHandlers.add(consumer);
    }

    /** Unregisters a value handler. */
    public void removeValueHandler(Consumer<Map<String, Value<V>>> consumer) {
        this.valueHandlers.remove(consumer);
    }

    /** @return the live (not copied) collection of main header handlers */
    @Override // ch.psi.bsread.IReceiver
    public Collection<Consumer<MainHeader>> getMainHeaderHandlers() {
        return this.mainHeaderHandlers;
    }

    /** Registers a main header handler. */
    public void addMainHeaderHandler(Consumer<MainHeader> consumer) {
        this.mainHeaderHandlers.add(consumer);
    }

    /** Unregisters a main header handler. */
    public void removeMainHeaderHandler(Consumer<MainHeader> consumer) {
        this.mainHeaderHandlers.remove(consumer);
    }

    /** @return the live (not copied) collection of data header handlers */
    @Override // ch.psi.bsread.IReceiver
    public Collection<Consumer<DataHeader>> getDataHeaderHandlers() {
        return this.dataHeaderHandlers;
    }

    /** Registers a data header handler. */
    public void addDataHeaderHandler(Consumer<DataHeader> consumer) {
        this.dataHeaderHandlers.add(consumer);
    }

    /** Unregisters a data header handler. */
    public void removeDataHeaderHandler(Consumer<DataHeader> consumer) {
        this.dataHeaderHandlers.remove(consumer);
    }

    /** @return the live (not copied) collection of connection count handlers */
    @Override // ch.psi.bsread.IReceiver
    public Collection<IntConsumer> getConnectionCountHandlers() {
        return this.connectionCountHandlers;
    }

    /**
     * Registers a connection count handler. The connection monitor is only started by
     * {@link #connect()} if at least one such handler is registered at that time.
     */
    public void addConnectionCountHandler(IntConsumer intConsumer) {
        this.connectionCountHandlers.add(intConsumer);
    }

    /** Unregisters a connection count handler. */
    public void removeConnectionCountHandler(IntConsumer intConsumer) {
        this.connectionCountHandlers.remove(intConsumer);
    }

    /** @return the live (not copied) collection of connection idle handlers */
    @Override // ch.psi.bsread.IReceiver
    public Collection<Consumer<Boolean>> getConnectionIdleHandlers() {
        return this.connectionIdleHandlers;
    }

    /** Registers a connection idle handler. */
    public void addConnectionIdleHandler(Consumer<Boolean> consumer) {
        this.connectionIdleHandlers.add(consumer);
    }

    /** Unregisters a connection idle handler. */
    public void removeConnectionIdleHandler(Consumer<Boolean> consumer) {
        this.connectionIdleHandlers.remove(consumer);
    }

    /** @return the live (not copied) collection of connection inactive handlers */
    @Override // ch.psi.bsread.IReceiver
    public Collection<Consumer<Boolean>> getConnectionInactiveHandlers() {
        return this.connectionInactiveHandlers;
    }

    /** Registers a connection inactive handler. */
    public void addConnectionInactiveHandler(Consumer<Boolean> consumer) {
        this.connectionInactiveHandlers.add(consumer);
    }

    /** Unregisters a connection inactive handler. */
    public void removeConnectionInactiveHandler(Consumer<Boolean> consumer) {
        this.connectionInactiveHandlers.remove(consumer);
    }
}
