Even more optimization in NioWorker.write()

This commit is contained in:
Trustin Lee 2008-08-20 01:38:21 +00:00
parent 96e1cf0385
commit e9e1b0ebe6
3 changed files with 82 additions and 63 deletions

View File

@ -107,7 +107,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
MessageEvent event = (MessageEvent) e; MessageEvent event = (MessageEvent) e;
NioSocketChannel channel = (NioSocketChannel) event.getChannel(); NioSocketChannel channel = (NioSocketChannel) event.getChannel();
channel.writeBuffer.offer(event); channel.writeBuffer.offer(event);
NioWorker.write(channel); NioWorker.write(channel, true);
} }
} }

View File

@ -127,7 +127,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
MessageEvent event = (MessageEvent) e; MessageEvent event = (MessageEvent) e;
NioSocketChannel channel = (NioSocketChannel) event.getChannel(); NioSocketChannel channel = (NioSocketChannel) event.getChannel();
channel.writeBuffer.offer(event); channel.writeBuffer.offer(event);
NioWorker.write(channel); NioWorker.write(channel, true);
} }
} }

View File

@ -260,7 +260,7 @@ class NioWorker implements Runnable {
private static void write(SelectionKey k) { private static void write(SelectionKey k) {
NioSocketChannel ch = (NioSocketChannel) k.attachment(); NioSocketChannel ch = (NioSocketChannel) k.attachment();
write(ch); write(ch, false);
} }
private static void close(SelectionKey k) { private static void close(SelectionKey k) {
@ -268,7 +268,7 @@ class NioWorker implements Runnable {
close(ch, ch.getSucceededFuture()); close(ch, ch.getSucceededFuture());
} }
static void write(NioSocketChannel channel) { static void write(NioSocketChannel channel, boolean mightNeedWakeup) {
if (channel.writeBuffer.isEmpty() && channel.currentWriteEvent == null) { if (channel.writeBuffer.isEmpty() && channel.currentWriteEvent == null) {
return; return;
} }
@ -337,13 +337,14 @@ class NioWorker implements Runnable {
} }
if (addOpWrite) { if (addOpWrite) {
setOpWrite(channel, true); setOpWrite(channel, true, mightNeedWakeup);
} else if (removeOpWrite) { } else if (removeOpWrite) {
setOpWrite(channel, false); setOpWrite(channel, false, mightNeedWakeup);
} }
} }
private static void setOpWrite(NioSocketChannel channel, boolean opWrite) { private static void setOpWrite(
NioSocketChannel channel, boolean opWrite, boolean mightNeedWakeup) {
NioWorker worker = channel.getWorker(); NioWorker worker = channel.getWorker();
if (worker == null) { if (worker == null) {
IllegalStateException cause = IllegalStateException cause =
@ -364,104 +365,122 @@ class NioWorker implements Runnable {
int interestOps; int interestOps;
boolean changed = false; boolean changed = false;
if (opWrite) { if (opWrite) {
switch (CONSTRAINT_LEVEL) { if (!mightNeedWakeup) {
case 0:
interestOps = key.interestOps(); interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) { if ((interestOps & SelectionKey.OP_WRITE) == 0) {
interestOps |= SelectionKey.OP_WRITE; interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} }
break; } else {
case 1: switch (CONSTRAINT_LEVEL) {
interestOps = key.interestOps(); case 0:
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
if (Thread.currentThread() == worker.thread) {
interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
} else {
synchronized (worker.selectorGuard) {
selector.wakeup();
interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
}
}
}
break;
case 2:
if (Thread.currentThread() == worker.thread) {
interestOps = key.interestOps(); interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) { if ((interestOps & SelectionKey.OP_WRITE) == 0) {
interestOps |= SelectionKey.OP_WRITE; interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} }
} else { break;
synchronized (worker.selectorGuard) { case 1:
selector.wakeup(); interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
if (Thread.currentThread() == worker.thread) {
interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
} else {
synchronized (worker.selectorGuard) {
selector.wakeup();
interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
}
}
}
break;
case 2:
if (Thread.currentThread() == worker.thread) {
interestOps = key.interestOps(); interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) { if ((interestOps & SelectionKey.OP_WRITE) == 0) {
interestOps |= SelectionKey.OP_WRITE; interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} }
} else {
synchronized (worker.selectorGuard) {
selector.wakeup();
interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
}
}
} }
break;
default:
throw new Error();
} }
break;
default:
throw new Error();
} }
} else { } else {
switch (CONSTRAINT_LEVEL) { if (!mightNeedWakeup) {
case 0:
interestOps = key.interestOps(); interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) { if ((interestOps & SelectionKey.OP_WRITE) != 0) {
interestOps &= ~SelectionKey.OP_WRITE; interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} }
break; } else {
case 1: switch (CONSTRAINT_LEVEL) {
interestOps = key.interestOps(); case 0:
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
if (Thread.currentThread() == worker.thread) {
interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
} else {
synchronized (worker.selectorGuard) {
selector.wakeup();
interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
}
}
}
break;
case 2:
if (Thread.currentThread() == worker.thread) {
interestOps = key.interestOps(); interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) { if ((interestOps & SelectionKey.OP_WRITE) != 0) {
interestOps &= ~SelectionKey.OP_WRITE; interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} }
} else { break;
synchronized (worker.selectorGuard) { case 1:
selector.wakeup(); interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
if (Thread.currentThread() == worker.thread) {
interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
} else {
synchronized (worker.selectorGuard) {
selector.wakeup();
interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
}
}
}
break;
case 2:
if (Thread.currentThread() == worker.thread) {
interestOps = key.interestOps(); interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) { if ((interestOps & SelectionKey.OP_WRITE) != 0) {
interestOps &= ~SelectionKey.OP_WRITE; interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} }
} else {
synchronized (worker.selectorGuard) {
selector.wakeup();
interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
}
}
} }
break;
default:
throw new Error();
} }
break;
default:
throw new Error();
} }
} }