diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/DefaultNioDatagramChannelConfig.java b/src/main/java/org/jboss/netty/channel/socket/nio/DefaultNioDatagramChannelConfig.java new file mode 100644 index 0000000000..a8f5b5fcdc --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/DefaultNioDatagramChannelConfig.java @@ -0,0 +1,165 @@ +/* + * JBoss, Home of Professional Open Source + * + * Copyright 2008, Red Hat Middleware LLC, and individual contributors + * by the @author tags. See the COPYRIGHT.txt in the distribution for a + * full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jboss.netty.channel.socket.nio; + +import java.net.DatagramSocket; +import java.util.Map; + +import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictor; +import org.jboss.netty.channel.ReceiveBufferSizePredictor; +import org.jboss.netty.channel.socket.DefaultDatagramChannelConfig; +import org.jboss.netty.logging.InternalLogger; +import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.util.internal.ConversionUtil; + +/** + * The default {@link NioSocketChannelConfig} implementation. + * + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (tlee@redhat.com) + * + * @version $Rev$, $Date$ + * + */ +class DefaultNioDatagramChannelConfig extends DefaultDatagramChannelConfig + implements NioDatagramChannelConfig { + + private static final InternalLogger logger = + InternalLoggerFactory + .getInstance(DefaultNioDatagramChannelConfig.class); + + private volatile int writeBufferHighWaterMark = 64 * 1024; + private volatile int writeBufferLowWaterMark = 32 * 1024; + private volatile ReceiveBufferSizePredictor predictor = + new AdaptiveReceiveBufferSizePredictor(); + private volatile int writeSpinCount = 16; + + DefaultNioDatagramChannelConfig(DatagramSocket socket) { + super(socket); + } + + @Override + public void setOptions(Map options) { + super.setOptions(options); + if (getWriteBufferHighWaterMark() < getWriteBufferLowWaterMark()) { + // Recover the integrity of the configuration with a sensible value. + setWriteBufferLowWaterMark0(getWriteBufferHighWaterMark() >>> 1); + // Notify the user about misconfiguration. + logger.warn("writeBufferLowWaterMark cannot be greater than " + + "writeBufferHighWaterMark; setting to the half of the " + + "writeBufferHighWaterMark."); + } + } + + @Override + public boolean setOption(String key, Object value) { + if (super.setOption(key, value)) { + return true; + } + + if (key.equals("writeBufferHighWaterMark")) { + setWriteBufferHighWaterMark0(ConversionUtil.toInt(value)); + } else if (key.equals("writeBufferLowWaterMark")) { + setWriteBufferLowWaterMark0(ConversionUtil.toInt(value)); + } else if (key.equals("writeSpinCount")) { + setWriteSpinCount(ConversionUtil.toInt(value)); + } else if (key.equals("receiveBufferSizePredictor")) { + setReceiveBufferSizePredictor((ReceiveBufferSizePredictor) value); + } else { + return false; + } + return true; + } + + public int getWriteBufferHighWaterMark() { + return writeBufferHighWaterMark; + } + + public void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { + if (writeBufferHighWaterMark < getWriteBufferLowWaterMark()) { + throw new IllegalArgumentException( + "writeBufferHighWaterMark cannot be less than " + + "writeBufferLowWaterMark (" + + getWriteBufferLowWaterMark() + "): " + + writeBufferHighWaterMark); + } + setWriteBufferHighWaterMark0(writeBufferHighWaterMark); + } + + private void setWriteBufferHighWaterMark0(int writeBufferHighWaterMark) { + if (writeBufferHighWaterMark < 0) { + throw new IllegalArgumentException("writeBufferHighWaterMark: " + + writeBufferHighWaterMark); + } + this.writeBufferHighWaterMark = writeBufferHighWaterMark; + } + + public int getWriteBufferLowWaterMark() { + return writeBufferLowWaterMark; + } + + public void setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { + if (writeBufferLowWaterMark > getWriteBufferHighWaterMark()) { + throw new IllegalArgumentException( + "writeBufferLowWaterMark cannot be greater than " + + "writeBufferHighWaterMark (" + + getWriteBufferHighWaterMark() + "): " + + writeBufferLowWaterMark); + } + setWriteBufferLowWaterMark0(writeBufferLowWaterMark); + } + + private void setWriteBufferLowWaterMark0(int writeBufferLowWaterMark) { + if (writeBufferLowWaterMark < 0) { + throw new IllegalArgumentException("writeBufferLowWaterMark: " + + writeBufferLowWaterMark); + } + this.writeBufferLowWaterMark = writeBufferLowWaterMark; + } + + public int getWriteSpinCount() { + return writeSpinCount; + } + + public void setWriteSpinCount(int writeSpinCount) { + if (writeSpinCount <= 0) { + throw new IllegalArgumentException( + "writeSpinCount must be a positive integer."); + } + this.writeSpinCount = writeSpinCount; + } + + @Override + public ReceiveBufferSizePredictor getReceiveBufferSizePredictor() { + return predictor; + } + + @Override + public void setReceiveBufferSizePredictor( + ReceiveBufferSizePredictor predictor) { + if (predictor == null) { + throw new NullPointerException("predictor"); + } + this.predictor = predictor; + } +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java new file mode 100644 index 0000000000..993d3ad915 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java @@ -0,0 +1,320 @@ +/* + * JBoss, Home of Professional Open Source Copyright 2009, Red Hat Middleware + * LLC, and individual contributors by the @authors tag. See the copyright.txt + * in the distribution for a full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it under the + * terms of the GNU Lesser General Public License as published by the Free + * Software Foundation; either version 2.1 of the License, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more + * details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this software; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF + * site: http://www.fsf.org. + */ +package org.jboss.netty.channel.socket.nio; + +import static org.jboss.netty.channel.Channels.*; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.DatagramChannel; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.AbstractChannel; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelSink; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.ServerChannel; +import org.jboss.netty.channel.socket.DatagramChannelConfig; +import org.jboss.netty.logging.InternalLogger; +import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.util.internal.LinkedTransferQueue; +import org.jboss.netty.util.internal.ThreadLocalBoolean; + +/** + * NioDatagramChannel provides a connection less NIO UDP channel for Netty. + *

+ * + * @author Daniel Bevenius + * + */ +public class NioDatagramChannel extends AbstractChannel implements + ServerChannel { + /** + * Internal Netty logger. + */ + private static final InternalLogger logger = + InternalLoggerFactory.getInstance(NioDatagramChannel.class); + + /** + * The {@link DatagramChannelConfig}. + */ + private final NioDatagramChannelConfig config; + + /** + * The {@link NioUdpWorker} for this NioDatagramChannnel. + */ + final NioUdpWorker worker; + + /** + * The {@link DatagramChannel} that this channel uses. + */ + private final DatagramChannel datagramChannel; + + /** + * + */ + volatile ChannelFuture connectFuture; + + /** + * + */ + final Object interestOpsLock = new Object(); + + /** + * + */ + final Object writeLock = new Object(); + + /** + * + */ + final Runnable writeTask = new WriteTask(); + + /** + * Indicates if there is a {@link WriteTask} in the task queue. + */ + final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean(); + + /** + * + */ + final Queue writeBufferQueue = new WriteBufferQueue(); + + /** + * Keeps track of the number of bytes that the {@link WriteBufferQueue} currently + * contains. + */ + final AtomicInteger writeBufferSize = new AtomicInteger(); + + /** + * + */ + final AtomicInteger highWaterMarkCounter = new AtomicInteger(); + + /** + * + */ + MessageEvent currentWriteEvent; + + /** + * + */ + int currentWriteIndex; + + /** + * + */ + volatile boolean inWriteNowLoop; + + /** + * + * @param factory + * @param pipeline + * @param sink + * @param worker + */ + public NioDatagramChannel(final ChannelFactory factory, + final ChannelPipeline pipeline, final ChannelSink sink, + final NioUdpWorker worker) { + super(null, factory, pipeline, sink); + this.worker = worker; + datagramChannel = openNonBlockingChannel(); + setSoTimeout(1000); + config = new DefaultNioDatagramChannelConfig(datagramChannel.socket()); + + fireChannelOpen(this); + } + + private DatagramChannel openNonBlockingChannel() { + try { + final DatagramChannel channel = DatagramChannel.open(); + channel.configureBlocking(false); + return channel; + } catch (final IOException e) { + throw new ChannelException("Failed to open a DatagramChannel.", e); + } + } + + private void setSoTimeout(final int timeout) { + try { + datagramChannel.socket().setSoTimeout(timeout); + } catch (final IOException e) { + try { + datagramChannel.close(); + } catch (final IOException e2) { + logger.warn("Failed to close a partially DatagramSocket.", e2); + } + throw new ChannelException( + "Failed to set the DatagramSocket timeout.", e); + } + } + + public InetSocketAddress getLocalAddress() { + return (InetSocketAddress) datagramChannel.socket() + .getLocalSocketAddress(); + } + + public SocketAddress getRemoteAddress() { + return datagramChannel.socket().getRemoteSocketAddress(); + } + + public boolean isBound() { + return isOpen() && datagramChannel.socket().isBound(); + } + + public boolean isConnected() { + return datagramChannel.socket().isBound(); + } + + @Override + protected boolean setClosed() { + return super.setClosed(); + } + + @Override + protected ChannelFuture getSucceededFuture() { + return super.getSucceededFuture(); + } + + public NioDatagramChannelConfig getConfig() { + return config; + } + + DatagramChannel getDatagramChannel() { + return datagramChannel; + } + + int getRawInterestOps() { + return super.getInterestOps(); + } + + void setRawInterestOpsNow(int interestOps) { + super.setInterestOpsNow(interestOps); + } + + /** + * WriteBuffer is an extension of {@link LinkedTransferQueue} that adds + * support for highWaterMark checking of the write buffer size. + * + * @author Daniel Bevenius + */ + private final class WriteBufferQueue extends + LinkedTransferQueue { + private final ThreadLocalBoolean notifying = new ThreadLocalBoolean(); + + WriteBufferQueue() { + super(); + } + + /** + * This method first delegates to {@link LinkedTransferQueue#offer(Object)} and + * adds support for keeping track of the size of the this write buffer. + */ + @Override + public boolean offer(final MessageEvent e) { + final boolean success = super.offer(e); + assert success; + + final int messageSize = + ((ChannelBuffer) e.getMessage()).readableBytes(); + + // Add the ChannelBuffers size to the writeBuffersSize + final int newWriteBufferSize = + writeBufferSize.addAndGet(messageSize); + + final int highWaterMark = getConfig().getWriteBufferHighWaterMark(); + // Check if the newly calculated buffersize exceeds the highWaterMark limit. + if (newWriteBufferSize >= highWaterMark) { + // Check to see if the messages size we are adding is what will cause the highWaterMark to be breached. + if (newWriteBufferSize - messageSize < highWaterMark) { + // Increment the highWaterMarkCounter which track of the fact that the count + // has been reached. + highWaterMarkCounter.incrementAndGet(); + + if (!notifying.get()) { + notifying.set(Boolean.TRUE); + fireChannelInterestChanged(NioDatagramChannel.this); + notifying.set(Boolean.FALSE); + } + } + } + + return true; + } + + /** + * This method first delegates to {@link LinkedTransferQueue#poll()} and + * adds support for keeping track of the size of the this writebuffers queue. + */ + @Override + public MessageEvent poll() { + final MessageEvent e = super.poll(); + if (e != null) { + final int messageSize = + ((ChannelBuffer) e.getMessage()).readableBytes(); + // Subtract the ChannelBuffers size from the writeBuffersSize + final int newWriteBufferSize = + writeBufferSize.addAndGet(-messageSize); + + final int lowWaterMark = + getConfig().getWriteBufferLowWaterMark(); + + // Check if the newly calculated buffersize exceeds the lowhWaterMark limit. + if (newWriteBufferSize == 0 || + newWriteBufferSize < lowWaterMark) { + if (newWriteBufferSize + messageSize >= lowWaterMark) { + highWaterMarkCounter.decrementAndGet(); + if (!notifying.get()) { + notifying.set(Boolean.TRUE); + fireChannelInterestChanged(NioDatagramChannel.this); + notifying.set(Boolean.FALSE); + } + } + } + } + return e; + } + } + + /** + * WriteTask is a simple runnable performs writes by delegating the {@link NioUdpWorker}. + * + * @author Daniel Bevenius + * + */ + private final class WriteTask implements Runnable { + WriteTask() { + super(); + } + + public void run() { + writeTaskInTaskQueue.set(false); + NioUdpWorker.write(NioDatagramChannel.this, false); + } + } +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelConfig.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelConfig.java new file mode 100644 index 0000000000..b4b77c8543 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelConfig.java @@ -0,0 +1,100 @@ +/* + * JBoss, Home of Professional Open Source + * + * Copyright 2008, Red Hat Middleware LLC, and individual contributors + * by the @author tags. See the COPYRIGHT.txt in the distribution for a + * full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jboss.netty.channel.socket.nio; + +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; + +import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictor; +import org.jboss.netty.channel.ChannelConfig; +import org.jboss.netty.channel.ReceiveBufferSizePredictor; +import org.jboss.netty.channel.socket.DatagramChannel; +import org.jboss.netty.channel.socket.DatagramChannelConfig; + +/** + * A {@link DatagramChannelConfig} for a NIO TCP/IP {@link DatagramChannel}. + * + *

Available options

+ * + * In addition to the options provided by {@link ChannelConfig} and + * {@link DatagramChannelConfig}, {@link NioDatagramChannelConfig} allows the + * following options in the option map: + * + * + * + * + * + * + * + * + * + *
NameAssociated setter method
{@code "writeSpinCount"}{@link #setWriteSpinCount(int)}
{@code "receiveBufferSizePredictor"}{@link #setReceiveBufferSizePredictor(ReceiveBufferSizePredictor)}
+ * + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (tlee@redhat.com) + * + * @version $Rev$, $Date$ + */ +public interface NioDatagramChannelConfig extends DatagramChannelConfig { + + int getWriteBufferHighWaterMark(); + void setWriteBufferHighWaterMark(int writeBufferHighWaterMark); + + int getWriteBufferLowWaterMark(); + void setWriteBufferLowWaterMark(int writeBufferLowWaterMark); + + /** + * Returns the maximum loop count for a write operation until + * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value. + * It is similar to what a spin lock is used for in concurrency programming. + * It improves memory utilization and write throughput depending on + * the platform that JVM runs on. The default value is {@code 16}. + */ + int getWriteSpinCount(); + + /** + * Sets the maximum loop count for a write operation until + * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value. + * It is similar to what a spin lock is used for in concurrency programming. + * It improves memory utilization and write throughput depending on + * the platform that JVM runs on. The default value is {@code 16}. + * + * @throws IllegalArgumentException + * if the specified value is {@code 0} or less than {@code 0} + */ + void setWriteSpinCount(int writeSpinCount); + + /** + * Returns the {@link ReceiveBufferSizePredictor} which predicts the + * number of readable bytes in the socket receive buffer. The default + * predictor is {@link AdaptiveReceiveBufferSizePredictor}. + */ + ReceiveBufferSizePredictor getReceiveBufferSizePredictor(); + + /** + * Sets the {@link ReceiveBufferSizePredictor} which predicts the + * number of readable bytes in the socket receive buffer. The default + * predictor is {@link AdaptiveReceiveBufferSizePredictor}. + */ + void setReceiveBufferSizePredictor(ReceiveBufferSizePredictor predictor); +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java new file mode 100644 index 0000000000..e0cd4008f5 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java @@ -0,0 +1,107 @@ +/* + * JBoss, Home of Professional Open Source Copyright 2009, Red Hat Middleware + * LLC, and individual contributors by the @authors tag. See the copyright.txt + * in the distribution for a full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it under the + * terms of the GNU Lesser General Public License as published by the Free + * Software Foundation; either version 2.1 of the License, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more + * details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this software; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF + * site: http://www.fsf.org. + */ +package org.jboss.netty.channel.socket.nio; + +import java.util.concurrent.Executor; + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ServerChannelFactory; +import org.jboss.netty.util.internal.ExecutorUtil; + +/** + * A {@link NioDatagramChannelFactory} creates a server-side NIO-based + * {@link NioDatagramChannel}. It utilizes the non-blocking I/O mode which + * was introduced with NIO to serve many number of concurrent connections + * efficiently. + * + *

How threads work

+ *

+ * There is only one type of thread in a {@link NioDatagramChannelFactory}, + * as opposed to the {@link NioServerSocketChannelFactory} where there is one boss + * thread and a worker thread. + * The boss thread in {@link NioServerSocketChannelFactory} performs the accept of + * incoming connection and then passes the accepted Channel to on of the worker + * threads. As DatagramChannels can act as both server (listener) and client (sender) + * hence there is no concept of ServerSocketChannel and SocketChannel whic means that + * there nothing to accept accept. This is the reason that there is only worker theads. + * + *

Worker threads

+ *

+ * One {@link NioDatagramChannelFactory} can have one or more worker + * threads. A worker thread performs non-blocking read and write for one or + * more {@link Channel}s in a non-blocking mode. + * + * @author Daniel Bevenius + * + */ +public class NioDatagramChannelFactory implements ChannelFactory, + ServerChannelFactory { + /** + * + */ + private final Executor workerExecutor; + + /** + * + */ + private final NioDatagramPipelineSink sink; + + /** + * + * @param workerExecutor the {@link Executor} which will execute the I/O worker threads + */ + public NioDatagramChannelFactory(final Executor workerExecutor) { + this(workerExecutor, Runtime.getRuntime().availableProcessors()); + } + + /** + * Creates a new instance. + * + * @param workerExecutor the {@link Executor} which will execute the I/O worker threads + * @param workerCount the maximum number of I/O worker threads + */ + public NioDatagramChannelFactory(final Executor workerExecutor, + final int workerCount) { + if (workerCount <= 0) { + throw new IllegalArgumentException(String + .format("workerCount (%s) must be a positive integer.", + workerCount)); + } + + if (workerExecutor == null) { + throw new NullPointerException( + "workerExecutor argument must not be null"); + } + this.workerExecutor = workerExecutor; + + sink = new NioDatagramPipelineSink(workerExecutor, workerCount); + } + + public NioDatagramChannel newChannel(final ChannelPipeline pipeline) { + return new NioDatagramChannel(this, pipeline, sink, sink.nextWorker()); + } + + public void releaseExternalResources() { + ExecutorUtil.terminate(workerExecutor); + } +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java new file mode 100644 index 0000000000..595a9f6ea0 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java @@ -0,0 +1,191 @@ +/* + * JBoss, Home of Professional Open Source Copyright 2009, Red Hat Middleware + * LLC, and individual contributors by the @authors tag. See the copyright.txt + * in the distribution for a full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it under the + * terms of the GNU Lesser General Public License as published by the Free + * Software Foundation; either version 2.1 of the License, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more + * details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this software; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF + * site: http://www.fsf.org. + */ +package org.jboss.netty.channel.socket.nio; + +import static org.jboss.netty.channel.Channels.*; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; + +import org.jboss.netty.channel.AbstractChannelSink; +import org.jboss.netty.channel.ChannelEvent; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelState; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.logging.InternalLogger; +import org.jboss.netty.logging.InternalLoggerFactory; + +/** + * NioDatagramPipelineSink receives downstream events from a ChannelPipeline. + *

+ * A {@link NioDatagramPipelineSink} contains an array of {@link NioUdpWorker}s + * + * @author Daniel Bevenius + */ +public class NioDatagramPipelineSink extends AbstractChannelSink { + /** + * Internal Netty logger. + */ + private final InternalLogger logger = + InternalLoggerFactory.getInstance(NioDatagramPipelineSink.class); + + private static final AtomicInteger nextId = new AtomicInteger(); + + private final int id = nextId.incrementAndGet(); + + private final NioUdpWorker[] workers; + + private final AtomicInteger workerIndex = new AtomicInteger(); + + /** + * Creates a new {@link NioDatagramPipelineSink} with a the number of {@link NioUdpWorker}s specified in workerCount. + * The {@link NioUdpWorker}s take care of reading and writing for the {@link NioDatagramChannel}. + * + * @param workerExecutor + * @param workerCount The number of UdpWorkers for this sink. + */ + NioDatagramPipelineSink(final Executor workerExecutor, final int workerCount) { + workers = new NioUdpWorker[workerCount]; + for (int i = 0; i < workers.length; i ++) { + workers[i] = new NioUdpWorker(id, i + 1, workerExecutor); + } + } + + /** + * Handle downstream event. + * + * @param pipeline The channelpiple line that passed down the downstream event. + * @param event The downstream event. + */ + public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e) + throws Exception { + final NioDatagramChannel channel = (NioDatagramChannel) e.getChannel(); + final ChannelFuture future = e.getFuture(); + if (e instanceof ChannelStateEvent) { + final ChannelStateEvent stateEvent = (ChannelStateEvent) e; + final ChannelState state = stateEvent.getState(); + final Object value = stateEvent.getValue(); + switch (state) { + case OPEN: + if (Boolean.FALSE.equals(value)) { + NioUdpWorker.close(channel, future); + } + break; + case BOUND: + if (value != null) { + bind(channel, future, (InetSocketAddress) value); + } else { + NioUdpWorker.close(channel, future); + } + break; + case INTEREST_OPS: + NioUdpWorker.setInterestOps(channel, future, ((Integer) value) + .intValue()); + break; + } + } else if (e instanceof MessageEvent) { + final MessageEvent event = (MessageEvent) e; + final boolean offered = channel.writeBufferQueue.offer(event); + assert offered; + NioUdpWorker.write(channel, true); + } + } + + private void close(NioDatagramChannel channel, ChannelFuture future) { + try { + channel.getDatagramChannel().socket().close(); + future.setSuccess(); + if (channel.setClosed()) { + if (channel.isBound()) { + fireChannelUnbound(channel); + } + fireChannelClosed(channel); + } + } catch (final Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } + } + + /** + * Will bind the DatagramSocket to the passed-in address. + * Every call bind will spawn a new thread using the that basically inturn + */ + private void bind(final NioDatagramChannel channel, + final ChannelFuture future, final InetSocketAddress address) { + boolean bound = false; + boolean started = false; + try { + // First bind the DatagramSocket the specified port. + channel.getDatagramChannel().socket().bind(address); + bound = true; + + future.setSuccess(); + fireChannelBound(channel, address); + + nextWorker().register(channel, null); + started = true; + } catch (final Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } finally { + if (!started && bound) { + close(channel, future); + } + } + } + + NioUdpWorker nextWorker() { + return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]; + } + + /** + * The connection sematics of a NioDatagramPipelineSink are different for datagram sockets than they are for stream + * sockets. Placing a DatagramChannel into a connected state causes datagrams to be ignored from any source + * address other than the one to which the channel is connected. Unwanted packets will be dropped. + * Not sure that this makes sense for a server side component. + * + * @param channel The UdpChannel to connect from. + * @param future + * @param remoteAddress The remote address to connect to. + */ + @SuppressWarnings("unused") + private void connect(final NioDatagramChannel channel, + ChannelFuture future, SocketAddress remoteAddress) { + try { + try { + channel.getDatagramChannel().socket().connect(remoteAddress); + } catch (final IOException e) { + future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + channel.connectFuture = future; + } + } catch (final Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } + } +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioUdpWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioUdpWorker.java new file mode 100644 index 0000000000..69db0217ba --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioUdpWorker.java @@ -0,0 +1,775 @@ +/* + * JBoss, Home of Professional Open Source + * + * Copyright 2008, Red Hat Middleware LLC, and individual contributors + * by the @author tags. See the COPYRIGHT.txt in the distribution for a + * full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jboss.netty.channel.socket.nio; + +import static org.jboss.netty.channel.Channels.*; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousCloseException; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.DatagramChannel; +import java.nio.channels.NotYetConnectedException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.Iterator; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.logging.InternalLogger; +import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.util.ThreadRenamingRunnable; +import org.jboss.netty.util.internal.LinkedTransferQueue; + +/** + * + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (tlee@redhat.com) + * + * @version $Rev$, $Date$ + * + */ +class NioUdpWorker implements Runnable { + /** + * Internal Netty logger. + */ + private static final InternalLogger logger = + InternalLoggerFactory.getInstance(NioUdpWorker.class); + + /** + * + */ + private static final int CONSTRAINT_LEVEL = + NioProviderMetadata.CONSTRAINT_LEVEL; + + /** + * Maximum packate size for UDP packets. + * 65,536-byte maximum size of an IP datagram minus the 20-byte size of the IP header and the 8-byte size of the UDP header. + */ + private static int MAX_PACKET_SIZE = 65507; + + /** + * This id of this worker. + */ + private final int id; + + /** + * This id of the NioDatagramPipelineSink. + */ + private final int bossId; + + /** + * Executor used to exeucte runnables such as {@link ChannelRegistionTask}. + */ + private final Executor executor; + + /** + * Boolean to indicate if this worker has been started. + */ + private boolean started; + + /** + * If this worker has been started thread will be a reference to the thread + * used when starting. i.e. the current thread when the run method is executed. + */ + private volatile Thread thread; + + /** + * The NIO {@link Selector}. + */ + volatile Selector selector; + + /** + * + */ + private final AtomicBoolean wakenUp = new AtomicBoolean(); + + private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock(); + + private final Object startStopLock = new Object(); + + /** + * Queue of {@link ChannelRegistionTask}s + */ + private final Queue registerTaskQueue = + new LinkedTransferQueue(); + + /** + * Queue of + */ + private final Queue writeTaskQueue = + new LinkedTransferQueue(); + + /** + * + * @param bossId + * @param id + * @param executor + */ + NioUdpWorker(final int bossId, final int id, final Executor executor) { + this.bossId = bossId; + this.id = id; + this.executor = executor; + } + + /** + * Registers the passed-in channel with a selector. + * + * @param channel The channel to register. + * @param future + */ + void register(final NioDatagramChannel channel, final ChannelFuture future) { + final Runnable channelRegTask = + new ChannelRegistionTask(channel, future); + Selector selector; + + synchronized (startStopLock) { + if (!started) { + // Open a selector if this worker didn't start yet. + try { + this.selector = selector = Selector.open(); + } catch (final Throwable t) { + throw new ChannelException("Failed to create a selector.", + t); + } + + boolean success = false; + try { + executor.execute(new ThreadRenamingRunnable(this, + "New I/O server worker #" + bossId + "'-'" + id)); + success = true; + } finally { + if (!success) { + try { + // Release the Selector if the execution fails. + selector.close(); + } catch (final Throwable t) { + logger.warn("Failed to close a selector.", t); + } + this.selector = selector = null; + // The method will return to the caller at this point. + } + } + } else { + // Use the existing selector if this worker has been started. + selector = this.selector; + } + assert selector != null && selector.isOpen(); + + started = true; + + // "Add" the registration task to the register task queue. + boolean offered = registerTaskQueue.offer(channelRegTask); + assert offered; + } + + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + } + + public void run() { + // Store a ref to the current thread. + thread = Thread.currentThread(); + + final Selector selector = this.selector; + boolean shutdown = false; + + for (;;) { + wakenUp.set(false); + + if (CONSTRAINT_LEVEL != 0) { + selectorGuard.writeLock().lock(); + // This empty synchronization block prevents the selector from acquiring its lock. + selectorGuard.writeLock().unlock(); + } + + try { + int selectedKeyCount = selector.select(500); + + // Wake up immediately in the next turn if someone might + // have waken up the selector between 'wakenUp.set(false)' + // and 'selector.select(...)'. + if (wakenUp.get()) { + selector.wakeup(); + } + + processRegisterTaskQueue(); + processWriteTaskQueue(); + + if (selectedKeyCount > 0) { + processSelectedKeys(selector.selectedKeys()); + } + + // Exit the loop when there's nothing to handle. + // The shutdown flag is used to delay the shutdown of this + // loop to avoid excessive Selector creation when + // connections are registered in a one-by-one manner instead of + // concurrent manner. + if (selector.keys().isEmpty()) { + if (shutdown || executor instanceof ExecutorService && + ((ExecutorService) executor).isShutdown()) { + synchronized (startStopLock) { + if (registerTaskQueue.isEmpty() && + selector.keys().isEmpty()) { + started = false; + try { + selector.close(); + } catch (IOException e) { + logger.warn("Failed to close a selector.", + e); + } finally { + this.selector = null; + } + break; + } else { + shutdown = false; + } + } + } else { + // Give one more second. + shutdown = true; + } + } else { + shutdown = false; + } + } catch (Throwable t) { + logger.warn("Unexpected exception in the selector loop.", t); + + // Prevent possible consecutive immediate failures that lead to + // excessive CPU consumption. + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Ignore. + } + } + } + } + + /** + * Will go through all the {@link ChannelRegistionTask}s in the + * task queue and run them (registering them). + */ + private void processRegisterTaskQueue() { + for (;;) { + final Runnable task = registerTaskQueue.poll(); + if (task == null) { + break; + } + + task.run(); + } + } + + /** + * Will go through all the WriteTasks and run them. + */ + private void processWriteTaskQueue() { + for (;;) { + final Runnable task = writeTaskQueue.poll(); + if (task == null) { + break; + } + + task.run(); + } + } + + private static void processSelectedKeys(final Set selectedKeys) { + for (Iterator i = selectedKeys.iterator(); i.hasNext();) { + final SelectionKey key = i.next(); + i.remove(); + try { + if (key.isReadable()) { + read(key); + } + if (key.isWritable()) { + write(key); + } + } catch (final CancelledKeyException ignore) { + close(key); + } + } + } + + private static void write(SelectionKey k) { + write((NioDatagramChannel) k.attachment(), false); + } + + /** + * Read is called when a Selector has been notified that the underlying channel + * was something to be read. The channel would previously have registered its interest + * in read operations. + * + * @param key The selection key which contains the Selector registration information. + */ + private static void read(final SelectionKey key) { + final NioDatagramChannel nioDatagramChannel = + (NioDatagramChannel) key.attachment(); + + final DatagramChannel datagramChannel = (DatagramChannel) key.channel(); + try { + // Allocating a non-direct buffer with a max udp packge size. + // Would using a direct buffer be more efficient or would this negatively + // effect performance, as direct buffer allocation has a higher upfront cost + // where as a ByteBuffer is heap allocated. + final ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_PACKET_SIZE); + + // Recieve from the channel in a non blocking mode. We have already been notified that + // the channel is ready to receive. + final SocketAddress remoteAddress = + datagramChannel.receive(byteBuffer); + /* + if (remoteAddress == null) + { + // No data was available so return false to indicate this. + return false; + } + */ + + // Flip the buffer so that we can wrap it. + byteBuffer.flip(); + // Create a Netty ChannelByffer by wrapping the ByteBuffer. + final ChannelBuffer channelBuffer = + ChannelBuffers.wrappedBuffer(byteBuffer); + + logger.debug("ChannelBuffer : " + channelBuffer + + ", remoteAdress: " + remoteAddress); + + // Notify the interested parties about the newly arrived message (channelBuffer). + fireMessageReceived(nioDatagramChannel, channelBuffer, + remoteAddress); + } catch (final Throwable t) { + if (!nioDatagramChannel.getDatagramChannel().socket().isClosed()) { + fireExceptionCaught(nioDatagramChannel, t); + } + } + //return true; + } + + private static void close(SelectionKey k) { + final NioDatagramChannel ch = (NioDatagramChannel) k.attachment(); + close(ch, succeededFuture(ch)); + } + + static void write(final NioDatagramChannel channel, + final boolean mightNeedWakeup) { + /* + * Note that we are not checking if the channel is connected. Connected has a different + * meaning in UDP and means that the channels socket is configured to only send and + * recieve from a given remote peer. + */ + if (!channel.isOpen()) { + cleanUpWriteBuffer(channel); + return; + } + + if (mightNeedWakeup && scheduleWriteIfNecessary(channel)) { + return; + } + + if (channel.inWriteNowLoop) { + scheduleWriteIfNecessary(channel); + } else { + writeNow(channel, channel.getConfig().getWriteSpinCount()); + } + } + + /** + * + * @param channel + * @return + */ + private static boolean scheduleWriteIfNecessary( + final NioDatagramChannel channel) { + final NioUdpWorker worker = channel.worker; + final Thread workerThread = worker.thread; + + if (workerThread == null || Thread.currentThread() != workerThread) { + if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { + // "add" the channels writeTask to the writeTaskQueue. + boolean offered = + worker.writeTaskQueue.offer(channel.writeTask); + assert offered; + } + + final Selector workerSelector = worker.selector; + if (workerSelector != null) { + if (worker.wakenUp.compareAndSet(false, true)) { + workerSelector.wakeup(); + } + } + return true; + } + + return false; + } + + private static void writeNow(final NioDatagramChannel channel, + final int writeSpinCount) { + boolean open = true; + boolean addOpWrite = false; + boolean removeOpWrite = false; + + MessageEvent evt; + ChannelBuffer buf; + int bufIdx; + int writtenBytes = 0; + + Queue writeBuffer = channel.writeBufferQueue; + synchronized (channel.writeLock) { + // inform the channel that write is inprogres + channel.inWriteNowLoop = true; + // get the write event. + evt = channel.currentWriteEvent; + + // loop forever... + for (;;) { + if (evt == null) { + evt = writeBuffer.poll(); + if (evt == null) { + channel.currentWriteEvent = null; + removeOpWrite = true; + break; + } + + buf = (ChannelBuffer) evt.getMessage(); + bufIdx = buf.readerIndex(); + } else { + buf = (ChannelBuffer) evt.getMessage(); + bufIdx = channel.currentWriteIndex; + } + + try { + for (int i = writeSpinCount; i > 0; i --) { + ChannelBuffer buffer = (ChannelBuffer) evt.getMessage(); + int localWrittenBytes = + channel.getDatagramChannel().send( + buffer.toByteBuffer(), + evt.getRemoteAddress()); + //int localWrittenBytes = buf.getBytes( bufIdx, channel.getDatagramChannel(), buf.writerIndex() - bufIdx); + if (localWrittenBytes != 0) { + bufIdx += localWrittenBytes; + writtenBytes += localWrittenBytes; + break; + } + } + + if (bufIdx == buf.writerIndex()) { + // Successful write - proceed to the next message. + evt.getFuture().setSuccess(); + evt = null; + } else { + // Not written fully - perhaps the kernel buffer is full. + channel.currentWriteEvent = evt; + channel.currentWriteIndex = bufIdx; + addOpWrite = true; + break; + } + } catch (final AsynchronousCloseException e) { + // Doesn't need a user attention - ignore. + } catch (final Throwable t) { + evt.getFuture().setFailure(t); + evt = null; + fireExceptionCaught(channel, t); + if (t instanceof IOException) { + open = false; + close(channel, succeededFuture(channel)); + } + } + } + channel.inWriteNowLoop = false; + } + + fireWriteComplete(channel, writtenBytes); + + if (open) { + if (addOpWrite) { + setOpWrite(channel); + } else if (removeOpWrite) { + clearOpWrite(channel); + } + } + } + + private static void setOpWrite(final NioDatagramChannel channel) { + NioUdpWorker worker = channel.worker; + Selector selector = worker.selector; + SelectionKey key = channel.getDatagramChannel().keyFor(selector); + if (key == null) { + return; + } + if (!key.isValid()) { + close(key); + return; + } + int interestOps; + boolean changed = false; + + // interestOps can change at any time and at any thread. + // Acquire a lock to avoid possible race condition. + synchronized (channel.interestOpsLock) { + interestOps = channel.getRawInterestOps(); + if ((interestOps & SelectionKey.OP_WRITE) == 0) { + interestOps |= SelectionKey.OP_WRITE; + key.interestOps(interestOps); + changed = true; + } + } + + if (changed) { + channel.setRawInterestOpsNow(interestOps); + } + } + + private static void clearOpWrite(NioDatagramChannel channel) { + NioUdpWorker worker = channel.worker; + Selector selector = worker.selector; + SelectionKey key = channel.getDatagramChannel().keyFor(selector); + if (key == null) { + return; + } + if (!key.isValid()) { + close(key); + return; + } + int interestOps; + boolean changed = false; + + // interestOps can change at any time and at any thread. + // Acquire a lock to avoid possible race condition. + synchronized (channel.interestOpsLock) { + interestOps = channel.getRawInterestOps(); + if ((interestOps & SelectionKey.OP_WRITE) != 0) { + interestOps &= ~SelectionKey.OP_WRITE; + key.interestOps(interestOps); + changed = true; + } + } + + if (changed) { + channel.setRawInterestOpsNow(interestOps); + } + } + + static void close(final NioDatagramChannel channel, + final ChannelFuture future) { + NioUdpWorker worker = channel.worker; + Selector selector = worker.selector; + SelectionKey key = channel.getDatagramChannel().keyFor(selector); + if (key != null) { + key.cancel(); + } + + boolean connected = channel.isOpen(); + boolean bound = channel.isBound(); + try { + channel.getDatagramChannel().close(); + if (channel.setClosed()) { + future.setSuccess(); + if (connected) { + fireChannelDisconnected(channel); + } + if (bound) { + fireChannelUnbound(channel); + } + + cleanUpWriteBuffer(channel); + fireChannelClosed(channel); + } else { + future.setSuccess(); + } + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } + } + + private static void cleanUpWriteBuffer(final NioDatagramChannel channel) { + // Create the exception only once to avoid the excessive overhead + // caused by fillStackTrace. + Exception cause; + if (channel.isOpen()) { + cause = new NotYetConnectedException(); + } else { + cause = new ClosedChannelException(); + } + + // Clean up the stale messages in the write buffer. + synchronized (channel.writeLock) { + MessageEvent evt = channel.currentWriteEvent; + if (evt != null) { + channel.currentWriteEvent = null; + channel.currentWriteIndex = 0; + evt.getFuture().setFailure(cause); + fireExceptionCaught(channel, cause); + } + + Queue writeBuffer = channel.writeBufferQueue; + for (;;) { + evt = writeBuffer.poll(); + if (evt == null) { + break; + } + evt.getFuture().setFailure(cause); + fireExceptionCaught(channel, cause); + } + } + } + + static void setInterestOps(final NioDatagramChannel channel, + ChannelFuture future, int interestOps) { + final NioUdpWorker worker = channel.worker; + final Selector selector = worker.selector; + + final SelectionKey key = channel.getDatagramChannel().keyFor(selector); + if (key == null || selector == null) { + Exception cause = new NotYetConnectedException(); + future.setFailure(cause); + fireExceptionCaught(channel, cause); + } + + boolean changed = false; + try { + // interestOps can change at any time and by any thread. + // Acquire a lock to avoid possible race condition. + synchronized (channel.interestOpsLock) { + // Override OP_WRITE flag - a user cannot change this flag. + interestOps &= ~Channel.OP_WRITE; + interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE; + + switch (CONSTRAINT_LEVEL) { + case 0: + if (channel.getRawInterestOps() != interestOps) { + key.interestOps(interestOps); + if (Thread.currentThread() != worker.thread && + worker.wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + changed = true; + } + break; + case 1: + case 2: + if (channel.getRawInterestOps() != interestOps) { + if (Thread.currentThread() == worker.thread) { + key.interestOps(interestOps); + changed = true; + } else { + worker.selectorGuard.readLock().lock(); + try { + if (worker.wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + key.interestOps(interestOps); + changed = true; + } finally { + worker.selectorGuard.readLock().unlock(); + } + } + } + break; + default: + throw new Error(); + } + } + + future.setSuccess(); + if (changed) { + channel.setRawInterestOpsNow(interestOps); + fireChannelInterestChanged(channel); + } + } catch (final Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } + } + + /** + * RegisterTask is a task responsible for registering a channel with a + * selector. + * + * @author Daniel Bevenius + * + */ + private final class ChannelRegistionTask implements Runnable { + private final NioDatagramChannel channel; + + private final ChannelFuture future; + + ChannelRegistionTask(final NioDatagramChannel channel, + final ChannelFuture future) { + this.channel = channel; + this.future = future; + } + + /** + * This runnable's task. Does the actual registering by calling the + * underlying DatagramChannels peer DatagramSocket register method. + * + */ + public void run() { + final SocketAddress localAddress = channel.getLocalAddress(); + if (localAddress == null) { + if (future != null) { + future.setFailure(new ClosedChannelException()); + } + close(channel, succeededFuture(channel)); + return; + } + + try { + // Register interest in both reads and writes. + channel.getDatagramChannel().register(selector, + SelectionKey.OP_READ | SelectionKey.OP_WRITE, channel); + if (future != null) { + future.setSuccess(); + } + } catch (final ClosedChannelException e) { + if (future != null) { + future.setFailure(e); + } + close(channel, succeededFuture(channel)); + throw new ChannelException( + "Failed to register a socket to the selector.", e); + } + + //fireChannelBound(channel, localAddress); + fireChannelConnected(channel, localAddress); + } + } +} diff --git a/src/test/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelTest.java b/src/test/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelTest.java new file mode 100644 index 0000000000..9e6df24165 --- /dev/null +++ b/src/test/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelTest.java @@ -0,0 +1,129 @@ +/* + * JBoss, Home of Professional Open Source Copyright 2009, Red Hat Middleware + * LLC, and individual contributors by the @authors tag. See the copyright.txt + * in the distribution for a full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it under the + * terms of the GNU Lesser General Public License as published by the Free + * Software Foundation; either version 2.1 of the License, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more + * details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this software; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF + * site: http://www.fsf.org. + */ +package org.jboss.netty.channel.socket.nio; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.util.concurrent.Executors; + +import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Unit test for {@link NioDatagramChannel} + * + * @author Daniel Bevenius + */ +public class NioDatagramChannelTest { + private static Channel sc; + + private static InetSocketAddress inetSocketAddress; + + @BeforeClass + public static void setupChannel() { + final ServerBootstrap sb = + new ServerBootstrap(new NioDatagramChannelFactory(Executors + .newCachedThreadPool())); + inetSocketAddress = new InetSocketAddress("localhost", 9999); + sc = sb.bind(inetSocketAddress); + final SimpleHandler handler = new SimpleHandler(); + sc.getPipeline().addFirst("handler", handler); + } + + @Test + public void checkBoundPort() throws Throwable { + final InetSocketAddress socketAddress = + (InetSocketAddress) sc.getLocalAddress(); + assertEquals(9999, socketAddress.getPort()); + } + + @Test + public void sendReciveOne() throws Throwable { + final String expectedPayload = "some payload"; + sendRecive(expectedPayload); + } + + @Test + public void sendReciveMultiple() throws Throwable { + final String expectedPayload = "some payload"; + for (int i = 0; i < 1000; i ++) { + sendRecive(expectedPayload); + } + } + + public void clientBootstrap() { + final ClientBootstrap bootstrap = + new ClientBootstrap(new NioDatagramChannelFactory(Executors + .newCachedThreadPool())); + bootstrap.getPipeline().addLast("test", new SimpleHandler()); + bootstrap.setOption("tcpNoDelay", true); + bootstrap.setOption("keepAlive", true); + InetSocketAddress clientAddress = + new InetSocketAddress("localhost", 8888); + bootstrap.setOption("localAddress", clientAddress); + + ChannelFuture ccf = bootstrap.connect(inetSocketAddress); + ccf.awaitUninterruptibly(); + + Channel cc = ccf.getChannel(); + final String payload = "client payload"; + ChannelFuture write = + cc.write(ChannelBuffers.wrappedBuffer(payload.getBytes(), 0, + payload.length())); + write.awaitUninterruptibly(); + } + + @AfterClass + public static void closeChannel() { + if (sc != null) { + final ChannelFuture future = sc.close(); + if (future != null) { + future.awaitUninterruptibly(); + } + } + } + + private void sendRecive(final String expectedPayload) throws IOException { + final UdpClient udpClient = + new UdpClient(inetSocketAddress.getAddress(), inetSocketAddress + .getPort()); + final DatagramPacket dp = udpClient.send(expectedPayload.getBytes()); + + dp.setData(new byte[expectedPayload.length()]); + assertFalse("The payload should have been cleared", expectedPayload + .equals(new String(dp.getData()))); + + udpClient.receive(dp); + + assertEquals(expectedPayload, new String(dp.getData())); + udpClient.close(); + } + +} diff --git a/src/test/java/org/jboss/netty/channel/socket/nio/SimpleHandler.java b/src/test/java/org/jboss/netty/channel/socket/nio/SimpleHandler.java new file mode 100644 index 0000000000..9f2f16f9be --- /dev/null +++ b/src/test/java/org/jboss/netty/channel/socket/nio/SimpleHandler.java @@ -0,0 +1,46 @@ +/* + * JBoss, Home of Professional Open Source Copyright 2009, Red Hat Middleware + * LLC, and individual contributors by the @authors tag. See the copyright.txt + * in the distribution for a full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it under the + * terms of the GNU Lesser General Public License as published by the Free + * Software Foundation; either version 2.1 of the License, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more + * details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this software; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF + * site: http://www.fsf.org. + */ +package org.jboss.netty.channel.socket.nio; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipelineCoverage; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelHandler; + +/** + * + * @author Daniel Bevenius + * + */ +@ChannelPipelineCoverage("all") +public class SimpleHandler extends SimpleChannelHandler { + + @Override + public void messageReceived(final ChannelHandlerContext ctx, + final MessageEvent e) throws Exception { + final ChannelBuffer cb = (ChannelBuffer) e.getMessage(); + final byte[] actual = new byte[cb.readableBytes()]; + cb.getBytes(0, actual); + //System.out.println("TestHandler payload : " + new String(actual)); + ctx.sendDownstream(e); + } +} diff --git a/src/test/java/org/jboss/netty/channel/socket/nio/UdpClient.java b/src/test/java/org/jboss/netty/channel/socket/nio/UdpClient.java new file mode 100644 index 0000000000..e1dde597e6 --- /dev/null +++ b/src/test/java/org/jboss/netty/channel/socket/nio/UdpClient.java @@ -0,0 +1,62 @@ +/* + * JBoss, Home of Professional Open Source Copyright 2009, Red Hat Middleware + * LLC, and individual contributors by the @authors tag. See the copyright.txt + * in the distribution for a full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it under the + * terms of the GNU Lesser General Public License as published by the Free + * Software Foundation; either version 2.1 of the License, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more + * details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this software; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF + * site: http://www.fsf.org. + */ +package org.jboss.netty.channel.socket.nio; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.SocketException; + +/** + * + * @author Daniel Bevenius + * + */ +public class UdpClient { + + private final InetAddress address; + private final DatagramSocket clientSocket; + private final int port; + + public UdpClient(final InetAddress address, final int port) + throws SocketException { + this.address = address; + this.port = port; + clientSocket = new DatagramSocket(); + clientSocket.setReuseAddress(true); + } + + public DatagramPacket send(final byte[] payload) throws IOException { + final DatagramPacket dp = + new DatagramPacket(payload, payload.length, address, port); + clientSocket.send(dp); + return dp; + } + + public void receive(final DatagramPacket dp) throws IOException { + clientSocket.receive(dp); + } + + public void close() { + clientSocket.close(); + } +}