Add support for UDP_SEGMENT (GSO) when using sendmmsg (#11038)
Motivation: For protocols like QUIC, using UDP_SEGMENT (GSO) can reduce the per-packet overhead quite a bit, so we should support it. Modifications: - Add a SegmentedDatagramPacket which can be used to send data with UDP_SEGMENT - Add unit test Result: It is now possible to make use of UDP_SEGMENT
This commit is contained in:
parent
1529ef1794
commit
c22c6b845d
@ -40,6 +40,9 @@
|
||||
#include <linux/net.h>
|
||||
#include <sys/syscall.h>
|
||||
|
||||
// Needed for UDP_SEGMENT
|
||||
#include <netinet/udp.h>
|
||||
|
||||
#include "netty_epoll_linuxsocket.h"
|
||||
#include "netty_unix_buffer.h"
|
||||
#include "netty_unix_errors.h"
|
||||
@ -63,6 +66,11 @@
|
||||
#define TCP_FASTOPEN 23
|
||||
#endif
|
||||
|
||||
// Allow to compile on systems with older kernels.
|
||||
#ifndef UDP_SEGMENT
|
||||
#define UDP_SEGMENT 103
|
||||
#endif
|
||||
|
||||
// optional
|
||||
extern int epoll_create1(int flags) __attribute__((weak));
|
||||
|
||||
@ -103,6 +111,7 @@ struct mmsghdr {
|
||||
// Those are initialized in the init(...) method and cached for performance reasons
|
||||
static jfieldID packetAddrFieldId = NULL;
|
||||
static jfieldID packetAddrLenFieldId = NULL;
|
||||
static jfieldID packetSegmentSizeFieldId = NULL;
|
||||
static jfieldID packetScopeIdFieldId = NULL;
|
||||
static jfieldID packetPortFieldId = NULL;
|
||||
static jfieldID packetMemoryAddressFieldId = NULL;
|
||||
@ -320,17 +329,27 @@ static jint netty_epoll_native_epollCtlDel0(JNIEnv* env, jclass clazz, jint efd,
|
||||
static jint netty_epoll_native_sendmmsg0(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobjectArray packets, jint offset, jint len) {
|
||||
struct mmsghdr msg[len];
|
||||
struct sockaddr_storage addr[len];
|
||||
char controls[len][CMSG_SPACE(sizeof(uint16_t))];
|
||||
|
||||
socklen_t addrSize;
|
||||
int i;
|
||||
|
||||
memset(msg, 0, sizeof(msg));
|
||||
|
||||
for (i = 0; i < len; i++) {
|
||||
|
||||
jobject packet = (*env)->GetObjectArrayElement(env, packets, i + offset);
|
||||
jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetAddrFieldId);
|
||||
jint addrLen = (*env)->GetIntField(env, packet, packetAddrLenFieldId);
|
||||
|
||||
jint packetSegmentSize = (*env)->GetIntField(env, packet, packetSegmentSizeFieldId);
|
||||
if (packetSegmentSize > 0) {
|
||||
msg[i].msg_hdr.msg_control = controls[i];
|
||||
msg[i].msg_hdr.msg_controllen = sizeof(controls[i]);
|
||||
struct cmsghdr *cm = CMSG_FIRSTHDR(&msg[i].msg_hdr);
|
||||
cm->cmsg_level = SOL_UDP;
|
||||
cm->cmsg_type = UDP_SEGMENT;
|
||||
cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
|
||||
*((uint16_t *) CMSG_DATA(cm)) = packetSegmentSize;
|
||||
}
|
||||
if (addrLen != 0) {
|
||||
jint scopeId = (*env)->GetIntField(env, packet, packetScopeIdFieldId);
|
||||
jint port = (*env)->GetIntField(env, packet, packetPortFieldId);
|
||||
@ -421,6 +440,8 @@ static jint netty_epoll_native_recvmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo
|
||||
(*env)->SetIntField(env, packet, packetScopeIdFieldId, ip6addr->sin6_scope_id);
|
||||
(*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ip6addr->sin6_port));
|
||||
}
|
||||
// TODO: Support this also for recvmmsg
|
||||
(*env)->SetIntField(env, packet, packetSegmentSizeFieldId, 0);
|
||||
}
|
||||
|
||||
return (jint) res;
|
||||
@ -448,6 +469,17 @@ static jboolean netty_epoll_native_isSupportingSendmmsg(JNIEnv* env, jclass claz
|
||||
return JNI_TRUE;
|
||||
}
|
||||
|
||||
/*
 * Probes for UDP_SEGMENT (GSO) support by creating a throw-away IPv4 UDP socket
 * and trying to set the UDP_SEGMENT option on it.
 *
 * Returns JNI_TRUE when setsockopt(...) accepts the option, JNI_FALSE when the
 * probe socket cannot be created or the option is rejected.
 */
static jboolean netty_epoll_native_isSupportingUdpSegment(JNIEnv* env, jclass clazz) {
    int probeFd = socket(AF_INET, SOCK_DGRAM, 0);
    if (probeFd == -1) {
        // Without a socket there is nothing to probe; report "unsupported".
        return JNI_FALSE;
    }

    // Arbitrary segment size, only used to see whether the option is accepted.
    int segmentSize = 512;
    int rc = setsockopt(probeFd, SOL_UDP, UDP_SEGMENT, &segmentSize, sizeof(segmentSize));

    // The socket only existed for the probe, so release it before reporting.
    close(probeFd);

    if (rc == -1) {
        return JNI_FALSE;
    }
    return JNI_TRUE;
}
|
||||
|
||||
static jboolean netty_epoll_native_isSupportingRecvmmsg(JNIEnv* env, jclass clazz) {
|
||||
if (SYS_recvmmsg == -1) {
|
||||
return JNI_FALSE;
|
||||
@ -567,6 +599,7 @@ static const JNINativeMethod fixed_method_table[] = {
|
||||
{ "sizeofEpollEvent", "()I", (void *) netty_epoll_native_sizeofEpollEvent },
|
||||
{ "offsetofEpollData", "()I", (void *) netty_epoll_native_offsetofEpollData },
|
||||
{ "splice0", "(IJIJJ)I", (void *) netty_epoll_native_splice0 },
|
||||
{ "isSupportingUdpSegment", "()Z", (void *) netty_epoll_native_isSupportingUdpSegment },
|
||||
{ "registerUnix", "()I", (void *) netty_epoll_native_registerUnix },
|
||||
|
||||
};
|
||||
@ -655,6 +688,7 @@ static jint netty_epoll_native_JNI_OnLoad(JNIEnv* env, const char* packagePrefix
|
||||
|
||||
NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetAddrFieldId, "addr", "[B", done);
|
||||
NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetAddrLenFieldId, "addrLen", "I", done);
|
||||
NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetSegmentSizeFieldId, "segmentSize", "I", done);
|
||||
NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetScopeIdFieldId, "scopeId", "I", done);
|
||||
NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetPortFieldId, "port", "I", done);
|
||||
NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetMemoryAddressFieldId, "memoryAddress", "J", done);
|
||||
@ -682,6 +716,7 @@ done:
|
||||
}
|
||||
packetAddrFieldId = NULL;
|
||||
packetAddrLenFieldId = NULL;
|
||||
packetSegmentSizeFieldId = NULL;
|
||||
packetScopeIdFieldId = NULL;
|
||||
packetPortFieldId = NULL;
|
||||
packetMemoryAddressFieldId = NULL;
|
||||
@ -704,6 +739,7 @@ static void netty_epoll_native_JNI_OnUnload(JNIEnv* env, const char* packagePref
|
||||
|
||||
packetAddrFieldId = NULL;
|
||||
packetAddrLenFieldId = NULL;
|
||||
packetSegmentSizeFieldId = NULL;
|
||||
packetScopeIdFieldId = NULL;
|
||||
packetPortFieldId = NULL;
|
||||
packetMemoryAddressFieldId = NULL;
|
||||
|
@ -297,7 +297,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
||||
|
||||
try {
|
||||
// Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
|
||||
if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) {
|
||||
if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1 ||
|
||||
// We only handle UDP_SEGMENT in sendmmsg.
|
||||
in.current() instanceof SegmentedDatagramPacket) {
|
||||
NativeDatagramPacketArray array = cleanDatagramPacketArray();
|
||||
array.add(in, isConnected());
|
||||
int cnt = array.count();
|
||||
@ -371,6 +373,16 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
||||
|
||||
@Override
|
||||
protected Object filterOutboundMessage(Object msg) {
|
||||
if (msg instanceof SegmentedDatagramPacket) {
|
||||
if (!Native.IS_SUPPORTING_UDP_SEGMENT) {
|
||||
throw new UnsupportedOperationException(
|
||||
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
|
||||
}
|
||||
SegmentedDatagramPacket packet = (SegmentedDatagramPacket) msg;
|
||||
ByteBuf content = packet.content();
|
||||
return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
|
||||
packet.replace(newDirectBuffer(packet, content)) : msg;
|
||||
}
|
||||
if (msg instanceof DatagramPacket) {
|
||||
DatagramPacket packet = (DatagramPacket) msg;
|
||||
ByteBuf content = packet.content();
|
||||
|
@ -96,7 +96,7 @@ public final class Native {
|
||||
|
||||
public static final boolean IS_SUPPORTING_SENDMMSG = isSupportingSendmmsg();
|
||||
static final boolean IS_SUPPORTING_RECVMMSG = isSupportingRecvmmsg();
|
||||
|
||||
static final boolean IS_SUPPORTING_UDP_SEGMENT = isSupportingUdpSegment();
|
||||
public static final boolean IS_SUPPORTING_TCP_FASTOPEN = isSupportingTcpFastopen();
|
||||
public static final int TCP_MD5SIG_MAXKEYLEN = tcpMd5SigMaxKeyLen();
|
||||
public static final String KERNEL_VERSION = kernelVersion();
|
||||
@ -109,6 +109,7 @@ public final class Native {
|
||||
return new FileDescriptor(timerFd());
|
||||
}
|
||||
|
||||
private static native boolean isSupportingUdpSegment();
|
||||
private static native int eventFd();
|
||||
private static native int timerFd();
|
||||
public static native void eventFdWrite(int fd, long value);
|
||||
|
@ -55,10 +55,10 @@ final class NativeDatagramPacketArray {
|
||||
}
|
||||
|
||||
boolean addWritable(ByteBuf buf, int index, int len) {
|
||||
return add0(buf, index, len, null);
|
||||
return add0(buf, index, len, 0, null);
|
||||
}
|
||||
|
||||
private boolean add0(ByteBuf buf, int index, int len, InetSocketAddress recipient) {
|
||||
private boolean add0(ByteBuf buf, int index, int len, int segmentLen, InetSocketAddress recipient) {
|
||||
if (count == packets.length) {
|
||||
// We already filled up to UIO_MAX_IOV messages. This is the max allowed per
|
||||
// recvmmsg(...) / sendmmsg(...) call, we will try again later.
|
||||
@ -73,7 +73,7 @@ final class NativeDatagramPacketArray {
|
||||
return false;
|
||||
}
|
||||
NativeDatagramPacket p = packets[count];
|
||||
p.init(iovArray.memoryAddress(offset), iovArray.count() - offset, recipient);
|
||||
p.init(iovArray.memoryAddress(offset), iovArray.count() - offset, segmentLen, recipient);
|
||||
|
||||
count++;
|
||||
return true;
|
||||
@ -115,11 +115,20 @@ final class NativeDatagramPacketArray {
|
||||
if (msg instanceof DatagramPacket) {
|
||||
DatagramPacket packet = (DatagramPacket) msg;
|
||||
ByteBuf buf = packet.content();
|
||||
return add0(buf, buf.readerIndex(), buf.readableBytes(), packet.recipient());
|
||||
int segmentSize = 0;
|
||||
if (packet instanceof SegmentedDatagramPacket) {
|
||||
int seg = ((SegmentedDatagramPacket) packet).segmentSize();
|
||||
// We only need to tell the kernel that we want to use UDP_SEGMENT if there are multiple
|
||||
// segments in the packet.
|
||||
if (buf.readableBytes() > seg) {
|
||||
segmentSize = seg;
|
||||
}
|
||||
}
|
||||
return add0(buf, buf.readerIndex(), buf.readableBytes(), segmentSize, packet.recipient());
|
||||
}
|
||||
if (msg instanceof ByteBuf && connected) {
|
||||
ByteBuf buf = (ByteBuf) msg;
|
||||
return add0(buf, buf.readerIndex(), buf.readableBytes(), null);
|
||||
return add0(buf, buf.readerIndex(), buf.readableBytes(), 0, null);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
@ -137,13 +146,15 @@ final class NativeDatagramPacketArray {
|
||||
|
||||
private final byte[] addr = new byte[16];
|
||||
|
||||
private int segmentSize;
|
||||
private int addrLen;
|
||||
private int scopeId;
|
||||
private int port;
|
||||
|
||||
private void init(long memoryAddress, int count, InetSocketAddress recipient) {
|
||||
private void init(long memoryAddress, int count, int segmentSize, InetSocketAddress recipient) {
|
||||
this.memoryAddress = memoryAddress;
|
||||
this.count = count;
|
||||
this.segmentSize = segmentSize;
|
||||
|
||||
if (recipient == null) {
|
||||
this.scopeId = 0;
|
||||
|
@ -0,0 +1,133 @@
|
||||
/*
|
||||
* Copyright 2021 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.socket.DatagramPacket;
|
||||
import io.netty.util.internal.ObjectUtil;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/**
|
||||
* Allows to use <a href="https://blog.cloudflare.com/accelerating-udp-packet-transmission-for-quic/">GSO</a>
|
||||
* if the underlying OS supports it. Before instance and use this class you should check {@link #isSupported()}.
|
||||
*/
|
||||
public final class SegmentedDatagramPacket extends DatagramPacket {
|
||||
|
||||
private final int segmentSize;
|
||||
|
||||
/**
|
||||
* Create a new instance.
|
||||
*
|
||||
* @param data the {@link ByteBuf} which must be continguous.
|
||||
* @param segmentSize the segment size.
|
||||
* @param recipient the recipient.
|
||||
*/
|
||||
public SegmentedDatagramPacket(ByteBuf data, int segmentSize, InetSocketAddress recipient) {
|
||||
super(checkByteBuf(data), recipient);
|
||||
checkIsSupported();
|
||||
this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new instance.
|
||||
*
|
||||
* @param data the {@link ByteBuf} which must be continguous.
|
||||
* @param segmentSize the segment size.
|
||||
* @param recipient the recipient.
|
||||
*/
|
||||
public SegmentedDatagramPacket(ByteBuf data, int segmentSize,
|
||||
InetSocketAddress recipient, InetSocketAddress sender) {
|
||||
super(checkByteBuf(data), recipient, sender);
|
||||
checkIsSupported();
|
||||
this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@code true} if the underlying system supports GSO.
|
||||
*/
|
||||
public static boolean isSupported() {
|
||||
return Epoll.isAvailable() &&
|
||||
// We only support it together with sendmmsg(...)
|
||||
Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the size of each segment (the last segment can be smaller).
|
||||
*
|
||||
* @return size of segments.
|
||||
*/
|
||||
public int segmentSize() {
|
||||
return segmentSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SegmentedDatagramPacket copy() {
|
||||
return new SegmentedDatagramPacket(content().copy(), segmentSize, recipient(), sender());
|
||||
}
|
||||
|
||||
@Override
|
||||
public SegmentedDatagramPacket duplicate() {
|
||||
return new SegmentedDatagramPacket(content().duplicate(), segmentSize, recipient(), sender());
|
||||
}
|
||||
|
||||
@Override
|
||||
public SegmentedDatagramPacket retainedDuplicate() {
|
||||
return new SegmentedDatagramPacket(content().retainedDuplicate(), segmentSize, recipient(), sender());
|
||||
}
|
||||
|
||||
@Override
|
||||
public SegmentedDatagramPacket replace(ByteBuf content) {
|
||||
return new SegmentedDatagramPacket(content, segmentSize, recipient(), sender());
|
||||
}
|
||||
|
||||
@Override
|
||||
public SegmentedDatagramPacket retain() {
|
||||
super.retain();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SegmentedDatagramPacket retain(int increment) {
|
||||
super.retain(increment);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SegmentedDatagramPacket touch() {
|
||||
super.touch();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SegmentedDatagramPacket touch(Object hint) {
|
||||
super.touch(hint);
|
||||
return this;
|
||||
}
|
||||
|
||||
private static ByteBuf checkByteBuf(ByteBuf buffer) {
|
||||
if (!buffer.isContiguous()) {
|
||||
throw new IllegalArgumentException("Buffer needs to be continguous");
|
||||
}
|
||||
return buffer;
|
||||
}
|
||||
|
||||
private static void checkIsSupported() {
|
||||
if (!isSupported()) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
}
|
@ -16,11 +16,26 @@
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.socket.DatagramPacket;
|
||||
import io.netty.channel.socket.InternetProtocolFamily;
|
||||
import io.netty.testsuite.transport.TestsuitePermutation;
|
||||
import io.netty.testsuite.transport.socket.DatagramUnicastTest;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class EpollDatagramUnicastTest extends DatagramUnicastTest {
|
||||
@Override
|
||||
@ -34,4 +49,64 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest {
|
||||
super.testSimpleSendWithConnect(sb, cb);
|
||||
sb.option(EpollChannelOption.IP_RECVORIGDSTADDR, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendSegmentedDatagramPacket() throws Throwable {
|
||||
run();
|
||||
}
|
||||
|
||||
public void testSendSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb)
|
||||
throws Throwable {
|
||||
if (!(cb.group() instanceof EpollEventLoopGroup)) {
|
||||
// Only supported for the native epoll transport.
|
||||
return;
|
||||
}
|
||||
Assume.assumeTrue(SegmentedDatagramPacket.isSupported());
|
||||
Channel sc = null;
|
||||
Channel cc = null;
|
||||
|
||||
try {
|
||||
cb.handler(new SimpleChannelInboundHandler() {
|
||||
@Override
|
||||
public void channelRead0(ChannelHandlerContext ctx, Object msgs) throws Exception {
|
||||
// Nothing will be sent.
|
||||
}
|
||||
});
|
||||
|
||||
final SocketAddress sender;
|
||||
cc = cb.bind(newSocketAddress()).sync().channel();
|
||||
|
||||
final int segmentSize = 512;
|
||||
int bufferCapacity = 16 * segmentSize;
|
||||
final CountDownLatch latch = new CountDownLatch(bufferCapacity / segmentSize);
|
||||
AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
|
||||
sc = sb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
|
||||
@Override
|
||||
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) {
|
||||
if (packet.content().readableBytes() == segmentSize) {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
}).bind(newSocketAddress()).sync().channel();
|
||||
|
||||
InetSocketAddress addr = sendToAddress((InetSocketAddress) sc.localAddress());
|
||||
ByteBuf buffer = Unpooled.directBuffer(bufferCapacity).writeZero(bufferCapacity);
|
||||
cc.writeAndFlush(new SegmentedDatagramPacket(buffer, segmentSize, addr)).sync();
|
||||
|
||||
if (!latch.await(10, TimeUnit.SECONDS)) {
|
||||
Throwable error = errorRef.get();
|
||||
if (error != null) {
|
||||
throw error;
|
||||
}
|
||||
fail();
|
||||
}
|
||||
} finally {
|
||||
if (cc != null) {
|
||||
cc.close().sync();
|
||||
}
|
||||
if (sc != null) {
|
||||
sc.close().sync();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -24,7 +24,7 @@ import java.net.InetSocketAddress;
|
||||
/**
|
||||
* The message container that is used for {@link DatagramChannel} to communicate with the remote peer.
|
||||
*/
|
||||
public final class DatagramPacket
|
||||
public class DatagramPacket
|
||||
extends DefaultAddressedEnvelope<ByteBuf, InetSocketAddress> implements ByteBufHolder {
|
||||
|
||||
/**
|
||||
|
Loading…
Reference in New Issue
Block a user