From 02c460be1466a30cbe136952e0b2fcbb971b8a90 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 29 Mar 2021 08:51:08 +0200 Subject: [PATCH] Add support for UDP_GRO (#11120) Motivation: UDP_GRO can improve performance when reading UDP datagrams. This patch adds support for it. See https://lwn.net/Articles/768995/ Modifications: - Add recvmsg(...) - Add support for UDP_GRO in recvmsg(...) and recvmmsg(...) - Remove usage of recvfrom(...) and just always use recvmsg(...) or recvmmsg(...) to simplify things - Refactor some code for sharing - Add EpollChannelOption.UDP_GRO and the getter / setter in EpollDatagramConfig Result: UDP_GRO is supported when the underlying system supports it. --- .../src/main/c/netty_epoll_linuxsocket.c | 26 ++- .../src/main/c/netty_epoll_native.c | 114 +++++++--- .../channel/epoll/EpollChannelOption.java | 1 + .../channel/epoll/EpollDatagramChannel.java | 195 +++++++++++------- .../epoll/EpollDatagramChannelConfig.java | 36 +++- .../io/netty/channel/epoll/LinuxSocket.java | 14 ++ .../java/io/netty/channel/epoll/Native.java | 11 + .../epoll/NativeDatagramPacketArray.java | 12 +- .../epoll/EpollDatagramUnicastTest.java | 41 +++- 9 files changed, 329 insertions(+), 121 deletions(-) diff --git a/transport-native-epoll/src/main/c/netty_epoll_linuxsocket.c b/transport-native-epoll/src/main/c/netty_epoll_linuxsocket.c index 6f5a245bd4..084743021d 100644 --- a/transport-native-epoll/src/main/c/netty_epoll_linuxsocket.c +++ b/transport-native-epoll/src/main/c/netty_epoll_linuxsocket.c @@ -25,9 +25,9 @@ #include #include #include +#include // SOL_UDP #include #include // TCP_NOTSENT_LOWAT is a linux specific define - #include "netty_epoll_linuxsocket.h" #include "netty_unix_errors.h" #include "netty_unix_filedescriptor.h" @@ -57,6 +57,11 @@ #define SO_BUSY_POLL 46 #endif +// UDP_GRO is defined in linux 5. We define this here so older kernels can compile. +#ifndef UDP_GRO +#define UDP_GRO 104 +#endif + static jclass peerCredentialsClass = NULL; static jmethodID peerCredentialsMethodId = NULL; @@ -610,6 +615,19 @@ static jobject netty_epoll_linuxsocket_getPeerCredentials(JNIEnv *env, jclass cl return (*env)->NewObject(env, peerCredentialsClass, peerCredentialsMethodId, credentials.pid, credentials.uid, gids); } +static jint netty_epoll_linuxsocket_isUdpGro(JNIEnv* env, jclass clazz, jint fd) { + int optval; + if (netty_unix_socket_getOption(env, fd, SOL_UDP, UDP_GRO, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +static void netty_epoll_linuxsocket_setUdpGro(JNIEnv* env, jclass clazz, jint fd, jint optval) { + netty_unix_socket_setOption(env, fd, SOL_UDP, UDP_GRO, &optval, sizeof(optval)); +} + + static jlong netty_epoll_linuxsocket_sendFile(JNIEnv* env, jclass clazz, jint fd, jobject fileRegion, jlong base_off, jlong off, jlong len) { jobject fileChannel = (*env)->GetObjectField(env, fileRegion, fileChannelFieldId); if (fileChannel == NULL) { @@ -642,6 +660,7 @@ static jlong netty_epoll_linuxsocket_sendFile(JNIEnv* env, jclass clazz, jint fd return res; } + // JNI Registered Methods End // JNI Method Registration Table Begin @@ -682,7 +701,10 @@ static const JNINativeMethod fixed_method_table[] = { { "joinGroup", "(IZ[B[BII)V", (void *) netty_epoll_linuxsocket_joinGroup }, { "joinSsmGroup", "(IZ[B[BII[B)V", (void *) netty_epoll_linuxsocket_joinSsmGroup }, { "leaveGroup", "(IZ[B[BII)V", (void *) netty_epoll_linuxsocket_leaveGroup }, - { "leaveSsmGroup", "(IZ[B[BII[B)V", (void *) netty_epoll_linuxsocket_leaveSsmGroup } + { "leaveSsmGroup", "(IZ[B[BII[B)V", (void *) netty_epoll_linuxsocket_leaveSsmGroup }, + { "isUdpGro", "(I)I", (void *) netty_epoll_linuxsocket_isUdpGro }, + { "setUdpGro", "(II)V", (void *) netty_epoll_linuxsocket_setUdpGro } + // "sendFile" has a dynamic signature }; diff --git a/transport-native-epoll/src/main/c/netty_epoll_native.c b/transport-native-epoll/src/main/c/netty_epoll_native.c index 0f5e19fcde..0667382539 100644 --- a/transport-native-epoll/src/main/c/netty_epoll_native.c +++ b/transport-native-epoll/src/main/c/netty_epoll_native.c @@ -71,6 +71,12 @@ #define UDP_SEGMENT 103 #endif +// UDP_GRO is defined in linux 5. We define this here so older kernels can compile. +#ifndef UDP_GRO +#define UDP_GRO 104 +#endif + + // optional extern int epoll_create1(int flags) __attribute__((weak)); @@ -373,6 +379,73 @@ static jint netty_epoll_native_sendmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo return (jint) res; } +static void init_packet(JNIEnv* env, jobject packet, struct msghdr* msg, int len) { + jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetAddrFieldId); + + (*env)->SetIntField(env, packet, packetCountFieldId, len); + + struct sockaddr_storage* addr = (struct sockaddr_storage*) msg->msg_name; + + if (addr->ss_family == AF_INET) { + struct sockaddr_in* ipaddr = (struct sockaddr_in*) addr; + + (*env)->SetByteArrayRegion(env, address, 0, 4, (jbyte*) &ipaddr->sin_addr.s_addr); + (*env)->SetIntField(env, packet, packetAddrLenFieldId, 4); + (*env)->SetIntField(env, packet, packetScopeIdFieldId, 0); + (*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ipaddr->sin_port)); + } else { + int addrLen = netty_unix_socket_ipAddressLength(addr); + struct sockaddr_in6* ip6addr = (struct sockaddr_in6*) addr; + + if (addrLen == 4) { + // IPV4 mapped IPV6 address + jbyte* addr = (jbyte*) &ip6addr->sin6_addr.s6_addr; + (*env)->SetByteArrayRegion(env, address, 0, 4, addr + 12); + } else { + (*env)->SetByteArrayRegion(env, address, 0, 16, (jbyte*) &ip6addr->sin6_addr.s6_addr); + } + (*env)->SetIntField(env, packet, packetAddrLenFieldId, addrLen); + (*env)->SetIntField(env, packet, packetScopeIdFieldId, ip6addr->sin6_scope_id); + (*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ip6addr->sin6_port)); + } + struct cmsghdr *cmsg = NULL; + uint16_t gso_size = 0; + uint16_t *gsosizeptr = NULL; + for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) { + if (cmsg->cmsg_level == SOL_UDP && cmsg->cmsg_type == UDP_GRO) { + gsosizeptr = (uint16_t *) CMSG_DATA(cmsg); + gso_size = *gsosizeptr; + break; + } + } + (*env)->SetIntField(env, packet, packetSegmentSizeFieldId, gso_size); +} + +static jint netty_epoll_native_recvmsg0(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobject packet) { + struct msghdr msg = { 0 }; + struct sockaddr_storage sock_address; + int addrSize = sizeof(sock_address); + // Enough space for GRO + char control[CMSG_SPACE(sizeof(uint16_t))] = { 0 }; + msg.msg_name = &sock_address; + msg.msg_namelen = (socklen_t) addrSize; + msg.msg_iov = (struct iovec*) (intptr_t) (*env)->GetLongField(env, packet, packetMemoryAddressFieldId); + msg.msg_iovlen = (*env)->GetIntField(env, packet, packetCountFieldId); + msg.msg_control = control; + msg.msg_controllen = sizeof(control); + ssize_t res; + int err; + do { + res = recvmsg(fd, &msg, 0); + // keep on reading if it was interrupted + } while (res == -1 && ((err = errno) == EINTR)); + if (res < 0) { + return -err; + } + init_packet(env, packet, &msg, res); + return (jint) res; +} + static jint netty_epoll_native_recvmmsg0(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobjectArray packets, jint offset, jint len) { struct mmsghdr msg[len]; memset(msg, 0, sizeof(msg)); @@ -406,36 +479,7 @@ static jint netty_epoll_native_recvmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo for (i = 0; i < res; i++) { jobject packet = (*env)->GetObjectArrayElement(env, packets, i + offset); - jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetAddrFieldId); - - (*env)->SetIntField(env, packet, packetCountFieldId, msg[i].msg_len); - - struct sockaddr_storage* addr = (struct sockaddr_storage*) msg[i].msg_hdr.msg_name; - - if (addr->ss_family == AF_INET) { - struct sockaddr_in* ipaddr = (struct sockaddr_in*) addr; - - (*env)->SetByteArrayRegion(env, address, 0, 4, (jbyte*) &ipaddr->sin_addr.s_addr); - (*env)->SetIntField(env, packet, packetAddrLenFieldId, 4); - (*env)->SetIntField(env, packet, packetScopeIdFieldId, 0); - (*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ipaddr->sin_port)); - } else { - int addrLen = netty_unix_socket_ipAddressLength(addr); - struct sockaddr_in6* ip6addr = (struct sockaddr_in6*) addr; - - if (addrLen == 4) { - // IPV4 mapped IPV6 address - jbyte* addr = (jbyte*) &ip6addr->sin6_addr.s6_addr; - (*env)->SetByteArrayRegion(env, address, 0, 4, addr + 12); - } else { - (*env)->SetByteArrayRegion(env, address, 0, 16, (jbyte*) &ip6addr->sin6_addr.s6_addr); - } - (*env)->SetIntField(env, packet, packetAddrLenFieldId, addrLen); - (*env)->SetIntField(env, packet, packetScopeIdFieldId, ip6addr->sin6_scope_id); - (*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ip6addr->sin6_port)); - } - // TODO: Support this also for recvmmsg - (*env)->SetIntField(env, packet, packetSegmentSizeFieldId, 0); + init_packet(env, packet, &msg[i].msg_hdr, msg[i].msg_len); } return (jint) res; @@ -451,6 +495,7 @@ static jstring netty_epoll_native_kernelVersion(JNIEnv* env, jclass clazz) { netty_unix_errors_throwRuntimeExceptionErrorNo(env, "uname() failed: ", errno); return NULL; } + static jboolean netty_epoll_native_isSupportingSendmmsg(JNIEnv* env, jclass clazz) { if (SYS_sendmmsg == -1) { return JNI_FALSE; @@ -575,7 +620,7 @@ static const JNINativeMethod fixed_method_table[] = { static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]); static jint dynamicMethodsTableSize() { - return fixed_method_table_size + 2; // 2 is for the dynamic method signatures. + return fixed_method_table_size + 3; // 3 is for the dynamic method signatures. } static JNINativeMethod* createDynamicMethodsTable(const char* packagePrefix) { @@ -602,6 +647,13 @@ static JNINativeMethod* createDynamicMethodsTable(const char* packagePrefix) { dynamicMethod->fnPtr = (void *) netty_epoll_native_recvmmsg0; netty_jni_util_free_dynamic_name(&dynamicTypeName); + ++dynamicMethod; + NETTY_JNI_UTIL_PREPEND(packagePrefix, "io/netty/channel/epoll/NativeDatagramPacketArray$NativeDatagramPacket;)I", dynamicTypeName, error); + NETTY_JNI_UTIL_PREPEND("(IZL", dynamicTypeName, dynamicMethod->signature, error); + dynamicMethod->name = "recvmsg0"; + dynamicMethod->fnPtr = (void *) netty_epoll_native_recvmsg0; + netty_jni_util_free_dynamic_name(&dynamicTypeName); + return dynamicMethods; error: free(dynamicTypeName); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java index d9c373c4f9..2f1e47d896 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java @@ -45,6 +45,7 @@ public final class EpollChannelOption extends UnixChannelOption { public static final ChannelOption SO_BUSY_POLL = valueOf(EpollChannelOption.class, "SO_BUSY_POLL"); public static final ChannelOption> TCP_MD5SIG = valueOf("TCP_MD5SIG"); public static final ChannelOption MAX_DATAGRAM_PAYLOAD_SIZE = valueOf("MAX_DATAGRAM_PAYLOAD_SIZE"); + public static final ChannelOption UDP_GRO = valueOf("UDP_GRO"); @SuppressWarnings({ "unused", "deprecation" }) private EpollChannelOption() { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index e8d3fe79c9..127f64d4cd 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -28,10 +28,8 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultAddressedEnvelope; import io.netty.channel.EventLoop; import io.netty.channel.socket.DatagramChannel; -import io.netty.channel.socket.DatagramChannelConfig; import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.InternetProtocolFamily; -import io.netty.channel.unix.DatagramSocketAddress; import io.netty.channel.unix.Errors; import io.netty.channel.unix.Errors.NativeIoException; import io.netty.channel.unix.Socket; @@ -450,7 +448,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements @Override void epollInReady() { assert eventLoop().inEventLoop(); - DatagramChannelConfig config = config(); + EpollDatagramChannelConfig config = config(); if (shouldBreakEpollInReady(config)) { clearEpollIn0(); return; @@ -467,25 +465,25 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements try { boolean connected = isConnected(); do { - ByteBuf byteBuf = allocHandle.allocate(allocator); final boolean read; int datagramSize = config().getMaxDatagramPayloadSize(); + ByteBuf byteBuf = allocHandle.allocate(allocator); // Only try to use recvmmsg if its really supported by the running system. int numDatagram = Native.IS_SUPPORTING_RECVMMSG ? datagramSize == 0 ? 1 : byteBuf.writableBytes() / datagramSize : 0; - try { if (numDatagram <= 1) { - if (connected) { - read = connectedRead(allocHandle, byteBuf, datagramSize); + if (!connected || config.isUdpGro()) { + read = recvmsg(allocHandle, cleanDatagramPacketArray(), byteBuf); } else { - read = read(allocHandle, byteBuf, datagramSize); + read = connectedRead(allocHandle, byteBuf, datagramSize); } } else { // Try to use scattering reads via recvmmsg(...) syscall. - read = scatteringRead(allocHandle, byteBuf, datagramSize, numDatagram); + read = scatteringRead(allocHandle, cleanDatagramPacketArray(), + byteBuf, datagramSize, numDatagram); } } catch (NativeIoException e) { if (connected) { @@ -569,13 +567,104 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements return e; } - private boolean scatteringRead(EpollRecvByteAllocatorHandle allocHandle, + private static void addDatagramPacketToOut(DatagramPacket packet, + RecyclableArrayList out) { + if (packet instanceof SegmentedDatagramPacket) { + SegmentedDatagramPacket segmentedDatagramPacket = (SegmentedDatagramPacket) packet; + ByteBuf content = segmentedDatagramPacket.content(); + InetSocketAddress recipient = segmentedDatagramPacket.recipient(); + InetSocketAddress sender = segmentedDatagramPacket.sender(); + int segmentSize = segmentedDatagramPacket.segmentSize(); + do { + out.add(new DatagramPacket(content.readRetainedSlice(Math.min(content.readableBytes(), + segmentSize)), recipient, sender)); + } while (content.isReadable()); + + segmentedDatagramPacket.release(); + } else { + out.add(packet); + } + } + + private static void releaseAndRecycle(ByteBuf byteBuf, RecyclableArrayList packetList) { + if (byteBuf != null) { + byteBuf.release(); + } + if (packetList != null) { + for (int i = 0; i < packetList.size(); i++) { + ReferenceCountUtil.release(packetList.get(i)); + } + packetList.recycle(); + } + } + + private static void processPacket(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle, + int bytesRead, DatagramPacket packet) { + handle.lastBytesRead(bytesRead); + handle.incMessagesRead(1); + pipeline.fireChannelRead(packet); + } + + private static void processPacketList(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle, + int bytesRead, RecyclableArrayList packetList) { + int messagesRead = packetList.size(); + handle.lastBytesRead(bytesRead); + handle.incMessagesRead(messagesRead); + for (int i = 0; i < messagesRead; i++) { + pipeline.fireChannelRead(packetList.set(i, Unpooled.EMPTY_BUFFER)); + } + } + + private boolean recvmsg(EpollRecvByteAllocatorHandle allocHandle, + NativeDatagramPacketArray array, ByteBuf byteBuf) throws IOException { + RecyclableArrayList datagramPackets = null; + try { + int writable = byteBuf.writableBytes(); + + boolean added = array.addWritable(byteBuf, byteBuf.writerIndex(), writable); + assert added; + + allocHandle.attemptedBytesRead(writable); + + NativeDatagramPacketArray.NativeDatagramPacket msg = array.packets()[0]; + + int bytesReceived = socket.recvmsg(msg); + if (bytesReceived == 0) { + allocHandle.lastBytesRead(-1); + return false; + } + byteBuf.writerIndex(bytesReceived); + InetSocketAddress local = localAddress(); + DatagramPacket packet = msg.newDatagramPacket(byteBuf, local); + if (!(packet instanceof SegmentedDatagramPacket)) { + processPacket(pipeline(), allocHandle, bytesReceived, packet); + byteBuf = null; + } else { + // Its important that we process all received data out of the NativeDatagramPacketArray + // before we call fireChannelRead(...). This is because the user may call flush() + // in a channelRead(...) method and so may re-use the NativeDatagramPacketArray again. + datagramPackets = RecyclableArrayList.newInstance(); + addDatagramPacketToOut(packet, datagramPackets); + // null out byteBuf as addDatagramPacketToOut did take ownership of the ByteBuf / packet and transfered + // it into the RecyclableArrayList. + byteBuf = null; + + processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets); + datagramPackets.recycle(); + datagramPackets = null; + } + + return true; + } finally { + releaseAndRecycle(byteBuf, datagramPackets); + } + } + + private boolean scatteringRead(EpollRecvByteAllocatorHandle allocHandle, NativeDatagramPacketArray array, ByteBuf byteBuf, int datagramSize, int numDatagram) throws IOException { - RecyclableArrayList bufferPackets = null; + RecyclableArrayList datagramPackets = null; try { int offset = byteBuf.writerIndex(); - NativeDatagramPacketArray array = cleanDatagramPacketArray(); - for (int i = 0; i < numDatagram; i++, offset += datagramSize) { if (!array.addWritable(byteBuf, offset, datagramSize)) { break; @@ -597,82 +686,30 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements if (received == 1) { // Single packet fast-path DatagramPacket packet = packets[0].newDatagramPacket(byteBuf, local); - allocHandle.lastBytesRead(datagramSize); - allocHandle.incMessagesRead(1); - pipeline().fireChannelRead(packet); - byteBuf = null; - return true; + if (!(packet instanceof SegmentedDatagramPacket)) { + processPacket(pipeline(), allocHandle, datagramSize, packet); + byteBuf = null; + return true; + } } - // Its important that we process all received data out of the NativeDatagramPacketArray // before we call fireChannelRead(...). This is because the user may call flush() // in a channelRead(...) method and so may re-use the NativeDatagramPacketArray again. - bufferPackets = RecyclableArrayList.newInstance(); + datagramPackets = RecyclableArrayList.newInstance(); for (int i = 0; i < received; i++) { DatagramPacket packet = packets[i].newDatagramPacket(byteBuf.readRetainedSlice(datagramSize), local); - bufferPackets.add(packet); + addDatagramPacketToOut(packet, datagramPackets); } - - allocHandle.lastBytesRead(bytesReceived); - allocHandle.incMessagesRead(received); - - for (int i = 0; i < received; i++) { - pipeline().fireChannelRead(bufferPackets.set(i, Unpooled.EMPTY_BUFFER)); - } - bufferPackets.recycle(); - bufferPackets = null; - return true; - } finally { - if (byteBuf != null) { - byteBuf.release(); - } - if (bufferPackets != null) { - for (int i = 0; i < bufferPackets.size(); i++) { - ReferenceCountUtil.release(bufferPackets.get(i)); - } - bufferPackets.recycle(); - } - } - } - - private boolean read(EpollRecvByteAllocatorHandle allocHandle, ByteBuf byteBuf, int maxDatagramPacketSize) - throws IOException { - try { - int writable = maxDatagramPacketSize != 0 ? Math.min(byteBuf.writableBytes(), maxDatagramPacketSize) - : byteBuf.writableBytes(); - allocHandle.attemptedBytesRead(writable); - int writerIndex = byteBuf.writerIndex(); - final DatagramSocketAddress remoteAddress; - if (byteBuf.hasMemoryAddress()) { - // has a memory address so use optimized call - remoteAddress = socket.recvFromAddress( - byteBuf.memoryAddress(), writerIndex, writerIndex + writable); - } else { - ByteBuffer nioData = byteBuf.internalNioBuffer(writerIndex, writable); - remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit()); - } - - if (remoteAddress == null) { - allocHandle.lastBytesRead(-1); - return false; - } - InetSocketAddress localAddress = remoteAddress.localAddress(); - if (localAddress == null) { - localAddress = localAddress(); - } - int received = remoteAddress.receivedAmount(); - allocHandle.lastBytesRead(maxDatagramPacketSize <= 0 ? - received : writable); - byteBuf.writerIndex(writerIndex + received); - allocHandle.incMessagesRead(1); - - pipeline().fireChannelRead(new DatagramPacket(byteBuf, localAddress, remoteAddress)); + // Ass we did use readRetainedSlice(...) before we should now release the byteBuf and null it out. + byteBuf.release(); byteBuf = null; + + processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets); + datagramPackets.recycle(); + datagramPackets = null; return true; } finally { - if (byteBuf != null) { - byteBuf.release(); - } + releaseAndRecycle(byteBuf, datagramPackets); } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java index fed5ae37e9..45ac63d697 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java @@ -17,7 +17,6 @@ package io.netty.channel.epoll; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelException; import io.netty.channel.ChannelOption; import io.netty.channel.FixedRecvByteBufAllocator; @@ -52,7 +51,8 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme ChannelOption.IP_MULTICAST_ADDR, ChannelOption.IP_MULTICAST_IF, ChannelOption.IP_MULTICAST_TTL, ChannelOption.IP_TOS, ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, EpollChannelOption.SO_REUSEPORT, EpollChannelOption.IP_FREEBIND, EpollChannelOption.IP_TRANSPARENT, - EpollChannelOption.IP_RECVORIGDSTADDR, EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE); + EpollChannelOption.IP_RECVORIGDSTADDR, EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE, + EpollChannelOption.UDP_GRO); } @SuppressWarnings({ "unchecked", "deprecation" }) @@ -103,6 +103,9 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme if (option == EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE) { return (T) Integer.valueOf(getMaxDatagramPayloadSize()); } + if (option == EpollChannelOption.UDP_GRO) { + return (T) Boolean.valueOf(isUdpGro()); + } return super.getOption(option); } @@ -141,6 +144,8 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme setIpRecvOrigDestAddr((Boolean) value); } else if (option == EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE) { setMaxDatagramPayloadSize((Integer) value); + } else if (option == EpollChannelOption.UDP_GRO) { + setUdpGro((Boolean) value); } else { return super.setOption(option, value); } @@ -522,6 +527,33 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme return maxDatagramSize; } + private volatile boolean gro; + + /** + * Enable / disable UDP_GRO. + * @param gro {@code true} if {@code UDP_GRO} should be enabled, {@code false} otherwise. + * @return this. + */ + public EpollDatagramChannelConfig setUdpGro(boolean gro) { + try { + ((EpollDatagramChannel) channel).socket.setUdpGro(gro); + } catch (IOException e) { + throw new ChannelException(e); + } + this.gro = gro; + return this; + } + + /** + * Returns if {@code UDP_GRO} is enabled. + * @return {@code true} if enabled, {@code false} otherwise. + */ + public boolean isUdpGro() { + // We don't do a syscall here but just return the cached value due a kernel bug: + // https://lore.kernel.org/netdev/20210325195614.800687-1-norman_maurer@apple.com/T/#u + return gro; + } + @Override public EpollDatagramChannelConfig setMaxMessagesPerWrite(int maxMessagesPerWrite) { super.setMaxMessagesPerWrite(maxMessagesPerWrite); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/LinuxSocket.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/LinuxSocket.java index 4296d79898..73fd631150 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/LinuxSocket.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/LinuxSocket.java @@ -59,6 +59,10 @@ final class LinuxSocket extends Socket { return Native.recvmmsg(intValue(), ipv6, msgs, offset, len); } + int recvmsg(NativeDatagramPacketArray.NativeDatagramPacket msg) throws IOException { + return Native.recvmsg(intValue(), ipv6, msg); + } + void setTimeToLive(int ttl) throws IOException { setTimeToLive(intValue(), ttl); } @@ -286,6 +290,14 @@ final class LinuxSocket extends Socket { setIpMulticastLoop(intValue(), ipv6, loopbackModeDisabled ? 0 : 1); } + boolean isUdpGro() throws IOException { + return isUdpGro(intValue()) != 0; + } + + void setUdpGro(boolean gro) throws IOException { + setUdpGro(intValue(), gro ? 1 : 0); + } + long sendFile(DefaultFileRegion src, long baseOffset, long offset, long length) throws IOException { // Open the file-region as it may be created via the lazy constructor. This is needed as we directly access // the FileChannel field via JNI. @@ -389,4 +401,6 @@ final class LinuxSocket extends Socket { private static native int getIpMulticastLoop(int fd, boolean ipv6) throws IOException; private static native void setIpMulticastLoop(int fd, boolean ipv6, int enabled) throws IOException; private static native void setTimeToLive(int fd, int ttl) throws IOException; + private static native int isUdpGro(int fd) throws IOException; + private static native void setUdpGro(int fd, int gro) throws IOException; } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index 598572dd14..55c92811c8 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -242,6 +242,17 @@ public final class Native { private static native int recvmmsg0( int fd, boolean ipv6, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, int offset, int len); + static int recvmsg(int fd, boolean ipv6, NativeDatagramPacketArray.NativeDatagramPacket packet) throws IOException { + int res = recvmsg0(fd, ipv6, packet); + if (res >= 0) { + return res; + } + return ioResult("recvmsg", res); + } + + private static native int recvmsg0( + int fd, boolean ipv6, NativeDatagramPacketArray.NativeDatagramPacket msg); + // epoll_event related public static native int sizeofEpollEvent(); public static native int offsetofEpollData(); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java index b56f281721..d95af907f9 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java @@ -183,7 +183,7 @@ final class NativeDatagramPacketArray { } } - DatagramPacket newDatagramPacket(ByteBuf buffer, InetSocketAddress localAddress) throws UnknownHostException { + DatagramPacket newDatagramPacket(ByteBuf buffer, InetSocketAddress recipient) throws UnknownHostException { final InetAddress address; if (addrLen == ipv4Bytes.length) { System.arraycopy(addr, 0, ipv4Bytes, 0, addrLen); @@ -191,8 +191,14 @@ final class NativeDatagramPacketArray { } else { address = Inet6Address.getByAddress(null, addr, scopeId); } - return new DatagramPacket(buffer.writerIndex(count), - localAddress, new InetSocketAddress(address, port)); + InetSocketAddress sender = new InetSocketAddress(address, port); + buffer.writerIndex(count); + + // UDP_GRO + if (segmentSize > 0) { + return new SegmentedDatagramPacket(buffer, segmentSize, recipient, sender); + } + return new DatagramPacket(buffer, recipient, sender); } } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramUnicastTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramUnicastTest.java index 7617c3aafc..3be503f185 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramUnicastTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramUnicastTest.java @@ -21,6 +21,8 @@ import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOption; +import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.InternetProtocolFamily; @@ -30,12 +32,12 @@ import org.junit.Assume; import org.junit.Test; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; public class EpollDatagramUnicastTest extends DatagramUnicastTest { @@ -57,7 +59,7 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest { } public void testSendSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb) throws Throwable { - testSendSegmentedDatagramPacket(sb, cb, false); + testSegmentedDatagramPacket(sb, cb, false, false); } @Test @@ -66,15 +68,37 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest { } public void testSendSegmentedDatagramPacketComposite(Bootstrap sb, Bootstrap cb) throws Throwable { - testSendSegmentedDatagramPacket(sb, cb, true); + testSegmentedDatagramPacket(sb, cb, true, false); } - private void testSendSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb, boolean composite) + @Test + public void testSendAndReceiveSegmentedDatagramPacket() throws Throwable { + run(); + } + + public void testSendAndReceiveSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb) throws Throwable { + testSegmentedDatagramPacket(sb, cb, false, true); + } + + @Test + public void testSendAndReceiveSegmentedDatagramPacketComposite() throws Throwable { + run(); + } + + public void testSendAndReceiveSegmentedDatagramPacketComposite(Bootstrap sb, Bootstrap cb) throws Throwable { + testSegmentedDatagramPacket(sb, cb, true, true); + } + + private void testSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb, boolean composite, boolean gro) throws Throwable { if (!(cb.group() instanceof EpollEventLoopGroup)) { // Only supported for the native epoll transport. return; } + if (gro && !(sb.group() instanceof EpollEventLoopGroup)) { + // Only supported for the native epoll transport. + return; + } Assume.assumeTrue(SegmentedDatagramPacket.isSupported()); Channel sc = null; Channel cc = null; @@ -94,6 +118,12 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest { int bufferCapacity = numBuffers * segmentSize; final CountDownLatch latch = new CountDownLatch(numBuffers); AtomicReference errorRef = new AtomicReference(); + if (gro) { + // Enable GRO and also ensure we can read everything with one read as otherwise + // we will drop things on the floor. + sb.option(EpollChannelOption.UDP_GRO, true); + sb.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(bufferCapacity)); + } sc = sb.handler(new SimpleChannelInboundHandler() { @Override public void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) { @@ -103,6 +133,9 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest { } }).bind(newSocketAddress()).sync().channel(); + if (sc instanceof EpollDatagramChannel) { + assertEquals(gro, sc.config().getOption(EpollChannelOption.UDP_GRO)); + } InetSocketAddress addr = sendToAddress((InetSocketAddress) sc.localAddress()); final ByteBuf buffer; if (composite) {