diff --git a/pom.xml b/pom.xml
index 1f4bde641c..1fc24ddae0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -313,6 +313,11 @@
sun.misc.Unsafe
java.util.zip.Deflater
java.util.concurrent.LinkedTransferQueue
+
+ java.nio.channels.DatagramChannel
+ java.nio.channels.MembershipKey
+ java.net.StandardSocketOptions
+ java.net.StandardProtocolFamily
diff --git a/src/main/java/org/jboss/netty/channel/socket/DatagramChannel.java b/src/main/java/org/jboss/netty/channel/socket/DatagramChannel.java
index 82173e32f5..149a0f78cf 100644
--- a/src/main/java/org/jboss/netty/channel/socket/DatagramChannel.java
+++ b/src/main/java/org/jboss/netty/channel/socket/DatagramChannel.java
@@ -20,6 +20,7 @@ import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
/**
* A UDP/IP {@link Channel} which is created by {@link DatagramChannelFactory}.
@@ -35,20 +36,20 @@ public interface DatagramChannel extends Channel {
/**
* Joins a multicast group.
*/
- void joinGroup(InetAddress multicastAddress);
+ ChannelFuture joinGroup(InetAddress multicastAddress);
/**
* Joins the specified multicast group at the specified interface.
*/
- void joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface);
+ ChannelFuture joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface);
/**
* Leaves a multicast group.
*/
- void leaveGroup(InetAddress multicastAddress);
+ ChannelFuture leaveGroup(InetAddress multicastAddress);
/**
* Leaves a multicast group on a specified local interface.
*/
- void leaveGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface);
+ ChannelFuture leaveGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface);
}
diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/DefaultNioDatagramChannelConfig.java b/src/main/java/org/jboss/netty/channel/socket/nio/DefaultNioDatagramChannelConfig.java
index b99186ee22..c2aee91938 100644
--- a/src/main/java/org/jboss/netty/channel/socket/nio/DefaultNioDatagramChannelConfig.java
+++ b/src/main/java/org/jboss/netty/channel/socket/nio/DefaultNioDatagramChannelConfig.java
@@ -15,13 +15,21 @@
*/
package org.jboss.netty.channel.socket.nio;
-import java.net.DatagramSocket;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.net.StandardSocketOptions;
+import java.nio.channels.DatagramChannel;
+import java.util.Enumeration;
import java.util.Map;
+import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.socket.DefaultDatagramChannelConfig;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.ConversionUtil;
+import org.jboss.netty.util.internal.DetectionUtil;
/**
* The default {@link NioSocketChannelConfig} implementation.
@@ -36,9 +44,11 @@ class DefaultNioDatagramChannelConfig extends DefaultDatagramChannelConfig
private volatile int writeBufferHighWaterMark = 64 * 1024;
private volatile int writeBufferLowWaterMark = 32 * 1024;
private volatile int writeSpinCount = 16;
-
- DefaultNioDatagramChannelConfig(DatagramSocket socket) {
- super(socket);
+ private final DatagramChannel channel;
+
+ DefaultNioDatagramChannelConfig(DatagramChannel channel) {
+ super(channel.socket());
+ this.channel = channel;
}
@Override
@@ -132,4 +142,108 @@ class DefaultNioDatagramChannelConfig extends DefaultDatagramChannelConfig
}
this.writeSpinCount = writeSpinCount;
}
+
+ @Override
+ public void setNetworkInterface(NetworkInterface networkInterface) {
+ if (DetectionUtil.javaVersion() < 7) {
+ throw new UnsupportedOperationException();
+ } else {
+ try {
+ channel.setOption(StandardSocketOptions.IP_MULTICAST_IF, networkInterface);
+ } catch (IOException e) {
+ throw new ChannelException(e);
+ }
+ }
+ }
+
+ @Override
+ public NetworkInterface getNetworkInterface() {
+ if (DetectionUtil.javaVersion() < 7) {
+ throw new UnsupportedOperationException();
+ } else {
+ try {
+ return (NetworkInterface) channel.getOption(StandardSocketOptions.IP_MULTICAST_IF);
+ } catch (IOException e) {
+ throw new ChannelException(e);
+ }
+ }
+ }
+
+ @Override
+ public int getTimeToLive() {
+ if (DetectionUtil.javaVersion() < 7) {
+ throw new UnsupportedOperationException();
+ } else {
+ try {
+ return (int) channel.getOption(StandardSocketOptions.IP_MULTICAST_TTL);
+ } catch (IOException e) {
+ throw new ChannelException(e);
+ }
+ }
+ }
+
+ @Override
+ public void setTimeToLive(int ttl) {
+ if (DetectionUtil.javaVersion() < 7) {
+ throw new UnsupportedOperationException();
+ } else {
+ try {
+ channel.setOption(StandardSocketOptions.IP_MULTICAST_TTL, ttl);
+ } catch (IOException e) {
+ throw new ChannelException(e);
+ }
+ }
+
+ }
+
+ @Override
+ public InetAddress getInterface() {
+ NetworkInterface inf = getNetworkInterface();
+ if (inf == null) {
+ return null;
+ } else {
+ Enumeration addresses = inf.getInetAddresses();
+ if (addresses.hasMoreElements()) {
+ return addresses.nextElement();
+ }
+ return null;
+ }
+ }
+
+ @Override
+ public void setInterface(InetAddress interfaceAddress) {
+ try {
+ setNetworkInterface(NetworkInterface.getByInetAddress(interfaceAddress));
+ } catch (SocketException e) {
+ throw new ChannelException(e);
+ }
+ }
+
+ @Override
+ public boolean isLoopbackModeDisabled() {
+ if (DetectionUtil.javaVersion() < 7) {
+ throw new UnsupportedOperationException();
+ } else {
+ try {
+ return (Boolean) channel.getOption(StandardSocketOptions.IP_MULTICAST_LOOP);
+ } catch (IOException e) {
+ throw new ChannelException(e);
+ }
+ }
+ }
+
+ @Override
+ public void setLoopbackModeDisabled(boolean loopbackModeDisabled) {
+ if (DetectionUtil.javaVersion() < 7) {
+ throw new UnsupportedOperationException();
+ } else {
+ try {
+ channel.setOption(StandardSocketOptions.IP_MULTICAST_LOOP, loopbackModeDisabled);
+ } catch (IOException e) {
+ throw new ChannelException(e);
+ }
+ }
+ }
+
+
}
diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java
index ca8aa9a26c..0277d63b2b 100644
--- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java
+++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java
@@ -21,14 +21,23 @@ import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
+import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.DatagramChannelConfig;
+import org.jboss.netty.util.internal.DetectionUtil;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketAddress;
+import java.net.SocketException;
import java.nio.channels.DatagramChannel;
+import java.nio.channels.MembershipKey;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
/**
* Provides an NIO based {@link org.jboss.netty.channel.socket.DatagramChannel}.
@@ -36,24 +45,64 @@ import java.nio.channels.DatagramChannel;
public final class NioDatagramChannel extends AbstractNioChannel
implements org.jboss.netty.channel.socket.DatagramChannel {
+ /**
+ * The supported ProtocolFamily by UDP
+ *
+ */
+ public enum ProtocolFamily {
+ INET,
+ INET6
+ }
+
/**
* The {@link DatagramChannelConfig}.
*/
private final NioDatagramChannelConfig config;
-
+ private Map> memberships;
+
+ /**
+ * Use {@link #NioDatagramChannel(ChannelFactory, ChannelPipeline, ChannelSink, NioDatagramWorker, ProtocolFamily)}
+ */
+ @Deprecated
NioDatagramChannel(final ChannelFactory factory,
final ChannelPipeline pipeline, final ChannelSink sink,
final NioDatagramWorker worker) {
- super(null, factory, pipeline, sink, worker, openNonBlockingChannel());
- config = new DefaultNioDatagramChannelConfig(channel.socket());
+ this(factory, pipeline, sink, worker, null);
+ }
+
+ NioDatagramChannel(final ChannelFactory factory,
+ final ChannelPipeline pipeline, final ChannelSink sink,
+ final NioDatagramWorker worker, ProtocolFamily family) {
+ super(null, factory, pipeline, sink, worker, openNonBlockingChannel(family));
+ config = new DefaultNioDatagramChannelConfig(channel);
fireChannelOpen(this);
}
- private static DatagramChannel openNonBlockingChannel() {
+ private static DatagramChannel openNonBlockingChannel(ProtocolFamily family) {
try {
- final DatagramChannel channel = DatagramChannel.open();
+ final DatagramChannel channel;
+
+ // check if we are on java 7 or if the family was not specified
+ if (DetectionUtil.javaVersion() < 7 || family == null) {
+ channel = DatagramChannel.open();
+ } else {
+ // This block only works on java7++, but we checked before if we have it
+ switch (family) {
+ case INET:
+ channel = DatagramChannel.open(java.net.StandardProtocolFamily.INET);
+ break;
+
+ case INET6:
+ channel = DatagramChannel.open(java.net.StandardProtocolFamily.INET6);
+ break;
+
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
channel.configureBlocking(false);
return channel;
} catch (final IOException e) {
@@ -61,6 +110,8 @@ public final class NioDatagramChannel extends AbstractNioChannel>();
+
+ }
+ List keys = memberships.get(multicastAddress);
+ if (keys == null) {
+ keys = new ArrayList();
+ memberships.put(multicastAddress, keys);
+ }
+ keys.add(key);
+ }
+ } catch (Throwable e) {
+ return Channels.failedFuture(this, e);
+ }
+ }
+ return Channels.succeededFuture(this);
+ }
+
+ public ChannelFuture leaveGroup(InetAddress multicastAddress) {
+ try {
+ return leaveGroup(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), null);
+ } catch (SocketException e) {
+ return Channels.failedFuture(this, e);
+ }
+
+ }
+
+ public ChannelFuture leaveGroup(InetSocketAddress multicastAddress,
NetworkInterface networkInterface) {
- throw new UnsupportedOperationException();
+ return leaveGroup(multicastAddress.getAddress(), networkInterface, null);
}
- public void leaveGroup(InetAddress multicastAddress) {
- throw new UnsupportedOperationException();
+ /**
+ * Leave the specified multicast group at the specified interface using the specified source.
+ */
+ public ChannelFuture leaveGroup(InetAddress multicastAddress,
+ NetworkInterface networkInterface, InetAddress source) {
+ if (DetectionUtil.javaVersion() < 7) {
+ throw new UnsupportedOperationException();
+ } else {
+ if (multicastAddress == null) {
+ throw new NullPointerException("multicastAddress");
+ }
+
+ if (networkInterface == null) {
+ throw new NullPointerException("networkInterface");
+ }
+
+ synchronized (this) {
+ if (memberships != null) {
+ List keys = memberships.get(multicastAddress);
+ if (keys != null) {
+ Iterator keyIt = keys.iterator();
+
+ while (keyIt.hasNext()) {
+ MembershipKey key = keyIt.next();
+ if (networkInterface.equals(key.networkInterface())) {
+ if (source == null && key.sourceAddress() == null || (source != null && source.equals(key.sourceAddress()))) {
+ key.drop();
+ keyIt.remove();
+ }
+
+ }
+ }
+ if (keys.isEmpty()) {
+ memberships.remove(multicastAddress);
+ }
+ }
+ }
+ }
+ return Channels.succeededFuture(this);
+ }
}
-
- public void leaveGroup(InetSocketAddress multicastAddress,
- NetworkInterface networkInterface) {
- throw new UnsupportedOperationException();
+
+ /**
+ * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface
+ *
+ */
+ public ChannelFuture block(InetAddress multicastAddress,
+ NetworkInterface networkInterface, InetAddress sourceToBlock) {
+ if (DetectionUtil.javaVersion() < 7) {
+ throw new UnsupportedOperationException();
+ } else {
+ if (multicastAddress == null) {
+ throw new NullPointerException("multicastAddress");
+ }
+ if (sourceToBlock == null) {
+ throw new NullPointerException("sourceToBlock");
+ }
+
+ if (networkInterface == null) {
+ throw new NullPointerException("networkInterface");
+ }
+ synchronized (this) {
+ if (memberships != null) {
+ List keys = memberships.get(multicastAddress);
+ for (MembershipKey key: keys) {
+ if (networkInterface.equals(key.networkInterface())) {
+ try {
+ key.block(sourceToBlock);
+ } catch (IOException e) {
+ return Channels.failedFuture(this, e);
+ }
+ }
+ }
+ }
+ }
+ return Channels.succeededFuture(this);
+
+
+ }
}
+
+ /**
+* Block the given sourceToBlock address for the given multicastAddress
+*
+*/
+ public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
+ try {
+ block(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), sourceToBlock);
+ } catch (SocketException e) {
+ return Channels.failedFuture(this, e);
+ }
+ return Channels.succeededFuture(this);
+ }
@Override
InetSocketAddress getLocalSocketAddress() throws Exception {
return (InetSocketAddress) channel.socket().getLocalSocketAddress();
diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java
index 6ea6044288..1d9c12702f 100644
--- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java
+++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java
@@ -24,6 +24,7 @@ import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannel.ProtocolFamily;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
import org.jboss.netty.util.ExternalResourceReleasable;
@@ -78,14 +79,25 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory {
private final NioDatagramPipelineSink sink;
private final WorkerPool workerPool;
-
+ private final NioDatagramChannel.ProtocolFamily family;
+
/**
* Create a new {@link NioDatagramChannelFactory} with a {@link Executors#newCachedThreadPool()}.
*
* See {@link #NioDatagramChannelFactory(Executor)}
*/
+ @Deprecated
public NioDatagramChannelFactory() {
- this(Executors.newCachedThreadPool());
+ this(Executors.newCachedThreadPool(), null);
+ }
+
+ /**
+ * Create a new {@link NioDatagramChannelFactory} with a {@link Executors#newCachedThreadPool()}.
+ *
+ * See {@link #NioDatagramChannelFactory(Executor)}
+ */
+ public NioDatagramChannelFactory(ProtocolFamily family) {
+ this(Executors.newCachedThreadPool(), family);
}
/**
@@ -97,6 +109,7 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory {
* @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads
*/
+ @Deprecated
public NioDatagramChannelFactory(final Executor workerExecutor) {
this(workerExecutor, SelectorUtil.DEFAULT_IO_THREADS);
}
@@ -109,6 +122,7 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory {
* @param workerCount
* the maximum number of I/O worker threads
*/
+ @Deprecated
public NioDatagramChannelFactory(final Executor workerExecutor, final int workerCount) {
this(new NioDatagramWorkerPool(workerExecutor, workerCount, true));
}
@@ -119,13 +133,60 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory {
* @param workerPool
* the {@link WorkerPool} which will be used to obtain the {@link NioDatagramWorker} that execute the I/O worker threads
*/
- public NioDatagramChannelFactory(WorkerPool workerPool) {
- this.workerPool = workerPool;
- sink = new NioDatagramPipelineSink(workerPool);
- }
+ @Deprecated
+ public NioDatagramChannelFactory(WorkerPool workerPool) {
+ this(workerPool, null);
+ }
+
+ /**
+ * Creates a new instance. Calling this constructor is same with calling
+ * {@link #NioDatagramChannelFactory(Executor, int)} with 2 * the number of
+ * available processors in the machine. The number of available processors
+ * is obtained by {@link Runtime#availableProcessors()}.
+ *
+ * @param workerExecutor
+ * the {@link Executor} which will execute the I/O worker threads
+ * @param family
+ * the {@link ProtocolFamily} to use. This should be used for UDP multicast.
+ * Be aware that this option is only considered when running on java7+
+ */
+ public NioDatagramChannelFactory(final Executor workerExecutor, ProtocolFamily family) {
+ this(workerExecutor, SelectorUtil.DEFAULT_IO_THREADS, family);
+ }
+ /**
+ * Creates a new instance.
+ *
+ * @param workerExecutor
+ * the {@link Executor} which will execute the I/O worker threads
+ * @param workerCount
+ * the maximum number of I/O worker threads
+ * @param family
+ * the {@link ProtocolFamily} to use. This should be used for UDP multicast.
+ * Be aware that this option is only considered when running on java7+
+ */
+ public NioDatagramChannelFactory(final Executor workerExecutor,
+ final int workerCount, ProtocolFamily family) {
+ this(new NioDatagramWorkerPool(workerExecutor, workerCount, true), family);
+ }
+
+ /**
+ * Creates a new instance.
+ *
+ * @param workerPool
+ * the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads
+ * @param family
+ * the {@link ProtocolFamily} to use. This should be used for UDP multicast.
+ * Be aware that this option is only considered when running on java7+
+ */
+ public NioDatagramChannelFactory(WorkerPool workerPool, ProtocolFamily family) {
+ this.workerPool = workerPool;
+ this.family = family;
+ sink = new NioDatagramPipelineSink(workerPool);
+ }
+
public DatagramChannel newChannel(final ChannelPipeline pipeline) {
- return new NioDatagramChannel(this, pipeline, sink, sink.nextWorker());
+ return new NioDatagramChannel(this, pipeline, sink, sink.nextWorker(), family);
}
public void releaseExternalResources() {
diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramChannel.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramChannel.java
index e3d385e879..00231693f7 100644
--- a/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramChannel.java
+++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramChannel.java
@@ -26,8 +26,10 @@ import java.net.SocketException;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
+import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.DatagramChannelConfig;
import org.jboss.netty.channel.socket.DefaultDatagramChannelConfig;
@@ -68,22 +70,24 @@ final class OioDatagramChannel extends AbstractOioChannel
return config;
}
- public void joinGroup(InetAddress multicastAddress) {
+ public ChannelFuture joinGroup(InetAddress multicastAddress) {
ensureBound();
try {
socket.joinGroup(multicastAddress);
+ return Channels.succeededFuture(this);
} catch (IOException e) {
- throw new ChannelException(e);
+ return Channels.failedFuture(this, e);
}
}
- public void joinGroup(
+ public ChannelFuture joinGroup(
InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
ensureBound();
try {
socket.joinGroup(multicastAddress, networkInterface);
+ return Channels.succeededFuture(this);
} catch (IOException e) {
- throw new ChannelException(e);
+ return Channels.failedFuture(this, e);
}
}
@@ -95,20 +99,22 @@ final class OioDatagramChannel extends AbstractOioChannel
}
}
- public void leaveGroup(InetAddress multicastAddress) {
+ public ChannelFuture leaveGroup(InetAddress multicastAddress) {
try {
socket.leaveGroup(multicastAddress);
+ return Channels.succeededFuture(this);
} catch (IOException e) {
- throw new ChannelException(e);
+ return Channels.failedFuture(this, e);
}
}
- public void leaveGroup(
+ public ChannelFuture leaveGroup(
InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
try {
socket.leaveGroup(multicastAddress, networkInterface);
+ return Channels.succeededFuture(this);
} catch (IOException e) {
- throw new ChannelException(e);
+ return Channels.failedFuture(this, e);
}
}
diff --git a/src/test/java/org/jboss/netty/channel/socket/AbstractDatagramMulticastTest.java b/src/test/java/org/jboss/netty/channel/socket/AbstractDatagramMulticastTest.java
new file mode 100644
index 0000000000..d41081605f
--- /dev/null
+++ b/src/test/java/org/jboss/netty/channel/socket/AbstractDatagramMulticastTest.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2011 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jboss.netty.channel.socket;
+
+import static org.junit.Assert.assertTrue;
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.DatagramChannel;
+import org.jboss.netty.channel.socket.DatagramChannelFactory;
+import org.jboss.netty.util.TestUtil;
+import org.jboss.netty.util.internal.ExecutorUtil;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public abstract class AbstractDatagramMulticastTest {
+
+
+ private static ExecutorService executor;
+
+
+ @BeforeClass
+ public static void init() {
+ executor = Executors.newCachedThreadPool();
+ }
+
+ @AfterClass
+ public static void destroy() {
+ ExecutorUtil.terminate(executor);
+ }
+
+ protected abstract DatagramChannelFactory newServerSocketChannelFactory(Executor executor);
+ protected abstract DatagramChannelFactory newClientSocketChannelFactory(Executor executor);
+
+ @Test
+ public void testMulticast() throws Throwable {
+ ConnectionlessBootstrap sb = new ConnectionlessBootstrap(newServerSocketChannelFactory(executor));
+ ConnectionlessBootstrap cb = new ConnectionlessBootstrap(newClientSocketChannelFactory(executor));
+ MulticastTestHandler mhandler = new MulticastTestHandler();
+
+ cb.getPipeline().addFirst("handler", mhandler);
+ sb.getPipeline().addFirst("handler", new SimpleChannelUpstreamHandler());
+
+ int port = TestUtil.getFreePort();
+
+ NetworkInterface iface = NetworkInterface.getByInetAddress(InetAddress.getLocalHost());
+
+ // check if the NetworkInterface is null, this is the case on my ubuntu dev machine but not on osx and windows.
+ // if so fail back the the first interface
+ if (iface == null) {
+ // use nextElement() as NetWorkInterface.getByIndex(0) returns null
+ iface = NetworkInterface.getNetworkInterfaces().nextElement();
+ }
+ sb.setOption("networkInterface", iface);
+ sb.setOption("reuseAddress", true);
+
+ Channel sc = sb.bind(new InetSocketAddress(port));
+
+
+ String group = "230.0.0.1";
+ InetSocketAddress groupAddress = new InetSocketAddress(group, port);
+
+ cb.setOption("networkInterface", iface);
+ cb.setOption("reuseAddress", true);
+
+ DatagramChannel cc = (DatagramChannel) cb.bind(new InetSocketAddress(port));
+
+ assertTrue(cc.joinGroup(groupAddress, iface).awaitUninterruptibly().isSuccess());
+
+ assertTrue(sc.write(wrapInt(1), groupAddress).awaitUninterruptibly().isSuccess());
+
+
+ assertTrue(mhandler.await());
+
+ assertTrue(sc.write(wrapInt(1), groupAddress).awaitUninterruptibly().isSuccess());
+
+
+ // leave the group
+ assertTrue(cc.leaveGroup(groupAddress, iface).awaitUninterruptibly().isSuccess());
+
+ // sleep a second to make sure we left the group
+ Thread.sleep(1000);
+
+ // we should not receive a message anymore as we left the group before
+ assertTrue(sc.write(wrapInt(1), groupAddress).awaitUninterruptibly().isSuccess());
+
+ sc.close().awaitUninterruptibly();
+ cc.close().awaitUninterruptibly();
+
+ }
+
+ private ChannelBuffer wrapInt(int value) {
+ ChannelBuffer buf = ChannelBuffers.buffer(4);
+ buf.writeInt(value);
+ return buf;
+ }
+
+ private final class MulticastTestHandler extends SimpleChannelUpstreamHandler {
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ private boolean done = false;
+ private volatile boolean fail = false;
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ super.messageReceived(ctx, e);
+ if (done) {
+ fail = true;
+ }
+
+ Assert.assertEquals(1,((ChannelBuffer)e.getMessage()).readInt());
+
+ latch.countDown();
+
+ // mark the handler as done as we only are supposed to receive one message
+ done = true;
+ }
+
+ public boolean await() throws Exception {
+ boolean success = latch.await(10, TimeUnit.SECONDS);
+ if (fail) {
+ // fail if we receive an message after we are done
+ Assert.fail();
+ }
+ return success;
+ }
+
+ }
+}
diff --git a/src/test/java/org/jboss/netty/channel/socket/NioNioDatagramMulticastTest.java b/src/test/java/org/jboss/netty/channel/socket/NioNioDatagramMulticastTest.java
new file mode 100644
index 0000000000..24897998cb
--- /dev/null
+++ b/src/test/java/org/jboss/netty/channel/socket/NioNioDatagramMulticastTest.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2011 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jboss.netty.channel.socket;
+
+import java.util.concurrent.Executor;
+
+import org.jboss.netty.channel.socket.nio.NioDatagramChannel.ProtocolFamily;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
+
+public class NioNioDatagramMulticastTest extends AbstractDatagramMulticastTest {
+
+ @Override
+ protected DatagramChannelFactory newServerSocketChannelFactory(Executor executor) {
+ return new NioDatagramChannelFactory(executor, ProtocolFamily.INET);
+ }
+
+ @Override
+ protected DatagramChannelFactory newClientSocketChannelFactory(Executor executor) {
+ return new OioDatagramChannelFactory(executor);
+ }
+
+}
diff --git a/src/test/java/org/jboss/netty/channel/socket/NioOioDatagramMulticastTest.java b/src/test/java/org/jboss/netty/channel/socket/NioOioDatagramMulticastTest.java
new file mode 100644
index 0000000000..5a37bbf0b3
--- /dev/null
+++ b/src/test/java/org/jboss/netty/channel/socket/NioOioDatagramMulticastTest.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2011 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jboss.netty.channel.socket;
+
+import java.util.concurrent.Executor;
+
+import org.jboss.netty.channel.socket.nio.NioDatagramChannel.ProtocolFamily;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
+
+public class NioOioDatagramMulticastTest extends AbstractDatagramMulticastTest {
+
+ @Override
+ protected DatagramChannelFactory newServerSocketChannelFactory(Executor executor) {
+ return new OioDatagramChannelFactory(executor);
+ }
+
+ @Override
+ protected DatagramChannelFactory newClientSocketChannelFactory(Executor executor) {
+ return new NioDatagramChannelFactory(executor, ProtocolFamily.INET);
+ }
+
+}
diff --git a/src/test/java/org/jboss/netty/channel/socket/OioNioDatagramMulticastTest.java b/src/test/java/org/jboss/netty/channel/socket/OioNioDatagramMulticastTest.java
new file mode 100644
index 0000000000..d64a45f340
--- /dev/null
+++ b/src/test/java/org/jboss/netty/channel/socket/OioNioDatagramMulticastTest.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2011 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jboss.netty.channel.socket;
+
+import java.util.concurrent.Executor;
+
+import org.jboss.netty.channel.socket.nio.NioDatagramChannel.ProtocolFamily;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+
+public class OioNioDatagramMulticastTest extends AbstractDatagramMulticastTest {
+
+ @Override
+ protected DatagramChannelFactory newServerSocketChannelFactory(Executor executor) {
+ return new NioDatagramChannelFactory(executor, ProtocolFamily.INET);
+ }
+
+ @Override
+ protected DatagramChannelFactory newClientSocketChannelFactory(Executor executor) {
+ return new NioDatagramChannelFactory(executor, ProtocolFamily.INET);
+ }
+
+}
diff --git a/src/test/java/org/jboss/netty/channel/socket/OioOioDatagramMulticastTest.java b/src/test/java/org/jboss/netty/channel/socket/OioOioDatagramMulticastTest.java
new file mode 100644
index 0000000000..dffeb31fb3
--- /dev/null
+++ b/src/test/java/org/jboss/netty/channel/socket/OioOioDatagramMulticastTest.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2011 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jboss.netty.channel.socket;
+
+import java.util.concurrent.Executor;
+
+import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
+
+public class OioOioDatagramMulticastTest extends AbstractDatagramMulticastTest {
+
+ @Override
+ protected DatagramChannelFactory newServerSocketChannelFactory(Executor executor) {
+ return new OioDatagramChannelFactory(executor);
+ }
+
+ @Override
+ protected DatagramChannelFactory newClientSocketChannelFactory(Executor executor) {
+ return new OioDatagramChannelFactory(executor);
+ }
+
+}
diff --git a/src/test/java/org/jboss/netty/util/TestUtil.java b/src/test/java/org/jboss/netty/util/TestUtil.java
index fb70d4312c..b74b7e9529 100644
--- a/src/test/java/org/jboss/netty/util/TestUtil.java
+++ b/src/test/java/org/jboss/netty/util/TestUtil.java
@@ -15,7 +15,9 @@
*/
package org.jboss.netty.util;
+import java.io.IOException;
import java.net.InetAddress;
+import java.net.ServerSocket;
import java.net.UnknownHostException;
@@ -25,7 +27,9 @@ import java.net.UnknownHostException;
public final class TestUtil {
private static final InetAddress LOCALHOST;
-
+ private final static int START_PORT = 20000;
+ private final static int END_PORT = 30000;
+
static {
InetAddress localhost = null;
try {
@@ -53,6 +57,26 @@ public final class TestUtil {
return LOCALHOST;
}
+
+ /**
+ * Return a free port which can be used to bind to
+ *
+ * @return port
+ */
+ public static int getFreePort() {
+ for(int start = START_PORT; start <= END_PORT; start++) {
+ try {
+ ServerSocket socket = new ServerSocket(start);
+ socket.setReuseAddress(true);
+ socket.close();
+ return start;
+ } catch (IOException e) {
+ // ignore
+ }
+
+ }
+ throw new RuntimeException("Unable to find a free port....");
+ }
private TestUtil() {
// Unused
}