Also support sendmmsg(...) on connected UDP channels when using native epoll transport (#9536)

Motivation:

We should also use sendmmsg on connected channels whenever possible to reduce the overhead of syscalls.

Modifications:

No matter if the channel is connected or not try to use sendmmsg when supported to reduce the overhead of syscalls

Result:

Better performance on connected UDP channels due less syscalls
This commit is contained in:
Norman Maurer 2019-09-06 20:57:04 +02:00
parent d57a5f5d4f
commit 82c330e8a5
4 changed files with 59 additions and 33 deletions

View File

@ -32,6 +32,8 @@ import org.junit.Test;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.NotYetConnectedException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -242,25 +244,16 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
cc.connect(sc.localAddress()).syncUninterruptibly();
List<ChannelFuture> futures = new ArrayList<ChannelFuture>();
for (int i = 0; i < count; i++) {
switch (wrapType) {
case DUP:
cc.write(buf.retainedDuplicate());
break;
case SLICE:
cc.write(buf.retainedSlice());
break;
case READ_ONLY:
cc.write(buf.retain().asReadOnly());
break;
case NONE:
cc.write(buf.retain());
break;
default:
throw new Error("unknown wrap type: " + wrapType);
}
futures.add(write(cc, buf, wrapType));
}
cc.flush();
for (ChannelFuture future: futures) {
future.sync();
}
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertTrue(clientLatch.await(10, TimeUnit.SECONDS));
assertTrue(cc.isConnected());
@ -282,6 +275,21 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
}
}
private static ChannelFuture write(Channel cc, ByteBuf buf, WrapType wrapType) {
switch (wrapType) {
case DUP:
return cc.write(buf.retainedDuplicate());
case SLICE:
return cc.write(buf.retainedSlice());
case READ_ONLY:
return cc.write(buf.retain().asReadOnly());
case NONE:
return cc.write(buf.retain());
default:
throw new Error("unknown wrap type: " + wrapType);
}
}
@SuppressWarnings("deprecation")
private Channel setupServerChannel(Bootstrap sb, final byte[] bytes, final SocketAddress sender,
final CountDownLatch latch, final boolean echo)

View File

@ -295,16 +295,19 @@ static jint netty_epoll_native_sendmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo
jobject packet = (*env)->GetObjectArrayElement(env, packets, i + offset);
jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetAddrFieldId);
jint scopeId = (*env)->GetIntField(env, packet, packetScopeIdFieldId);
jint port = (*env)->GetIntField(env, packet, packetPortFieldId);
jint addrLen = (*env)->GetIntField(env, packet, packetAddrLenFieldId);
if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr[i], &addrSize) == -1) {
return -1;
if (addrLen != 0) {
jint scopeId = (*env)->GetIntField(env, packet, packetScopeIdFieldId);
jint port = (*env)->GetIntField(env, packet, packetPortFieldId);
if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr[i], &addrSize) == -1) {
return -1;
}
msg[i].msg_hdr.msg_name = &addr[i];
msg[i].msg_hdr.msg_namelen = addrSize;
}
msg[i].msg_hdr.msg_name = &addr[i];
msg[i].msg_hdr.msg_namelen = addrSize;
msg[i].msg_hdr.msg_iov = (struct iovec*) (intptr_t) (*env)->GetLongField(env, packet, packetMemoryAddressFieldId);
msg[i].msg_hdr.msg_iovlen = (*env)->GetIntField(env, packet, packetCountFieldId);
}

View File

@ -297,7 +297,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
// Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) {
NativeDatagramPacketArray array = cleanDatagramPacketArray();
in.forEachFlushedMessage(array);
array.add(in, isConnected());
int cnt = array.count();
if (cnt >= 1) {

View File

@ -17,6 +17,7 @@ package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelOutboundBuffer.MessageProcessor;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.Limits;
@ -32,7 +33,7 @@ import static io.netty.channel.unix.NativeInetAddress.copyIpv4MappedIpv6Address;
/**
* Support <a href="http://linux.die.net/man/2/sendmmsg">sendmmsg(...)</a> on linux with GLIBC 2.14+
*/
final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessageProcessor {
final class NativeDatagramPacketArray {
// Use UIO_MAX_IOV as this is the maximum number we can write with one sendmmsg(...) call.
private final NativeDatagramPacket[] packets = new NativeDatagramPacket[UIO_MAX_IOV];
@ -43,6 +44,7 @@ final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessagePr
// temporary array to copy the ipv4 part of ipv6-mapped-ipv4 addresses and then create a Inet4Address out of it.
private final byte[] ipv4Bytes = new byte[4];
private final MyMessageProcessor processor = new MyMessageProcessor();
private int count;
@ -52,11 +54,6 @@ final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessagePr
}
}
private boolean addReadable(DatagramPacket packet) {
ByteBuf buf = packet.content();
return add0(buf, buf.readerIndex(), buf.readableBytes(), packet.recipient());
}
boolean addWritable(ByteBuf buf, int index, int len) {
return add0(buf, index, len, null);
}
@ -82,9 +79,9 @@ final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessagePr
return true;
}
@Override
public boolean processMessage(Object msg) {
return msg instanceof DatagramPacket && addReadable((DatagramPacket) msg);
void add(ChannelOutboundBuffer buffer, boolean connected) throws Exception {
processor.connected = connected;
buffer.forEachFlushedMessage(processor);
}
/**
@ -110,6 +107,24 @@ final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessagePr
iovArray.release();
}
private final class MyMessageProcessor implements MessageProcessor {
private boolean connected;
@Override
public boolean processMessage(Object msg) {
if (msg instanceof DatagramPacket) {
DatagramPacket packet = (DatagramPacket) msg;
ByteBuf buf = packet.content();
return add0(buf, buf.readerIndex(), buf.readableBytes(), packet.recipient());
}
if (msg instanceof ByteBuf && connected) {
ByteBuf buf = (ByteBuf) msg;
return add0(buf, buf.readerIndex(), buf.readableBytes(), null);
}
return false;
}
}
/**
* Used to pass needed data to JNI.
*/