From 243264efb0adb7e1b4924c160820843d9c188376 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Sun, 31 Aug 2008 02:59:54 +0000 Subject: [PATCH] NioWorker optimization * Use of read write lock * Split write into two versions (fair and unfair) --- .../netty/channel/socket/nio/NioWorker.java | 191 +++++++++++++----- 1 file changed, 146 insertions(+), 45 deletions(-) diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 866f903f73..19c33e4a05 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -35,12 +35,15 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.ThreadRenamingRunnable; @@ -59,7 +62,7 @@ class NioWorker implements Runnable { volatile Thread thread; volatile Selector selector; final AtomicBoolean wakenUp = new AtomicBoolean(); - final Object selectorGuard = new Object(); + final ReadWriteLock selectorGuard = new ReentrantReadWriteLock(); NioWorker(int bossId, int id, Executor executor) { this.bossId = bossId; @@ -113,7 +116,8 @@ class NioWorker implements Runnable { executor.execute(new ThreadRenamingRunnable(this, threadName)); } else { - synchronized (selectorGuard) { + selectorGuard.readLock().lock(); + try { if (wakenUp.compareAndSet(false, true)) { selector.wakeup(); } @@ -132,6 +136,8 @@ class NioWorker implements Runnable { fireChannelOpen(channel); fireChannelBound(channel, channel.getLocalAddress()); fireChannelConnected(channel, channel.getRemoteAddress()); + } finally { + selectorGuard.readLock().unlock(); } } } @@ -143,10 +149,12 @@ class NioWorker implements Runnable { Selector selector = this.selector; for (;;) { wakenUp.set(false); - synchronized (selectorGuard) { + + selectorGuard.writeLock().lock(); // This empty synchronization block prevents the selector // from acquiring its lock. - } + selectorGuard.writeLock().unlock(); + try { int selectedKeyCount = selector.select(500); if (selectedKeyCount > 0) { @@ -161,7 +169,9 @@ class NioWorker implements Runnable { if (selector.keys().isEmpty()) { if (shutdown || executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) { - synchronized (selectorGuard) { + + selectorGuard.writeLock().lock(); + try { if (selector.keys().isEmpty()) { try { selector.close(); @@ -176,6 +186,8 @@ class NioWorker implements Runnable { } else { shutdown = false; } + } finally { + selectorGuard.writeLock().unlock(); } } else { // Give one more second. @@ -272,69 +284,149 @@ class NioWorker implements Runnable { } static void write(NioSocketChannel channel, boolean mightNeedWakeup) { - if (channel.writeBuffer.isEmpty() && channel.currentWriteEvent == null) { - return; - } - - boolean addOpWrite = false; - boolean removeOpWrite = false; - + final NioSocketChannelConfig cfg = channel.getConfig(); + final int writeSpinCount = cfg.getWriteSpinCount(); final int maxWrittenBytes; - if (channel.getConfig().isReadWriteFair()) { + if (cfg.isReadWriteFair()) { // Set limitation for the number of written bytes for read-write // fairness. I used maxReadBufferSize * 3 / 2, which yields best // performance in my experience while not breaking fairness much. int previousReceiveBufferSize = - channel.getConfig().getReceiveBufferSizePredictor().nextReceiveBufferSize(); + cfg.getReceiveBufferSizePredictor().nextReceiveBufferSize(); maxWrittenBytes = previousReceiveBufferSize + previousReceiveBufferSize >>> 1; + writeFair(channel, mightNeedWakeup, writeSpinCount, maxWrittenBytes); } else { - maxWrittenBytes = Integer.MAX_VALUE; + writeUnfair(channel, mightNeedWakeup, writeSpinCount); } - int writtenBytes = 0; + } + + private static void writeUnfair(NioSocketChannel channel, + boolean mightNeedWakeup, final int writeSpinCount) { + + boolean addOpWrite = false; + boolean removeOpWrite = false; + + MessageEvent evt; + ChannelBuffer buf; + int bufIdx; synchronized (channel.writeBuffer) { + evt = channel.currentWriteEvent; for (;;) { - if (channel.currentWriteEvent == null && channel.writeBuffer.isEmpty()) { - removeOpWrite = true; - break; - } - - ChannelBuffer a; - if (channel.currentWriteEvent == null) { - channel.currentWriteEvent = channel.writeBuffer.poll(); - a = (ChannelBuffer) channel.currentWriteEvent.getMessage(); - channel.currentWriteIndex = a.readerIndex(); + if (evt == null) { + evt = channel.writeBuffer.poll(); + if (evt == null) { + channel.currentWriteEvent = null; + removeOpWrite = true; + break; + } + buf = (ChannelBuffer) evt.getMessage(); + bufIdx = buf.readerIndex(); } else { - a = (ChannelBuffer) channel.currentWriteEvent.getMessage(); + buf = (ChannelBuffer) evt.getMessage(); + bufIdx = channel.currentWriteIndex; } - int localWrittenBytes = 0; try { - for (int i = channel.getConfig().getWriteSpinCount(); i > 0; i --) { - localWrittenBytes = a.getBytes( - channel.currentWriteIndex, + for (int i = writeSpinCount; i > 0; i --) { + int localWrittenBytes = buf.getBytes( + bufIdx, channel.socket, - Math.min(maxWrittenBytes - writtenBytes, a.writerIndex() - channel.currentWriteIndex)); + buf.writerIndex() - bufIdx); + if (localWrittenBytes != 0) { + bufIdx += localWrittenBytes; break; } } + + if (bufIdx == buf.writerIndex()) { + // Successful write - proceed to the next message. + evt.getFuture().setSuccess(); + evt = null; + } else { + // Not written fully - perhaps the kernel buffer is full. + channel.currentWriteEvent = evt; + channel.currentWriteIndex = bufIdx; + addOpWrite = true; + break; + } } catch (Throwable t) { - channel.currentWriteEvent.getFuture().setFailure(t); + evt.getFuture().setFailure(t); + evt = null; fireExceptionCaught(channel, t); } + } + } - writtenBytes += localWrittenBytes; - channel.currentWriteIndex += localWrittenBytes; - if (channel.currentWriteIndex == a.writerIndex()) { - // Successful write - proceed to the next message. - channel.currentWriteEvent.getFuture().setSuccess(); - channel.currentWriteEvent = null; + if (addOpWrite) { + setOpWrite(channel, true, mightNeedWakeup); + } else if (removeOpWrite) { + setOpWrite(channel, false, mightNeedWakeup); + } + } + + private static void writeFair(NioSocketChannel channel, + boolean mightNeedWakeup, final int writeSpinCount, + final int maxWrittenBytes) { + + boolean addOpWrite = false; + boolean removeOpWrite = false; + + int writtenBytes = 0; + MessageEvent evt; + ChannelBuffer buf; + int bufIdx; + + synchronized (channel.writeBuffer) { + evt = channel.currentWriteEvent; + for (;;) { + if (evt == null) { + evt = channel.writeBuffer.poll(); + if (evt == null) { + channel.currentWriteEvent = null; + removeOpWrite = true; + break; + } + buf = (ChannelBuffer) evt.getMessage(); + bufIdx = buf.readerIndex(); } else { - // Not written fully - perhaps the kernel buffer is full. - addOpWrite = true; - break; + buf = (ChannelBuffer) evt.getMessage(); + bufIdx = channel.currentWriteIndex; + } + + try { + for (int i = writeSpinCount; i > 0; i --) { + int localWrittenBytes = buf.getBytes( + bufIdx, + channel.socket, + Math.min( + maxWrittenBytes - writtenBytes, + buf.writerIndex() - bufIdx)); + + if (localWrittenBytes != 0) { + writtenBytes += localWrittenBytes; + bufIdx += localWrittenBytes; + break; + } + } + + if (bufIdx == buf.writerIndex()) { + // Successful write - proceed to the next message. + evt.getFuture().setSuccess(); + evt = null; + } else { + // Not written fully - perhaps the kernel buffer is full. + channel.currentWriteEvent = evt; + channel.currentWriteIndex = bufIdx; + addOpWrite = true; + break; + } + } catch (Throwable t) { + evt.getFuture().setFailure(t); + evt = null; + fireExceptionCaught(channel, t); } } } @@ -398,13 +490,16 @@ class NioWorker implements Runnable { key.interestOps(interestOps); changed = true; } else { - synchronized (worker.selectorGuard) { + worker.selectorGuard.readLock().lock(); + try { if (worker.wakenUp.compareAndSet(false, true)) { selector.wakeup(); } interestOps |= SelectionKey.OP_WRITE; key.interestOps(interestOps); changed = true; + } finally { + worker.selectorGuard.readLock().unlock(); } } } @@ -444,13 +539,16 @@ class NioWorker implements Runnable { key.interestOps(interestOps); changed = true; } else { - synchronized (worker.selectorGuard) { + worker.selectorGuard.readLock().lock(); + try { if (worker.wakenUp.compareAndSet(false, true)) { selector.wakeup(); } interestOps &= ~SelectionKey.OP_WRITE; key.interestOps(interestOps); changed = true; + } finally { + worker.selectorGuard.readLock().unlock(); } } } @@ -541,12 +639,15 @@ class NioWorker implements Runnable { key.interestOps(interestOps); changed = true; } else { - synchronized (worker.selectorGuard) { + worker.selectorGuard.readLock().lock(); + try { if (worker.wakenUp.compareAndSet(false, true)) { selector.wakeup(); } key.interestOps(interestOps); changed = true; + } finally { + worker.selectorGuard.readLock().unlock(); } } }