diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index d18fa3a4a1..0d4ca82075 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -436,7 +436,9 @@ public class LocalChannel extends AbstractChannel { } } ChannelPipeline peerPipeline = peer.pipeline(); - if (peer.readInProgress) { + // We should only set readInProgress to false if there is any data that was read as otherwise we may miss to + // forward data later on. + if (peer.readInProgress && !peer.inboundBuffer.isEmpty()) { peer.readInProgress = false; for (;;) { Object received = peer.inboundBuffer.poll(); diff --git a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java index a8def572b8..2ec4c29a6a 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java @@ -26,6 +26,7 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.EventLoop; @@ -945,4 +946,78 @@ public class LocalChannelTest { closeChannel(sc); } } + + private static void writeAndFlushReadOnSuccess(final ChannelHandlerContext ctx, Object msg) { + ctx.writeAndFlush(msg).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + if (future.isSuccess()) { + ctx.read(); + } + } + }); + } + + @Test(timeout = 5000) + public void testAutoReadDisabledSharedGroup() throws Exception { + testAutoReadDisabled(sharedGroup, sharedGroup); + } + + @Test(timeout = 5000) + public void testAutoReadDisabledDifferentGroup() throws Exception { + testAutoReadDisabled(group1, group2); + } + + private static void testAutoReadDisabled(EventLoopGroup serverGroup, EventLoopGroup clientGroup) throws Exception { + final CountDownLatch latch = new CountDownLatch(100); + Bootstrap cb = new Bootstrap(); + ServerBootstrap sb = new ServerBootstrap(); + + cb.group(serverGroup) + .channel(LocalChannel.class) + .option(ChannelOption.AUTO_READ, false) + .handler(new ChannelInboundHandlerAdapter() { + + @Override + public void channelActive(final ChannelHandlerContext ctx) throws Exception { + writeAndFlushReadOnSuccess(ctx, "test"); + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { + writeAndFlushReadOnSuccess(ctx, msg); + } + }); + + sb.group(clientGroup) + .channel(LocalServerChannel.class) + .childOption(ChannelOption.AUTO_READ, false) + .childHandler(new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(final ChannelHandlerContext ctx) throws Exception { + ctx.read(); + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { + latch.countDown(); + if (latch.getCount() > 0) { + writeAndFlushReadOnSuccess(ctx, msg); + } + } + }); + + Channel sc = null; + Channel cc = null; + try { + // Start server + sc = sb.bind(TEST_ADDRESS).sync().channel(); + cc = cb.connect(TEST_ADDRESS).sync().channel(); + + latch.await(); + } finally { + closeChannel(cc); + closeChannel(sc); + } + } }