package ch.psi.bsread.monitors;

import ch.psi.bsread.common.concurrent.executor.CommonExecutors;
import ch.psi.bsread.message.commands.StopCommand;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;
import zmq.ZMQ;

/* loaded from: input_file:ch/psi/bsread/monitors/ConnectionCounterMonitor.class */
public class ConnectionCounterMonitor implements Monitor {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) ConnectionCounterMonitor.class);
    private Set<IntConsumer> handlers = new LinkedHashSet();
    private AtomicLong runIdProvider = new AtomicLong();
    private final AtomicInteger connectionCounter = new AtomicInteger();
    private MonitorConfig monitorConfig;

    @Override // ch.psi.bsread.monitors.Monitor
    public void start(MonitorConfig monitorConfig) {
        ZMQ.Socket socket;
        String str = "inproc://" + monitorConfig.getMonitorItentifier();
        try {
            monitorConfig.getSocket().base().monitor(str, 1569);
            socket = monitorConfig.getContext().socket(0);
            socket.connect(str);
        } catch (Exception e) {
            socket = null;
            LOGGER.warn("Could not establish a connection monitor for identifier '{}'.", monitorConfig.getMonitorItentifier());
        }
        if (socket != null) {
            long incrementAndGet = this.runIdProvider.incrementAndGet();
            ExecutorService newSingleThreadExecutor = CommonExecutors.newSingleThreadExecutor(monitorConfig.getMonitorItentifier());
            this.monitorConfig = monitorConfig;
            this.connectionCounter.set(0);
            updateHandlers(incrementAndGet, 0);
            ZMQ.Socket socket2 = socket;
            newSingleThreadExecutor.execute(() -> {
                boolean z = true;
                while (z) {
                    try {
                        try {
                            ZMQ.Event read = ZMQ.Event.read(socket2.base());
                            if (read != null && monitorConfig.getSocket().base().errno() != 156384765) {
                                switch (read.event) {
                                    case 1:
                                    case 32:
                                        updateHandlers(incrementAndGet, this.connectionCounter.incrementAndGet());
                                        break;
                                    case 512:
                                        updateHandlers(incrementAndGet, this.connectionCounter.decrementAndGet());
                                        break;
                                    case 1024:
                                        z = false;
                                        break;
                                    default:
                                        LOGGER.info("Unexpected event '{}' received for identifier '{} monitoring '{}'", Integer.valueOf(read.event), monitorConfig.getMonitorItentifier(), read.addr);
                                        break;
                                }
                            }
                            LOGGER.debug("Stop ConnectionCounter for identifier '{}'.", monitorConfig.getMonitorItentifier());
                            if (socket2 != null) {
                                socket2.close();
                            }
                            updateHandlers(incrementAndGet, 0);
                            newSingleThreadExecutor.shutdown();
                        } catch (Throwable th) {
                            LOGGER.warn("Monitoring zmq connections failed for identifier '{}'.", monitorConfig.getMonitorItentifier(), th);
                            LOGGER.debug("Stop ConnectionCounter for identifier '{}'.", monitorConfig.getMonitorItentifier());
                            if (socket2 != null) {
                                socket2.close();
                            }
                            updateHandlers(incrementAndGet, 0);
                            newSingleThreadExecutor.shutdown();
                            return;
                        }
                    } catch (Throwable th2) {
                        LOGGER.debug("Stop ConnectionCounter for identifier '{}'.", monitorConfig.getMonitorItentifier());
                        if (socket2 != null) {
                            socket2.close();
                        }
                        updateHandlers(incrementAndGet, 0);
                        newSingleThreadExecutor.shutdown();
                        throw th2;
                    }
                }
                LOGGER.debug("Stop ConnectionCounter for identifier '{}'.", monitorConfig.getMonitorItentifier());
                if (socket2 != null) {
                }
                updateHandlers(incrementAndGet, 0);
                newSingleThreadExecutor.shutdown();
            });
        }
    }

    @Override // ch.psi.bsread.monitors.Monitor
    public synchronized void stop(long j) {
        if (this.monitorConfig != null) {
            sendStopMessage(this.monitorConfig.getSocket(), this, j, this.connectionCounter.get());
        }
    }

    @Override // ch.psi.bsread.monitors.Monitor
    public void stop() {
        stop(-1L);
    }

    public void disableUpdate() {
        this.runIdProvider.incrementAndGet();
    }

    public synchronized void addHandler(IntConsumer intConsumer) {
        this.handlers.add(intConsumer);
    }

    public synchronized void removeHandler(IntConsumer intConsumer) {
        this.handlers.remove(intConsumer);
    }

    private synchronized void updateHandlers(long j, int i) {
        if (j != this.runIdProvider.get()) {
            LOGGER.debug("Do not update connection count handlers since there is a new runId.");
            return;
        }
        Iterator<IntConsumer> it = this.handlers.iterator();
        while (it.hasNext()) {
            it.next().accept(i);
        }
    }

    public static void sendStopMessage(ZMQ.Socket socket, ConnectionCounterMonitor connectionCounterMonitor, long j, int i) {
        try {
            MonitorConfig monitorConfig = connectionCounterMonitor.monitorConfig;
            if (monitorConfig != null && monitorConfig.isSendStopMessage()) {
                byte[] asBytes = StopCommand.getAsBytes(monitorConfig.getObjectMapper(), j);
                int i2 = monitorConfig.getSocketType() == 8 ? i : 1;
                int i3 = monitorConfig.isBlockingSend() ? 0 : 1;
                for (int i4 = 0; i4 < i2; i4++) {
                    socket.send(asBytes, i3);
                }
            }
        } catch (Exception e) {
            LOGGER.warn("Could not send stop command.", (Throwable) e);
        }
    }
}
