diff --git a/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java b/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java index 6cb52e4cf6..5304ee6736 100644 --- a/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java +++ b/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java @@ -153,9 +153,13 @@ public class FlowControlHandler implements ChannelHandler { @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - // Don't relay completion events from upstream as they - // make no sense in this context. See dequeue() where - // a new set of completion events is being produced. + if (isQueueEmpty()) { + ctx.fireChannelReadComplete(); + } else { + // Don't relay completion events from upstream as they + // make no sense in this context. See dequeue() where + // a new set of completion events is being produced. + } } /** diff --git a/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java index c0a6e8addc..ce901c546e 100644 --- a/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java @@ -27,11 +27,14 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; +import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.nio.NioHandler; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.ReferenceCountUtil; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -39,13 +42,14 @@ import org.junit.Test; import java.net.SocketAddress; import java.util.List; +import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Exchanger; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicReference; import static java.util.concurrent.TimeUnit.*; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; public class FlowControlHandlerTest { private static EventLoopGroup GROUP; @@ -76,7 +80,7 @@ public class FlowControlHandlerTest { .childOption(ChannelOption.AUTO_READ, autoRead) .childHandler(new ChannelInitializer() { @Override - protected void initChannel(Channel ch) throws Exception { + protected void initChannel(Channel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new OneByteToThreeStringsDecoder()); pipeline.addLast(handlers); @@ -418,13 +422,69 @@ public class FlowControlHandlerTest { } } + @Test + public void testSwallowedReadComplete() throws Exception { + final long delayMillis = 100; + final Queue userEvents = new LinkedBlockingQueue(); + final EmbeddedChannel channel = new EmbeddedChannel(false, false, + new FlowControlHandler(), + new IdleStateHandler(delayMillis, 0, 0, MILLISECONDS), + new ChannelHandler() { + @Override + public void channelActive(ChannelHandlerContext ctx) { + ctx.fireChannelActive(); + ctx.read(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ctx.fireChannelRead(msg); + ctx.read(); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.fireChannelReadComplete(); + ctx.read(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + if (evt instanceof IdleStateEvent) { + userEvents.add((IdleStateEvent) evt); + } + ctx.fireUserEventTriggered(evt); + } + } + ); + + channel.config().setAutoRead(false); + assertFalse(channel.config().isAutoRead()); + + channel.register(); + + // Reset read timeout by some message + assertTrue(channel.writeInbound(Unpooled.EMPTY_BUFFER)); + channel.flushInbound(); + assertEquals(Unpooled.EMPTY_BUFFER, channel.readInbound()); + + // Emulate 'no more messages in NIO channel' on the next read attempt. + channel.flushInbound(); + assertNull(channel.readInbound()); + + Thread.sleep(delayMillis); + channel.runPendingTasks(); + assertEquals(IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT, userEvents.poll()); + assertFalse(channel.finish()); + } + /** * This is a fictional message decoder. It decodes each {@code byte} * into three strings. */ private static final class OneByteToThreeStringsDecoder extends ByteToMessageDecoder { @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { for (int i = 0; i < in.readableBytes(); i++) { out.add("1"); out.add("2");