diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/AbstractDatagramTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/AbstractDatagramTest.java index 9c4a5f0d1e..ebf597b55d 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/AbstractDatagramTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/AbstractDatagramTest.java @@ -35,7 +35,7 @@ public abstract class AbstractDatagramTest extends AbstractComboTestsuiteTest> newFactories() { - return SocketTestPermutation.INSTANCE.datagram(InternetProtocolFamily.IPv4); + return SocketTestPermutation.INSTANCE.datagram(internetProtocolFamily()); } @Override diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/DatagramMulticastTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/DatagramMulticastTest.java index 154e80554d..692e82d5d5 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/DatagramMulticastTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/DatagramMulticastTest.java @@ -25,20 +25,15 @@ import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.InternetProtocolFamily; import io.netty.testsuite.transport.TestsuitePermutation; -import io.netty.util.NetUtil; import io.netty.util.internal.SocketUtils; import org.junit.Assume; -import org.junit.Before; import org.junit.Test; import java.io.IOException; -import java.net.Inet4Address; -import java.net.Inet6Address; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.MulticastSocket; import java.net.NetworkInterface; -import java.net.SocketException; import java.net.UnknownHostException; import java.util.Enumeration; import java.util.List; @@ -72,6 +67,7 @@ public class DatagramMulticastTest extends AbstractDatagramTest { sb.option(ChannelOption.IP_MULTICAST_IF, iface); sb.option(ChannelOption.SO_REUSEADDR, true); + cb.option(ChannelOption.IP_MULTICAST_IF, iface); cb.option(ChannelOption.SO_REUSEADDR, true); diff --git a/transport-native-epoll/src/main/c/netty_epoll_linuxsocket.c b/transport-native-epoll/src/main/c/netty_epoll_linuxsocket.c index 4a424ee305..8169230d63 100644 --- a/transport-native-epoll/src/main/c/netty_epoll_linuxsocket.c +++ b/transport-native-epoll/src/main/c/netty_epoll_linuxsocket.c @@ -64,6 +64,32 @@ static jfieldID fdFieldId = NULL; static jfieldID fileDescriptorFieldId = NULL; // JNI Registered Methods Begin +static void netty_epoll_linuxsocket_setTimeToLive(JNIEnv* env, jclass clazz, jint fd, jint optval) { + netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_TTL, &optval, sizeof(optval)); +} + +static void netty_epoll_linuxsocket_setInterface(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray interfaceAddress, jint scopeId, jint interfaceIndex) { + struct sockaddr_storage interfaceAddr; + socklen_t interfaceAddrSize; + struct sockaddr_in* interfaceIpAddr; + + if (ipv6 == JNI_TRUE) { + if (interfaceIndex == -1) { + netty_unix_errors_throwIOException(env, "Unable to find network index"); + return; + } + netty_unix_socket_setOption(env, fd, IPPROTO_IPV6, IPV6_MULTICAST_IF, &interfaceIndex, sizeof(interfaceIndex)); + } else { + if (netty_unix_socket_initSockaddr(env, ipv6, interfaceAddress, scopeId, 0, &interfaceAddr, &interfaceAddrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr"); + return; + } + + interfaceIpAddr = (struct sockaddr_in*) &interfaceAddr; + netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_MULTICAST_IF, &interfaceIpAddr->sin_addr, sizeof(interfaceIpAddr->sin_addr)); + } +} + static void netty_epoll_linuxsocket_setTcpCork(JNIEnv* env, jclass clazz, jint fd, jint optval) { netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval)); } @@ -120,10 +146,217 @@ static void netty_epoll_linuxsocket_setSoBusyPoll(JNIEnv* env, jclass clazz, jin netty_unix_socket_setOption(env, fd, SOL_SOCKET, SO_BUSY_POLL, &optval, sizeof(optval)); } -static void netty_epoll_linuxsocket_setTcpMd5Sig(JNIEnv* env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jbyteArray key) { +static void netty_epoll_linuxsocket_joinGroup(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray groupAddress, jbyteArray interfaceAddress, jint scopeId, jint interfaceIndex) { + struct sockaddr_storage groupAddr; + socklen_t groupAddrSize; + struct sockaddr_storage interfaceAddr; + socklen_t interfaceAddrSize; + struct sockaddr_in* groupIpAddr; + struct sockaddr_in* interfaceIpAddr; + struct ip_mreq mreq; + + struct sockaddr_in6* groupIp6Addr; + struct ipv6_mreq mreq6; + + if (netty_unix_socket_initSockaddr(env, ipv6, groupAddress, scopeId, 0, &groupAddr, &groupAddrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr for groupAddress"); + return; + } + + switch (groupAddr.ss_family) { + case AF_INET: + if (netty_unix_socket_initSockaddr(env, ipv6, interfaceAddress, scopeId, 0, &interfaceAddr, &interfaceAddrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr for interfaceAddr"); + return; + } + + interfaceIpAddr = (struct sockaddr_in*) &interfaceAddr; + groupIpAddr = (struct sockaddr_in*) &groupAddr; + + memcpy(&mreq.imr_multiaddr, &groupIpAddr->sin_addr, sizeof(groupIpAddr->sin_addr)); + memcpy(&mreq.imr_interface, &interfaceIpAddr->sin_addr, sizeof(interfaceIpAddr->sin_addr)); + netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); + break; + case AF_INET6: + if (interfaceIndex == -1) { + netty_unix_errors_throwIOException(env, "Unable to find network index"); + return; + } + mreq6.ipv6mr_interface = interfaceIndex; + + groupIp6Addr = (struct sockaddr_in6*) &groupAddr; + memcpy(&mreq6.ipv6mr_multiaddr, &groupIp6Addr->sin6_addr, sizeof(groupIp6Addr->sin6_addr)); + netty_unix_socket_setOption(env, fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mreq6, sizeof(mreq6)); + break; + default: + netty_unix_errors_throwIOException(env, "Address family not supported"); + break; + } +} + +static void netty_epoll_linuxsocket_joinSsmGroup(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray groupAddress, jbyteArray interfaceAddress, jint scopeId, jint interfaceIndex, jbyteArray sourceAddress) { + struct sockaddr_storage groupAddr; + socklen_t groupAddrSize; + struct sockaddr_storage interfaceAddr; + socklen_t interfaceAddrSize; + struct sockaddr_storage sourceAddr; + socklen_t sourceAddrSize; + struct sockaddr_in* groupIpAddr; + struct sockaddr_in* interfaceIpAddr; + struct sockaddr_in* sourceIpAddr; + struct ip_mreq_source mreq; + + struct group_source_req mreq6; + + if (netty_unix_socket_initSockaddr(env, ipv6, groupAddress, scopeId, 0, &groupAddr, &groupAddrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr for groupAddress"); + return; + } + + if (netty_unix_socket_initSockaddr(env, ipv6, sourceAddress, scopeId, 0, &sourceAddr, &sourceAddrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr for sourceAddress"); + return; + } + + switch (groupAddr.ss_family) { + case AF_INET: + if (netty_unix_socket_initSockaddr(env, ipv6, interfaceAddress, scopeId, 0, &interfaceAddr, &interfaceAddrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr for interfaceAddress"); + return; + } + interfaceIpAddr = (struct sockaddr_in*) &interfaceAddr; + groupIpAddr = (struct sockaddr_in*) &groupAddr; + sourceIpAddr = (struct sockaddr_in*) &sourceAddr; + memcpy(&mreq.imr_multiaddr, &groupIpAddr->sin_addr, sizeof(groupIpAddr->sin_addr)); + memcpy(&mreq.imr_interface, &interfaceIpAddr->sin_addr, sizeof(interfaceIpAddr->sin_addr)); + memcpy(&mreq.imr_sourceaddr, &sourceIpAddr->sin_addr, sizeof(sourceIpAddr->sin_addr)); + netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_ADD_SOURCE_MEMBERSHIP, &mreq, sizeof(mreq)); + break; + case AF_INET6: + if (interfaceIndex == -1) { + netty_unix_errors_throwIOException(env, "Unable to find network index"); + return; + } + mreq6.gsr_group = groupAddr; + mreq6.gsr_interface = interfaceIndex; + mreq6.gsr_source = sourceAddr; + netty_unix_socket_setOption(env, fd, IPPROTO_IPV6, MCAST_JOIN_SOURCE_GROUP, &mreq6, sizeof(mreq6)); + break; + default: + netty_unix_errors_throwIOException(env, "Address family not supported"); + break; + } +} + +static void netty_epoll_linuxsocket_leaveGroup(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray groupAddress, jbyteArray interfaceAddress, jint scopeId, jint interfaceIndex) { + struct sockaddr_storage groupAddr; + socklen_t groupAddrSize; + + struct sockaddr_storage interfaceAddr; + socklen_t interfaceAddrSize; + struct sockaddr_in* groupIpAddr; + struct sockaddr_in* interfaceIpAddr; + struct ip_mreq mreq; + + struct sockaddr_in6* groupIp6Addr; + struct ipv6_mreq mreq6; + + if (netty_unix_socket_initSockaddr(env, ipv6, groupAddress, scopeId, 0, &groupAddr, &groupAddrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr for groupAddress"); + return; + } + + switch (groupAddr.ss_family) { + case AF_INET: + if (netty_unix_socket_initSockaddr(env, ipv6, interfaceAddress, scopeId, 0, &interfaceAddr, &interfaceAddrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr for interfaceAddress"); + return; + } + interfaceIpAddr = (struct sockaddr_in*) &interfaceAddr; + groupIpAddr = (struct sockaddr_in*) &groupAddr; + + memcpy(&mreq.imr_multiaddr, &groupIpAddr->sin_addr, sizeof(groupIpAddr->sin_addr)); + memcpy(&mreq.imr_interface, &interfaceIpAddr->sin_addr, sizeof(interfaceIpAddr->sin_addr)); + netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_DROP_MEMBERSHIP, &mreq, sizeof(mreq)); + break; + case AF_INET6: + if (interfaceIndex == -1) { + netty_unix_errors_throwIOException(env, "Unable to find network index"); + return; + } + mreq6.ipv6mr_interface = interfaceIndex; + + groupIp6Addr = (struct sockaddr_in6*) &groupAddr; + memcpy(&mreq6.ipv6mr_multiaddr, &groupIp6Addr->sin6_addr, sizeof(groupIp6Addr->sin6_addr)); + netty_unix_socket_setOption(env, fd, IPPROTO_IPV6, IPV6_LEAVE_GROUP, &mreq6, sizeof(mreq6)); + break; + default: + netty_unix_errors_throwIOException(env, "Address family not supported"); + break; + } +} + +static void netty_epoll_linuxsocket_leaveSsmGroup(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray groupAddress, jbyteArray interfaceAddress, jint scopeId, jint interfaceIndex, jbyteArray sourceAddress) { + struct sockaddr_storage groupAddr; + socklen_t groupAddrSize; + struct sockaddr_storage interfaceAddr; + socklen_t interfaceAddrSize; + struct sockaddr_storage sourceAddr; + socklen_t sourceAddrSize; + struct sockaddr_in* groupIpAddr; + struct sockaddr_in* interfaceIpAddr; + struct sockaddr_in* sourceIpAddr; + + struct ip_mreq_source mreq; + struct group_source_req mreq6; + + + if (netty_unix_socket_initSockaddr(env, ipv6, groupAddress, scopeId, 0, &groupAddr, &groupAddrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr for groupAddress"); + return; + } + + if (netty_unix_socket_initSockaddr(env, ipv6, sourceAddress, scopeId, 0, &sourceAddr, &sourceAddrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr for sourceAddress"); + return; + } + + switch (groupAddr.ss_family) { + case AF_INET: + if (netty_unix_socket_initSockaddr(env, ipv6, interfaceAddress, scopeId, 0, &interfaceAddr, &interfaceAddrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr for interfaceAddress"); + return; + } + interfaceIpAddr = (struct sockaddr_in*) &interfaceAddr; + + groupIpAddr = (struct sockaddr_in*) &groupAddr; + sourceIpAddr = (struct sockaddr_in*) &sourceAddr; + memcpy(&mreq.imr_multiaddr, &groupIpAddr->sin_addr, sizeof(groupIpAddr->sin_addr)); + memcpy(&mreq.imr_interface, &interfaceIpAddr->sin_addr, sizeof(interfaceIpAddr->sin_addr)); + memcpy(&mreq.imr_sourceaddr, &sourceIpAddr->sin_addr, sizeof(sourceIpAddr->sin_addr)); + netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_DROP_SOURCE_MEMBERSHIP, &mreq, sizeof(mreq)); + break; + case AF_INET6: + if (interfaceIndex == -1) { + netty_unix_errors_throwIOException(env, "Unable to find network index"); + return; + } + + mreq6.gsr_group = groupAddr; + mreq6.gsr_interface = interfaceIndex; + mreq6.gsr_source = sourceAddr; + netty_unix_socket_setOption(env, fd, IPPROTO_IPV6, MCAST_LEAVE_SOURCE_GROUP, &mreq6, sizeof(mreq6)); + break; + default: + netty_unix_errors_throwIOException(env, "Address family not supported"); + break; + } +} + +static void netty_epoll_linuxsocket_setTcpMd5Sig(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray address, jint scopeId, jbyteArray key) { struct sockaddr_storage addr; socklen_t addrSize; - if (netty_unix_socket_initSockaddr(env, address, scopeId, 0, &addr, &addrSize) == -1) { + if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, 0, &addr, &addrSize) == -1) { netty_unix_errors_throwIOException(env, "Could not init sockaddr"); return; } @@ -159,6 +392,14 @@ static void netty_epoll_linuxsocket_setTcpMd5Sig(JNIEnv* env, jclass clazz, jint } } +static jint netty_epoll_linuxsocket_getTimeToLive(JNIEnv* env, jclass clazz, jint fd) { + int optval; + if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_TTL, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + static jint netty_epoll_linuxsocket_getTcpKeepIdle(JNIEnv* env, jclass clazz, jint fd) { int optval; if (netty_unix_socket_getOption(env, fd, IPPROTO_TCP, TCP_KEEPIDLE, &optval, sizeof(optval)) == -1) { @@ -358,6 +599,9 @@ static jlong netty_epoll_linuxsocket_sendFile(JNIEnv* env, jclass clazz, jint fd // JNI Method Registration Table Begin static const JNINativeMethod fixed_method_table[] = { + { "setTimeToLive", "(II)V", (void *) netty_epoll_linuxsocket_setTimeToLive }, + { "getTimeToLive", "(I)I", (void *) netty_epoll_linuxsocket_getTimeToLive }, + { "setInterface", "(IZ[BII)V", (void *) netty_epoll_linuxsocket_setInterface }, { "setTcpCork", "(II)V", (void *) netty_epoll_linuxsocket_setTcpCork }, { "setSoBusyPoll", "(II)V", (void *) netty_epoll_linuxsocket_setSoBusyPoll }, { "setTcpQuickAck", "(II)V", (void *) netty_epoll_linuxsocket_setTcpQuickAck }, @@ -386,7 +630,11 @@ static const JNINativeMethod fixed_method_table[] = { { "isIpTransparent", "(I)I", (void *) netty_epoll_linuxsocket_isIpTransparent }, { "isIpRecvOrigDestAddr", "(I)I", (void *) netty_epoll_linuxsocket_isIpRecvOrigDestAddr }, { "getTcpInfo", "(I[J)V", (void *) netty_epoll_linuxsocket_getTcpInfo }, - { "setTcpMd5Sig", "(I[BI[B)V", (void *) netty_epoll_linuxsocket_setTcpMd5Sig } + { "setTcpMd5Sig", "(IZ[BI[B)V", (void *) netty_epoll_linuxsocket_setTcpMd5Sig }, + { "joinGroup", "(IZ[B[BII)V", (void *) netty_epoll_linuxsocket_joinGroup }, + { "joinSsmGroup", "(IZ[B[BII[B)V", (void *) netty_epoll_linuxsocket_joinSsmGroup }, + { "leaveGroup", "(IZ[B[BII)V", (void *) netty_epoll_linuxsocket_leaveGroup }, + { "leaveSsmGroup", "(IZ[B[BII[B)V", (void *) netty_epoll_linuxsocket_leaveSsmGroup } // "sendFile" has a dynamic signature }; diff --git a/transport-native-epoll/src/main/c/netty_epoll_native.c b/transport-native-epoll/src/main/c/netty_epoll_native.c index 96e55e1ecc..9ac47f5cba 100644 --- a/transport-native-epoll/src/main/c/netty_epoll_native.c +++ b/transport-native-epoll/src/main/c/netty_epoll_native.c @@ -265,7 +265,7 @@ static jint netty_epoll_native_epollCtlDel0(JNIEnv* env, jclass clazz, jint efd, return res; } -static jint netty_epoll_native_sendmmsg0(JNIEnv* env, jclass clazz, jint fd, jobjectArray packets, jint offset, jint len) { +static jint netty_epoll_native_sendmmsg0(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobjectArray packets, jint offset, jint len) { struct mmsghdr msg[len]; struct sockaddr_storage addr[len]; socklen_t addrSize; @@ -280,7 +280,7 @@ static jint netty_epoll_native_sendmmsg0(JNIEnv* env, jclass clazz, jint fd, job jint scopeId = (*env)->GetIntField(env, packet, packetScopeIdFieldId); jint port = (*env)->GetIntField(env, packet, packetPortFieldId); - if (netty_unix_socket_initSockaddr(env, address, scopeId, port, &addr[i], &addrSize) == -1) { + if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr[i], &addrSize) == -1) { return -1; } @@ -435,7 +435,7 @@ static JNINativeMethod* createDynamicMethodsTable(const char* packagePrefix) { char* dynamicTypeName = netty_unix_util_prepend(packagePrefix, "io/netty/channel/epoll/NativeDatagramPacketArray$NativeDatagramPacket;II)I"); JNINativeMethod* dynamicMethod = &dynamicMethods[fixed_method_table_size]; dynamicMethod->name = "sendmmsg0"; - dynamicMethod->signature = netty_unix_util_prepend("(I[L", dynamicTypeName); + dynamicMethod->signature = netty_unix_util_prepend("(IZ[L", dynamicTypeName); dynamicMethod->fnPtr = (void *) netty_epoll_native_sendmmsg0; free(dynamicTypeName); return dynamicMethods; diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index 8a7835edb3..74917c95ac 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -28,19 +28,21 @@ import io.netty.channel.EventLoop; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramChannelConfig; import io.netty.channel.socket.DatagramPacket; +import io.netty.channel.socket.InternetProtocolFamily; import io.netty.channel.unix.DatagramSocketAddress; import io.netty.channel.unix.Errors; import io.netty.channel.unix.IovArray; +import io.netty.channel.unix.Socket; import io.netty.channel.unix.UnixChannelUtil; import io.netty.util.internal.StringUtil; import java.io.IOException; +import java.net.Inet4Address; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.NetworkInterface; import java.net.PortUnreachableException; import java.net.SocketAddress; -import java.net.SocketException; import java.nio.ByteBuffer; import static io.netty.channel.epoll.LinuxSocket.newSocketDgram; @@ -59,20 +61,47 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements StringUtil.simpleClassName(InetSocketAddress.class) + ">, " + StringUtil.simpleClassName(ByteBuf.class) + ')'; + final InternetProtocolFamily family; private final EpollDatagramChannelConfig config; private volatile boolean connected; + /** + * Create a new instance which selects the {@link InternetProtocolFamily} to use depending + * on the Operation Systems default which will be chosen. + */ public EpollDatagramChannel(EventLoop eventLoop) { - super(eventLoop, newSocketDgram()); + this(eventLoop, null); + } + + /** + * Create a new instance using the given {@link InternetProtocolFamily}. If {@code null} is used it will depend + * on the Operation Systems default which will be chosen. + */ + public EpollDatagramChannel(EventLoop eventLoop, InternetProtocolFamily family) { + super(eventLoop, family == null ? + newSocketDgram(Socket.isIPv6Preferred()) : newSocketDgram(family == InternetProtocolFamily.IPv6)); + this.family = internetProtocolFamily(family); config = new EpollDatagramChannelConfig(this); } - public EpollDatagramChannel(EventLoop eventLoop, int fd) { - this(eventLoop, new LinuxSocket(fd)); + private static InternetProtocolFamily internetProtocolFamily(InternetProtocolFamily family) { + if (family == null) { + return Socket.isIPv6Preferred() ? InternetProtocolFamily.IPv6 : InternetProtocolFamily.IPv4; + } + return family; } - EpollDatagramChannel(EventLoop eventLoop, LinuxSocket fd) { + /** + * Create a new instance which selects the {@link InternetProtocolFamily} to use depending + * on the Operation Systems default which will be chosen. + */ + public EpollDatagramChannel(EventLoop eventLoop, int fd) { + this(eventLoop, new LinuxSocket(fd), null); + } + + private EpollDatagramChannel(EventLoop eventLoop, LinuxSocket fd, InternetProtocolFamily family) { super(null, eventLoop, fd, true); + this.family = internetProtocolFamily(family); config = new EpollDatagramChannelConfig(this); } @@ -113,7 +142,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements return joinGroup( multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise); - } catch (SocketException e) { + } catch (IOException e) { promise.setFailure(e); } return promise; @@ -145,7 +174,12 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements requireNonNull(multicastAddress, "multicastAddress"); requireNonNull(networkInterface, "networkInterface"); - promise.setFailure(new UnsupportedOperationException("Multicast not supported")); + try { + socket.joinGroup(multicastAddress, networkInterface, source); + promise.setSuccess(); + } catch (IOException e) { + promise.setFailure(e); + } return promise; } @@ -159,7 +193,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements try { return leaveGroup( multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise); - } catch (SocketException e) { + } catch (IOException e) { promise.setFailure(e); } return promise; @@ -191,8 +225,12 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements requireNonNull(multicastAddress, "multicastAddress"); requireNonNull(networkInterface, "networkInterface"); - promise.setFailure(new UnsupportedOperationException("Multicast not supported")); - + try { + socket.leaveGroup(multicastAddress, networkInterface, source); + promise.setSuccess(); + } catch (IOException e) { + promise.setFailure(e); + } return promise; } @@ -240,6 +278,13 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements @Override protected void doBind(SocketAddress localAddress) throws Exception { + if (localAddress instanceof InetSocketAddress) { + InetSocketAddress socketAddress = (InetSocketAddress) localAddress; + if (socketAddress.getAddress().isAnyLocalAddress() && + socketAddress.getAddress() instanceof Inet4Address && Socket.isIPv6Preferred()) { + localAddress = new InetSocketAddress(LinuxSocket.INET6_ANY, socketAddress.getPort()); + } + } super.doBind(localAddress); active = true; } @@ -267,7 +312,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets(); while (cnt > 0) { - int send = Native.sendmmsg(socket.intValue(), packets, offset, cnt); + int send = socket.sendmmsg(packets, offset, cnt); if (send == 0) { // Did not write all messages. setFlag(Native.EPOLLOUT); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java index 778b555fa1..02dabcbfc4 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java @@ -326,12 +326,21 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme @Override public int getTimeToLive() { - return -1; + try { + return ((EpollDatagramChannel) channel).socket.getTimeToLive(); + } catch (IOException e) { + throw new ChannelException(e); + } } @Override public EpollDatagramChannelConfig setTimeToLive(int ttl) { - throw new UnsupportedOperationException("Multicast not supported"); + try { + ((EpollDatagramChannel) channel).socket.setTimeToLive(ttl); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } } @Override @@ -341,7 +350,12 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme @Override public EpollDatagramChannelConfig setInterface(InetAddress interfaceAddress) { - throw new UnsupportedOperationException("Multicast not supported"); + try { + ((EpollDatagramChannel) channel).socket.setInterface(interfaceAddress); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } } @Override @@ -351,7 +365,13 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme @Override public EpollDatagramChannelConfig setNetworkInterface(NetworkInterface networkInterface) { - throw new UnsupportedOperationException("Multicast not supported"); + try { + EpollDatagramChannel datagramChannel = (EpollDatagramChannel) channel; + datagramChannel.socket.setNetworkInterface(networkInterface, datagramChannel.family); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } } @Override diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/LinuxSocket.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/LinuxSocket.java index 64eff33a50..0fb0624b63 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/LinuxSocket.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/LinuxSocket.java @@ -15,13 +15,20 @@ */ package io.netty.channel.epoll; +import io.netty.channel.ChannelException; import io.netty.channel.DefaultFileRegion; import io.netty.channel.unix.NativeInetAddress; import io.netty.channel.unix.PeerCredentials; import io.netty.channel.unix.Socket; +import io.netty.channel.socket.InternetProtocolFamily; +import io.netty.util.internal.PlatformDependent; import java.io.IOException; import java.net.InetAddress; +import java.net.Inet6Address; +import java.net.NetworkInterface; +import java.net.UnknownHostException; +import java.util.Enumeration; import static io.netty.channel.unix.Errors.ioResult; @@ -29,12 +36,77 @@ import static io.netty.channel.unix.Errors.ioResult; * A socket which provides access Linux native methods. */ final class LinuxSocket extends Socket { + static final InetAddress INET6_ANY = unsafeInetAddrByName("::"); + private static final InetAddress INET_ANY = unsafeInetAddrByName("0.0.0.0"); private static final long MAX_UINT32_T = 0xFFFFFFFFL; LinuxSocket(int fd) { super(fd); } + int sendmmsg(NativeDatagramPacketArray.NativeDatagramPacket[] msgs, + int offset, int len) throws IOException { + return Native.sendmmsg(intValue(), ipv6, msgs, offset, len); + } + + void setTimeToLive(int ttl) throws IOException { + setTimeToLive(intValue(), ttl); + } + + void setInterface(InetAddress address) throws IOException { + final NativeInetAddress a = NativeInetAddress.newInstance(address); + setInterface(intValue(), ipv6, a.address(), a.scopeId(), interfaceIndex(address)); + } + + void setNetworkInterface(NetworkInterface netInterface, InternetProtocolFamily family) throws IOException { + InetAddress address = deriveInetAddress(netInterface, family == InternetProtocolFamily.IPv6); + if (address.equals(family == InternetProtocolFamily.IPv4 ? INET_ANY : INET6_ANY)) { + throw new IOException("NetworkInterface does not support " + family); + } + final NativeInetAddress nativeAddress = NativeInetAddress.newInstance(address); + setInterface(intValue(), ipv6, nativeAddress.address(), nativeAddress.scopeId(), interfaceIndex(netInterface)); + } + + void joinGroup(InetAddress group, NetworkInterface netInterface, InetAddress source) throws IOException { + final NativeInetAddress g = NativeInetAddress.newInstance(group); + final boolean isIpv6 = group instanceof Inet6Address; + final NativeInetAddress i = NativeInetAddress.newInstance(deriveInetAddress(netInterface, isIpv6)); + if (source != null) { + final NativeInetAddress s = NativeInetAddress.newInstance(source); + joinSsmGroup(intValue(), ipv6, g.address(), i.address(), + g.scopeId(), interfaceIndex(netInterface), s.address()); + } else { + joinGroup(intValue(), ipv6, g.address(), i.address(), g.scopeId(), interfaceIndex(netInterface)); + } + } + + void leaveGroup(InetAddress group, NetworkInterface netInterface, InetAddress source) throws IOException { + final NativeInetAddress g = NativeInetAddress.newInstance(group); + final boolean isIpv6 = group instanceof Inet6Address; + final NativeInetAddress i = NativeInetAddress.newInstance(deriveInetAddress(netInterface, isIpv6)); + if (source != null) { + final NativeInetAddress s = NativeInetAddress.newInstance(source); + leaveSsmGroup(intValue(), ipv6, g.address(), i.address(), + g.scopeId(), interfaceIndex(netInterface), s.address()); + } else { + leaveGroup(intValue(), ipv6, g.address(), i.address(), g.scopeId(), interfaceIndex(netInterface)); + } + } + + private static int interfaceIndex(NetworkInterface networkInterface) { + return PlatformDependent.javaVersion() >= 7 ? networkInterface.getIndex() : -1; + } + + private static int interfaceIndex(InetAddress address) throws IOException { + if (PlatformDependent.javaVersion() >= 7) { + NetworkInterface iface = NetworkInterface.getByInetAddress(address); + if (iface != null) { + return iface.getIndex(); + } + } + return -1; + } + void setTcpDeferAccept(int deferAccept) throws IOException { setTcpDeferAccept(intValue(), deferAccept); } @@ -98,13 +170,17 @@ final class LinuxSocket extends Socket { setIpRecvOrigDestAddr(intValue(), enabled ? 1 : 0); } + int getTimeToLive() throws IOException { + return getTimeToLive(intValue()); + } + void getTcpInfo(EpollTcpInfo info) throws IOException { getTcpInfo(intValue(), info.info); } void setTcpMd5Sig(InetAddress address, byte[] key) throws IOException { final NativeInetAddress a = NativeInetAddress.newInstance(address); - setTcpMd5Sig(intValue(), a.address(), a.scopeId(), key); + setTcpMd5Sig(intValue(), ipv6, a.address(), a.scopeId(), key); } boolean isTcpCork() throws IOException { @@ -171,18 +247,57 @@ final class LinuxSocket extends Socket { return ioResult("sendfile", (int) res); } + private static InetAddress deriveInetAddress(NetworkInterface netInterface, boolean ipv6) { + final InetAddress ipAny = ipv6 ? INET6_ANY : INET_ANY; + if (netInterface != null) { + final Enumeration ias = netInterface.getInetAddresses(); + while (ias.hasMoreElements()) { + final InetAddress ia = ias.nextElement(); + final boolean isV6 = ia instanceof Inet6Address; + if (isV6 == ipv6) { + return ia; + } + } + } + return ipAny; + } + + public static LinuxSocket newSocketStream(boolean ipv6) { + return new LinuxSocket(newSocketStream0(ipv6)); + } + public static LinuxSocket newSocketStream() { - return new LinuxSocket(newSocketStream0()); + return newSocketStream(isIPv6Preferred()); + } + + public static LinuxSocket newSocketDgram(boolean ipv6) { + return new LinuxSocket(newSocketDgram0(ipv6)); } public static LinuxSocket newSocketDgram() { - return new LinuxSocket(newSocketDgram0()); + return newSocketDgram(isIPv6Preferred()); } public static LinuxSocket newSocketDomain() { return new LinuxSocket(newSocketDomain0()); } + private static InetAddress unsafeInetAddrByName(String inetName) { + try { + return InetAddress.getByName(inetName); + } catch (UnknownHostException uhe) { + throw new ChannelException(uhe); + } + } + + private static native void joinGroup(int fd, boolean ipv6, byte[] group, byte[] interfaceAddress, + int scopeId, int interfaceIndex) throws IOException; + private static native void joinSsmGroup(int fd, boolean ipv6, byte[] group, byte[] interfaceAddress, + int scopeId, int interfaceIndex, byte[] source) throws IOException; + private static native void leaveGroup(int fd, boolean ipv6, byte[] group, byte[] interfaceAddress, + int scopeId, int interfaceIndex) throws IOException; + private static native void leaveSsmGroup(int fd, boolean ipv6, byte[] group, byte[] interfaceAddress, + int scopeId, int interfaceIndex, byte[] source) throws IOException; private static native long sendFile(int socketFd, DefaultFileRegion src, long baseOffset, long offset, long length) throws IOException; @@ -195,6 +310,7 @@ final class LinuxSocket extends Socket { private static native int getTcpKeepIntvl(int fd) throws IOException; private static native int getTcpKeepCnt(int fd) throws IOException; private static native int getTcpUserTimeout(int fd) throws IOException; + private static native int getTimeToLive(int fd) throws IOException; private static native int isIpFreeBind(int fd) throws IOException; private static native int isIpTransparent(int fd) throws IOException; private static native int isIpRecvOrigDestAddr(int fd) throws IOException; @@ -216,5 +332,9 @@ final class LinuxSocket extends Socket { private static native void setIpFreeBind(int fd, int freeBind) throws IOException; private static native void setIpTransparent(int fd, int transparent) throws IOException; private static native void setIpRecvOrigDestAddr(int fd, int transparent) throws IOException; - private static native void setTcpMd5Sig(int fd, byte[] address, int scopeId, byte[] key) throws IOException; + private static native void setTcpMd5Sig( + int fd, boolean ipv6, byte[] address, int scopeId, byte[] key) throws IOException; + private static native void setInterface( + int fd, boolean ipv6, byte[] interfaceAddress, int scopeId, int networkInterfaceIndex) throws IOException; + private static native void setTimeToLive(int fd, int ttl) throws IOException; } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index 971b7d721c..fed3e81e5e 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -152,9 +152,15 @@ public final class Native { private static native int splice0(int fd, long offIn, int fdOut, long offOut, long len); - public static int sendmmsg( - int fd, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, int offset, int len) throws IOException { - int res = sendmmsg0(fd, msgs, offset, len); + @Deprecated + public static int sendmmsg(int fd, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, + int offset, int len) throws IOException { + return sendmmsg(fd, Socket.isIPv6Preferred(), msgs, offset, len); + } + + static int sendmmsg(int fd, boolean ipv6, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, + int offset, int len) throws IOException { + int res = sendmmsg0(fd, ipv6, msgs, offset, len); if (res >= 0) { return res; } @@ -162,7 +168,7 @@ public final class Native { } private static native int sendmmsg0( - int fd, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, int offset, int len); + int fd, boolean ipv6, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, int offset, int len); // epoll_event related public static native int sizeofEpollEvent(); diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramMulticastIPv6Test.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramMulticastIPv6Test.java new file mode 100644 index 0000000000..9497078b13 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramMulticastIPv6Test.java @@ -0,0 +1,30 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.DatagramMulticastIPv6Test; + +import java.util.List; + +public class EpollDatagramMulticastIPv6Test extends DatagramMulticastIPv6Test { + + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.datagram(internetProtocolFamily()); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramMulticastTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramMulticastTest.java new file mode 100644 index 0000000000..e3ae768ba1 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramMulticastTest.java @@ -0,0 +1,29 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.DatagramMulticastTest; + +import java.util.List; + +public class EpollDatagramMulticastTest extends DatagramMulticastTest { + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.datagram(internetProtocolFamily()); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java index 2ba1b8f13a..9c5a7ed4a3 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java @@ -115,7 +115,17 @@ class EpollSocketTestPermutation extends SocketTestPermutation { return NioDatagramChannel.class.getSimpleName() + ".class"; } }), - () -> new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollDatagramChannel.class) + () -> new Bootstrap().group(EPOLL_WORKER_GROUP).channelFactory(new ChannelFactory() { + @Override + public Channel newChannel(EventLoop eventLoop) { + return new EpollDatagramChannel(eventLoop, family); + } + + @Override + public String toString() { + return InternetProtocolFamily.class.getSimpleName() + ".class"; + } + }) ); return combo(bfs, bfs); } diff --git a/transport-native-unix-common/src/main/c/netty_unix_socket.c b/transport-native-unix-common/src/main/c/netty_unix_socket.c index b3133d672b..d1d850a86e 100644 --- a/transport-native-unix-common/src/main/c/netty_unix_socket.c +++ b/transport-native-unix-common/src/main/c/netty_unix_socket.c @@ -190,6 +190,22 @@ static void netty_unix_socket_initialize(JNIEnv* env, jclass clazz, jboolean ipv } } +static jboolean netty_unix_socket_isIPv6Preferred(JNIEnv* env, jclass clazz) { + return socketType == AF_INET6; +} + + +static jboolean netty_unix_socket_isIPv6(JNIEnv* env, jclass clazz, jint fd) { + struct sockaddr_storage addr; + socklen_t addrlen = sizeof(addr); + if (getsockname(fd, (struct sockaddr*) &addr, &addrlen) == 0) { + return ((struct sockaddr*) &addr)->sa_family == AF_INET6; + } + + netty_unix_errors_throwChannelExceptionErrorNo(env, "getsockname(...) failed: ", errno); + return JNI_FALSE; +} + static void netty_unix_socket_optionHandleError(JNIEnv* env, int err, char* method) { if (err == EBADF) { netty_unix_errors_throwClosedChannelException(env); @@ -206,11 +222,11 @@ static int netty_unix_socket_setOption0(jint fd, int level, int optname, const v return setsockopt(fd, level, optname, optval, len); } -static jint _socket(JNIEnv* env, jclass clazz, int type) { - int fd = nettyNonBlockingSocket(socketType, type, 0); +static jint _socket(JNIEnv* env, jclass clazz, int domain, int type) { + int fd = nettyNonBlockingSocket(domain, type, 0); if (fd == -1) { return -errno; - } else if (socketType == AF_INET6) { + } else if (domain == AF_INET6) { // Try to allow listen /connect ipv4 and ipv6 int optval = 0; if (netty_unix_socket_setOption0(fd, IPPROTO_IPV6, IPV6_V6ONLY, &optval, sizeof(optval)) < 0) { @@ -229,7 +245,7 @@ static jint _socket(JNIEnv* env, jclass clazz, int type) { return fd; } -int netty_unix_socket_initSockaddr(JNIEnv* env, jbyteArray address, jint scopeId, jint jport, +int netty_unix_socket_initSockaddr(JNIEnv* env, jboolean ipv6, jbyteArray address, jint scopeId, jint jport, const struct sockaddr_storage* addr, socklen_t* addrSize) { uint16_t port = htons((uint16_t) jport); // We use 16 bytes as this allows us to fit ipv6, ipv4 and ipv4 mapped ipv6 addresses in the array. @@ -251,7 +267,7 @@ int netty_unix_socket_initSockaddr(JNIEnv* env, jbyteArray address, jint scopeId // https://bugs.openjdk.java.net/browse/JDK-8057586 (*env)->GetByteArrayRegion(env, address, 0, len, addressBytes); - if (socketType == AF_INET6) { + if (ipv6 == JNI_TRUE) { struct sockaddr_in6* ip6addr = (struct sockaddr_in6*) addr; *addrSize = sizeof(struct sockaddr_in6); ip6addr->sin6_family = AF_INET6; @@ -274,10 +290,10 @@ int netty_unix_socket_initSockaddr(JNIEnv* env, jbyteArray address, jint scopeId return 0; } -static jint _sendTo(JNIEnv* env, jint fd, void* buffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) { +static jint _sendTo(JNIEnv* env, jint fd, jboolean ipv6, void* buffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) { struct sockaddr_storage addr; socklen_t addrSize; - if (netty_unix_socket_initSockaddr(env, address, scopeId, port, &addr, &addrSize) == -1) { + if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr, &addrSize) == -1) { return -1; } @@ -412,10 +428,10 @@ static jint netty_unix_socket_shutdown(JNIEnv* env, jclass clazz, jint fd, jbool return 0; } -static jint netty_unix_socket_bind(JNIEnv* env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port) { +static jint netty_unix_socket_bind(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray address, jint scopeId, jint port) { struct sockaddr_storage addr; socklen_t addrSize; - if (netty_unix_socket_initSockaddr(env, address, scopeId, port, &addr, &addrSize) == -1) { + if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr, &addrSize) == -1) { return -1; } @@ -432,10 +448,10 @@ static jint netty_unix_socket_listen(JNIEnv* env, jclass clazz, jint fd, jint ba return 0; } -static jint netty_unix_socket_connect(JNIEnv* env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port) { +static jint netty_unix_socket_connect(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray address, jint scopeId, jint port) { struct sockaddr_storage addr; socklen_t addrSize; - if (netty_unix_socket_initSockaddr(env, address, scopeId, port, &addr, &addrSize) == -1) { + if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr, &addrSize) == -1) { // A runtime exception was thrown return -1; } @@ -470,7 +486,7 @@ static jint netty_unix_socket_finishConnect(JNIEnv* env, jclass clazz, jint fd) return -optval; } -static jint netty_unix_socket_disconnect(JNIEnv* env, jclass clazz, jint fd) { +static jint netty_unix_socket_disconnect(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6) { struct sockaddr_storage addr; int len; @@ -478,7 +494,7 @@ static jint netty_unix_socket_disconnect(JNIEnv* env, jclass clazz, jint fd) { // You can disconnect connection-less sockets by using AF_UNSPEC. // See man 2 connect. - if (socketType == AF_INET6) { + if (ipv6 == JNI_TRUE) { struct sockaddr_in6* ip6addr = (struct sockaddr_in6*) &addr; ip6addr->sin6_family = AF_UNSPEC; len = sizeof(struct sockaddr_in6); @@ -562,12 +578,14 @@ static jbyteArray netty_unix_socket_localAddress(JNIEnv* env, jclass clazz, jint return createInetSocketAddressArray(env, &addr); } -static jint netty_unix_socket_newSocketDgramFd(JNIEnv* env, jclass clazz) { - return _socket(env, clazz, SOCK_DGRAM); +static jint netty_unix_socket_newSocketDgramFd(JNIEnv* env, jclass clazz, jboolean ipv6) { + int domain = ipv6 == JNI_TRUE ? AF_INET6 : AF_INET; + return _socket(env, clazz, domain, SOCK_DGRAM); } -static jint netty_unix_socket_newSocketStreamFd(JNIEnv* env, jclass clazz) { - return _socket(env, clazz, SOCK_STREAM); +static jint netty_unix_socket_newSocketStreamFd(JNIEnv* env, jclass clazz, jboolean ipv6) { + int domain = ipv6 == JNI_TRUE ? AF_INET6 : AF_INET; + return _socket(env, clazz, domain, SOCK_STREAM); } static jint netty_unix_socket_newSocketDomainFd(JNIEnv* env, jclass clazz) { @@ -578,19 +596,19 @@ static jint netty_unix_socket_newSocketDomainFd(JNIEnv* env, jclass clazz) { return fd; } -static jint netty_unix_socket_sendTo(JNIEnv* env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) { +static jint netty_unix_socket_sendTo(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) { // We check that GetDirectBufferAddress will not return NULL in OnLoad - return _sendTo(env, fd, (*env)->GetDirectBufferAddress(env, jbuffer), pos, limit, address, scopeId, port); + return _sendTo(env, fd, ipv6, (*env)->GetDirectBufferAddress(env, jbuffer), pos, limit, address, scopeId, port); } -static jint netty_unix_socket_sendToAddress(JNIEnv* env, jclass clazz, jint fd, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) { - return _sendTo(env, fd, (void *) (intptr_t) memoryAddress, pos, limit, address, scopeId, port); +static jint netty_unix_socket_sendToAddress(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) { + return _sendTo(env, fd, ipv6, (void *) (intptr_t) memoryAddress, pos, limit, address, scopeId, port); } -static jint netty_unix_socket_sendToAddresses(JNIEnv* env, jclass clazz, jint fd, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port) { +static jint netty_unix_socket_sendToAddresses(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port) { struct sockaddr_storage addr; socklen_t addrSize; - if (netty_unix_socket_initSockaddr(env, address, scopeId, port, &addr, &addrSize) == -1) { + if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr, &addrSize) == -1) { return -1; } @@ -800,8 +818,8 @@ static void netty_unix_socket_setSoLinger(JNIEnv* env, jclass clazz, jint fd, ji netty_unix_socket_setOption(env, fd, SOL_SOCKET, SO_LINGER, &solinger, sizeof(solinger)); } -static void netty_unix_socket_setTrafficClass(JNIEnv* env, jclass clazz, jint fd, jint optval) { - if (socketType == AF_INET6) { +static void netty_unix_socket_setTrafficClass(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jint optval) { + if (ipv6 == JNI_TRUE) { // This call will put an exception on the stack to be processed once the JNI calls completes if // setsockopt failed and return a negative value. int rc = netty_unix_socket_setOption(env, fd, IPPROTO_IPV6, IPV6_TCLASS, &optval, sizeof(optval)); @@ -867,9 +885,9 @@ static jint netty_unix_socket_getSoLinger(JNIEnv* env, jclass clazz, jint fd) { } } -static jint netty_unix_socket_getTrafficClass(JNIEnv* env, jclass clazz, jint fd) { +static jint netty_unix_socket_getTrafficClass(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6) { int optval; - if (socketType == AF_INET6) { + if (ipv6 == JNI_TRUE) { if (netty_unix_socket_getOption0(fd, IPPROTO_IPV6, IPV6_TCLASS, &optval, sizeof(optval)) == -1) { if (errno == ENOPROTOOPT) { if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_TOS, &optval, sizeof(optval)) == -1) { @@ -926,20 +944,20 @@ static jint netty_unix_socket_isBroadcast(JNIEnv* env, jclass clazz, jint fd) { // JNI Method Registration Table Begin static const JNINativeMethod fixed_method_table[] = { { "shutdown", "(IZZ)I", (void *) netty_unix_socket_shutdown }, - { "bind", "(I[BII)I", (void *) netty_unix_socket_bind }, + { "bind", "(IZ[BII)I", (void *) netty_unix_socket_bind }, { "listen", "(II)I", (void *) netty_unix_socket_listen }, - { "connect", "(I[BII)I", (void *) netty_unix_socket_connect }, + { "connect", "(IZ[BII)I", (void *) netty_unix_socket_connect }, { "finishConnect", "(I)I", (void *) netty_unix_socket_finishConnect }, - { "disconnect", "(I)I", (void *) netty_unix_socket_disconnect}, + { "disconnect", "(IZ)I", (void *) netty_unix_socket_disconnect}, { "accept", "(I[B)I", (void *) netty_unix_socket_accept }, { "remoteAddress", "(I)[B", (void *) netty_unix_socket_remoteAddress }, { "localAddress", "(I)[B", (void *) netty_unix_socket_localAddress }, - { "newSocketDgramFd", "()I", (void *) netty_unix_socket_newSocketDgramFd }, - { "newSocketStreamFd", "()I", (void *) netty_unix_socket_newSocketStreamFd }, + { "newSocketDgramFd", "(Z)I", (void *) netty_unix_socket_newSocketDgramFd }, + { "newSocketStreamFd", "(Z)I", (void *) netty_unix_socket_newSocketStreamFd }, { "newSocketDomainFd", "()I", (void *) netty_unix_socket_newSocketDomainFd }, - { "sendTo", "(ILjava/nio/ByteBuffer;II[BII)I", (void *) netty_unix_socket_sendTo }, - { "sendToAddress", "(IJII[BII)I", (void *) netty_unix_socket_sendToAddress }, - { "sendToAddresses", "(IJI[BII)I", (void *) netty_unix_socket_sendToAddresses }, + { "sendTo", "(IZLjava/nio/ByteBuffer;II[BII)I", (void *) netty_unix_socket_sendTo }, + { "sendToAddress", "(IZJII[BII)I", (void *) netty_unix_socket_sendToAddress }, + { "sendToAddresses", "(IZJI[BII)I", (void *) netty_unix_socket_sendToAddresses }, // "recvFrom" has a dynamic signature // "recvFromAddress" has a dynamic signature { "recvFd", "(I)I", (void *) netty_unix_socket_recvFd }, @@ -954,7 +972,7 @@ static const JNINativeMethod fixed_method_table[] = { { "setSendBufferSize", "(II)V", (void *) netty_unix_socket_setSendBufferSize }, { "setKeepAlive", "(II)V", (void *) netty_unix_socket_setKeepAlive }, { "setSoLinger", "(II)V", (void *) netty_unix_socket_setSoLinger }, - { "setTrafficClass", "(II)V", (void *) netty_unix_socket_setTrafficClass }, + { "setTrafficClass", "(IZI)V", (void *) netty_unix_socket_setTrafficClass }, { "isKeepAlive", "(I)I", (void *) netty_unix_socket_isKeepAlive }, { "isTcpNoDelay", "(I)I", (void *) netty_unix_socket_isTcpNoDelay }, { "isBroadcast", "(I)I", (void *) netty_unix_socket_isBroadcast }, @@ -963,9 +981,11 @@ static const JNINativeMethod fixed_method_table[] = { { "getReceiveBufferSize", "(I)I", (void *) netty_unix_socket_getReceiveBufferSize }, { "getSendBufferSize", "(I)I", (void *) netty_unix_socket_getSendBufferSize }, { "getSoLinger", "(I)I", (void *) netty_unix_socket_getSoLinger }, - { "getTrafficClass", "(I)I", (void *) netty_unix_socket_getTrafficClass }, + { "getTrafficClass", "(IZ)I", (void *) netty_unix_socket_getTrafficClass }, { "getSoError", "(I)I", (void *) netty_unix_socket_getSoError }, - { "initialize", "(Z)V", (void *) netty_unix_socket_initialize } + { "initialize", "(Z)V", (void *) netty_unix_socket_initialize }, + { "isIPv6Preferred", "()Z", (void *) netty_unix_socket_isIPv6Preferred }, + { "isIPv6", "(I)Z", (void *) netty_unix_socket_isIPv6 } }; static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]); diff --git a/transport-native-unix-common/src/main/c/netty_unix_socket.h b/transport-native-unix-common/src/main/c/netty_unix_socket.h index 4c60f6d8ce..88a03b2d16 100644 --- a/transport-native-unix-common/src/main/c/netty_unix_socket.h +++ b/transport-native-unix-common/src/main/c/netty_unix_socket.h @@ -20,7 +20,7 @@ #include // External C methods -int netty_unix_socket_initSockaddr(JNIEnv* env, jbyteArray address, jint scopeId, jint jport, const struct sockaddr_storage* addr, socklen_t* addrSize); +int netty_unix_socket_initSockaddr(JNIEnv* env, jboolean ipv6, jbyteArray address, jint scopeId, jint jport, const struct sockaddr_storage* addr, socklen_t* addrSize); int netty_unix_socket_getOption(JNIEnv* env, jint fd, int level, int optname, void* optval, socklen_t optlen); int netty_unix_socket_setOption(JNIEnv* env, jint fd, int level, int optname, const void* optval, socklen_t len); diff --git a/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java b/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java index c4ae46ee20..10a68d4b34 100644 --- a/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java +++ b/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java @@ -48,8 +48,11 @@ public class Socket extends FileDescriptor { public static final int UDS_SUN_PATH_SIZE = udsSunPathSize(); + protected final boolean ipv6; + public Socket(int fd) { super(fd); + this.ipv6 = isIPv6(fd); } public final void shutdown() throws IOException { @@ -114,7 +117,7 @@ public class Socket extends FileDescriptor { scopeId = 0; address = ipv4MappedIpv6Address(addr.getAddress()); } - int res = sendTo(fd, buf, pos, limit, address, scopeId, port); + int res = sendTo(fd, ipv6, buf, pos, limit, address, scopeId, port); if (res >= 0) { return res; } @@ -138,7 +141,7 @@ public class Socket extends FileDescriptor { scopeId = 0; address = ipv4MappedIpv6Address(addr.getAddress()); } - int res = sendToAddress(fd, memoryAddress, pos, limit, address, scopeId, port); + int res = sendToAddress(fd, ipv6, memoryAddress, pos, limit, address, scopeId, port); if (res >= 0) { return res; } @@ -161,7 +164,7 @@ public class Socket extends FileDescriptor { scopeId = 0; address = ipv4MappedIpv6Address(addr.getAddress()); } - int res = sendToAddresses(fd, memoryAddress, length, address, scopeId, port); + int res = sendToAddresses(fd, ipv6, memoryAddress, length, address, scopeId, port); if (res >= 0) { return res; } @@ -213,7 +216,7 @@ public class Socket extends FileDescriptor { if (socketAddress instanceof InetSocketAddress) { InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; NativeInetAddress address = NativeInetAddress.newInstance(inetSocketAddress.getAddress()); - res = connect(fd, address.address, address.scopeId, inetSocketAddress.getPort()); + res = connect(fd, ipv6, address.address, address.scopeId, inetSocketAddress.getPort()); } else if (socketAddress instanceof DomainSocketAddress) { DomainSocketAddress unixDomainSocketAddress = (DomainSocketAddress) socketAddress; res = connectDomainSocket(fd, unixDomainSocketAddress.path().getBytes(CharsetUtil.UTF_8)); @@ -243,7 +246,7 @@ public class Socket extends FileDescriptor { } public final void disconnect() throws IOException { - int res = disconnect(fd); + int res = disconnect(fd, ipv6); if (res < 0) { throwConnectException("disconnect", res); } @@ -253,7 +256,7 @@ public class Socket extends FileDescriptor { if (socketAddress instanceof InetSocketAddress) { InetSocketAddress addr = (InetSocketAddress) socketAddress; NativeInetAddress address = NativeInetAddress.newInstance(addr.getAddress()); - int res = bind(fd, address.address, address.scopeId, addr.getPort()); + int res = bind(fd, ipv6, address.address, address.scopeId, addr.getPort()); if (res < 0) { throw newIOException("bind", res); } @@ -338,7 +341,7 @@ public class Socket extends FileDescriptor { } public final int getTrafficClass() throws IOException { - return getTrafficClass(fd); + return getTrafficClass(fd, ipv6); } public final void setKeepAlive(boolean keepAlive) throws IOException { @@ -374,9 +377,13 @@ public class Socket extends FileDescriptor { } public final void setTrafficClass(int trafficClass) throws IOException { - setTrafficClass(fd, trafficClass); + setTrafficClass(fd, ipv6, trafficClass); } + public static native boolean isIPv6Preferred(); + + private static native boolean isIPv6(int fd); + @Override public String toString() { return "Socket{" + @@ -405,7 +412,11 @@ public class Socket extends FileDescriptor { } protected static int newSocketStream0() { - int res = newSocketStreamFd(); + return newSocketStream0(isIPv6Preferred()); + } + + protected static int newSocketStream0(boolean ipv6) { + int res = newSocketStreamFd(ipv6); if (res < 0) { throw new ChannelException(newIOException("newSocketStream", res)); } @@ -413,7 +424,11 @@ public class Socket extends FileDescriptor { } protected static int newSocketDgram0() { - int res = newSocketDgramFd(); + return newSocketDgram0(isIPv6Preferred()); + } + + protected static int newSocketDgram0(boolean ipv6) { + int res = newSocketDgramFd(ipv6); if (res < 0) { throw new ChannelException(newIOException("newSocketDgram", res)); } @@ -429,11 +444,11 @@ public class Socket extends FileDescriptor { } private static native int shutdown(int fd, boolean read, boolean write); - private static native int connect(int fd, byte[] address, int scopeId, int port); + private static native int connect(int fd, boolean ipv6, byte[] address, int scopeId, int port); private static native int connectDomainSocket(int fd, byte[] path); private static native int finishConnect(int fd); - private static native int disconnect(int fd); - private static native int bind(int fd, byte[] address, int scopeId, int port); + private static native int disconnect(int fd, boolean ipv6); + private static native int bind(int fd, boolean ipv6, byte[] address, int scopeId, int port); private static native int bindDomainSocket(int fd, byte[] path); private static native int listen(int fd, int backlog); private static native int accept(int fd, byte[] addr); @@ -442,11 +457,11 @@ public class Socket extends FileDescriptor { private static native byte[] localAddress(int fd); private static native int sendTo( - int fd, ByteBuffer buf, int pos, int limit, byte[] address, int scopeId, int port); + int fd, boolean ipv6, ByteBuffer buf, int pos, int limit, byte[] address, int scopeId, int port); private static native int sendToAddress( - int fd, long memoryAddress, int pos, int limit, byte[] address, int scopeId, int port); + int fd, boolean ipv6, long memoryAddress, int pos, int limit, byte[] address, int scopeId, int port); private static native int sendToAddresses( - int fd, long memoryAddress, int length, byte[] address, int scopeId, int port); + int fd, boolean ipv6, long memoryAddress, int length, byte[] address, int scopeId, int port); private static native DatagramSocketAddress recvFrom( int fd, ByteBuffer buf, int pos, int limit) throws IOException; @@ -455,8 +470,8 @@ public class Socket extends FileDescriptor { private static native int recvFd(int fd); private static native int sendFd(int socketFd, int fd); - private static native int newSocketStreamFd(); - private static native int newSocketDgramFd(); + private static native int newSocketStreamFd(boolean ipv6); + private static native int newSocketDgramFd(boolean ipv6); private static native int newSocketDomainFd(); private static native int isReuseAddress(int fd) throws IOException; @@ -468,7 +483,7 @@ public class Socket extends FileDescriptor { private static native int isBroadcast(int fd) throws IOException; private static native int getSoLinger(int fd) throws IOException; private static native int getSoError(int fd) throws IOException; - private static native int getTrafficClass(int fd) throws IOException; + private static native int getTrafficClass(int fd, boolean ipv6) throws IOException; private static native void setReuseAddress(int fd, int reuseAddress) throws IOException; private static native void setReusePort(int fd, int reuseAddress) throws IOException; @@ -478,6 +493,6 @@ public class Socket extends FileDescriptor { private static native void setTcpNoDelay(int fd, int tcpNoDelay) throws IOException; private static native void setSoLinger(int fd, int soLinger) throws IOException; private static native void setBroadcast(int fd, int broadcast) throws IOException; - private static native void setTrafficClass(int fd, int trafficClass) throws IOException; + private static native void setTrafficClass(int fd, boolean ipv6, int trafficClass) throws IOException; private static native void initialize(boolean ipv4Preferred); }