From 0f34d887b78029aa9ebc3b5faf9c9f257960425c Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 19 Apr 2018 09:39:16 +0200 Subject: [PATCH] Only reset readIsPending if outboundBuffer is not empty. (#7874) Motivation: We need to ensure we only reset readInProgress if the outboundBuffer is not empty as otherwise we may miss to call fireChannelRead(...) later on when using the LocalChannel. Modifications: Also check if the outboundBuffer is not empty before setting readInProgress to false again Result: Fixes https://github.com/netty/netty/issues/7855 --- .../io/netty/channel/local/LocalChannel.java | 4 +- .../netty/channel/local/LocalChannelTest.java | 75 +++++++++++++++++++ 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index d18fa3a4a1..0d4ca82075 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -436,7 +436,9 @@ public class LocalChannel extends AbstractChannel { } } ChannelPipeline peerPipeline = peer.pipeline(); - if (peer.readInProgress) { + // We should only set readInProgress to false if there is any data that was read as otherwise we may miss to + // forward data later on. + if (peer.readInProgress && !peer.inboundBuffer.isEmpty()) { peer.readInProgress = false; for (;;) { Object received = peer.inboundBuffer.poll(); diff --git a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java index a8def572b8..2ec4c29a6a 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java @@ -26,6 +26,7 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.EventLoop; @@ -945,4 +946,78 @@ public class LocalChannelTest { closeChannel(sc); } } + + private static void writeAndFlushReadOnSuccess(final ChannelHandlerContext ctx, Object msg) { + ctx.writeAndFlush(msg).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + if (future.isSuccess()) { + ctx.read(); + } + } + }); + } + + @Test(timeout = 5000) + public void testAutoReadDisabledSharedGroup() throws Exception { + testAutoReadDisabled(sharedGroup, sharedGroup); + } + + @Test(timeout = 5000) + public void testAutoReadDisabledDifferentGroup() throws Exception { + testAutoReadDisabled(group1, group2); + } + + private static void testAutoReadDisabled(EventLoopGroup serverGroup, EventLoopGroup clientGroup) throws Exception { + final CountDownLatch latch = new CountDownLatch(100); + Bootstrap cb = new Bootstrap(); + ServerBootstrap sb = new ServerBootstrap(); + + cb.group(serverGroup) + .channel(LocalChannel.class) + .option(ChannelOption.AUTO_READ, false) + .handler(new ChannelInboundHandlerAdapter() { + + @Override + public void channelActive(final ChannelHandlerContext ctx) throws Exception { + writeAndFlushReadOnSuccess(ctx, "test"); + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { + writeAndFlushReadOnSuccess(ctx, msg); + } + }); + + sb.group(clientGroup) + .channel(LocalServerChannel.class) + .childOption(ChannelOption.AUTO_READ, false) + .childHandler(new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(final ChannelHandlerContext ctx) throws Exception { + ctx.read(); + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { + latch.countDown(); + if (latch.getCount() > 0) { + writeAndFlushReadOnSuccess(ctx, msg); + } + } + }); + + Channel sc = null; + Channel cc = null; + try { + // Start server + sc = sb.bind(TEST_ADDRESS).sync().channel(); + cc = cb.connect(TEST_ADDRESS).sync().channel(); + + latch.await(); + } finally { + closeChannel(cc); + closeChannel(sc); + } + } }