Add support for recvmmsg(...) even with connected datagram channels w… (#9539)

Motivation:

394a1b3485 added support for recvmmsg(...) for unconnected datagram channels, this change also allows to use recvmmsg(...) with connected datagram channels.

Modifications:

- Always try to use recvmmsg(...) if configured to do so
- Adjust unit test to cover it

Result:

Less syscalls when reading datagram packets
This commit is contained in:
Norman Maurer 2019-09-06 20:58:38 +02:00
parent 82c330e8a5
commit acaa29f508
2 changed files with 72 additions and 23 deletions

View File

@ -32,6 +32,7 @@ import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.InternetProtocolFamily; import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.unix.DatagramSocketAddress; import io.netty.channel.unix.DatagramSocketAddress;
import io.netty.channel.unix.Errors; import io.netty.channel.unix.Errors;
import io.netty.channel.unix.Errors.NativeIoException;
import io.netty.channel.unix.IovArray; import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.Socket; import io.netty.channel.unix.Socket;
import io.netty.channel.unix.UnixChannelUtil; import io.netty.channel.unix.UnixChannelUtil;
@ -480,19 +481,27 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
do { do {
ByteBuf byteBuf = allocHandle.allocate(allocator); ByteBuf byteBuf = allocHandle.allocate(allocator);
final boolean read; final boolean read;
if (connected) { int datagramSize = config().getMaxDatagramPayloadSize();
read = connectedRead(allocHandle, byteBuf); int numDatagram = datagramSize == 0 ? 1 : byteBuf.writableBytes() / datagramSize;
} else {
int datagramSize = config().getMaxDatagramPayloadSize();
int numDatagram = datagramSize == 0 ? 1 : byteBuf.writableBytes() / datagramSize;
try {
if (numDatagram <= 1) { if (numDatagram <= 1) {
read = read(allocHandle, byteBuf, datagramSize); if (connected) {
read = connectedRead(allocHandle, byteBuf, datagramSize);
} else {
read = read(allocHandle, byteBuf, datagramSize);
}
} else { } else {
// Try to use scattering reads via recvmmsg(...) syscall. // Try to use scattering reads via recvmmsg(...) syscall.
read = scatteringRead(allocHandle, byteBuf, datagramSize, numDatagram); read = scatteringRead(allocHandle, byteBuf, datagramSize, numDatagram);
} }
} catch (NativeIoException e) {
if (connected) {
throw translateForConnected(e);
}
throw e;
} }
if (read) { if (read) {
readPending = false; readPending = false;
} else { } else {
@ -516,26 +525,33 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
} }
} }
private boolean connectedRead(EpollRecvByteAllocatorHandle allocHandle, ByteBuf byteBuf) private boolean connectedRead(EpollRecvByteAllocatorHandle allocHandle, ByteBuf byteBuf, int maxDatagramPacketSize)
throws Exception { throws Exception {
try { try {
allocHandle.attemptedBytesRead(byteBuf.writableBytes()); int writable = maxDatagramPacketSize != 0 ? Math.min(byteBuf.writableBytes(), maxDatagramPacketSize)
: byteBuf.writableBytes();
allocHandle.attemptedBytesRead(writable);
try { int writerIndex = byteBuf.writerIndex();
allocHandle.lastBytesRead(doReadBytes(byteBuf)); int localReadAmount;
} catch (Errors.NativeIoException e) { if (byteBuf.hasMemoryAddress()) {
// We need to correctly translate connect errors to match NIO behaviour. localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, writerIndex + writable);
if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) { } else {
PortUnreachableException error = new PortUnreachableException(e.getMessage()); ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, writable);
error.initCause(e); localReadAmount = socket.read(buf, buf.position(), buf.limit());
throw error;
}
throw e;
} }
if (allocHandle.lastBytesRead() <= 0) {
if (localReadAmount <= 0) {
allocHandle.lastBytesRead(localReadAmount);
// nothing was read, release the buffer. // nothing was read, release the buffer.
return false; return false;
} }
byteBuf.writerIndex(writerIndex + localReadAmount);
allocHandle.lastBytesRead(maxDatagramPacketSize <= 0 ?
localReadAmount : writable);
DatagramPacket packet = new DatagramPacket(byteBuf, localAddress(), remoteAddress()); DatagramPacket packet = new DatagramPacket(byteBuf, localAddress(), remoteAddress());
allocHandle.incMessagesRead(1); allocHandle.incMessagesRead(1);
@ -549,6 +565,16 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
} }
} }
private IOException translateForConnected(NativeIoException e) {
// We need to correctly translate connect errors to match NIO behaviour.
if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
PortUnreachableException error = new PortUnreachableException(e.getMessage());
error.initCause(e);
return error;
}
return e;
}
private boolean scatteringRead(EpollRecvByteAllocatorHandle allocHandle, private boolean scatteringRead(EpollRecvByteAllocatorHandle allocHandle,
ByteBuf byteBuf, int datagramSize, int numDatagram) throws IOException { ByteBuf byteBuf, int datagramSize, int numDatagram) throws IOException {
RecyclableArrayList bufferPackets = null; RecyclableArrayList bufferPackets = null;
@ -565,6 +591,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
allocHandle.attemptedBytesRead(offset - byteBuf.writerIndex()); allocHandle.attemptedBytesRead(offset - byteBuf.writerIndex());
NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets(); NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
int received = socket.recvmmsg(packets, 0, array.count()); int received = socket.recvmmsg(packets, 0, array.count());
if (received == 0) { if (received == 0) {
allocHandle.lastBytesRead(-1); allocHandle.lastBytesRead(-1);

View File

@ -54,7 +54,7 @@ public class EpollDatagramScatteringReadTest extends AbstractDatagramTest {
} }
public void testScatteringReadPartial(Bootstrap sb, Bootstrap cb) throws Throwable { public void testScatteringReadPartial(Bootstrap sb, Bootstrap cb) throws Throwable {
testScatteringRead(sb, cb, true); testScatteringRead(sb, cb, false, true);
} }
@Test @Test
@ -63,12 +63,31 @@ public class EpollDatagramScatteringReadTest extends AbstractDatagramTest {
} }
public void testScatteringRead(Bootstrap sb, Bootstrap cb) throws Throwable { public void testScatteringRead(Bootstrap sb, Bootstrap cb) throws Throwable {
testScatteringRead(sb, cb, false); testScatteringRead(sb, cb, false, false);
} }
private void testScatteringRead(Bootstrap sb, Bootstrap cb, boolean partial) throws Throwable { @Test
public void testScatteringReadConnectedPartial() throws Throwable {
run();
}
public void testScatteringReadConnectedPartial(Bootstrap sb, Bootstrap cb) throws Throwable {
testScatteringRead(sb, cb, true, true);
}
@Test
public void testScatteringConnectedRead() throws Throwable {
run();
}
public void testScatteringConnectedRead(Bootstrap sb, Bootstrap cb) throws Throwable {
testScatteringRead(sb, cb, true, false);
}
private void testScatteringRead(Bootstrap sb, Bootstrap cb, boolean connected, boolean partial) throws Throwable {
int packetSize = 512; int packetSize = 512;
int numPackets = 4; int numPackets = 4;
sb.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator( sb.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(
packetSize, packetSize * (partial ? numPackets / 2 : numPackets), 64 * 1024)); packetSize, packetSize * (partial ? numPackets / 2 : numPackets), 64 * 1024));
sb.option(EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE, packetSize); sb.option(EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE, packetSize);
@ -122,6 +141,10 @@ public class EpollDatagramScatteringReadTest extends AbstractDatagramTest {
sb.option(ChannelOption.AUTO_READ, false); sb.option(ChannelOption.AUTO_READ, false);
sc = sb.bind(newSocketAddress()).sync().channel(); sc = sb.bind(newSocketAddress()).sync().channel();
if (connected) {
sc.connect(cc.localAddress()).syncUninterruptibly();
}
InetSocketAddress addr = (InetSocketAddress) sc.localAddress(); InetSocketAddress addr = (InetSocketAddress) sc.localAddress();
List<ChannelFuture> futures = new ArrayList<ChannelFuture>(numPackets); List<ChannelFuture> futures = new ArrayList<ChannelFuture>(numPackets);
@ -154,5 +177,4 @@ public class EpollDatagramScatteringReadTest extends AbstractDatagramTest {
} }
} }
} }
} }