Relates issue: NETTY-18 (Performance degradation when Channel.write() is called from outside an I/O thread (NIO transport)

* The bottleneck was too frequent wakeups.  I found that recent NIO implementations don't require wakeups to get / set interestOps.
This commit is contained in:
Trustin Lee 2008-08-19 13:21:22 +00:00
parent 07e0bf9413
commit 3828b3754a

View File

@ -46,7 +46,25 @@ import org.jboss.netty.util.ThreadRenamingRunnable;
class NioWorker implements Runnable { class NioWorker implements Runnable {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioWorker.class); private static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioWorker.class);
/**
* FIXME Auto-detect the level
*
* 0 - no need to wake up to get / set interestOps
* 1 - no need to wake up to get interestOps, but need to wake up to set.
* 2 - need to wake up to get / set interestOps
*/
private static final int WAKEUP_REQUIREMENT_LEVEL = 0;
static {
if (WAKEUP_REQUIREMENT_LEVEL < 0 || WAKEUP_REQUIREMENT_LEVEL > 2) {
throw new Error(
"Unexpected wakeup requirement level: " +
WAKEUP_REQUIREMENT_LEVEL + ", please report this error.");
}
}
private final int bossId; private final int bossId;
private final int id; private final int id;
@ -284,11 +302,12 @@ class NioWorker implements Runnable {
} else { } else {
maxWrittenBytes = Integer.MAX_VALUE; maxWrittenBytes = Integer.MAX_VALUE;
} }
int writtenBytes = 0; int writtenBytes = 0;
synchronized (channel.writeBuffer) { synchronized (channel.writeBuffer) {
for (;;) { for (;;) {
if (channel.writeBuffer.isEmpty() && channel.currentWriteEvent == null) { if (channel.currentWriteEvent == null && channel.writeBuffer.isEmpty()) {
removeOpWrite = true; removeOpWrite = true;
break; break;
} }
@ -350,6 +369,9 @@ class NioWorker implements Runnable {
Selector selector = worker.selector; Selector selector = worker.selector;
SelectionKey key = channel.socket.keyFor(selector); SelectionKey key = channel.socket.keyFor(selector);
if (key == null) {
return;
}
if (!key.isValid()) { if (!key.isValid()) {
close(key); close(key);
return; return;
@ -357,42 +379,104 @@ class NioWorker implements Runnable {
int interestOps; int interestOps;
boolean changed = false; boolean changed = false;
if (opWrite) { if (opWrite) {
if (Thread.currentThread() == worker.thread) { switch (WAKEUP_REQUIREMENT_LEVEL) {
case 0:
interestOps = key.interestOps(); interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) { if ((interestOps & SelectionKey.OP_WRITE) == 0) {
interestOps |= SelectionKey.OP_WRITE; interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} }
} else { break;
synchronized (worker.selectorGuard) { case 1:
selector.wakeup(); interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
if (Thread.currentThread() == worker.thread) {
interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
} else {
synchronized (worker.selectorGuard) {
selector.wakeup();
interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
}
}
}
break;
case 2:
if (Thread.currentThread() == worker.thread) {
interestOps = key.interestOps(); interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) { if ((interestOps & SelectionKey.OP_WRITE) == 0) {
interestOps |= SelectionKey.OP_WRITE; interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} }
} else {
synchronized (worker.selectorGuard) {
selector.wakeup();
interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
}
}
} }
break;
default:
throw new Error();
} }
} else { } else {
if (Thread.currentThread() == worker.thread) { switch (WAKEUP_REQUIREMENT_LEVEL) {
case 0:
interestOps = key.interestOps(); interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) { if ((interestOps & SelectionKey.OP_WRITE) != 0) {
interestOps &= ~SelectionKey.OP_WRITE; interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} }
} else { break;
synchronized (worker.selectorGuard) { case 1:
selector.wakeup(); interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
if (Thread.currentThread() == worker.thread) {
interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
} else {
synchronized (worker.selectorGuard) {
selector.wakeup();
interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
}
}
}
break;
case 2:
if (Thread.currentThread() == worker.thread) {
interestOps = key.interestOps(); interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) { if ((interestOps & SelectionKey.OP_WRITE) != 0) {
interestOps &= ~SelectionKey.OP_WRITE; interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} }
} else {
synchronized (worker.selectorGuard) {
selector.wakeup();
interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
}
}
} }
break;
default:
throw new Error();
} }
} }
@ -458,19 +542,45 @@ class NioWorker implements Runnable {
boolean changed = false; boolean changed = false;
try { try {
if (Thread.currentThread() == worker.thread) { switch (WAKEUP_REQUIREMENT_LEVEL) {
case 0:
if (key.interestOps() != interestOps) { if (key.interestOps() != interestOps) {
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} }
} else { break;
synchronized (worker.selectorGuard) { case 1:
selector.wakeup(); if (key.interestOps() != interestOps) {
if (Thread.currentThread() == worker.thread) {
key.interestOps(interestOps);
changed = true;
} else {
synchronized (worker.selectorGuard) {
selector.wakeup();
key.interestOps(interestOps);
changed = true;
}
}
}
break;
case 2:
if (Thread.currentThread() == worker.thread) {
if (key.interestOps() != interestOps) { if (key.interestOps() != interestOps) {
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} }
} else {
synchronized (worker.selectorGuard) {
selector.wakeup();
if (key.interestOps() != interestOps) {
key.interestOps(interestOps);
changed = true;
}
}
} }
break;
default:
throw new Error();
} }
future.setSuccess(); future.setSuccess();