From c22c6b845d28783375823e4c8222957ffe3664d9 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sat, 27 Feb 2021 21:55:37 +0100 Subject: [PATCH] Add support for UDP_SEGMENT (GSO) when using sendmmsg (#11038) Motivation: For protocols like QUIC using UDP_SEGMENT (GSO) can help to reduce the overhead quite a bit. We should support it. Modifications: - Add a SegmentedDatagramPacket which can be used to use UDP_SEGMENT - Add unit test Result: Be able to make use of UDP_SEGMENT --- .../src/main/c/netty_epoll_native.c | 40 +++++- .../channel/epoll/EpollDatagramChannel.java | 14 +- .../java/io/netty/channel/epoll/Native.java | 3 +- .../epoll/NativeDatagramPacketArray.java | 23 ++- .../epoll/SegmentedDatagramPacket.java | 133 ++++++++++++++++++ .../epoll/EpollDatagramUnicastTest.java | 75 ++++++++++ .../netty/channel/socket/DatagramPacket.java | 2 +- 7 files changed, 279 insertions(+), 11 deletions(-) create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/SegmentedDatagramPacket.java diff --git a/transport-native-epoll/src/main/c/netty_epoll_native.c b/transport-native-epoll/src/main/c/netty_epoll_native.c index d4e0039487..9f462acf55 100644 --- a/transport-native-epoll/src/main/c/netty_epoll_native.c +++ b/transport-native-epoll/src/main/c/netty_epoll_native.c @@ -40,6 +40,9 @@ #include #include +// Needed for UDP_SEGMENT +#include + #include "netty_epoll_linuxsocket.h" #include "netty_unix_buffer.h" #include "netty_unix_errors.h" @@ -63,6 +66,11 @@ #define TCP_FASTOPEN 23 #endif +// Allow to compile on systems with older kernels. +#ifndef UDP_SEGMENT +#define UDP_SEGMENT 103 +#endif + // optional extern int epoll_create1(int flags) __attribute__((weak)); @@ -103,6 +111,7 @@ struct mmsghdr { // Those are initialized in the init(...) method and cached for performance reasons static jfieldID packetAddrFieldId = NULL; static jfieldID packetAddrLenFieldId = NULL; +static jfieldID packetSegmentSizeFieldId = NULL; static jfieldID packetScopeIdFieldId = NULL; static jfieldID packetPortFieldId = NULL; static jfieldID packetMemoryAddressFieldId = NULL; @@ -320,17 +329,27 @@ static jint netty_epoll_native_epollCtlDel0(JNIEnv* env, jclass clazz, jint efd, static jint netty_epoll_native_sendmmsg0(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobjectArray packets, jint offset, jint len) { struct mmsghdr msg[len]; struct sockaddr_storage addr[len]; + char controls[len][CMSG_SPACE(sizeof(uint16_t))]; + socklen_t addrSize; int i; memset(msg, 0, sizeof(msg)); for (i = 0; i < len; i++) { - jobject packet = (*env)->GetObjectArrayElement(env, packets, i + offset); jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetAddrFieldId); jint addrLen = (*env)->GetIntField(env, packet, packetAddrLenFieldId); - + jint packetSegmentSize = (*env)->GetIntField(env, packet, packetSegmentSizeFieldId); + if (packetSegmentSize > 0) { + msg[i].msg_hdr.msg_control = controls[i]; + msg[i].msg_hdr.msg_controllen = sizeof(controls[i]); + struct cmsghdr *cm = CMSG_FIRSTHDR(&msg[i].msg_hdr); + cm->cmsg_level = SOL_UDP; + cm->cmsg_type = UDP_SEGMENT; + cm->cmsg_len = CMSG_LEN(sizeof(uint16_t)); + *((uint16_t *) CMSG_DATA(cm)) = packetSegmentSize; + } if (addrLen != 0) { jint scopeId = (*env)->GetIntField(env, packet, packetScopeIdFieldId); jint port = (*env)->GetIntField(env, packet, packetPortFieldId); @@ -421,6 +440,8 @@ static jint netty_epoll_native_recvmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo (*env)->SetIntField(env, packet, packetScopeIdFieldId, ip6addr->sin6_scope_id); (*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ip6addr->sin6_port)); } + // TODO: Support this also for recvmmsg + (*env)->SetIntField(env, packet, packetSegmentSizeFieldId, 0); } return (jint) res; @@ -448,6 +469,17 @@ static jboolean netty_epoll_native_isSupportingSendmmsg(JNIEnv* env, jclass claz return JNI_TRUE; } +static jboolean netty_epoll_native_isSupportingUdpSegment(JNIEnv* env, jclass clazz) { + int fd = socket(AF_INET, SOCK_DGRAM, 0); + if (fd == -1) { + return JNI_FALSE; + } + int gso_size = 512; + int ret = setsockopt(fd, SOL_UDP, UDP_SEGMENT, &gso_size, sizeof(gso_size)); + close(fd); + return ret == -1 ? JNI_FALSE : JNI_TRUE; +} + static jboolean netty_epoll_native_isSupportingRecvmmsg(JNIEnv* env, jclass clazz) { if (SYS_recvmmsg == -1) { return JNI_FALSE; @@ -567,6 +599,7 @@ static const JNINativeMethod fixed_method_table[] = { { "sizeofEpollEvent", "()I", (void *) netty_epoll_native_sizeofEpollEvent }, { "offsetofEpollData", "()I", (void *) netty_epoll_native_offsetofEpollData }, { "splice0", "(IJIJJ)I", (void *) netty_epoll_native_splice0 }, + { "isSupportingUdpSegment", "()Z", (void *) netty_epoll_native_isSupportingUdpSegment }, { "registerUnix", "()I", (void *) netty_epoll_native_registerUnix }, }; @@ -655,6 +688,7 @@ static jint netty_epoll_native_JNI_OnLoad(JNIEnv* env, const char* packagePrefix NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetAddrFieldId, "addr", "[B", done); NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetAddrLenFieldId, "addrLen", "I", done); + NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetSegmentSizeFieldId, "segmentSize", "I", done); NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetScopeIdFieldId, "scopeId", "I", done); NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetPortFieldId, "port", "I", done); NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetMemoryAddressFieldId, "memoryAddress", "J", done); @@ -682,6 +716,7 @@ done: } packetAddrFieldId = NULL; packetAddrLenFieldId = NULL; + packetSegmentSizeFieldId = NULL; packetScopeIdFieldId = NULL; packetPortFieldId = NULL; packetMemoryAddressFieldId = NULL; @@ -704,6 +739,7 @@ static void netty_epoll_native_JNI_OnUnload(JNIEnv* env, const char* packagePref packetAddrFieldId = NULL; packetAddrLenFieldId = NULL; + packetSegmentSizeFieldId = NULL; packetScopeIdFieldId = NULL; packetPortFieldId = NULL; packetMemoryAddressFieldId = NULL; diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index 0736a6fc9d..ceb2a26abf 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -297,7 +297,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements try { // Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+ - if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) { + if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1 || + // We only handle UDP_SEGMENT in sendmmsg. + in.current() instanceof SegmentedDatagramPacket) { NativeDatagramPacketArray array = cleanDatagramPacketArray(); array.add(in, isConnected()); int cnt = array.count(); @@ -371,6 +373,16 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements @Override protected Object filterOutboundMessage(Object msg) { + if (msg instanceof SegmentedDatagramPacket) { + if (!Native.IS_SUPPORTING_UDP_SEGMENT) { + throw new UnsupportedOperationException( + "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); + } + SegmentedDatagramPacket packet = (SegmentedDatagramPacket) msg; + ByteBuf content = packet.content(); + return UnixChannelUtil.isBufferCopyNeededForWrite(content) ? + packet.replace(newDirectBuffer(packet, content)) : msg; + } if (msg instanceof DatagramPacket) { DatagramPacket packet = (DatagramPacket) msg; ByteBuf content = packet.content(); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index fecf9f894c..87c51f3e9a 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -96,7 +96,7 @@ public final class Native { public static final boolean IS_SUPPORTING_SENDMMSG = isSupportingSendmmsg(); static final boolean IS_SUPPORTING_RECVMMSG = isSupportingRecvmmsg(); - + static final boolean IS_SUPPORTING_UDP_SEGMENT = isSupportingUdpSegment(); public static final boolean IS_SUPPORTING_TCP_FASTOPEN = isSupportingTcpFastopen(); public static final int TCP_MD5SIG_MAXKEYLEN = tcpMd5SigMaxKeyLen(); public static final String KERNEL_VERSION = kernelVersion(); @@ -109,6 +109,7 @@ public final class Native { return new FileDescriptor(timerFd()); } + private static native boolean isSupportingUdpSegment(); private static native int eventFd(); private static native int timerFd(); public static native void eventFdWrite(int fd, long value); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java index 4efffe6e10..60e4de8bae 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java @@ -55,10 +55,10 @@ final class NativeDatagramPacketArray { } boolean addWritable(ByteBuf buf, int index, int len) { - return add0(buf, index, len, null); + return add0(buf, index, len, 0, null); } - private boolean add0(ByteBuf buf, int index, int len, InetSocketAddress recipient) { + private boolean add0(ByteBuf buf, int index, int len, int segmentLen, InetSocketAddress recipient) { if (count == packets.length) { // We already filled up to UIO_MAX_IOV messages. This is the max allowed per // recvmmsg(...) / sendmmsg(...) call, we will try again later. @@ -73,7 +73,7 @@ final class NativeDatagramPacketArray { return false; } NativeDatagramPacket p = packets[count]; - p.init(iovArray.memoryAddress(offset), iovArray.count() - offset, recipient); + p.init(iovArray.memoryAddress(offset), iovArray.count() - offset, segmentLen, recipient); count++; return true; @@ -115,11 +115,20 @@ final class NativeDatagramPacketArray { if (msg instanceof DatagramPacket) { DatagramPacket packet = (DatagramPacket) msg; ByteBuf buf = packet.content(); - return add0(buf, buf.readerIndex(), buf.readableBytes(), packet.recipient()); + int segmentSize = 0; + if (packet instanceof SegmentedDatagramPacket) { + int seg = ((SegmentedDatagramPacket) packet).segmentSize(); + // We only need to tell the kernel that we want to use UDP_SEGMENT if there are multiple + // segments in the packet. + if (buf.readableBytes() > seg) { + segmentSize = seg; + } + } + return add0(buf, buf.readerIndex(), buf.readableBytes(), segmentSize, packet.recipient()); } if (msg instanceof ByteBuf && connected) { ByteBuf buf = (ByteBuf) msg; - return add0(buf, buf.readerIndex(), buf.readableBytes(), null); + return add0(buf, buf.readerIndex(), buf.readableBytes(), 0, null); } return false; } @@ -137,13 +146,15 @@ final class NativeDatagramPacketArray { private final byte[] addr = new byte[16]; + private int segmentSize; private int addrLen; private int scopeId; private int port; - private void init(long memoryAddress, int count, InetSocketAddress recipient) { + private void init(long memoryAddress, int count, int segmentSize, InetSocketAddress recipient) { this.memoryAddress = memoryAddress; this.count = count; + this.segmentSize = segmentSize; if (recipient == null) { this.scopeId = 0; diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/SegmentedDatagramPacket.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/SegmentedDatagramPacket.java new file mode 100644 index 0000000000..606c75d7c6 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/SegmentedDatagramPacket.java @@ -0,0 +1,133 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.socket.DatagramPacket; +import io.netty.util.internal.ObjectUtil; + +import java.net.InetSocketAddress; + +/** + * Allows to use GSO + * if the underlying OS supports it. Before instance and use this class you should check {@link #isSupported()}. + */ +public final class SegmentedDatagramPacket extends DatagramPacket { + + private final int segmentSize; + + /** + * Create a new instance. + * + * @param data the {@link ByteBuf} which must be continguous. + * @param segmentSize the segment size. + * @param recipient the recipient. + */ + public SegmentedDatagramPacket(ByteBuf data, int segmentSize, InetSocketAddress recipient) { + super(checkByteBuf(data), recipient); + checkIsSupported(); + this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize"); + } + + /** + * Create a new instance. + * + * @param data the {@link ByteBuf} which must be continguous. + * @param segmentSize the segment size. + * @param recipient the recipient. + */ + public SegmentedDatagramPacket(ByteBuf data, int segmentSize, + InetSocketAddress recipient, InetSocketAddress sender) { + super(checkByteBuf(data), recipient, sender); + checkIsSupported(); + this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize"); + } + + /** + * Returns {@code true} if the underlying system supports GSO. + */ + public static boolean isSupported() { + return Epoll.isAvailable() && + // We only support it together with sendmmsg(...) + Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT; + } + + /** + * Return the size of each segment (the last segment can be smaller). + * + * @return size of segments. + */ + public int segmentSize() { + return segmentSize; + } + + @Override + public SegmentedDatagramPacket copy() { + return new SegmentedDatagramPacket(content().copy(), segmentSize, recipient(), sender()); + } + + @Override + public SegmentedDatagramPacket duplicate() { + return new SegmentedDatagramPacket(content().duplicate(), segmentSize, recipient(), sender()); + } + + @Override + public SegmentedDatagramPacket retainedDuplicate() { + return new SegmentedDatagramPacket(content().retainedDuplicate(), segmentSize, recipient(), sender()); + } + + @Override + public SegmentedDatagramPacket replace(ByteBuf content) { + return new SegmentedDatagramPacket(content, segmentSize, recipient(), sender()); + } + + @Override + public SegmentedDatagramPacket retain() { + super.retain(); + return this; + } + + @Override + public SegmentedDatagramPacket retain(int increment) { + super.retain(increment); + return this; + } + + @Override + public SegmentedDatagramPacket touch() { + super.touch(); + return this; + } + + @Override + public SegmentedDatagramPacket touch(Object hint) { + super.touch(hint); + return this; + } + + private static ByteBuf checkByteBuf(ByteBuf buffer) { + if (!buffer.isContiguous()) { + throw new IllegalArgumentException("Buffer needs to be continguous"); + } + return buffer; + } + + private static void checkIsSupported() { + if (!isSupported()) { + throw new IllegalStateException(); + } + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramUnicastTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramUnicastTest.java index 1e30b9f3cf..eea960bbca 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramUnicastTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramUnicastTest.java @@ -16,11 +16,26 @@ package io.netty.channel.epoll; import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.InternetProtocolFamily; import io.netty.testsuite.transport.TestsuitePermutation; import io.netty.testsuite.transport.socket.DatagramUnicastTest; +import org.junit.Assume; +import org.junit.Test; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.fail; public class EpollDatagramUnicastTest extends DatagramUnicastTest { @Override @@ -34,4 +49,64 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest { super.testSimpleSendWithConnect(sb, cb); sb.option(EpollChannelOption.IP_RECVORIGDSTADDR, false); } + + @Test + public void testSendSegmentedDatagramPacket() throws Throwable { + run(); + } + + public void testSendSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb) + throws Throwable { + if (!(cb.group() instanceof EpollEventLoopGroup)) { + // Only supported for the native epoll transport. + return; + } + Assume.assumeTrue(SegmentedDatagramPacket.isSupported()); + Channel sc = null; + Channel cc = null; + + try { + cb.handler(new SimpleChannelInboundHandler() { + @Override + public void channelRead0(ChannelHandlerContext ctx, Object msgs) throws Exception { + // Nothing will be sent. + } + }); + + final SocketAddress sender; + cc = cb.bind(newSocketAddress()).sync().channel(); + + final int segmentSize = 512; + int bufferCapacity = 16 * segmentSize; + final CountDownLatch latch = new CountDownLatch(bufferCapacity / segmentSize); + AtomicReference errorRef = new AtomicReference(); + sc = sb.handler(new SimpleChannelInboundHandler() { + @Override + public void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) { + if (packet.content().readableBytes() == segmentSize) { + latch.countDown(); + } + } + }).bind(newSocketAddress()).sync().channel(); + + InetSocketAddress addr = sendToAddress((InetSocketAddress) sc.localAddress()); + ByteBuf buffer = Unpooled.directBuffer(bufferCapacity).writeZero(bufferCapacity); + cc.writeAndFlush(new SegmentedDatagramPacket(buffer, segmentSize, addr)).sync(); + + if (!latch.await(10, TimeUnit.SECONDS)) { + Throwable error = errorRef.get(); + if (error != null) { + throw error; + } + fail(); + } + } finally { + if (cc != null) { + cc.close().sync(); + } + if (sc != null) { + sc.close().sync(); + } + } + } } diff --git a/transport/src/main/java/io/netty/channel/socket/DatagramPacket.java b/transport/src/main/java/io/netty/channel/socket/DatagramPacket.java index 024edb2b45..c5d1cd7a01 100644 --- a/transport/src/main/java/io/netty/channel/socket/DatagramPacket.java +++ b/transport/src/main/java/io/netty/channel/socket/DatagramPacket.java @@ -24,7 +24,7 @@ import java.net.InetSocketAddress; /** * The message container that is used for {@link DatagramChannel} to communicate with the remote peer. */ -public final class DatagramPacket +public class DatagramPacket extends DefaultAddressedEnvelope implements ByteBufHolder { /**