Read until all data is consumed when EOF is detected even if readPend… (#7961)

* Read until all data is consumed when EOF is detected even if readPending is false and auto-read is disabled.

Motivation:

We should better always notify the user of EOF even if the user did not request any data as otherwise we may never be notified when the remote peer closes the connection. This should be ok as the amount of extra data we may read and so fire through the pipeline is limited by SO_RECVBUF.

Modifications:

- Always drain the socket when EOF is detected.
- Add testcase

Result:

No risk for the user to be not notified of EOF.
This commit is contained in:
Norman Maurer 2018-05-24 20:29:29 +02:00 committed by GitHub
parent 19d1f4ea62
commit 030318e53c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 75 additions and 35 deletions

View File

@ -396,16 +396,9 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
final void epollInBefore() { maybeMoreDataToRead = false; } final void epollInBefore() { maybeMoreDataToRead = false; }
final void epollInFinally(ChannelConfig config) { final void epollInFinally(ChannelConfig config) {
maybeMoreDataToRead = allocHandle.isEdgeTriggered() && allocHandle.maybeMoreDataToRead(); maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
// Check if there is a readPending which was not processed yet.
// This could be for two reasons: if (allocHandle.isReceivedRdHup() || (readPending && maybeMoreDataToRead)) {
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
clearEpollIn();
} else if (readPending && maybeMoreDataToRead) {
// trigger a read again as there may be something left to read and because of epoll ET we // trigger a read again as there may be something left to read and because of epoll ET we
// will not get notified again until we read everything from the socket // will not get notified again until we read everything from the socket
// //
@ -414,6 +407,14 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
// to false before every read operation to prevent re-entry into epollInReady() we will not read from // to false before every read operation to prevent re-entry into epollInReady() we will not read from
// the underlying OS again unless the user happens to call read again. // the underlying OS again unless the user happens to call read again.
executeEpollInReadyRunnable(config); executeEpollInReadyRunnable(config);
} else if (!readPending && !config.isAutoRead()) {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
clearEpollIn();
} }
} }

View File

@ -52,10 +52,11 @@ class EpollRecvByteAllocatorHandle implements RecvByteBufAllocator.ExtendedHandl
* respect auto read we supporting reading to stop if auto read is off. It is expected that the * respect auto read we supporting reading to stop if auto read is off. It is expected that the
* {@link #EpollSocketChannel} implementations will track if we are in edgeTriggered mode and all data was not * {@link #EpollSocketChannel} implementations will track if we are in edgeTriggered mode and all data was not
* read, and will force a EPOLLIN ready event. * read, and will force a EPOLLIN ready event.
*
* It is assumed RDHUP is handled externally by checking {@link #isReceivedRdHup()}.
*/ */
return (isEdgeTriggered && lastBytesRead() > 0) || return (isEdgeTriggered && lastBytesRead() > 0) ||
(!isEdgeTriggered && lastBytesRead() == attemptedBytesRead()) || (!isEdgeTriggered && lastBytesRead() == attemptedBytesRead());
receivedRdHup;
} }
final void edgeTriggered(boolean edgeTriggered) { final void edgeTriggered(boolean edgeTriggered) {

View File

@ -405,15 +405,8 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
final void readReadyFinally(ChannelConfig config) { final void readReadyFinally(ChannelConfig config) {
maybeMoreDataToRead = allocHandle.maybeMoreDataToRead(); maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
// Check if there is a readPending which was not processed yet.
// This could be for two reasons: if (allocHandle.isReadEOF() || (readPending && maybeMoreDataToRead)) {
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
clearReadFilter0();
} else if (readPending && maybeMoreDataToRead) {
// trigger a read again as there may be something left to read and because of ET we // trigger a read again as there may be something left to read and because of ET we
// will not get notified again until we read everything from the socket // will not get notified again until we read everything from the socket
// //
@ -422,6 +415,14 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
// to false before every read operation to prevent re-entry into readReady() we will not read from // to false before every read operation to prevent re-entry into readReady() we will not read from
// the underlying OS again unless the user happens to call read again. // the underlying OS again unless the user happens to call read again.
executeReadReadyRunnable(config); executeReadReadyRunnable(config);
} else if (!readPending && !config.isAutoRead()) {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
clearReadFilter0();
} }
} }

View File

@ -103,6 +103,10 @@ final class KQueueRecvByteAllocatorHandle implements RecvByteBufAllocator.Extend
readEOF = true; readEOF = true;
} }
boolean isReadEOF() {
return readEOF;
}
void numberBytesPending(long numberBytesPending) { void numberBytesPending(long numberBytesPending) {
this.numberBytesPending = numberBytesPending; this.numberBytesPending = numberBytesPending;
} }
@ -116,9 +120,9 @@ final class KQueueRecvByteAllocatorHandle implements RecvByteBufAllocator.Extend
* channel. It is expected that the {@link #KQueueSocketChannel} implementations will track if all data was not * channel. It is expected that the {@link #KQueueSocketChannel} implementations will track if all data was not
* read, and will force a EVFILT_READ ready event. * read, and will force a EVFILT_READ ready event.
* *
* If EOF has been read we must read until we get an error. * It is assumed EOF is handled externally by checking {@link #isReadEOF()}.
*/ */
return numberBytesPending != 0 || readEOF; return numberBytesPending != 0;
} }
private int guess0() { private int guess0() {

View File

@ -25,6 +25,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.ServerChannel; import io.netty.channel.ServerChannel;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import org.junit.Test; import org.junit.Test;
@ -41,7 +42,17 @@ public abstract class DetectPeerCloseWithoutReadTest {
protected abstract Class<? extends Channel> clientChannel(); protected abstract Class<? extends Channel> clientChannel();
@Test(timeout = 10000) @Test(timeout = 10000)
public void clientCloseWithoutServerReadIsDetected() throws InterruptedException { public void clientCloseWithoutServerReadIsDetectedNoExtraReadRequested() throws InterruptedException {
clientCloseWithoutServerReadIsDetected0(false);
}
@Test(timeout = 10000)
public void clientCloseWithoutServerReadIsDetectedExtraReadRequested() throws InterruptedException {
clientCloseWithoutServerReadIsDetected0(true);
}
private void clientCloseWithoutServerReadIsDetected0(final boolean extraReadRequested)
throws InterruptedException {
EventLoopGroup serverGroup = null; EventLoopGroup serverGroup = null;
EventLoopGroup clientGroup = null; EventLoopGroup clientGroup = null;
Channel serverChannel = null; Channel serverChannel = null;
@ -54,11 +65,15 @@ public abstract class DetectPeerCloseWithoutReadTest {
ServerBootstrap sb = new ServerBootstrap(); ServerBootstrap sb = new ServerBootstrap();
sb.group(serverGroup); sb.group(serverGroup);
sb.channel(serverChannel()); sb.channel(serverChannel());
// Ensure we read only one message per read() call and that we need multiple read()
// calls to consume everything.
sb.childOption(ChannelOption.AUTO_READ, false); sb.childOption(ChannelOption.AUTO_READ, false);
sb.childOption(ChannelOption.MAX_MESSAGES_PER_READ, 1);
sb.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(expectedBytes / 10));
sb.childHandler(new ChannelInitializer<Channel>() { sb.childHandler(new ChannelInitializer<Channel>() {
@Override @Override
protected void initChannel(Channel ch) throws Exception { protected void initChannel(Channel ch) {
ch.pipeline().addLast(new TestHandler(bytesRead, latch)); ch.pipeline().addLast(new TestHandler(bytesRead, extraReadRequested, latch));
} }
}); });
@ -89,7 +104,16 @@ public abstract class DetectPeerCloseWithoutReadTest {
} }
@Test(timeout = 10000) @Test(timeout = 10000)
public void serverCloseWithoutClientReadIsDetected() throws InterruptedException { public void serverCloseWithoutClientReadIsDetectedNoExtraReadRequested() throws InterruptedException {
serverCloseWithoutClientReadIsDetected0(false);
}
@Test(timeout = 10000)
public void serverCloseWithoutClientReadIsDetectedExtraReadRequested() throws InterruptedException {
serverCloseWithoutClientReadIsDetected0(true);
}
private void serverCloseWithoutClientReadIsDetected0(final boolean extraReadRequested) throws InterruptedException {
EventLoopGroup serverGroup = null; EventLoopGroup serverGroup = null;
EventLoopGroup clientGroup = null; EventLoopGroup clientGroup = null;
Channel serverChannel = null; Channel serverChannel = null;
@ -105,10 +129,10 @@ public abstract class DetectPeerCloseWithoutReadTest {
sb.channel(serverChannel()); sb.channel(serverChannel());
sb.childHandler(new ChannelInitializer<Channel>() { sb.childHandler(new ChannelInitializer<Channel>() {
@Override @Override
protected void initChannel(Channel ch) throws Exception { protected void initChannel(Channel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) {
ByteBuf buf = ctx.alloc().buffer(expectedBytes); ByteBuf buf = ctx.alloc().buffer(expectedBytes);
buf.writerIndex(buf.writerIndex() + expectedBytes); buf.writerIndex(buf.writerIndex() + expectedBytes);
ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE); ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
@ -123,11 +147,15 @@ public abstract class DetectPeerCloseWithoutReadTest {
Bootstrap cb = new Bootstrap(); Bootstrap cb = new Bootstrap();
cb.group(serverGroup); cb.group(serverGroup);
cb.channel(clientChannel()); cb.channel(clientChannel());
// Ensure we read only one message per read() call and that we need multiple read()
// calls to consume everything.
cb.option(ChannelOption.AUTO_READ, false); cb.option(ChannelOption.AUTO_READ, false);
cb.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
cb.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(expectedBytes / 10));
cb.handler(new ChannelInitializer<Channel>() { cb.handler(new ChannelInitializer<Channel>() {
@Override @Override
protected void initChannel(Channel ch) throws Exception { protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new TestHandler(bytesRead, latch)); ch.pipeline().addLast(new TestHandler(bytesRead, extraReadRequested, latch));
} }
}); });
clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel(); clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel();
@ -152,22 +180,27 @@ public abstract class DetectPeerCloseWithoutReadTest {
private static final class TestHandler extends SimpleChannelInboundHandler<ByteBuf> { private static final class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final AtomicInteger bytesRead; private final AtomicInteger bytesRead;
private final boolean extraReadRequested;
private final CountDownLatch latch; private final CountDownLatch latch;
TestHandler(AtomicInteger bytesRead, CountDownLatch latch) { TestHandler(AtomicInteger bytesRead, boolean extraReadRequested, CountDownLatch latch) {
this.bytesRead = bytesRead; this.bytesRead = bytesRead;
this.extraReadRequested = extraReadRequested;
this.latch = latch; this.latch = latch;
} }
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
bytesRead.addAndGet(msg.readableBytes()); bytesRead.addAndGet(msg.readableBytes());
if (extraReadRequested) {
// Because autoread is off, we call read to consume all data until we detect the close. // Because autoread is off, we call read to consume all data until we detect the close.
ctx.read(); ctx.read();
} }
}
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) {
latch.countDown(); latch.countDown();
ctx.fireChannelInactive(); ctx.fireChannelInactive();
} }