Add support for UDP_GRO (#11120)
Motivation:

UDP_GRO can improve performance when reading UDP datagrams. This patch adds support for it.
See https://lwn.net/Articles/768995/

Modifications:

- Add recvmsg(...)
- Add support for UDP_GRO in recvmsg(...) and recvmmsg(...)
- Remove usage of recvfrom(...) and always use recvmsg(...) or recvmmsg(...) to simplify things
- Refactor some code for sharing
- Add EpollChannelOption.UDP_GRO and the getter / setter in EpollDatagramChannelConfig

Result:

UDP_GRO is supported when the underlying system supports it.
parent cc2b443150
commit b05fdf3ff8
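Before the hunks themselves, here is roughly how the new option is used from application code once this change is in place; it mirrors what the updated EpollDatagramUnicastTest below does. This is only an illustrative sketch: the port, the receive-buffer size and the handler are made-up placeholders, and UDP_GRO only has an effect when the running kernel actually supports the socket option.

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;

public final class UdpGroReceiverExample {
    public static void main(String[] args) throws Exception {
        EpollEventLoopGroup group = new EpollEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(EpollDatagramChannel.class)
                    // Ask the kernel to coalesce incoming UDP segments (requires UDP_GRO support).
                    .option(EpollChannelOption.UDP_GRO, true)
                    // Make the receive buffer large enough to hold a whole GRO batch in one read.
                    .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(64 * 1024))
                    .handler(new SimpleChannelInboundHandler<DatagramPacket>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) {
                            // One DatagramPacket per original datagram, even when GRO batched them.
                            System.out.println("received " + packet.content().readableBytes() + " bytes");
                        }
                    });
            // 9999 is an arbitrary placeholder port.
            bootstrap.bind(9999).sync().channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

When GRO kicks in, the kernel delivers one large buffer together with the segment size in a UDP_GRO cmsg; the native recvmsg path below reads that value, and EpollDatagramChannel slices the buffer so the handler still sees one DatagramPacket per original datagram.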
@@ -25,9 +25,9 @@
#include <string.h>
#include <errno.h>
#include <netinet/in.h>
#include <netinet/udp.h> // SOL_UDP
#include <sys/sendfile.h>
#include <linux/tcp.h> // TCP_NOTSENT_LOWAT is a linux specific define

#include "netty_epoll_linuxsocket.h"
#include "netty_unix_errors.h"
#include "netty_unix_filedescriptor.h"
@@ -57,6 +57,11 @@
#define SO_BUSY_POLL 46
#endif

// UDP_GRO is defined in linux 5. We define this here so older kernels can compile.
#ifndef UDP_GRO
#define UDP_GRO 104
#endif

static jclass peerCredentialsClass = NULL;
static jmethodID peerCredentialsMethodId = NULL;

@@ -610,6 +615,19 @@ static jobject netty_epoll_linuxsocket_getPeerCredentials(JNIEnv *env, jclass cl
return (*env)->NewObject(env, peerCredentialsClass, peerCredentialsMethodId, credentials.pid, credentials.uid, gids);
}

static jint netty_epoll_linuxsocket_isUdpGro(JNIEnv* env, jclass clazz, jint fd) {
int optval;
if (netty_unix_socket_getOption(env, fd, SOL_UDP, UDP_GRO, &optval, sizeof(optval)) == -1) {
return -1;
}
return optval;
}

static void netty_epoll_linuxsocket_setUdpGro(JNIEnv* env, jclass clazz, jint fd, jint optval) {
netty_unix_socket_setOption(env, fd, SOL_UDP, UDP_GRO, &optval, sizeof(optval));
}


static jlong netty_epoll_linuxsocket_sendFile(JNIEnv* env, jclass clazz, jint fd, jobject fileRegion, jlong base_off, jlong off, jlong len) {
jobject fileChannel = (*env)->GetObjectField(env, fileRegion, fileChannelFieldId);
if (fileChannel == NULL) {
@@ -642,6 +660,7 @@ static jlong netty_epoll_linuxsocket_sendFile(JNIEnv* env, jclass clazz, jint fd

return res;
}

// JNI Registered Methods End

// JNI Method Registration Table Begin
@@ -682,7 +701,10 @@ static const JNINativeMethod fixed_method_table[] = {
{ "joinGroup", "(IZ[B[BII)V", (void *) netty_epoll_linuxsocket_joinGroup },
{ "joinSsmGroup", "(IZ[B[BII[B)V", (void *) netty_epoll_linuxsocket_joinSsmGroup },
{ "leaveGroup", "(IZ[B[BII)V", (void *) netty_epoll_linuxsocket_leaveGroup },
{ "leaveSsmGroup", "(IZ[B[BII[B)V", (void *) netty_epoll_linuxsocket_leaveSsmGroup }
{ "leaveSsmGroup", "(IZ[B[BII[B)V", (void *) netty_epoll_linuxsocket_leaveSsmGroup },
{ "isUdpGro", "(I)I", (void *) netty_epoll_linuxsocket_isUdpGro },
{ "setUdpGro", "(II)V", (void *) netty_epoll_linuxsocket_setUdpGro }

// "sendFile" has a dynamic signature
};

@@ -71,6 +71,12 @@
#define UDP_SEGMENT 103
#endif

// UDP_GRO is defined in linux 5. We define this here so older kernels can compile.
#ifndef UDP_GRO
#define UDP_GRO 104
#endif


// optional
extern int epoll_create1(int flags) __attribute__((weak));

@@ -379,6 +385,73 @@ static jint netty_epoll_native_sendmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo
return (jint) res;
}

static void init_packet(JNIEnv* env, jobject packet, struct msghdr* msg, int len) {
jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetAddrFieldId);

(*env)->SetIntField(env, packet, packetCountFieldId, len);

struct sockaddr_storage* addr = (struct sockaddr_storage*) msg->msg_name;

if (addr->ss_family == AF_INET) {
struct sockaddr_in* ipaddr = (struct sockaddr_in*) addr;

(*env)->SetByteArrayRegion(env, address, 0, 4, (jbyte*) &ipaddr->sin_addr.s_addr);
(*env)->SetIntField(env, packet, packetAddrLenFieldId, 4);
(*env)->SetIntField(env, packet, packetScopeIdFieldId, 0);
(*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ipaddr->sin_port));
} else {
int addrLen = netty_unix_socket_ipAddressLength(addr);
struct sockaddr_in6* ip6addr = (struct sockaddr_in6*) addr;

if (addrLen == 4) {
// IPV4 mapped IPV6 address
jbyte* addr = (jbyte*) &ip6addr->sin6_addr.s6_addr;
(*env)->SetByteArrayRegion(env, address, 0, 4, addr + 12);
} else {
(*env)->SetByteArrayRegion(env, address, 0, 16, (jbyte*) &ip6addr->sin6_addr.s6_addr);
}
(*env)->SetIntField(env, packet, packetAddrLenFieldId, addrLen);
(*env)->SetIntField(env, packet, packetScopeIdFieldId, ip6addr->sin6_scope_id);
(*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ip6addr->sin6_port));
}
struct cmsghdr *cmsg = NULL;
uint16_t gso_size = 0;
uint16_t *gsosizeptr = NULL;
for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
if (cmsg->cmsg_level == SOL_UDP && cmsg->cmsg_type == UDP_GRO) {
gsosizeptr = (uint16_t *) CMSG_DATA(cmsg);
gso_size = *gsosizeptr;
break;
}
}
(*env)->SetIntField(env, packet, packetSegmentSizeFieldId, gso_size);
}

static jint netty_epoll_native_recvmsg0(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobject packet) {
struct msghdr msg = { 0 };
struct sockaddr_storage sock_address;
int addrSize = sizeof(sock_address);
// Enough space for GRO
char control[CMSG_SPACE(sizeof(uint16_t))] = { 0 };
msg.msg_name = &sock_address;
msg.msg_namelen = (socklen_t) addrSize;
msg.msg_iov = (struct iovec*) (intptr_t) (*env)->GetLongField(env, packet, packetMemoryAddressFieldId);
msg.msg_iovlen = (*env)->GetIntField(env, packet, packetCountFieldId);
msg.msg_control = control;
msg.msg_controllen = sizeof(control);
ssize_t res;
int err;
do {
res = recvmsg(fd, &msg, 0);
// keep on reading if it was interrupted
} while (res == -1 && ((err = errno) == EINTR));
if (res < 0) {
return -err;
}
init_packet(env, packet, &msg, res);
return (jint) res;
}

static jint netty_epoll_native_recvmmsg0(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobjectArray packets, jint offset, jint len) {
struct mmsghdr msg[len];
memset(msg, 0, sizeof(msg));
@@ -412,36 +485,7 @@ static jint netty_epoll_native_recvmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo

for (i = 0; i < res; i++) {
jobject packet = (*env)->GetObjectArrayElement(env, packets, i + offset);
jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetAddrFieldId);

(*env)->SetIntField(env, packet, packetCountFieldId, msg[i].msg_len);

struct sockaddr_storage* addr = (struct sockaddr_storage*) msg[i].msg_hdr.msg_name;

if (addr->ss_family == AF_INET) {
struct sockaddr_in* ipaddr = (struct sockaddr_in*) addr;

(*env)->SetByteArrayRegion(env, address, 0, 4, (jbyte*) &ipaddr->sin_addr.s_addr);
(*env)->SetIntField(env, packet, packetAddrLenFieldId, 4);
(*env)->SetIntField(env, packet, packetScopeIdFieldId, 0);
(*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ipaddr->sin_port));
} else {
int addrLen = netty_unix_socket_ipAddressLength(addr);
struct sockaddr_in6* ip6addr = (struct sockaddr_in6*) addr;

if (addrLen == 4) {
// IPV4 mapped IPV6 address
jbyte* addr = (jbyte*) &ip6addr->sin6_addr.s6_addr;
(*env)->SetByteArrayRegion(env, address, 0, 4, addr + 12);
} else {
(*env)->SetByteArrayRegion(env, address, 0, 16, (jbyte*) &ip6addr->sin6_addr.s6_addr);
}
(*env)->SetIntField(env, packet, packetAddrLenFieldId, addrLen);
(*env)->SetIntField(env, packet, packetScopeIdFieldId, ip6addr->sin6_scope_id);
(*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ip6addr->sin6_port));
}
// TODO: Support this also for recvmmsg
(*env)->SetIntField(env, packet, packetSegmentSizeFieldId, 0);
init_packet(env, packet, &msg[i].msg_hdr, msg[i].msg_len);
}

return (jint) res;
@@ -457,6 +501,7 @@ static jstring netty_epoll_native_kernelVersion(JNIEnv* env, jclass clazz) {
netty_unix_errors_throwRuntimeExceptionErrorNo(env, "uname() failed: ", errno);
return NULL;
}

static jboolean netty_epoll_native_isSupportingSendmmsg(JNIEnv* env, jclass clazz) {
if (SYS_sendmmsg == -1) {
return JNI_FALSE;
@@ -603,7 +648,7 @@ static const JNINativeMethod fixed_method_table[] = {
static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]);

static jint dynamicMethodsTableSize() {
return fixed_method_table_size + 2; // 2 is for the dynamic method signatures.
return fixed_method_table_size + 3; // 3 is for the dynamic method signatures.
}

static JNINativeMethod* createDynamicMethodsTable(const char* packagePrefix) {
@@ -630,6 +675,13 @@ static JNINativeMethod* createDynamicMethodsTable(const char* packagePrefix) {
dynamicMethod->fnPtr = (void *) netty_epoll_native_recvmmsg0;
netty_jni_util_free_dynamic_name(&dynamicTypeName);

++dynamicMethod;
NETTY_JNI_UTIL_PREPEND(packagePrefix, "io/netty/channel/epoll/NativeDatagramPacketArray$NativeDatagramPacket;)I", dynamicTypeName, error);
NETTY_JNI_UTIL_PREPEND("(IZL", dynamicTypeName, dynamicMethod->signature, error);
dynamicMethod->name = "recvmsg0";
dynamicMethod->fnPtr = (void *) netty_epoll_native_recvmsg0;
netty_jni_util_free_dynamic_name(&dynamicTypeName);

return dynamicMethods;
error:
free(dynamicTypeName);

@@ -50,6 +50,7 @@ public final class EpollChannelOption<T> extends UnixChannelOption<T> {
public static final ChannelOption<Map<InetAddress, byte[]>> TCP_MD5SIG = valueOf("TCP_MD5SIG");

public static final ChannelOption<Integer> MAX_DATAGRAM_PAYLOAD_SIZE = valueOf("MAX_DATAGRAM_PAYLOAD_SIZE");
public static final ChannelOption<Boolean> UDP_GRO = valueOf("UDP_GRO");

@SuppressWarnings({ "unused", "deprecation" })
private EpollChannelOption() {

@@ -26,10 +26,8 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultAddressedEnvelope;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.unix.DatagramSocketAddress;
import io.netty.channel.unix.Errors;
import io.netty.channel.unix.Errors.NativeIoException;
import io.netty.channel.unix.Socket;
@@ -449,7 +447,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
@Override
void epollInReady() {
assert eventLoop().inEventLoop();
DatagramChannelConfig config = config();
EpollDatagramChannelConfig config = config();
if (shouldBreakEpollInReady(config)) {
clearEpollIn0();
return;
@@ -467,25 +465,25 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
try {
boolean connected = isConnected();
do {
ByteBuf byteBuf = allocHandle.allocate(allocator);
final boolean read;
int datagramSize = config().getMaxDatagramPayloadSize();

ByteBuf byteBuf = allocHandle.allocate(allocator);
// Only try to use recvmmsg if it's really supported by the running system.
int numDatagram = Native.IS_SUPPORTING_RECVMMSG ?
datagramSize == 0 ? 1 : byteBuf.writableBytes() / datagramSize :
0;

try {
if (numDatagram <= 1) {
if (connected) {
read = connectedRead(allocHandle, byteBuf, datagramSize);
if (!connected || config.isUdpGro()) {
read = recvmsg(allocHandle, cleanDatagramPacketArray(), byteBuf);
} else {
read = read(allocHandle, byteBuf, datagramSize);
read = connectedRead(allocHandle, byteBuf, datagramSize);
}
} else {
// Try to use scattering reads via recvmmsg(...) syscall.
read = scatteringRead(allocHandle, byteBuf, datagramSize, numDatagram);
read = scatteringRead(allocHandle, cleanDatagramPacketArray(),
byteBuf, datagramSize, numDatagram);
}
} catch (NativeIoException e) {
if (connected) {
@@ -568,13 +566,104 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
return e;
}

private boolean scatteringRead(EpollRecvByteAllocatorHandle allocHandle,
private static void addDatagramPacketToOut(DatagramPacket packet,
RecyclableArrayList out) {
if (packet instanceof SegmentedDatagramPacket) {
SegmentedDatagramPacket segmentedDatagramPacket = (SegmentedDatagramPacket) packet;
ByteBuf content = segmentedDatagramPacket.content();
InetSocketAddress recipient = segmentedDatagramPacket.recipient();
InetSocketAddress sender = segmentedDatagramPacket.sender();
int segmentSize = segmentedDatagramPacket.segmentSize();
do {
out.add(new DatagramPacket(content.readRetainedSlice(Math.min(content.readableBytes(),
segmentSize)), recipient, sender));
} while (content.isReadable());

segmentedDatagramPacket.release();
} else {
out.add(packet);
}
}

private static void releaseAndRecycle(ByteBuf byteBuf, RecyclableArrayList packetList) {
if (byteBuf != null) {
byteBuf.release();
}
if (packetList != null) {
for (int i = 0; i < packetList.size(); i++) {
ReferenceCountUtil.release(packetList.get(i));
}
packetList.recycle();
}
}

private static void processPacket(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
int bytesRead, DatagramPacket packet) {
handle.lastBytesRead(bytesRead);
handle.incMessagesRead(1);
pipeline.fireChannelRead(packet);
}

private static void processPacketList(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
int bytesRead, RecyclableArrayList packetList) {
int messagesRead = packetList.size();
handle.lastBytesRead(bytesRead);
handle.incMessagesRead(messagesRead);
for (int i = 0; i < messagesRead; i++) {
pipeline.fireChannelRead(packetList.set(i, Unpooled.EMPTY_BUFFER));
}
}

private boolean recvmsg(EpollRecvByteAllocatorHandle allocHandle,
NativeDatagramPacketArray array, ByteBuf byteBuf) throws IOException {
RecyclableArrayList datagramPackets = null;
try {
int writable = byteBuf.writableBytes();

boolean added = array.addWritable(byteBuf, byteBuf.writerIndex(), writable);
assert added;

allocHandle.attemptedBytesRead(writable);

NativeDatagramPacketArray.NativeDatagramPacket msg = array.packets()[0];

int bytesReceived = socket.recvmsg(msg);
if (bytesReceived == 0) {
allocHandle.lastBytesRead(-1);
return false;
}
byteBuf.writerIndex(bytesReceived);
InetSocketAddress local = localAddress();
DatagramPacket packet = msg.newDatagramPacket(byteBuf, local);
if (!(packet instanceof SegmentedDatagramPacket)) {
processPacket(pipeline(), allocHandle, bytesReceived, packet);
byteBuf = null;
} else {
// It's important that we process all received data out of the NativeDatagramPacketArray
// before we call fireChannelRead(...). This is because the user may call flush()
// in a channelRead(...) method and so may re-use the NativeDatagramPacketArray again.
datagramPackets = RecyclableArrayList.newInstance();
addDatagramPacketToOut(packet, datagramPackets);
// null out byteBuf as addDatagramPacketToOut did take ownership of the ByteBuf / packet and transferred
// it into the RecyclableArrayList.
byteBuf = null;

processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
datagramPackets.recycle();
datagramPackets = null;
}

return true;
} finally {
releaseAndRecycle(byteBuf, datagramPackets);
}
}

private boolean scatteringRead(EpollRecvByteAllocatorHandle allocHandle, NativeDatagramPacketArray array,
ByteBuf byteBuf, int datagramSize, int numDatagram) throws IOException {
RecyclableArrayList bufferPackets = null;
RecyclableArrayList datagramPackets = null;
try {
int offset = byteBuf.writerIndex();
NativeDatagramPacketArray array = cleanDatagramPacketArray();

for (int i = 0; i < numDatagram; i++, offset += datagramSize) {
if (!array.addWritable(byteBuf, offset, datagramSize)) {
break;
@@ -596,82 +685,30 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
if (received == 1) {
// Single packet fast-path
DatagramPacket packet = packets[0].newDatagramPacket(byteBuf, local);
allocHandle.lastBytesRead(datagramSize);
allocHandle.incMessagesRead(1);
pipeline().fireChannelRead(packet);
byteBuf = null;
return true;
if (!(packet instanceof SegmentedDatagramPacket)) {
processPacket(pipeline(), allocHandle, datagramSize, packet);
byteBuf = null;
return true;
}
}

// It's important that we process all received data out of the NativeDatagramPacketArray
// before we call fireChannelRead(...). This is because the user may call flush()
// in a channelRead(...) method and so may re-use the NativeDatagramPacketArray again.
bufferPackets = RecyclableArrayList.newInstance();
datagramPackets = RecyclableArrayList.newInstance();
for (int i = 0; i < received; i++) {
DatagramPacket packet = packets[i].newDatagramPacket(byteBuf.readRetainedSlice(datagramSize), local);
bufferPackets.add(packet);
addDatagramPacketToOut(packet, datagramPackets);
}

allocHandle.lastBytesRead(bytesReceived);
allocHandle.incMessagesRead(received);

for (int i = 0; i < received; i++) {
pipeline().fireChannelRead(bufferPackets.set(i, Unpooled.EMPTY_BUFFER));
}
bufferPackets.recycle();
bufferPackets = null;
return true;
} finally {
if (byteBuf != null) {
byteBuf.release();
}
if (bufferPackets != null) {
for (int i = 0; i < bufferPackets.size(); i++) {
ReferenceCountUtil.release(bufferPackets.get(i));
}
bufferPackets.recycle();
}
}
}

private boolean read(EpollRecvByteAllocatorHandle allocHandle, ByteBuf byteBuf, int maxDatagramPacketSize)
throws IOException {
try {
int writable = maxDatagramPacketSize != 0 ? Math.min(byteBuf.writableBytes(), maxDatagramPacketSize)
: byteBuf.writableBytes();
allocHandle.attemptedBytesRead(writable);
int writerIndex = byteBuf.writerIndex();
final DatagramSocketAddress remoteAddress;
if (byteBuf.hasMemoryAddress()) {
// has a memory address so use optimized call
remoteAddress = socket.recvFromAddress(
byteBuf.memoryAddress(), writerIndex, writerIndex + writable);
} else {
ByteBuffer nioData = byteBuf.internalNioBuffer(writerIndex, writable);
remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
}

if (remoteAddress == null) {
allocHandle.lastBytesRead(-1);
return false;
}
InetSocketAddress localAddress = remoteAddress.localAddress();
if (localAddress == null) {
localAddress = localAddress();
}
int received = remoteAddress.receivedAmount();
allocHandle.lastBytesRead(maxDatagramPacketSize <= 0 ?
received : writable);
byteBuf.writerIndex(writerIndex + received);
allocHandle.incMessagesRead(1);

pipeline().fireChannelRead(new DatagramPacket(byteBuf, localAddress, remoteAddress));
// As we did use readRetainedSlice(...) before we should now release the byteBuf and null it out.
byteBuf.release();
byteBuf = null;

processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
datagramPackets.recycle();
datagramPackets = null;
return true;
} finally {
if (byteBuf != null) {
byteBuf.release();
}
releaseAndRecycle(byteBuf, datagramPackets);
}
}

@@ -17,7 +17,6 @@ package io.netty.channel.epoll;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator;
@@ -52,7 +51,8 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
ChannelOption.IP_MULTICAST_ADDR, ChannelOption.IP_MULTICAST_IF, ChannelOption.IP_MULTICAST_TTL,
ChannelOption.IP_TOS, ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION,
EpollChannelOption.SO_REUSEPORT, EpollChannelOption.IP_FREEBIND, EpollChannelOption.IP_TRANSPARENT,
EpollChannelOption.IP_RECVORIGDSTADDR, EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE);
EpollChannelOption.IP_RECVORIGDSTADDR, EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE,
EpollChannelOption.UDP_GRO);
}

@SuppressWarnings({ "unchecked", "deprecation" })
@@ -103,6 +103,9 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
if (option == EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE) {
return (T) Integer.valueOf(getMaxDatagramPayloadSize());
}
if (option == EpollChannelOption.UDP_GRO) {
return (T) Boolean.valueOf(isUdpGro());
}
return super.getOption(option);
}

@@ -141,6 +144,8 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
setIpRecvOrigDestAddr((Boolean) value);
} else if (option == EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE) {
setMaxDatagramPayloadSize((Integer) value);
} else if (option == EpollChannelOption.UDP_GRO) {
setUdpGro((Boolean) value);
} else {
return super.setOption(option, value);
}
@@ -528,6 +533,33 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
return maxDatagramSize;
}

private volatile boolean gro;

/**
* Enable / disable <a href="https://lwn.net/Articles/768995/">UDP_GRO</a>.
* @param gro {@code true} if {@code UDP_GRO} should be enabled, {@code false} otherwise.
* @return this.
*/
public EpollDatagramChannelConfig setUdpGro(boolean gro) {
try {
((EpollDatagramChannel) channel).socket.setUdpGro(gro);
} catch (IOException e) {
throw new ChannelException(e);
}
this.gro = gro;
return this;
}

/**
* Returns if {@code UDP_GRO} is enabled.
* @return {@code true} if enabled, {@code false} otherwise.
*/
public boolean isUdpGro() {
// We don't do a syscall here but just return the cached value due to a kernel bug:
// https://lore.kernel.org/netdev/20210325195614.800687-1-norman_maurer@apple.com/T/#u
return gro;
}

@Override
public EpollDatagramChannelConfig setMaxMessagesPerWrite(int maxMessagesPerWrite) {
super.setMaxMessagesPerWrite(maxMessagesPerWrite);

@@ -59,6 +59,10 @@ final class LinuxSocket extends Socket {
return Native.recvmmsg(intValue(), ipv6, msgs, offset, len);
}

int recvmsg(NativeDatagramPacketArray.NativeDatagramPacket msg) throws IOException {
return Native.recvmsg(intValue(), ipv6, msg);
}

void setTimeToLive(int ttl) throws IOException {
setTimeToLive(intValue(), ttl);
}
@@ -286,6 +290,14 @@ final class LinuxSocket extends Socket {
setIpMulticastLoop(intValue(), ipv6, loopbackModeDisabled ? 0 : 1);
}

boolean isUdpGro() throws IOException {
return isUdpGro(intValue()) != 0;
}

void setUdpGro(boolean gro) throws IOException {
setUdpGro(intValue(), gro ? 1 : 0);
}

long sendFile(DefaultFileRegion src, long baseOffset, long offset, long length) throws IOException {
// Open the file-region as it may be created via the lazy constructor. This is needed as we directly access
// the FileChannel field via JNI.
@@ -389,4 +401,6 @@ final class LinuxSocket extends Socket {
private static native int getIpMulticastLoop(int fd, boolean ipv6) throws IOException;
private static native void setIpMulticastLoop(int fd, boolean ipv6, int enabled) throws IOException;
private static native void setTimeToLive(int fd, int ttl) throws IOException;
private static native int isUdpGro(int fd) throws IOException;
private static native void setUdpGro(int fd, int gro) throws IOException;
}

@@ -263,6 +263,17 @@ public final class Native {
private static native int recvmmsg0(
int fd, boolean ipv6, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, int offset, int len);

static int recvmsg(int fd, boolean ipv6, NativeDatagramPacketArray.NativeDatagramPacket packet) throws IOException {
int res = recvmsg0(fd, ipv6, packet);
if (res >= 0) {
return res;
}
return ioResult("recvmsg", res);
}

private static native int recvmsg0(
int fd, boolean ipv6, NativeDatagramPacketArray.NativeDatagramPacket msg);

// epoll_event related
public static native int sizeofEpollEvent();
public static native int offsetofEpollData();

@@ -182,7 +182,7 @@ final class NativeDatagramPacketArray {
}
}

DatagramPacket newDatagramPacket(ByteBuf buffer, InetSocketAddress localAddress) throws UnknownHostException {
DatagramPacket newDatagramPacket(ByteBuf buffer, InetSocketAddress recipient) throws UnknownHostException {
final InetAddress address;
if (addrLen == ipv4Bytes.length) {
System.arraycopy(addr, 0, ipv4Bytes, 0, addrLen);
@@ -190,8 +190,14 @@ final class NativeDatagramPacketArray {
} else {
address = Inet6Address.getByAddress(null, addr, scopeId);
}
return new DatagramPacket(buffer.writerIndex(count),
localAddress, new InetSocketAddress(address, port));
InetSocketAddress sender = new InetSocketAddress(address, port);
buffer.writerIndex(count);

// UDP_GRO
if (segmentSize > 0) {
return new SegmentedDatagramPacket(buffer, segmentSize, recipient, sender);
}
return new DatagramPacket(buffer, recipient, sender);
}
}
}

@@ -21,6 +21,8 @@ import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.InternetProtocolFamily;
@@ -30,12 +32,12 @@ import org.junit.Assume;
import org.junit.Test;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

public class EpollDatagramUnicastTest extends DatagramUnicastTest {
@@ -57,7 +59,7 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest {
}

public void testSendSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb) throws Throwable {
testSendSegmentedDatagramPacket(sb, cb, false);
testSegmentedDatagramPacket(sb, cb, false, false);
}

@Test
@@ -66,15 +68,37 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest {
}

public void testSendSegmentedDatagramPacketComposite(Bootstrap sb, Bootstrap cb) throws Throwable {
testSendSegmentedDatagramPacket(sb, cb, true);
testSegmentedDatagramPacket(sb, cb, true, false);
}

private void testSendSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb, boolean composite)
@Test
public void testSendAndReceiveSegmentedDatagramPacket() throws Throwable {
run();
}

public void testSendAndReceiveSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb) throws Throwable {
testSegmentedDatagramPacket(sb, cb, false, true);
}

@Test
public void testSendAndReceiveSegmentedDatagramPacketComposite() throws Throwable {
run();
}

public void testSendAndReceiveSegmentedDatagramPacketComposite(Bootstrap sb, Bootstrap cb) throws Throwable {
testSegmentedDatagramPacket(sb, cb, true, true);
}

private void testSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb, boolean composite, boolean gro)
throws Throwable {
if (!(cb.group() instanceof EpollEventLoopGroup)) {
// Only supported for the native epoll transport.
return;
}
if (gro && !(sb.group() instanceof EpollEventLoopGroup)) {
// Only supported for the native epoll transport.
return;
}
Assume.assumeTrue(SegmentedDatagramPacket.isSupported());
Channel sc = null;
Channel cc = null;
@@ -94,6 +118,12 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest {
int bufferCapacity = numBuffers * segmentSize;
final CountDownLatch latch = new CountDownLatch(numBuffers);
AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
if (gro) {
// Enable GRO and also ensure we can read everything with one read as otherwise
// we will drop things on the floor.
sb.option(EpollChannelOption.UDP_GRO, true);
sb.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(bufferCapacity));
}
sc = sb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) {
@@ -103,6 +133,9 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest {
}
}).bind(newSocketAddress()).sync().channel();

if (sc instanceof EpollDatagramChannel) {
assertEquals(gro, sc.config().getOption(EpollChannelOption.UDP_GRO));
}
InetSocketAddress addr = sendToAddress((InetSocketAddress) sc.localAddress());
final ByteBuf buffer;
if (composite) {