package com.neemre.btcdcli4j.daemon.notification;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.neemre.btcdcli4j.core.client.BtcdClient;
import com.neemre.btcdcli4j.core.common.Errors;
import com.neemre.btcdcli4j.core.util.StringUtils;
import com.neemre.btcdcli4j.daemon.NotificationHandlerException;
import com.neemre.btcdcli4j.daemon.Notifications;
import com.neemre.btcdcli4j.daemon.notification.worker.NotificationWorker;
import com.neemre.btcdcli4j.daemon.notification.worker.NotificationWorkerFactory;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/neemre/btcdcli4j/daemon/notification/NotificationMonitor.class */
public class NotificationMonitor extends Observable implements Observer, Callable<Void> {
    private static final Logger LOG = LoggerFactory.getLogger(NotificationMonitor.class);
    private static final int WORKER_MIN_COUNT = 1;
    private static final int WORKER_MAX_COUNT = 10;
    private static final int TASK_QUEUE_LENGTH = 100;
    private static final int IDLE_WORKER_TIMEOUT = 60000;
    private static final int IDLE_SOCKET_TIMEOUT = 5000;
    private Notifications type;
    private int serverPort;
    private ServerSocket serverSocket;
    private volatile boolean isActive;

    @Nullable
    private BtcdClient client;

    @Nullable
    private Consumer<Throwable> errorHandler;
    private ThreadPoolExecutor executor;
    private ListeningExecutorService workerPool;

    public NotificationMonitor(Notifications notifications, int i, @Nullable BtcdClient btcdClient) {
        this(notifications, i, btcdClient, null);
    }

    public NotificationMonitor(Notifications notifications, int i, @Nullable BtcdClient btcdClient, @Nullable Consumer<Throwable> consumer) {
        Logger logger = LOG;
        Object[] objArr = new Object[3];
        objArr[0] = notifications.name();
        objArr[WORKER_MIN_COUNT] = Integer.valueOf(i);
        objArr[2] = btcdClient == null ? "no" : "yes";
        logger.info("** NotificationMonitor(): launching new '{}' notification monitor (port: '{}', RPC-capable: '{}')", objArr);
        this.errorHandler = consumer;
        this.type = notifications;
        this.serverPort = i;
        this.client = btcdClient;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.concurrent.Callable
    public Void call() throws NotificationHandlerException {
        activate();
        LOG.info("-- run(..): started listening for '{}' notifications on port '{}'", this.type.name(), Integer.valueOf(this.serverSocket.getLocalPort()));
        while (this.isActive) {
            try {
                try {
                    NotificationWorker createWorker = NotificationWorkerFactory.createWorker(this.type, this.serverSocket.accept(), this.client);
                    createWorker.addObserver(this);
                    Futures.addCallback(this.workerPool.submit(createWorker), new FutureCallback<Void>() { // from class: com.neemre.btcdcli4j.daemon.notification.NotificationMonitor.1
                        public void onSuccess(Void r2) {
                        }

                        public void onFailure(Throwable th) {
                            if (NotificationMonitor.this.errorHandler != null) {
                                NotificationMonitor.this.errorHandler.accept(th);
                            }
                        }
                    });
                    LOG.trace("-- run(..): total no. of '{}' notifications received: '{}', task queue occupancy: '{}/{}'", new Object[]{this.type.name(), Long.valueOf(this.executor.getTaskCount()), Integer.valueOf(this.executor.getQueue().size()), Integer.valueOf(TASK_QUEUE_LENGTH)});
                    if (Thread.interrupted()) {
                        deactivate();
                    }
                } catch (SocketTimeoutException e) {
                    LOG.trace("-- run(..): polling '{}' notification monitor for interrupts (socket idle for {}ms)", this.type.name(), Integer.valueOf(IDLE_SOCKET_TIMEOUT));
                    if (Thread.interrupted()) {
                        deactivate();
                    }
                } catch (IOException e2) {
                    Thread.currentThread().interrupt();
                    throw new NotificationHandlerException(Errors.IO_SOCKET_UNINITIALIZED, e2);
                } catch (Throwable th) {
                    Thread.currentThread().interrupt();
                    throw new NotificationHandlerException(Errors.IO_SOCKET_UNINITIALIZED, th);
                }
            } catch (Throwable th2) {
                if (Thread.interrupted()) {
                    deactivate();
                }
                throw th2;
            }
        }
        return null;
    }

    @Override // java.util.Observer
    public synchronized void update(Observable observable, Object obj) {
        LOG.info(">> update(..): worker finished, informing listener(s) of new '{}' notification: '{}'", this.type.name(), obj);
        observable.deleteObserver(this);
        setChanged();
        notifyObservers(obj);
    }

    public boolean isActive() {
        return this.isActive;
    }

    private void activate() throws NotificationHandlerException {
        Thread.currentThread().setName(getUniqueName());
        this.isActive = true;
        try {
            this.serverSocket = new ServerSocket(this.serverPort);
            this.serverSocket.setSoTimeout(IDLE_SOCKET_TIMEOUT);
        } catch (IOException e) {
            try {
                this.serverSocket = new ServerSocket(0);
                this.serverSocket.setSoTimeout(IDLE_SOCKET_TIMEOUT);
                LOG.warn("-- activate(..): failed to create server socket (monitor: '{}', port: '{}'), reverting to unused port '{}'", new Object[]{this.type.name(), Integer.valueOf(this.serverPort), Integer.valueOf(this.serverSocket.getLocalPort())});
            } catch (IOException e2) {
                throw new NotificationHandlerException(Errors.IO_SERVERSOCKET_UNINITIALIZED, e);
            }
        }
        this.executor = new ThreadPoolExecutor(WORKER_MIN_COUNT, WORKER_MAX_COUNT, 60000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(TASK_QUEUE_LENGTH));
        this.executor.allowCoreThreadTimeOut(true);
        this.executor.setRejectedExecutionHandler((runnable, threadPoolExecutor) -> {
            LOG.error("RejectedExecutionHandler called");
        });
        this.workerPool = MoreExecutors.listeningDecorator(this.executor);
    }

    private void deactivate() {
        Logger logger = LOG;
        Object[] objArr = new Object[3];
        objArr[0] = this.type.name();
        objArr[WORKER_MIN_COUNT] = Integer.valueOf(this.serverSocket.getLocalPort());
        objArr[2] = this.client == null ? "no" : "yes";
        logger.info(">> deactivate(..): attempting to shut down '{}' notification monitor (port: '{}', RPC-capable: '{}')", objArr);
        this.isActive = false;
        try {
            this.serverSocket.close();
        } catch (IOException e) {
            LOG.warn("-- deactivate(..): failed to close server socket (monitor: '{}', port: '{}'), message was: '{}'", new Object[]{this.type.name(), Integer.valueOf(this.serverSocket.getLocalPort()), e.getMessage()});
        }
        this.workerPool.shutdown();
    }

    private String getUniqueName() {
        return "NotificationMonitor[" + StringUtils.capitalize(this.type.name().toLowerCase()) + "]-" + StringUtils.random(4, "0123456789");
    }
}
