Allow to serve more then one bound port per Thread. This fix the problem that you was not able to bound thousends of ports without huge amount of threads.

This commit is contained in:
Norman Maurer 2012-11-09 15:36:20 +01:00
parent 5c2b6ffa25
commit 95074e3677
4 changed files with 318 additions and 136 deletions

View File

@ -19,10 +19,7 @@ import static org.jboss.netty.channel.Channels.*;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jboss.netty.channel.AbstractServerChannel;
import org.jboss.netty.channel.ChannelException;
@ -41,16 +38,17 @@ class NioServerSocketChannel extends AbstractServerChannel
InternalLoggerFactory.getInstance(NioServerSocketChannel.class);
final ServerSocketChannel socket;
final Lock shutdownLock = new ReentrantLock();
volatile Selector selector;
final Runnable boss;
private final ServerSocketChannelConfig config;
NioServerSocketChannel(
ChannelFactory factory,
ChannelPipeline pipeline,
ChannelSink sink) {
ChannelSink sink, Runnable boss) {
super(factory, pipeline, sink);
this.boss = boss;
try {
socket = ServerSocketChannel.open();

View File

@ -22,12 +22,10 @@ import java.util.concurrent.RejectedExecutionException;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.socket.ServerSocketChannel;
import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
import org.jboss.netty.util.ExternalResourceReleasable;
import org.jboss.netty.util.internal.ExecutorUtil;
/**
* A {@link ServerSocketChannelFactory} which creates a server-side NIO-based
@ -87,7 +85,7 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
final Executor bossExecutor;
private final WorkerPool<NioWorker> workerPool;
private final ChannelSink sink;
private final NioServerSocketPipelineSink sink;
/**
* Create a new {@link NioServerSocketChannelFactory} using {@link Executors#newCachedThreadPool()}
@ -128,7 +126,25 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
public NioServerSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor,
int workerCount) {
this(bossExecutor, new NioWorkerPool(workerExecutor, workerCount));
this(bossExecutor, 1, workerExecutor, workerCount);
}
/**
* Create a new instance.
*
* @param bossExecutor
* the {@link Executor} which will execute the boss threads
* @param bossCount
* the number of boss threads
* @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads
* @param workerCount
* the maximum number of I/O worker threads
*/
public NioServerSocketChannelFactory(
Executor bossExecutor, int bossCount, Executor workerExecutor,
int workerCount) {
this(bossExecutor, bossCount, new NioWorkerPool(workerExecutor, workerCount));
}
/**
@ -142,6 +158,22 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
*/
public NioServerSocketChannelFactory(
Executor bossExecutor, WorkerPool<NioWorker> workerPool) {
this(bossExecutor, 1 , workerPool);
}
/**
* Create a new instance.
*
* @param bossExecutor
* the {@link Executor} which will execute the boss threads
* @param bossCount
* the number of boss threads
* @param workerPool
* the {@link WorkerPool} which will be used to obtain the {@link NioWorker} that execute
* the I/O worker threads
*/
public NioServerSocketChannelFactory(
Executor bossExecutor, int bossCount, WorkerPool<NioWorker> workerPool) {
if (bossExecutor == null) {
throw new NullPointerException("bossExecutor");
}
@ -151,15 +183,15 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
this.bossExecutor = bossExecutor;
this.workerPool = workerPool;
sink = new NioServerSocketPipelineSink(workerPool);
sink = new NioServerSocketPipelineSink(bossExecutor, bossCount, workerPool);
}
public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
return new NioServerSocketChannel(this, pipeline, sink);
return new NioServerSocketChannel(this, pipeline, sink, sink.nextBoss());
}
public void releaseExternalResources() {
ExecutorUtil.terminate(bossExecutor);
sink.releaseExternalResources();
if (workerPool instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) workerPool).releaseExternalResources();
}

View File

@ -22,15 +22,22 @@ import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelState;
@ -38,10 +45,12 @@ import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ExternalResourceReleasable;
import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.DeadLockProofWorker;
import org.jboss.netty.util.internal.ExecutorUtil;
class NioServerSocketPipelineSink extends AbstractNioChannelSink {
class NioServerSocketPipelineSink extends AbstractNioChannelSink implements ExternalResourceReleasable {
private static final AtomicInteger nextId = new AtomicInteger();
@ -49,11 +58,17 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class);
final int id = nextId.incrementAndGet();
private final Boss[] bosses;
private final AtomicInteger bossIndex = new AtomicInteger();
private final WorkerPool<NioWorker> workerPool;
NioServerSocketPipelineSink(WorkerPool<NioWorker> workerPool) {
NioServerSocketPipelineSink(Executor bossExecutor, int bossCount, WorkerPool<NioWorker> workerPool) {
this.workerPool = workerPool;
this.bosses = new Boss[bossCount];
for (int i = 0; i < bossCount; i++) {
this.bosses[i] = new Boss(bossExecutor);
}
}
@ -82,14 +97,14 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
close(channel, future);
((Boss) channel.boss).close(channel, future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (SocketAddress) value);
((Boss) channel.boss).bind(channel, future, (SocketAddress) value);
} else {
close(channel, future);
((Boss) channel.boss).close(channel, future);
}
break;
default:
@ -130,53 +145,102 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
}
}
private void bind(
NioServerSocketChannel channel, ChannelFuture future,
SocketAddress localAddress) {
NioWorker nextWorker() {
return workerPool.nextWorker();
}
boolean bound = false;
boolean bossStarted = false;
try {
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
bound = true;
Boss nextBoss() {
return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];
}
future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
Executor bossExecutor =
((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
DeadLockProofWorker.start(bossExecutor,
new ThreadRenamingRunnable(new Boss(channel),
"New I/O server boss #" + id + " (" + channel + ')'));
bossStarted = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (!bossStarted && bound) {
close(channel, future);
}
public void releaseExternalResources() {
for (Boss boss: bosses) {
ExecutorUtil.terminate(boss.bossExecutor);
}
}
private static void close(NioServerSocketChannel channel, ChannelFuture future) {
boolean bound = channel.isBound();
try {
if (channel.socket.isOpen()) {
channel.socket.close();
Selector selector = channel.selector;
if (selector != null) {
private final class Boss implements Runnable {
volatile Selector selector;
private final Executor bossExecutor;
/**
* Queue of channel registration tasks.
*/
private final Queue<Runnable> bindTaskQueue = new ConcurrentLinkedQueue<Runnable>();
/**
* Monitor object used to synchronize selector open/close.
*/
private final Object startStopLock = new Object();
/**
* Boolean that controls determines if a blocked Selector.select should
* break out of its selection process. In our case we use a timeone for
* the select method and the select method will block for that time unless
* waken up.
*/
private final AtomicBoolean wakenUp = new AtomicBoolean();
private Thread currentThread;
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
Boss(Executor bossExecutor) {
this.bossExecutor = bossExecutor;
openSelector();
}
void bind(final NioServerSocketChannel channel, final ChannelFuture future,
final SocketAddress localAddress) {
synchronized (startStopLock) {
if (selector == null) {
// the selector was null this means the Worker has already been shutdown.
throw new RejectedExecutionException("Worker has already been shutdown");
}
boolean offered = bindTaskQueue.offer(new Runnable() {
public void run() {
boolean bound = false;
boolean registered = false;
try {
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
bound = true;
future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel);
registered = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (!registered && bound) {
close(channel, future);
}
}
}
});
assert offered;
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
}
void close(NioServerSocketChannel channel, ChannelFuture future) {
boolean bound = channel.isBound();
// Make sure the boss thread is not running so that that the future
// is notified after a new connection cannot be accepted anymore.
// See NETTY-256 for more information.
channel.shutdownLock.lock();
try {
channel.socket.close();
cancelledKeys ++;
if (channel.setClosed()) {
future.setSuccess();
if (bound) {
fireChannelUnbound(channel);
}
@ -184,101 +248,198 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
} else {
future.setSuccess();
}
} finally {
channel.shutdownLock.unlock();
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
NioWorker nextWorker() {
return workerPool.nextWorker();
}
private final class Boss implements Runnable {
private final Selector selector;
private final NioServerSocketChannel channel;
Boss(NioServerSocketChannel channel) throws IOException {
this.channel = channel;
selector = Selector.open();
boolean registered = false;
private void openSelector() {
try {
channel.socket.register(selector, SelectionKey.OP_ACCEPT);
registered = true;
selector = Selector.open();
} catch (Throwable t) {
throw new ChannelException("Failed to create a selector.", t);
}
// Start the worker thread with the new Selector.
boolean success = false;
try {
DeadLockProofWorker.start(bossExecutor, new ThreadRenamingRunnable(this,
"New I/O server boss #" + id));
success = true;
} finally {
if (!registered) {
closeSelector();
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
logger.warn("Failed to close a selector.", t);
}
selector = null;
// The method will return to the caller at this point.
}
}
channel.selector = selector;
assert selector != null && selector.isOpen();
}
public void run() {
final Thread currentThread = Thread.currentThread();
currentThread = Thread.currentThread();
boolean shutdown = false;
for (;;) {
wakenUp.set(false);
channel.shutdownLock.lock();
try {
for (;;) {
try {
// Just do a blocking select without any timeout
// as this thread does not execute anything else.
selector.select();
// There was something selected if we reach this point, so clear
// the selected keys
selector.selectedKeys().clear();
try {
// Just do a blocking select without any timeout
// as this thread does not execute anything else.
selector.select();
// accept connections in a for loop until no new connection is ready
for (;;) {
SocketChannel acceptedSocket = channel.socket.accept();
if (acceptedSocket == null) {
break;
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
processBindTaskQueue();
processSelectedKeys(selector.selectedKeys());
// Exit the loop when there's nothing to handle.
// The shutdown flag is used to delay the shutdown of this
// loop to avoid excessive Selector creation when
// connections are registered in a one-by-one manner instead of
// concurrent manner.
if (selector.keys().isEmpty()) {
if (shutdown || bossExecutor instanceof ExecutorService &&
((ExecutorService) bossExecutor).isShutdown()) {
synchronized (startStopLock) {
if (selector.keys().isEmpty()) {
try {
selector.close();
} catch (IOException e) {
logger.warn(
"Failed to close a selector.", e);
} finally {
this.selector = null;
}
break;
} else {
shutdown = false;
}
}
registerAcceptedChannel(acceptedSocket, currentThread);
}
} else {
shutdown = false;
}
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to accept a connection.", e);
}
} catch (SocketTimeoutException e) {
// Thrown every second to get ClosedChannelException
// raised.
} catch (CancelledKeyException e) {
// Raised by accept() when the server socket was closed.
} catch (ClosedSelectorException e) {
// Raised by accept() when the server socket was closed.
} catch (ClosedChannelException e) {
// Closed as requested.
break;
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to accept a connection.", e);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// Ignore
}
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// Ignore
}
}
} finally {
channel.shutdownLock.unlock();
closeSelector();
}
}
private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
private void processBindTaskQueue() throws IOException {
for (;;) {
final Runnable task = bindTaskQueue.poll();
if (task == null) {
break;
}
task.run();
cleanUpCancelledKeys();
}
}
private boolean cleanUpCancelledKeys() throws IOException {
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
selector.selectNow();
return true;
}
return false;
}
private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
if (selectedKeys.isEmpty()) {
return;
}
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
i.remove();
NioServerSocketChannel channel = (NioServerSocketChannel) k.attachment();
try {
// accept connections in a for loop until no new connection is ready
for (;;) {
SocketChannel acceptedSocket = channel.socket.accept();
if (acceptedSocket == null) {
break;
}
registerAcceptedChannel(channel, acceptedSocket, currentThread);
}
} catch (CancelledKeyException e) {
// Raised by accept() when the server socket was closed.
k.cancel();
channel.close();
} catch (SocketTimeoutException e) {
// Thrown every second to get ClosedChannelException
// raised.
} catch (ClosedChannelException e) {
// Closed as requested.
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to accept a connection.", t);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// Ignore
}
}
}
}
private void registerAcceptedChannel(NioServerSocketChannel parent, SocketChannel acceptedSocket,
Thread currentThread) {
try {
ChannelPipeline pipeline =
channel.getConfig().getPipelineFactory().getPipeline();
parent.getConfig().getPipelineFactory().getPipeline();
NioWorker worker = nextWorker();
worker.register(new NioAcceptedSocketChannel(
channel.getFactory(), pipeline, channel,
parent.getFactory(), pipeline, parent,
NioServerSocketPipelineSink.this, acceptedSocket,
worker, currentThread), null);
} catch (Exception e) {
@ -299,16 +460,5 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
}
}
}
private void closeSelector() {
channel.selector = null;
try {
selector.close();
} catch (Exception e) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a selector.", e);
}
}
}
}
}

View File

@ -66,6 +66,8 @@ public class SslCloseTest {
Assert.assertTrue(cc.getCloseFuture().awaitUninterruptibly(5000));
serverChannel.close();
cb.releaseExternalResources();
sb.releaseExternalResources();
}