diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index e958d70155..721f55bd92 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -32,6 +32,7 @@ import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.InternetProtocolFamily; import io.netty.channel.unix.DatagramSocketAddress; import io.netty.channel.unix.Errors; +import io.netty.channel.unix.Errors.NativeIoException; import io.netty.channel.unix.IovArray; import io.netty.channel.unix.Socket; import io.netty.channel.unix.UnixChannelUtil; @@ -480,19 +481,27 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements do { ByteBuf byteBuf = allocHandle.allocate(allocator); final boolean read; - if (connected) { - read = connectedRead(allocHandle, byteBuf); - } else { - int datagramSize = config().getMaxDatagramPayloadSize(); - int numDatagram = datagramSize == 0 ? 1 : byteBuf.writableBytes() / datagramSize; + int datagramSize = config().getMaxDatagramPayloadSize(); + int numDatagram = datagramSize == 0 ? 1 : byteBuf.writableBytes() / datagramSize; + try { if (numDatagram <= 1) { - read = read(allocHandle, byteBuf, datagramSize); + if (connected) { + read = connectedRead(allocHandle, byteBuf, datagramSize); + } else { + read = read(allocHandle, byteBuf, datagramSize); + } } else { // Try to use scattering reads via recvmmsg(...) syscall. read = scatteringRead(allocHandle, byteBuf, datagramSize, numDatagram); } + } catch (NativeIoException e) { + if (connected) { + throw translateForConnected(e); + } + throw e; } + if (read) { readPending = false; } else { @@ -516,26 +525,33 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements } } - private boolean connectedRead(EpollRecvByteAllocatorHandle allocHandle, ByteBuf byteBuf) + private boolean connectedRead(EpollRecvByteAllocatorHandle allocHandle, ByteBuf byteBuf, int maxDatagramPacketSize) throws Exception { try { - allocHandle.attemptedBytesRead(byteBuf.writableBytes()); + int writable = maxDatagramPacketSize != 0 ? Math.min(byteBuf.writableBytes(), maxDatagramPacketSize) + : byteBuf.writableBytes(); + allocHandle.attemptedBytesRead(writable); - try { - allocHandle.lastBytesRead(doReadBytes(byteBuf)); - } catch (Errors.NativeIoException e) { - // We need to correctly translate connect errors to match NIO behaviour. - if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) { - PortUnreachableException error = new PortUnreachableException(e.getMessage()); - error.initCause(e); - throw error; - } - throw e; + int writerIndex = byteBuf.writerIndex(); + int localReadAmount; + if (byteBuf.hasMemoryAddress()) { + localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, writerIndex + writable); + } else { + ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, writable); + localReadAmount = socket.read(buf, buf.position(), buf.limit()); } - if (allocHandle.lastBytesRead() <= 0) { + + if (localReadAmount <= 0) { + allocHandle.lastBytesRead(localReadAmount); + // nothing was read, release the buffer. return false; } + byteBuf.writerIndex(writerIndex + localReadAmount); + + allocHandle.lastBytesRead(maxDatagramPacketSize <= 0 ? + localReadAmount : writable); + DatagramPacket packet = new DatagramPacket(byteBuf, localAddress(), remoteAddress()); allocHandle.incMessagesRead(1); @@ -549,6 +565,16 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements } } + private IOException translateForConnected(NativeIoException e) { + // We need to correctly translate connect errors to match NIO behaviour. + if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) { + PortUnreachableException error = new PortUnreachableException(e.getMessage()); + error.initCause(e); + return error; + } + return e; + } + private boolean scatteringRead(EpollRecvByteAllocatorHandle allocHandle, ByteBuf byteBuf, int datagramSize, int numDatagram) throws IOException { RecyclableArrayList bufferPackets = null; @@ -565,6 +591,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements allocHandle.attemptedBytesRead(offset - byteBuf.writerIndex()); NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets(); + int received = socket.recvmmsg(packets, 0, array.count()); if (received == 0) { allocHandle.lastBytesRead(-1); diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramScatteringReadTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramScatteringReadTest.java index 869f6587ed..56269997d8 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramScatteringReadTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramScatteringReadTest.java @@ -54,7 +54,7 @@ public class EpollDatagramScatteringReadTest extends AbstractDatagramTest { } public void testScatteringReadPartial(Bootstrap sb, Bootstrap cb) throws Throwable { - testScatteringRead(sb, cb, true); + testScatteringRead(sb, cb, false, true); } @Test @@ -63,12 +63,31 @@ public class EpollDatagramScatteringReadTest extends AbstractDatagramTest { } public void testScatteringRead(Bootstrap sb, Bootstrap cb) throws Throwable { - testScatteringRead(sb, cb, false); + testScatteringRead(sb, cb, false, false); } - private void testScatteringRead(Bootstrap sb, Bootstrap cb, boolean partial) throws Throwable { + @Test + public void testScatteringReadConnectedPartial() throws Throwable { + run(); + } + + public void testScatteringReadConnectedPartial(Bootstrap sb, Bootstrap cb) throws Throwable { + testScatteringRead(sb, cb, true, true); + } + + @Test + public void testScatteringConnectedRead() throws Throwable { + run(); + } + + public void testScatteringConnectedRead(Bootstrap sb, Bootstrap cb) throws Throwable { + testScatteringRead(sb, cb, true, false); + } + + private void testScatteringRead(Bootstrap sb, Bootstrap cb, boolean connected, boolean partial) throws Throwable { int packetSize = 512; int numPackets = 4; + sb.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator( packetSize, packetSize * (partial ? numPackets / 2 : numPackets), 64 * 1024)); sb.option(EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE, packetSize); @@ -122,6 +141,10 @@ public class EpollDatagramScatteringReadTest extends AbstractDatagramTest { sb.option(ChannelOption.AUTO_READ, false); sc = sb.bind(newSocketAddress()).sync().channel(); + if (connected) { + sc.connect(cc.localAddress()).syncUninterruptibly(); + } + InetSocketAddress addr = (InetSocketAddress) sc.localAddress(); List futures = new ArrayList(numPackets); @@ -154,5 +177,4 @@ public class EpollDatagramScatteringReadTest extends AbstractDatagramTest { } } } - }