[#4805] Respect isAutoRead() once re-register Channel

Motivation:

When a channel was registered before and is re-registered we need to respect ChannelConfig.isAutoRead() and so start reading one the registration task completes. This was done "by luck" before 15162202fb.

Modifications:

Explicit start reading once a Channel was re-registered if isAutoRead() is true.

Result:

Correctly receive data after re-registration completes.
This commit is contained in:
Norman Maurer 2016-02-01 20:02:01 +01:00
parent 08a7ca3747
commit 465a190c3f
2 changed files with 109 additions and 2 deletions

View File

@ -502,8 +502,16 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
pipeline.fireChannelRegistered(); pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing // Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered. // multiple channel actives if the channel is deregistered and re-registered.
if (firstRegistration && isActive()) { if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive(); pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
} }
} catch (Throwable t) { } catch (Throwable t) {
// Close the channel directly to avoid FD leak. // Close the channel directly to avoid FD leak.

View File

@ -15,16 +15,26 @@
*/ */
package io.netty.channel.nio; package io.netty.channel.nio;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import io.netty.util.NetUtil; import io.netty.util.NetUtil;
import io.netty.util.internal.ThreadLocalRandom;
import org.junit.Test; import org.junit.Test;
import java.io.DataInput; import java.io.DataInput;
@ -35,6 +45,7 @@ import java.net.Socket;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import static org.hamcrest.CoreMatchers.*; import static org.hamcrest.CoreMatchers.*;
@ -141,4 +152,92 @@ public class NioSocketChannelTest {
group.shutdownGracefully().sync(); group.shutdownGracefully().sync();
} }
} }
// Test for https://github.com/netty/netty/issues/4805
@Test(timeout = 3000)
public void testChannelReRegisterReadSameEventLoop() throws Exception {
testChannelReRegisterRead(true);
}
@Test(timeout = 3000)
public void testChannelReRegisterReadDifferentEventLoop() throws Exception {
testChannelReRegisterRead(false);
}
private static void testChannelReRegisterRead(final boolean sameEventLoop) throws Exception {
final EventLoopGroup group = new NioEventLoopGroup(2);
final CountDownLatch latch = new CountDownLatch(1);
// Just some random bytes
byte[] bytes = new byte[1024];
ThreadLocalRandom.current().nextBytes(bytes);
Channel sc = null;
Channel cc = null;
ServerBootstrap b = new ServerBootstrap();
try {
b.group(group)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) {
// We was able to read something from the Channel after reregister.
latch.countDown();
}
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
final EventLoop loop = group.next();
if (sameEventLoop) {
deregister(ctx, loop);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
deregister(ctx, loop);
}
});
}
}
private void deregister(ChannelHandlerContext ctx, final EventLoop loop) {
// As soon as the channel becomes active re-register it to another
// EventLoop. After this is done we should still receive the data that
// was written to the channel.
ctx.deregister().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture cf) {
Channel channel = cf.channel();
assertNotSame(loop, channel.eventLoop());
group.next().register(channel);
}
});
}
});
}
});
sc = b.bind(0).syncUninterruptibly().channel();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInboundHandlerAdapter());
cc = bootstrap.connect(sc.localAddress()).syncUninterruptibly().channel();
cc.writeAndFlush(Unpooled.wrappedBuffer(bytes)).syncUninterruptibly();
latch.await();
} finally {
if (cc != null) {
cc.close();
}
if (sc != null) {
sc.close();
}
group.shutdownGracefully();
}
}
} }