Reduce GC produced by native DatagramChannel implementations when in connected mode. (#8806)

Motivation:

In the native code EpollDatagramChannel / KQueueDatagramChannel creates a DatagramSocketAddress object for each received UDP datagram even when in connected mode as it uses the recvfrom(...) / recvmsg(...)  method. Creating these is quite heavy in terms of allocations as internally, char[], String, Inet4Address, InetAddressHolder, InetSocketAddressHolder, InetAddress[], byte[] objects are getting generated when constructing the object. When in connected mode we can just use regular read(...) calls which do not need to allocate all of these.

Modifications:

- When in connected mode use read(...) and NOT recvfrom(..) / readmsg(...) to reduce allocations when possible.
- Adjust tests to ensure read works as expected when in connected mode.

Result:

Less allocations and GC when using native datagram channels in connected mode. Fixes https://github.com/netty/netty/issues/8770.
This commit is contained in:
Norman Maurer 2019-02-01 10:29:36 +01:00
parent 245cccd5e0
commit 99face616a
3 changed files with 135 additions and 60 deletions

View File

@ -158,7 +158,7 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
}); });
final CountDownLatch latch = new CountDownLatch(count); final CountDownLatch latch = new CountDownLatch(count);
sc = setupServerChannel(sb, bytes, latch); sc = setupServerChannel(sb, bytes, latch, false);
if (bindClient) { if (bindClient) {
cc = cb.bind(newSocketAddress()).sync().channel(); cc = cb.bind(newSocketAddress()).sync().channel();
} else { } else {
@ -209,10 +209,21 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
private void testSimpleSendWithConnect0(Bootstrap sb, Bootstrap cb, ByteBuf buf, final byte[] bytes, int count, private void testSimpleSendWithConnect0(Bootstrap sb, Bootstrap cb, ByteBuf buf, final byte[] bytes, int count,
WrapType wrapType) throws Throwable { WrapType wrapType) throws Throwable {
cb.handler(new SimpleChannelInboundHandler<Object>() { final CountDownLatch clientLatch = new CountDownLatch(count);
cb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override @Override
public void channelRead0(ChannelHandlerContext ctx, Object msgs) throws Exception { public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
// Nothing will be sent. ByteBuf buf = msg.content();
assertEquals(bytes.length, buf.readableBytes());
for (int i = 0; i < bytes.length; i++) {
assertEquals(bytes[i], buf.getByte(buf.readerIndex() + i));
}
// Test that the channel's localAddress is equal to the message's recipient
assertEquals(ctx.channel().localAddress(), msg.recipient());
clientLatch.countDown();
} }
}); });
@ -220,7 +231,7 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
DatagramChannel cc = null; DatagramChannel cc = null;
try { try {
final CountDownLatch latch = new CountDownLatch(count); final CountDownLatch latch = new CountDownLatch(count);
sc = setupServerChannel(sb, bytes, latch); sc = setupServerChannel(sb, bytes, latch, true);
cc = (DatagramChannel) cb.connect(sc.localAddress()).sync().channel(); cc = (DatagramChannel) cb.connect(sc.localAddress()).sync().channel();
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
@ -243,7 +254,7 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
} }
cc.flush(); cc.flush();
assertTrue(latch.await(10, TimeUnit.SECONDS)); assertTrue(latch.await(10, TimeUnit.SECONDS));
assertTrue(clientLatch.await(10, TimeUnit.SECONDS));
assertTrue(cc.isConnected()); assertTrue(cc.isConnected());
// Test what happens when we call disconnect() // Test what happens when we call disconnect()
@ -264,7 +275,7 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
private Channel setupServerChannel(Bootstrap sb, final byte[] bytes, final CountDownLatch latch) private Channel setupServerChannel(Bootstrap sb, final byte[] bytes, final CountDownLatch latch, final boolean echo)
throws Throwable { throws Throwable {
sb.handler(new ChannelInitializer<Channel>() { sb.handler(new ChannelInitializer<Channel>() {
@Override @Override
@ -274,13 +285,16 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception { public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
ByteBuf buf = msg.content(); ByteBuf buf = msg.content();
assertEquals(bytes.length, buf.readableBytes()); assertEquals(bytes.length, buf.readableBytes());
for (byte b : bytes) { for (int i = 0; i < bytes.length; i++) {
assertEquals(b, buf.readByte()); assertEquals(bytes[i], buf.getByte(buf.readerIndex() + i));
} }
// Test that the channel's localAddress is equal to the message's recipient // Test that the channel's localAddress is equal to the message's recipient
assertEquals(ctx.channel().localAddress(), msg.recipient()); assertEquals(ctx.channel().localAddress(), msg.recipient());
if (echo) {
ctx.writeAndFlush(new DatagramPacket(buf.retainedDuplicate(), msg.sender()));
}
latch.countDown(); latch.countDown();
} }
}); });

View File

@ -29,6 +29,7 @@ import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig; import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.unix.DatagramSocketAddress; import io.netty.channel.unix.DatagramSocketAddress;
import io.netty.channel.unix.Errors;
import io.netty.channel.unix.IovArray; import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.UnixChannelUtil; import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
@ -37,6 +38,7 @@ import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.NetworkInterface; import java.net.NetworkInterface;
import java.net.PortUnreachableException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.SocketException; import java.net.SocketException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -450,46 +452,72 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
Throwable exception = null; Throwable exception = null;
try { try {
ByteBuf data = null; ByteBuf byteBuf = null;
try { try {
boolean connected = isConnected();
do { do {
data = allocHandle.allocate(allocator); byteBuf = allocHandle.allocate(allocator);
allocHandle.attemptedBytesRead(data.writableBytes()); allocHandle.attemptedBytesRead(byteBuf.writableBytes());
final DatagramSocketAddress remoteAddress;
if (data.hasMemoryAddress()) { final DatagramPacket packet;
// has a memory address so use optimized call if (connected) {
remoteAddress = socket.recvFromAddress(data.memoryAddress(), data.writerIndex(), try {
data.capacity()); allocHandle.lastBytesRead(doReadBytes(byteBuf));
} catch (Errors.NativeIoException e) {
// We need to correctly translate connect errors to match NIO behaviour.
if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
PortUnreachableException error = new PortUnreachableException(e.getMessage());
error.initCause(e);
throw error;
}
throw e;
}
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read, release the buffer.
byteBuf.release();
byteBuf = null;
break;
}
packet = new DatagramPacket(
byteBuf, (InetSocketAddress) localAddress(), (InetSocketAddress) remoteAddress());
} else { } else {
ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes()); final DatagramSocketAddress remoteAddress;
remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit()); if (byteBuf.hasMemoryAddress()) {
} // has a memory address so use optimized call
remoteAddress = socket.recvFromAddress(byteBuf.memoryAddress(), byteBuf.writerIndex(),
byteBuf.capacity());
} else {
ByteBuffer nioData = byteBuf.internalNioBuffer(
byteBuf.writerIndex(), byteBuf.writableBytes());
remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
}
if (remoteAddress == null) { if (remoteAddress == null) {
allocHandle.lastBytesRead(-1); allocHandle.lastBytesRead(-1);
data.release(); byteBuf.release();
data = null; byteBuf = null;
break; break;
} }
InetSocketAddress localAddress = remoteAddress.localAddress();
if (localAddress == null) {
localAddress = (InetSocketAddress) localAddress();
}
allocHandle.lastBytesRead(remoteAddress.receivedAmount());
byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
InetSocketAddress localAddress = remoteAddress.localAddress(); packet = new DatagramPacket(byteBuf, localAddress, remoteAddress);
if (localAddress == null) {
localAddress = (InetSocketAddress) localAddress();
} }
allocHandle.incMessagesRead(1); allocHandle.incMessagesRead(1);
allocHandle.lastBytesRead(remoteAddress.receivedAmount());
data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead());
readPending = false; readPending = false;
pipeline.fireChannelRead( pipeline.fireChannelRead(packet);
new DatagramPacket(data, localAddress, remoteAddress));
data = null; byteBuf = null;
} while (allocHandle.continueReading()); } while (allocHandle.continueReading());
} catch (Throwable t) { } catch (Throwable t) {
if (data != null) { if (byteBuf != null) {
data.release(); byteBuf.release();
} }
exception = t; exception = t;
} }

View File

@ -29,6 +29,7 @@ import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig; import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.unix.DatagramSocketAddress; import io.netty.channel.unix.DatagramSocketAddress;
import io.netty.channel.unix.Errors;
import io.netty.channel.unix.IovArray; import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.UnixChannelUtil; import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
@ -38,6 +39,7 @@ import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.NetworkInterface; import java.net.NetworkInterface;
import java.net.PortUnreachableException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.SocketException; import java.net.SocketException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -419,41 +421,72 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement
Throwable exception = null; Throwable exception = null;
try { try {
ByteBuf data = null; ByteBuf byteBuf = null;
try { try {
boolean connected = isConnected();
do { do {
data = allocHandle.allocate(allocator); byteBuf = allocHandle.allocate(allocator);
allocHandle.attemptedBytesRead(data.writableBytes()); allocHandle.attemptedBytesRead(byteBuf.writableBytes());
final DatagramSocketAddress remoteAddress;
if (data.hasMemoryAddress()) {
// has a memory address so use optimized call
remoteAddress = socket.recvFromAddress(data.memoryAddress(), data.writerIndex(),
data.capacity());
} else {
ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
}
if (remoteAddress == null) { final DatagramPacket packet;
allocHandle.lastBytesRead(-1); if (connected) {
data.release(); try {
data = null; allocHandle.lastBytesRead(doReadBytes(byteBuf));
break; } catch (Errors.NativeIoException e) {
// We need to correctly translate connect errors to match NIO behaviour.
if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
PortUnreachableException error = new PortUnreachableException(e.getMessage());
error.initCause(e);
throw error;
}
throw e;
}
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read, release the buffer.
byteBuf.release();
byteBuf = null;
break;
}
packet = new DatagramPacket(byteBuf,
(InetSocketAddress) localAddress(), (InetSocketAddress) remoteAddress());
} else {
final DatagramSocketAddress remoteAddress;
if (byteBuf.hasMemoryAddress()) {
// has a memory address so use optimized call
remoteAddress = socket.recvFromAddress(byteBuf.memoryAddress(), byteBuf.writerIndex(),
byteBuf.capacity());
} else {
ByteBuffer nioData = byteBuf.internalNioBuffer(
byteBuf.writerIndex(), byteBuf.writableBytes());
remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
}
if (remoteAddress == null) {
allocHandle.lastBytesRead(-1);
byteBuf.release();
byteBuf = null;
break;
}
InetSocketAddress localAddress = remoteAddress.localAddress();
if (localAddress == null) {
localAddress = (InetSocketAddress) localAddress();
}
allocHandle.lastBytesRead(remoteAddress.receivedAmount());
byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
packet = new DatagramPacket(byteBuf, localAddress, remoteAddress);
} }
allocHandle.incMessagesRead(1); allocHandle.incMessagesRead(1);
allocHandle.lastBytesRead(remoteAddress.receivedAmount());
data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead());
readPending = false; readPending = false;
pipeline.fireChannelRead( pipeline.fireChannelRead(packet);
new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress));
data = null; byteBuf = null;
} while (allocHandle.continueReading()); } while (allocHandle.continueReading());
} catch (Throwable t) { } catch (Throwable t) {
if (data != null) { if (byteBuf != null) {
data.release(); byteBuf.release();
} }
exception = t; exception = t;
} }