diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/DirectBufferPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/DirectBufferPool.java index cfd6e83949..324ea2acf9 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/DirectBufferPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/DirectBufferPool.java @@ -29,8 +29,16 @@ final class DirectBufferPool { private static final int POOL_SIZE = 4; - @SuppressWarnings("unchecked") - private final SoftReference[] pool = new SoftReference[POOL_SIZE]; + private final ThreadLocal[]> pool = + new ThreadLocal[]>() { + @Override + @SuppressWarnings("unchecked") + protected SoftReference[] initialValue() { + return new SoftReference[POOL_SIZE]; + } + + }; + DirectBufferPool() { super(); @@ -44,6 +52,7 @@ final class DirectBufferPool { } final ByteBuffer acquire(int size) { + final SoftReference[] pool = this.pool.get(); for (int i = 0; i < POOL_SIZE; i ++) { SoftReference ref = pool[i]; if (ref == null) { @@ -73,6 +82,7 @@ final class DirectBufferPool { } final void release(ByteBuffer buffer) { + final SoftReference[] pool = this.pool.get(); for (int i = 0; i < POOL_SIZE; i ++) { SoftReference ref = pool[i]; if (ref == null || ref.get() == null) { diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java index 40866a70f8..6828c11e0b 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -114,7 +114,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { NioSocketChannel channel = (NioSocketChannel) event.getChannel(); boolean offered = channel.writeBuffer.offer(event); assert offered; - channel.worker.write(channel, true); + channel.worker.write(channel); } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java index bcd8143b6f..14e19aba6d 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java @@ -134,7 +134,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { NioSocketChannel channel = (NioSocketChannel) event.getChannel(); boolean offered = channel.writeBuffer.offer(event); assert offered; - channel.worker.write(channel, true); + channel.worker.write(channel); } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java index 8d0e4791c7..034c3163c7 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java @@ -15,7 +15,7 @@ */ package org.jboss.netty.channel.socket.nio; -import static org.jboss.netty.channel.Channels.*; +import static org.jboss.netty.channel.Channels.fireChannelInterestChanged; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -24,6 +24,8 @@ import java.nio.channels.SocketChannel; import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.AbstractChannel; @@ -59,7 +61,7 @@ class NioSocketChannel extends AbstractChannel private volatile InetSocketAddress remoteAddress; final Object interestOpsLock = new Object(); - final Object writeLock = new Object(); + final Lock writeLock = new ReentrantLock(); final Runnable writeTask = new WriteTask(); final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean(); @@ -67,7 +69,6 @@ class NioSocketChannel extends AbstractChannel final Queue writeBuffer = new WriteBuffer(); final AtomicInteger writeBufferSize = new AtomicInteger(); final AtomicInteger highWaterMarkCounter = new AtomicInteger(); - volatile boolean inWriteNowLoop; MessageEvent currentWriteEvent; ByteBuffer currentWriteBuffer; @@ -257,7 +258,7 @@ class NioSocketChannel extends AbstractChannel public void run() { writeTaskInTaskQueue.set(false); - worker.write(NioSocketChannel.this, false); + worker.write(NioSocketChannel.this); } } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index c02a61e05f..d9888d04a2 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -366,7 +366,7 @@ class NioWorker implements Runnable { private void write(SelectionKey k) { NioSocketChannel ch = (NioSocketChannel) k.attachment(); - write(ch, false); + write(ch); } private void close(SelectionKey k) { @@ -374,69 +374,26 @@ class NioWorker implements Runnable { close(ch, succeededFuture(ch)); } - void write(final NioSocketChannel channel, boolean mightNeedWakeup) { + void write(final NioSocketChannel channel) { if (!channel.isConnected()) { cleanUpWriteBuffer(channel); return; } - if (mightNeedWakeup && scheduleWriteIfNecessary(channel)) { + if (!channel.writeLock.tryLock()) { + rescheduleWrite(channel); return; } - - if (channel.inWriteNowLoop) { - scheduleWriteIfNecessary(channel); - } else { - writeNow(channel, channel.getConfig().getWriteSpinCount()); - } - } - - private boolean scheduleWriteIfNecessary(final NioSocketChannel channel) { - final Thread currentThread = Thread.currentThread(); - final Thread workerThread = thread; - if (currentThread != workerThread) { - if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { - boolean offered = writeTaskQueue.offer(channel.writeTask); - assert offered; - } - - if (!(channel instanceof NioAcceptedSocketChannel) || - ((NioAcceptedSocketChannel) channel).bossThread != currentThread) { - final Selector workerSelector = selector; - if (workerSelector != null) { - if (wakenUp.compareAndSet(false, true)) { - workerSelector.wakeup(); - } - } - } else { - // A write request can be made from an acceptor thread (boss) - // when a user attempted to write something in: - // - // * channelOpen() - // * channelBound() - // * channelConnected(). - // - // In this case, there's no need to wake up the selector because - // the channel is not even registered yet at this moment. - } - - return true; - } - - return false; - } - - private void writeNow(NioSocketChannel channel, int writeSpinCount) { + + final Queue writeBuffer = channel.writeBuffer; + final int writeSpinCount = channel.getConfig().getWriteSpinCount(); boolean open = true; boolean addOpWrite = false; boolean removeOpWrite = false; - int writtenBytes = 0; - Queue writeBuffer = channel.writeBuffer; - synchronized (channel.writeLock) { - channel.inWriteNowLoop = true; + try { for (;;) { MessageEvent evt = channel.currentWriteEvent; ByteBuffer buf; @@ -503,75 +460,60 @@ class NioWorker implements Runnable { } } } - channel.inWriteNowLoop = false; + } finally { + channel.writeLock.unlock(); } fireWriteComplete(channel, writtenBytes); if (open) { + // interestOps can change at any time and at any thread. + // Acquire a lock to avoid possible race condition. if (addOpWrite) { - setOpWrite(channel); + synchronized (channel.interestOpsLock) { + int interestOps = channel.getRawInterestOps(); + if ((interestOps & SelectionKey.OP_WRITE) == 0) { + interestOps |= SelectionKey.OP_WRITE; + setInterestOps0(channel, interestOps); + } + } } else if (removeOpWrite) { - clearOpWrite(channel); + synchronized (channel.interestOpsLock) { + int interestOps = channel.getRawInterestOps(); + if ((interestOps & SelectionKey.OP_WRITE) != 0) { + interestOps &= ~SelectionKey.OP_WRITE; + setInterestOps0(channel, interestOps); + } + } } } } - private void setOpWrite(NioSocketChannel channel) { - Selector selector = this.selector; - SelectionKey key = channel.socket.keyFor(selector); - if (key == null) { - return; + private void rescheduleWrite(final NioSocketChannel channel) { + final Thread currentThread = Thread.currentThread(); + if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { + boolean offered = writeTaskQueue.offer(channel.writeTask); + assert offered; } - if (!key.isValid()) { - close(key); - return; - } - int interestOps; - boolean changed = false; - // interestOps can change at any time and at any thread. - // Acquire a lock to avoid possible race condition. - synchronized (channel.interestOpsLock) { - interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) == 0) { - interestOps |= SelectionKey.OP_WRITE; - key.interestOps(interestOps); - changed = true; + if (!(channel instanceof NioAcceptedSocketChannel) || + ((NioAcceptedSocketChannel) channel).bossThread != currentThread) { + final Selector workerSelector = selector; + if (workerSelector != null) { + if (wakenUp.compareAndSet(false, true)) { + workerSelector.wakeup(); + } } - } - - if (changed) { - channel.setRawInterestOpsNow(interestOps); - } - } - - private void clearOpWrite(NioSocketChannel channel) { - Selector selector = this.selector; - SelectionKey key = channel.socket.keyFor(selector); - if (key == null) { - return; - } - if (!key.isValid()) { - close(key); - return; - } - int interestOps; - boolean changed = false; - - // interestOps can change at any time and at any thread. - // Acquire a lock to avoid possible race condition. - synchronized (channel.interestOpsLock) { - interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) != 0) { - interestOps &= ~SelectionKey.OP_WRITE; - key.interestOps(interestOps); - changed = true; - } - } - - if (changed) { - channel.setRawInterestOpsNow(interestOps); + } else { + // A write request can be made from an acceptor thread (boss) + // when a user attempted to write something in: + // + // * channelOpen() + // * channelBound() + // * channelConnected(). + // + // In this case, there's no need to wake up the selector because + // the channel is not even registered yet at this moment. } } @@ -607,7 +549,8 @@ class NioWorker implements Runnable { boolean fireExceptionCaught = false; // Clean up the stale messages in the write buffer. - synchronized (channel.writeLock) { + channel.writeLock.lock(); + try { MessageEvent evt = channel.currentWriteEvent; ByteBuffer buf = channel.currentWriteBuffer; if (evt != null) { @@ -653,6 +596,8 @@ class NioWorker implements Runnable { fireExceptionCaught = true; } } + } finally { + channel.writeLock.unlock(); } if (fireExceptionCaught) { @@ -662,64 +607,21 @@ class NioWorker implements Runnable { void setInterestOps( NioSocketChannel channel, ChannelFuture future, int interestOps) { - boolean changed = false; + + // Override OP_WRITE flag - a user cannot change this flag. + interestOps &= ~Channel.OP_WRITE; + interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE; + try { // interestOps can change at any time and at any thread. // Acquire a lock to avoid possible race condition. + boolean changed; synchronized (channel.interestOpsLock) { - Selector selector = this.selector; - SelectionKey key = channel.socket.keyFor(selector); - - if (key == null || selector == null) { - // Not registered to the worker yet. - // Set the rawInterestOps immediately; RegisterTask will pick it up. - channel.setRawInterestOpsNow(interestOps); - return; - } - - // Override OP_WRITE flag - a user cannot change this flag. - interestOps &= ~Channel.OP_WRITE; - interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE; - - switch (CONSTRAINT_LEVEL) { - case 0: - if (channel.getRawInterestOps() != interestOps) { - key.interestOps(interestOps); - if (Thread.currentThread() != thread && - wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - changed = true; - } - break; - case 1: - case 2: - if (channel.getRawInterestOps() != interestOps) { - if (Thread.currentThread() == thread) { - key.interestOps(interestOps); - changed = true; - } else { - selectorGuard.readLock().lock(); - try { - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - key.interestOps(interestOps); - changed = true; - } finally { - selectorGuard.readLock().unlock(); - } - } - } - break; - default: - throw new Error(); - } + changed = setInterestOps0(channel, interestOps); } future.setSuccess(); if (changed) { - channel.setRawInterestOpsNow(interestOps); fireChannelInterestChanged(channel); } } catch (CancelledKeyException e) { @@ -733,6 +635,57 @@ class NioWorker implements Runnable { } } + private boolean setInterestOps0(NioSocketChannel channel, int interestOps) { + Selector selector = this.selector; + SelectionKey key = channel.socket.keyFor(selector); + + if (key == null || selector == null) { + // Not registered to the worker yet. + // Set the rawInterestOps immediately; RegisterTask will pick it up. + channel.setRawInterestOpsNow(interestOps); + return false; + } + + switch (CONSTRAINT_LEVEL) { + case 0: + if (channel.getRawInterestOps() != interestOps) { + key.interestOps(interestOps); + if (Thread.currentThread() != thread && + wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + channel.setRawInterestOpsNow(interestOps); + return true; + } + break; + case 1: + case 2: + if (channel.getRawInterestOps() != interestOps) { + if (Thread.currentThread() == thread) { + key.interestOps(interestOps); + channel.setRawInterestOpsNow(interestOps); + return true; + } else { + selectorGuard.readLock().lock(); + try { + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + key.interestOps(interestOps); + channel.setRawInterestOpsNow(interestOps); + return true; + } finally { + selectorGuard.readLock().unlock(); + } + } + } + break; + default: + throw new Error(); + } + return false; + } + private final class RegisterTask implements Runnable { private final NioSocketChannel channel; private final ChannelFuture future;