Fix support for IP_RECVORIGDSTADDR when using native epoll transport (#11173)

Motivation:

While adding support for GRO (b05fdf3ff8) we broke support for IP_RECVORIGDSTADDR when using the native transport. Beside this we also didnt correctly handle IP_RECVORIGDSTADDR when recvmmsg was used.

Modifications:

- Fix support for IP_RECVORIGDSTADDR when using the native epoll transport for normal reads (recvmsg) but also for scattering reads (recvmmsg)
- Remove code from unix code-base as the support is linux specific and we not need the code there anymore

Result:

Fixes https://github.com/netty/netty/issues/11141
This commit is contained in:
Norman Maurer 2021-04-21 16:04:12 +02:00 committed by GitHub
parent 697952e3e6
commit e27831d767
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 128 additions and 114 deletions

View File

@ -76,6 +76,11 @@
#define UDP_GRO 104
#endif
#ifdef IP_RECVORIGDSTADDR
#if !defined(SOL_IP) && defined(IPPROTO_IP)
#define SOL_IP IPPROTO_IP
#endif /* !SOL_IP && IPPROTO_IP */
#endif // IP_RECVORIGDSTADDR
// optional
extern int epoll_create1(int flags) __attribute__((weak));
@ -115,11 +120,16 @@ struct mmsghdr {
#endif // SYS_sendmmsg
// Those are initialized in the init(...) method and cached for performance reasons
static jfieldID packetAddrFieldId = NULL;
static jfieldID packetAddrLenFieldId = NULL;
static jfieldID packetSenderAddrFieldId = NULL;
static jfieldID packetSenderAddrLenFieldId = NULL;
static jfieldID packetSenderScopeIdFieldId = NULL;
static jfieldID packetSenderPortFieldId = NULL;
static jfieldID packetRecipientAddrFieldId = NULL;
static jfieldID packetRecipientAddrLenFieldId = NULL;
static jfieldID packetRecipientScopeIdFieldId = NULL;
static jfieldID packetRecipientPortFieldId = NULL;
static jfieldID packetSegmentSizeFieldId = NULL;
static jfieldID packetScopeIdFieldId = NULL;
static jfieldID packetPortFieldId = NULL;
static jfieldID packetMemoryAddressFieldId = NULL;
static jfieldID packetCountFieldId = NULL;
@ -344,8 +354,8 @@ static jint netty_epoll_native_sendmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo
for (i = 0; i < len; i++) {
jobject packet = (*env)->GetObjectArrayElement(env, packets, i + offset);
jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetAddrFieldId);
jint addrLen = (*env)->GetIntField(env, packet, packetAddrLenFieldId);
jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetRecipientAddrFieldId);
jint addrLen = (*env)->GetIntField(env, packet, packetRecipientAddrLenFieldId);
jint packetSegmentSize = (*env)->GetIntField(env, packet, packetSegmentSizeFieldId);
if (packetSegmentSize > 0) {
msg[i].msg_hdr.msg_control = controls[i];
@ -357,8 +367,8 @@ static jint netty_epoll_native_sendmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo
*((uint16_t *) CMSG_DATA(cm)) = packetSegmentSize;
}
if (addrLen != 0) {
jint scopeId = (*env)->GetIntField(env, packet, packetScopeIdFieldId);
jint port = (*env)->GetIntField(env, packet, packetPortFieldId);
jint scopeId = (*env)->GetIntField(env, packet, packetRecipientScopeIdFieldId);
jint port = (*env)->GetIntField(env, packet, packetRecipientPortFieldId);
if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr[i], &addrSize) == -1) {
return -1;
@ -385,20 +395,17 @@ static jint netty_epoll_native_sendmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo
return (jint) res;
}
static void init_packet(JNIEnv* env, jobject packet, struct msghdr* msg, int len) {
jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetAddrFieldId);
(*env)->SetIntField(env, packet, packetCountFieldId, len);
struct sockaddr_storage* addr = (struct sockaddr_storage*) msg->msg_name;
static void init_packet_address(JNIEnv* env, jobject packet, struct sockaddr_storage* addr, jfieldID addrFieldId,
jfieldID addrLenFieldId, jfieldID scopeIdFieldId, jfieldID portFieldId) {
jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, addrFieldId);
if (addr->ss_family == AF_INET) {
struct sockaddr_in* ipaddr = (struct sockaddr_in*) addr;
(*env)->SetByteArrayRegion(env, address, 0, 4, (jbyte*) &ipaddr->sin_addr.s_addr);
(*env)->SetIntField(env, packet, packetAddrLenFieldId, 4);
(*env)->SetIntField(env, packet, packetScopeIdFieldId, 0);
(*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ipaddr->sin_port));
(*env)->SetIntField(env, packet, addrLenFieldId, 4);
(*env)->SetIntField(env, packet, scopeIdFieldId, 0);
(*env)->SetIntField(env, packet, portFieldId, ntohs(ipaddr->sin_port));
} else {
int addrLen = netty_unix_socket_ipAddressLength(addr);
struct sockaddr_in6* ip6addr = (struct sockaddr_in6*) addr;
@ -410,10 +417,17 @@ static void init_packet(JNIEnv* env, jobject packet, struct msghdr* msg, int len
} else {
(*env)->SetByteArrayRegion(env, address, 0, 16, (jbyte*) &ip6addr->sin6_addr.s6_addr);
}
(*env)->SetIntField(env, packet, packetAddrLenFieldId, addrLen);
(*env)->SetIntField(env, packet, packetScopeIdFieldId, ip6addr->sin6_scope_id);
(*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ip6addr->sin6_port));
(*env)->SetIntField(env, packet, addrLenFieldId, addrLen);
(*env)->SetIntField(env, packet, scopeIdFieldId, ip6addr->sin6_scope_id);
(*env)->SetIntField(env, packet, portFieldId, ntohs(ip6addr->sin6_port));
}
}
static void init_packet(JNIEnv* env, jobject packet, struct msghdr* msg, int len) {
(*env)->SetIntField(env, packet, packetCountFieldId, len);
init_packet_address(env, packet, (struct sockaddr_storage*) msg->msg_name, packetSenderAddrFieldId, packetSenderAddrLenFieldId, packetSenderScopeIdFieldId, packetSenderPortFieldId);
struct cmsghdr *cmsg = NULL;
uint16_t gso_size = 0;
uint16_t *gsosizeptr = NULL;
@ -421,8 +435,12 @@ static void init_packet(JNIEnv* env, jobject packet, struct msghdr* msg, int len
if (cmsg->cmsg_level == SOL_UDP && cmsg->cmsg_type == UDP_GRO) {
gsosizeptr = (uint16_t *) CMSG_DATA(cmsg);
gso_size = *gsosizeptr;
break;
}
#ifdef IP_RECVORIGDSTADDR
else if (cmsg->cmsg_level == SOL_IP && cmsg->cmsg_type == IP_RECVORIGDSTADDR) {
init_packet_address(env, packet, (struct sockaddr_storage*) CMSG_DATA(cmsg), packetRecipientAddrFieldId, packetRecipientAddrLenFieldId, packetRecipientScopeIdFieldId, packetRecipientPortFieldId);
}
#endif // IP_RECVORIGDSTADDR
}
(*env)->SetIntField(env, packet, packetSegmentSizeFieldId, gso_size);
}
@ -431,8 +449,8 @@ static jint netty_epoll_native_recvmsg0(JNIEnv* env, jclass clazz, jint fd, jboo
struct msghdr msg = { 0 };
struct sockaddr_storage sock_address;
int addrSize = sizeof(sock_address);
// Enough space for GRO
char control[CMSG_SPACE(sizeof(uint16_t))] = { 0 };
// Enough space for GRO and IP_RECVORIGDSTADDR
char control[CMSG_SPACE(sizeof(uint16_t)) + sizeof(struct sockaddr_storage)] = { 0 };
msg.msg_name = &sock_address;
msg.msg_namelen = (socklen_t) addrSize;
msg.msg_iov = (struct iovec*) (intptr_t) (*env)->GetLongField(env, packet, packetMemoryAddressFieldId);
@ -458,6 +476,16 @@ static jint netty_epoll_native_recvmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo
struct sockaddr_storage addr[len];
int addrSize = sizeof(addr);
memset(addr, 0, addrSize);
int storageSize = sizeof(struct sockaddr_storage);
char* cntrlbuf = NULL;
#ifdef IP_RECVORIGDSTADDR
int readLocalAddr = 0;
if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_RECVORIGDSTADDR,
&readLocalAddr, sizeof(readLocalAddr)) < 0) {
cntrlbuf = malloc(sizeof(char) * storageSize * len);
}
#endif // IP_RECVORIGDSTADDR
int i;
@ -468,6 +496,11 @@ static jint netty_epoll_native_recvmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo
msg[i].msg_hdr.msg_name = addr + i;
msg[i].msg_hdr.msg_namelen = (socklen_t) addrSize;
if (cntrlbuf != NULL) {
msg[i].msg_hdr.msg_control = cntrlbuf + i * storageSize;
msg[i].msg_hdr.msg_controllen = storageSize;
}
}
ssize_t res;
@ -479,15 +512,18 @@ static jint netty_epoll_native_recvmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo
// keep on reading if it was interrupted
} while (res == -1 && ((err = errno) == EINTR));
if (res < 0) {
return -err;
}
if (res >= 0) {
for (i = 0; i < res; i++) {
jobject packet = (*env)->GetObjectArrayElement(env, packets, i + offset);
init_packet(env, packet, &msg[i].msg_hdr, msg[i].msg_len);
}
}
// Free the control message buffer if needed.
free(cntrlbuf);
if (res < 0) {
return -err;
}
return (jint) res;
}
@ -735,11 +771,15 @@ static jint netty_epoll_native_JNI_OnLoad(JNIEnv* env, const char* packagePrefix
NETTY_JNI_UTIL_FIND_CLASS(env, nativeDatagramPacketCls, nettyClassName, done);
netty_jni_util_free_dynamic_name(&nettyClassName);
NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetAddrFieldId, "addr", "[B", done);
NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetAddrLenFieldId, "addrLen", "I", done);
NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetSenderAddrFieldId, "senderAddr", "[B", done);
NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetSenderAddrLenFieldId, "senderAddrLen", "I", done);
NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetSenderScopeIdFieldId, "senderScopeId", "I", done);
NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetSenderPortFieldId, "senderPort", "I", done);
NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetRecipientAddrFieldId, "recipientAddr", "[B", done);
NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetRecipientAddrLenFieldId, "recipientAddrLen", "I", done);
NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetRecipientScopeIdFieldId, "recipientScopeId", "I", done);
NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetRecipientPortFieldId, "recipientPort", "I", done);
NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetSegmentSizeFieldId, "segmentSize", "I", done);
NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetScopeIdFieldId, "scopeId", "I", done);
NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetPortFieldId, "port", "I", done);
NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetMemoryAddressFieldId, "memoryAddress", "J", done);
NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetCountFieldId, "count", "I", done);
@ -763,11 +803,15 @@ done:
if (linuxsocketOnLoadCalled == 1) {
netty_epoll_linuxsocket_JNI_OnUnLoad(env, packagePrefix);
}
packetAddrFieldId = NULL;
packetAddrLenFieldId = NULL;
packetSenderAddrFieldId = NULL;
packetSenderAddrLenFieldId = NULL;
packetSenderScopeIdFieldId = NULL;
packetSenderPortFieldId = NULL;
packetRecipientAddrFieldId = NULL;
packetRecipientAddrLenFieldId = NULL;
packetRecipientScopeIdFieldId = NULL;
packetRecipientPortFieldId = NULL;
packetSegmentSizeFieldId = NULL;
packetScopeIdFieldId = NULL;
packetPortFieldId = NULL;
packetMemoryAddressFieldId = NULL;
packetCountFieldId = NULL;
}
@ -790,11 +834,15 @@ static void netty_epoll_native_JNI_OnUnload(JNIEnv* env) {
staticPackagePrefix = NULL;
}
packetAddrFieldId = NULL;
packetAddrLenFieldId = NULL;
packetSenderAddrFieldId = NULL;
packetSenderAddrLenFieldId = NULL;
packetSenderScopeIdFieldId = NULL;
packetSenderPortFieldId = NULL;
packetRecipientAddrFieldId = NULL;
packetRecipientAddrLenFieldId = NULL;
packetRecipientScopeIdFieldId = NULL;
packetRecipientPortFieldId = NULL;
packetSegmentSizeFieldId = NULL;
packetScopeIdFieldId = NULL;
packetPortFieldId = NULL;
packetMemoryAddressFieldId = NULL;
packetCountFieldId = NULL;
}

View File

@ -142,6 +142,18 @@ final class NativeDatagramPacketArray {
}
}
private static InetSocketAddress newAddress(byte[] addr, int addrLen, int port, int scopeId, byte[] ipv4Bytes)
throws UnknownHostException {
final InetAddress address;
if (addrLen == ipv4Bytes.length) {
System.arraycopy(addr, 0, ipv4Bytes, 0, addrLen);
address = InetAddress.getByAddress(ipv4Bytes);
} else {
address = Inet6Address.getByAddress(null, addr, scopeId);
}
return new InetSocketAddress(address, port);
}
/**
* Used to pass needed data to JNI.
*/
@ -152,45 +164,50 @@ final class NativeDatagramPacketArray {
private long memoryAddress;
private int count;
private final byte[] addr = new byte[16];
private final byte[] senderAddr = new byte[16];
private int senderAddrLen;
private int senderScopeId;
private int senderPort;
private final byte[] recipientAddr = new byte[16];
private int recipientAddrLen;
private int recipientScopeId;
private int recipientPort;
private int segmentSize;
private int addrLen;
private int scopeId;
private int port;
private void init(long memoryAddress, int count, int segmentSize, InetSocketAddress recipient) {
this.memoryAddress = memoryAddress;
this.count = count;
this.segmentSize = segmentSize;
this.senderScopeId = 0;
this.senderPort = 0;
this.senderAddrLen = 0;
if (recipient == null) {
this.scopeId = 0;
this.port = 0;
this.addrLen = 0;
this.recipientScopeId = 0;
this.recipientPort = 0;
this.recipientAddrLen = 0;
} else {
InetAddress address = recipient.getAddress();
if (address instanceof Inet6Address) {
System.arraycopy(address.getAddress(), 0, addr, 0, addr.length);
scopeId = ((Inet6Address) address).getScopeId();
System.arraycopy(address.getAddress(), 0, recipientAddr, 0, recipientAddr.length);
recipientScopeId = ((Inet6Address) address).getScopeId();
} else {
copyIpv4MappedIpv6Address(address.getAddress(), addr);
scopeId = 0;
copyIpv4MappedIpv6Address(address.getAddress(), recipientAddr);
recipientScopeId = 0;
}
addrLen = addr.length;
port = recipient.getPort();
recipientAddrLen = recipientAddr.length;
recipientPort = recipient.getPort();
}
}
DatagramPacket newDatagramPacket(ByteBuf buffer, InetSocketAddress recipient) throws UnknownHostException {
final InetAddress address;
if (addrLen == ipv4Bytes.length) {
System.arraycopy(addr, 0, ipv4Bytes, 0, addrLen);
address = InetAddress.getByAddress(ipv4Bytes);
} else {
address = Inet6Address.getByAddress(null, addr, scopeId);
InetSocketAddress sender = newAddress(senderAddr, senderAddrLen, senderPort, senderScopeId, ipv4Bytes);
if (recipientAddrLen != 0) {
recipient = newAddress(recipientAddr, recipientAddrLen, recipientPort, recipientScopeId, ipv4Bytes);
}
InetSocketAddress sender = new InetSocketAddress(address, port);
buffer.writerIndex(count);
// UDP_GRO

View File

@ -339,43 +339,9 @@ static jobject _recvFrom(JNIEnv* env, jint fd, void* buffer, jint pos, jint limi
socklen_t addrlen = sizeof(addr);
ssize_t res;
int err;
jobject local = NULL;
#ifdef IP_RECVORIGDSTADDR
struct sockaddr_storage daddr;
struct iovec iov;
struct cmsghdr* cmsg;
struct msghdr msg;
char cntrlbuf[64];
int readLocalAddr;
if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_RECVORIGDSTADDR,
&readLocalAddr, sizeof(readLocalAddr)) < 0) {
readLocalAddr = 0;
}
if (readLocalAddr) {
iov.iov_base = buffer + pos;
iov.iov_len = (size_t) (limit - pos);
msg.msg_name = (struct sockaddr*) &addr;
msg.msg_namelen = addrlen;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = cntrlbuf;
msg.msg_controllen = sizeof(cntrlbuf);
}
#endif
do {
#ifdef IP_RECVORIGDSTADDR
if (readLocalAddr) {
res = recvmsg(fd, &msg, 0);
} else {
#endif
res = recvfrom(fd, buffer + pos, (size_t) (limit - pos), 0, (struct sockaddr*) &addr, &addrlen);
#ifdef IP_RECVORIGDSTADDR
}
#endif
// Keep on reading if we was interrupted
} while (res == -1 && ((err = errno) == EINTR));
@ -396,24 +362,7 @@ static jobject _recvFrom(JNIEnv* env, jint fd, void* buffer, jint pos, jint limi
return NULL;
}
#ifdef IP_RECVORIGDSTADDR
#if !defined(SOL_IP) && defined(IPPROTO_IP)
#define SOL_IP IPPROTO_IP
#endif /* !SOL_IP && IPPROTO_IP */
if (readLocalAddr) {
for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (cmsg->cmsg_level == SOL_IP && cmsg->cmsg_type == IP_RECVORIGDSTADDR) {
memcpy (&daddr, CMSG_DATA(cmsg), sizeof (struct sockaddr_storage));
local = createDatagramSocketAddress(env, &daddr, res, NULL);
if (local == NULL) {
return NULL;
}
break;
}
}
}
#endif
return createDatagramSocketAddress(env, &addr, res, local);
return createDatagramSocketAddress(env, &addr, res, NULL);
}
void netty_unix_socket_getOptionHandleError(JNIEnv* env, int err) {