Added UDP multicast (with caveats: getInterface, getNetworkInterface, block or loopback-mode-disabled operations).

Motivation:

Provide epoll/native multicast to support high load multicast users (we are using it for a high load telecomm app at my day job).

Modification:

Added support for source specific and any source multicast for epoll transport. Some caveats: no support for disabling loop back mode, retrieval of interface and block operation, all of which tend to be less frequently used.

Result:

Provides epoll transport multicast for common use cases.

Co-authored-by: Norman Maurer <norman_maurer@apple.com>
This commit is contained in:
Steve Buzzard 2019-04-08 14:13:39 -04:00 committed by Norman Maurer
parent 137a3e7137
commit 70731bfa7e
14 changed files with 634 additions and 96 deletions

View File

@ -35,7 +35,7 @@ public abstract class AbstractDatagramTest extends AbstractComboTestsuiteTest<Bo
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
return SocketTestPermutation.INSTANCE.datagram(InternetProtocolFamily.IPv4);
return SocketTestPermutation.INSTANCE.datagram(internetProtocolFamily());
}
@Override

View File

@ -26,20 +26,15 @@ import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.oio.OioDatagramChannel;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.util.NetUtil;
import io.netty.util.internal.SocketUtils;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Enumeration;
import java.util.List;
@ -73,6 +68,7 @@ public class DatagramMulticastTest extends AbstractDatagramTest {
sb.option(ChannelOption.IP_MULTICAST_IF, iface);
sb.option(ChannelOption.SO_REUSEADDR, true);
cb.option(ChannelOption.IP_MULTICAST_IF, iface);
cb.option(ChannelOption.SO_REUSEADDR, true);

View File

@ -64,6 +64,32 @@ static jfieldID fdFieldId = NULL;
static jfieldID fileDescriptorFieldId = NULL;
// JNI Registered Methods Begin
static void netty_epoll_linuxsocket_setTimeToLive(JNIEnv* env, jclass clazz, jint fd, jint optval) {
netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_TTL, &optval, sizeof(optval));
}
static void netty_epoll_linuxsocket_setInterface(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray interfaceAddress, jint scopeId, jint interfaceIndex) {
struct sockaddr_storage interfaceAddr;
socklen_t interfaceAddrSize;
struct sockaddr_in* interfaceIpAddr;
if (ipv6 == JNI_TRUE) {
if (interfaceIndex == -1) {
netty_unix_errors_throwIOException(env, "Unable to find network index");
return;
}
netty_unix_socket_setOption(env, fd, IPPROTO_IPV6, IPV6_MULTICAST_IF, &interfaceIndex, sizeof(interfaceIndex));
} else {
if (netty_unix_socket_initSockaddr(env, ipv6, interfaceAddress, scopeId, 0, &interfaceAddr, &interfaceAddrSize) == -1) {
netty_unix_errors_throwIOException(env, "Could not init sockaddr");
return;
}
interfaceIpAddr = (struct sockaddr_in*) &interfaceAddr;
netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_MULTICAST_IF, &interfaceIpAddr->sin_addr, sizeof(interfaceIpAddr->sin_addr));
}
}
static void netty_epoll_linuxsocket_setTcpCork(JNIEnv* env, jclass clazz, jint fd, jint optval) {
netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
}
@ -120,10 +146,217 @@ static void netty_epoll_linuxsocket_setSoBusyPoll(JNIEnv* env, jclass clazz, jin
netty_unix_socket_setOption(env, fd, SOL_SOCKET, SO_BUSY_POLL, &optval, sizeof(optval));
}
static void netty_epoll_linuxsocket_setTcpMd5Sig(JNIEnv* env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jbyteArray key) {
static void netty_epoll_linuxsocket_joinGroup(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray groupAddress, jbyteArray interfaceAddress, jint scopeId, jint interfaceIndex) {
struct sockaddr_storage groupAddr;
socklen_t groupAddrSize;
struct sockaddr_storage interfaceAddr;
socklen_t interfaceAddrSize;
struct sockaddr_in* groupIpAddr;
struct sockaddr_in* interfaceIpAddr;
struct ip_mreq mreq;
struct sockaddr_in6* groupIp6Addr;
struct ipv6_mreq mreq6;
if (netty_unix_socket_initSockaddr(env, ipv6, groupAddress, scopeId, 0, &groupAddr, &groupAddrSize) == -1) {
netty_unix_errors_throwIOException(env, "Could not init sockaddr for groupAddress");
return;
}
switch (groupAddr.ss_family) {
case AF_INET:
if (netty_unix_socket_initSockaddr(env, ipv6, interfaceAddress, scopeId, 0, &interfaceAddr, &interfaceAddrSize) == -1) {
netty_unix_errors_throwIOException(env, "Could not init sockaddr for interfaceAddr");
return;
}
interfaceIpAddr = (struct sockaddr_in*) &interfaceAddr;
groupIpAddr = (struct sockaddr_in*) &groupAddr;
memcpy(&mreq.imr_multiaddr, &groupIpAddr->sin_addr, sizeof(groupIpAddr->sin_addr));
memcpy(&mreq.imr_interface, &interfaceIpAddr->sin_addr, sizeof(interfaceIpAddr->sin_addr));
netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
break;
case AF_INET6:
if (interfaceIndex == -1) {
netty_unix_errors_throwIOException(env, "Unable to find network index");
return;
}
mreq6.ipv6mr_interface = interfaceIndex;
groupIp6Addr = (struct sockaddr_in6*) &groupAddr;
memcpy(&mreq6.ipv6mr_multiaddr, &groupIp6Addr->sin6_addr, sizeof(groupIp6Addr->sin6_addr));
netty_unix_socket_setOption(env, fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mreq6, sizeof(mreq6));
break;
default:
netty_unix_errors_throwIOException(env, "Address family not supported");
break;
}
}
static void netty_epoll_linuxsocket_joinSsmGroup(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray groupAddress, jbyteArray interfaceAddress, jint scopeId, jint interfaceIndex, jbyteArray sourceAddress) {
struct sockaddr_storage groupAddr;
socklen_t groupAddrSize;
struct sockaddr_storage interfaceAddr;
socklen_t interfaceAddrSize;
struct sockaddr_storage sourceAddr;
socklen_t sourceAddrSize;
struct sockaddr_in* groupIpAddr;
struct sockaddr_in* interfaceIpAddr;
struct sockaddr_in* sourceIpAddr;
struct ip_mreq_source mreq;
struct group_source_req mreq6;
if (netty_unix_socket_initSockaddr(env, ipv6, groupAddress, scopeId, 0, &groupAddr, &groupAddrSize) == -1) {
netty_unix_errors_throwIOException(env, "Could not init sockaddr for groupAddress");
return;
}
if (netty_unix_socket_initSockaddr(env, ipv6, sourceAddress, scopeId, 0, &sourceAddr, &sourceAddrSize) == -1) {
netty_unix_errors_throwIOException(env, "Could not init sockaddr for sourceAddress");
return;
}
switch (groupAddr.ss_family) {
case AF_INET:
if (netty_unix_socket_initSockaddr(env, ipv6, interfaceAddress, scopeId, 0, &interfaceAddr, &interfaceAddrSize) == -1) {
netty_unix_errors_throwIOException(env, "Could not init sockaddr for interfaceAddress");
return;
}
interfaceIpAddr = (struct sockaddr_in*) &interfaceAddr;
groupIpAddr = (struct sockaddr_in*) &groupAddr;
sourceIpAddr = (struct sockaddr_in*) &sourceAddr;
memcpy(&mreq.imr_multiaddr, &groupIpAddr->sin_addr, sizeof(groupIpAddr->sin_addr));
memcpy(&mreq.imr_interface, &interfaceIpAddr->sin_addr, sizeof(interfaceIpAddr->sin_addr));
memcpy(&mreq.imr_sourceaddr, &sourceIpAddr->sin_addr, sizeof(sourceIpAddr->sin_addr));
netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_ADD_SOURCE_MEMBERSHIP, &mreq, sizeof(mreq));
break;
case AF_INET6:
if (interfaceIndex == -1) {
netty_unix_errors_throwIOException(env, "Unable to find network index");
return;
}
mreq6.gsr_group = groupAddr;
mreq6.gsr_interface = interfaceIndex;
mreq6.gsr_source = sourceAddr;
netty_unix_socket_setOption(env, fd, IPPROTO_IPV6, MCAST_JOIN_SOURCE_GROUP, &mreq6, sizeof(mreq6));
break;
default:
netty_unix_errors_throwIOException(env, "Address family not supported");
break;
}
}
static void netty_epoll_linuxsocket_leaveGroup(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray groupAddress, jbyteArray interfaceAddress, jint scopeId, jint interfaceIndex) {
struct sockaddr_storage groupAddr;
socklen_t groupAddrSize;
struct sockaddr_storage interfaceAddr;
socklen_t interfaceAddrSize;
struct sockaddr_in* groupIpAddr;
struct sockaddr_in* interfaceIpAddr;
struct ip_mreq mreq;
struct sockaddr_in6* groupIp6Addr;
struct ipv6_mreq mreq6;
if (netty_unix_socket_initSockaddr(env, ipv6, groupAddress, scopeId, 0, &groupAddr, &groupAddrSize) == -1) {
netty_unix_errors_throwIOException(env, "Could not init sockaddr for groupAddress");
return;
}
switch (groupAddr.ss_family) {
case AF_INET:
if (netty_unix_socket_initSockaddr(env, ipv6, interfaceAddress, scopeId, 0, &interfaceAddr, &interfaceAddrSize) == -1) {
netty_unix_errors_throwIOException(env, "Could not init sockaddr for interfaceAddress");
return;
}
interfaceIpAddr = (struct sockaddr_in*) &interfaceAddr;
groupIpAddr = (struct sockaddr_in*) &groupAddr;
memcpy(&mreq.imr_multiaddr, &groupIpAddr->sin_addr, sizeof(groupIpAddr->sin_addr));
memcpy(&mreq.imr_interface, &interfaceIpAddr->sin_addr, sizeof(interfaceIpAddr->sin_addr));
netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_DROP_MEMBERSHIP, &mreq, sizeof(mreq));
break;
case AF_INET6:
if (interfaceIndex == -1) {
netty_unix_errors_throwIOException(env, "Unable to find network index");
return;
}
mreq6.ipv6mr_interface = interfaceIndex;
groupIp6Addr = (struct sockaddr_in6*) &groupAddr;
memcpy(&mreq6.ipv6mr_multiaddr, &groupIp6Addr->sin6_addr, sizeof(groupIp6Addr->sin6_addr));
netty_unix_socket_setOption(env, fd, IPPROTO_IPV6, IPV6_LEAVE_GROUP, &mreq6, sizeof(mreq6));
break;
default:
netty_unix_errors_throwIOException(env, "Address family not supported");
break;
}
}
static void netty_epoll_linuxsocket_leaveSsmGroup(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray groupAddress, jbyteArray interfaceAddress, jint scopeId, jint interfaceIndex, jbyteArray sourceAddress) {
struct sockaddr_storage groupAddr;
socklen_t groupAddrSize;
struct sockaddr_storage interfaceAddr;
socklen_t interfaceAddrSize;
struct sockaddr_storage sourceAddr;
socklen_t sourceAddrSize;
struct sockaddr_in* groupIpAddr;
struct sockaddr_in* interfaceIpAddr;
struct sockaddr_in* sourceIpAddr;
struct ip_mreq_source mreq;
struct group_source_req mreq6;
if (netty_unix_socket_initSockaddr(env, ipv6, groupAddress, scopeId, 0, &groupAddr, &groupAddrSize) == -1) {
netty_unix_errors_throwIOException(env, "Could not init sockaddr for groupAddress");
return;
}
if (netty_unix_socket_initSockaddr(env, ipv6, sourceAddress, scopeId, 0, &sourceAddr, &sourceAddrSize) == -1) {
netty_unix_errors_throwIOException(env, "Could not init sockaddr for sourceAddress");
return;
}
switch (groupAddr.ss_family) {
case AF_INET:
if (netty_unix_socket_initSockaddr(env, ipv6, interfaceAddress, scopeId, 0, &interfaceAddr, &interfaceAddrSize) == -1) {
netty_unix_errors_throwIOException(env, "Could not init sockaddr for interfaceAddress");
return;
}
interfaceIpAddr = (struct sockaddr_in*) &interfaceAddr;
groupIpAddr = (struct sockaddr_in*) &groupAddr;
sourceIpAddr = (struct sockaddr_in*) &sourceAddr;
memcpy(&mreq.imr_multiaddr, &groupIpAddr->sin_addr, sizeof(groupIpAddr->sin_addr));
memcpy(&mreq.imr_interface, &interfaceIpAddr->sin_addr, sizeof(interfaceIpAddr->sin_addr));
memcpy(&mreq.imr_sourceaddr, &sourceIpAddr->sin_addr, sizeof(sourceIpAddr->sin_addr));
netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_DROP_SOURCE_MEMBERSHIP, &mreq, sizeof(mreq));
break;
case AF_INET6:
if (interfaceIndex == -1) {
netty_unix_errors_throwIOException(env, "Unable to find network index");
return;
}
mreq6.gsr_group = groupAddr;
mreq6.gsr_interface = interfaceIndex;
mreq6.gsr_source = sourceAddr;
netty_unix_socket_setOption(env, fd, IPPROTO_IPV6, MCAST_LEAVE_SOURCE_GROUP, &mreq6, sizeof(mreq6));
break;
default:
netty_unix_errors_throwIOException(env, "Address family not supported");
break;
}
}
static void netty_epoll_linuxsocket_setTcpMd5Sig(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray address, jint scopeId, jbyteArray key) {
struct sockaddr_storage addr;
socklen_t addrSize;
if (netty_unix_socket_initSockaddr(env, address, scopeId, 0, &addr, &addrSize) == -1) {
if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, 0, &addr, &addrSize) == -1) {
netty_unix_errors_throwIOException(env, "Could not init sockaddr");
return;
}
@ -159,6 +392,14 @@ static void netty_epoll_linuxsocket_setTcpMd5Sig(JNIEnv* env, jclass clazz, jint
}
}
static jint netty_epoll_linuxsocket_getTimeToLive(JNIEnv* env, jclass clazz, jint fd) {
int optval;
if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_TTL, &optval, sizeof(optval)) == -1) {
return -1;
}
return optval;
}
static jint netty_epoll_linuxsocket_getTcpKeepIdle(JNIEnv* env, jclass clazz, jint fd) {
int optval;
if (netty_unix_socket_getOption(env, fd, IPPROTO_TCP, TCP_KEEPIDLE, &optval, sizeof(optval)) == -1) {
@ -358,6 +599,9 @@ static jlong netty_epoll_linuxsocket_sendFile(JNIEnv* env, jclass clazz, jint fd
// JNI Method Registration Table Begin
static const JNINativeMethod fixed_method_table[] = {
{ "setTimeToLive", "(II)V", (void *) netty_epoll_linuxsocket_setTimeToLive },
{ "getTimeToLive", "(I)I", (void *) netty_epoll_linuxsocket_getTimeToLive },
{ "setInterface", "(IZ[BII)V", (void *) netty_epoll_linuxsocket_setInterface },
{ "setTcpCork", "(II)V", (void *) netty_epoll_linuxsocket_setTcpCork },
{ "setSoBusyPoll", "(II)V", (void *) netty_epoll_linuxsocket_setSoBusyPoll },
{ "setTcpQuickAck", "(II)V", (void *) netty_epoll_linuxsocket_setTcpQuickAck },
@ -386,7 +630,11 @@ static const JNINativeMethod fixed_method_table[] = {
{ "isIpTransparent", "(I)I", (void *) netty_epoll_linuxsocket_isIpTransparent },
{ "isIpRecvOrigDestAddr", "(I)I", (void *) netty_epoll_linuxsocket_isIpRecvOrigDestAddr },
{ "getTcpInfo", "(I[J)V", (void *) netty_epoll_linuxsocket_getTcpInfo },
{ "setTcpMd5Sig", "(I[BI[B)V", (void *) netty_epoll_linuxsocket_setTcpMd5Sig }
{ "setTcpMd5Sig", "(IZ[BI[B)V", (void *) netty_epoll_linuxsocket_setTcpMd5Sig },
{ "joinGroup", "(IZ[B[BII)V", (void *) netty_epoll_linuxsocket_joinGroup },
{ "joinSsmGroup", "(IZ[B[BII[B)V", (void *) netty_epoll_linuxsocket_joinSsmGroup },
{ "leaveGroup", "(IZ[B[BII)V", (void *) netty_epoll_linuxsocket_leaveGroup },
{ "leaveSsmGroup", "(IZ[B[BII[B)V", (void *) netty_epoll_linuxsocket_leaveSsmGroup }
// "sendFile" has a dynamic signature
};

View File

@ -265,7 +265,7 @@ static jint netty_epoll_native_epollCtlDel0(JNIEnv* env, jclass clazz, jint efd,
return res;
}
static jint netty_epoll_native_sendmmsg0(JNIEnv* env, jclass clazz, jint fd, jobjectArray packets, jint offset, jint len) {
static jint netty_epoll_native_sendmmsg0(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobjectArray packets, jint offset, jint len) {
struct mmsghdr msg[len];
struct sockaddr_storage addr[len];
socklen_t addrSize;
@ -280,7 +280,7 @@ static jint netty_epoll_native_sendmmsg0(JNIEnv* env, jclass clazz, jint fd, job
jint scopeId = (*env)->GetIntField(env, packet, packetScopeIdFieldId);
jint port = (*env)->GetIntField(env, packet, packetPortFieldId);
if (netty_unix_socket_initSockaddr(env, address, scopeId, port, &addr[i], &addrSize) == -1) {
if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr[i], &addrSize) == -1) {
return -1;
}
@ -435,7 +435,7 @@ static JNINativeMethod* createDynamicMethodsTable(const char* packagePrefix) {
char* dynamicTypeName = netty_unix_util_prepend(packagePrefix, "io/netty/channel/epoll/NativeDatagramPacketArray$NativeDatagramPacket;II)I");
JNINativeMethod* dynamicMethod = &dynamicMethods[fixed_method_table_size];
dynamicMethod->name = "sendmmsg0";
dynamicMethod->signature = netty_unix_util_prepend("(I[L", dynamicTypeName);
dynamicMethod->signature = netty_unix_util_prepend("(IZ[L", dynamicTypeName);
dynamicMethod->fnPtr = (void *) netty_epoll_native_sendmmsg0;
free(dynamicTypeName);
return dynamicMethods;

View File

@ -27,19 +27,21 @@ import io.netty.channel.DefaultAddressedEnvelope;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.unix.DatagramSocketAddress;
import io.netty.channel.unix.Errors;
import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.Socket;
import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.internal.StringUtil;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.PortUnreachableException;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import static io.netty.channel.epoll.LinuxSocket.newSocketDgram;
@ -57,20 +59,47 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
StringUtil.simpleClassName(ByteBuf.class) + ')';
final InternetProtocolFamily family;
private final EpollDatagramChannelConfig config;
private volatile boolean connected;
/**
* Create a new instance which selects the {@link InternetProtocolFamily} to use depending
* on the Operation Systems default which will be chosen.
*/
public EpollDatagramChannel() {
super(newSocketDgram());
this(null);
}
/**
* Create a new instance using the given {@link InternetProtocolFamily}. If {@code null} is used it will depend
* on the Operation Systems default which will be chosen.
*/
public EpollDatagramChannel(InternetProtocolFamily family) {
super(family == null ?
newSocketDgram(Socket.isIPv6Preferred()) : newSocketDgram(family == InternetProtocolFamily.IPv6));
this.family = internetProtocolFamily(family);
config = new EpollDatagramChannelConfig(this);
}
public EpollDatagramChannel(int fd) {
this(new LinuxSocket(fd));
private static InternetProtocolFamily internetProtocolFamily(InternetProtocolFamily family) {
if (family == null) {
return Socket.isIPv6Preferred() ? InternetProtocolFamily.IPv6 : InternetProtocolFamily.IPv4;
}
return family;
}
EpollDatagramChannel(LinuxSocket fd) {
/**
* Create a new instance which selects the {@link InternetProtocolFamily} to use depending
* on the Operation Systems default which will be chosen.
*/
public EpollDatagramChannel(int fd) {
this(new LinuxSocket(fd), null);
}
private EpollDatagramChannel(LinuxSocket fd, InternetProtocolFamily family) {
super(null, fd, true);
this.family = internetProtocolFamily(family);
config = new EpollDatagramChannelConfig(this);
}
@ -111,7 +140,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
return joinGroup(
multicastAddress,
NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
} catch (SocketException e) {
} catch (IOException e) {
promise.setFailure(e);
}
return promise;
@ -149,7 +178,12 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
throw new NullPointerException("networkInterface");
}
promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
try {
socket.joinGroup(multicastAddress, networkInterface, source);
promise.setSuccess();
} catch (IOException e) {
promise.setFailure(e);
}
return promise;
}
@ -163,7 +197,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
try {
return leaveGroup(
multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
} catch (SocketException e) {
} catch (IOException e) {
promise.setFailure(e);
}
return promise;
@ -199,8 +233,12 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
throw new NullPointerException("networkInterface");
}
promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
try {
socket.leaveGroup(multicastAddress, networkInterface, source);
promise.setSuccess();
} catch (IOException e) {
promise.setFailure(e);
}
return promise;
}
@ -255,6 +293,13 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (localAddress instanceof InetSocketAddress) {
InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
if (socketAddress.getAddress().isAnyLocalAddress() &&
socketAddress.getAddress() instanceof Inet4Address && Socket.isIPv6Preferred()) {
localAddress = new InetSocketAddress(LinuxSocket.INET6_ANY, socketAddress.getPort());
}
}
super.doBind(localAddress);
active = true;
}
@ -282,7 +327,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
while (cnt > 0) {
int send = Native.sendmmsg(socket.intValue(), packets, offset, cnt);
int send = socket.sendmmsg(packets, offset, cnt);
if (send == 0) {
// Did not write all messages.
setFlag(Native.EPOLLOUT);

View File

@ -326,12 +326,21 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
@Override
public int getTimeToLive() {
return -1;
try {
return ((EpollDatagramChannel) channel).socket.getTimeToLive();
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public EpollDatagramChannelConfig setTimeToLive(int ttl) {
throw new UnsupportedOperationException("Multicast not supported");
try {
((EpollDatagramChannel) channel).socket.setTimeToLive(ttl);
return this;
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
@ -341,7 +350,12 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
@Override
public EpollDatagramChannelConfig setInterface(InetAddress interfaceAddress) {
throw new UnsupportedOperationException("Multicast not supported");
try {
((EpollDatagramChannel) channel).socket.setInterface(interfaceAddress);
return this;
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
@ -351,7 +365,13 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
@Override
public EpollDatagramChannelConfig setNetworkInterface(NetworkInterface networkInterface) {
throw new UnsupportedOperationException("Multicast not supported");
try {
EpollDatagramChannel datagramChannel = (EpollDatagramChannel) channel;
datagramChannel.socket.setNetworkInterface(networkInterface, datagramChannel.family);
return this;
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override

View File

@ -15,13 +15,20 @@
*/
package io.netty.channel.epoll;
import io.netty.channel.ChannelException;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.unix.NativeInetAddress;
import io.netty.channel.unix.PeerCredentials;
import io.netty.channel.unix.Socket;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.util.internal.PlatformDependent;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Inet6Address;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.Enumeration;
import static io.netty.channel.unix.Errors.ioResult;
@ -29,12 +36,77 @@ import static io.netty.channel.unix.Errors.ioResult;
* A socket which provides access Linux native methods.
*/
final class LinuxSocket extends Socket {
static final InetAddress INET6_ANY = unsafeInetAddrByName("::");
private static final InetAddress INET_ANY = unsafeInetAddrByName("0.0.0.0");
private static final long MAX_UINT32_T = 0xFFFFFFFFL;
LinuxSocket(int fd) {
super(fd);
}
int sendmmsg(NativeDatagramPacketArray.NativeDatagramPacket[] msgs,
int offset, int len) throws IOException {
return Native.sendmmsg(intValue(), ipv6, msgs, offset, len);
}
void setTimeToLive(int ttl) throws IOException {
setTimeToLive(intValue(), ttl);
}
void setInterface(InetAddress address) throws IOException {
final NativeInetAddress a = NativeInetAddress.newInstance(address);
setInterface(intValue(), ipv6, a.address(), a.scopeId(), interfaceIndex(address));
}
void setNetworkInterface(NetworkInterface netInterface, InternetProtocolFamily family) throws IOException {
InetAddress address = deriveInetAddress(netInterface, family == InternetProtocolFamily.IPv6);
if (address.equals(family == InternetProtocolFamily.IPv4 ? INET_ANY : INET6_ANY)) {
throw new IOException("NetworkInterface does not support " + family);
}
final NativeInetAddress nativeAddress = NativeInetAddress.newInstance(address);
setInterface(intValue(), ipv6, nativeAddress.address(), nativeAddress.scopeId(), interfaceIndex(netInterface));
}
void joinGroup(InetAddress group, NetworkInterface netInterface, InetAddress source) throws IOException {
final NativeInetAddress g = NativeInetAddress.newInstance(group);
final boolean isIpv6 = group instanceof Inet6Address;
final NativeInetAddress i = NativeInetAddress.newInstance(deriveInetAddress(netInterface, isIpv6));
if (source != null) {
final NativeInetAddress s = NativeInetAddress.newInstance(source);
joinSsmGroup(intValue(), ipv6, g.address(), i.address(),
g.scopeId(), interfaceIndex(netInterface), s.address());
} else {
joinGroup(intValue(), ipv6, g.address(), i.address(), g.scopeId(), interfaceIndex(netInterface));
}
}
void leaveGroup(InetAddress group, NetworkInterface netInterface, InetAddress source) throws IOException {
final NativeInetAddress g = NativeInetAddress.newInstance(group);
final boolean isIpv6 = group instanceof Inet6Address;
final NativeInetAddress i = NativeInetAddress.newInstance(deriveInetAddress(netInterface, isIpv6));
if (source != null) {
final NativeInetAddress s = NativeInetAddress.newInstance(source);
leaveSsmGroup(intValue(), ipv6, g.address(), i.address(),
g.scopeId(), interfaceIndex(netInterface), s.address());
} else {
leaveGroup(intValue(), ipv6, g.address(), i.address(), g.scopeId(), interfaceIndex(netInterface));
}
}
private static int interfaceIndex(NetworkInterface networkInterface) {
return PlatformDependent.javaVersion() >= 7 ? networkInterface.getIndex() : -1;
}
private static int interfaceIndex(InetAddress address) throws IOException {
if (PlatformDependent.javaVersion() >= 7) {
NetworkInterface iface = NetworkInterface.getByInetAddress(address);
if (iface != null) {
return iface.getIndex();
}
}
return -1;
}
void setTcpDeferAccept(int deferAccept) throws IOException {
setTcpDeferAccept(intValue(), deferAccept);
}
@ -98,13 +170,17 @@ final class LinuxSocket extends Socket {
setIpRecvOrigDestAddr(intValue(), enabled ? 1 : 0);
}
int getTimeToLive() throws IOException {
return getTimeToLive(intValue());
}
void getTcpInfo(EpollTcpInfo info) throws IOException {
getTcpInfo(intValue(), info.info);
}
void setTcpMd5Sig(InetAddress address, byte[] key) throws IOException {
final NativeInetAddress a = NativeInetAddress.newInstance(address);
setTcpMd5Sig(intValue(), a.address(), a.scopeId(), key);
setTcpMd5Sig(intValue(), ipv6, a.address(), a.scopeId(), key);
}
boolean isTcpCork() throws IOException {
@ -171,18 +247,57 @@ final class LinuxSocket extends Socket {
return ioResult("sendfile", (int) res);
}
private static InetAddress deriveInetAddress(NetworkInterface netInterface, boolean ipv6) {
final InetAddress ipAny = ipv6 ? INET6_ANY : INET_ANY;
if (netInterface != null) {
final Enumeration<InetAddress> ias = netInterface.getInetAddresses();
while (ias.hasMoreElements()) {
final InetAddress ia = ias.nextElement();
final boolean isV6 = ia instanceof Inet6Address;
if (isV6 == ipv6) {
return ia;
}
}
}
return ipAny;
}
public static LinuxSocket newSocketStream(boolean ipv6) {
return new LinuxSocket(newSocketStream0(ipv6));
}
public static LinuxSocket newSocketStream() {
return new LinuxSocket(newSocketStream0());
return newSocketStream(isIPv6Preferred());
}
public static LinuxSocket newSocketDgram(boolean ipv6) {
return new LinuxSocket(newSocketDgram0(ipv6));
}
public static LinuxSocket newSocketDgram() {
return new LinuxSocket(newSocketDgram0());
return newSocketDgram(isIPv6Preferred());
}
public static LinuxSocket newSocketDomain() {
return new LinuxSocket(newSocketDomain0());
}
private static InetAddress unsafeInetAddrByName(String inetName) {
try {
return InetAddress.getByName(inetName);
} catch (UnknownHostException uhe) {
throw new ChannelException(uhe);
}
}
private static native void joinGroup(int fd, boolean ipv6, byte[] group, byte[] interfaceAddress,
int scopeId, int interfaceIndex) throws IOException;
private static native void joinSsmGroup(int fd, boolean ipv6, byte[] group, byte[] interfaceAddress,
int scopeId, int interfaceIndex, byte[] source) throws IOException;
private static native void leaveGroup(int fd, boolean ipv6, byte[] group, byte[] interfaceAddress,
int scopeId, int interfaceIndex) throws IOException;
private static native void leaveSsmGroup(int fd, boolean ipv6, byte[] group, byte[] interfaceAddress,
int scopeId, int interfaceIndex, byte[] source) throws IOException;
private static native long sendFile(int socketFd, DefaultFileRegion src, long baseOffset,
long offset, long length) throws IOException;
@ -195,6 +310,7 @@ final class LinuxSocket extends Socket {
private static native int getTcpKeepIntvl(int fd) throws IOException;
private static native int getTcpKeepCnt(int fd) throws IOException;
private static native int getTcpUserTimeout(int fd) throws IOException;
private static native int getTimeToLive(int fd) throws IOException;
private static native int isIpFreeBind(int fd) throws IOException;
private static native int isIpTransparent(int fd) throws IOException;
private static native int isIpRecvOrigDestAddr(int fd) throws IOException;
@ -216,5 +332,9 @@ final class LinuxSocket extends Socket {
private static native void setIpFreeBind(int fd, int freeBind) throws IOException;
private static native void setIpTransparent(int fd, int transparent) throws IOException;
private static native void setIpRecvOrigDestAddr(int fd, int transparent) throws IOException;
private static native void setTcpMd5Sig(int fd, byte[] address, int scopeId, byte[] key) throws IOException;
private static native void setTcpMd5Sig(
int fd, boolean ipv6, byte[] address, int scopeId, byte[] key) throws IOException;
private static native void setInterface(
int fd, boolean ipv6, byte[] interfaceAddress, int scopeId, int networkInterfaceIndex) throws IOException;
private static native void setTimeToLive(int fd, int ttl) throws IOException;
}

View File

@ -152,9 +152,15 @@ public final class Native {
private static native int splice0(int fd, long offIn, int fdOut, long offOut, long len);
public static int sendmmsg(
int fd, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, int offset, int len) throws IOException {
int res = sendmmsg0(fd, msgs, offset, len);
@Deprecated
public static int sendmmsg(int fd, NativeDatagramPacketArray.NativeDatagramPacket[] msgs,
int offset, int len) throws IOException {
return sendmmsg(fd, Socket.isIPv6Preferred(), msgs, offset, len);
}
static int sendmmsg(int fd, boolean ipv6, NativeDatagramPacketArray.NativeDatagramPacket[] msgs,
int offset, int len) throws IOException {
int res = sendmmsg0(fd, ipv6, msgs, offset, len);
if (res >= 0) {
return res;
}
@ -162,7 +168,7 @@ public final class Native {
}
private static native int sendmmsg0(
int fd, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, int offset, int len);
int fd, boolean ipv6, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, int offset, int len);
// epoll_event related
public static native int sizeofEpollEvent();

View File

@ -0,0 +1,30 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.DatagramMulticastIPv6Test;
import java.util.List;
public class EpollDatagramMulticastIPv6Test extends DatagramMulticastIPv6Test {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.datagram(internetProtocolFamily());
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.DatagramMulticastTest;
import java.util.List;
public class EpollDatagramMulticastTest extends DatagramMulticastTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.datagram(internetProtocolFamily());
}
}

View File

@ -36,7 +36,6 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
@ -143,7 +142,17 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollDatagramChannel.class);
return new Bootstrap().group(EPOLL_WORKER_GROUP).channelFactory(new ChannelFactory<Channel>() {
@Override
public Channel newChannel() {
return new EpollDatagramChannel(family);
}
@Override
public String toString() {
return InternetProtocolFamily.class.getSimpleName() + ".class";
}
});
}
}
);

View File

@ -190,6 +190,22 @@ static void netty_unix_socket_initialize(JNIEnv* env, jclass clazz, jboolean ipv
}
}
static jboolean netty_unix_socket_isIPv6Preferred(JNIEnv* env, jclass clazz) {
return socketType == AF_INET6;
}
static jboolean netty_unix_socket_isIPv6(JNIEnv* env, jclass clazz, jint fd) {
struct sockaddr_storage addr;
socklen_t addrlen = sizeof(addr);
if (getsockname(fd, (struct sockaddr*) &addr, &addrlen) == 0) {
return ((struct sockaddr*) &addr)->sa_family == AF_INET6;
}
netty_unix_errors_throwChannelExceptionErrorNo(env, "getsockname(...) failed: ", errno);
return JNI_FALSE;
}
static void netty_unix_socket_optionHandleError(JNIEnv* env, int err, char* method) {
if (err == EBADF) {
netty_unix_errors_throwClosedChannelException(env);
@ -206,11 +222,11 @@ static int netty_unix_socket_setOption0(jint fd, int level, int optname, const v
return setsockopt(fd, level, optname, optval, len);
}
static jint _socket(JNIEnv* env, jclass clazz, int type) {
int fd = nettyNonBlockingSocket(socketType, type, 0);
static jint _socket(JNIEnv* env, jclass clazz, int domain, int type) {
int fd = nettyNonBlockingSocket(domain, type, 0);
if (fd == -1) {
return -errno;
} else if (socketType == AF_INET6) {
} else if (domain == AF_INET6) {
// Try to allow listen /connect ipv4 and ipv6
int optval = 0;
if (netty_unix_socket_setOption0(fd, IPPROTO_IPV6, IPV6_V6ONLY, &optval, sizeof(optval)) < 0) {
@ -229,7 +245,7 @@ static jint _socket(JNIEnv* env, jclass clazz, int type) {
return fd;
}
int netty_unix_socket_initSockaddr(JNIEnv* env, jbyteArray address, jint scopeId, jint jport,
int netty_unix_socket_initSockaddr(JNIEnv* env, jboolean ipv6, jbyteArray address, jint scopeId, jint jport,
const struct sockaddr_storage* addr, socklen_t* addrSize) {
uint16_t port = htons((uint16_t) jport);
// We use 16 bytes as this allows us to fit ipv6, ipv4 and ipv4 mapped ipv6 addresses in the array.
@ -251,7 +267,7 @@ int netty_unix_socket_initSockaddr(JNIEnv* env, jbyteArray address, jint scopeId
// https://bugs.openjdk.java.net/browse/JDK-8057586
(*env)->GetByteArrayRegion(env, address, 0, len, addressBytes);
if (socketType == AF_INET6) {
if (ipv6 == JNI_TRUE) {
struct sockaddr_in6* ip6addr = (struct sockaddr_in6*) addr;
*addrSize = sizeof(struct sockaddr_in6);
ip6addr->sin6_family = AF_INET6;
@ -274,10 +290,10 @@ int netty_unix_socket_initSockaddr(JNIEnv* env, jbyteArray address, jint scopeId
return 0;
}
static jint _sendTo(JNIEnv* env, jint fd, void* buffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) {
static jint _sendTo(JNIEnv* env, jint fd, jboolean ipv6, void* buffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) {
struct sockaddr_storage addr;
socklen_t addrSize;
if (netty_unix_socket_initSockaddr(env, address, scopeId, port, &addr, &addrSize) == -1) {
if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr, &addrSize) == -1) {
return -1;
}
@ -412,10 +428,10 @@ static jint netty_unix_socket_shutdown(JNIEnv* env, jclass clazz, jint fd, jbool
return 0;
}
static jint netty_unix_socket_bind(JNIEnv* env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port) {
static jint netty_unix_socket_bind(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray address, jint scopeId, jint port) {
struct sockaddr_storage addr;
socklen_t addrSize;
if (netty_unix_socket_initSockaddr(env, address, scopeId, port, &addr, &addrSize) == -1) {
if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr, &addrSize) == -1) {
return -1;
}
@ -432,10 +448,10 @@ static jint netty_unix_socket_listen(JNIEnv* env, jclass clazz, jint fd, jint ba
return 0;
}
static jint netty_unix_socket_connect(JNIEnv* env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port) {
static jint netty_unix_socket_connect(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray address, jint scopeId, jint port) {
struct sockaddr_storage addr;
socklen_t addrSize;
if (netty_unix_socket_initSockaddr(env, address, scopeId, port, &addr, &addrSize) == -1) {
if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr, &addrSize) == -1) {
// A runtime exception was thrown
return -1;
}
@ -470,7 +486,7 @@ static jint netty_unix_socket_finishConnect(JNIEnv* env, jclass clazz, jint fd)
return -optval;
}
static jint netty_unix_socket_disconnect(JNIEnv* env, jclass clazz, jint fd) {
static jint netty_unix_socket_disconnect(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6) {
struct sockaddr_storage addr;
int len;
@ -478,7 +494,7 @@ static jint netty_unix_socket_disconnect(JNIEnv* env, jclass clazz, jint fd) {
// You can disconnect connection-less sockets by using AF_UNSPEC.
// See man 2 connect.
if (socketType == AF_INET6) {
if (ipv6 == JNI_TRUE) {
struct sockaddr_in6* ip6addr = (struct sockaddr_in6*) &addr;
ip6addr->sin6_family = AF_UNSPEC;
len = sizeof(struct sockaddr_in6);
@ -562,12 +578,14 @@ static jbyteArray netty_unix_socket_localAddress(JNIEnv* env, jclass clazz, jint
return createInetSocketAddressArray(env, &addr);
}
static jint netty_unix_socket_newSocketDgramFd(JNIEnv* env, jclass clazz) {
return _socket(env, clazz, SOCK_DGRAM);
static jint netty_unix_socket_newSocketDgramFd(JNIEnv* env, jclass clazz, jboolean ipv6) {
int domain = ipv6 == JNI_TRUE ? AF_INET6 : AF_INET;
return _socket(env, clazz, domain, SOCK_DGRAM);
}
static jint netty_unix_socket_newSocketStreamFd(JNIEnv* env, jclass clazz) {
return _socket(env, clazz, SOCK_STREAM);
static jint netty_unix_socket_newSocketStreamFd(JNIEnv* env, jclass clazz, jboolean ipv6) {
int domain = ipv6 == JNI_TRUE ? AF_INET6 : AF_INET;
return _socket(env, clazz, domain, SOCK_STREAM);
}
static jint netty_unix_socket_newSocketDomainFd(JNIEnv* env, jclass clazz) {
@ -578,19 +596,19 @@ static jint netty_unix_socket_newSocketDomainFd(JNIEnv* env, jclass clazz) {
return fd;
}
static jint netty_unix_socket_sendTo(JNIEnv* env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) {
static jint netty_unix_socket_sendTo(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) {
// We check that GetDirectBufferAddress will not return NULL in OnLoad
return _sendTo(env, fd, (*env)->GetDirectBufferAddress(env, jbuffer), pos, limit, address, scopeId, port);
return _sendTo(env, fd, ipv6, (*env)->GetDirectBufferAddress(env, jbuffer), pos, limit, address, scopeId, port);
}
static jint netty_unix_socket_sendToAddress(JNIEnv* env, jclass clazz, jint fd, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) {
return _sendTo(env, fd, (void *) (intptr_t) memoryAddress, pos, limit, address, scopeId, port);
static jint netty_unix_socket_sendToAddress(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) {
return _sendTo(env, fd, ipv6, (void *) (intptr_t) memoryAddress, pos, limit, address, scopeId, port);
}
static jint netty_unix_socket_sendToAddresses(JNIEnv* env, jclass clazz, jint fd, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port) {
static jint netty_unix_socket_sendToAddresses(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port) {
struct sockaddr_storage addr;
socklen_t addrSize;
if (netty_unix_socket_initSockaddr(env, address, scopeId, port, &addr, &addrSize) == -1) {
if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr, &addrSize) == -1) {
return -1;
}
@ -800,8 +818,8 @@ static void netty_unix_socket_setSoLinger(JNIEnv* env, jclass clazz, jint fd, ji
netty_unix_socket_setOption(env, fd, SOL_SOCKET, SO_LINGER, &solinger, sizeof(solinger));
}
static void netty_unix_socket_setTrafficClass(JNIEnv* env, jclass clazz, jint fd, jint optval) {
if (socketType == AF_INET6) {
static void netty_unix_socket_setTrafficClass(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jint optval) {
if (ipv6 == JNI_TRUE) {
// This call will put an exception on the stack to be processed once the JNI calls completes if
// setsockopt failed and return a negative value.
int rc = netty_unix_socket_setOption(env, fd, IPPROTO_IPV6, IPV6_TCLASS, &optval, sizeof(optval));
@ -867,9 +885,9 @@ static jint netty_unix_socket_getSoLinger(JNIEnv* env, jclass clazz, jint fd) {
}
}
static jint netty_unix_socket_getTrafficClass(JNIEnv* env, jclass clazz, jint fd) {
static jint netty_unix_socket_getTrafficClass(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6) {
int optval;
if (socketType == AF_INET6) {
if (ipv6 == JNI_TRUE) {
if (netty_unix_socket_getOption0(fd, IPPROTO_IPV6, IPV6_TCLASS, &optval, sizeof(optval)) == -1) {
if (errno == ENOPROTOOPT) {
if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_TOS, &optval, sizeof(optval)) == -1) {
@ -926,20 +944,20 @@ static jint netty_unix_socket_isBroadcast(JNIEnv* env, jclass clazz, jint fd) {
// JNI Method Registration Table Begin
static const JNINativeMethod fixed_method_table[] = {
{ "shutdown", "(IZZ)I", (void *) netty_unix_socket_shutdown },
{ "bind", "(I[BII)I", (void *) netty_unix_socket_bind },
{ "bind", "(IZ[BII)I", (void *) netty_unix_socket_bind },
{ "listen", "(II)I", (void *) netty_unix_socket_listen },
{ "connect", "(I[BII)I", (void *) netty_unix_socket_connect },
{ "connect", "(IZ[BII)I", (void *) netty_unix_socket_connect },
{ "finishConnect", "(I)I", (void *) netty_unix_socket_finishConnect },
{ "disconnect", "(I)I", (void *) netty_unix_socket_disconnect},
{ "disconnect", "(IZ)I", (void *) netty_unix_socket_disconnect},
{ "accept", "(I[B)I", (void *) netty_unix_socket_accept },
{ "remoteAddress", "(I)[B", (void *) netty_unix_socket_remoteAddress },
{ "localAddress", "(I)[B", (void *) netty_unix_socket_localAddress },
{ "newSocketDgramFd", "()I", (void *) netty_unix_socket_newSocketDgramFd },
{ "newSocketStreamFd", "()I", (void *) netty_unix_socket_newSocketStreamFd },
{ "newSocketDgramFd", "(Z)I", (void *) netty_unix_socket_newSocketDgramFd },
{ "newSocketStreamFd", "(Z)I", (void *) netty_unix_socket_newSocketStreamFd },
{ "newSocketDomainFd", "()I", (void *) netty_unix_socket_newSocketDomainFd },
{ "sendTo", "(ILjava/nio/ByteBuffer;II[BII)I", (void *) netty_unix_socket_sendTo },
{ "sendToAddress", "(IJII[BII)I", (void *) netty_unix_socket_sendToAddress },
{ "sendToAddresses", "(IJI[BII)I", (void *) netty_unix_socket_sendToAddresses },
{ "sendTo", "(IZLjava/nio/ByteBuffer;II[BII)I", (void *) netty_unix_socket_sendTo },
{ "sendToAddress", "(IZJII[BII)I", (void *) netty_unix_socket_sendToAddress },
{ "sendToAddresses", "(IZJI[BII)I", (void *) netty_unix_socket_sendToAddresses },
// "recvFrom" has a dynamic signature
// "recvFromAddress" has a dynamic signature
{ "recvFd", "(I)I", (void *) netty_unix_socket_recvFd },
@ -954,7 +972,7 @@ static const JNINativeMethod fixed_method_table[] = {
{ "setSendBufferSize", "(II)V", (void *) netty_unix_socket_setSendBufferSize },
{ "setKeepAlive", "(II)V", (void *) netty_unix_socket_setKeepAlive },
{ "setSoLinger", "(II)V", (void *) netty_unix_socket_setSoLinger },
{ "setTrafficClass", "(II)V", (void *) netty_unix_socket_setTrafficClass },
{ "setTrafficClass", "(IZI)V", (void *) netty_unix_socket_setTrafficClass },
{ "isKeepAlive", "(I)I", (void *) netty_unix_socket_isKeepAlive },
{ "isTcpNoDelay", "(I)I", (void *) netty_unix_socket_isTcpNoDelay },
{ "isBroadcast", "(I)I", (void *) netty_unix_socket_isBroadcast },
@ -963,9 +981,11 @@ static const JNINativeMethod fixed_method_table[] = {
{ "getReceiveBufferSize", "(I)I", (void *) netty_unix_socket_getReceiveBufferSize },
{ "getSendBufferSize", "(I)I", (void *) netty_unix_socket_getSendBufferSize },
{ "getSoLinger", "(I)I", (void *) netty_unix_socket_getSoLinger },
{ "getTrafficClass", "(I)I", (void *) netty_unix_socket_getTrafficClass },
{ "getTrafficClass", "(IZ)I", (void *) netty_unix_socket_getTrafficClass },
{ "getSoError", "(I)I", (void *) netty_unix_socket_getSoError },
{ "initialize", "(Z)V", (void *) netty_unix_socket_initialize }
{ "initialize", "(Z)V", (void *) netty_unix_socket_initialize },
{ "isIPv6Preferred", "()Z", (void *) netty_unix_socket_isIPv6Preferred },
{ "isIPv6", "(I)Z", (void *) netty_unix_socket_isIPv6 }
};
static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]);

View File

@ -20,7 +20,7 @@
#include <jni.h>
// External C methods
int netty_unix_socket_initSockaddr(JNIEnv* env, jbyteArray address, jint scopeId, jint jport, const struct sockaddr_storage* addr, socklen_t* addrSize);
int netty_unix_socket_initSockaddr(JNIEnv* env, jboolean ipv6, jbyteArray address, jint scopeId, jint jport, const struct sockaddr_storage* addr, socklen_t* addrSize);
int netty_unix_socket_getOption(JNIEnv* env, jint fd, int level, int optname, void* optval, socklen_t optlen);
int netty_unix_socket_setOption(JNIEnv* env, jint fd, int level, int optname, const void* optval, socklen_t len);

View File

@ -48,8 +48,11 @@ public class Socket extends FileDescriptor {
public static final int UDS_SUN_PATH_SIZE = udsSunPathSize();
protected final boolean ipv6;
public Socket(int fd) {
super(fd);
this.ipv6 = isIPv6(fd);
}
public final void shutdown() throws IOException {
@ -114,7 +117,7 @@ public class Socket extends FileDescriptor {
scopeId = 0;
address = ipv4MappedIpv6Address(addr.getAddress());
}
int res = sendTo(fd, buf, pos, limit, address, scopeId, port);
int res = sendTo(fd, ipv6, buf, pos, limit, address, scopeId, port);
if (res >= 0) {
return res;
}
@ -138,7 +141,7 @@ public class Socket extends FileDescriptor {
scopeId = 0;
address = ipv4MappedIpv6Address(addr.getAddress());
}
int res = sendToAddress(fd, memoryAddress, pos, limit, address, scopeId, port);
int res = sendToAddress(fd, ipv6, memoryAddress, pos, limit, address, scopeId, port);
if (res >= 0) {
return res;
}
@ -161,7 +164,7 @@ public class Socket extends FileDescriptor {
scopeId = 0;
address = ipv4MappedIpv6Address(addr.getAddress());
}
int res = sendToAddresses(fd, memoryAddress, length, address, scopeId, port);
int res = sendToAddresses(fd, ipv6, memoryAddress, length, address, scopeId, port);
if (res >= 0) {
return res;
}
@ -213,7 +216,7 @@ public class Socket extends FileDescriptor {
if (socketAddress instanceof InetSocketAddress) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
NativeInetAddress address = NativeInetAddress.newInstance(inetSocketAddress.getAddress());
res = connect(fd, address.address, address.scopeId, inetSocketAddress.getPort());
res = connect(fd, ipv6, address.address, address.scopeId, inetSocketAddress.getPort());
} else if (socketAddress instanceof DomainSocketAddress) {
DomainSocketAddress unixDomainSocketAddress = (DomainSocketAddress) socketAddress;
res = connectDomainSocket(fd, unixDomainSocketAddress.path().getBytes(CharsetUtil.UTF_8));
@ -243,7 +246,7 @@ public class Socket extends FileDescriptor {
}
public final void disconnect() throws IOException {
int res = disconnect(fd);
int res = disconnect(fd, ipv6);
if (res < 0) {
throwConnectException("disconnect", res);
}
@ -253,7 +256,7 @@ public class Socket extends FileDescriptor {
if (socketAddress instanceof InetSocketAddress) {
InetSocketAddress addr = (InetSocketAddress) socketAddress;
NativeInetAddress address = NativeInetAddress.newInstance(addr.getAddress());
int res = bind(fd, address.address, address.scopeId, addr.getPort());
int res = bind(fd, ipv6, address.address, address.scopeId, addr.getPort());
if (res < 0) {
throw newIOException("bind", res);
}
@ -338,7 +341,7 @@ public class Socket extends FileDescriptor {
}
public final int getTrafficClass() throws IOException {
return getTrafficClass(fd);
return getTrafficClass(fd, ipv6);
}
public final void setKeepAlive(boolean keepAlive) throws IOException {
@ -374,9 +377,13 @@ public class Socket extends FileDescriptor {
}
public final void setTrafficClass(int trafficClass) throws IOException {
setTrafficClass(fd, trafficClass);
setTrafficClass(fd, ipv6, trafficClass);
}
public static native boolean isIPv6Preferred();
private static native boolean isIPv6(int fd);
@Override
public String toString() {
return "Socket{" +
@ -405,7 +412,11 @@ public class Socket extends FileDescriptor {
}
protected static int newSocketStream0() {
int res = newSocketStreamFd();
return newSocketStream0(isIPv6Preferred());
}
protected static int newSocketStream0(boolean ipv6) {
int res = newSocketStreamFd(ipv6);
if (res < 0) {
throw new ChannelException(newIOException("newSocketStream", res));
}
@ -413,7 +424,11 @@ public class Socket extends FileDescriptor {
}
protected static int newSocketDgram0() {
int res = newSocketDgramFd();
return newSocketDgram0(isIPv6Preferred());
}
protected static int newSocketDgram0(boolean ipv6) {
int res = newSocketDgramFd(ipv6);
if (res < 0) {
throw new ChannelException(newIOException("newSocketDgram", res));
}
@ -429,11 +444,11 @@ public class Socket extends FileDescriptor {
}
private static native int shutdown(int fd, boolean read, boolean write);
private static native int connect(int fd, byte[] address, int scopeId, int port);
private static native int connect(int fd, boolean ipv6, byte[] address, int scopeId, int port);
private static native int connectDomainSocket(int fd, byte[] path);
private static native int finishConnect(int fd);
private static native int disconnect(int fd);
private static native int bind(int fd, byte[] address, int scopeId, int port);
private static native int disconnect(int fd, boolean ipv6);
private static native int bind(int fd, boolean ipv6, byte[] address, int scopeId, int port);
private static native int bindDomainSocket(int fd, byte[] path);
private static native int listen(int fd, int backlog);
private static native int accept(int fd, byte[] addr);
@ -442,11 +457,11 @@ public class Socket extends FileDescriptor {
private static native byte[] localAddress(int fd);
private static native int sendTo(
int fd, ByteBuffer buf, int pos, int limit, byte[] address, int scopeId, int port);
int fd, boolean ipv6, ByteBuffer buf, int pos, int limit, byte[] address, int scopeId, int port);
private static native int sendToAddress(
int fd, long memoryAddress, int pos, int limit, byte[] address, int scopeId, int port);
int fd, boolean ipv6, long memoryAddress, int pos, int limit, byte[] address, int scopeId, int port);
private static native int sendToAddresses(
int fd, long memoryAddress, int length, byte[] address, int scopeId, int port);
int fd, boolean ipv6, long memoryAddress, int length, byte[] address, int scopeId, int port);
private static native DatagramSocketAddress recvFrom(
int fd, ByteBuffer buf, int pos, int limit) throws IOException;
@ -455,8 +470,8 @@ public class Socket extends FileDescriptor {
private static native int recvFd(int fd);
private static native int sendFd(int socketFd, int fd);
private static native int newSocketStreamFd();
private static native int newSocketDgramFd();
private static native int newSocketStreamFd(boolean ipv6);
private static native int newSocketDgramFd(boolean ipv6);
private static native int newSocketDomainFd();
private static native int isReuseAddress(int fd) throws IOException;
@ -468,7 +483,7 @@ public class Socket extends FileDescriptor {
private static native int isBroadcast(int fd) throws IOException;
private static native int getSoLinger(int fd) throws IOException;
private static native int getSoError(int fd) throws IOException;
private static native int getTrafficClass(int fd) throws IOException;
private static native int getTrafficClass(int fd, boolean ipv6) throws IOException;
private static native void setReuseAddress(int fd, int reuseAddress) throws IOException;
private static native void setReusePort(int fd, int reuseAddress) throws IOException;
@ -478,6 +493,6 @@ public class Socket extends FileDescriptor {
private static native void setTcpNoDelay(int fd, int tcpNoDelay) throws IOException;
private static native void setSoLinger(int fd, int soLinger) throws IOException;
private static native void setBroadcast(int fd, int broadcast) throws IOException;
private static native void setTrafficClass(int fd, int trafficClass) throws IOException;
private static native void setTrafficClass(int fd, boolean ipv6, int trafficClass) throws IOException;
private static native void initialize(boolean ipv4Preferred);
}