* Merged recent changes in the trunk to the NIO UDP transport

* Other miscellaneous modifications like typo fix
This commit is contained in:
Trustin Lee 2009-06-11 06:10:46 +00:00
parent e4871f8460
commit 899b16678f
5 changed files with 199 additions and 148 deletions

View File

@ -25,7 +25,9 @@ package org.jboss.netty.channel.socket.nio;
import static org.jboss.netty.channel.Channels.*;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.nio.channels.DatagramChannel;
import java.util.Queue;
@ -34,16 +36,14 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.ServerChannel;
import org.jboss.netty.channel.socket.DatagramChannelConfig;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.ThreadLocalBoolean;
@ -56,13 +56,8 @@ import org.jboss.netty.util.internal.ThreadLocalBoolean;
*
* @version $Rev$, $Date$
*/
public class NioDatagramChannel extends AbstractChannel implements
ServerChannel {
/**
* Internal Netty logger.
*/
private static final InternalLogger logger = InternalLoggerFactory
.getInstance(NioDatagramChannel.class);
class NioDatagramChannel extends AbstractChannel
implements org.jboss.netty.channel.socket.DatagramChannel {
/**
* The {@link DatagramChannelConfig}.
@ -77,7 +72,7 @@ public class NioDatagramChannel extends AbstractChannel implements
/**
* The {@link DatagramChannel} that this channel uses.
*/
private final DatagramChannel datagramChannel;
private final java.nio.channels.DatagramChannel datagramChannel;
/**
* Monitor object to synchronize access to InterestedOps.
@ -120,30 +115,17 @@ public class NioDatagramChannel extends AbstractChannel implements
*/
MessageEvent currentWriteEvent;
/**
* The current write index.
*/
int currentWriteIndex;
/**
* Boolean that indicates that write operation is in progress.
*/
volatile boolean inWriteNowLoop;
/**
*
* @param factory
* @param pipeline
* @param sink
* @param worker
*/
public NioDatagramChannel(final ChannelFactory factory,
NioDatagramChannel(final ChannelFactory factory,
final ChannelPipeline pipeline, final ChannelSink sink,
final NioUdpWorker worker) {
super(null, factory, pipeline, sink);
this.worker = worker;
datagramChannel = openNonBlockingChannel();
setSoTimeout(1000);
config = new DefaultNioDatagramChannelConfig(datagramChannel.socket());
fireChannelOpen(this);
@ -159,27 +141,22 @@ public class NioDatagramChannel extends AbstractChannel implements
}
}
private void setSoTimeout(final int timeout) {
public InetSocketAddress getLocalAddress() {
try {
datagramChannel.socket().setSoTimeout(timeout);
} catch (final IOException e) {
try {
datagramChannel.close();
} catch (final IOException e2) {
logger.warn("Failed to close a partially DatagramSocket.", e2);
}
throw new ChannelException(
"Failed to set the DatagramSocket timeout.", e);
return (InetSocketAddress) datagramChannel.socket().getLocalSocketAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) datagramChannel.socket()
.getLocalSocketAddress();
}
public SocketAddress getRemoteAddress() {
return datagramChannel.socket().getRemoteSocketAddress();
public InetSocketAddress getRemoteAddress() {
try {
return (InetSocketAddress) datagramChannel.socket().getRemoteSocketAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
public boolean isBound() {
@ -187,7 +164,7 @@ public class NioDatagramChannel extends AbstractChannel implements
}
public boolean isConnected() {
return datagramChannel.socket().isBound();
return datagramChannel.socket().isConnected();
}
@Override
@ -195,11 +172,6 @@ public class NioDatagramChannel extends AbstractChannel implements
return super.setClosed();
}
@Override
protected ChannelFuture getSucceededFuture() {
return super.getSucceededFuture();
}
public NioDatagramChannelConfig getConfig() {
return config;
}
@ -208,6 +180,37 @@ public class NioDatagramChannel extends AbstractChannel implements
return datagramChannel;
}
@Override
public int getInterestOps() {
if (!isOpen()) {
return Channel.OP_WRITE;
}
int interestOps = getRawInterestOps();
int writeBufferSize = this.writeBufferSize.get();
if (writeBufferSize != 0) {
if (highWaterMarkCounter.get() > 0) {
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (writeBufferSize >= lowWaterMark) {
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~Channel.OP_WRITE;
}
} else {
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (writeBufferSize >= highWaterMark) {
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~Channel.OP_WRITE;
}
}
} else {
interestOps &= ~Channel.OP_WRITE;
}
return interestOps;
}
int getRawInterestOps() {
return super.getInterestOps();
}
@ -216,6 +219,15 @@ public class NioDatagramChannel extends AbstractChannel implements
super.setInterestOpsNow(interestOps);
}
@Override
public ChannelFuture write(Object message, SocketAddress remoteAddress) {
if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
return super.write(message, null);
} else {
return super.write(message, remoteAddress);
}
}
/**
* WriteBuffer is an extension of {@link LinkedTransferQueue} that adds
* support for highWaterMark checking of the write buffer size.
@ -233,21 +245,17 @@ public class NioDatagramChannel extends AbstractChannel implements
* adds support for keeping track of the size of the this write buffer.
*/
@Override
public boolean offer(final MessageEvent e) {
final boolean success = super.offer(e);
public boolean offer(MessageEvent e) {
boolean success = super.offer(e);
assert success;
final int messageSize = ((ChannelBuffer) e.getMessage())
.readableBytes();
int messageSize = ((ChannelBuffer) e.getMessage()).readableBytes();
int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
final int newWriteBufferSize = writeBufferSize
.addAndGet(messageSize);
final int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (newWriteBufferSize >= highWaterMark) {
if (newWriteBufferSize - messageSize < highWaterMark) {
highWaterMarkCounter.incrementAndGet();
if (!notifying.get()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(NioDatagramChannel.this);
@ -255,7 +263,6 @@ public class NioDatagramChannel extends AbstractChannel implements
}
}
}
return true;
}
@ -265,18 +272,13 @@ public class NioDatagramChannel extends AbstractChannel implements
*/
@Override
public MessageEvent poll() {
final MessageEvent e = super.poll();
MessageEvent e = super.poll();
if (e != null) {
final int messageSize = ((ChannelBuffer) e.getMessage())
.readableBytes();
final int newWriteBufferSize = writeBufferSize
.addAndGet(-messageSize);
int messageSize = ((ChannelBuffer) e.getMessage()).readableBytes();
int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
final int lowWaterMark = getConfig()
.getWriteBufferLowWaterMark();
if (newWriteBufferSize == 0 ||
newWriteBufferSize < lowWaterMark) {
if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
if (newWriteBufferSize + messageSize >= lowWaterMark) {
highWaterMarkCounter.decrementAndGet();
if (!notifying.get()) {
@ -305,4 +307,22 @@ public class NioDatagramChannel extends AbstractChannel implements
NioUdpWorker.write(NioDatagramChannel.this, false);
}
}
public void joinGroup(InetAddress multicastAddress) {
throw new UnsupportedOperationException();
}
public void joinGroup(InetSocketAddress multicastAddress,
NetworkInterface networkInterface) {
throw new UnsupportedOperationException();
}
public void leaveGroup(InetAddress multicastAddress) {
throw new UnsupportedOperationException();
}
public void leaveGroup(InetSocketAddress multicastAddress,
NetworkInterface networkInterface) {
throw new UnsupportedOperationException();
}
}

View File

@ -25,14 +25,14 @@ package org.jboss.netty.channel.socket.nio;
import java.util.concurrent.Executor;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ServerChannelFactory;
import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.util.internal.ExecutorUtil;
/**
* A {@link NioDatagramChannelFactory} creates a server-side NIO-based
* {@link NioDatagramChannel}. It utilizes the non-blocking I/O mode which
* {@link DatagramChannel}. It utilizes the non-blocking I/O mode which
* was introduced with NIO to serve many number of concurrent connections
* efficiently.
*
@ -59,8 +59,7 @@ import org.jboss.netty.util.internal.ExecutorUtil;
*
* @version $Rev$, $Date$
*/
public class NioDatagramChannelFactory implements ChannelFactory,
ServerChannelFactory {
public class NioDatagramChannelFactory implements DatagramChannelFactory {
/**
*
*/
@ -102,7 +101,7 @@ public class NioDatagramChannelFactory implements ChannelFactory,
sink = new NioDatagramPipelineSink(workerExecutor, workerCount);
}
public NioDatagramChannel newChannel(final ChannelPipeline pipeline) {
public DatagramChannel newChannel(final ChannelPipeline pipeline) {
return new NioDatagramChannel(this, pipeline, sink, sink.nextWorker());
}

View File

@ -47,7 +47,7 @@ import org.jboss.netty.channel.MessageEvent;
*
* @version $Rev$, $Date$
*/
public class NioDatagramPipelineSink extends AbstractChannelSink {
class NioDatagramPipelineSink extends AbstractChannelSink {
private static final AtomicInteger nextId = new AtomicInteger();
@ -74,8 +74,9 @@ public class NioDatagramPipelineSink extends AbstractChannelSink {
/**
* Handle downstream event.
*
* @param pipeline The channelpiple line that passed down the downstream event.
* @param event The downstream event.
* @param pipeline the {@link ChannelPipeline} that passes down the
* downstream event.
* @param e The downstream event.
*/
public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e)
throws Exception {
@ -98,6 +99,14 @@ public class NioDatagramPipelineSink extends AbstractChannelSink {
NioUdpWorker.close(channel, future);
}
break;
case CONNECTED:
// TODO Implement me
if (value != null) {
//connect(channel, future, (SocketAddress) value);
} else {
//NioUdpWorker.disconnect(channel, future);
}
break;
case INTEREST_OPS:
NioUdpWorker.setInterestOps(channel, future, ((Integer) value)
.intValue());

View File

@ -49,6 +49,7 @@ import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.ReceiveBufferSizePredictor;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ThreadRenamingRunnable;
@ -71,12 +72,6 @@ class NioUdpWorker implements Runnable {
private static final InternalLogger logger = InternalLoggerFactory
.getInstance(NioUdpWorker.class);
/**
* Maximum packate size for UDP packets.
* 65,536-byte maximum size of an IP datagram minus the 20-byte size of the IP header and the 8-byte size of the UDP header.
*/
private static int MAX_PACKET_SIZE = 65507;
/**
* This id of this worker.
*/
@ -321,17 +316,21 @@ class NioUdpWorker implements Runnable {
private static void processSelectedKeys(final Set<SelectionKey> selectedKeys) {
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
final SelectionKey key = i.next();
SelectionKey k = i.next();
i.remove();
try {
if (key.isReadable()) {
read(key);
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_READ) != 0) {
if (!read(k)) {
// Connection already closed - no need to handle write.
continue;
}
}
if (key.isWritable()) {
write(key);
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
write(k);
}
} catch (final CancelledKeyException ignore) {
close(key);
} catch (CancelledKeyException e) {
close(k);
}
}
}
@ -347,40 +346,57 @@ class NioUdpWorker implements Runnable {
*
* @param key The selection key which contains the Selector registration information.
*/
private static void read(final SelectionKey key) {
final NioDatagramChannel nioDatagramChannel = (NioDatagramChannel) key
.attachment();
private static boolean read(final SelectionKey key) {
final NioDatagramChannel channel = (NioDatagramChannel) key.attachment();
ReceiveBufferSizePredictor predictor =
channel.getConfig().getReceiveBufferSizePredictor();
final DatagramChannel datagramChannel = (DatagramChannel) key.channel();
final DatagramChannel nioChannel = (DatagramChannel) key.channel();
// Allocating a non-direct buffer with a max udp packge size.
// Would using a direct buffer be more efficient or would this negatively
// effect performance, as direct buffer allocation has a higher upfront cost
// where as a ByteBuffer is heap allocated.
final ByteBuffer byteBuffer = ByteBuffer.allocate(predictor.nextReceiveBufferSize());
boolean failure = true;
SocketAddress remoteAddress = null;
try {
// Allocating a non-direct buffer with a max udp packge size.
// Would using a direct buffer be more efficient or would this negatively
// effect performance, as direct buffer allocation has a higher upfront cost
// where as a ByteBuffer is heap allocated.
final ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_PACKET_SIZE);
// Recieve from the channel in a non blocking mode. We have already been notified that
// Receive from the channel in a non blocking mode. We have already been notified that
// the channel is ready to receive.
final SocketAddress remoteAddress = datagramChannel
.receive(byteBuffer);
remoteAddress = nioChannel.receive(byteBuffer);
failure = false;
} catch (AsynchronousCloseException e) {
// Can happen, and does not need a user attention.
} catch (Throwable t) {
fireExceptionCaught(channel, t);
}
if (remoteAddress != null) {
// Flip the buffer so that we can wrap it.
byteBuffer.flip();
// Create a Netty ChannelByffer by wrapping the ByteBuffer.
final ChannelBuffer channelBuffer = ChannelBuffers
.wrappedBuffer(byteBuffer);
logger.debug("ChannelBuffer : " + channelBuffer +
", remoteAdress: " + remoteAddress);
int readBytes = byteBuffer.remaining();
if (readBytes > 0) {
// Update the predictor.
predictor.previousReceiveBufferSize(readBytes);
// Notify the interested parties about the newly arrived message (channelBuffer).
fireMessageReceived(nioDatagramChannel, channelBuffer,
remoteAddress);
} catch (final Throwable t) {
if (!nioDatagramChannel.getDatagramChannel().socket().isClosed()) {
fireExceptionCaught(nioDatagramChannel, t);
// Create a Netty ChannelByffer by wrapping the ByteBuffer.
final ChannelBuffer channelBuffer = ChannelBuffers
.wrappedBuffer(byteBuffer);
// Notify the interested parties about the newly arrived message (channelBuffer).
fireMessageReceived(channel, channelBuffer,
remoteAddress);
}
}
if (failure) {
close(key);
return false;
}
return true;
}
private static void close(SelectionKey k) {
@ -393,7 +409,7 @@ class NioUdpWorker implements Runnable {
/*
* Note that we are not checking if the channel is connected. Connected has a different
* meaning in UDP and means that the channels socket is configured to only send and
* recieve from a given remote peer.
* receive from a given remote peer.
*/
if (!channel.isOpen()) {
cleanUpWriteBuffer(channel);
@ -444,7 +460,6 @@ class NioUdpWorker implements Runnable {
MessageEvent evt;
ChannelBuffer buf;
int bufIdx;
int writtenBytes = 0;
Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
@ -465,33 +480,30 @@ class NioUdpWorker implements Runnable {
}
buf = (ChannelBuffer) evt.getMessage();
bufIdx = buf.readerIndex();
} else {
buf = (ChannelBuffer) evt.getMessage();
bufIdx = channel.currentWriteIndex;
}
try {
int localWrittenBytes = 0;
for (int i = writeSpinCount; i > 0; i --) {
ChannelBuffer buffer = (ChannelBuffer) evt.getMessage();
int localWrittenBytes = channel.getDatagramChannel()
.send(buffer.toByteBuffer(),
evt.getRemoteAddress());
localWrittenBytes =
channel.getDatagramChannel().send(
buf.toByteBuffer(),
evt.getRemoteAddress());
if (localWrittenBytes != 0) {
bufIdx += localWrittenBytes;
writtenBytes += localWrittenBytes;
break;
}
}
if (bufIdx == buf.writerIndex()) {
if (localWrittenBytes > 0) {
// Successful write - proceed to the next message.
evt.getFuture().setSuccess();
evt = null;
} else {
// Not written fully - perhaps the kernel buffer is full.
// Not written at all - perhaps the kernel buffer is full.
channel.currentWriteEvent = evt;
channel.currentWriteIndex = bufIdx;
addOpWrite = true;
break;
}
@ -590,7 +602,7 @@ class NioUdpWorker implements Runnable {
key.cancel();
}
boolean connected = channel.isOpen();
boolean connected = channel.isConnected();
boolean bound = channel.isBound();
try {
channel.getDatagramChannel().close();
@ -615,33 +627,46 @@ class NioUdpWorker implements Runnable {
}
private static void cleanUpWriteBuffer(final NioDatagramChannel channel) {
// Create the exception only once to avoid the excessive overhead
// caused by fillStackTrace.
Exception cause;
if (channel.isOpen()) {
cause = new NotYetConnectedException();
} else {
cause = new ClosedChannelException();
}
Exception cause = null;
// Clean up the stale messages in the write buffer.
synchronized (channel.writeLock) {
MessageEvent evt = channel.currentWriteEvent;
if (evt != null) {
channel.currentWriteEvent = null;
channel.currentWriteIndex = 0;
// Create the exception only once to avoid the excessive overhead
// caused by fillStackTrace.
if (channel.isOpen()) {
cause = new NotYetConnectedException();
} else {
cause = new ClosedChannelException();
}
evt.getFuture().setFailure(cause);
fireExceptionCaught(channel, cause);
}
Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
for (;;) {
evt = writeBuffer.poll();
if (evt == null) {
break;
if (!writeBuffer.isEmpty()) {
// Create the exception only once to avoid the excessive overhead
// caused by fillStackTrace.
if (cause == null) {
if (channel.isOpen()) {
cause = new NotYetConnectedException();
} else {
cause = new ClosedChannelException();
}
}
for (;;) {
evt = writeBuffer.poll();
if (evt == null) {
break;
}
evt.getFuture().setFailure(cause);
fireExceptionCaught(channel, cause);
}
evt.getFuture().setFailure(cause);
fireExceptionCaught(channel, cause);
}
}
}
@ -767,6 +792,7 @@ class NioUdpWorker implements Runnable {
throw new ChannelException(
"Failed to register a socket to the selector.", e);
}
// XXX
fireChannelConnected(channel, localAddress);
}
}

View File

@ -29,8 +29,7 @@ import java.net.DatagramPacket;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
@ -39,8 +38,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
/**
* Unit test for {@link NioDatagramChannel}
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Daniel Bevenius (dbevenius@jboss.com)
* @version $Rev$, $Date$
@ -54,7 +51,7 @@ public class NioDatagramChannelTest {
public static void setupChannel() {
final NioDatagramChannelFactory channelFactory = new NioDatagramChannelFactory(
Executors.newCachedThreadPool());
final ServerBootstrap sb = new ServerBootstrap(channelFactory);
final ConnectionlessBootstrap sb = new ConnectionlessBootstrap(channelFactory);
inetSocketAddress = new InetSocketAddress("localhost", 9999);
sc = sb.bind(inetSocketAddress);
final SimpleHandler handler = new SimpleHandler();
@ -85,7 +82,7 @@ public class NioDatagramChannelTest {
public void clientBootstrap() {
final NioDatagramChannelFactory channelFactory = new NioDatagramChannelFactory(
Executors.newCachedThreadPool());
final ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
final ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap(channelFactory);
bootstrap.getPipeline().addLast("test", new SimpleHandler());
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("keepAlive", true);