From dccc9f86652e32f7bd36e2888edce53534d0280f Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Sun, 28 Sep 2008 15:01:21 +0000 Subject: [PATCH] * Improved the throughput of the server-side accept operation * Added FastQueue.isEmpty() --- .../nio/NioServerSocketPipelineSink.java | 4 - .../netty/channel/socket/nio/NioWorker.java | 155 ++++++++++-------- .../java/org/jboss/netty/util/FastQueue.java | 5 + 3 files changed, 92 insertions(+), 72 deletions(-) diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java index aac20ff2b8..9c8e6c840a 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java @@ -202,10 +202,6 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { for (;;) { try { SocketChannel acceptedSocket = channel.socket.accept(); - if (acceptedSocket == null) { - continue; - } - try { ChannelPipeline pipeline = channel.getConfig().getPipelineFactory().getPipeline(); diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 383b09c8ad..b7d5fda3df 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -32,7 +32,6 @@ import java.nio.channels.ScatteringByteChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; -import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; @@ -72,11 +71,15 @@ class NioWorker implements Runnable { private final int id; private final Executor executor; private final AtomicBoolean started = new AtomicBoolean(); - volatile Thread thread; - volatile Selector selector; - final AtomicBoolean wakenUp = new AtomicBoolean(); - final ReadWriteLock selectorGuard = new ReentrantReadWriteLock(); - final Queue taskQueue = new ConcurrentLinkedQueue(); + private volatile Thread thread; + private volatile Selector selector; + private final AtomicBoolean wakenUp = new AtomicBoolean(); + private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock(); + private final ReadWriteLock shutdownLock = new ReentrantReadWriteLock(); + //private final FastQueue taskQueue = new FastQueue(); + //private final ConcurrentFastQueue taskQueue = new ConcurrentFastQueue(); + private final FastQueue registerTaskQueue = new FastQueue(); + private final ConcurrentLinkedQueue writeTaskQueue = new ConcurrentLinkedQueue(); NioWorker(int bossId, int id, Executor executor) { this.bossId = bossId; @@ -104,60 +107,24 @@ class NioWorker implements Runnable { } } + boolean server = !(channel instanceof NioClientSocketChannel); + Runnable registerTask = new RegisterTask(selector, channel, future, server); if (firstChannel) { - try { - channel.socket.register(selector, SelectionKey.OP_READ, channel); - if (future != null) { - future.setSuccess(); - } - } catch (ClosedChannelException e) { - future.setFailure(e); - throw new ChannelException( - "Failed to register a socket to the selector.", e); - } - - boolean server = !(channel instanceof NioClientSocketChannel); - if (server) { - fireChannelOpen(channel); - fireChannelBound(channel, channel.getLocalAddress()); - } else if (!((NioClientSocketChannel) channel).boundManually) { - fireChannelBound(channel, channel.getLocalAddress()); - } - fireChannelConnected(channel, channel.getRemoteAddress()); - + registerTask.run(); String threadName = (server ? "New I/O server worker #" : "New I/O client worker #") + bossId + '-' + id; executor.execute(new ThreadRenamingRunnable(this, threadName)); } else { - selectorGuard.readLock().lock(); + shutdownLock.readLock().lock(); try { - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - - try { - channel.socket.register(selector, SelectionKey.OP_READ, channel); - if (future != null) { - future.setSuccess(); - } - } catch (ClosedChannelException e) { - future.setFailure(e); - throw new ChannelException( - "Failed to register a socket to the selector.", e); - } - - boolean server = !(channel instanceof NioClientSocketChannel); - if (server) { - fireChannelOpen(channel); - fireChannelBound(channel, channel.getLocalAddress()); - } else if (!((NioClientSocketChannel) channel).boundManually) { - fireChannelBound(channel, channel.getLocalAddress()); - } - fireChannelConnected(channel, channel.getRemoteAddress()); + registerTaskQueue.offer(registerTask); } finally { - selectorGuard.readLock().unlock(); + shutdownLock.readLock().unlock(); + } + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); } } } @@ -170,15 +137,18 @@ class NioWorker implements Runnable { for (;;) { wakenUp.set(false); - selectorGuard.writeLock().lock(); - // This empty synchronization block prevents the selector - // from acquiring its lock. - selectorGuard.writeLock().unlock(); + if (CONSTRAINT_LEVEL != 0) { + selectorGuard.writeLock().lock(); + // This empty synchronization block prevents the selector + // from acquiring its lock. + selectorGuard.writeLock().unlock(); + } try { int selectedKeyCount = selector.select(500); - processTaskQueue(); + processRegisterTaskQueue(); + processWriteTaskQueue(); if (selectedKeyCount > 0) { processSelectedKeys(selector.selectedKeys()); @@ -193,9 +163,9 @@ class NioWorker implements Runnable { if (shutdown || executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) { - selectorGuard.writeLock().lock(); + shutdownLock.writeLock().lock(); try { - if (selector.keys().isEmpty()) { + if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) { try { selector.close(); } catch (IOException e) { @@ -210,7 +180,7 @@ class NioWorker implements Runnable { shutdown = false; } } finally { - selectorGuard.writeLock().unlock(); + shutdownLock.writeLock().unlock(); } } else { // Give one more second. @@ -233,9 +203,20 @@ class NioWorker implements Runnable { } } - private void processTaskQueue() { + private void processRegisterTaskQueue() { for (;;) { - final Runnable task = taskQueue.poll(); + final Runnable task = registerTaskQueue.poll(); + if (task == null) { + break; + } + + task.run(); + } + } + + private void processWriteTaskQueue() { + for (;;) { + final Runnable task = writeTaskQueue.poll(); if (task == null) { break; } @@ -390,7 +371,7 @@ class NioWorker implements Runnable { Thread workerThread = worker.thread; if (workerThread != null && Thread.currentThread() != workerThread) { if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { - worker.taskQueue.offer(channel.writeTask); + worker.writeTaskQueue.offer(channel.writeTask); } if (worker.wakenUp.compareAndSet(false, true)) { worker.selector.wakeup(); @@ -434,11 +415,11 @@ class NioWorker implements Runnable { int bufIdx; synchronized (channel.writeLock) { - FastQueue internalWriteBuffer = channel.writeBuffer; + FastQueue writeBuffer = channel.writeBuffer; evt = channel.currentWriteEvent; for (;;) { if (evt == null) { - evt = internalWriteBuffer.poll(); + evt = writeBuffer.poll(); if (evt == null) { channel.currentWriteEvent = null; removeOpWrite = true; @@ -510,11 +491,11 @@ class NioWorker implements Runnable { int writtenBytes = 0; synchronized (channel.writeLock) { - FastQueue internalWriteBuffer = channel.writeBuffer; + FastQueue writeBuffer = channel.writeBuffer; evt = channel.currentWriteEvent; for (;;) { if (evt == null) { - evt = internalWriteBuffer.poll(); + evt = writeBuffer.poll(); if (evt == null) { channel.currentWriteEvent = null; removeOpWrite = true; @@ -758,9 +739,9 @@ class NioWorker implements Runnable { fireExceptionCaught(channel, cause); } - FastQueue internalWriteBuffer = channel.writeBuffer; + FastQueue writeBuffer = channel.writeBuffer; for (;;) { - evt = internalWriteBuffer.poll(); + evt = writeBuffer.poll(); if (evt == null) { break; } @@ -837,4 +818,42 @@ class NioWorker implements Runnable { fireExceptionCaught(channel, t); } } + + private class RegisterTask implements Runnable { + private final Selector selector; + private final NioSocketChannel channel; + private final ChannelFuture future; + private final boolean server; + + RegisterTask( + Selector selector, + NioSocketChannel channel, ChannelFuture future, boolean server) { + + this.selector = selector; + this.channel = channel; + this.future = future; + this.server = server; + } + + public void run() { + try { + channel.socket.register(selector, SelectionKey.OP_READ, channel); + if (future != null) { + future.setSuccess(); + } + } catch (ClosedChannelException e) { + future.setFailure(e); + throw new ChannelException( + "Failed to register a socket to the selector.", e); + } + + if (server) { + fireChannelOpen(channel); + fireChannelBound(channel, channel.getLocalAddress()); + } else if (!((NioClientSocketChannel) channel).boundManually) { + fireChannelBound(channel, channel.getLocalAddress()); + } + fireChannelConnected(channel, channel.getRemoteAddress()); + } + } } \ No newline at end of file diff --git a/src/main/java/org/jboss/netty/util/FastQueue.java b/src/main/java/org/jboss/netty/util/FastQueue.java index 293ccd112d..0b94b9054a 100644 --- a/src/main/java/org/jboss/netty/util/FastQueue.java +++ b/src/main/java/org/jboss/netty/util/FastQueue.java @@ -71,6 +71,11 @@ public class FastQueue { return null; } + public synchronized boolean isEmpty() { + return offeredElements == null && + (drainedElements == null || drainedElements.isEmpty()); + } + @SuppressWarnings("unchecked") private E cast(Object o) { return (E) o;