EPOLLRDHUP prematurely marking input as shutdown

Motivation:
When the EPOLLRDHUP event is received, we assume the read side of the FD is no longer functional and force the input state to shutdown. However, if the channel is still active, we should rely on EPOLLIN and a read that returns no data to indicate there is nothing more to read before we update the shutdown state. If we do not, we may fail to read all pending data from the FD when the RecvByteBufAllocator doesn't consume it all in a single read operation.
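As a rough illustration, the sketch below mirrors the shape of an epoll-style read loop; it is not the transport's actual code, and readFromFd is a hypothetical stand-in for the transport's read. The point is that the allocator handle can end the loop before the FD is drained:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.RecvByteBufAllocator;

// Simplified sketch of the shape of an epoll read loop (illustrative only).
void readLoopSketch(RecvByteBufAllocator.Handle allocHandle, ByteBufAllocator allocator,
                    ChannelPipeline pipeline) {
    do {
        ByteBuf byteBuf = allocHandle.allocate(allocator);
        allocHandle.lastBytesRead(readFromFd(byteBuf)); // hypothetical helper, defined below
        if (allocHandle.lastBytesRead() <= 0) {
            // Nothing was read (or EOF). Only at this point is it safe to treat
            // the input side as shut down.
            byteBuf.release();
            break;
        }
        allocHandle.incMessagesRead(1);
        pipeline.fireChannelRead(byteBuf);
        // continueReading() may return false before the FD is drained, e.g. after
        // a capped number of reads, leaving data pending in the kernel buffer.
    } while (allocHandle.continueReading());
}

// Hypothetical stand-in for the transport's actual FD read.
int readFromFd(ByteBuf byteBuf) {
    return 0; // placeholder
}

This is why EPOLLRDHUP alone must not force the input into the shutdown state: data may still be pending after the loop ends.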

Modifications:
- AbstractEpollChannel#epollRdHupReady() should not force the input into the shutdown state if the channel is still active

Result:
All data can be read even if the RecvByteBufAllocator doesn't read it all in the current read loop.
Fixes https://github.com/netty/netty/issues/6303
Scott Mitchell 2017-02-15 15:04:38 -08:00
parent 9ee4cc0ada
commit f3dd410142
2 changed files with 120 additions and 5 deletions

SocketHalfClosedTest.java
@@ -150,6 +150,120 @@ public class SocketHalfClosedTest extends AbstractSocketTest {
        }
    }

    @Test
    public void testAllDataReadClosure() throws Throwable {
        run();
    }

    public void testAllDataReadClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable {
        testAllDataReadClosure(true, false, sb, cb);
        testAllDataReadClosure(true, true, sb, cb);
        testAllDataReadClosure(false, false, sb, cb);
        testAllDataReadClosure(false, true, sb, cb);
    }

    public void testAllDataReadClosure(final boolean autoRead, final boolean allowHalfClosed,
                                       ServerBootstrap sb, Bootstrap cb) throws Throwable {
        final int totalServerBytesWritten = 1024 * 16;
        final int numReadsPerReadLoop = 2;
        final CountDownLatch serverInitializedLatch = new CountDownLatch(1);
        final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1);
        final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1);
        final AtomicInteger clientReadCompletes = new AtomicInteger();
        Channel serverChannel = null;
        Channel clientChannel = null;
        try {
            // Cap the client to numReadsPerReadLoop single-byte reads per read loop so that
            // data is intentionally left pending in the FD between event-loop wakeups.
            cb.option(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
              .option(ChannelOption.AUTO_READ, autoRead)
              .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(numReadsPerReadLoop));

            sb.childHandler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            // Write all the data at once, then close so the server sends a FIN
                            // while the client still has unread data pending in its FD.
                            ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
                            buf.writerIndex(buf.capacity());
                            ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
                            serverInitializedLatch.countDown();
                        }

                        @Override
                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                            ctx.close();
                        }
                    });
                }
            });

            cb.handler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        private int bytesRead;

                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) {
                            ByteBuf buf = (ByteBuf) msg;
                            bytesRead += buf.readableBytes();
                            buf.release();
                        }

                        @Override
                        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
                            if (evt == ChannelInputShutdownEvent.INSTANCE && allowHalfClosed) {
                                clientHalfClosedLatch.countDown();
                            } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
                                ctx.close();
                            }
                        }

                        @Override
                        public void channelInactive(ChannelHandlerContext ctx) {
                            if (!allowHalfClosed) {
                                clientHalfClosedLatch.countDown();
                            }
                        }

                        @Override
                        public void channelReadComplete(ChannelHandlerContext ctx) {
                            clientReadCompletes.incrementAndGet();
                            if (bytesRead == totalServerBytesWritten) {
                                clientReadAllDataLatch.countDown();
                            }
                            if (!autoRead) {
                                ctx.read();
                            }
                        }

                        @Override
                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                            ctx.close();
                        }
                    });
                }
            });

            serverChannel = sb.bind().sync().channel();
            clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
            clientChannel.read();

            serverInitializedLatch.await();
            clientReadAllDataLatch.await();
            clientHalfClosedLatch.await();

            // Each read loop consumes at most numReadsPerReadLoop single-byte buffers, so the
            // number of channelReadComplete events should stay near total/numReadsPerReadLoop.
            assertTrue("too many read complete events: " + clientReadCompletes.get(),
                    totalServerBytesWritten / numReadsPerReadLoop + 10 > clientReadCompletes.get());
        } finally {
            if (clientChannel != null) {
                clientChannel.close().sync();
            }
            if (serverChannel != null) {
                serverChannel.close().sync();
            }
        }
    }
    /**
     * Designed to read a single byte at a time to control the number of reads done at a fine granularity.
     */
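The body of TestNumReadsRecvByteBufAllocator is elided from this hunk. A minimal sketch of what such an allocator could look like, assuming the RecvByteBufAllocator.Handle contract of Netty 4.1 (an illustration, not the actual source):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig;
import io.netty.channel.RecvByteBufAllocator;

// Hypothetical sketch: the real class body is not shown in this hunk.
final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator {
    private final int numReads;

    TestNumReadsRecvByteBufAllocator(int numReads) {
        this.numReads = numReads;
    }

    @Override
    public Handle newHandle() {
        return new Handle() {
            private int attemptedBytesRead;
            private int lastBytesRead;
            private int numMessagesRead;

            @Override
            public ByteBuf allocate(ByteBufAllocator alloc) {
                return alloc.ioBuffer(guess(), guess());
            }

            @Override
            public int guess() {
                return 1; // 1-byte buffers force many small reads
            }

            @Override
            public void reset(ChannelConfig config) {
                numMessagesRead = 0;
            }

            @Override
            public void incMessagesRead(int numMessages) {
                numMessagesRead += numMessages;
            }

            @Override
            public void lastBytesRead(int bytes) {
                lastBytesRead = bytes;
            }

            @Override
            public int lastBytesRead() {
                return lastBytesRead;
            }

            @Override
            public void attemptedBytesRead(int bytes) {
                attemptedBytesRead = bytes;
            }

            @Override
            public int attemptedBytesRead() {
                return attemptedBytesRead;
            }

            @Override
            public boolean continueReading() {
                return numMessagesRead < numReads; // cap the reads per read loop
            }

            @Override
            public void readComplete() {
                // NOOP
            }
        };
    }
}

Returning 1 from guess() forces one byte per read, and continueReading() ends the loop after numReads reads, which creates exactly the pending-data condition this fix must handle.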

AbstractEpollChannel.java

@@ -385,12 +385,13 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChannel {
             // read pending data from the underlying file descriptor.
             // See https://github.com/netty/netty/issues/3709
             epollInReady();
+
+            // Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event.
+            clearEpollRdHup();
+        } else {
+            // Just to be safe make sure the input marked as closed.
+            shutdownInput(true);
         }
-
-        // epollInReady may call this, but we should ensure that it gets called.
-        shutdownInput(true);
-
-        // Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event.
-        clearEpollRdHup();
     }
/**
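For readability, the method after this change reads roughly as follows. This is reconstructed from the hunk above; the lines preceding the hunk are not shown, and the isActive() guard is inferred from the hunk and the commit message:

final void epollRdHupReady() {
    // (preceding lines of the method are outside this hunk)
    if (isActive()) {
        // Still active: call epollInReady() so we do not miss pending data.
        // See https://github.com/netty/netty/issues/3709
        epollInReady();

        // Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event.
        clearEpollRdHup();
    } else {
        // Just to be safe make sure the input marked as closed.
        shutdownInput(true);
    }
}

The key change is that shutdownInput(true) is no longer called unconditionally; while the channel is active, the read loop driven by epollInReady() decides when the input is truly exhausted.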