Reduce GC produced by native DatagramChannel implementations when in connected mode. (#8806)

Motivation:

In the native code EpollDatagramChannel / KQueueDatagramChannel creates a DatagramSocketAddress object for each received UDP datagram even when in connected mode as it uses the recvfrom(...) / recvmsg(...)  method. Creating these is quite heavy in terms of allocations as internally, char[], String, Inet4Address, InetAddressHolder, InetSocketAddressHolder, InetAddress[], byte[] objects are getting generated when constructing the object. When in connected mode we can just use regular read(...) calls which do not need to allocate all of these.

Modifications:

- When in connected mode use read(...) and NOT recvfrom(..) / readmsg(...) to reduce allocations when possible.
- Adjust tests to ensure read works as expected when in connected mode.

Result:

Less allocations and GC when using native datagram channels in connected mode. Fixes https://github.com/netty/netty/issues/8770.
This commit is contained in:
Norman Maurer 2019-02-01 10:29:36 +01:00 committed by GitHub
parent ad922fa47e
commit 7bba4f49cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 135 additions and 60 deletions

View File

@ -158,7 +158,7 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
});
final CountDownLatch latch = new CountDownLatch(count);
sc = setupServerChannel(sb, bytes, latch);
sc = setupServerChannel(sb, bytes, latch, false);
if (bindClient) {
cc = cb.bind(newSocketAddress()).sync().channel();
} else {
@ -209,10 +209,21 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
private void testSimpleSendWithConnect0(Bootstrap sb, Bootstrap cb, ByteBuf buf, final byte[] bytes, int count,
WrapType wrapType) throws Throwable {
cb.handler(new SimpleChannelInboundHandler<Object>() {
final CountDownLatch clientLatch = new CountDownLatch(count);
cb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msgs) throws Exception {
// Nothing will be sent.
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
ByteBuf buf = msg.content();
assertEquals(bytes.length, buf.readableBytes());
for (int i = 0; i < bytes.length; i++) {
assertEquals(bytes[i], buf.getByte(buf.readerIndex() + i));
}
// Test that the channel's localAddress is equal to the message's recipient
assertEquals(ctx.channel().localAddress(), msg.recipient());
clientLatch.countDown();
}
});
@ -220,7 +231,7 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
DatagramChannel cc = null;
try {
final CountDownLatch latch = new CountDownLatch(count);
sc = setupServerChannel(sb, bytes, latch);
sc = setupServerChannel(sb, bytes, latch, true);
cc = (DatagramChannel) cb.connect(sc.localAddress()).sync().channel();
for (int i = 0; i < count; i++) {
@ -243,7 +254,7 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
}
cc.flush();
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertTrue(clientLatch.await(10, TimeUnit.SECONDS));
assertTrue(cc.isConnected());
// Test what happens when we call disconnect()
@ -264,7 +275,7 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
}
@SuppressWarnings("deprecation")
private Channel setupServerChannel(Bootstrap sb, final byte[] bytes, final CountDownLatch latch)
private Channel setupServerChannel(Bootstrap sb, final byte[] bytes, final CountDownLatch latch, final boolean echo)
throws Throwable {
sb.handler(new ChannelInitializer<Channel>() {
@Override
@ -274,13 +285,16 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
ByteBuf buf = msg.content();
assertEquals(bytes.length, buf.readableBytes());
for (byte b : bytes) {
assertEquals(b, buf.readByte());
for (int i = 0; i < bytes.length; i++) {
assertEquals(bytes[i], buf.getByte(buf.readerIndex() + i));
}
// Test that the channel's localAddress is equal to the message's recipient
assertEquals(ctx.channel().localAddress(), msg.recipient());
if (echo) {
ctx.writeAndFlush(new DatagramPacket(buf.retainedDuplicate(), msg.sender()));
}
latch.countDown();
}
});

View File

@ -28,6 +28,7 @@ import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.unix.DatagramSocketAddress;
import io.netty.channel.unix.Errors;
import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.internal.StringUtil;
@ -36,6 +37,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.PortUnreachableException;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
@ -449,46 +451,72 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
Throwable exception = null;
try {
ByteBuf data = null;
ByteBuf byteBuf = null;
try {
boolean connected = isConnected();
do {
data = allocHandle.allocate(allocator);
allocHandle.attemptedBytesRead(data.writableBytes());
final DatagramSocketAddress remoteAddress;
if (data.hasMemoryAddress()) {
// has a memory address so use optimized call
remoteAddress = socket.recvFromAddress(data.memoryAddress(), data.writerIndex(),
data.capacity());
byteBuf = allocHandle.allocate(allocator);
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
final DatagramPacket packet;
if (connected) {
try {
allocHandle.lastBytesRead(doReadBytes(byteBuf));
} catch (Errors.NativeIoException e) {
// We need to correctly translate connect errors to match NIO behaviour.
if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
PortUnreachableException error = new PortUnreachableException(e.getMessage());
error.initCause(e);
throw error;
}
throw e;
}
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read, release the buffer.
byteBuf.release();
byteBuf = null;
break;
}
packet = new DatagramPacket(
byteBuf, (InetSocketAddress) localAddress(), (InetSocketAddress) remoteAddress());
} else {
ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
}
final DatagramSocketAddress remoteAddress;
if (byteBuf.hasMemoryAddress()) {
// has a memory address so use optimized call
remoteAddress = socket.recvFromAddress(byteBuf.memoryAddress(), byteBuf.writerIndex(),
byteBuf.capacity());
} else {
ByteBuffer nioData = byteBuf.internalNioBuffer(
byteBuf.writerIndex(), byteBuf.writableBytes());
remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
}
if (remoteAddress == null) {
allocHandle.lastBytesRead(-1);
data.release();
data = null;
break;
}
if (remoteAddress == null) {
allocHandle.lastBytesRead(-1);
byteBuf.release();
byteBuf = null;
break;
}
InetSocketAddress localAddress = remoteAddress.localAddress();
if (localAddress == null) {
localAddress = (InetSocketAddress) localAddress();
}
allocHandle.lastBytesRead(remoteAddress.receivedAmount());
byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
InetSocketAddress localAddress = remoteAddress.localAddress();
if (localAddress == null) {
localAddress = (InetSocketAddress) localAddress();
packet = new DatagramPacket(byteBuf, localAddress, remoteAddress);
}
allocHandle.incMessagesRead(1);
allocHandle.lastBytesRead(remoteAddress.receivedAmount());
data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead());
readPending = false;
pipeline.fireChannelRead(
new DatagramPacket(data, localAddress, remoteAddress));
pipeline.fireChannelRead(packet);
data = null;
byteBuf = null;
} while (allocHandle.continueReading());
} catch (Throwable t) {
if (data != null) {
data.release();
if (byteBuf != null) {
byteBuf.release();
}
exception = t;
}

View File

@ -28,6 +28,7 @@ import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.unix.DatagramSocketAddress;
import io.netty.channel.unix.Errors;
import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.internal.StringUtil;
@ -37,6 +38,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.PortUnreachableException;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
@ -420,41 +422,72 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement
Throwable exception = null;
try {
ByteBuf data = null;
ByteBuf byteBuf = null;
try {
boolean connected = isConnected();
do {
data = allocHandle.allocate(allocator);
allocHandle.attemptedBytesRead(data.writableBytes());
final DatagramSocketAddress remoteAddress;
if (data.hasMemoryAddress()) {
// has a memory address so use optimized call
remoteAddress = socket.recvFromAddress(data.memoryAddress(), data.writerIndex(),
data.capacity());
} else {
ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
}
byteBuf = allocHandle.allocate(allocator);
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
if (remoteAddress == null) {
allocHandle.lastBytesRead(-1);
data.release();
data = null;
break;
final DatagramPacket packet;
if (connected) {
try {
allocHandle.lastBytesRead(doReadBytes(byteBuf));
} catch (Errors.NativeIoException e) {
// We need to correctly translate connect errors to match NIO behaviour.
if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
PortUnreachableException error = new PortUnreachableException(e.getMessage());
error.initCause(e);
throw error;
}
throw e;
}
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read, release the buffer.
byteBuf.release();
byteBuf = null;
break;
}
packet = new DatagramPacket(byteBuf,
(InetSocketAddress) localAddress(), (InetSocketAddress) remoteAddress());
} else {
final DatagramSocketAddress remoteAddress;
if (byteBuf.hasMemoryAddress()) {
// has a memory address so use optimized call
remoteAddress = socket.recvFromAddress(byteBuf.memoryAddress(), byteBuf.writerIndex(),
byteBuf.capacity());
} else {
ByteBuffer nioData = byteBuf.internalNioBuffer(
byteBuf.writerIndex(), byteBuf.writableBytes());
remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
}
if (remoteAddress == null) {
allocHandle.lastBytesRead(-1);
byteBuf.release();
byteBuf = null;
break;
}
InetSocketAddress localAddress = remoteAddress.localAddress();
if (localAddress == null) {
localAddress = (InetSocketAddress) localAddress();
}
allocHandle.lastBytesRead(remoteAddress.receivedAmount());
byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
packet = new DatagramPacket(byteBuf, localAddress, remoteAddress);
}
allocHandle.incMessagesRead(1);
allocHandle.lastBytesRead(remoteAddress.receivedAmount());
data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead());
readPending = false;
pipeline.fireChannelRead(
new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress));
pipeline.fireChannelRead(packet);
data = null;
byteBuf = null;
} while (allocHandle.continueReading());
} catch (Throwable t) {
if (data != null) {
data.release();
if (byteBuf != null) {
byteBuf.release();
}
exception = t;
}