diff --git a/pom.xml b/pom.xml index f69676e699..5a6bfbe7fc 100644 --- a/pom.xml +++ b/pom.xml @@ -213,7 +213,7 @@ - + [1.7.0,) @@ -252,6 +252,10 @@ sun.misc.Unsafe java.util.zip.Deflater + + java.nio.channels.DatagramChannel + java.nio.channels.MembershipKey + java.net.StandardSocketOptions diff --git a/transport/src/main/java/io/netty/channel/socket/DatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/DatagramChannel.java index 29d59f7e67..aa44161599 100644 --- a/transport/src/main/java/io/netty/channel/socket/DatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/DatagramChannel.java @@ -20,6 +20,7 @@ import java.net.InetSocketAddress; import java.net.NetworkInterface; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; /** * A UDP/IP {@link Channel} which is created by {@link DatagramChannelFactory}. @@ -37,20 +38,20 @@ public interface DatagramChannel extends Channel { /** * Joins a multicast group. */ - void joinGroup(InetAddress multicastAddress); + ChannelFuture joinGroup(InetAddress multicastAddress); /** * Joins the specified multicast group at the specified interface. */ - void joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface); + ChannelFuture joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface); /** * Leaves a multicast group. */ - void leaveGroup(InetAddress multicastAddress); + ChannelFuture leaveGroup(InetAddress multicastAddress); /** * Leaves a multicast group on a specified local interface. */ - void leaveGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface); + ChannelFuture leaveGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/DefaultNioDatagramChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/nio/DefaultNioDatagramChannelConfig.java index 42fb4e3656..8a329b546f 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/DefaultNioDatagramChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/DefaultNioDatagramChannelConfig.java @@ -15,13 +15,18 @@ */ package io.netty.channel.socket.nio; -import java.net.DatagramSocket; -import java.util.Map; - +import io.netty.channel.ChannelException; import io.netty.channel.socket.DefaultDatagramChannelConfig; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import io.netty.util.internal.ConversionUtil; +import io.netty.util.internal.DetectionUtil; + +import java.io.IOException; +import java.net.NetworkInterface; +import java.net.StandardSocketOptions; +import java.nio.channels.DatagramChannel; +import java.util.Map; /** * The default {@link NioSocketChannelConfig} implementation. @@ -37,8 +42,11 @@ class DefaultNioDatagramChannelConfig extends DefaultDatagramChannelConfig private volatile int writeBufferLowWaterMark = 32 * 1024; private volatile int writeSpinCount = 16; - DefaultNioDatagramChannelConfig(DatagramSocket socket) { - super(socket); + private final DatagramChannel channel; + + DefaultNioDatagramChannelConfig(DatagramChannel channel) { + super(channel.socket()); + this.channel = channel; } @Override @@ -138,4 +146,31 @@ class DefaultNioDatagramChannelConfig extends DefaultDatagramChannelConfig } this.writeSpinCount = writeSpinCount; } + + @Override + public void setNetworkInterface(NetworkInterface networkInterface) { + if (DetectionUtil.javaVersion() < 7) { + throw new UnsupportedOperationException(); + } else { + try { + channel.setOption(StandardSocketOptions.IP_MULTICAST_IF, networkInterface); + } catch (IOException e) { + throw new ChannelException(e); + } + } + } + + @Override + public NetworkInterface getNetworkInterface() { + if (DetectionUtil.javaVersion() < 7) { + throw new UnsupportedOperationException(); + } else { + try { + return (NetworkInterface) channel.getOption(StandardSocketOptions.IP_MULTICAST_IF); + } catch (IOException e) { + throw new ChannelException(e); + } + } + } + } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index d199d135df..ac82ecbee6 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -21,6 +21,7 @@ import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelSink; +import io.netty.channel.Channels; import io.netty.channel.socket.DatagramChannelConfig; import io.netty.util.internal.DetectionUtil; @@ -61,7 +62,7 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n final ChannelPipeline pipeline, final ChannelSink sink, final NioDatagramWorker worker) { super(null, factory, pipeline, sink, worker, new NioDatagramJdkChannel(openNonBlockingChannel())); - config = new DefaultNioDatagramChannelConfig(getJdkChannel().getChannel().socket()); + config = new DefaultNioDatagramChannelConfig(getJdkChannel().getChannel()); } private static DatagramChannel openNonBlockingChannel() { @@ -106,23 +107,23 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n } @Override - public void joinGroup(InetAddress multicastAddress) { + public ChannelFuture joinGroup(InetAddress multicastAddress) { try { - joinGroup(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), null); + return joinGroup(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), null); } catch (SocketException e) { - throw new ChannelException(e); + return Channels.failedFuture(this, e); } } @Override - public void joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface) { - joinGroup(multicastAddress.getAddress(), networkInterface, null); + public ChannelFuture joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface) { + return joinGroup(multicastAddress.getAddress(), networkInterface, null); } /** * Joins the specified multicast group at the specified interface using the specified source. */ - public void joinGroup(InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) { + public ChannelFuture joinGroup(InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) { if (DetectionUtil.javaVersion() < 7) { throw new UnsupportedOperationException(); } else { @@ -149,32 +150,33 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n keys.add(key); } - } catch (IOException e) { - throw new ChannelException(e); + } catch (Throwable e) { + return Channels.failedFuture(this, e); } } + return Channels.succeededFuture(this); } @Override - public void leaveGroup(InetAddress multicastAddress) { + public ChannelFuture leaveGroup(InetAddress multicastAddress) { try { - leaveGroup(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), null); + return leaveGroup(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), null); } catch (SocketException e) { - throw new ChannelException(e); + return Channels.failedFuture(this, e); } } @Override - public void leaveGroup(InetSocketAddress multicastAddress, + public ChannelFuture leaveGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface) { - leaveGroup(multicastAddress.getAddress(), networkInterface, null); + return leaveGroup(multicastAddress.getAddress(), networkInterface, null); } /** * Leave the specified multicast group at the specified interface using the specified source. */ - public void leaveGroup(InetAddress multicastAddress, + public ChannelFuture leaveGroup(InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) { if (DetectionUtil.javaVersion() < 7) { throw new UnsupportedOperationException(); @@ -193,7 +195,7 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n if (keys != null) { Iterator keyIt = keys.iterator(); - while(keyIt.hasNext()) { + while (keyIt.hasNext()) { MembershipKey key = keyIt.next(); if (networkInterface.equals(key.networkInterface())) { if (source == null && key.sourceAddress() == null || (source != null && source.equals(key.sourceAddress()))) { @@ -209,8 +211,7 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n } } } - - + return Channels.succeededFuture(this); } } @@ -218,7 +219,7 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface * */ - public void block(InetAddress multicastAddress, + public ChannelFuture block(InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress sourceToBlock) { if (DetectionUtil.javaVersion() < 7) { throw new UnsupportedOperationException(); @@ -241,26 +242,30 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n try { key.block(sourceToBlock); } catch (IOException e) { - throw new ChannelException(e); + return Channels.failedFuture(this, e); } } } } } + return Channels.succeededFuture(this); } } + /** * Block the given sourceToBlock address for the given multicastAddress * */ - public void block(InetAddress multicastAddress, InetAddress sourceToBlock) { + public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) { try { block(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), sourceToBlock); } catch (SocketException e) { - throw new ChannelException(e); + return Channels.failedFuture(this, e); } + return Channels.succeededFuture(this); + } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java index a73a24c503..1cc51506f0 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java @@ -26,8 +26,10 @@ import java.net.SocketException; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFactory; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelSink; +import io.netty.channel.Channels; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramChannelConfig; import io.netty.channel.socket.DefaultDatagramChannelConfig; @@ -76,23 +78,26 @@ final class OioDatagramChannel extends AbstractOioChannel } @Override - public void joinGroup(InetAddress multicastAddress) { + public ChannelFuture joinGroup(InetAddress multicastAddress) { ensureBound(); try { socket.joinGroup(multicastAddress); + return Channels.succeededFuture(this); } catch (IOException e) { - throw new ChannelException(e); + return Channels.failedFuture(this, e); } } @Override - public void joinGroup( + public ChannelFuture joinGroup( InetSocketAddress multicastAddress, NetworkInterface networkInterface) { ensureBound(); try { socket.joinGroup(multicastAddress, networkInterface); + return Channels.succeededFuture(this); + } catch (IOException e) { - throw new ChannelException(e); + return Channels.failedFuture(this, e); } } @@ -105,21 +110,25 @@ final class OioDatagramChannel extends AbstractOioChannel } @Override - public void leaveGroup(InetAddress multicastAddress) { + public ChannelFuture leaveGroup(InetAddress multicastAddress) { try { socket.leaveGroup(multicastAddress); + return Channels.succeededFuture(this); + } catch (IOException e) { - throw new ChannelException(e); + return Channels.failedFuture(this, e); } } @Override - public void leaveGroup( + public ChannelFuture leaveGroup( InetSocketAddress multicastAddress, NetworkInterface networkInterface) { try { socket.leaveGroup(multicastAddress, networkInterface); + return Channels.succeededFuture(this); + } catch (IOException e) { - throw new ChannelException(e); + return Channels.failedFuture(this, e); } }