From 3828b3754a54310fcf3f2fe0940e1bdb43041309 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Tue, 19 Aug 2008 13:21:22 +0000 Subject: [PATCH] Relates issue: NETTY-18 (Performance degradation when Channel.write() is called from outside an I/O thread (NIO transport) * The bottleneck was too frequent wakeups. I found that recent NIO implementations don't require wakeups to get / set interestOps. --- .../netty/channel/socket/nio/NioWorker.java | 138 ++++++++++++++++-- 1 file changed, 124 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index c8b9485e09..fef1288121 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -46,7 +46,25 @@ import org.jboss.netty.util.ThreadRenamingRunnable; class NioWorker implements Runnable { - private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioWorker.class); + private static final InternalLogger logger = + InternalLoggerFactory.getInstance(NioWorker.class); + + /** + * FIXME Auto-detect the level + * + * 0 - no need to wake up to get / set interestOps + * 1 - no need to wake up to get interestOps, but need to wake up to set. + * 2 - need to wake up to get / set interestOps + */ + private static final int WAKEUP_REQUIREMENT_LEVEL = 0; + + static { + if (WAKEUP_REQUIREMENT_LEVEL < 0 || WAKEUP_REQUIREMENT_LEVEL > 2) { + throw new Error( + "Unexpected wakeup requirement level: " + + WAKEUP_REQUIREMENT_LEVEL + ", please report this error."); + } + } private final int bossId; private final int id; @@ -284,11 +302,12 @@ class NioWorker implements Runnable { } else { maxWrittenBytes = Integer.MAX_VALUE; } + int writtenBytes = 0; synchronized (channel.writeBuffer) { for (;;) { - if (channel.writeBuffer.isEmpty() && channel.currentWriteEvent == null) { + if (channel.currentWriteEvent == null && channel.writeBuffer.isEmpty()) { removeOpWrite = true; break; } @@ -350,6 +369,9 @@ class NioWorker implements Runnable { Selector selector = worker.selector; SelectionKey key = channel.socket.keyFor(selector); + if (key == null) { + return; + } if (!key.isValid()) { close(key); return; @@ -357,42 +379,104 @@ class NioWorker implements Runnable { int interestOps; boolean changed = false; if (opWrite) { - if (Thread.currentThread() == worker.thread) { + switch (WAKEUP_REQUIREMENT_LEVEL) { + case 0: interestOps = key.interestOps(); if ((interestOps & SelectionKey.OP_WRITE) == 0) { interestOps |= SelectionKey.OP_WRITE; key.interestOps(interestOps); changed = true; } - } else { - synchronized (worker.selectorGuard) { - selector.wakeup(); + break; + case 1: + interestOps = key.interestOps(); + if ((interestOps & SelectionKey.OP_WRITE) == 0) { + if (Thread.currentThread() == worker.thread) { + interestOps |= SelectionKey.OP_WRITE; + key.interestOps(interestOps); + changed = true; + } else { + synchronized (worker.selectorGuard) { + selector.wakeup(); + interestOps |= SelectionKey.OP_WRITE; + key.interestOps(interestOps); + changed = true; + } + } + } + break; + case 2: + if (Thread.currentThread() == worker.thread) { interestOps = key.interestOps(); if ((interestOps & SelectionKey.OP_WRITE) == 0) { interestOps |= SelectionKey.OP_WRITE; key.interestOps(interestOps); changed = true; } + } else { + synchronized (worker.selectorGuard) { + selector.wakeup(); + interestOps = key.interestOps(); + if ((interestOps & SelectionKey.OP_WRITE) == 0) { + interestOps |= SelectionKey.OP_WRITE; + key.interestOps(interestOps); + changed = true; + } + } } + break; + default: + throw new Error(); } } else { - if (Thread.currentThread() == worker.thread) { + switch (WAKEUP_REQUIREMENT_LEVEL) { + case 0: interestOps = key.interestOps(); if ((interestOps & SelectionKey.OP_WRITE) != 0) { interestOps &= ~SelectionKey.OP_WRITE; key.interestOps(interestOps); changed = true; } - } else { - synchronized (worker.selectorGuard) { - selector.wakeup(); + break; + case 1: + interestOps = key.interestOps(); + if ((interestOps & SelectionKey.OP_WRITE) != 0) { + if (Thread.currentThread() == worker.thread) { + interestOps &= ~SelectionKey.OP_WRITE; + key.interestOps(interestOps); + changed = true; + } else { + synchronized (worker.selectorGuard) { + selector.wakeup(); + interestOps &= ~SelectionKey.OP_WRITE; + key.interestOps(interestOps); + changed = true; + } + } + } + break; + case 2: + if (Thread.currentThread() == worker.thread) { interestOps = key.interestOps(); if ((interestOps & SelectionKey.OP_WRITE) != 0) { interestOps &= ~SelectionKey.OP_WRITE; key.interestOps(interestOps); changed = true; } + } else { + synchronized (worker.selectorGuard) { + selector.wakeup(); + interestOps = key.interestOps(); + if ((interestOps & SelectionKey.OP_WRITE) != 0) { + interestOps &= ~SelectionKey.OP_WRITE; + key.interestOps(interestOps); + changed = true; + } + } } + break; + default: + throw new Error(); } } @@ -458,19 +542,45 @@ class NioWorker implements Runnable { boolean changed = false; try { - if (Thread.currentThread() == worker.thread) { + switch (WAKEUP_REQUIREMENT_LEVEL) { + case 0: if (key.interestOps() != interestOps) { key.interestOps(interestOps); changed = true; } - } else { - synchronized (worker.selectorGuard) { - selector.wakeup(); + break; + case 1: + if (key.interestOps() != interestOps) { + if (Thread.currentThread() == worker.thread) { + key.interestOps(interestOps); + changed = true; + } else { + synchronized (worker.selectorGuard) { + selector.wakeup(); + key.interestOps(interestOps); + changed = true; + } + } + } + break; + case 2: + if (Thread.currentThread() == worker.thread) { if (key.interestOps() != interestOps) { key.interestOps(interestOps); changed = true; } + } else { + synchronized (worker.selectorGuard) { + selector.wakeup(); + if (key.interestOps() != interestOps) { + key.interestOps(interestOps); + changed = true; + } + } } + break; + default: + throw new Error(); } future.setSuccess();