diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index 06c9f4b39e..a37fb776f7 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -684,9 +684,10 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements if (localAddress == null) { localAddress = localAddress(); } + int received = remoteAddress.receivedAmount(); allocHandle.lastBytesRead(maxDatagramPacketSize <= 0 ? - remoteAddress.receivedAmount() : writable); - byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead()); + received : writable); + byteBuf.writerIndex(writerIndex + received); allocHandle.incMessagesRead(1); pipeline().fireChannelRead(new DatagramPacket(byteBuf, localAddress, remoteAddress)); diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramScatteringReadTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramScatteringReadTest.java index 6cb62808ed..0c082bd490 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramScatteringReadTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramScatteringReadTest.java @@ -184,4 +184,93 @@ public class EpollDatagramScatteringReadTest extends AbstractDatagramTest { } } } + + @Test + public void testScatteringReadWithSmallBuffer() throws Throwable { + run(); + } + + public void testScatteringReadWithSmallBuffer(Bootstrap sb, Bootstrap cb) throws Throwable { + testScatteringReadWithSmallBuffer0(sb, cb, false); + } + + @Test + public void testScatteringConnectedReadWithSmallBuffer() throws Throwable { + run(); + } + + public void testScatteringConnectedReadWithSmallBuffer(Bootstrap sb, Bootstrap cb) throws Throwable { + testScatteringReadWithSmallBuffer0(sb, cb, true); + } + + private void testScatteringReadWithSmallBuffer0(Bootstrap sb, Bootstrap cb, boolean connected) throws Throwable { + int packetSize = 16; + + sb.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1400, 1400, 64 * 1024)); + sb.option(EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE, 1400); + + Channel sc = null; + Channel cc = null; + + try { + cb.handler(new SimpleChannelInboundHandler() { + @Override + public void channelRead0(ChannelHandlerContext ctx, Object msgs) { + // Nothing will be sent. + } + }); + cc = cb.bind(newSocketAddress()).sync().channel(); + final SocketAddress ccAddress = cc.localAddress(); + + final AtomicReference errorRef = new AtomicReference(); + final byte[] bytes = new byte[packetSize]; + PlatformDependent.threadLocalRandom().nextBytes(bytes); + + final CountDownLatch latch = new CountDownLatch(1); + sb.handler(new SimpleChannelInboundHandler() { + + @Override + protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) { + assertEquals(ccAddress, msg.sender()); + + assertEquals(bytes.length, msg.content().readableBytes()); + byte[] receivedBytes = new byte[bytes.length]; + msg.content().readBytes(receivedBytes); + assertArrayEquals(bytes, receivedBytes); + + latch.countDown(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + errorRef.compareAndSet(null, cause); + } + }); + + sc = sb.bind(newSocketAddress()).sync().channel(); + + if (connected) { + sc.connect(cc.localAddress()).syncUninterruptibly(); + } + + InetSocketAddress addr = (InetSocketAddress) sc.localAddress(); + + cc.writeAndFlush(new DatagramPacket(cc.alloc().directBuffer().writeBytes(bytes), addr)).sync(); + + if (!latch.await(10, TimeUnit.SECONDS)) { + Throwable error = errorRef.get(); + if (error != null) { + throw error; + } + fail("Timeout while waiting for packets"); + } + } finally { + if (cc != null) { + cc.close().syncUninterruptibly(); + } + if (sc != null) { + sc.close().syncUninterruptibly(); + } + } + } }