From 165229658be7457f3a3c7540496b6ff9a0b6406f Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 7 Jun 2019 13:44:06 -0700 Subject: [PATCH] Add support for loopbackmode and accessing the configured interface when using epoll native transport with multicast (#9218) Motivation: We did not have support for enable / disable loopback mode in our native epoll transport and also missed the implemention to access the configured interface. Modifications: Add implementation and adjust test to cover it Result: More complete multicast support with native epoll transport --- .../socket/DatagramMulticastTest.java | 31 ++++++++++-- .../src/main/c/netty_epoll_linuxsocket.c | 47 ++++++++++++++++++ .../epoll/EpollDatagramChannelConfig.java | 25 ++++++++-- .../io/netty/channel/epoll/LinuxSocket.java | 48 +++++++++++++++++++ 4 files changed, 144 insertions(+), 7 deletions(-) diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/DatagramMulticastTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/DatagramMulticastTest.java index 3f582eeccb..6ea3209469 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/DatagramMulticastTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/DatagramMulticastTest.java @@ -17,7 +17,6 @@ package io.netty.testsuite.transport.socket; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOption; import io.netty.channel.SimpleChannelInboundHandler; @@ -72,9 +71,11 @@ public class DatagramMulticastTest extends AbstractDatagramTest { cb.option(ChannelOption.IP_MULTICAST_IF, iface); cb.option(ChannelOption.SO_REUSEADDR, true); - Channel sc = sb.bind(newSocketAddress(iface)).sync().channel(); + DatagramChannel sc = (DatagramChannel) sb.bind(newSocketAddress(iface)).sync().channel(); + assertEquals(iface, sc.config().getNetworkInterface()); + assertInterfaceAddress(iface, sc.config().getInterface()); - InetSocketAddress addr = (InetSocketAddress) sc.localAddress(); + InetSocketAddress addr = sc.localAddress(); cb.localAddress(addr.getPort()); if (sc instanceof OioDatagramChannel) { @@ -85,6 +86,8 @@ public class DatagramMulticastTest extends AbstractDatagramTest { return; } DatagramChannel cc = (DatagramChannel) cb.bind().sync().channel(); + assertEquals(iface, cc.config().getNetworkInterface()); + assertInterfaceAddress(iface, cc.config().getInterface()); InetSocketAddress groupAddress = SocketUtils.socketAddress(groupAddress(), addr.getPort()); @@ -103,10 +106,32 @@ public class DatagramMulticastTest extends AbstractDatagramTest { sc.writeAndFlush(new DatagramPacket(Unpooled.copyInt(1), groupAddress)).sync(); mhandler.await(); + cc.config().setLoopbackModeDisabled(false); + sc.config().setLoopbackModeDisabled(false); + + assertFalse(cc.config().isLoopbackModeDisabled()); + assertFalse(sc.config().isLoopbackModeDisabled()); + + cc.config().setLoopbackModeDisabled(true); + sc.config().setLoopbackModeDisabled(true); + + assertTrue(cc.config().isLoopbackModeDisabled()); + assertTrue(sc.config().isLoopbackModeDisabled()); + sc.close().awaitUninterruptibly(); cc.close().awaitUninterruptibly(); } + private static void assertInterfaceAddress(NetworkInterface networkInterface, InetAddress expected) { + Enumeration addresses = networkInterface.getInetAddresses(); + while (addresses.hasMoreElements()) { + if (expected.equals(addresses.nextElement())) { + return; + } + } + fail(); + } + private static final class MulticastTestHandler extends SimpleChannelInboundHandler { private final CountDownLatch latch = new CountDownLatch(1); diff --git a/transport-native-epoll/src/main/c/netty_epoll_linuxsocket.c b/transport-native-epoll/src/main/c/netty_epoll_linuxsocket.c index 8169230d63..6120d10c6b 100644 --- a/transport-native-epoll/src/main/c/netty_epoll_linuxsocket.c +++ b/transport-native-epoll/src/main/c/netty_epoll_linuxsocket.c @@ -68,6 +68,16 @@ static void netty_epoll_linuxsocket_setTimeToLive(JNIEnv* env, jclass clazz, jin netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_TTL, &optval, sizeof(optval)); } +static void netty_epoll_linuxsocket_setIpMulticastLoop(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jint optval) { + if (ipv6 == JNI_TRUE) { + u_int val = (u_int) optval; + netty_unix_socket_setOption(env, fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, &val, sizeof(val)); + } else { + u_char val = (u_char) optval; + netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_MULTICAST_LOOP, &val, sizeof(val)); + } +} + static void netty_epoll_linuxsocket_setInterface(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray interfaceAddress, jint scopeId, jint interfaceIndex) { struct sockaddr_storage interfaceAddr; socklen_t interfaceAddrSize; @@ -392,6 +402,23 @@ static void netty_epoll_linuxsocket_setTcpMd5Sig(JNIEnv* env, jclass clazz, jint } } +static int netty_epoll_linuxsocket_getInterface(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6) { + if (ipv6 == JNI_TRUE) { + int optval; + if (netty_unix_socket_getOption(env, fd, IPPROTO_IPV6, IPV6_MULTICAST_IF, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; + } else { + struct in_addr optval; + if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_MULTICAST_IF, &optval, sizeof(optval)) == -1) { + return -1; + } + + return ntohl(optval.s_addr); + } +} + static jint netty_epoll_linuxsocket_getTimeToLive(JNIEnv* env, jclass clazz, jint fd) { int optval; if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_TTL, &optval, sizeof(optval)) == -1) { @@ -400,6 +427,23 @@ static jint netty_epoll_linuxsocket_getTimeToLive(JNIEnv* env, jclass clazz, jin return optval; } + +static jint netty_epoll_linuxsocket_getIpMulticastLoop(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6) { + if (ipv6 == JNI_TRUE) { + u_int optval; + if (netty_unix_socket_getOption(env, fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, &optval, sizeof(optval)) == -1) { + return -1; + } + return (jint) optval; + } else { + u_char optval; + if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_MULTICAST_LOOP, &optval, sizeof(optval)) == -1) { + return -1; + } + return (jint) optval; + } +} + static jint netty_epoll_linuxsocket_getTcpKeepIdle(JNIEnv* env, jclass clazz, jint fd) { int optval; if (netty_unix_socket_getOption(env, fd, IPPROTO_TCP, TCP_KEEPIDLE, &optval, sizeof(optval)) == -1) { @@ -602,6 +646,9 @@ static const JNINativeMethod fixed_method_table[] = { { "setTimeToLive", "(II)V", (void *) netty_epoll_linuxsocket_setTimeToLive }, { "getTimeToLive", "(I)I", (void *) netty_epoll_linuxsocket_getTimeToLive }, { "setInterface", "(IZ[BII)V", (void *) netty_epoll_linuxsocket_setInterface }, + { "getInterface", "(IZ)I", (void *) netty_epoll_linuxsocket_getInterface }, + { "setIpMulticastLoop", "(IZI)V", (void * ) netty_epoll_linuxsocket_setIpMulticastLoop }, + { "getIpMulticastLoop", "(IZ)I", (void * ) netty_epoll_linuxsocket_getIpMulticastLoop }, { "setTcpCork", "(II)V", (void *) netty_epoll_linuxsocket_setTcpCork }, { "setSoBusyPoll", "(II)V", (void *) netty_epoll_linuxsocket_setSoBusyPoll }, { "setTcpQuickAck", "(II)V", (void *) netty_epoll_linuxsocket_setTcpQuickAck }, diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java index e30e469497..4f646329f8 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java @@ -316,12 +316,21 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme @Override public boolean isLoopbackModeDisabled() { - return false; + try { + return ((EpollDatagramChannel) channel).socket.isLoopbackModeDisabled(); + } catch (IOException e) { + throw new ChannelException(e); + } } @Override public DatagramChannelConfig setLoopbackModeDisabled(boolean loopbackModeDisabled) { - throw new UnsupportedOperationException("Multicast not supported"); + try { + ((EpollDatagramChannel) channel).socket.setLoopbackModeDisabled(loopbackModeDisabled); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } } @Override @@ -345,7 +354,11 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme @Override public InetAddress getInterface() { - return null; + try { + return ((EpollDatagramChannel) channel).socket.getInterface(); + } catch (IOException e) { + throw new ChannelException(e); + } } @Override @@ -360,7 +373,11 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme @Override public NetworkInterface getNetworkInterface() { - return null; + try { + return ((EpollDatagramChannel) channel).socket.getNetworkInterface(); + } catch (IOException e) { + throw new ChannelException(e); + } } @Override diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/LinuxSocket.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/LinuxSocket.java index 6848c39217..34648ac5d2 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/LinuxSocket.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/LinuxSocket.java @@ -17,11 +17,13 @@ package io.netty.channel.epoll; import io.netty.channel.ChannelException; import io.netty.channel.DefaultFileRegion; +import io.netty.channel.socket.DatagramChannelConfig; import io.netty.channel.unix.NativeInetAddress; import io.netty.channel.unix.PeerCredentials; import io.netty.channel.unix.Socket; import io.netty.channel.socket.InternetProtocolFamily; import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.SocketUtils; import java.io.IOException; import java.net.InetAddress; @@ -71,6 +73,41 @@ final class LinuxSocket extends Socket { setInterface(intValue(), ipv6, nativeAddress.address(), nativeAddress.scopeId(), interfaceIndex(netInterface)); } + InetAddress getInterface() throws IOException { + NetworkInterface inf = getNetworkInterface(); + if (inf != null) { + Enumeration addresses = SocketUtils.addressesFromNetworkInterface(inf); + if (addresses.hasMoreElements()) { + return addresses.nextElement(); + } + } + return null; + } + + NetworkInterface getNetworkInterface() throws IOException { + int ret = getInterface(intValue(), ipv6); + if (ipv6) { + return PlatformDependent.javaVersion() >= 7 ? NetworkInterface.getByIndex(ret) : null; + } + InetAddress address = inetAddress(ret); + return address != null ? NetworkInterface.getByInetAddress(address) : null; + } + + private static InetAddress inetAddress(int value) { + byte[] var1 = { + (byte) (value >>> 24 & 255), + (byte) (value >>> 16 & 255), + (byte) (value >>> 8 & 255), + (byte) (value & 255) + }; + + try { + return InetAddress.getByAddress(var1); + } catch (UnknownHostException ignore) { + return null; + } + } + void joinGroup(InetAddress group, NetworkInterface netInterface, InetAddress source) throws IOException { final NativeInetAddress g = NativeInetAddress.newInstance(group); final boolean isIpv6 = group instanceof Inet6Address; @@ -239,6 +276,14 @@ final class LinuxSocket extends Socket { return getPeerCredentials(intValue()); } + boolean isLoopbackModeDisabled() throws IOException { + return getIpMulticastLoop(intValue(), ipv6) == 0; + } + + void setLoopbackModeDisabled(boolean loopbackModeDisabled) throws IOException { + setIpMulticastLoop(intValue(), ipv6, loopbackModeDisabled ? 0 : 1); + } + long sendFile(DefaultFileRegion src, long baseOffset, long offset, long length) throws IOException { // Open the file-region as it may be created via the lazy constructor. This is needed as we directly access // the FileChannel field via JNI. @@ -340,5 +385,8 @@ final class LinuxSocket extends Socket { int fd, boolean ipv6, byte[] address, int scopeId, byte[] key) throws IOException; private static native void setInterface( int fd, boolean ipv6, byte[] interfaceAddress, int scopeId, int networkInterfaceIndex) throws IOException; + private static native int getInterface(int fd, boolean ipv6); + private static native int getIpMulticastLoop(int fd, boolean ipv6) throws IOException; + private static native void setIpMulticastLoop(int fd, boolean ipv6, int enabled) throws IOException; private static native void setTimeToLive(int fd, int ttl) throws IOException; }