Add support for loopbackmode and accessing the configured interface when using epoll native transport with multicast (#9218)

Motivation:

We did not have support for enable / disable loopback mode in our native epoll transport and also missed the implemention to access the configured interface.

Modifications:

Add implementation and adjust test to cover it

Result:

More complete multicast support with native epoll transport
This commit is contained in:
Norman Maurer 2019-06-07 13:44:06 -07:00
parent c27fbb5ff2
commit 9d5420987a
4 changed files with 144 additions and 7 deletions

View File

@ -17,7 +17,6 @@ package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
@ -71,12 +70,16 @@ public class DatagramMulticastTest extends AbstractDatagramTest {
cb.option(ChannelOption.IP_MULTICAST_IF, iface); cb.option(ChannelOption.IP_MULTICAST_IF, iface);
cb.option(ChannelOption.SO_REUSEADDR, true); cb.option(ChannelOption.SO_REUSEADDR, true);
Channel sc = sb.bind(newSocketAddress(iface)).sync().channel(); DatagramChannel sc = (DatagramChannel) sb.bind(newSocketAddress(iface)).sync().channel();
assertEquals(iface, sc.config().getNetworkInterface());
assertInterfaceAddress(iface, sc.config().getInterface());
InetSocketAddress addr = (InetSocketAddress) sc.localAddress(); InetSocketAddress addr = sc.localAddress();
cb.localAddress(addr.getPort()); cb.localAddress(addr.getPort());
DatagramChannel cc = (DatagramChannel) cb.bind().sync().channel(); DatagramChannel cc = (DatagramChannel) cb.bind().sync().channel();
assertEquals(iface, cc.config().getNetworkInterface());
assertInterfaceAddress(iface, cc.config().getInterface());
InetSocketAddress groupAddress = SocketUtils.socketAddress(groupAddress(), addr.getPort()); InetSocketAddress groupAddress = SocketUtils.socketAddress(groupAddress(), addr.getPort());
@ -95,10 +98,32 @@ public class DatagramMulticastTest extends AbstractDatagramTest {
sc.writeAndFlush(new DatagramPacket(Unpooled.copyInt(1), groupAddress)).sync(); sc.writeAndFlush(new DatagramPacket(Unpooled.copyInt(1), groupAddress)).sync();
mhandler.await(); mhandler.await();
cc.config().setLoopbackModeDisabled(false);
sc.config().setLoopbackModeDisabled(false);
assertFalse(cc.config().isLoopbackModeDisabled());
assertFalse(sc.config().isLoopbackModeDisabled());
cc.config().setLoopbackModeDisabled(true);
sc.config().setLoopbackModeDisabled(true);
assertTrue(cc.config().isLoopbackModeDisabled());
assertTrue(sc.config().isLoopbackModeDisabled());
sc.close().awaitUninterruptibly(); sc.close().awaitUninterruptibly();
cc.close().awaitUninterruptibly(); cc.close().awaitUninterruptibly();
} }
private static void assertInterfaceAddress(NetworkInterface networkInterface, InetAddress expected) {
Enumeration<InetAddress> addresses = networkInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
if (expected.equals(addresses.nextElement())) {
return;
}
}
fail();
}
private static final class MulticastTestHandler extends SimpleChannelInboundHandler<DatagramPacket> { private static final class MulticastTestHandler extends SimpleChannelInboundHandler<DatagramPacket> {
private final CountDownLatch latch = new CountDownLatch(1); private final CountDownLatch latch = new CountDownLatch(1);

View File

@ -68,6 +68,16 @@ static void netty_epoll_linuxsocket_setTimeToLive(JNIEnv* env, jclass clazz, jin
netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_TTL, &optval, sizeof(optval)); netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_TTL, &optval, sizeof(optval));
} }
static void netty_epoll_linuxsocket_setIpMulticastLoop(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jint optval) {
if (ipv6 == JNI_TRUE) {
u_int val = (u_int) optval;
netty_unix_socket_setOption(env, fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, &val, sizeof(val));
} else {
u_char val = (u_char) optval;
netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_MULTICAST_LOOP, &val, sizeof(val));
}
}
static void netty_epoll_linuxsocket_setInterface(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray interfaceAddress, jint scopeId, jint interfaceIndex) { static void netty_epoll_linuxsocket_setInterface(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray interfaceAddress, jint scopeId, jint interfaceIndex) {
struct sockaddr_storage interfaceAddr; struct sockaddr_storage interfaceAddr;
socklen_t interfaceAddrSize; socklen_t interfaceAddrSize;
@ -392,6 +402,23 @@ static void netty_epoll_linuxsocket_setTcpMd5Sig(JNIEnv* env, jclass clazz, jint
} }
} }
static int netty_epoll_linuxsocket_getInterface(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6) {
if (ipv6 == JNI_TRUE) {
int optval;
if (netty_unix_socket_getOption(env, fd, IPPROTO_IPV6, IPV6_MULTICAST_IF, &optval, sizeof(optval)) == -1) {
return -1;
}
return optval;
} else {
struct in_addr optval;
if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_MULTICAST_IF, &optval, sizeof(optval)) == -1) {
return -1;
}
return ntohl(optval.s_addr);
}
}
static jint netty_epoll_linuxsocket_getTimeToLive(JNIEnv* env, jclass clazz, jint fd) { static jint netty_epoll_linuxsocket_getTimeToLive(JNIEnv* env, jclass clazz, jint fd) {
int optval; int optval;
if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_TTL, &optval, sizeof(optval)) == -1) { if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_TTL, &optval, sizeof(optval)) == -1) {
@ -400,6 +427,23 @@ static jint netty_epoll_linuxsocket_getTimeToLive(JNIEnv* env, jclass clazz, jin
return optval; return optval;
} }
static jint netty_epoll_linuxsocket_getIpMulticastLoop(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6) {
if (ipv6 == JNI_TRUE) {
u_int optval;
if (netty_unix_socket_getOption(env, fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, &optval, sizeof(optval)) == -1) {
return -1;
}
return (jint) optval;
} else {
u_char optval;
if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_MULTICAST_LOOP, &optval, sizeof(optval)) == -1) {
return -1;
}
return (jint) optval;
}
}
static jint netty_epoll_linuxsocket_getTcpKeepIdle(JNIEnv* env, jclass clazz, jint fd) { static jint netty_epoll_linuxsocket_getTcpKeepIdle(JNIEnv* env, jclass clazz, jint fd) {
int optval; int optval;
if (netty_unix_socket_getOption(env, fd, IPPROTO_TCP, TCP_KEEPIDLE, &optval, sizeof(optval)) == -1) { if (netty_unix_socket_getOption(env, fd, IPPROTO_TCP, TCP_KEEPIDLE, &optval, sizeof(optval)) == -1) {
@ -602,6 +646,9 @@ static const JNINativeMethod fixed_method_table[] = {
{ "setTimeToLive", "(II)V", (void *) netty_epoll_linuxsocket_setTimeToLive }, { "setTimeToLive", "(II)V", (void *) netty_epoll_linuxsocket_setTimeToLive },
{ "getTimeToLive", "(I)I", (void *) netty_epoll_linuxsocket_getTimeToLive }, { "getTimeToLive", "(I)I", (void *) netty_epoll_linuxsocket_getTimeToLive },
{ "setInterface", "(IZ[BII)V", (void *) netty_epoll_linuxsocket_setInterface }, { "setInterface", "(IZ[BII)V", (void *) netty_epoll_linuxsocket_setInterface },
{ "getInterface", "(IZ)I", (void *) netty_epoll_linuxsocket_getInterface },
{ "setIpMulticastLoop", "(IZI)V", (void * ) netty_epoll_linuxsocket_setIpMulticastLoop },
{ "getIpMulticastLoop", "(IZ)I", (void * ) netty_epoll_linuxsocket_getIpMulticastLoop },
{ "setTcpCork", "(II)V", (void *) netty_epoll_linuxsocket_setTcpCork }, { "setTcpCork", "(II)V", (void *) netty_epoll_linuxsocket_setTcpCork },
{ "setSoBusyPoll", "(II)V", (void *) netty_epoll_linuxsocket_setSoBusyPoll }, { "setSoBusyPoll", "(II)V", (void *) netty_epoll_linuxsocket_setSoBusyPoll },
{ "setTcpQuickAck", "(II)V", (void *) netty_epoll_linuxsocket_setTcpQuickAck }, { "setTcpQuickAck", "(II)V", (void *) netty_epoll_linuxsocket_setTcpQuickAck },

View File

@ -316,12 +316,21 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
@Override @Override
public boolean isLoopbackModeDisabled() { public boolean isLoopbackModeDisabled() {
return false; try {
return ((EpollDatagramChannel) channel).socket.isLoopbackModeDisabled();
} catch (IOException e) {
throw new ChannelException(e);
}
} }
@Override @Override
public DatagramChannelConfig setLoopbackModeDisabled(boolean loopbackModeDisabled) { public DatagramChannelConfig setLoopbackModeDisabled(boolean loopbackModeDisabled) {
throw new UnsupportedOperationException("Multicast not supported"); try {
((EpollDatagramChannel) channel).socket.setLoopbackModeDisabled(loopbackModeDisabled);
return this;
} catch (IOException e) {
throw new ChannelException(e);
}
} }
@Override @Override
@ -345,7 +354,11 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
@Override @Override
public InetAddress getInterface() { public InetAddress getInterface() {
return null; try {
return ((EpollDatagramChannel) channel).socket.getInterface();
} catch (IOException e) {
throw new ChannelException(e);
}
} }
@Override @Override
@ -360,7 +373,11 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
@Override @Override
public NetworkInterface getNetworkInterface() { public NetworkInterface getNetworkInterface() {
return null; try {
return ((EpollDatagramChannel) channel).socket.getNetworkInterface();
} catch (IOException e) {
throw new ChannelException(e);
}
} }
@Override @Override

View File

@ -17,11 +17,13 @@ package io.netty.channel.epoll;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.DefaultFileRegion; import io.netty.channel.DefaultFileRegion;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.unix.NativeInetAddress; import io.netty.channel.unix.NativeInetAddress;
import io.netty.channel.unix.PeerCredentials; import io.netty.channel.unix.PeerCredentials;
import io.netty.channel.unix.Socket; import io.netty.channel.unix.Socket;
import io.netty.channel.socket.InternetProtocolFamily; import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SocketUtils;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
@ -71,6 +73,41 @@ final class LinuxSocket extends Socket {
setInterface(intValue(), ipv6, nativeAddress.address(), nativeAddress.scopeId(), interfaceIndex(netInterface)); setInterface(intValue(), ipv6, nativeAddress.address(), nativeAddress.scopeId(), interfaceIndex(netInterface));
} }
InetAddress getInterface() throws IOException {
NetworkInterface inf = getNetworkInterface();
if (inf != null) {
Enumeration<InetAddress> addresses = SocketUtils.addressesFromNetworkInterface(inf);
if (addresses.hasMoreElements()) {
return addresses.nextElement();
}
}
return null;
}
NetworkInterface getNetworkInterface() throws IOException {
int ret = getInterface(intValue(), ipv6);
if (ipv6) {
return PlatformDependent.javaVersion() >= 7 ? NetworkInterface.getByIndex(ret) : null;
}
InetAddress address = inetAddress(ret);
return address != null ? NetworkInterface.getByInetAddress(address) : null;
}
private static InetAddress inetAddress(int value) {
byte[] var1 = {
(byte) (value >>> 24 & 255),
(byte) (value >>> 16 & 255),
(byte) (value >>> 8 & 255),
(byte) (value & 255)
};
try {
return InetAddress.getByAddress(var1);
} catch (UnknownHostException ignore) {
return null;
}
}
void joinGroup(InetAddress group, NetworkInterface netInterface, InetAddress source) throws IOException { void joinGroup(InetAddress group, NetworkInterface netInterface, InetAddress source) throws IOException {
final NativeInetAddress g = NativeInetAddress.newInstance(group); final NativeInetAddress g = NativeInetAddress.newInstance(group);
final boolean isIpv6 = group instanceof Inet6Address; final boolean isIpv6 = group instanceof Inet6Address;
@ -239,6 +276,14 @@ final class LinuxSocket extends Socket {
return getPeerCredentials(intValue()); return getPeerCredentials(intValue());
} }
boolean isLoopbackModeDisabled() throws IOException {
return getIpMulticastLoop(intValue(), ipv6) == 0;
}
void setLoopbackModeDisabled(boolean loopbackModeDisabled) throws IOException {
setIpMulticastLoop(intValue(), ipv6, loopbackModeDisabled ? 0 : 1);
}
long sendFile(DefaultFileRegion src, long baseOffset, long offset, long length) throws IOException { long sendFile(DefaultFileRegion src, long baseOffset, long offset, long length) throws IOException {
// Open the file-region as it may be created via the lazy constructor. This is needed as we directly access // Open the file-region as it may be created via the lazy constructor. This is needed as we directly access
// the FileChannel field via JNI. // the FileChannel field via JNI.
@ -340,5 +385,8 @@ final class LinuxSocket extends Socket {
int fd, boolean ipv6, byte[] address, int scopeId, byte[] key) throws IOException; int fd, boolean ipv6, byte[] address, int scopeId, byte[] key) throws IOException;
private static native void setInterface( private static native void setInterface(
int fd, boolean ipv6, byte[] interfaceAddress, int scopeId, int networkInterfaceIndex) throws IOException; int fd, boolean ipv6, byte[] interfaceAddress, int scopeId, int networkInterfaceIndex) throws IOException;
private static native int getInterface(int fd, boolean ipv6);
private static native int getIpMulticastLoop(int fd, boolean ipv6) throws IOException;
private static native void setIpMulticastLoop(int fd, boolean ipv6, int enabled) throws IOException;
private static native void setTimeToLive(int fd, int ttl) throws IOException; private static native void setTimeToLive(int fd, int ttl) throws IOException;
} }