Correctly set writerIndex when EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE is used in all cases (#9819)

Motivation:

Due a bug we did not correctly set the writerIndex of the ByteBuf when a
user specified EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE but we ended
up with a non scattering read.

Modifications:

- Set writerIndex to the correct value
- Add unit tests

Result:

Fixes https://github.com/netty/netty/issues/9788
This commit is contained in:
Norman Maurer 2019-11-28 09:03:54 +01:00
parent aa3c800f12
commit 2d48ec4e3f
2 changed files with 92 additions and 2 deletions

View File

@ -671,9 +671,10 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
if (localAddress == null) {
localAddress = localAddress();
}
int received = remoteAddress.receivedAmount();
allocHandle.lastBytesRead(maxDatagramPacketSize <= 0 ?
remoteAddress.receivedAmount() : writable);
byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
received : writable);
byteBuf.writerIndex(writerIndex + received);
allocHandle.incMessagesRead(1);
pipeline().fireChannelRead(new DatagramPacket(byteBuf, localAddress, remoteAddress));

View File

@ -184,4 +184,93 @@ public class EpollDatagramScatteringReadTest extends AbstractDatagramTest {
}
}
}
@Test
public void testScatteringReadWithSmallBuffer() throws Throwable {
run();
}
public void testScatteringReadWithSmallBuffer(Bootstrap sb, Bootstrap cb) throws Throwable {
testScatteringReadWithSmallBuffer0(sb, cb, false);
}
@Test
public void testScatteringConnectedReadWithSmallBuffer() throws Throwable {
run();
}
public void testScatteringConnectedReadWithSmallBuffer(Bootstrap sb, Bootstrap cb) throws Throwable {
testScatteringReadWithSmallBuffer0(sb, cb, true);
}
private void testScatteringReadWithSmallBuffer0(Bootstrap sb, Bootstrap cb, boolean connected) throws Throwable {
int packetSize = 16;
sb.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1400, 1400, 64 * 1024));
sb.option(EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE, 1400);
Channel sc = null;
Channel cc = null;
try {
cb.handler(new SimpleChannelInboundHandler<Object>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msgs) {
// Nothing will be sent.
}
});
cc = cb.bind(newSocketAddress()).sync().channel();
final SocketAddress ccAddress = cc.localAddress();
final AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
final byte[] bytes = new byte[packetSize];
PlatformDependent.threadLocalRandom().nextBytes(bytes);
final CountDownLatch latch = new CountDownLatch(1);
sb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
assertEquals(ccAddress, msg.sender());
assertEquals(bytes.length, msg.content().readableBytes());
byte[] receivedBytes = new byte[bytes.length];
msg.content().readBytes(receivedBytes);
assertArrayEquals(bytes, receivedBytes);
latch.countDown();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
errorRef.compareAndSet(null, cause);
}
});
sc = sb.bind(newSocketAddress()).sync().channel();
if (connected) {
sc.connect(cc.localAddress()).syncUninterruptibly();
}
InetSocketAddress addr = (InetSocketAddress) sc.localAddress();
cc.writeAndFlush(new DatagramPacket(cc.alloc().directBuffer().writeBytes(bytes), addr)).sync();
if (!latch.await(10, TimeUnit.SECONDS)) {
Throwable error = errorRef.get();
if (error != null) {
throw error;
}
fail("Timeout while waiting for packets");
}
} finally {
if (cc != null) {
cc.close().syncUninterruptibly();
}
if (sc != null) {
sc.close().syncUninterruptibly();
}
}
}
}