From 5f1ecc9022cd62d64cc715d25b8fe714ed43d7f4 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Fri, 26 Sep 2008 03:02:31 +0000 Subject: [PATCH] Fixed throughput issue when messages are written from different threads than I/O worker threads --- .../channel/socket/nio/NioSocketChannel.java | 12 +++++++ .../netty/channel/socket/nio/NioWorker.java | 31 ++++++++++++++++++- 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java index 6037a232fc..d08d4b6b0a 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java @@ -49,6 +49,7 @@ abstract class NioSocketChannel extends AbstractChannel final SocketChannel socket; private final NioSocketChannelConfig config; + final Runnable writeTask = new WriteTask(); final Queue writeBuffer = new ConcurrentLinkedQueue(); MessageEvent currentWriteEvent; @@ -110,4 +111,15 @@ abstract class NioSocketChannel extends AbstractChannel return getUnsupportedOperationFuture(); } } + + private class WriteTask implements Runnable { + + WriteTask() { + super(); + } + + public void run() { + NioWorker.write(NioSocketChannel.this, false); + } + } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 631a6607f5..8cea738452 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -32,7 +32,9 @@ import java.nio.channels.ScatteringByteChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; @@ -72,6 +74,7 @@ class NioWorker implements Runnable { volatile Selector selector; final AtomicBoolean wakenUp = new AtomicBoolean(); final ReadWriteLock selectorGuard = new ReentrantReadWriteLock(); + final Queue taskQueue = new ConcurrentLinkedQueue(); NioWorker(int bossId, int id, Executor executor) { this.bossId = bossId; @@ -172,6 +175,9 @@ class NioWorker implements Runnable { try { int selectedKeyCount = selector.select(500); + + processTaskQueue(); + if (selectedKeyCount > 0) { processSelectedKeys(selector.selectedKeys()); } @@ -225,6 +231,17 @@ class NioWorker implements Runnable { } } + private void processTaskQueue() { + for (;;) { + final Runnable task = taskQueue.poll(); + if (task == null) { + break; + } + + task.run(); + } + } + private static void processSelectedKeys(Set selectedKeys) { for (Iterator i = selectedKeys.iterator(); i.hasNext();) { SelectionKey k = i.next(); @@ -298,7 +315,19 @@ class NioWorker implements Runnable { close(ch, ch.getSucceededFuture()); } - static void write(NioSocketChannel channel, boolean mightNeedWakeup) { + // FIXME I/O 스레드냐 아니냐에 따라서 task queue 에 안넣거나 넣거나 IoSocketHandler, IoSocketDispatcher + static void write(final NioSocketChannel channel, boolean mightNeedWakeup) { + if (mightNeedWakeup) { + NioWorker worker = channel.getWorker(); + if (worker != null && Thread.currentThread() != worker.thread) { + worker.taskQueue.offer(channel.writeTask); + if (worker.wakenUp.compareAndSet(false, true)) { + worker.selector.wakeup(); + } + return; + } + } + if (!channel.isConnected()) { cleanUpWriteBuffer(channel); return;