Added some javadocs and removed unused code and excessive comments.

Daniel Bevenius 2009-06-11 04:33:07 +00:00
parent effbe32fdb
commit c01e997a45
5 changed files with 113 additions and 148 deletions

View File

@ -49,16 +49,17 @@ import org.jboss.netty.util.internal.ThreadLocalBoolean;
* NioDatagramChannel provides a connectionless NIO UDP channel for Netty.
* <p/>
*
* @author <a href="mailto:dbevenius@jboss.com">Daniel Bevenius</a>
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Daniel Bevenius (dbevenius@jboss.com)
* @version $Rev$, $Date$
*/
public class NioDatagramChannel extends AbstractChannel implements
ServerChannel {
/**
* Internal Netty logger.
*/
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioDatagramChannel.class);
private static final InternalLogger logger = InternalLoggerFactory
.getInstance(NioDatagramChannel.class);
/**
* The {@link DatagramChannelConfig}.
@ -76,22 +77,17 @@ public class NioDatagramChannel extends AbstractChannel implements
private final DatagramChannel datagramChannel;
/**
*
*/
volatile ChannelFuture connectFuture;
/**
*
* Monitor object to synchronize access to InterestedOps.
*/
final Object interestOpsLock = new Object();
/**
*
* Monitor object for synchronizing access to the {@link WriteBufferQueue}.
*/
final Object writeLock = new Object();
/**
*
* WriteTask that performs write operations.
*/
final Runnable writeTask = new WriteTask();
@ -101,7 +97,7 @@ public class NioDatagramChannel extends AbstractChannel implements
final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
/**
*
* Queue of write {@link MessageEvent}s.
*/
final Queue<MessageEvent> writeBufferQueue = new WriteBufferQueue();
@ -112,22 +108,22 @@ public class NioDatagramChannel extends AbstractChannel implements
final AtomicInteger writeBufferSize = new AtomicInteger();
/**
*
* Keeps track of the highWaterMark.
*/
final AtomicInteger highWaterMarkCounter = new AtomicInteger();
/**
*
* The current write {@link MessageEvent}
*/
MessageEvent currentWriteEvent;
/**
*
* The current write index.
*/
int currentWriteIndex;
/**
*
* Boolean that indicates that a write operation is in progress.
*/
volatile boolean inWriteNowLoop;
@ -221,7 +217,6 @@ public class NioDatagramChannel extends AbstractChannel implements
* WriteBuffer is an extension of {@link LinkedTransferQueue} that adds
* support for highWaterMark checking of the write buffer size.
*
* @author <a href="mailto:dbevenius@jboss.com">Daniel Bevenius</a>
*/
private final class WriteBufferQueue extends
LinkedTransferQueue<MessageEvent> {
@ -240,20 +235,15 @@ public class NioDatagramChannel extends AbstractChannel implements
final boolean success = super.offer(e);
assert success;
final int messageSize =
((ChannelBuffer) e.getMessage()).readableBytes();
final int messageSize = ((ChannelBuffer) e.getMessage())
.readableBytes();
// Add the ChannelBuffer's size to the writeBufferSize
final int newWriteBufferSize =
writeBufferSize.addAndGet(messageSize);
final int newWriteBufferSize = writeBufferSize
.addAndGet(messageSize);
final int highWaterMark = getConfig().getWriteBufferHighWaterMark();
// Check if the newly calculated buffer size exceeds the highWaterMark limit.
if (newWriteBufferSize >= highWaterMark) {
// Check whether the message we are adding is the one that causes the highWaterMark to be breached.
if (newWriteBufferSize - messageSize < highWaterMark) {
// Increment the highWaterMarkCounter, which keeps track of the fact that
// the highWaterMark has been reached.
highWaterMarkCounter.incrementAndGet();
if (!notifying.get()) {
@ -275,16 +265,14 @@ public class NioDatagramChannel extends AbstractChannel implements
public MessageEvent poll() {
final MessageEvent e = super.poll();
if (e != null) {
final int messageSize =
((ChannelBuffer) e.getMessage()).readableBytes();
// Subtract the ChannelBuffer's size from the writeBufferSize
final int newWriteBufferSize =
writeBufferSize.addAndGet(-messageSize);
final int messageSize = ((ChannelBuffer) e.getMessage())
.readableBytes();
final int newWriteBufferSize = writeBufferSize
.addAndGet(-messageSize);
final int lowWaterMark =
getConfig().getWriteBufferLowWaterMark();
final int lowWaterMark = getConfig()
.getWriteBufferLowWaterMark();
// Check if the newly calculated buffer size has fallen below the lowWaterMark limit.
if (newWriteBufferSize == 0 ||
newWriteBufferSize < lowWaterMark) {
if (newWriteBufferSize + messageSize >= lowWaterMark) {
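The offer()/poll() pair above uses a detail worth calling out: by comparing the new size against the mark both with and without the current message, only the single operation that actually crosses a water mark increments the counter and triggers the interest-change notification. A standalone sketch of that arithmetic, outside Netty (the mark values and class name are illustrative, not taken from this commit):

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative sketch of the water-mark crossing checks above. */
class WaterMarkAccounting {
    private final AtomicInteger writeBufferSize = new AtomicInteger();
    private final int highWaterMark = 64 * 1024; // assumed limit
    private final int lowWaterMark = 32 * 1024;  // assumed limit

    /** Returns true only for the add that breaches the high water mark. */
    boolean add(int messageSize) {
        final int newSize = writeBufferSize.addAndGet(messageSize);
        // Over the mark now, but under it before this message was added:
        // this particular message caused the crossing.
        return newSize >= highWaterMark && newSize - messageSize < highWaterMark;
    }

    /** Returns true only for the removal that drops below the low water mark. */
    boolean remove(int messageSize) {
        final int newSize = writeBufferSize.addAndGet(-messageSize);
        return (newSize == 0 || newSize < lowWaterMark)
                && newSize + messageSize >= lowWaterMark;
    }
}
```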
@ -304,8 +292,6 @@ public class NioDatagramChannel extends AbstractChannel implements
/**
* WriteTask is a simple runnable that performs writes by delegating to the {@link NioUdpWorker}.
*
* @author <a href="mailto:dbevenius@jboss.com">Daniel Bevenius</a>
*
*/
private final class WriteTask implements Runnable {
WriteTask() {

View File

@ -51,7 +51,9 @@ import org.jboss.netty.util.internal.ExecutorUtil;
* threads. A worker thread performs non-blocking read and write for one or
* more {@link Channel}s in a non-blocking mode.
*
* @author <a href="mailto:dbevenius@jboss.com">Daniel Bevenius</a>
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Daniel Bevenius (dbevenius@jboss.com)
* @version $Rev$, $Date$
*
*/
public class NioDatagramChannelFactory implements ChannelFactory,
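For context, the factory plugs into the usual Netty 3.x bootstrap; the unit test at the end of this commit does essentially the following (a minimal sketch, with the address and port taken from that test):

```java
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;

public class UdpServerExample {
    public static void main(String[] args) {
        // Worker threads are drawn from the supplied executor.
        final ServerBootstrap bootstrap = new ServerBootstrap(
                new NioDatagramChannelFactory(Executors.newCachedThreadPool()));
        // Bind the connectionless UDP channel to a local address.
        final Channel channel =
                bootstrap.bind(new InetSocketAddress("localhost", 9999));
    }
}
```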

View File

@ -20,38 +20,33 @@
*/
package org.jboss.netty.channel.socket.nio;
import static org.jboss.netty.channel.Channels.*;
import static org.jboss.netty.channel.Channels.fireChannelBound;
import static org.jboss.netty.channel.Channels.fireChannelClosed;
import static org.jboss.netty.channel.Channels.fireChannelUnbound;
import static org.jboss.netty.channel.Channels.fireExceptionCaught;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.channel.AbstractChannelSink;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
/**
* NioDatagramPipelineSink receives downstream events from a ChannelPipeline.
* <p/>
* A {@link NioDatagramPipelineSink} contains an array of {@link NioUdpWorker}s.
*
* @author <a href="mailto:dbevenius@jboss.com">Daniel Bevenius</a>
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Daniel Bevenius (dbevenius@jboss.com)
* @version $Rev$, $Date$
*/
public class NioDatagramPipelineSink extends AbstractChannelSink {
/**
* Internal Netty logger.
*/
private final InternalLogger logger =
InternalLoggerFactory.getInstance(NioDatagramPipelineSink.class);
private static final AtomicInteger nextId = new AtomicInteger();
@ -163,29 +158,4 @@ public class NioDatagramPipelineSink extends AbstractChannelSink {
return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
}
/**
* The connection semantics of a NioDatagramPipelineSink are different for datagram sockets than they are for stream
* sockets. Placing a DatagramChannel into a connected state causes datagrams to be ignored from any source
* address other than the one to which the channel is connected. Unwanted packets will be dropped.
* Not sure that this makes sense for a server-side component.
*
* @param channel The UdpChannel to connect from.
* @param future
* @param remoteAddress The remote address to connect to.
*/
@SuppressWarnings("unused")
private void connect(final NioDatagramChannel channel,
ChannelFuture future, SocketAddress remoteAddress) {
try {
try {
channel.getDatagramChannel().socket().connect(remoteAddress);
} catch (final IOException e) {
future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
channel.connectFuture = future;
}
} catch (final Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
}
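The javadoc on the removed connect() describes standard java.net behavior: once a datagram socket is connected, the kernel filters inbound packets by source address. A minimal standalone sketch of that semantic (the peer address is illustrative):

```java
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketException;

public class ConnectedUdpExample {
    public static void main(String[] args) throws SocketException {
        final DatagramSocket socket = new DatagramSocket();
        // After connect(), receive() only delivers datagrams sent from
        // 192.0.2.1:4000; packets from every other source are silently
        // dropped. This filtering is why connecting is of questionable
        // value for a server-side channel, as the javadoc above notes.
        socket.connect(new InetSocketAddress("192.0.2.1", 4000));
    }
}
```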

View File

@ -22,7 +22,15 @@
*/
package org.jboss.netty.channel.socket.nio;
import static org.jboss.netty.channel.Channels.*;
import static org.jboss.netty.channel.Channels.fireChannelClosed;
import static org.jboss.netty.channel.Channels.fireChannelConnected;
import static org.jboss.netty.channel.Channels.fireChannelDisconnected;
import static org.jboss.netty.channel.Channels.fireChannelInterestChanged;
import static org.jboss.netty.channel.Channels.fireChannelUnbound;
import static org.jboss.netty.channel.Channels.fireExceptionCaught;
import static org.jboss.netty.channel.Channels.fireMessageReceived;
import static org.jboss.netty.channel.Channels.fireWriteComplete;
import static org.jboss.netty.channel.Channels.succeededFuture;
import java.io.IOException;
import java.net.SocketAddress;
@ -55,25 +63,19 @@ import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.LinkedTransferQueue;
/**
*
* NioUdpWorker is responsible for registering channels with the selector, and
* also manages the select process.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
*
* @author Daniel Bevenius (dbevenius@jboss.com)
* @version $Rev$, $Date$
*
*/
class NioUdpWorker implements Runnable {
/**
* Internal Netty logger.
*/
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioUdpWorker.class);
/**
*
*/
private static final int CONSTRAINT_LEVEL =
NioProviderMetadata.CONSTRAINT_LEVEL;
private static final InternalLogger logger = InternalLoggerFactory
.getInstance(NioUdpWorker.class);
/**
* Maximum packet size for UDP packets.
@ -113,31 +115,39 @@ class NioUdpWorker implements Runnable {
volatile Selector selector;
/**
*
* Boolean that determines whether a blocked Selector.select should
* break out of its selection process. In our case we use a timeout for
* the select method, and the select method will block for that time unless
* woken up.
*/
private final AtomicBoolean wakenUp = new AtomicBoolean();
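The wakenUp flag implements the usual NIO wake-up handshake: the loop blocks in a timed select(), and any other thread that needs its attention sets the flag and calls Selector.wakeup() at most once per loop iteration. A minimal sketch of the pattern (the 500 ms timeout is an assumption, not taken from this commit):

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;

class SelectLoopSketch implements Runnable {
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    private final Selector selector;

    SelectLoopSketch() throws IOException {
        selector = Selector.open();
    }

    public void run() {
        for (;;) {
            wakenUp.set(false);
            try {
                // Blocks up to the timeout unless wakeup() is invoked.
                selector.select(500); // timeout value is an assumption
            } catch (IOException e) {
                continue; // simplified error handling for the sketch
            }
            // ... drain task queues and process selected keys here ...
        }
    }

    /** Called from other threads; wakes a blocked select() at most once. */
    void wakeup() {
        if (wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }
}
```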
/**
* Lock for this worker's Selector.
*/
private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
/**
* Monitor object used to synchronize selector open/close.
*/
private final Object startStopLock = new Object();
/**
* Queue of {@link ChannelRegistionTask}s
*/
private final Queue<Runnable> registerTaskQueue =
new LinkedTransferQueue<Runnable>();
private final Queue<Runnable> registerTaskQueue = new LinkedTransferQueue<Runnable>();
/**
* Queue of
* Queue of WriteTasks
*/
private final Queue<Runnable> writeTaskQueue =
new LinkedTransferQueue<Runnable>();
private final Queue<Runnable> writeTaskQueue = new LinkedTransferQueue<Runnable>();
/**
*
* @param bossId
* @param id
* @param executor
* Sole constructor.
*
* @param bossId The id of the NioDatagramPipelineSink.
* @param id The id of this worker.
* @param executor Executor used to execute runnables such as {@link ChannelRegistionTask}.
*/
NioUdpWorker(final int bossId, final int id, final Executor executor) {
this.bossId = bossId;
@ -152,8 +162,8 @@ class NioUdpWorker implements Runnable {
* @param future
*/
void register(final NioDatagramChannel channel, final ChannelFuture future) {
final Runnable channelRegTask =
new ChannelRegistionTask(channel, future);
final Runnable channelRegTask = new ChannelRegistionTask(channel,
future);
Selector selector;
synchronized (startStopLock) {
@ -168,6 +178,7 @@ class NioUdpWorker implements Runnable {
boolean success = false;
try {
// Start the main selector loop. See run() for details.
executor.execute(new ThreadRenamingRunnable(this,
"New I/O server worker #" + bossId + "'-'" + id));
success = true;
@ -201,6 +212,9 @@ class NioUdpWorker implements Runnable {
}
}
/**
* Selector loop.
*/
public void run() {
// Store a ref to the current thread.
thread = Thread.currentThread();
@ -211,7 +225,8 @@ class NioUdpWorker implements Runnable {
for (;;) {
wakenUp.set(false);
if (CONSTRAINT_LEVEL != 0) {
//
if (NioProviderMetadata.CONSTRAINT_LEVEL != 0) {
selectorGuard.writeLock().lock();
// This empty synchronization block prevents the selector from acquiring its lock.
selectorGuard.writeLock().unlock();
@ -234,7 +249,8 @@ class NioUdpWorker implements Runnable {
processSelectedKeys(selector.selectedKeys());
}
// Exit the loop when there's nothing to handle.
// Exit the loop when there's nothing to handle (the registered
// key set is empty).
// The shutdown flag is used to delay the shutdown of this
// loop to avoid excessive Selector creation when
// connections are registered in a one-by-one manner instead of
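The delayed shutdown mentioned in the comment above can be sketched as a two-pass decision (simplified; the real worker additionally checks the register task queue and executor state under startStopLock):

```java
import java.nio.channels.Selector;

/** Sketch of the delayed-shutdown decision described above. */
final class ShutdownDecision {
    private boolean shutdown;

    /** Returns true when the selector loop should really exit. */
    boolean shouldExit(Selector selector) {
        if (selector.keys().isEmpty()) {
            if (shutdown) {
                // Empty on a second consecutive pass: safe to stop.
                return true;
            }
            // Empty for the first time: delay one pass, so channels
            // registered one-by-one do not force a fresh Selector (and
            // thread) for every registration.
            shutdown = true;
        } else {
            shutdown = false;
        }
        return false;
    }
}
```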
@ -338,8 +354,8 @@ class NioUdpWorker implements Runnable {
* @param key The selection key which contains the Selector registration information.
*/
private static void read(final SelectionKey key) {
final NioDatagramChannel nioDatagramChannel =
(NioDatagramChannel) key.attachment();
final NioDatagramChannel nioDatagramChannel = (NioDatagramChannel) key
.attachment();
final DatagramChannel datagramChannel = (DatagramChannel) key.channel();
try {
@ -351,21 +367,14 @@ class NioUdpWorker implements Runnable {
// Receive from the channel in non-blocking mode. We have already been notified that
// the channel is ready to receive.
final SocketAddress remoteAddress =
datagramChannel.receive(byteBuffer);
/*
if (remoteAddress == null)
{
// No data was available so return false to indicate this.
return false;
}
*/
final SocketAddress remoteAddress = datagramChannel
.receive(byteBuffer);
// Flip the buffer so that we can wrap it.
byteBuffer.flip();
// Create a Netty ChannelBuffer by wrapping the ByteBuffer.
final ChannelBuffer channelBuffer =
ChannelBuffers.wrappedBuffer(byteBuffer);
final ChannelBuffer channelBuffer = ChannelBuffers
.wrappedBuffer(byteBuffer);
logger.debug("ChannelBuffer : " + channelBuffer +
", remoteAddress: " + remoteAddress);
@ -378,7 +387,6 @@ class NioUdpWorker implements Runnable {
fireExceptionCaught(nioDatagramChannel, t);
}
}
//return true;
}
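Stripped of the event plumbing, read() performs a textbook non-blocking datagram receive. A self-contained sketch (the 4096-byte capacity is an illustrative stand-in for the worker's maximum-packet-size constant):

```java
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;

final class ReceiveSketch {
    /** One non-blocking receive, as in read() above; null if nothing pending. */
    static ByteBuffer receiveOne(DatagramChannel channel) throws IOException {
        final ByteBuffer buffer = ByteBuffer.allocate(4096);
        // On a non-blocking channel, receive() returns immediately,
        // yielding null when no datagram is waiting.
        final SocketAddress remoteAddress = channel.receive(buffer);
        if (remoteAddress == null) {
            return null;
        }
        buffer.flip(); // switch the buffer from writing to reading
        return buffer;
    }
}
```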
private static void close(SelectionKey k) {
@ -409,11 +417,6 @@ class NioUdpWorker implements Runnable {
}
}
/**
*
* @param channel
* @return
*/
private static boolean scheduleWriteIfNecessary(
final NioDatagramChannel channel) {
final NioUdpWorker worker = channel.worker;
@ -422,8 +425,8 @@ class NioUdpWorker implements Runnable {
if (workerThread == null || Thread.currentThread() != workerThread) {
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
// "add" the channels writeTask to the writeTaskQueue.
boolean offered =
worker.writeTaskQueue.offer(channel.writeTask);
boolean offered = worker.writeTaskQueue
.offer(channel.writeTask);
assert offered;
}
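scheduleWriteIfNecessary() follows the single-writer rule: if the caller is not the worker thread, the write is handed to the worker via the task queue, with an atomic flag ensuring at most one queued task per channel. A sketch of the check, with illustrative names:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

final class WriteScheduler {
    private final Queue<Runnable> writeTaskQueue =
            new ConcurrentLinkedQueue<Runnable>();
    private final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
    volatile Thread workerThread; // set by the worker when its loop starts

    /** Returns true if the write was deferred to the worker thread. */
    boolean scheduleIfNecessary(Runnable writeTask) {
        if (workerThread == null || Thread.currentThread() != workerThread) {
            // Enqueue at most one pending task per channel; the flag is
            // cleared again when the task runs.
            if (writeTaskInTaskQueue.compareAndSet(false, true)) {
                writeTaskQueue.offer(writeTask);
            }
            return true; // the worker will perform the write
        }
        return false; // already on the worker thread: write inline
    }
}
```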
@ -452,7 +455,7 @@ class NioUdpWorker implements Runnable {
Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
synchronized (channel.writeLock) {
// inform the channel that write is inprogres
// inform the channel that write is in-progress
channel.inWriteNowLoop = true;
// get the write event.
evt = channel.currentWriteEvent;
@ -477,11 +480,9 @@ class NioUdpWorker implements Runnable {
try {
for (int i = writeSpinCount; i > 0; i --) {
ChannelBuffer buffer = (ChannelBuffer) evt.getMessage();
int localWrittenBytes =
channel.getDatagramChannel().send(
buffer.toByteBuffer(),
int localWrittenBytes = channel.getDatagramChannel()
.send(buffer.toByteBuffer(),
evt.getRemoteAddress());
//int localWrittenBytes = buf.getBytes( bufIdx, channel.getDatagramChannel(), buf.writerIndex() - bufIdx);
if (localWrittenBytes != 0) {
bufIdx += localWrittenBytes;
writtenBytes += localWrittenBytes;
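The spin loop above exists because DatagramChannel.send() either transmits the whole datagram or nothing: a return of 0 means the kernel had no buffer space at that instant, so the worker retries a bounded number of times before falling back to write interest. A standalone sketch of that idea:

```java
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;

final class SpinSendSketch {
    /** Tries to send, retrying up to writeSpinCount times. */
    static boolean spinSend(DatagramChannel channel, ByteBuffer buffer,
            SocketAddress remoteAddress, int writeSpinCount) throws IOException {
        for (int i = writeSpinCount; i > 0; i --) {
            // send() is all-or-nothing for a datagram; 0 bytes means the
            // kernel buffer was momentarily full.
            if (channel.send(buffer, remoteAddress) != 0) {
                return true; // the datagram went out
            }
        }
        return false; // caller should fall back to OP_WRITE interest
    }
}
```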
@ -672,10 +673,14 @@ class NioUdpWorker implements Runnable {
interestOps &= ~Channel.OP_WRITE;
interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE;
switch (CONSTRAINT_LEVEL) {
switch (NioProviderMetadata.CONSTRAINT_LEVEL) {
case 0:
if (channel.getRawInterestOps() != interestOps) {
// Set the interestOps on the SelectionKey
key.interestOps(interestOps);
// If the worker thread (the one that might possibly be blocked
// in a select() call) is not the thread executing this method, wake up
// the select() operation.
if (Thread.currentThread() != worker.thread &&
worker.wakenUp.compareAndSet(false, true)) {
selector.wakeup();
@ -687,9 +692,13 @@ class NioUdpWorker implements Runnable {
case 2:
if (channel.getRawInterestOps() != interestOps) {
if (Thread.currentThread() == worker.thread) {
// Going to set the interestOps from the same thread.
// Set the interestOps on the SelectionKey
key.interestOps(interestOps);
changed = true;
} else {
// Going to set the interestOps from a different thread,
// and some old providers will need synchronization.
worker.selectorGuard.readLock().lock();
try {
if (worker.wakenUp.compareAndSet(false, true)) {
@ -767,8 +776,6 @@ class NioUdpWorker implements Runnable {
throw new ChannelException(
"Failed to register a socket to the selector.", e);
}
//fireChannelBound(channel, localAddress);
fireChannelConnected(channel, localAddress);
}
}
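The switch above works around NIO provider quirks: at constraint level 0 a SelectionKey's interestOps can be changed from any thread, while older providers (levels 1 and 2) can block or misbehave if the key is modified while another thread is selecting. A condensed sketch of the cross-thread, level-2 branch using the same guard-lock idea (names are illustrative):

```java
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class InterestOpsSketch {
    private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    volatile Thread workerThread;

    void setInterestOps(Selector selector, SelectionKey key, int interestOps) {
        if (Thread.currentThread() == workerThread) {
            // On the worker thread nobody can be selecting concurrently,
            // so the key can be modified directly.
            key.interestOps(interestOps);
        } else {
            // Wake the selector, then hold the read lock; the worker
            // briefly takes the matching write lock at the top of its
            // loop, which serializes us against an in-progress select().
            selectorGuard.readLock().lock();
            try {
                if (wakenUp.compareAndSet(false, true)) {
                    selector.wakeup();
                }
                key.interestOps(interestOps);
            } finally {
                selectorGuard.readLock().unlock();
            }
        }
    }
}
```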

View File

@ -39,7 +39,9 @@ import org.junit.Test;
/**
* Unit test for {@link NioDatagramChannel}
*
* @author <a href="mailto:dbevenius@jboss.com">Daniel Bevenius</a>
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Daniel Bevenius (dbevenius@jboss.com)
* @version $Rev$, $Date$
*/
public class NioDatagramChannelTest {
private static Channel sc;
@ -48,9 +50,9 @@ public class NioDatagramChannelTest {
@BeforeClass
public static void setupChannel() {
final ServerBootstrap sb =
new ServerBootstrap(new NioDatagramChannelFactory(Executors
.newCachedThreadPool()));
final NioDatagramChannelFactory channelFactory = new NioDatagramChannelFactory(
Executors.newCachedThreadPool());
final ServerBootstrap sb = new ServerBootstrap(channelFactory);
inetSocketAddress = new InetSocketAddress("localhost", 9999);
sc = sb.bind(inetSocketAddress);
final SimpleHandler handler = new SimpleHandler();
@ -59,8 +61,8 @@ public class NioDatagramChannelTest {
@Test
public void checkBoundPort() throws Throwable {
final InetSocketAddress socketAddress =
(InetSocketAddress) sc.getLocalAddress();
final InetSocketAddress socketAddress = (InetSocketAddress) sc
.getLocalAddress();
assertEquals(9999, socketAddress.getPort());
}
@ -79,14 +81,14 @@ public class NioDatagramChannelTest {
}
public void clientBootstrap() {
final ClientBootstrap bootstrap =
new ClientBootstrap(new NioDatagramChannelFactory(Executors
.newCachedThreadPool()));
final NioDatagramChannelFactory channelFactory = new NioDatagramChannelFactory(
Executors.newCachedThreadPool());
final ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
bootstrap.getPipeline().addLast("test", new SimpleHandler());
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("keepAlive", true);
InetSocketAddress clientAddress =
new InetSocketAddress("localhost", 8888);
InetSocketAddress clientAddress = new InetSocketAddress("localhost",
8888);
bootstrap.setOption("localAddress", clientAddress);
ChannelFuture ccf = bootstrap.connect(inetSocketAddress);
@ -94,9 +96,8 @@ public class NioDatagramChannelTest {
Channel cc = ccf.getChannel();
final String payload = "client payload";
ChannelFuture write =
cc.write(ChannelBuffers.wrappedBuffer(payload.getBytes(), 0,
payload.length()));
ChannelFuture write = cc.write(ChannelBuffers.wrappedBuffer(payload
.getBytes(), 0, payload.length()));
write.awaitUninterruptibly();
}
@ -111,9 +112,8 @@ public class NioDatagramChannelTest {
}
private void sendRecive(final String expectedPayload) throws IOException {
final UdpClient udpClient =
new UdpClient(inetSocketAddress.getAddress(), inetSocketAddress
.getPort());
final UdpClient udpClient = new UdpClient(inetSocketAddress
.getAddress(), inetSocketAddress.getPort());
final DatagramPacket dp = udpClient.send(expectedPayload.getBytes());
dp.setData(new byte[expectedPayload.length()]);