Correctly set writerIndex when EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE is used in all cases (#9819)
Motivation: Due a bug we did not correctly set the writerIndex of the ByteBuf when a user specified EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE but we ended up with a non scattering read. Modifications: - Set writerIndex to the correct value - Add unit tests Result: Fixes https://github.com/netty/netty/issues/9788
This commit is contained in:
parent
4a6f5b19b7
commit
030ab560d0
@ -684,9 +684,10 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
|||||||
if (localAddress == null) {
|
if (localAddress == null) {
|
||||||
localAddress = localAddress();
|
localAddress = localAddress();
|
||||||
}
|
}
|
||||||
|
int received = remoteAddress.receivedAmount();
|
||||||
allocHandle.lastBytesRead(maxDatagramPacketSize <= 0 ?
|
allocHandle.lastBytesRead(maxDatagramPacketSize <= 0 ?
|
||||||
remoteAddress.receivedAmount() : writable);
|
received : writable);
|
||||||
byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
|
byteBuf.writerIndex(writerIndex + received);
|
||||||
allocHandle.incMessagesRead(1);
|
allocHandle.incMessagesRead(1);
|
||||||
|
|
||||||
pipeline().fireChannelRead(new DatagramPacket(byteBuf, localAddress, remoteAddress));
|
pipeline().fireChannelRead(new DatagramPacket(byteBuf, localAddress, remoteAddress));
|
||||||
|
@ -184,4 +184,93 @@ public class EpollDatagramScatteringReadTest extends AbstractDatagramTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScatteringReadWithSmallBuffer() throws Throwable {
|
||||||
|
run();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testScatteringReadWithSmallBuffer(Bootstrap sb, Bootstrap cb) throws Throwable {
|
||||||
|
testScatteringReadWithSmallBuffer0(sb, cb, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScatteringConnectedReadWithSmallBuffer() throws Throwable {
|
||||||
|
run();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testScatteringConnectedReadWithSmallBuffer(Bootstrap sb, Bootstrap cb) throws Throwable {
|
||||||
|
testScatteringReadWithSmallBuffer0(sb, cb, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testScatteringReadWithSmallBuffer0(Bootstrap sb, Bootstrap cb, boolean connected) throws Throwable {
|
||||||
|
int packetSize = 16;
|
||||||
|
|
||||||
|
sb.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1400, 1400, 64 * 1024));
|
||||||
|
sb.option(EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE, 1400);
|
||||||
|
|
||||||
|
Channel sc = null;
|
||||||
|
Channel cc = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
cb.handler(new SimpleChannelInboundHandler<Object>() {
|
||||||
|
@Override
|
||||||
|
public void channelRead0(ChannelHandlerContext ctx, Object msgs) {
|
||||||
|
// Nothing will be sent.
|
||||||
|
}
|
||||||
|
});
|
||||||
|
cc = cb.bind(newSocketAddress()).sync().channel();
|
||||||
|
final SocketAddress ccAddress = cc.localAddress();
|
||||||
|
|
||||||
|
final AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
|
||||||
|
final byte[] bytes = new byte[packetSize];
|
||||||
|
PlatformDependent.threadLocalRandom().nextBytes(bytes);
|
||||||
|
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
sb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
|
||||||
|
assertEquals(ccAddress, msg.sender());
|
||||||
|
|
||||||
|
assertEquals(bytes.length, msg.content().readableBytes());
|
||||||
|
byte[] receivedBytes = new byte[bytes.length];
|
||||||
|
msg.content().readBytes(receivedBytes);
|
||||||
|
assertArrayEquals(bytes, receivedBytes);
|
||||||
|
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||||
|
errorRef.compareAndSet(null, cause);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
sc = sb.bind(newSocketAddress()).sync().channel();
|
||||||
|
|
||||||
|
if (connected) {
|
||||||
|
sc.connect(cc.localAddress()).syncUninterruptibly();
|
||||||
|
}
|
||||||
|
|
||||||
|
InetSocketAddress addr = (InetSocketAddress) sc.localAddress();
|
||||||
|
|
||||||
|
cc.writeAndFlush(new DatagramPacket(cc.alloc().directBuffer().writeBytes(bytes), addr)).sync();
|
||||||
|
|
||||||
|
if (!latch.await(10, TimeUnit.SECONDS)) {
|
||||||
|
Throwable error = errorRef.get();
|
||||||
|
if (error != null) {
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
fail("Timeout while waiting for packets");
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (cc != null) {
|
||||||
|
cc.close().syncUninterruptibly();
|
||||||
|
}
|
||||||
|
if (sc != null) {
|
||||||
|
sc.close().syncUninterruptibly();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user