Also support sendmmsg(...) on connected UDP channels when using native epoll transport (#9536)

Motivation:

We should also use sendmmsg on connected channels whenever possible to reduce the overhead of syscalls.

Modifications:

No matter if the channel is connected or not try to use sendmmsg when supported to reduce the overhead of syscalls

Result:

Better performance on connected UDP channels due less syscalls
This commit is contained in:
Norman Maurer 2019-09-06 20:57:04 +02:00 committed by GitHub
parent 6fc7c589f0
commit 7b7f319fec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 59 additions and 33 deletions

View File

@ -32,6 +32,8 @@ import org.junit.Test;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.channels.NotYetConnectedException; import java.nio.channels.NotYetConnectedException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -242,25 +244,16 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
cc.connect(sc.localAddress()).syncUninterruptibly(); cc.connect(sc.localAddress()).syncUninterruptibly();
List<ChannelFuture> futures = new ArrayList<ChannelFuture>();
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
switch (wrapType) { futures.add(write(cc, buf, wrapType));
case DUP:
cc.write(buf.retainedDuplicate());
break;
case SLICE:
cc.write(buf.retainedSlice());
break;
case READ_ONLY:
cc.write(buf.retain().asReadOnly());
break;
case NONE:
cc.write(buf.retain());
break;
default:
throw new Error("unknown wrap type: " + wrapType);
}
} }
cc.flush(); cc.flush();
for (ChannelFuture future: futures) {
future.sync();
}
assertTrue(latch.await(10, TimeUnit.SECONDS)); assertTrue(latch.await(10, TimeUnit.SECONDS));
assertTrue(clientLatch.await(10, TimeUnit.SECONDS)); assertTrue(clientLatch.await(10, TimeUnit.SECONDS));
assertTrue(cc.isConnected()); assertTrue(cc.isConnected());
@ -282,6 +275,21 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
} }
} }
private static ChannelFuture write(Channel cc, ByteBuf buf, WrapType wrapType) {
switch (wrapType) {
case DUP:
return cc.write(buf.retainedDuplicate());
case SLICE:
return cc.write(buf.retainedSlice());
case READ_ONLY:
return cc.write(buf.retain().asReadOnly());
case NONE:
return cc.write(buf.retain());
default:
throw new Error("unknown wrap type: " + wrapType);
}
}
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
private Channel setupServerChannel(Bootstrap sb, final byte[] bytes, final SocketAddress sender, private Channel setupServerChannel(Bootstrap sb, final byte[] bytes, final SocketAddress sender,
final CountDownLatch latch, final boolean echo) final CountDownLatch latch, final boolean echo)

View File

@ -319,16 +319,19 @@ static jint netty_epoll_native_sendmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo
jobject packet = (*env)->GetObjectArrayElement(env, packets, i + offset); jobject packet = (*env)->GetObjectArrayElement(env, packets, i + offset);
jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetAddrFieldId); jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetAddrFieldId);
jint scopeId = (*env)->GetIntField(env, packet, packetScopeIdFieldId); jint addrLen = (*env)->GetIntField(env, packet, packetAddrLenFieldId);
jint port = (*env)->GetIntField(env, packet, packetPortFieldId);
if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr[i], &addrSize) == -1) { if (addrLen != 0) {
return -1; jint scopeId = (*env)->GetIntField(env, packet, packetScopeIdFieldId);
jint port = (*env)->GetIntField(env, packet, packetPortFieldId);
if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr[i], &addrSize) == -1) {
return -1;
}
msg[i].msg_hdr.msg_name = &addr[i];
msg[i].msg_hdr.msg_namelen = addrSize;
} }
msg[i].msg_hdr.msg_name = &addr[i];
msg[i].msg_hdr.msg_namelen = addrSize;
msg[i].msg_hdr.msg_iov = (struct iovec*) (intptr_t) (*env)->GetLongField(env, packet, packetMemoryAddressFieldId); msg[i].msg_hdr.msg_iov = (struct iovec*) (intptr_t) (*env)->GetLongField(env, packet, packetMemoryAddressFieldId);
msg[i].msg_hdr.msg_iovlen = (*env)->GetIntField(env, packet, packetCountFieldId); msg[i].msg_hdr.msg_iovlen = (*env)->GetIntField(env, packet, packetCountFieldId);
} }

View File

@ -311,7 +311,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
// Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+ // Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) { if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) {
NativeDatagramPacketArray array = cleanDatagramPacketArray(); NativeDatagramPacketArray array = cleanDatagramPacketArray();
in.forEachFlushedMessage(array); array.add(in, isConnected());
int cnt = array.count(); int cnt = array.count();
if (cnt >= 1) { if (cnt >= 1) {

View File

@ -17,6 +17,7 @@ package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelOutboundBuffer.MessageProcessor;
import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.unix.IovArray; import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.Limits; import io.netty.channel.unix.Limits;
@ -32,7 +33,7 @@ import static io.netty.channel.unix.NativeInetAddress.copyIpv4MappedIpv6Address;
/** /**
* Support <a href="http://linux.die.net/man/2/sendmmsg">sendmmsg(...)</a> on linux with GLIBC 2.14+ * Support <a href="http://linux.die.net/man/2/sendmmsg">sendmmsg(...)</a> on linux with GLIBC 2.14+
*/ */
final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessageProcessor { final class NativeDatagramPacketArray {
// Use UIO_MAX_IOV as this is the maximum number we can write with one sendmmsg(...) call. // Use UIO_MAX_IOV as this is the maximum number we can write with one sendmmsg(...) call.
private final NativeDatagramPacket[] packets = new NativeDatagramPacket[UIO_MAX_IOV]; private final NativeDatagramPacket[] packets = new NativeDatagramPacket[UIO_MAX_IOV];
@ -43,6 +44,7 @@ final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessagePr
// temporary array to copy the ipv4 part of ipv6-mapped-ipv4 addresses and then create a Inet4Address out of it. // temporary array to copy the ipv4 part of ipv6-mapped-ipv4 addresses and then create a Inet4Address out of it.
private final byte[] ipv4Bytes = new byte[4]; private final byte[] ipv4Bytes = new byte[4];
private final MyMessageProcessor processor = new MyMessageProcessor();
private int count; private int count;
@ -52,11 +54,6 @@ final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessagePr
} }
} }
private boolean addReadable(DatagramPacket packet) {
ByteBuf buf = packet.content();
return add0(buf, buf.readerIndex(), buf.readableBytes(), packet.recipient());
}
boolean addWritable(ByteBuf buf, int index, int len) { boolean addWritable(ByteBuf buf, int index, int len) {
return add0(buf, index, len, null); return add0(buf, index, len, null);
} }
@ -82,9 +79,9 @@ final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessagePr
return true; return true;
} }
@Override void add(ChannelOutboundBuffer buffer, boolean connected) throws Exception {
public boolean processMessage(Object msg) { processor.connected = connected;
return msg instanceof DatagramPacket && addReadable((DatagramPacket) msg); buffer.forEachFlushedMessage(processor);
} }
/** /**
@ -110,6 +107,24 @@ final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessagePr
iovArray.release(); iovArray.release();
} }
private final class MyMessageProcessor implements MessageProcessor {
private boolean connected;
@Override
public boolean processMessage(Object msg) {
if (msg instanceof DatagramPacket) {
DatagramPacket packet = (DatagramPacket) msg;
ByteBuf buf = packet.content();
return add0(buf, buf.readerIndex(), buf.readableBytes(), packet.recipient());
}
if (msg instanceof ByteBuf && connected) {
ByteBuf buf = (ByteBuf) msg;
return add0(buf, buf.readerIndex(), buf.readableBytes(), null);
}
return false;
}
}
/** /**
* Used to pass needed data to JNI. * Used to pass needed data to JNI.
*/ */