NioWorker optimization

* Use of read write lock
* Split write into two versions (fair and unfair)
This commit is contained in:
Trustin Lee 2008-08-31 02:59:54 +00:00
parent c0b5d93b0a
commit 243264efb0

View File

@ -35,12 +35,15 @@ import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.ThreadRenamingRunnable;
@ -59,7 +62,7 @@ class NioWorker implements Runnable {
volatile Thread thread; volatile Thread thread;
volatile Selector selector; volatile Selector selector;
final AtomicBoolean wakenUp = new AtomicBoolean(); final AtomicBoolean wakenUp = new AtomicBoolean();
final Object selectorGuard = new Object(); final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
NioWorker(int bossId, int id, Executor executor) { NioWorker(int bossId, int id, Executor executor) {
this.bossId = bossId; this.bossId = bossId;
@ -113,7 +116,8 @@ class NioWorker implements Runnable {
executor.execute(new ThreadRenamingRunnable(this, threadName)); executor.execute(new ThreadRenamingRunnable(this, threadName));
} else { } else {
synchronized (selectorGuard) { selectorGuard.readLock().lock();
try {
if (wakenUp.compareAndSet(false, true)) { if (wakenUp.compareAndSet(false, true)) {
selector.wakeup(); selector.wakeup();
} }
@ -132,6 +136,8 @@ class NioWorker implements Runnable {
fireChannelOpen(channel); fireChannelOpen(channel);
fireChannelBound(channel, channel.getLocalAddress()); fireChannelBound(channel, channel.getLocalAddress());
fireChannelConnected(channel, channel.getRemoteAddress()); fireChannelConnected(channel, channel.getRemoteAddress());
} finally {
selectorGuard.readLock().unlock();
} }
} }
} }
@ -143,10 +149,12 @@ class NioWorker implements Runnable {
Selector selector = this.selector; Selector selector = this.selector;
for (;;) { for (;;) {
wakenUp.set(false); wakenUp.set(false);
synchronized (selectorGuard) {
selectorGuard.writeLock().lock();
// This empty synchronization block prevents the selector // This empty synchronization block prevents the selector
// from acquiring its lock. // from acquiring its lock.
} selectorGuard.writeLock().unlock();
try { try {
int selectedKeyCount = selector.select(500); int selectedKeyCount = selector.select(500);
if (selectedKeyCount > 0) { if (selectedKeyCount > 0) {
@ -161,7 +169,9 @@ class NioWorker implements Runnable {
if (selector.keys().isEmpty()) { if (selector.keys().isEmpty()) {
if (shutdown || if (shutdown ||
executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) { executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {
synchronized (selectorGuard) {
selectorGuard.writeLock().lock();
try {
if (selector.keys().isEmpty()) { if (selector.keys().isEmpty()) {
try { try {
selector.close(); selector.close();
@ -176,6 +186,8 @@ class NioWorker implements Runnable {
} else { } else {
shutdown = false; shutdown = false;
} }
} finally {
selectorGuard.writeLock().unlock();
} }
} else { } else {
// Give one more second. // Give one more second.
@ -272,69 +284,149 @@ class NioWorker implements Runnable {
} }
static void write(NioSocketChannel channel, boolean mightNeedWakeup) { static void write(NioSocketChannel channel, boolean mightNeedWakeup) {
if (channel.writeBuffer.isEmpty() && channel.currentWriteEvent == null) { final NioSocketChannelConfig cfg = channel.getConfig();
return; final int writeSpinCount = cfg.getWriteSpinCount();
}
boolean addOpWrite = false;
boolean removeOpWrite = false;
final int maxWrittenBytes; final int maxWrittenBytes;
if (channel.getConfig().isReadWriteFair()) { if (cfg.isReadWriteFair()) {
// Set limitation for the number of written bytes for read-write // Set limitation for the number of written bytes for read-write
// fairness. I used maxReadBufferSize * 3 / 2, which yields best // fairness. I used maxReadBufferSize * 3 / 2, which yields best
// performance in my experience while not breaking fairness much. // performance in my experience while not breaking fairness much.
int previousReceiveBufferSize = int previousReceiveBufferSize =
channel.getConfig().getReceiveBufferSizePredictor().nextReceiveBufferSize(); cfg.getReceiveBufferSizePredictor().nextReceiveBufferSize();
maxWrittenBytes = previousReceiveBufferSize + previousReceiveBufferSize >>> 1; maxWrittenBytes = previousReceiveBufferSize + previousReceiveBufferSize >>> 1;
writeFair(channel, mightNeedWakeup, writeSpinCount, maxWrittenBytes);
} else { } else {
maxWrittenBytes = Integer.MAX_VALUE; writeUnfair(channel, mightNeedWakeup, writeSpinCount);
} }
int writtenBytes = 0; }
private static void writeUnfair(NioSocketChannel channel,
boolean mightNeedWakeup, final int writeSpinCount) {
boolean addOpWrite = false;
boolean removeOpWrite = false;
MessageEvent evt;
ChannelBuffer buf;
int bufIdx;
synchronized (channel.writeBuffer) { synchronized (channel.writeBuffer) {
evt = channel.currentWriteEvent;
for (;;) { for (;;) {
if (channel.currentWriteEvent == null && channel.writeBuffer.isEmpty()) { if (evt == null) {
removeOpWrite = true; evt = channel.writeBuffer.poll();
break; if (evt == null) {
} channel.currentWriteEvent = null;
removeOpWrite = true;
ChannelBuffer a; break;
if (channel.currentWriteEvent == null) { }
channel.currentWriteEvent = channel.writeBuffer.poll(); buf = (ChannelBuffer) evt.getMessage();
a = (ChannelBuffer) channel.currentWriteEvent.getMessage(); bufIdx = buf.readerIndex();
channel.currentWriteIndex = a.readerIndex();
} else { } else {
a = (ChannelBuffer) channel.currentWriteEvent.getMessage(); buf = (ChannelBuffer) evt.getMessage();
bufIdx = channel.currentWriteIndex;
} }
int localWrittenBytes = 0;
try { try {
for (int i = channel.getConfig().getWriteSpinCount(); i > 0; i --) { for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = a.getBytes( int localWrittenBytes = buf.getBytes(
channel.currentWriteIndex, bufIdx,
channel.socket, channel.socket,
Math.min(maxWrittenBytes - writtenBytes, a.writerIndex() - channel.currentWriteIndex)); buf.writerIndex() - bufIdx);
if (localWrittenBytes != 0) { if (localWrittenBytes != 0) {
bufIdx += localWrittenBytes;
break; break;
} }
} }
if (bufIdx == buf.writerIndex()) {
// Successful write - proceed to the next message.
evt.getFuture().setSuccess();
evt = null;
} else {
// Not written fully - perhaps the kernel buffer is full.
channel.currentWriteEvent = evt;
channel.currentWriteIndex = bufIdx;
addOpWrite = true;
break;
}
} catch (Throwable t) { } catch (Throwable t) {
channel.currentWriteEvent.getFuture().setFailure(t); evt.getFuture().setFailure(t);
evt = null;
fireExceptionCaught(channel, t); fireExceptionCaught(channel, t);
} }
}
}
writtenBytes += localWrittenBytes; if (addOpWrite) {
channel.currentWriteIndex += localWrittenBytes; setOpWrite(channel, true, mightNeedWakeup);
if (channel.currentWriteIndex == a.writerIndex()) { } else if (removeOpWrite) {
// Successful write - proceed to the next message. setOpWrite(channel, false, mightNeedWakeup);
channel.currentWriteEvent.getFuture().setSuccess(); }
channel.currentWriteEvent = null; }
private static void writeFair(NioSocketChannel channel,
boolean mightNeedWakeup, final int writeSpinCount,
final int maxWrittenBytes) {
boolean addOpWrite = false;
boolean removeOpWrite = false;
int writtenBytes = 0;
MessageEvent evt;
ChannelBuffer buf;
int bufIdx;
synchronized (channel.writeBuffer) {
evt = channel.currentWriteEvent;
for (;;) {
if (evt == null) {
evt = channel.writeBuffer.poll();
if (evt == null) {
channel.currentWriteEvent = null;
removeOpWrite = true;
break;
}
buf = (ChannelBuffer) evt.getMessage();
bufIdx = buf.readerIndex();
} else { } else {
// Not written fully - perhaps the kernel buffer is full. buf = (ChannelBuffer) evt.getMessage();
addOpWrite = true; bufIdx = channel.currentWriteIndex;
break; }
try {
for (int i = writeSpinCount; i > 0; i --) {
int localWrittenBytes = buf.getBytes(
bufIdx,
channel.socket,
Math.min(
maxWrittenBytes - writtenBytes,
buf.writerIndex() - bufIdx));
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
bufIdx += localWrittenBytes;
break;
}
}
if (bufIdx == buf.writerIndex()) {
// Successful write - proceed to the next message.
evt.getFuture().setSuccess();
evt = null;
} else {
// Not written fully - perhaps the kernel buffer is full.
channel.currentWriteEvent = evt;
channel.currentWriteIndex = bufIdx;
addOpWrite = true;
break;
}
} catch (Throwable t) {
evt.getFuture().setFailure(t);
evt = null;
fireExceptionCaught(channel, t);
} }
} }
} }
@ -398,13 +490,16 @@ class NioWorker implements Runnable {
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} else { } else {
synchronized (worker.selectorGuard) { worker.selectorGuard.readLock().lock();
try {
if (worker.wakenUp.compareAndSet(false, true)) { if (worker.wakenUp.compareAndSet(false, true)) {
selector.wakeup(); selector.wakeup();
} }
interestOps |= SelectionKey.OP_WRITE; interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} finally {
worker.selectorGuard.readLock().unlock();
} }
} }
} }
@ -444,13 +539,16 @@ class NioWorker implements Runnable {
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} else { } else {
synchronized (worker.selectorGuard) { worker.selectorGuard.readLock().lock();
try {
if (worker.wakenUp.compareAndSet(false, true)) { if (worker.wakenUp.compareAndSet(false, true)) {
selector.wakeup(); selector.wakeup();
} }
interestOps &= ~SelectionKey.OP_WRITE; interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} finally {
worker.selectorGuard.readLock().unlock();
} }
} }
} }
@ -541,12 +639,15 @@ class NioWorker implements Runnable {
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} else { } else {
synchronized (worker.selectorGuard) { worker.selectorGuard.readLock().lock();
try {
if (worker.wakenUp.compareAndSet(false, true)) { if (worker.wakenUp.compareAndSet(false, true)) {
selector.wakeup(); selector.wakeup();
} }
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} finally {
worker.selectorGuard.readLock().unlock();
} }
} }
} }