Only reset readIsPending if outboundBuffer is not empty. (#7874)

Motivation:

We need to ensure we only reset readInProgress if the outboundBuffer is not empty as otherwise we may miss to call fireChannelRead(...) later on when using the LocalChannel.

Modifications:

Also check if the outboundBuffer is not empty before setting readInProgress to false again

Result:

Fixes https://github.com/netty/netty/issues/7855
This commit is contained in:
Norman Maurer 2018-04-19 09:39:16 +02:00 committed by GitHub
parent 0b690a991f
commit 0f34d887b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 78 additions and 1 deletions

View File

@ -436,7 +436,9 @@ public class LocalChannel extends AbstractChannel {
}
}
ChannelPipeline peerPipeline = peer.pipeline();
if (peer.readInProgress) {
// We should only set readInProgress to false if there is any data that was read as otherwise we may miss to
// forward data later on.
if (peer.readInProgress && !peer.inboundBuffer.isEmpty()) {
peer.readInProgress = false;
for (;;) {
Object received = peer.inboundBuffer.poll();

View File

@ -26,6 +26,7 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoop;
@ -945,4 +946,78 @@ public class LocalChannelTest {
closeChannel(sc);
}
}
private static void writeAndFlushReadOnSuccess(final ChannelHandlerContext ctx, Object msg) {
ctx.writeAndFlush(msg).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
ctx.read();
}
}
});
}
@Test(timeout = 5000)
public void testAutoReadDisabledSharedGroup() throws Exception {
testAutoReadDisabled(sharedGroup, sharedGroup);
}
@Test(timeout = 5000)
public void testAutoReadDisabledDifferentGroup() throws Exception {
testAutoReadDisabled(group1, group2);
}
private static void testAutoReadDisabled(EventLoopGroup serverGroup, EventLoopGroup clientGroup) throws Exception {
final CountDownLatch latch = new CountDownLatch(100);
Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap();
cb.group(serverGroup)
.channel(LocalChannel.class)
.option(ChannelOption.AUTO_READ, false)
.handler(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
writeAndFlushReadOnSuccess(ctx, "test");
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
writeAndFlushReadOnSuccess(ctx, msg);
}
});
sb.group(clientGroup)
.channel(LocalServerChannel.class)
.childOption(ChannelOption.AUTO_READ, false)
.childHandler(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
latch.countDown();
if (latch.getCount() > 0) {
writeAndFlushReadOnSuccess(ctx, msg);
}
}
});
Channel sc = null;
Channel cc = null;
try {
// Start server
sc = sb.bind(TEST_ADDRESS).sync().channel();
cc = cb.connect(TEST_ADDRESS).sync().channel();
latch.await();
} finally {
closeChannel(cc);
closeChannel(sc);
}
}
}