diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/DirectBufferPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/DirectBufferPool.java index 9fef92fe90..cfd6e83949 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/DirectBufferPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/DirectBufferPool.java @@ -29,26 +29,21 @@ final class DirectBufferPool { private static final int POOL_SIZE = 4; - private static final ThreadLocal[]> pool = - new ThreadLocal[]>() { - @Override - @SuppressWarnings("unchecked") - protected SoftReference[] initialValue() { - return new SoftReference[POOL_SIZE]; - } - - }; + @SuppressWarnings("unchecked") + private final SoftReference[] pool = new SoftReference[POOL_SIZE]; + DirectBufferPool() { + super(); + } - static final ByteBuffer acquire(ChannelBuffer src) { + final ByteBuffer acquire(ChannelBuffer src) { ByteBuffer dst = acquire(src.readableBytes()); src.getBytes(src.readerIndex(), dst); dst.rewind(); return dst; } - static final ByteBuffer acquire(int size) { - final SoftReference[] pool = DirectBufferPool.pool.get(); + final ByteBuffer acquire(int size) { for (int i = 0; i < POOL_SIZE; i ++) { SoftReference ref = pool[i]; if (ref == null) { @@ -77,8 +72,7 @@ final class DirectBufferPool { return buf; } - static final void release(ByteBuffer buffer) { - final SoftReference[] pool = DirectBufferPool.pool.get(); + final void release(ByteBuffer buffer) { for (int i = 0; i < POOL_SIZE; i ++) { SoftReference ref = pool[i]; if (ref == null || ref.get() == null) { @@ -110,8 +104,4 @@ final class DirectBufferPool { // but it becomes 8192 to keep the calculation simplistic. return (capacity & 0xfffff000) + 0x1000; } - - private DirectBufferPool() { - super(); - } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java index 6828c11e0b..40866a70f8 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -114,7 +114,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { NioSocketChannel channel = (NioSocketChannel) event.getChannel(); boolean offered = channel.writeBuffer.offer(event); assert offered; - channel.worker.write(channel); + channel.worker.write(channel, true); } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java index 29b09f3fcf..933c137155 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java @@ -39,7 +39,6 @@ import org.jboss.netty.channel.ChannelSink; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.socket.DatagramChannelConfig; import org.jboss.netty.util.internal.LinkedTransferQueue; -import org.jboss.netty.util.internal.NonReentrantLock; import org.jboss.netty.util.internal.ThreadLocalBoolean; /** @@ -75,9 +74,9 @@ class NioDatagramChannel extends AbstractChannel final Object interestOpsLock = new Object(); /** - * Synchronizes access to the {@link WriteBufferQueue}. + * Monitor object for synchronizing access to the {@link WriteBufferQueue}. */ - final NonReentrantLock writeLock = new NonReentrantLock(); + final Object writeLock = new Object(); /** * WriteTask that performs write operations. @@ -112,6 +111,11 @@ class NioDatagramChannel extends AbstractChannel ByteBuffer currentWriteBuffer; boolean currentWriteBufferIsPooled; + /** + * Boolean that indicates that write operation is in progress. + */ + volatile boolean inWriteNowLoop; + private volatile InetSocketAddress localAddress; volatile InetSocketAddress remoteAddress; @@ -312,7 +316,7 @@ class NioDatagramChannel extends AbstractChannel public void run() { writeTaskInTaskQueue.set(false); - worker.write(NioDatagramChannel.this); + worker.write(NioDatagramChannel.this, false); } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java index 19a38261bc..55fa3bfbe5 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java @@ -108,7 +108,7 @@ class NioDatagramPipelineSink extends AbstractChannelSink { final MessageEvent event = (MessageEvent) e; final boolean offered = channel.writeBufferQueue.offer(event); assert offered; - channel.worker.write(channel); + channel.worker.write(channel, true); } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java index 4043570123..cb72c35c35 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java @@ -47,7 +47,6 @@ import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.internal.LinkedTransferQueue; -import org.jboss.netty.util.internal.NonReentrantLock; /** * A class responsible for registering channels with {@link Selector}. @@ -128,6 +127,8 @@ class NioDatagramWorker implements Runnable { private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation + private final DirectBufferPool directBufferPool = new DirectBufferPool(); + /** * Sole constructor. * @@ -349,7 +350,7 @@ class NioDatagramWorker implements Runnable { } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { - write((NioDatagramChannel) k.attachment()); + write(k); } } catch (CancelledKeyException e) { close(k); @@ -370,6 +371,10 @@ class NioDatagramWorker implements Runnable { return false; } + private void write(SelectionKey k) { + write((NioDatagramChannel) k.attachment(), false); + } + /** * Read is called when a Selector has been notified that the underlying channel * was something to be read. The channel would previously have registered its interest @@ -379,9 +384,9 @@ class NioDatagramWorker implements Runnable { */ private boolean read(final SelectionKey key) { final NioDatagramChannel channel = (NioDatagramChannel) key.attachment(); - final NioDatagramChannelConfig cfg = channel.getConfig(); - final ReceiveBufferSizePredictor predictor = cfg.getReceiveBufferSizePredictor(); - final ChannelBufferFactory bufferFactory = cfg.getBufferFactory(); + ReceiveBufferSizePredictor predictor = + channel.getConfig().getReceiveBufferSizePredictor(); + final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory(); final DatagramChannel nioChannel = (DatagramChannel) key.channel(); // Allocating a non-direct buffer with a max udp packge size. @@ -432,7 +437,8 @@ class NioDatagramWorker implements Runnable { close(ch, succeededFuture(ch)); } - void write(final NioDatagramChannel channel) { + void write(final NioDatagramChannel channel, + final boolean mightNeedWakeup) { /* * Note that we are not checking if the channel is connected. Connected * has a different meaning in UDP and means that the channels socket is @@ -443,25 +449,51 @@ class NioDatagramWorker implements Runnable { return; } - if (channel.writeTaskInTaskQueue.get() && Thread.currentThread() != thread) { - rescheduleWrite(channel); + if (mightNeedWakeup && scheduleWriteIfNecessary(channel)) { return; } - final NonReentrantLock writeLock = channel.writeLock; - if (!writeLock.tryLock()) { - rescheduleWrite(channel); - return; + if (channel.inWriteNowLoop) { + scheduleWriteIfNecessary(channel); + } else { + writeNow(channel, channel.getConfig().getWriteSpinCount()); + } + } + + private boolean scheduleWriteIfNecessary(final NioDatagramChannel channel) { + final Thread workerThread = thread; + if (workerThread == null || Thread.currentThread() != workerThread) { + if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { + // "add" the channels writeTask to the writeTaskQueue. + boolean offered = writeTaskQueue.offer(channel.writeTask); + assert offered; + } + + final Selector selector = this.selector; + if (selector != null) { + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + } + return true; } - final Queue writeBuffer = channel.writeBufferQueue; - final int writeSpinCount = channel.getConfig().getWriteSpinCount(); + return false; + } + + private void writeNow(final NioDatagramChannel channel, + final int writeSpinCount) { boolean addOpWrite = false; boolean removeOpWrite = false; + int writtenBytes = 0; - try { + Queue writeBuffer = channel.writeBufferQueue; + synchronized (channel.writeLock) { + // inform the channel that write is in-progress + channel.inWriteNowLoop = true; + // loop forever... for (;;) { MessageEvent evt = channel.currentWriteEvent; @@ -477,7 +509,7 @@ class NioDatagramWorker implements Runnable { channel.currentWriteBuffer = buf = origBuf.toByteBuffer(); channel.currentWriteBufferIsPooled = false; } else { - channel.currentWriteBuffer = buf = DirectBufferPool.acquire(origBuf); + channel.currentWriteBuffer = buf = directBufferPool.acquire(origBuf); channel.currentWriteBufferIsPooled = true; } } else { @@ -509,7 +541,7 @@ class NioDatagramWorker implements Runnable { if (localWrittenBytes > 0) { // Successful write - proceed to the next message. if (channel.currentWriteBufferIsPooled) { - DirectBufferPool.release(buf); + directBufferPool.release(buf); } ChannelFuture future = evt.getFuture(); @@ -527,7 +559,7 @@ class NioDatagramWorker implements Runnable { // Doesn't need a user attention - ignore. } catch (final Throwable t) { if (channel.currentWriteBufferIsPooled) { - DirectBufferPool.release(buf); + directBufferPool.release(buf); } ChannelFuture future = evt.getFuture(); channel.currentWriteEvent = null; @@ -538,46 +570,74 @@ class NioDatagramWorker implements Runnable { fireExceptionCaught(channel, t); } } - } finally { - writeLock.unlock(); + channel.inWriteNowLoop = false; } fireWriteComplete(channel, writtenBytes); - // interestOps can change at any time and at any thread. - // Acquire a lock to avoid possible race condition. if (addOpWrite) { - synchronized (channel.interestOpsLock) { - int interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) == 0) { - interestOps |= SelectionKey.OP_WRITE; - setInterestOps0(channel, interestOps); - } - } + setOpWrite(channel); } else if (removeOpWrite) { - synchronized (channel.interestOpsLock) { - int interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) != 0) { - interestOps &= ~SelectionKey.OP_WRITE; - setInterestOps0(channel, interestOps); - } - } + clearOpWrite(channel); } } - private void rescheduleWrite(final NioDatagramChannel channel) { - if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { - // "add" the channels writeTask to the writeTaskQueue. - boolean offered = writeTaskQueue.offer(channel.writeTask); - assert offered; + private void setOpWrite(final NioDatagramChannel channel) { + Selector selector = this.selector; + SelectionKey key = channel.getDatagramChannel().keyFor(selector); + if (key == null) { + return; + } + if (!key.isValid()) { + close(key); + return; + } + int interestOps; + boolean changed = false; + + // interestOps can change at any time and at any thread. + // Acquire a lock to avoid possible race condition. + synchronized (channel.interestOpsLock) { + interestOps = channel.getRawInterestOps(); + if ((interestOps & SelectionKey.OP_WRITE) == 0) { + interestOps |= SelectionKey.OP_WRITE; + key.interestOps(interestOps); + changed = true; + } } - final Selector selector = this.selector; - if (selector != null) { - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); + if (changed) { + channel.setRawInterestOpsNow(interestOps); + } + } + + private void clearOpWrite(NioDatagramChannel channel) { + Selector selector = this.selector; + SelectionKey key = channel.getDatagramChannel().keyFor(selector); + if (key == null) { + return; + } + if (!key.isValid()) { + close(key); + return; + } + int interestOps; + boolean changed = false; + + // interestOps can change at any time and at any thread. + // Acquire a lock to avoid possible race condition. + synchronized (channel.interestOpsLock) { + interestOps = channel.getRawInterestOps(); + if ((interestOps & SelectionKey.OP_WRITE) != 0) { + interestOps &= ~SelectionKey.OP_WRITE; + key.interestOps(interestOps); + changed = true; } } + + if (changed) { + channel.setRawInterestOpsNow(interestOps); + } } static void disconnect(NioDatagramChannel channel, ChannelFuture future) { @@ -627,8 +687,7 @@ class NioDatagramWorker implements Runnable { boolean fireExceptionCaught = false; // Clean up the stale messages in the write buffer. - channel.writeLock.lock(); - try { + synchronized (channel.writeLock) { MessageEvent evt = channel.currentWriteEvent; ByteBuffer buf = channel.currentWriteBuffer; if (evt != null) { @@ -640,7 +699,7 @@ class NioDatagramWorker implements Runnable { cause = new ClosedChannelException(); } if (channel.currentWriteBufferIsPooled) { - DirectBufferPool.release(buf); + directBufferPool.release(buf); } ChannelFuture future = evt.getFuture(); @@ -673,8 +732,6 @@ class NioDatagramWorker implements Runnable { fireExceptionCaught = true; } } - } finally { - channel.writeLock.unlock(); } if (fireExceptionCaught) { @@ -685,20 +742,72 @@ class NioDatagramWorker implements Runnable { void setInterestOps(final NioDatagramChannel channel, ChannelFuture future, int interestOps) { - // Override OP_WRITE flag - a user cannot change this flag. - interestOps &= ~Channel.OP_WRITE; - interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE; - + boolean changed = false; try { // interestOps can change at any time and by any thread. // Acquire a lock to avoid possible race condition. - final boolean changed; synchronized (channel.interestOpsLock) { - changed = setInterestOps0(channel, interestOps); + final Selector selector = this.selector; + final SelectionKey key = channel.getDatagramChannel().keyFor(selector); + + if (key == null || selector == null) { + // Not registered to the worker yet. + // Set the rawInterestOps immediately; RegisterTask will pick it up. + channel.setRawInterestOpsNow(interestOps); + return; + } + + // Override OP_WRITE flag - a user cannot change this flag. + interestOps &= ~Channel.OP_WRITE; + interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE; + + switch (NioProviderMetadata.CONSTRAINT_LEVEL) { + case 0: + if (channel.getRawInterestOps() != interestOps) { + // Set the interesteOps on the SelectionKey + key.interestOps(interestOps); + // If the worker thread (the one that that might possibly be blocked + // in a select() call) is not the thread executing this method wakeup + // the select() operation. + if (Thread.currentThread() != thread && + wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + changed = true; + } + break; + case 1: + case 2: + if (channel.getRawInterestOps() != interestOps) { + if (Thread.currentThread() == thread) { + // Going to set the interestOps from the same thread. + // Set the interesteOps on the SelectionKey + key.interestOps(interestOps); + changed = true; + } else { + // Going to set the interestOps from a different thread + // and some old provides will need synchronization. + selectorGuard.readLock().lock(); + try { + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + key.interestOps(interestOps); + changed = true; + } finally { + selectorGuard.readLock().unlock(); + } + } + } + break; + default: + throw new Error(); + } } future.setSuccess(); if (changed) { + channel.setRawInterestOpsNow(interestOps); fireChannelInterestChanged(channel); } } catch (final CancelledKeyException e) { @@ -712,62 +821,6 @@ class NioDatagramWorker implements Runnable { } } - private boolean setInterestOps0(NioDatagramChannel channel, int interestOps) { - final Selector selector = this.selector; - final SelectionKey key = channel.getDatagramChannel().keyFor(selector); - - if (key == null || selector == null) { - // Not registered to the worker yet. - // Set the rawInterestOps immediately; RegisterTask will pick it up. - channel.setRawInterestOpsNow(interestOps); - return false; - } - - switch (NioProviderMetadata.CONSTRAINT_LEVEL) { - case 0: - if (channel.getRawInterestOps() != interestOps) { - // Set the interesteOps on the SelectionKey - key.interestOps(interestOps); - // If the worker thread (the one that that might possibly be blocked - // in a select() call) is not the thread executing this method wakeup - // the select() operation. - if (Thread.currentThread() != thread && - wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - return true; - } - break; - case 1: - case 2: - if (channel.getRawInterestOps() != interestOps) { - if (Thread.currentThread() == thread) { - // Going to set the interestOps from the same thread. - // Set the interesteOps on the SelectionKey - key.interestOps(interestOps); - return true; - } else { - // Going to set the interestOps from a different thread - // and some old provides will need synchronization. - selectorGuard.readLock().lock(); - try { - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - key.interestOps(interestOps); - return true; - } finally { - selectorGuard.readLock().unlock(); - } - } - } - break; - default: - throw new Error(); - } - return false; - } - /** * RegisterTask is a task responsible for registering a channel with a * selector. diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java index 14e19aba6d..bcd8143b6f 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java @@ -134,7 +134,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { NioSocketChannel channel = (NioSocketChannel) event.getChannel(); boolean offered = channel.writeBuffer.offer(event); assert offered; - channel.worker.write(channel); + channel.worker.write(channel, true); } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java index 5f4bc32687..8d0e4791c7 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java @@ -34,7 +34,6 @@ import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelSink; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.util.internal.LinkedTransferQueue; -import org.jboss.netty.util.internal.NonReentrantLock; import org.jboss.netty.util.internal.ThreadLocalBoolean; /** @@ -60,7 +59,7 @@ class NioSocketChannel extends AbstractChannel private volatile InetSocketAddress remoteAddress; final Object interestOpsLock = new Object(); - final NonReentrantLock writeLock = new NonReentrantLock(); + final Object writeLock = new Object(); final Runnable writeTask = new WriteTask(); final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean(); @@ -68,6 +67,7 @@ class NioSocketChannel extends AbstractChannel final Queue writeBuffer = new WriteBuffer(); final AtomicInteger writeBufferSize = new AtomicInteger(); final AtomicInteger highWaterMarkCounter = new AtomicInteger(); + volatile boolean inWriteNowLoop; MessageEvent currentWriteEvent; ByteBuffer currentWriteBuffer; @@ -257,7 +257,7 @@ class NioSocketChannel extends AbstractChannel public void run() { writeTaskInTaskQueue.set(false); - worker.write(NioSocketChannel.this); + worker.write(NioSocketChannel.this, false); } } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 3357e22683..4faa6c1387 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -24,6 +24,7 @@ import java.nio.channels.AsynchronousCloseException; import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; import java.nio.channels.NotYetConnectedException; +import java.nio.channels.ScatteringByteChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; @@ -47,7 +48,6 @@ import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.internal.IoWorkerRunnable; import org.jboss.netty.util.internal.LinkedTransferQueue; -import org.jboss.netty.util.internal.NonReentrantLock; /** * @@ -78,6 +78,7 @@ class NioWorker implements Runnable { private final Queue registerTaskQueue = new LinkedTransferQueue(); private final Queue writeTaskQueue = new LinkedTransferQueue(); private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation + private final DirectBufferPool directBufferPool = new DirectBufferPool(); NioWorker(int bossId, int id, Executor executor) { this.bossId = bossId; @@ -280,7 +281,7 @@ class NioWorker implements Runnable { } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { - write((NioSocketChannel) k.attachment()); + write(k); } } catch (CancelledKeyException e) { close(k); @@ -302,19 +303,21 @@ class NioWorker implements Runnable { } private boolean read(SelectionKey k) { - final java.nio.channels.SocketChannel nioch = - (java.nio.channels.SocketChannel) k.channel(); - final NioSocketChannel channel = (NioSocketChannel) k.attachment(); + ScatteringByteChannel ch = (ScatteringByteChannel) k.channel(); + NioSocketChannel channel = (NioSocketChannel) k.attachment(); - final NioSocketChannelConfig cfg = channel.getConfig(); - final ReceiveBufferSizePredictor predictor = cfg.getReceiveBufferSizePredictor(); - final ChannelBufferFactory bufferFactory = cfg.getBufferFactory(); + ReceiveBufferSizePredictor predictor = + channel.getConfig().getReceiveBufferSizePredictor(); + ChannelBufferFactory bufferFactory = + channel.getConfig().getBufferFactory(); + + ChannelBuffer buffer = + bufferFactory.getBuffer(predictor.nextReceiveBufferSize()); - final ChannelBuffer buffer = bufferFactory.getBuffer(predictor.nextReceiveBufferSize()); final ByteBuffer directBuffer; final boolean fromPool = !buffer.isDirect(); if (fromPool) { - directBuffer = DirectBufferPool.acquire(buffer.writableBytes()); + directBuffer = directBufferPool.acquire(buffer.writableBytes()); } else { directBuffer = buffer.toByteBuffer(); } @@ -323,7 +326,7 @@ class NioWorker implements Runnable { int readBytes = 0; boolean failure = true; try { - while ((ret = nioch.read(directBuffer)) > 0) { + while ((ret = ch.read(directBuffer)) > 0) { readBytes += ret; if (!directBuffer.hasRemaining()) { break; @@ -338,7 +341,7 @@ class NioWorker implements Runnable { if (fromPool) { directBuffer.flip(); buffer.writeBytes(directBuffer); - DirectBufferPool.release(directBuffer); + directBufferPool.release(directBuffer); } else { // no need to copy: directBuffer is just a view to buffer. buffer.writerIndex(buffer.writerIndex() + readBytes); @@ -361,37 +364,79 @@ class NioWorker implements Runnable { return true; } + private void write(SelectionKey k) { + NioSocketChannel ch = (NioSocketChannel) k.attachment(); + write(ch, false); + } + private void close(SelectionKey k) { NioSocketChannel ch = (NioSocketChannel) k.attachment(); close(ch, succeededFuture(ch)); } - void write(final NioSocketChannel channel) { + void write(final NioSocketChannel channel, boolean mightNeedWakeup) { if (!channel.isConnected()) { cleanUpWriteBuffer(channel); return; } - if (channel.writeTaskInTaskQueue.get() && Thread.currentThread() != thread) { - rescheduleWrite(channel); + if (mightNeedWakeup && scheduleWriteIfNecessary(channel)) { return; } - final NonReentrantLock writeLock = channel.writeLock; - if (!writeLock.tryLock()) { - rescheduleWrite(channel); - return; + if (channel.inWriteNowLoop) { + scheduleWriteIfNecessary(channel); + } else { + writeNow(channel, channel.getConfig().getWriteSpinCount()); + } + } + + private boolean scheduleWriteIfNecessary(final NioSocketChannel channel) { + final Thread currentThread = Thread.currentThread(); + final Thread workerThread = thread; + if (currentThread != workerThread) { + if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { + boolean offered = writeTaskQueue.offer(channel.writeTask); + assert offered; + } + + if (!(channel instanceof NioAcceptedSocketChannel) || + ((NioAcceptedSocketChannel) channel).bossThread != currentThread) { + final Selector workerSelector = selector; + if (workerSelector != null) { + if (wakenUp.compareAndSet(false, true)) { + workerSelector.wakeup(); + } + } + } else { + // A write request can be made from an acceptor thread (boss) + // when a user attempted to write something in: + // + // * channelOpen() + // * channelBound() + // * channelConnected(). + // + // In this case, there's no need to wake up the selector because + // the channel is not even registered yet at this moment. + } + + return true; } - final Queue writeBuffer = channel.writeBuffer; - final int writeSpinCount = channel.getConfig().getWriteSpinCount(); + return false; + } + + private void writeNow(NioSocketChannel channel, int writeSpinCount) { boolean open = true; boolean addOpWrite = false; boolean removeOpWrite = false; + int writtenBytes = 0; - try { + Queue writeBuffer = channel.writeBuffer; + synchronized (channel.writeLock) { + channel.inWriteNowLoop = true; for (;;) { MessageEvent evt = channel.currentWriteEvent; ByteBuffer buf; @@ -406,7 +451,7 @@ class NioWorker implements Runnable { channel.currentWriteBuffer = buf = origBuf.toByteBuffer(); channel.currentWriteBufferIsPooled = false; } else { - channel.currentWriteBuffer = buf = DirectBufferPool.acquire(origBuf); + channel.currentWriteBuffer = buf = directBufferPool.acquire(origBuf); channel.currentWriteBufferIsPooled = true; } } else { @@ -425,7 +470,7 @@ class NioWorker implements Runnable { if (!buf.hasRemaining()) { // Successful write - proceed to the next message. if (channel.currentWriteBufferIsPooled) { - DirectBufferPool.release(buf); + directBufferPool.release(buf); } ChannelFuture future = evt.getFuture(); @@ -443,7 +488,7 @@ class NioWorker implements Runnable { // Doesn't need a user attention - ignore. } catch (Throwable t) { if (channel.currentWriteBufferIsPooled) { - DirectBufferPool.release(buf); + directBufferPool.release(buf); } ChannelFuture future = evt.getFuture(); channel.currentWriteEvent = null; @@ -458,59 +503,75 @@ class NioWorker implements Runnable { } } } - } finally { - writeLock.unlock(); + channel.inWriteNowLoop = false; } fireWriteComplete(channel, writtenBytes); if (open) { - // interestOps can change at any time and at any thread. - // Acquire a lock to avoid possible race condition. if (addOpWrite) { - synchronized (channel.interestOpsLock) { - int interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) == 0) { - interestOps |= SelectionKey.OP_WRITE; - setInterestOps0(channel, interestOps); - } - } + setOpWrite(channel); } else if (removeOpWrite) { - synchronized (channel.interestOpsLock) { - int interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) != 0) { - interestOps &= ~SelectionKey.OP_WRITE; - setInterestOps0(channel, interestOps); - } - } + clearOpWrite(channel); } } } - private void rescheduleWrite(final NioSocketChannel channel) { - if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { - boolean offered = writeTaskQueue.offer(channel.writeTask); - assert offered; + private void setOpWrite(NioSocketChannel channel) { + Selector selector = this.selector; + SelectionKey key = channel.socket.keyFor(selector); + if (key == null) { + return; + } + if (!key.isValid()) { + close(key); + return; + } + int interestOps; + boolean changed = false; + + // interestOps can change at any time and at any thread. + // Acquire a lock to avoid possible race condition. + synchronized (channel.interestOpsLock) { + interestOps = channel.getRawInterestOps(); + if ((interestOps & SelectionKey.OP_WRITE) == 0) { + interestOps |= SelectionKey.OP_WRITE; + key.interestOps(interestOps); + changed = true; + } } - if (!(channel instanceof NioAcceptedSocketChannel) || - ((NioAcceptedSocketChannel) channel).bossThread != Thread.currentThread()) { - final Selector workerSelector = selector; - if (workerSelector != null) { - if (wakenUp.compareAndSet(false, true)) { - workerSelector.wakeup(); - } + if (changed) { + channel.setRawInterestOpsNow(interestOps); + } + } + + private void clearOpWrite(NioSocketChannel channel) { + Selector selector = this.selector; + SelectionKey key = channel.socket.keyFor(selector); + if (key == null) { + return; + } + if (!key.isValid()) { + close(key); + return; + } + int interestOps; + boolean changed = false; + + // interestOps can change at any time and at any thread. + // Acquire a lock to avoid possible race condition. + synchronized (channel.interestOpsLock) { + interestOps = channel.getRawInterestOps(); + if ((interestOps & SelectionKey.OP_WRITE) != 0) { + interestOps &= ~SelectionKey.OP_WRITE; + key.interestOps(interestOps); + changed = true; } - } else { - // A write request can be made from an acceptor thread (boss) - // when a user attempted to write something in: - // - // * channelOpen() - // * channelBound() - // * channelConnected(). - // - // In this case, there's no need to wake up the selector because - // the channel is not even registered yet at this moment. + } + + if (changed) { + channel.setRawInterestOpsNow(interestOps); } } @@ -546,8 +607,7 @@ class NioWorker implements Runnable { boolean fireExceptionCaught = false; // Clean up the stale messages in the write buffer. - channel.writeLock.lock(); - try { + synchronized (channel.writeLock) { MessageEvent evt = channel.currentWriteEvent; ByteBuffer buf = channel.currentWriteBuffer; if (evt != null) { @@ -560,7 +620,7 @@ class NioWorker implements Runnable { } if (channel.currentWriteBufferIsPooled) { - DirectBufferPool.release(buf); + directBufferPool.release(buf); } ChannelFuture future = evt.getFuture(); @@ -593,8 +653,6 @@ class NioWorker implements Runnable { fireExceptionCaught = true; } } - } finally { - channel.writeLock.unlock(); } if (fireExceptionCaught) { @@ -604,21 +662,64 @@ class NioWorker implements Runnable { void setInterestOps( NioSocketChannel channel, ChannelFuture future, int interestOps) { - - // Override OP_WRITE flag - a user cannot change this flag. - interestOps &= ~Channel.OP_WRITE; - interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE; - + boolean changed = false; try { // interestOps can change at any time and at any thread. // Acquire a lock to avoid possible race condition. - boolean changed; synchronized (channel.interestOpsLock) { - changed = setInterestOps0(channel, interestOps); + Selector selector = this.selector; + SelectionKey key = channel.socket.keyFor(selector); + + if (key == null || selector == null) { + // Not registered to the worker yet. + // Set the rawInterestOps immediately; RegisterTask will pick it up. + channel.setRawInterestOpsNow(interestOps); + return; + } + + // Override OP_WRITE flag - a user cannot change this flag. + interestOps &= ~Channel.OP_WRITE; + interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE; + + switch (CONSTRAINT_LEVEL) { + case 0: + if (channel.getRawInterestOps() != interestOps) { + key.interestOps(interestOps); + if (Thread.currentThread() != thread && + wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + changed = true; + } + break; + case 1: + case 2: + if (channel.getRawInterestOps() != interestOps) { + if (Thread.currentThread() == thread) { + key.interestOps(interestOps); + changed = true; + } else { + selectorGuard.readLock().lock(); + try { + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + key.interestOps(interestOps); + changed = true; + } finally { + selectorGuard.readLock().unlock(); + } + } + } + break; + default: + throw new Error(); + } } future.setSuccess(); if (changed) { + channel.setRawInterestOpsNow(interestOps); fireChannelInterestChanged(channel); } } catch (CancelledKeyException e) { @@ -632,57 +733,6 @@ class NioWorker implements Runnable { } } - private boolean setInterestOps0(NioSocketChannel channel, int interestOps) { - Selector selector = this.selector; - SelectionKey key = channel.socket.keyFor(selector); - - if (key == null || selector == null) { - // Not registered to the worker yet. - // Set the rawInterestOps immediately; RegisterTask will pick it up. - channel.setRawInterestOpsNow(interestOps); - return false; - } - - switch (CONSTRAINT_LEVEL) { - case 0: - if (channel.getRawInterestOps() != interestOps) { - key.interestOps(interestOps); - if (Thread.currentThread() != thread && - wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - channel.setRawInterestOpsNow(interestOps); - return true; - } - break; - case 1: - case 2: - if (channel.getRawInterestOps() != interestOps) { - if (Thread.currentThread() == thread) { - key.interestOps(interestOps); - channel.setRawInterestOpsNow(interestOps); - return true; - } else { - selectorGuard.readLock().lock(); - try { - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - key.interestOps(interestOps); - channel.setRawInterestOpsNow(interestOps); - return true; - } finally { - selectorGuard.readLock().unlock(); - } - } - } - break; - default: - throw new Error(); - } - return false; - } - private final class RegisterTask implements Runnable { private final NioSocketChannel channel; private final ChannelFuture future;