diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index c1af9975cd..03574b9715 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -396,16 +396,9 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann final void epollInBefore() { maybeMoreDataToRead = false; } final void epollInFinally(ChannelConfig config) { - maybeMoreDataToRead = allocHandle.isEdgeTriggered() && allocHandle.maybeMoreDataToRead(); - // Check if there is a readPending which was not processed yet. - // This could be for two reasons: - // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method - // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method - // - // See https://github.com/netty/netty/issues/2254 - if (!readPending && !config.isAutoRead()) { - clearEpollIn(); - } else if (readPending && maybeMoreDataToRead) { + maybeMoreDataToRead = allocHandle.maybeMoreDataToRead(); + + if (allocHandle.isReceivedRdHup() || (readPending && maybeMoreDataToRead)) { // trigger a read again as there may be something left to read and because of epoll ET we // will not get notified again until we read everything from the socket // @@ -414,6 +407,14 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann // to false before every read operation to prevent re-entry into epollInReady() we will not read from // the underlying OS again unless the user happens to call read again. executeEpollInReadyRunnable(config); + } else if (!readPending && !config.isAutoRead()) { + // Check if there is a readPending which was not processed yet. + // This could be for two reasons: + // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method + // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method + // + // See https://github.com/netty/netty/issues/2254 + clearEpollIn(); } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorHandle.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorHandle.java index 5c2c87c0c5..688cf4f7e9 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorHandle.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorHandle.java @@ -52,10 +52,11 @@ class EpollRecvByteAllocatorHandle implements RecvByteBufAllocator.ExtendedHandl * respect auto read we supporting reading to stop if auto read is off. It is expected that the * {@link #EpollSocketChannel} implementations will track if we are in edgeTriggered mode and all data was not * read, and will force a EPOLLIN ready event. + * + * It is assumed RDHUP is handled externally by checking {@link #isReceivedRdHup()}. */ return (isEdgeTriggered && lastBytesRead() > 0) || - (!isEdgeTriggered && lastBytesRead() == attemptedBytesRead()) || - receivedRdHup; + (!isEdgeTriggered && lastBytesRead() == attemptedBytesRead()); } final void edgeTriggered(boolean edgeTriggered) { diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java index 698e59c7f1..7ca2bd9f1c 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java @@ -405,15 +405,8 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan final void readReadyFinally(ChannelConfig config) { maybeMoreDataToRead = allocHandle.maybeMoreDataToRead(); - // Check if there is a readPending which was not processed yet. - // This could be for two reasons: - // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method - // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method - // - // See https://github.com/netty/netty/issues/2254 - if (!readPending && !config.isAutoRead()) { - clearReadFilter0(); - } else if (readPending && maybeMoreDataToRead) { + + if (allocHandle.isReadEOF() || (readPending && maybeMoreDataToRead)) { // trigger a read again as there may be something left to read and because of ET we // will not get notified again until we read everything from the socket // @@ -422,6 +415,14 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan // to false before every read operation to prevent re-entry into readReady() we will not read from // the underlying OS again unless the user happens to call read again. executeReadReadyRunnable(config); + } else if (!readPending && !config.isAutoRead()) { + // Check if there is a readPending which was not processed yet. + // This could be for two reasons: + // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method + // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method + // + // See https://github.com/netty/netty/issues/2254 + clearReadFilter0(); } } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueRecvByteAllocatorHandle.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueRecvByteAllocatorHandle.java index e192ae5560..087881f461 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueRecvByteAllocatorHandle.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueRecvByteAllocatorHandle.java @@ -103,6 +103,10 @@ final class KQueueRecvByteAllocatorHandle implements RecvByteBufAllocator.Extend readEOF = true; } + boolean isReadEOF() { + return readEOF; + } + void numberBytesPending(long numberBytesPending) { this.numberBytesPending = numberBytesPending; } @@ -116,9 +120,9 @@ final class KQueueRecvByteAllocatorHandle implements RecvByteBufAllocator.Extend * channel. It is expected that the {@link #KQueueSocketChannel} implementations will track if all data was not * read, and will force a EVFILT_READ ready event. * - * If EOF has been read we must read until we get an error. + * It is assumed EOF is handled externally by checking {@link #isReadEOF()}. */ - return numberBytesPending != 0 || readEOF; + return numberBytesPending != 0; } private int guess0() { diff --git a/transport-native-unix-common-tests/src/main/java/io/netty/channel/unix/tests/DetectPeerCloseWithoutReadTest.java b/transport-native-unix-common-tests/src/main/java/io/netty/channel/unix/tests/DetectPeerCloseWithoutReadTest.java index ef5482d11b..a8b231f5e1 100644 --- a/transport-native-unix-common-tests/src/main/java/io/netty/channel/unix/tests/DetectPeerCloseWithoutReadTest.java +++ b/transport-native-unix-common-tests/src/main/java/io/netty/channel/unix/tests/DetectPeerCloseWithoutReadTest.java @@ -25,6 +25,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.ServerChannel; import io.netty.channel.SimpleChannelInboundHandler; import org.junit.Test; @@ -41,7 +42,17 @@ public abstract class DetectPeerCloseWithoutReadTest { protected abstract Class clientChannel(); @Test(timeout = 10000) - public void clientCloseWithoutServerReadIsDetected() throws InterruptedException { + public void clientCloseWithoutServerReadIsDetectedNoExtraReadRequested() throws InterruptedException { + clientCloseWithoutServerReadIsDetected0(false); + } + + @Test(timeout = 10000) + public void clientCloseWithoutServerReadIsDetectedExtraReadRequested() throws InterruptedException { + clientCloseWithoutServerReadIsDetected0(true); + } + + private void clientCloseWithoutServerReadIsDetected0(final boolean extraReadRequested) + throws InterruptedException { EventLoopGroup serverGroup = null; EventLoopGroup clientGroup = null; Channel serverChannel = null; @@ -54,11 +65,15 @@ public abstract class DetectPeerCloseWithoutReadTest { ServerBootstrap sb = new ServerBootstrap(); sb.group(serverGroup); sb.channel(serverChannel()); + // Ensure we read only one message per read() call and that we need multiple read() + // calls to consume everything. sb.childOption(ChannelOption.AUTO_READ, false); + sb.childOption(ChannelOption.MAX_MESSAGES_PER_READ, 1); + sb.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(expectedBytes / 10)); sb.childHandler(new ChannelInitializer() { @Override - protected void initChannel(Channel ch) throws Exception { - ch.pipeline().addLast(new TestHandler(bytesRead, latch)); + protected void initChannel(Channel ch) { + ch.pipeline().addLast(new TestHandler(bytesRead, extraReadRequested, latch)); } }); @@ -89,7 +104,16 @@ public abstract class DetectPeerCloseWithoutReadTest { } @Test(timeout = 10000) - public void serverCloseWithoutClientReadIsDetected() throws InterruptedException { + public void serverCloseWithoutClientReadIsDetectedNoExtraReadRequested() throws InterruptedException { + serverCloseWithoutClientReadIsDetected0(false); + } + + @Test(timeout = 10000) + public void serverCloseWithoutClientReadIsDetectedExtraReadRequested() throws InterruptedException { + serverCloseWithoutClientReadIsDetected0(true); + } + + private void serverCloseWithoutClientReadIsDetected0(final boolean extraReadRequested) throws InterruptedException { EventLoopGroup serverGroup = null; EventLoopGroup clientGroup = null; Channel serverChannel = null; @@ -105,10 +129,10 @@ public abstract class DetectPeerCloseWithoutReadTest { sb.channel(serverChannel()); sb.childHandler(new ChannelInitializer() { @Override - protected void initChannel(Channel ch) throws Exception { + protected void initChannel(Channel ch) { ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { + public void channelActive(ChannelHandlerContext ctx) { ByteBuf buf = ctx.alloc().buffer(expectedBytes); buf.writerIndex(buf.writerIndex() + expectedBytes); ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE); @@ -123,11 +147,15 @@ public abstract class DetectPeerCloseWithoutReadTest { Bootstrap cb = new Bootstrap(); cb.group(serverGroup); cb.channel(clientChannel()); + // Ensure we read only one message per read() call and that we need multiple read() + // calls to consume everything. cb.option(ChannelOption.AUTO_READ, false); + cb.option(ChannelOption.MAX_MESSAGES_PER_READ, 1); + cb.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(expectedBytes / 10)); cb.handler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { - ch.pipeline().addLast(new TestHandler(bytesRead, latch)); + ch.pipeline().addLast(new TestHandler(bytesRead, extraReadRequested, latch)); } }); clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel(); @@ -152,22 +180,27 @@ public abstract class DetectPeerCloseWithoutReadTest { private static final class TestHandler extends SimpleChannelInboundHandler { private final AtomicInteger bytesRead; + private final boolean extraReadRequested; private final CountDownLatch latch; - TestHandler(AtomicInteger bytesRead, CountDownLatch latch) { + TestHandler(AtomicInteger bytesRead, boolean extraReadRequested, CountDownLatch latch) { this.bytesRead = bytesRead; + this.extraReadRequested = extraReadRequested; this.latch = latch; } @Override - protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { bytesRead.addAndGet(msg.readableBytes()); - // Because autoread is off, we call read to consume all data until we detect the close. - ctx.read(); + + if (extraReadRequested) { + // Because autoread is off, we call read to consume all data until we detect the close. + ctx.read(); + } } @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { + public void channelInactive(ChannelHandlerContext ctx) { latch.countDown(); ctx.fireChannelInactive(); }