From 4bf4d5f814a4eee0ff16aa108ce513be71c72834 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Thu, 28 Jan 2010 07:50:28 +0000 Subject: [PATCH] Overhaul on NIO datagram writes * Write operation is made in the caller thread opportunistically * QOTM example uses NIO datagram transport now * This modification still requires more performance analysis and testing --- .../socket/nio/NioDatagramChannel.java | 11 +- .../socket/nio/NioDatagramPipelineSink.java | 2 +- .../channel/socket/nio/NioDatagramWorker.java | 245 +++++++----------- .../example/qotm/QuoteOfTheMomentClient.java | 4 +- .../example/qotm/QuoteOfTheMomentServer.java | 4 +- 5 files changed, 102 insertions(+), 164 deletions(-) diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java index 933c137155..29d54f40cd 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java @@ -27,6 +27,8 @@ import java.nio.channels.DatagramChannel; import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.AbstractChannel; @@ -76,7 +78,7 @@ class NioDatagramChannel extends AbstractChannel /** * Monitor object for synchronizing access to the {@link WriteBufferQueue}. */ - final Object writeLock = new Object(); + final Lock writeLock = new ReentrantLock(); /** * WriteTask that performs write operations. @@ -111,11 +113,6 @@ class NioDatagramChannel extends AbstractChannel ByteBuffer currentWriteBuffer; boolean currentWriteBufferIsPooled; - /** - * Boolean that indicates that write operation is in progress. - */ - volatile boolean inWriteNowLoop; - private volatile InetSocketAddress localAddress; volatile InetSocketAddress remoteAddress; @@ -316,7 +313,7 @@ class NioDatagramChannel extends AbstractChannel public void run() { writeTaskInTaskQueue.set(false); - worker.write(NioDatagramChannel.this, false); + worker.write(NioDatagramChannel.this); } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java index 55fa3bfbe5..19a38261bc 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java @@ -108,7 +108,7 @@ class NioDatagramPipelineSink extends AbstractChannelSink { final MessageEvent event = (MessageEvent) e; final boolean offered = channel.writeBufferQueue.offer(event); assert offered; - channel.worker.write(channel, true); + channel.worker.write(channel); } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java index decc051f59..a8c4dee554 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java @@ -372,7 +372,7 @@ class NioDatagramWorker implements Runnable { } private void write(SelectionKey k) { - write((NioDatagramChannel) k.attachment(), false); + write((NioDatagramChannel) k.attachment()); } /** @@ -437,8 +437,7 @@ class NioDatagramWorker implements Runnable { close(ch, succeededFuture(ch)); } - void write(final NioDatagramChannel channel, - final boolean mightNeedWakeup) { + void write(final NioDatagramChannel channel) { /* * Note that we are not checking if the channel is connected. Connected * has a different meaning in UDP and means that the channels socket is @@ -449,51 +448,19 @@ class NioDatagramWorker implements Runnable { return; } - if (mightNeedWakeup && scheduleWriteIfNecessary(channel)) { + if (!channel.writeLock.tryLock()) { + rescheduleWrite(channel); return; } - if (channel.inWriteNowLoop) { - scheduleWriteIfNecessary(channel); - } else { - writeNow(channel, channel.getConfig().getWriteSpinCount()); - } - } - - private boolean scheduleWriteIfNecessary(final NioDatagramChannel channel) { - final Thread workerThread = thread; - if (workerThread == null || Thread.currentThread() != workerThread) { - if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { - // "add" the channels writeTask to the writeTaskQueue. - boolean offered = writeTaskQueue.offer(channel.writeTask); - assert offered; - } - - final Selector selector = this.selector; - if (selector != null) { - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - } - return true; - } - - return false; - } - - private void writeNow(final NioDatagramChannel channel, - final int writeSpinCount) { + final Queue writeBuffer = channel.writeBufferQueue; + final int writeSpinCount = channel.getConfig().getWriteSpinCount(); boolean addOpWrite = false; boolean removeOpWrite = false; - int writtenBytes = 0; - Queue writeBuffer = channel.writeBufferQueue; synchronized (channel.writeLock) { - // inform the channel that write is in-progress - channel.inWriteNowLoop = true; - // loop forever... for (;;) { MessageEvent evt = channel.currentWriteEvent; @@ -570,74 +537,44 @@ class NioDatagramWorker implements Runnable { fireExceptionCaught(channel, t); } } - channel.inWriteNowLoop = false; } fireWriteComplete(channel, writtenBytes); + // interestOps can change at any time and at any thread. + // Acquire a lock to avoid possible race condition. if (addOpWrite) { - setOpWrite(channel); + synchronized (channel.interestOpsLock) { + int interestOps = channel.getRawInterestOps(); + if ((interestOps & SelectionKey.OP_WRITE) == 0) { + interestOps |= SelectionKey.OP_WRITE; + setInterestOps0(channel, interestOps); + } + } } else if (removeOpWrite) { - clearOpWrite(channel); + synchronized (channel.interestOpsLock) { + int interestOps = channel.getRawInterestOps(); + if ((interestOps & SelectionKey.OP_WRITE) != 0) { + interestOps &= ~SelectionKey.OP_WRITE; + setInterestOps0(channel, interestOps); + } + } } } - private void setOpWrite(final NioDatagramChannel channel) { - Selector selector = this.selector; - SelectionKey key = channel.getDatagramChannel().keyFor(selector); - if (key == null) { - return; + private void rescheduleWrite(final NioDatagramChannel channel) { + if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { + // "add" the channels writeTask to the writeTaskQueue. + boolean offered = writeTaskQueue.offer(channel.writeTask); + assert offered; } - if (!key.isValid()) { - close(key); - return; - } - int interestOps; - boolean changed = false; - // interestOps can change at any time and at any thread. - // Acquire a lock to avoid possible race condition. - synchronized (channel.interestOpsLock) { - interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) == 0) { - interestOps |= SelectionKey.OP_WRITE; - key.interestOps(interestOps); - changed = true; + final Selector selector = this.selector; + if (selector != null) { + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); } } - - if (changed) { - channel.setRawInterestOpsNow(interestOps); - } - } - - private void clearOpWrite(NioDatagramChannel channel) { - Selector selector = this.selector; - SelectionKey key = channel.getDatagramChannel().keyFor(selector); - if (key == null) { - return; - } - if (!key.isValid()) { - close(key); - return; - } - int interestOps; - boolean changed = false; - - // interestOps can change at any time and at any thread. - // Acquire a lock to avoid possible race condition. - synchronized (channel.interestOpsLock) { - interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) != 0) { - interestOps &= ~SelectionKey.OP_WRITE; - key.interestOps(interestOps); - changed = true; - } - } - - if (changed) { - channel.setRawInterestOpsNow(interestOps); - } } static void disconnect(NioDatagramChannel channel, ChannelFuture future) { @@ -742,72 +679,20 @@ class NioDatagramWorker implements Runnable { void setInterestOps(final NioDatagramChannel channel, ChannelFuture future, int interestOps) { - boolean changed = false; + // Override OP_WRITE flag - a user cannot change this flag. + interestOps &= ~Channel.OP_WRITE; + interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE; + try { // interestOps can change at any time and by any thread. // Acquire a lock to avoid possible race condition. + final boolean changed; synchronized (channel.interestOpsLock) { - final Selector selector = this.selector; - final SelectionKey key = channel.getDatagramChannel().keyFor(selector); - - if (key == null || selector == null) { - // Not registered to the worker yet. - // Set the rawInterestOps immediately; RegisterTask will pick it up. - channel.setRawInterestOpsNow(interestOps); - return; - } - - // Override OP_WRITE flag - a user cannot change this flag. - interestOps &= ~Channel.OP_WRITE; - interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE; - - switch (NioProviderMetadata.CONSTRAINT_LEVEL) { - case 0: - if (channel.getRawInterestOps() != interestOps) { - // Set the interesteOps on the SelectionKey - key.interestOps(interestOps); - // If the worker thread (the one that that might possibly be blocked - // in a select() call) is not the thread executing this method wakeup - // the select() operation. - if (Thread.currentThread() != thread && - wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - changed = true; - } - break; - case 1: - case 2: - if (channel.getRawInterestOps() != interestOps) { - if (Thread.currentThread() == thread) { - // Going to set the interestOps from the same thread. - // Set the interesteOps on the SelectionKey - key.interestOps(interestOps); - changed = true; - } else { - // Going to set the interestOps from a different thread - // and some old provides will need synchronization. - selectorGuard.readLock().lock(); - try { - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - key.interestOps(interestOps); - changed = true; - } finally { - selectorGuard.readLock().unlock(); - } - } - } - break; - default: - throw new Error(); - } + changed = setInterestOps0(channel, interestOps); } future.setSuccess(); if (changed) { - channel.setRawInterestOpsNow(interestOps); fireChannelInterestChanged(channel); } } catch (final CancelledKeyException e) { @@ -821,6 +706,62 @@ class NioDatagramWorker implements Runnable { } } + private boolean setInterestOps0(NioDatagramChannel channel, int interestOps) { + final Selector selector = this.selector; + final SelectionKey key = channel.getDatagramChannel().keyFor(selector); + + if (key == null || selector == null) { + // Not registered to the worker yet. + // Set the rawInterestOps immediately; RegisterTask will pick it up. + channel.setRawInterestOpsNow(interestOps); + return false; + } + + switch (NioProviderMetadata.CONSTRAINT_LEVEL) { + case 0: + if (channel.getRawInterestOps() != interestOps) { + // Set the interesteOps on the SelectionKey + key.interestOps(interestOps); + // If the worker thread (the one that that might possibly be blocked + // in a select() call) is not the thread executing this method wakeup + // the select() operation. + if (Thread.currentThread() != thread && + wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + return true; + } + break; + case 1: + case 2: + if (channel.getRawInterestOps() != interestOps) { + if (Thread.currentThread() == thread) { + // Going to set the interestOps from the same thread. + // Set the interesteOps on the SelectionKey + key.interestOps(interestOps); + return true; + } else { + // Going to set the interestOps from a different thread + // and some old provides will need synchronization. + selectorGuard.readLock().lock(); + try { + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + key.interestOps(interestOps); + return true; + } finally { + selectorGuard.readLock().unlock(); + } + } + } + break; + default: + throw new Error(); + } + return false; + } + /** * RegisterTask is a task responsible for registering a channel with a * selector. diff --git a/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentClient.java b/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentClient.java index d2b95280d3..1c05e62727 100644 --- a/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentClient.java +++ b/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentClient.java @@ -25,7 +25,7 @@ import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory; import org.jboss.netty.channel.socket.DatagramChannel; import org.jboss.netty.channel.socket.DatagramChannelFactory; -import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory; +import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; import org.jboss.netty.handler.codec.string.StringDecoder; import org.jboss.netty.handler.codec.string.StringEncoder; import org.jboss.netty.util.CharsetUtil; @@ -44,7 +44,7 @@ public class QuoteOfTheMomentClient { public static void main(String[] args) throws Exception { DatagramChannelFactory f = - new OioDatagramChannelFactory(Executors.newCachedThreadPool()); + new NioDatagramChannelFactory(Executors.newCachedThreadPool()); ConnectionlessBootstrap b = new ConnectionlessBootstrap(f); diff --git a/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentServer.java b/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentServer.java index 2ecc9a5d02..853152d5d1 100644 --- a/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentServer.java +++ b/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentServer.java @@ -24,7 +24,7 @@ import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory; import org.jboss.netty.channel.socket.DatagramChannelFactory; -import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory; +import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; import org.jboss.netty.handler.codec.string.StringDecoder; import org.jboss.netty.handler.codec.string.StringEncoder; import org.jboss.netty.util.CharsetUtil; @@ -43,7 +43,7 @@ public class QuoteOfTheMomentServer { public static void main(String[] args) throws Exception { DatagramChannelFactory f = - new OioDatagramChannelFactory(Executors.newCachedThreadPool()); + new NioDatagramChannelFactory(Executors.newCachedThreadPool()); ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);