diff --git a/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java b/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java index 7ad723222c..5d9ca56bd4 100644 --- a/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java +++ b/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java @@ -172,37 +172,33 @@ public class FlowControlHandler extends ChannelDuplexHandler { * @see #channelRead(ChannelHandlerContext, Object) */ private int dequeue(ChannelHandlerContext ctx, int minConsume) { - if (queue != null) { + int consumed = 0; - int consumed = 0; - - Object msg; - while ((consumed < minConsume) || config.isAutoRead()) { - msg = queue.poll(); - if (msg == null) { - break; - } - - ++consumed; - ctx.fireChannelRead(msg); + // fireChannelRead(...) may call ctx.read() and so this method may reentrance. Because of this we need to + // check if queue was set to null in the meantime and if so break the loop. + while (queue != null && (consumed < minConsume || config.isAutoRead())) { + Object msg = queue.poll(); + if (msg == null) { + break; } - // We're firing a completion event every time one (or more) - // messages were consumed and the queue ended up being drained - // to an empty state. - if (queue.isEmpty()) { - queue.recycle(); - queue = null; - - if (consumed > 0) { - ctx.fireChannelReadComplete(); - } - } - - return consumed; + ++consumed; + ctx.fireChannelRead(msg); } - return 0; + // We're firing a completion event every time one (or more) + // messages were consumed and the queue ended up being drained + // to an empty state. + if (queue != null && queue.isEmpty()) { + queue.recycle(); + queue = null; + + if (consumed > 0) { + ctx.fireChannelReadComplete(); + } + } + + return consumed; } /** diff --git a/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java index a4effb1f01..a093a9de70 100644 --- a/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java @@ -42,6 +42,7 @@ import java.net.SocketAddress; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Exchanger; +import java.util.concurrent.atomic.AtomicReference; import static java.util.concurrent.TimeUnit.*; import static org.junit.Assert.assertTrue; @@ -368,6 +369,56 @@ public class FlowControlHandlerTest { } } + @Test + public void testReentranceNotCausesNPE() throws Throwable { + final Exchanger peerRef = new Exchanger(); + final CountDownLatch latch = new CountDownLatch(3); + final AtomicReference causeRef = new AtomicReference(); + ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelActive(); + peerRef.exchange(ctx.channel(), 1L, SECONDS); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + latch.countDown(); + ctx.read(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + causeRef.set(cause); + } + }; + + FlowControlHandler flow = new FlowControlHandler(); + Channel server = newServer(false, flow, handler); + Channel client = newClient(server.localAddress()); + try { + // The client connection on the server side + Channel peer = peerRef.exchange(null, 1L, SECONDS); + + // Write the message + client.writeAndFlush(newOneMessage()) + .syncUninterruptibly(); + + // channelRead(1) + peer.read(); + assertTrue(latch.await(1L, SECONDS)); + assertTrue(flow.isQueueEmpty()); + + Throwable cause = causeRef.get(); + if (cause != null) { + throw cause; + } + } finally { + client.close(); + server.close(); + } + } + /** * This is a fictional message decoder. It decodes each {@code byte} * into three strings.