Ported multicast test / Fixed bugs in NioDatagramChannelConfig

This commit is contained in:
Trustin Lee 2012-06-03 02:57:52 -07:00
parent e6ceb91a85
commit ada61d4985
4 changed files with 147 additions and 158 deletions

View File

@ -19,11 +19,14 @@ import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
public final class SocketAddresses { public final class SocketAddresses {
public static final InetAddress LOCALHOST; public static final InetAddress LOCALHOST;
public static final NetworkInterface LOOPBACK_IF;
private static final InternalLogger logger = private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SocketAddresses.class); InternalLoggerFactory.getInstance(SocketAddresses.class);
@ -48,6 +51,26 @@ public final class SocketAddresses {
} }
LOCALHOST = localhost; LOCALHOST = localhost;
NetworkInterface loopbackIf;
try {
loopbackIf = NetworkInterface.getByInetAddress(LOCALHOST);
} catch (SocketException e) {
loopbackIf = null;
}
// check if the NetworkInterface is null, this is the case on my ubuntu dev machine but not on osx and windows.
// if so fail back the the first interface
if (loopbackIf == null) {
// use nextElement() as NetWorkInterface.getByIndex(0) returns null
try {
loopbackIf = NetworkInterface.getNetworkInterfaces().nextElement();
} catch (SocketException e) {
logger.error("Failed to enumerate network interfaces", e);
}
}
LOOPBACK_IF = loopbackIf;
} }
private SocketAddresses() { private SocketAddresses() {

View File

@ -1,154 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import static org.junit.Assert.assertTrue;
import io.netty.bootstrap.ConnectionlessBootstrap;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelFactory;
import io.netty.testsuite.util.TestUtils;
import io.netty.util.internal.ExecutorUtil;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
public abstract class AbstractDatagramMulticastTest {
private static ExecutorService executor;
@BeforeClass
public static void init() {
executor = Executors.newCachedThreadPool();
}
@AfterClass
public static void destroy() {
ExecutorUtil.terminate(executor);
}
protected abstract DatagramChannelFactory newServerSocketChannelFactory(Executor executor);
protected abstract DatagramChannelFactory newClientSocketChannelFactory(Executor executor);
@Test
public void testMulticast() throws Throwable {
ConnectionlessBootstrap sb = new ConnectionlessBootstrap(newServerSocketChannelFactory(executor));
ConnectionlessBootstrap cb = new ConnectionlessBootstrap(newClientSocketChannelFactory(executor));
MulticastTestHandler mhandler = new MulticastTestHandler();
cb.pipeline().addFirst("handler", mhandler);
sb.pipeline().addFirst("handler", new SimpleChannelUpstreamHandler());
int port = TestUtils.getFreePort();
NetworkInterface iface = NetworkInterface.getByInetAddress(InetAddress.getLocalHost());
// check if the NetworkInterface is null, this is the case on my ubuntu dev machine but not on osx and windows.
// if so fail back the the first interface
if (iface == null) {
// use nextElement() as NetWorkInterface.getByIndex(0) returns null
iface = NetworkInterface.getNetworkInterfaces().nextElement();
}
sb.setOption("networkInterface", iface);
sb.setOption("reuseAddress", true);
Channel sc = sb.bind(new InetSocketAddress(port));
String group = "230.0.0.1";
InetSocketAddress groupAddress = new InetSocketAddress(group, port);
cb.setOption("networkInterface", iface);
cb.setOption("reuseAddress", true);
DatagramChannel cc = (DatagramChannel) cb.bind(new InetSocketAddress(port));
assertTrue(cc.joinGroup(groupAddress, iface).awaitUninterruptibly().isSuccess());
assertTrue(sc.write(ChannelBuffers.copyInt(1), groupAddress).awaitUninterruptibly().isSuccess());
assertTrue(mhandler.await());
assertTrue(sc.write(ChannelBuffers.copyInt(1), groupAddress).awaitUninterruptibly().isSuccess());
// leave the group
assertTrue(cc.leaveGroup(groupAddress, iface).awaitUninterruptibly().isSuccess());
// sleep a second to make sure we left the group
Thread.sleep(1000);
// we should not receive a message anymore as we left the group before
assertTrue(sc.write(ChannelBuffers.copyInt(1), groupAddress).awaitUninterruptibly().isSuccess());
sc.close().awaitUninterruptibly();
cc.close().awaitUninterruptibly();
}
private final class MulticastTestHandler extends SimpleChannelUpstreamHandler {
private final CountDownLatch latch = new CountDownLatch(1);
private boolean done = false;
private volatile boolean fail = false;
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
super.messageReceived(ctx, e);
if (done) {
fail = true;
}
Assert.assertEquals(1,((ChannelBuffer)e.getMessage()).readInt());
latch.countDown();
// mark the handler as done as we only are supposed to receive one message
done = true;
}
public boolean await() throws Exception {
boolean success = latch.await(10, TimeUnit.SECONDS);
if (fail) {
// fail if we receive an message after we are done
Assert.fail();
}
return success;
}
}
}

View File

@ -0,0 +1,119 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import static org.junit.Assert.*;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.SocketAddresses;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
public class DatagramMulticastTest extends AbstractDatagramTest {
@Test
public void testMulticast() throws Throwable {
run();
}
public void testMulticast(Bootstrap sb, Bootstrap cb) throws Throwable {
MulticastTestHandler mhandler = new MulticastTestHandler();
sb.handler(mhandler);
cb.handler(new ChannelInboundMessageHandlerAdapter<DatagramPacket>() {
@Override
public void messageReceived(
ChannelInboundHandlerContext<DatagramPacket> ctx,
DatagramPacket msg) throws Exception {
// Nothing will be sent.
}
});
sb.option(ChannelOption.IP_MULTICAST_IF, SocketAddresses.LOOPBACK_IF);
sb.option(ChannelOption.SO_REUSEADDR, true);
cb.option(ChannelOption.IP_MULTICAST_IF, SocketAddresses.LOOPBACK_IF);
cb.option(ChannelOption.SO_REUSEADDR, true);
cb.localAddress(addr);
Channel sc = sb.bind().sync().channel();
DatagramChannel cc = (DatagramChannel) cb.bind().sync().channel();
String group = "230.0.0.1";
InetSocketAddress groupAddress = new InetSocketAddress(group, addr.getPort());
cc.joinGroup(groupAddress, SocketAddresses.LOOPBACK_IF).sync();
sc.write(new DatagramPacket(ChannelBuffers.copyInt(1), groupAddress)).sync();
assertTrue(mhandler.await());
sc.write(new DatagramPacket(ChannelBuffers.copyInt(1), groupAddress)).sync();
// leave the group
cc.leaveGroup(groupAddress, SocketAddresses.LOOPBACK_IF).sync();
// sleep a second to make sure we left the group
Thread.sleep(1000);
// we should not receive a message anymore as we left the group before
sc.write(new DatagramPacket(ChannelBuffers.copyInt(1), groupAddress)).sync();
mhandler.await();
sc.close().awaitUninterruptibly();
cc.close().awaitUninterruptibly();
}
private final class MulticastTestHandler extends ChannelInboundMessageHandlerAdapter<DatagramPacket> {
private final CountDownLatch latch = new CountDownLatch(1);
private boolean done = false;
private volatile boolean fail = false;
@Override
public void messageReceived(
ChannelInboundHandlerContext<DatagramPacket> ctx,
DatagramPacket msg) throws Exception {
if (done) {
fail = true;
}
Assert.assertEquals(1, msg.data().readInt());
latch.countDown();
// mark the handler as done as we only are supposed to receive one message
done = true;
}
public boolean await() throws Exception {
boolean success = latch.await(10, TimeUnit.SECONDS);
if (fail) {
// fail if we receive an message after we are done
Assert.fail();
}
return success;
}
}
}

View File

@ -24,6 +24,7 @@ import java.net.InetAddress;
import java.net.NetworkInterface; import java.net.NetworkInterface;
import java.net.SocketException; import java.net.SocketException;
import java.nio.channels.DatagramChannel; import java.nio.channels.DatagramChannel;
import java.nio.channels.NetworkChannel;
import java.util.Enumeration; import java.util.Enumeration;
/** /**
@ -53,7 +54,7 @@ class NioDatagramChannelConfig extends DefaultDatagramChannelConfig {
Method setOption = null; Method setOption = null;
if (socketOptionType != null) { if (socketOptionType != null) {
try { try {
ipMulticastIf = Class.forName("java.net.StandardSocketOptions", true, classLoader).getDeclaredField("IP_MULTICAST_TTL").get(null); ipMulticastTtl = Class.forName("java.net.StandardSocketOptions", true, classLoader).getDeclaredField("IP_MULTICAST_TTL").get(null);
} catch (Exception e) { } catch (Exception e) {
throw new Error("cannot locate the IP_MULTICAST_TTL field", e); throw new Error("cannot locate the IP_MULTICAST_TTL field", e);
} }
@ -65,19 +66,19 @@ class NioDatagramChannelConfig extends DefaultDatagramChannelConfig {
} }
try { try {
ipMulticastIf = Class.forName("java.net.StandardSocketOptions", true, classLoader).getDeclaredField("IP_MULTICAST_LOOP").get(null); ipMulticastLoop = Class.forName("java.net.StandardSocketOptions", true, classLoader).getDeclaredField("IP_MULTICAST_LOOP").get(null);
} catch (Exception e) { } catch (Exception e) {
throw new Error("cannot locate the IP_MULTICAST_LOOP field", e); throw new Error("cannot locate the IP_MULTICAST_LOOP field", e);
} }
try { try {
getOption = DatagramChannel.class.getDeclaredMethod("getOption", socketOptionType); getOption = NetworkChannel.class.getDeclaredMethod("getOption", socketOptionType);
} catch (Exception e) { } catch (Exception e) {
throw new Error("cannot locate the getOption() method", e); throw new Error("cannot locate the getOption() method", e);
} }
try { try {
setOption = DatagramChannel.class.getDeclaredMethod("setOption", socketOptionType, Object.class); setOption = NetworkChannel.class.getDeclaredMethod("setOption", socketOptionType, Object.class);
} catch (Exception e) { } catch (Exception e) {
throw new Error("cannot locate the setOption() method", e); throw new Error("cannot locate the setOption() method", e);
} }