diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java index 993d3ad915..637dcd8b6c 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java @@ -49,16 +49,17 @@ import org.jboss.netty.util.internal.ThreadLocalBoolean; * NioDatagramChannel provides a connection less NIO UDP channel for Netty. *

* - * @author Daniel Bevenius - * + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Daniel Bevenius (dbevenius@jboss.com) + * @version $Rev$, $Date$ */ public class NioDatagramChannel extends AbstractChannel implements ServerChannel { /** * Internal Netty logger. */ - private static final InternalLogger logger = - InternalLoggerFactory.getInstance(NioDatagramChannel.class); + private static final InternalLogger logger = InternalLoggerFactory + .getInstance(NioDatagramChannel.class); /** * The {@link DatagramChannelConfig}. @@ -76,22 +77,17 @@ public class NioDatagramChannel extends AbstractChannel implements private final DatagramChannel datagramChannel; /** - * - */ - volatile ChannelFuture connectFuture; - - /** - * + * Monitor object to synchronize access to InterestedOps. */ final Object interestOpsLock = new Object(); /** - * + * Monitor object for synchronizing access to the {@link WriteBufferQueue}. */ final Object writeLock = new Object(); /** - * + * WriteTask that performs write operations. */ final Runnable writeTask = new WriteTask(); @@ -101,7 +97,7 @@ public class NioDatagramChannel extends AbstractChannel implements final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean(); /** - * + * Queue of write {@link MessageEvent}s. */ final Queue writeBufferQueue = new WriteBufferQueue(); @@ -112,22 +108,22 @@ public class NioDatagramChannel extends AbstractChannel implements final AtomicInteger writeBufferSize = new AtomicInteger(); /** - * + * Keeps track of the highWaterMark. */ final AtomicInteger highWaterMarkCounter = new AtomicInteger(); /** - * + * The current write {@link MessageEvent} */ MessageEvent currentWriteEvent; /** - * + * The current write index. */ int currentWriteIndex; /** - * + * Boolean that indicates that write operation is in progress. */ volatile boolean inWriteNowLoop; @@ -221,7 +217,6 @@ public class NioDatagramChannel extends AbstractChannel implements * WriteBuffer is an extension of {@link LinkedTransferQueue} that adds * support for highWaterMark checking of the write buffer size. * - * @author Daniel Bevenius */ private final class WriteBufferQueue extends LinkedTransferQueue { @@ -240,20 +235,15 @@ public class NioDatagramChannel extends AbstractChannel implements final boolean success = super.offer(e); assert success; - final int messageSize = - ((ChannelBuffer) e.getMessage()).readableBytes(); + final int messageSize = ((ChannelBuffer) e.getMessage()) + .readableBytes(); - // Add the ChannelBuffers size to the writeBuffersSize - final int newWriteBufferSize = - writeBufferSize.addAndGet(messageSize); + final int newWriteBufferSize = writeBufferSize + .addAndGet(messageSize); final int highWaterMark = getConfig().getWriteBufferHighWaterMark(); - // Check if the newly calculated buffersize exceeds the highWaterMark limit. if (newWriteBufferSize >= highWaterMark) { - // Check to see if the messages size we are adding is what will cause the highWaterMark to be breached. if (newWriteBufferSize - messageSize < highWaterMark) { - // Increment the highWaterMarkCounter which track of the fact that the count - // has been reached. highWaterMarkCounter.incrementAndGet(); if (!notifying.get()) { @@ -275,16 +265,14 @@ public class NioDatagramChannel extends AbstractChannel implements public MessageEvent poll() { final MessageEvent e = super.poll(); if (e != null) { - final int messageSize = - ((ChannelBuffer) e.getMessage()).readableBytes(); - // Subtract the ChannelBuffers size from the writeBuffersSize - final int newWriteBufferSize = - writeBufferSize.addAndGet(-messageSize); + final int messageSize = ((ChannelBuffer) e.getMessage()) + .readableBytes(); + final int newWriteBufferSize = writeBufferSize + .addAndGet(-messageSize); - final int lowWaterMark = - getConfig().getWriteBufferLowWaterMark(); + final int lowWaterMark = getConfig() + .getWriteBufferLowWaterMark(); - // Check if the newly calculated buffersize exceeds the lowhWaterMark limit. if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) { if (newWriteBufferSize + messageSize >= lowWaterMark) { @@ -304,8 +292,6 @@ public class NioDatagramChannel extends AbstractChannel implements /** * WriteTask is a simple runnable performs writes by delegating the {@link NioUdpWorker}. * - * @author Daniel Bevenius - * */ private final class WriteTask implements Runnable { WriteTask() { diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java index e0cd4008f5..6e82846e94 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java @@ -51,7 +51,9 @@ import org.jboss.netty.util.internal.ExecutorUtil; * threads. A worker thread performs non-blocking read and write for one or * more {@link Channel}s in a non-blocking mode. * - * @author Daniel Bevenius + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Daniel Bevenius (dbevenius@jboss.com) + * @version $Rev$, $Date$ * */ public class NioDatagramChannelFactory implements ChannelFactory, diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java index 595a9f6ea0..7e37d800fa 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java @@ -20,38 +20,33 @@ */ package org.jboss.netty.channel.socket.nio; -import static org.jboss.netty.channel.Channels.*; +import static org.jboss.netty.channel.Channels.fireChannelBound; +import static org.jboss.netty.channel.Channels.fireChannelClosed; +import static org.jboss.netty.channel.Channels.fireChannelUnbound; +import static org.jboss.netty.channel.Channels.fireExceptionCaught; -import java.io.IOException; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import org.jboss.netty.channel.AbstractChannelSink; import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.logging.InternalLogger; -import org.jboss.netty.logging.InternalLoggerFactory; /** * NioDatagramPipelineSink receives downstream events from a ChannelPipeline. *

* A {@link NioDatagramPipelineSink} contains an array of {@link NioUdpWorker}s * - * @author Daniel Bevenius + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Daniel Bevenius (dbevenius@jboss.com) + * @version $Rev$, $Date$ */ public class NioDatagramPipelineSink extends AbstractChannelSink { - /** - * Internal Netty logger. - */ - private final InternalLogger logger = - InternalLoggerFactory.getInstance(NioDatagramPipelineSink.class); private static final AtomicInteger nextId = new AtomicInteger(); @@ -163,29 +158,4 @@ public class NioDatagramPipelineSink extends AbstractChannelSink { return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]; } - /** - * The connection sematics of a NioDatagramPipelineSink are different for datagram sockets than they are for stream - * sockets. Placing a DatagramChannel into a connected state causes datagrams to be ignored from any source - * address other than the one to which the channel is connected. Unwanted packets will be dropped. - * Not sure that this makes sense for a server side component. - * - * @param channel The UdpChannel to connect from. - * @param future - * @param remoteAddress The remote address to connect to. - */ - @SuppressWarnings("unused") - private void connect(final NioDatagramChannel channel, - ChannelFuture future, SocketAddress remoteAddress) { - try { - try { - channel.getDatagramChannel().socket().connect(remoteAddress); - } catch (final IOException e) { - future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); - channel.connectFuture = future; - } - } catch (final Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioUdpWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioUdpWorker.java index 69db0217ba..7710b63418 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioUdpWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioUdpWorker.java @@ -22,7 +22,15 @@ */ package org.jboss.netty.channel.socket.nio; -import static org.jboss.netty.channel.Channels.*; +import static org.jboss.netty.channel.Channels.fireChannelClosed; +import static org.jboss.netty.channel.Channels.fireChannelConnected; +import static org.jboss.netty.channel.Channels.fireChannelDisconnected; +import static org.jboss.netty.channel.Channels.fireChannelInterestChanged; +import static org.jboss.netty.channel.Channels.fireChannelUnbound; +import static org.jboss.netty.channel.Channels.fireExceptionCaught; +import static org.jboss.netty.channel.Channels.fireMessageReceived; +import static org.jboss.netty.channel.Channels.fireWriteComplete; +import static org.jboss.netty.channel.Channels.succeededFuture; import java.io.IOException; import java.net.SocketAddress; @@ -55,25 +63,19 @@ import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.internal.LinkedTransferQueue; /** - * + * NioUdpWorker is responsible for registering channels with selector, and + * also manages the select process. + * * @author The Netty Project (netty-dev@lists.jboss.org) - * @author Trustin Lee (tlee@redhat.com) - * + * @author Daniel Bevenius (dbevenius@jboss.com) * @version $Rev$, $Date$ - * */ class NioUdpWorker implements Runnable { /** * Internal Netty logger. */ - private static final InternalLogger logger = - InternalLoggerFactory.getInstance(NioUdpWorker.class); - - /** - * - */ - private static final int CONSTRAINT_LEVEL = - NioProviderMetadata.CONSTRAINT_LEVEL; + private static final InternalLogger logger = InternalLoggerFactory + .getInstance(NioUdpWorker.class); /** * Maximum packate size for UDP packets. @@ -113,31 +115,39 @@ class NioUdpWorker implements Runnable { volatile Selector selector; /** - * + * Boolean that controls determines if a blocked Selector.select should + * break out of its selection process. In our case we use a timeone for + * the select method and the select method will block for that time unless + * woken up. */ private final AtomicBoolean wakenUp = new AtomicBoolean(); + /** + * Lock for this workers Selector. + */ private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock(); + /** + * Monitor object used to synchronize selector open/close. + */ private final Object startStopLock = new Object(); /** * Queue of {@link ChannelRegistionTask}s */ - private final Queue registerTaskQueue = - new LinkedTransferQueue(); + private final Queue registerTaskQueue = new LinkedTransferQueue(); /** - * Queue of + * Queue of WriteTasks */ - private final Queue writeTaskQueue = - new LinkedTransferQueue(); + private final Queue writeTaskQueue = new LinkedTransferQueue(); /** - * - * @param bossId - * @param id - * @param executor + * Sole constructor. + * + * @param bossId This id of the NioDatagramPipelineSink. + * @param id The id of this worker. + * @param executor Executor used to exeucte runnables such as {@link ChannelRegistionTask}. */ NioUdpWorker(final int bossId, final int id, final Executor executor) { this.bossId = bossId; @@ -152,8 +162,8 @@ class NioUdpWorker implements Runnable { * @param future */ void register(final NioDatagramChannel channel, final ChannelFuture future) { - final Runnable channelRegTask = - new ChannelRegistionTask(channel, future); + final Runnable channelRegTask = new ChannelRegistionTask(channel, + future); Selector selector; synchronized (startStopLock) { @@ -168,6 +178,7 @@ class NioUdpWorker implements Runnable { boolean success = false; try { + // Start the main selector loop. See run() for details. executor.execute(new ThreadRenamingRunnable(this, "New I/O server worker #" + bossId + "'-'" + id)); success = true; @@ -201,6 +212,9 @@ class NioUdpWorker implements Runnable { } } + /** + * Selector loop. + */ public void run() { // Store a ref to the current thread. thread = Thread.currentThread(); @@ -211,7 +225,8 @@ class NioUdpWorker implements Runnable { for (;;) { wakenUp.set(false); - if (CONSTRAINT_LEVEL != 0) { + // + if (NioProviderMetadata.CONSTRAINT_LEVEL != 0) { selectorGuard.writeLock().lock(); // This empty synchronization block prevents the selector from acquiring its lock. selectorGuard.writeLock().unlock(); @@ -234,7 +249,8 @@ class NioUdpWorker implements Runnable { processSelectedKeys(selector.selectedKeys()); } - // Exit the loop when there's nothing to handle. + // Exit the loop when there's nothing to handle (the registered + // key set is empty. // The shutdown flag is used to delay the shutdown of this // loop to avoid excessive Selector creation when // connections are registered in a one-by-one manner instead of @@ -338,8 +354,8 @@ class NioUdpWorker implements Runnable { * @param key The selection key which contains the Selector registration information. */ private static void read(final SelectionKey key) { - final NioDatagramChannel nioDatagramChannel = - (NioDatagramChannel) key.attachment(); + final NioDatagramChannel nioDatagramChannel = (NioDatagramChannel) key + .attachment(); final DatagramChannel datagramChannel = (DatagramChannel) key.channel(); try { @@ -351,21 +367,14 @@ class NioUdpWorker implements Runnable { // Recieve from the channel in a non blocking mode. We have already been notified that // the channel is ready to receive. - final SocketAddress remoteAddress = - datagramChannel.receive(byteBuffer); - /* - if (remoteAddress == null) - { - // No data was available so return false to indicate this. - return false; - } - */ + final SocketAddress remoteAddress = datagramChannel + .receive(byteBuffer); // Flip the buffer so that we can wrap it. byteBuffer.flip(); // Create a Netty ChannelByffer by wrapping the ByteBuffer. - final ChannelBuffer channelBuffer = - ChannelBuffers.wrappedBuffer(byteBuffer); + final ChannelBuffer channelBuffer = ChannelBuffers + .wrappedBuffer(byteBuffer); logger.debug("ChannelBuffer : " + channelBuffer + ", remoteAdress: " + remoteAddress); @@ -378,7 +387,6 @@ class NioUdpWorker implements Runnable { fireExceptionCaught(nioDatagramChannel, t); } } - //return true; } private static void close(SelectionKey k) { @@ -409,11 +417,6 @@ class NioUdpWorker implements Runnable { } } - /** - * - * @param channel - * @return - */ private static boolean scheduleWriteIfNecessary( final NioDatagramChannel channel) { final NioUdpWorker worker = channel.worker; @@ -422,8 +425,8 @@ class NioUdpWorker implements Runnable { if (workerThread == null || Thread.currentThread() != workerThread) { if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { // "add" the channels writeTask to the writeTaskQueue. - boolean offered = - worker.writeTaskQueue.offer(channel.writeTask); + boolean offered = worker.writeTaskQueue + .offer(channel.writeTask); assert offered; } @@ -452,7 +455,7 @@ class NioUdpWorker implements Runnable { Queue writeBuffer = channel.writeBufferQueue; synchronized (channel.writeLock) { - // inform the channel that write is inprogres + // inform the channel that write is in-progress channel.inWriteNowLoop = true; // get the write event. evt = channel.currentWriteEvent; @@ -477,11 +480,9 @@ class NioUdpWorker implements Runnable { try { for (int i = writeSpinCount; i > 0; i --) { ChannelBuffer buffer = (ChannelBuffer) evt.getMessage(); - int localWrittenBytes = - channel.getDatagramChannel().send( - buffer.toByteBuffer(), + int localWrittenBytes = channel.getDatagramChannel() + .send(buffer.toByteBuffer(), evt.getRemoteAddress()); - //int localWrittenBytes = buf.getBytes( bufIdx, channel.getDatagramChannel(), buf.writerIndex() - bufIdx); if (localWrittenBytes != 0) { bufIdx += localWrittenBytes; writtenBytes += localWrittenBytes; @@ -672,10 +673,14 @@ class NioUdpWorker implements Runnable { interestOps &= ~Channel.OP_WRITE; interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE; - switch (CONSTRAINT_LEVEL) { + switch (NioProviderMetadata.CONSTRAINT_LEVEL) { case 0: if (channel.getRawInterestOps() != interestOps) { + // Set the interesteOps on the SelectionKey key.interestOps(interestOps); + // If the worker thread (the one that that might possibly be blocked + // in a select() call) is not the thread executing this method wakeup + // the select() operation. if (Thread.currentThread() != worker.thread && worker.wakenUp.compareAndSet(false, true)) { selector.wakeup(); @@ -687,9 +692,13 @@ class NioUdpWorker implements Runnable { case 2: if (channel.getRawInterestOps() != interestOps) { if (Thread.currentThread() == worker.thread) { + // Going to set the interestOps from the same thread. + // Set the interesteOps on the SelectionKey key.interestOps(interestOps); changed = true; } else { + // Going to set the interestOps from a different thread + // and some old provides will need synchronization. worker.selectorGuard.readLock().lock(); try { if (worker.wakenUp.compareAndSet(false, true)) { @@ -767,8 +776,6 @@ class NioUdpWorker implements Runnable { throw new ChannelException( "Failed to register a socket to the selector.", e); } - - //fireChannelBound(channel, localAddress); fireChannelConnected(channel, localAddress); } } diff --git a/src/test/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelTest.java b/src/test/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelTest.java index 9e6df24165..baf776a5a6 100644 --- a/src/test/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelTest.java +++ b/src/test/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelTest.java @@ -39,7 +39,9 @@ import org.junit.Test; /** * Unit test for {@link NioDatagramChannel} * - * @author Daniel Bevenius + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Daniel Bevenius (dbevenius@jboss.com) + * @version $Rev$, $Date$ */ public class NioDatagramChannelTest { private static Channel sc; @@ -48,9 +50,9 @@ public class NioDatagramChannelTest { @BeforeClass public static void setupChannel() { - final ServerBootstrap sb = - new ServerBootstrap(new NioDatagramChannelFactory(Executors - .newCachedThreadPool())); + final NioDatagramChannelFactory channelFactory = new NioDatagramChannelFactory( + Executors.newCachedThreadPool()); + final ServerBootstrap sb = new ServerBootstrap(channelFactory); inetSocketAddress = new InetSocketAddress("localhost", 9999); sc = sb.bind(inetSocketAddress); final SimpleHandler handler = new SimpleHandler(); @@ -59,8 +61,8 @@ public class NioDatagramChannelTest { @Test public void checkBoundPort() throws Throwable { - final InetSocketAddress socketAddress = - (InetSocketAddress) sc.getLocalAddress(); + final InetSocketAddress socketAddress = (InetSocketAddress) sc + .getLocalAddress(); assertEquals(9999, socketAddress.getPort()); } @@ -79,14 +81,14 @@ public class NioDatagramChannelTest { } public void clientBootstrap() { - final ClientBootstrap bootstrap = - new ClientBootstrap(new NioDatagramChannelFactory(Executors - .newCachedThreadPool())); + final NioDatagramChannelFactory channelFactory = new NioDatagramChannelFactory( + Executors.newCachedThreadPool()); + final ClientBootstrap bootstrap = new ClientBootstrap(channelFactory); bootstrap.getPipeline().addLast("test", new SimpleHandler()); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("keepAlive", true); - InetSocketAddress clientAddress = - new InetSocketAddress("localhost", 8888); + InetSocketAddress clientAddress = new InetSocketAddress("localhost", + 8888); bootstrap.setOption("localAddress", clientAddress); ChannelFuture ccf = bootstrap.connect(inetSocketAddress); @@ -94,9 +96,8 @@ public class NioDatagramChannelTest { Channel cc = ccf.getChannel(); final String payload = "client payload"; - ChannelFuture write = - cc.write(ChannelBuffers.wrappedBuffer(payload.getBytes(), 0, - payload.length())); + ChannelFuture write = cc.write(ChannelBuffers.wrappedBuffer(payload + .getBytes(), 0, payload.length())); write.awaitUninterruptibly(); } @@ -111,9 +112,8 @@ public class NioDatagramChannelTest { } private void sendRecive(final String expectedPayload) throws IOException { - final UdpClient udpClient = - new UdpClient(inetSocketAddress.getAddress(), inetSocketAddress - .getPort()); + final UdpClient udpClient = new UdpClient(inetSocketAddress + .getAddress(), inetSocketAddress.getPort()); final DatagramPacket dp = udpClient.send(expectedPayload.getBytes()); dp.setData(new byte[expectedPayload.length()]);