package backtype.storm.messaging.netty;

import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.Utils;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:backtype/storm/messaging/netty/Server.class */
public class Server implements IConnection {
    private static final Logger LOG = LoggerFactory.getLogger(Server.class);
    Map storm_conf;
    int port;
    final ChannelFactory factory;
    final ServerBootstrap bootstrap;
    volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
    private LinkedBlockingQueue<TaskMessage> message_queue = new LinkedBlockingQueue<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    public Server(Map map, int i) {
        this.storm_conf = map;
        this.port = i;
        int intValue = Utils.getInt(map.get("storm.messaging.netty.buffer_size")).intValue();
        int intValue2 = Utils.getInt(map.get("storm.messaging.netty.server_worker_threads")).intValue();
        if (intValue2 > 0) {
            this.factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), intValue2);
        } else {
            this.factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
        }
        this.bootstrap = new ServerBootstrap(this.factory);
        this.bootstrap.setOption("child.tcpNoDelay", true);
        this.bootstrap.setOption("child.receiveBufferSize", Integer.valueOf(intValue));
        this.bootstrap.setOption("child.keepAlive", true);
        this.bootstrap.setPipelineFactory(new StormServerPipelineFactory(this));
        this.allChannels.add(this.bootstrap.bind(new InetSocketAddress(i)));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void enqueue(TaskMessage taskMessage) throws InterruptedException {
        this.message_queue.put(taskMessage);
        LOG.debug("message received with task: {}, payload size: {}", Integer.valueOf(taskMessage.task()), Integer.valueOf(taskMessage.message().length));
    }

    public TaskMessage recv(int i) {
        if ((i & 1) == 1) {
            return this.message_queue.poll();
        }
        try {
            TaskMessage take = this.message_queue.take();
            LOG.debug("request to be processed: {}", take);
            return take;
        } catch (InterruptedException e) {
            LOG.info("exception within msg receiving", e);
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addChannel(Channel channel) {
        this.allChannels.add(channel);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void closeChannel(Channel channel) {
        channel.close().awaitUninterruptibly();
        this.allChannels.remove(channel);
    }

    public synchronized void close() {
        if (this.allChannels != null) {
            this.allChannels.close().awaitUninterruptibly();
            this.factory.releaseExternalResources();
            this.allChannels = null;
        }
    }

    public void send(int i, byte[] bArr) {
        throw new RuntimeException("Server connection should not send any messages");
    }
}
