Add support for UDP_GRO (#11120)

Motivation:

UDP_GRO can improve performance when reading UDP datagrams. This patch adds support for it.

See https://lwn.net/Articles/768995/

Modifications:

- Add recvmsg(...)
- Add support for UDP_GRO in recvmsg(...) and recvmmsg(...)
- Remove usage of recvfrom(...) and just always use recvmsg(...) or recvmmsg(...) to simplify things
- Refactor some code for sharing
- Add EpollChannelOption.UDP_GRO and the getter / setter in EpollDatagramConfig

Result:

UDP_GRO is supported when the underlying system supports it.
This commit is contained in:
Norman Maurer 2021-03-29 08:51:08 +02:00
parent 50a83ec8a8
commit 02c460be14
9 changed files with 329 additions and 121 deletions

View File

@ -25,9 +25,9 @@
#include <string.h> #include <string.h>
#include <errno.h> #include <errno.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <netinet/udp.h> // SOL_UDP
#include <sys/sendfile.h> #include <sys/sendfile.h>
#include <linux/tcp.h> // TCP_NOTSENT_LOWAT is a linux specific define #include <linux/tcp.h> // TCP_NOTSENT_LOWAT is a linux specific define
#include "netty_epoll_linuxsocket.h" #include "netty_epoll_linuxsocket.h"
#include "netty_unix_errors.h" #include "netty_unix_errors.h"
#include "netty_unix_filedescriptor.h" #include "netty_unix_filedescriptor.h"
@ -57,6 +57,11 @@
#define SO_BUSY_POLL 46 #define SO_BUSY_POLL 46
#endif #endif
// UDP_GRO is available since Linux 5.0. We define it here so the code still compiles against older kernel headers.
#ifndef UDP_GRO
#define UDP_GRO 104
#endif
static jclass peerCredentialsClass = NULL; static jclass peerCredentialsClass = NULL;
static jmethodID peerCredentialsMethodId = NULL; static jmethodID peerCredentialsMethodId = NULL;
@ -610,6 +615,19 @@ static jobject netty_epoll_linuxsocket_getPeerCredentials(JNIEnv *env, jclass cl
return (*env)->NewObject(env, peerCredentialsClass, peerCredentialsMethodId, credentials.pid, credentials.uid, gids); return (*env)->NewObject(env, peerCredentialsClass, peerCredentialsMethodId, credentials.pid, credentials.uid, gids);
} }
// Queries the UDP_GRO option (level SOL_UDP) of the given socket.
//
// Returns the value stored by netty_unix_socket_getOption(...), or -1 if that helper reported a failure.
static jint netty_epoll_linuxsocket_isUdpGro(JNIEnv* env, jclass clazz, jint fd) {
    int value;
    const int rc = netty_unix_socket_getOption(env, fd, SOL_UDP, UDP_GRO, &value, sizeof(value));
    return rc == -1 ? -1 : value;
}
// Sets the UDP_GRO option (level SOL_UDP) of the given socket to optval.
//
// The result of netty_unix_socket_setOption(...) is not inspected here.
static void netty_epoll_linuxsocket_setUdpGro(JNIEnv* env, jclass clazz, jint fd, jint optval) {
    int enabled = optval;
    netty_unix_socket_setOption(env, fd, SOL_UDP, UDP_GRO, &enabled, sizeof(enabled));
}
static jlong netty_epoll_linuxsocket_sendFile(JNIEnv* env, jclass clazz, jint fd, jobject fileRegion, jlong base_off, jlong off, jlong len) { static jlong netty_epoll_linuxsocket_sendFile(JNIEnv* env, jclass clazz, jint fd, jobject fileRegion, jlong base_off, jlong off, jlong len) {
jobject fileChannel = (*env)->GetObjectField(env, fileRegion, fileChannelFieldId); jobject fileChannel = (*env)->GetObjectField(env, fileRegion, fileChannelFieldId);
if (fileChannel == NULL) { if (fileChannel == NULL) {
@ -642,6 +660,7 @@ static jlong netty_epoll_linuxsocket_sendFile(JNIEnv* env, jclass clazz, jint fd
return res; return res;
} }
// JNI Registered Methods End // JNI Registered Methods End
// JNI Method Registration Table Begin // JNI Method Registration Table Begin
@ -682,7 +701,10 @@ static const JNINativeMethod fixed_method_table[] = {
{ "joinGroup", "(IZ[B[BII)V", (void *) netty_epoll_linuxsocket_joinGroup }, { "joinGroup", "(IZ[B[BII)V", (void *) netty_epoll_linuxsocket_joinGroup },
{ "joinSsmGroup", "(IZ[B[BII[B)V", (void *) netty_epoll_linuxsocket_joinSsmGroup }, { "joinSsmGroup", "(IZ[B[BII[B)V", (void *) netty_epoll_linuxsocket_joinSsmGroup },
{ "leaveGroup", "(IZ[B[BII)V", (void *) netty_epoll_linuxsocket_leaveGroup }, { "leaveGroup", "(IZ[B[BII)V", (void *) netty_epoll_linuxsocket_leaveGroup },
{ "leaveSsmGroup", "(IZ[B[BII[B)V", (void *) netty_epoll_linuxsocket_leaveSsmGroup } { "leaveSsmGroup", "(IZ[B[BII[B)V", (void *) netty_epoll_linuxsocket_leaveSsmGroup },
{ "isUdpGro", "(I)I", (void *) netty_epoll_linuxsocket_isUdpGro },
{ "setUdpGro", "(II)V", (void *) netty_epoll_linuxsocket_setUdpGro }
// "sendFile" has a dynamic signature // "sendFile" has a dynamic signature
}; };

View File

@ -71,6 +71,12 @@
#define UDP_SEGMENT 103 #define UDP_SEGMENT 103
#endif #endif
// UDP_GRO is available since Linux 5.0. We define it here so the code still compiles against older kernel headers.
#ifndef UDP_GRO
#define UDP_GRO 104
#endif
// optional // optional
extern int epoll_create1(int flags) __attribute__((weak)); extern int epoll_create1(int flags) __attribute__((weak));
@ -373,6 +379,73 @@ static jint netty_epoll_native_sendmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo
return (jint) res; return (jint) res;
} }
// Copies the outcome of one received message into the given NativeDatagramPacket object.
//
// Stored fields: the number of received bytes (count), the sender address bytes, address length,
// scope id and port taken from msg->msg_name, and the UDP_GRO segment size taken from the control
// data (0 if no UDP_GRO control message is present).
//
// env    - the JNI environment.
// packet - the NativeDatagramPacket to fill.
// msg    - the message header that was filled by recvmsg(...) / recvmmsg(...).
// len    - the number of bytes received for this message.
static void init_packet(JNIEnv* env, jobject packet, struct msghdr* msg, int len) {
    jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetAddrFieldId);

    (*env)->SetIntField(env, packet, packetCountFieldId, len);

    struct sockaddr_storage* addr = (struct sockaddr_storage*) msg->msg_name;

    if (addr->ss_family == AF_INET) {
        struct sockaddr_in* ipaddr = (struct sockaddr_in*) addr;

        (*env)->SetByteArrayRegion(env, address, 0, 4, (jbyte*) &ipaddr->sin_addr.s_addr);
        (*env)->SetIntField(env, packet, packetAddrLenFieldId, 4);
        (*env)->SetIntField(env, packet, packetScopeIdFieldId, 0);
        (*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ipaddr->sin_port));
    } else {
        // Everything that is not AF_INET is handled as an IPv6 socket address.
        int addrLen = netty_unix_socket_ipAddressLength(addr);
        struct sockaddr_in6* ip6addr = (struct sockaddr_in6*) addr;

        if (addrLen == 4) {
            // IPV4 mapped IPV6 address: the IPv4 bytes are the last 4 bytes of the 16 byte address.
            jbyte* mapped = (jbyte*) &ip6addr->sin6_addr.s6_addr;
            (*env)->SetByteArrayRegion(env, address, 0, 4, mapped + 12);
        } else {
            (*env)->SetByteArrayRegion(env, address, 0, 16, (jbyte*) &ip6addr->sin6_addr.s6_addr);
        }
        (*env)->SetIntField(env, packet, packetAddrLenFieldId, addrLen);
        (*env)->SetIntField(env, packet, packetScopeIdFieldId, ip6addr->sin6_scope_id);
        (*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ip6addr->sin6_port));
    }

    int segment_size = 0;
    struct cmsghdr* cmsg = NULL;
    for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
        if (cmsg->cmsg_level == SOL_UDP && cmsg->cmsg_type == UDP_GRO) {
            // The kernel writes the segment size as an int. Reading it through a uint16_t* yields the
            // wrong half of the value on big-endian systems, so copy the full int when it is present.
            // memcpy(...) is used as CMSG_DATA(...) is not guaranteed to be suitably typed / aligned.
            if (cmsg->cmsg_len >= CMSG_LEN(sizeof(int))) {
                int value = 0;
                memcpy(&value, CMSG_DATA(cmsg), sizeof(value));
                segment_size = value;
            } else if (cmsg->cmsg_len >= CMSG_LEN(sizeof(uint16_t))) {
                // Fallback for a (truncated) 16-bit payload, matching the previous behaviour.
                uint16_t value = 0;
                memcpy(&value, CMSG_DATA(cmsg), sizeof(value));
                segment_size = value;
            }
            break;
        }
    }
    (*env)->SetIntField(env, packet, packetSegmentSizeFieldId, (jint) segment_size);
}
// Receives a single message via recvmsg(...) into the given NativeDatagramPacket.
//
// The packet's memoryAddress field is used as the pointer to the iovec array and its count field as
// the number of iovec entries. The call is retried while it is interrupted (EINTR).
//
// Returns the number of received bytes, or the negated errno if recvmsg(...) failed. On success the
// packet is filled via init_packet(...).
static jint netty_epoll_native_recvmsg0(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobject packet) {
    struct msghdr msg = { 0 };
    // Zero the storage so init_packet(...) never inspects indeterminate bytes if no sender address was written.
    struct sockaddr_storage sock_address = { 0 };
    int addrSize = sizeof(sock_address);
    // Enough space for the UDP_GRO control message, whose payload is written by the kernel as an int.
    char control[CMSG_SPACE(sizeof(int))] = { 0 };

    msg.msg_name = &sock_address;
    msg.msg_namelen = (socklen_t) addrSize;
    msg.msg_iov = (struct iovec*) (intptr_t) (*env)->GetLongField(env, packet, packetMemoryAddressFieldId);
    msg.msg_iovlen = (*env)->GetIntField(env, packet, packetCountFieldId);
    msg.msg_control = control;
    msg.msg_controllen = sizeof(control);

    ssize_t res;
    int err = 0;
    do {
        res = recvmsg(fd, &msg, 0);
        // keep on reading if it was interrupted
    } while (res == -1 && ((err = errno) == EINTR));

    if (res < 0) {
        return -err;
    }
    init_packet(env, packet, &msg, res);
    return (jint) res;
}
static jint netty_epoll_native_recvmmsg0(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobjectArray packets, jint offset, jint len) { static jint netty_epoll_native_recvmmsg0(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobjectArray packets, jint offset, jint len) {
struct mmsghdr msg[len]; struct mmsghdr msg[len];
memset(msg, 0, sizeof(msg)); memset(msg, 0, sizeof(msg));
@ -406,36 +479,7 @@ static jint netty_epoll_native_recvmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo
for (i = 0; i < res; i++) { for (i = 0; i < res; i++) {
jobject packet = (*env)->GetObjectArrayElement(env, packets, i + offset); jobject packet = (*env)->GetObjectArrayElement(env, packets, i + offset);
jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetAddrFieldId); init_packet(env, packet, &msg[i].msg_hdr, msg[i].msg_len);
(*env)->SetIntField(env, packet, packetCountFieldId, msg[i].msg_len);
struct sockaddr_storage* addr = (struct sockaddr_storage*) msg[i].msg_hdr.msg_name;
if (addr->ss_family == AF_INET) {
struct sockaddr_in* ipaddr = (struct sockaddr_in*) addr;
(*env)->SetByteArrayRegion(env, address, 0, 4, (jbyte*) &ipaddr->sin_addr.s_addr);
(*env)->SetIntField(env, packet, packetAddrLenFieldId, 4);
(*env)->SetIntField(env, packet, packetScopeIdFieldId, 0);
(*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ipaddr->sin_port));
} else {
int addrLen = netty_unix_socket_ipAddressLength(addr);
struct sockaddr_in6* ip6addr = (struct sockaddr_in6*) addr;
if (addrLen == 4) {
// IPV4 mapped IPV6 address
jbyte* addr = (jbyte*) &ip6addr->sin6_addr.s6_addr;
(*env)->SetByteArrayRegion(env, address, 0, 4, addr + 12);
} else {
(*env)->SetByteArrayRegion(env, address, 0, 16, (jbyte*) &ip6addr->sin6_addr.s6_addr);
}
(*env)->SetIntField(env, packet, packetAddrLenFieldId, addrLen);
(*env)->SetIntField(env, packet, packetScopeIdFieldId, ip6addr->sin6_scope_id);
(*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ip6addr->sin6_port));
}
// TODO: Support this also for recvmmsg
(*env)->SetIntField(env, packet, packetSegmentSizeFieldId, 0);
} }
return (jint) res; return (jint) res;
@ -451,6 +495,7 @@ static jstring netty_epoll_native_kernelVersion(JNIEnv* env, jclass clazz) {
netty_unix_errors_throwRuntimeExceptionErrorNo(env, "uname() failed: ", errno); netty_unix_errors_throwRuntimeExceptionErrorNo(env, "uname() failed: ", errno);
return NULL; return NULL;
} }
static jboolean netty_epoll_native_isSupportingSendmmsg(JNIEnv* env, jclass clazz) { static jboolean netty_epoll_native_isSupportingSendmmsg(JNIEnv* env, jclass clazz) {
if (SYS_sendmmsg == -1) { if (SYS_sendmmsg == -1) {
return JNI_FALSE; return JNI_FALSE;
@ -575,7 +620,7 @@ static const JNINativeMethod fixed_method_table[] = {
static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]); static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]);
static jint dynamicMethodsTableSize() { static jint dynamicMethodsTableSize() {
return fixed_method_table_size + 2; // 2 is for the dynamic method signatures. return fixed_method_table_size + 3; // 3 is for the dynamic method signatures.
} }
static JNINativeMethod* createDynamicMethodsTable(const char* packagePrefix) { static JNINativeMethod* createDynamicMethodsTable(const char* packagePrefix) {
@ -602,6 +647,13 @@ static JNINativeMethod* createDynamicMethodsTable(const char* packagePrefix) {
dynamicMethod->fnPtr = (void *) netty_epoll_native_recvmmsg0; dynamicMethod->fnPtr = (void *) netty_epoll_native_recvmmsg0;
netty_jni_util_free_dynamic_name(&dynamicTypeName); netty_jni_util_free_dynamic_name(&dynamicTypeName);
++dynamicMethod;
NETTY_JNI_UTIL_PREPEND(packagePrefix, "io/netty/channel/epoll/NativeDatagramPacketArray$NativeDatagramPacket;)I", dynamicTypeName, error);
NETTY_JNI_UTIL_PREPEND("(IZL", dynamicTypeName, dynamicMethod->signature, error);
dynamicMethod->name = "recvmsg0";
dynamicMethod->fnPtr = (void *) netty_epoll_native_recvmsg0;
netty_jni_util_free_dynamic_name(&dynamicTypeName);
return dynamicMethods; return dynamicMethods;
error: error:
free(dynamicTypeName); free(dynamicTypeName);

View File

@ -45,6 +45,7 @@ public final class EpollChannelOption<T> extends UnixChannelOption<T> {
public static final ChannelOption<Integer> SO_BUSY_POLL = valueOf(EpollChannelOption.class, "SO_BUSY_POLL"); public static final ChannelOption<Integer> SO_BUSY_POLL = valueOf(EpollChannelOption.class, "SO_BUSY_POLL");
public static final ChannelOption<Map<InetAddress, byte[]>> TCP_MD5SIG = valueOf("TCP_MD5SIG"); public static final ChannelOption<Map<InetAddress, byte[]>> TCP_MD5SIG = valueOf("TCP_MD5SIG");
public static final ChannelOption<Integer> MAX_DATAGRAM_PAYLOAD_SIZE = valueOf("MAX_DATAGRAM_PAYLOAD_SIZE"); public static final ChannelOption<Integer> MAX_DATAGRAM_PAYLOAD_SIZE = valueOf("MAX_DATAGRAM_PAYLOAD_SIZE");
public static final ChannelOption<Boolean> UDP_GRO = valueOf("UDP_GRO");
@SuppressWarnings({ "unused", "deprecation" }) @SuppressWarnings({ "unused", "deprecation" })
private EpollChannelOption() { private EpollChannelOption() {

View File

@ -28,10 +28,8 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultAddressedEnvelope; import io.netty.channel.DefaultAddressedEnvelope;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.InternetProtocolFamily; import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.unix.DatagramSocketAddress;
import io.netty.channel.unix.Errors; import io.netty.channel.unix.Errors;
import io.netty.channel.unix.Errors.NativeIoException; import io.netty.channel.unix.Errors.NativeIoException;
import io.netty.channel.unix.Socket; import io.netty.channel.unix.Socket;
@ -450,7 +448,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
@Override @Override
void epollInReady() { void epollInReady() {
assert eventLoop().inEventLoop(); assert eventLoop().inEventLoop();
DatagramChannelConfig config = config(); EpollDatagramChannelConfig config = config();
if (shouldBreakEpollInReady(config)) { if (shouldBreakEpollInReady(config)) {
clearEpollIn0(); clearEpollIn0();
return; return;
@ -467,25 +465,25 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
try { try {
boolean connected = isConnected(); boolean connected = isConnected();
do { do {
ByteBuf byteBuf = allocHandle.allocate(allocator);
final boolean read; final boolean read;
int datagramSize = config().getMaxDatagramPayloadSize(); int datagramSize = config().getMaxDatagramPayloadSize();
ByteBuf byteBuf = allocHandle.allocate(allocator);
// Only try to use recvmmsg if its really supported by the running system. // Only try to use recvmmsg if its really supported by the running system.
int numDatagram = Native.IS_SUPPORTING_RECVMMSG ? int numDatagram = Native.IS_SUPPORTING_RECVMMSG ?
datagramSize == 0 ? 1 : byteBuf.writableBytes() / datagramSize : datagramSize == 0 ? 1 : byteBuf.writableBytes() / datagramSize :
0; 0;
try { try {
if (numDatagram <= 1) { if (numDatagram <= 1) {
if (connected) { if (!connected || config.isUdpGro()) {
read = connectedRead(allocHandle, byteBuf, datagramSize); read = recvmsg(allocHandle, cleanDatagramPacketArray(), byteBuf);
} else { } else {
read = read(allocHandle, byteBuf, datagramSize); read = connectedRead(allocHandle, byteBuf, datagramSize);
} }
} else { } else {
// Try to use scattering reads via recvmmsg(...) syscall. // Try to use scattering reads via recvmmsg(...) syscall.
read = scatteringRead(allocHandle, byteBuf, datagramSize, numDatagram); read = scatteringRead(allocHandle, cleanDatagramPacketArray(),
byteBuf, datagramSize, numDatagram);
} }
} catch (NativeIoException e) { } catch (NativeIoException e) {
if (connected) { if (connected) {
@ -569,13 +567,104 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
return e; return e;
} }
private boolean scatteringRead(EpollRecvByteAllocatorHandle allocHandle, private static void addDatagramPacketToOut(DatagramPacket packet,
RecyclableArrayList out) {
if (packet instanceof SegmentedDatagramPacket) {
SegmentedDatagramPacket segmentedDatagramPacket = (SegmentedDatagramPacket) packet;
ByteBuf content = segmentedDatagramPacket.content();
InetSocketAddress recipient = segmentedDatagramPacket.recipient();
InetSocketAddress sender = segmentedDatagramPacket.sender();
int segmentSize = segmentedDatagramPacket.segmentSize();
do {
out.add(new DatagramPacket(content.readRetainedSlice(Math.min(content.readableBytes(),
segmentSize)), recipient, sender));
} while (content.isReadable());
segmentedDatagramPacket.release();
} else {
out.add(packet);
}
}
/**
 * Releases the given buffer (if any), then releases every element of the given list (if any)
 * and recycles the list itself.
 *
 * @param byteBuf    the buffer to release, or {@code null} if ownership was already transferred.
 * @param packetList the list whose elements should be released, or {@code null} if there is none.
 */
private static void releaseAndRecycle(ByteBuf byteBuf, RecyclableArrayList packetList) {
    if (byteBuf != null) {
        byteBuf.release();
    }
    if (packetList == null) {
        return;
    }
    int index = 0;
    while (index < packetList.size()) {
        ReferenceCountUtil.release(packetList.get(index));
        index++;
    }
    packetList.recycle();
}
/**
 * Records a single read on the allocator handle and passes the packet to the pipeline.
 *
 * @param pipeline  the pipeline on which {@code fireChannelRead(...)} is called.
 * @param handle    the handle that is told about the bytes read and the one message read.
 * @param bytesRead the number of bytes to report as last read.
 * @param packet    the packet to fire through the pipeline.
 */
private static void processPacket(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
                                  int bytesRead, DatagramPacket packet) {
    // Update the read statistics before the packet is handed to the user.
    handle.lastBytesRead(bytesRead);
    handle.incMessagesRead(1);

    pipeline.fireChannelRead(packet);
}
/**
 * Records a batch read on the allocator handle and fires every element of the list through the pipeline.
 * Each fired element is replaced in the list by {@link Unpooled#EMPTY_BUFFER}.
 *
 * @param pipeline   the pipeline on which {@code fireChannelRead(...)} is called.
 * @param handle     the handle that is told about the bytes read and the number of messages read.
 * @param bytesRead  the number of bytes to report as last read.
 * @param packetList the packets to fire, in list order.
 */
private static void processPacketList(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
                                      int bytesRead, RecyclableArrayList packetList) {
    final int messagesRead = packetList.size();

    handle.lastBytesRead(bytesRead);
    handle.incMessagesRead(messagesRead);

    int index = 0;
    while (index < messagesRead) {
        Object message = packetList.set(index, Unpooled.EMPTY_BUFFER);
        pipeline.fireChannelRead(message);
        index++;
    }
}
/**
 * Reads one message via {@code socket.recvmsg(...)} into {@code byteBuf} and fires the result
 * through the pipeline. If the resulting packet is a {@link SegmentedDatagramPacket} it is first
 * split via {@code addDatagramPacketToOut(...)} and the resulting packets are fired one by one.
 *
 * @param allocHandle the handle that is updated with attempted / read bytes and messages.
 * @param array       the packet array used for the native call; only its first packet is used.
 * @param byteBuf     the buffer to read into. It is released in the {@code finally} block unless it
 *                    was nulled out after being handed over.
 * @return {@code false} if {@code socket.recvmsg(...)} returned {@code 0}, {@code true} otherwise.
 * @throws IOException if the read fails.
 */
private boolean recvmsg(EpollRecvByteAllocatorHandle allocHandle,
        NativeDatagramPacketArray array, ByteBuf byteBuf) throws IOException {
    RecyclableArrayList datagramPackets = null;
    try {
        int writable = byteBuf.writableBytes();

        boolean added = array.addWritable(byteBuf, byteBuf.writerIndex(), writable);
        assert added;

        allocHandle.attemptedBytesRead(writable);

        NativeDatagramPacketArray.NativeDatagramPacket msg = array.packets()[0];

        int bytesReceived = socket.recvmsg(msg);
        if (bytesReceived == 0) {
            // Nothing was read; byteBuf is still owned by this method and is released in finally.
            allocHandle.lastBytesRead(-1);
            return false;
        }
        // NOTE(review): this sets an absolute writer index, which matches the received bytes only if
        // the writer index was 0 before the read - confirm against the caller.
        byteBuf.writerIndex(bytesReceived);
        InetSocketAddress local = localAddress();
        DatagramPacket packet = msg.newDatagramPacket(byteBuf, local);
        if (!(packet instanceof SegmentedDatagramPacket)) {
            processPacket(pipeline(), allocHandle, bytesReceived, packet);
            // The packet (and with it byteBuf) was handed to the pipeline, so it must not be released here.
            byteBuf = null;
        } else {
            // It's important that we process all received data out of the NativeDatagramPacketArray
            // before we call fireChannelRead(...). This is because the user may call flush()
            // in a channelRead(...) method and so may re-use the NativeDatagramPacketArray again.
            datagramPackets = RecyclableArrayList.newInstance();
            addDatagramPacketToOut(packet, datagramPackets);
            // null out byteBuf as addDatagramPacketToOut did take ownership of the ByteBuf / packet and
            // transferred it into the RecyclableArrayList.
            byteBuf = null;

            processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
            datagramPackets.recycle();
            // Already recycled; null it out so the finally block does not touch it again.
            datagramPackets = null;
        }

        return true;
    } finally {
        // Releases whatever is still owned by this method (both arguments may be null).
        releaseAndRecycle(byteBuf, datagramPackets);
    }
}
private boolean scatteringRead(EpollRecvByteAllocatorHandle allocHandle, NativeDatagramPacketArray array,
ByteBuf byteBuf, int datagramSize, int numDatagram) throws IOException { ByteBuf byteBuf, int datagramSize, int numDatagram) throws IOException {
RecyclableArrayList bufferPackets = null; RecyclableArrayList datagramPackets = null;
try { try {
int offset = byteBuf.writerIndex(); int offset = byteBuf.writerIndex();
NativeDatagramPacketArray array = cleanDatagramPacketArray();
for (int i = 0; i < numDatagram; i++, offset += datagramSize) { for (int i = 0; i < numDatagram; i++, offset += datagramSize) {
if (!array.addWritable(byteBuf, offset, datagramSize)) { if (!array.addWritable(byteBuf, offset, datagramSize)) {
break; break;
@ -597,82 +686,30 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
if (received == 1) { if (received == 1) {
// Single packet fast-path // Single packet fast-path
DatagramPacket packet = packets[0].newDatagramPacket(byteBuf, local); DatagramPacket packet = packets[0].newDatagramPacket(byteBuf, local);
allocHandle.lastBytesRead(datagramSize); if (!(packet instanceof SegmentedDatagramPacket)) {
allocHandle.incMessagesRead(1); processPacket(pipeline(), allocHandle, datagramSize, packet);
pipeline().fireChannelRead(packet);
byteBuf = null; byteBuf = null;
return true; return true;
} }
}
// Its important that we process all received data out of the NativeDatagramPacketArray // Its important that we process all received data out of the NativeDatagramPacketArray
// before we call fireChannelRead(...). This is because the user may call flush() // before we call fireChannelRead(...). This is because the user may call flush()
// in a channelRead(...) method and so may re-use the NativeDatagramPacketArray again. // in a channelRead(...) method and so may re-use the NativeDatagramPacketArray again.
bufferPackets = RecyclableArrayList.newInstance(); datagramPackets = RecyclableArrayList.newInstance();
for (int i = 0; i < received; i++) { for (int i = 0; i < received; i++) {
DatagramPacket packet = packets[i].newDatagramPacket(byteBuf.readRetainedSlice(datagramSize), local); DatagramPacket packet = packets[i].newDatagramPacket(byteBuf.readRetainedSlice(datagramSize), local);
bufferPackets.add(packet); addDatagramPacketToOut(packet, datagramPackets);
} }
// Ass we did use readRetainedSlice(...) before we should now release the byteBuf and null it out.
allocHandle.lastBytesRead(bytesReceived);
allocHandle.incMessagesRead(received);
for (int i = 0; i < received; i++) {
pipeline().fireChannelRead(bufferPackets.set(i, Unpooled.EMPTY_BUFFER));
}
bufferPackets.recycle();
bufferPackets = null;
return true;
} finally {
if (byteBuf != null) {
byteBuf.release(); byteBuf.release();
}
if (bufferPackets != null) {
for (int i = 0; i < bufferPackets.size(); i++) {
ReferenceCountUtil.release(bufferPackets.get(i));
}
bufferPackets.recycle();
}
}
}
private boolean read(EpollRecvByteAllocatorHandle allocHandle, ByteBuf byteBuf, int maxDatagramPacketSize)
throws IOException {
try {
int writable = maxDatagramPacketSize != 0 ? Math.min(byteBuf.writableBytes(), maxDatagramPacketSize)
: byteBuf.writableBytes();
allocHandle.attemptedBytesRead(writable);
int writerIndex = byteBuf.writerIndex();
final DatagramSocketAddress remoteAddress;
if (byteBuf.hasMemoryAddress()) {
// has a memory address so use optimized call
remoteAddress = socket.recvFromAddress(
byteBuf.memoryAddress(), writerIndex, writerIndex + writable);
} else {
ByteBuffer nioData = byteBuf.internalNioBuffer(writerIndex, writable);
remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
}
if (remoteAddress == null) {
allocHandle.lastBytesRead(-1);
return false;
}
InetSocketAddress localAddress = remoteAddress.localAddress();
if (localAddress == null) {
localAddress = localAddress();
}
int received = remoteAddress.receivedAmount();
allocHandle.lastBytesRead(maxDatagramPacketSize <= 0 ?
received : writable);
byteBuf.writerIndex(writerIndex + received);
allocHandle.incMessagesRead(1);
pipeline().fireChannelRead(new DatagramPacket(byteBuf, localAddress, remoteAddress));
byteBuf = null; byteBuf = null;
processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
datagramPackets.recycle();
datagramPackets = null;
return true; return true;
} finally { } finally {
if (byteBuf != null) { releaseAndRecycle(byteBuf, datagramPackets);
byteBuf.release();
}
} }
} }

View File

@ -17,7 +17,6 @@ package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.FixedRecvByteBufAllocator;
@ -52,7 +51,8 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
ChannelOption.IP_MULTICAST_ADDR, ChannelOption.IP_MULTICAST_IF, ChannelOption.IP_MULTICAST_TTL, ChannelOption.IP_MULTICAST_ADDR, ChannelOption.IP_MULTICAST_IF, ChannelOption.IP_MULTICAST_TTL,
ChannelOption.IP_TOS, ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, ChannelOption.IP_TOS, ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION,
EpollChannelOption.SO_REUSEPORT, EpollChannelOption.IP_FREEBIND, EpollChannelOption.IP_TRANSPARENT, EpollChannelOption.SO_REUSEPORT, EpollChannelOption.IP_FREEBIND, EpollChannelOption.IP_TRANSPARENT,
EpollChannelOption.IP_RECVORIGDSTADDR, EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE); EpollChannelOption.IP_RECVORIGDSTADDR, EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE,
EpollChannelOption.UDP_GRO);
} }
@SuppressWarnings({ "unchecked", "deprecation" }) @SuppressWarnings({ "unchecked", "deprecation" })
@ -103,6 +103,9 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
if (option == EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE) { if (option == EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE) {
return (T) Integer.valueOf(getMaxDatagramPayloadSize()); return (T) Integer.valueOf(getMaxDatagramPayloadSize());
} }
if (option == EpollChannelOption.UDP_GRO) {
return (T) Boolean.valueOf(isUdpGro());
}
return super.getOption(option); return super.getOption(option);
} }
@ -141,6 +144,8 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
setIpRecvOrigDestAddr((Boolean) value); setIpRecvOrigDestAddr((Boolean) value);
} else if (option == EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE) { } else if (option == EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE) {
setMaxDatagramPayloadSize((Integer) value); setMaxDatagramPayloadSize((Integer) value);
} else if (option == EpollChannelOption.UDP_GRO) {
setUdpGro((Boolean) value);
} else { } else {
return super.setOption(option, value); return super.setOption(option, value);
} }
@ -522,6 +527,33 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
return maxDatagramSize; return maxDatagramSize;
} }
private volatile boolean gro;
/**
 * Enable / disable <a href="https://lwn.net/Articles/768995/">UDP_GRO</a> on the underlying socket.
 * The value is also cached so that {@link #isUdpGro()} can return it.
 *
 * @param gro {@code true} if {@code UDP_GRO} should be enabled, {@code false} otherwise.
 * @return this.
 * @throws ChannelException if setting the option on the socket fails with an {@link IOException}.
 */
public EpollDatagramChannelConfig setUdpGro(boolean gro) {
    final EpollDatagramChannel datagramChannel = (EpollDatagramChannel) channel;
    try {
        datagramChannel.socket.setUdpGro(gro);
    } catch (IOException cause) {
        throw new ChannelException(cause);
    }
    // Only remember the new value once the socket accepted it.
    this.gro = gro;
    return this;
}
/**
 * Returns if {@code UDP_GRO} is enabled, based on the value last stored by {@code setUdpGro(boolean)}.
 *
 * @return {@code true} if enabled, {@code false} otherwise.
 */
public boolean isUdpGro() {
    // We don't do a syscall here but just return the cached value due to a kernel bug:
    // https://lore.kernel.org/netdev/20210325195614.800687-1-norman_maurer@apple.com/T/#u
    return gro;
}
@Override @Override
public EpollDatagramChannelConfig setMaxMessagesPerWrite(int maxMessagesPerWrite) { public EpollDatagramChannelConfig setMaxMessagesPerWrite(int maxMessagesPerWrite) {
super.setMaxMessagesPerWrite(maxMessagesPerWrite); super.setMaxMessagesPerWrite(maxMessagesPerWrite);

View File

@ -59,6 +59,10 @@ final class LinuxSocket extends Socket {
return Native.recvmmsg(intValue(), ipv6, msgs, offset, len); return Native.recvmmsg(intValue(), ipv6, msgs, offset, len);
} }
/**
 * Receives a single message on this socket into the given packet by delegating to
 * {@code Native.recvmsg(...)}.
 *
 * @param msg the packet to receive into.
 * @return the value returned by {@code Native.recvmsg(...)}.
 * @throws IOException if the native call reports a failure.
 */
int recvmsg(NativeDatagramPacketArray.NativeDatagramPacket msg) throws IOException {
    final int fd = intValue();
    return Native.recvmsg(fd, ipv6, msg);
}
void setTimeToLive(int ttl) throws IOException { void setTimeToLive(int ttl) throws IOException {
setTimeToLive(intValue(), ttl); setTimeToLive(intValue(), ttl);
} }
@ -286,6 +290,14 @@ final class LinuxSocket extends Socket {
setIpMulticastLoop(intValue(), ipv6, loopbackModeDisabled ? 0 : 1); setIpMulticastLoop(intValue(), ipv6, loopbackModeDisabled ? 0 : 1);
} }
/**
 * Queries the {@code UDP_GRO} option of this socket.
 *
 * @return {@code true} if the native call returned a non-zero value, {@code false} otherwise.
 * @throws IOException if the native call reports a failure.
 */
boolean isUdpGro() throws IOException {
    final int value = isUdpGro(intValue());
    return value != 0;
}
/**
 * Sets the {@code UDP_GRO} option of this socket.
 *
 * @param gro {@code true} passes {@code 1} to the native call, {@code false} passes {@code 0}.
 * @throws IOException if the native call reports a failure.
 */
void setUdpGro(boolean gro) throws IOException {
    if (gro) {
        setUdpGro(intValue(), 1);
    } else {
        setUdpGro(intValue(), 0);
    }
}
long sendFile(DefaultFileRegion src, long baseOffset, long offset, long length) throws IOException { long sendFile(DefaultFileRegion src, long baseOffset, long offset, long length) throws IOException {
// Open the file-region as it may be created via the lazy constructor. This is needed as we directly access // Open the file-region as it may be created via the lazy constructor. This is needed as we directly access
// the FileChannel field via JNI. // the FileChannel field via JNI.
@ -389,4 +401,6 @@ final class LinuxSocket extends Socket {
private static native int getIpMulticastLoop(int fd, boolean ipv6) throws IOException; private static native int getIpMulticastLoop(int fd, boolean ipv6) throws IOException;
private static native void setIpMulticastLoop(int fd, boolean ipv6, int enabled) throws IOException; private static native void setIpMulticastLoop(int fd, boolean ipv6, int enabled) throws IOException;
private static native void setTimeToLive(int fd, int ttl) throws IOException; private static native void setTimeToLive(int fd, int ttl) throws IOException;
private static native int isUdpGro(int fd) throws IOException;
private static native void setUdpGro(int fd, int gro) throws IOException;
} }

View File

@ -242,6 +242,17 @@ public final class Native {
private static native int recvmmsg0( private static native int recvmmsg0(
int fd, boolean ipv6, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, int offset, int len); int fd, boolean ipv6, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, int offset, int len);
/**
 * Calls the native {@code recvmsg0(...)} and translates a negative result via
 * {@code ioResult("recvmsg", ...)}.
 *
 * @param fd     the file descriptor to read from.
 * @param ipv6   passed through to the native call unchanged.
 * @param packet the packet to receive into.
 * @return the non-negative native result, or whatever {@code ioResult(...)} returns for a negative one.
 * @throws IOException declared for failures surfaced while translating a negative result.
 */
static int recvmsg(int fd, boolean ipv6, NativeDatagramPacketArray.NativeDatagramPacket packet) throws IOException {
    final int result = recvmsg0(fd, ipv6, packet);
    return result < 0 ? ioResult("recvmsg", result) : result;
}
private static native int recvmsg0(
int fd, boolean ipv6, NativeDatagramPacketArray.NativeDatagramPacket msg);
// epoll_event related // epoll_event related
public static native int sizeofEpollEvent(); public static native int sizeofEpollEvent();
public static native int offsetofEpollData(); public static native int offsetofEpollData();

View File

@ -183,7 +183,7 @@ final class NativeDatagramPacketArray {
} }
} }
DatagramPacket newDatagramPacket(ByteBuf buffer, InetSocketAddress localAddress) throws UnknownHostException { DatagramPacket newDatagramPacket(ByteBuf buffer, InetSocketAddress recipient) throws UnknownHostException {
final InetAddress address; final InetAddress address;
if (addrLen == ipv4Bytes.length) { if (addrLen == ipv4Bytes.length) {
System.arraycopy(addr, 0, ipv4Bytes, 0, addrLen); System.arraycopy(addr, 0, ipv4Bytes, 0, addrLen);
@ -191,8 +191,14 @@ final class NativeDatagramPacketArray {
} else { } else {
address = Inet6Address.getByAddress(null, addr, scopeId); address = Inet6Address.getByAddress(null, addr, scopeId);
} }
return new DatagramPacket(buffer.writerIndex(count), InetSocketAddress sender = new InetSocketAddress(address, port);
localAddress, new InetSocketAddress(address, port)); buffer.writerIndex(count);
// UDP_GRO
if (segmentSize > 0) {
return new SegmentedDatagramPacket(buffer, segmentSize, recipient, sender);
}
return new DatagramPacket(buffer, recipient, sender);
} }
} }
} }

View File

@ -21,6 +21,8 @@ import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.InternetProtocolFamily; import io.netty.channel.socket.InternetProtocolFamily;
@ -30,12 +32,12 @@ import org.junit.Assume;
import org.junit.Test; import org.junit.Test;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
public class EpollDatagramUnicastTest extends DatagramUnicastTest { public class EpollDatagramUnicastTest extends DatagramUnicastTest {
@ -57,7 +59,7 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest {
} }
public void testSendSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb) throws Throwable { public void testSendSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb) throws Throwable {
testSendSegmentedDatagramPacket(sb, cb, false); testSegmentedDatagramPacket(sb, cb, false, false);
} }
@Test @Test
@ -66,15 +68,37 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest {
} }
public void testSendSegmentedDatagramPacketComposite(Bootstrap sb, Bootstrap cb) throws Throwable { public void testSendSegmentedDatagramPacketComposite(Bootstrap sb, Bootstrap cb) throws Throwable {
testSendSegmentedDatagramPacket(sb, cb, true); testSegmentedDatagramPacket(sb, cb, true, false);
} }
private void testSendSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb, boolean composite) @Test
// Test entry point; delegates to run(), which is defined outside this view.
// NOTE(review): presumably run() dispatches to the (Bootstrap, Bootstrap) overload of the same name - confirm.
public void testSendAndReceiveSegmentedDatagramPacket() throws Throwable {
    run();
}
// Runs the segmented datagram test with composite = false and gro = true.
public void testSendAndReceiveSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb) throws Throwable {
    testSegmentedDatagramPacket(sb, cb, false, true);
}
// Test entry point; delegates to run(), which is defined outside this view.
// NOTE(review): presumably run() dispatches to the (Bootstrap, Bootstrap) overload of the same name - confirm.
@Test
public void testSendAndReceiveSegmentedDatagramPacketComposite() throws Throwable {
    run();
}
// Runs the segmented datagram test with composite = true and gro = true.
public void testSendAndReceiveSegmentedDatagramPacketComposite(Bootstrap sb, Bootstrap cb) throws Throwable {
    testSegmentedDatagramPacket(sb, cb, true, true);
}
private void testSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb, boolean composite, boolean gro)
throws Throwable { throws Throwable {
if (!(cb.group() instanceof EpollEventLoopGroup)) { if (!(cb.group() instanceof EpollEventLoopGroup)) {
// Only supported for the native epoll transport. // Only supported for the native epoll transport.
return; return;
} }
if (gro && !(sb.group() instanceof EpollEventLoopGroup)) {
// Only supported for the native epoll transport.
return;
}
Assume.assumeTrue(SegmentedDatagramPacket.isSupported()); Assume.assumeTrue(SegmentedDatagramPacket.isSupported());
Channel sc = null; Channel sc = null;
Channel cc = null; Channel cc = null;
@ -94,6 +118,12 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest {
int bufferCapacity = numBuffers * segmentSize; int bufferCapacity = numBuffers * segmentSize;
final CountDownLatch latch = new CountDownLatch(numBuffers); final CountDownLatch latch = new CountDownLatch(numBuffers);
AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>(); AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
if (gro) {
// Enable GRO and also ensure we can read everything with one read as otherwise
// we will drop things on the floor.
sb.option(EpollChannelOption.UDP_GRO, true);
sb.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(bufferCapacity));
}
sc = sb.handler(new SimpleChannelInboundHandler<DatagramPacket>() { sc = sb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) { public void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) {
@ -103,6 +133,9 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest {
} }
}).bind(newSocketAddress()).sync().channel(); }).bind(newSocketAddress()).sync().channel();
if (sc instanceof EpollDatagramChannel) {
assertEquals(gro, sc.config().getOption(EpollChannelOption.UDP_GRO));
}
InetSocketAddress addr = sendToAddress((InetSocketAddress) sc.localAddress()); InetSocketAddress addr = sendToAddress((InetSocketAddress) sc.localAddress());
final ByteBuf buffer; final ByteBuf buffer;
if (composite) { if (composite) {