Finish support of NIO UDP multicast. This also change the methods to

return a ChannelFuture. See #216
This commit is contained in:
norman 2012-04-02 11:57:32 +02:00
parent 72f9f502bb
commit 9b90e3191a
5 changed files with 94 additions and 40 deletions

View File

@ -213,7 +213,7 @@
<rules> <rules>
<requireJavaVersion> <requireJavaVersion>
<!-- Enforce java 1.7 as minimum for compiling --> <!-- Enforce java 1.7 as minimum for compiling -->
<!-- This is needed because of java.util.zip.Deflater --> <!-- This is needed because of java.util.zip.Deflater and NIO UDP multicast-->
<version>[1.7.0,)</version> <version>[1.7.0,)</version>
</requireJavaVersion> </requireJavaVersion>
<requireMavenVersion> <requireMavenVersion>
@ -252,6 +252,10 @@
<ignores> <ignores>
<ignore>sun.misc.Unsafe</ignore> <ignore>sun.misc.Unsafe</ignore>
<ignore>java.util.zip.Deflater</ignore> <ignore>java.util.zip.Deflater</ignore>
<!-- Used for NIO UDP multicast -->
<ignore>java.nio.channels.DatagramChannel</ignore>
<ignore>java.nio.channels.MembershipKey</ignore>
<ignore>java.net.StandardSocketOptions</ignore>
</ignores> </ignores>
</configuration> </configuration>
<executions> <executions>

View File

@ -20,6 +20,7 @@ import java.net.InetSocketAddress;
import java.net.NetworkInterface; import java.net.NetworkInterface;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
/** /**
* A UDP/IP {@link Channel} which is created by {@link DatagramChannelFactory}. * A UDP/IP {@link Channel} which is created by {@link DatagramChannelFactory}.
@ -37,20 +38,20 @@ public interface DatagramChannel extends Channel {
/** /**
* Joins a multicast group. * Joins a multicast group.
*/ */
void joinGroup(InetAddress multicastAddress); ChannelFuture joinGroup(InetAddress multicastAddress);
/** /**
* Joins the specified multicast group at the specified interface. * Joins the specified multicast group at the specified interface.
*/ */
void joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface); ChannelFuture joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface);
/** /**
* Leaves a multicast group. * Leaves a multicast group.
*/ */
void leaveGroup(InetAddress multicastAddress); ChannelFuture leaveGroup(InetAddress multicastAddress);
/** /**
* Leaves a multicast group on a specified local interface. * Leaves a multicast group on a specified local interface.
*/ */
void leaveGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface); ChannelFuture leaveGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface);
} }

View File

@ -15,13 +15,18 @@
*/ */
package io.netty.channel.socket.nio; package io.netty.channel.socket.nio;
import java.net.DatagramSocket; import io.netty.channel.ChannelException;
import java.util.Map;
import io.netty.channel.socket.DefaultDatagramChannelConfig; import io.netty.channel.socket.DefaultDatagramChannelConfig;
import io.netty.logging.InternalLogger; import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.ConversionUtil; import io.netty.util.internal.ConversionUtil;
import io.netty.util.internal.DetectionUtil;
import java.io.IOException;
import java.net.NetworkInterface;
import java.net.StandardSocketOptions;
import java.nio.channels.DatagramChannel;
import java.util.Map;
/** /**
* The default {@link NioSocketChannelConfig} implementation. * The default {@link NioSocketChannelConfig} implementation.
@ -37,8 +42,11 @@ class DefaultNioDatagramChannelConfig extends DefaultDatagramChannelConfig
private volatile int writeBufferLowWaterMark = 32 * 1024; private volatile int writeBufferLowWaterMark = 32 * 1024;
private volatile int writeSpinCount = 16; private volatile int writeSpinCount = 16;
DefaultNioDatagramChannelConfig(DatagramSocket socket) { private final DatagramChannel channel;
super(socket);
DefaultNioDatagramChannelConfig(DatagramChannel channel) {
super(channel.socket());
this.channel = channel;
} }
@Override @Override
@ -138,4 +146,31 @@ class DefaultNioDatagramChannelConfig extends DefaultDatagramChannelConfig
} }
this.writeSpinCount = writeSpinCount; this.writeSpinCount = writeSpinCount;
} }
@Override
public void setNetworkInterface(NetworkInterface networkInterface) {
if (DetectionUtil.javaVersion() < 7) {
throw new UnsupportedOperationException();
} else {
try {
channel.setOption(StandardSocketOptions.IP_MULTICAST_IF, networkInterface);
} catch (IOException e) {
throw new ChannelException(e);
}
}
}
@Override
public NetworkInterface getNetworkInterface() {
if (DetectionUtil.javaVersion() < 7) {
throw new UnsupportedOperationException();
} else {
try {
return (NetworkInterface) channel.getOption(StandardSocketOptions.IP_MULTICAST_IF);
} catch (IOException e) {
throw new ChannelException(e);
}
}
}
} }

View File

@ -21,6 +21,7 @@ import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink; import io.netty.channel.ChannelSink;
import io.netty.channel.Channels;
import io.netty.channel.socket.DatagramChannelConfig; import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.util.internal.DetectionUtil; import io.netty.util.internal.DetectionUtil;
@ -61,7 +62,7 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n
final ChannelPipeline pipeline, final ChannelSink sink, final ChannelPipeline pipeline, final ChannelSink sink,
final NioDatagramWorker worker) { final NioDatagramWorker worker) {
super(null, factory, pipeline, sink, worker, new NioDatagramJdkChannel(openNonBlockingChannel())); super(null, factory, pipeline, sink, worker, new NioDatagramJdkChannel(openNonBlockingChannel()));
config = new DefaultNioDatagramChannelConfig(getJdkChannel().getChannel().socket()); config = new DefaultNioDatagramChannelConfig(getJdkChannel().getChannel());
} }
private static DatagramChannel openNonBlockingChannel() { private static DatagramChannel openNonBlockingChannel() {
@ -106,23 +107,23 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n
} }
@Override @Override
public void joinGroup(InetAddress multicastAddress) { public ChannelFuture joinGroup(InetAddress multicastAddress) {
try { try {
joinGroup(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), null); return joinGroup(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), null);
} catch (SocketException e) { } catch (SocketException e) {
throw new ChannelException(e); return Channels.failedFuture(this, e);
} }
} }
@Override @Override
public void joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface) { public ChannelFuture joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
joinGroup(multicastAddress.getAddress(), networkInterface, null); return joinGroup(multicastAddress.getAddress(), networkInterface, null);
} }
/** /**
* Joins the specified multicast group at the specified interface using the specified source. * Joins the specified multicast group at the specified interface using the specified source.
*/ */
public void joinGroup(InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) { public ChannelFuture joinGroup(InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
if (DetectionUtil.javaVersion() < 7) { if (DetectionUtil.javaVersion() < 7) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} else { } else {
@ -149,32 +150,33 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n
keys.add(key); keys.add(key);
} }
} catch (IOException e) { } catch (Throwable e) {
throw new ChannelException(e); return Channels.failedFuture(this, e);
} }
} }
return Channels.succeededFuture(this);
} }
@Override @Override
public void leaveGroup(InetAddress multicastAddress) { public ChannelFuture leaveGroup(InetAddress multicastAddress) {
try { try {
leaveGroup(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), null); return leaveGroup(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), null);
} catch (SocketException e) { } catch (SocketException e) {
throw new ChannelException(e); return Channels.failedFuture(this, e);
} }
} }
@Override @Override
public void leaveGroup(InetSocketAddress multicastAddress, public ChannelFuture leaveGroup(InetSocketAddress multicastAddress,
NetworkInterface networkInterface) { NetworkInterface networkInterface) {
leaveGroup(multicastAddress.getAddress(), networkInterface, null); return leaveGroup(multicastAddress.getAddress(), networkInterface, null);
} }
/** /**
* Leave the specified multicast group at the specified interface using the specified source. * Leave the specified multicast group at the specified interface using the specified source.
*/ */
public void leaveGroup(InetAddress multicastAddress, public ChannelFuture leaveGroup(InetAddress multicastAddress,
NetworkInterface networkInterface, InetAddress source) { NetworkInterface networkInterface, InetAddress source) {
if (DetectionUtil.javaVersion() < 7) { if (DetectionUtil.javaVersion() < 7) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
@ -209,8 +211,7 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n
} }
} }
} }
return Channels.succeededFuture(this);
} }
} }
@ -218,7 +219,7 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n
* Block the given sourceToBlock address for the given multicastAddress on the given networkInterface * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface
* *
*/ */
public void block(InetAddress multicastAddress, public ChannelFuture block(InetAddress multicastAddress,
NetworkInterface networkInterface, InetAddress sourceToBlock) { NetworkInterface networkInterface, InetAddress sourceToBlock) {
if (DetectionUtil.javaVersion() < 7) { if (DetectionUtil.javaVersion() < 7) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
@ -241,26 +242,30 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n
try { try {
key.block(sourceToBlock); key.block(sourceToBlock);
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException(e); return Channels.failedFuture(this, e);
} }
} }
} }
} }
} }
return Channels.succeededFuture(this);
} }
} }
/** /**
* Block the given sourceToBlock address for the given multicastAddress * Block the given sourceToBlock address for the given multicastAddress
* *
*/ */
public void block(InetAddress multicastAddress, InetAddress sourceToBlock) { public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
try { try {
block(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), sourceToBlock); block(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), sourceToBlock);
} catch (SocketException e) { } catch (SocketException e) {
throw new ChannelException(e); return Channels.failedFuture(this, e);
} }
return Channels.succeededFuture(this);
} }
@Override @Override

View File

@ -26,8 +26,10 @@ import java.net.SocketException;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink; import io.netty.channel.ChannelSink;
import io.netty.channel.Channels;
import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig; import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DefaultDatagramChannelConfig; import io.netty.channel.socket.DefaultDatagramChannelConfig;
@ -76,23 +78,26 @@ final class OioDatagramChannel extends AbstractOioChannel
} }
@Override @Override
public void joinGroup(InetAddress multicastAddress) { public ChannelFuture joinGroup(InetAddress multicastAddress) {
ensureBound(); ensureBound();
try { try {
socket.joinGroup(multicastAddress); socket.joinGroup(multicastAddress);
return Channels.succeededFuture(this);
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException(e); return Channels.failedFuture(this, e);
} }
} }
@Override @Override
public void joinGroup( public ChannelFuture joinGroup(
InetSocketAddress multicastAddress, NetworkInterface networkInterface) { InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
ensureBound(); ensureBound();
try { try {
socket.joinGroup(multicastAddress, networkInterface); socket.joinGroup(multicastAddress, networkInterface);
return Channels.succeededFuture(this);
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException(e); return Channels.failedFuture(this, e);
} }
} }
@ -105,21 +110,25 @@ final class OioDatagramChannel extends AbstractOioChannel
} }
@Override @Override
public void leaveGroup(InetAddress multicastAddress) { public ChannelFuture leaveGroup(InetAddress multicastAddress) {
try { try {
socket.leaveGroup(multicastAddress); socket.leaveGroup(multicastAddress);
return Channels.succeededFuture(this);
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException(e); return Channels.failedFuture(this, e);
} }
} }
@Override @Override
public void leaveGroup( public ChannelFuture leaveGroup(
InetSocketAddress multicastAddress, NetworkInterface networkInterface) { InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
try { try {
socket.leaveGroup(multicastAddress, networkInterface); socket.leaveGroup(multicastAddress, networkInterface);
return Channels.succeededFuture(this);
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException(e); return Channels.failedFuture(this, e);
} }
} }