diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index fd00947d22..fa66d0907f 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -442,8 +442,16 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. - if (firstRegistration && isActive()) { - pipeline.fireChannelActive(); + if (isActive()) { + if (firstRegistration) { + pipeline.fireChannelActive(); + } else if (config().isAutoRead()) { + // This channel was registered before and autoRead() is set. This means we need to begin read + // again so that we process inbound data. + // + // See https://github.com/netty/netty/issues/4805 + beginRead(); + } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. diff --git a/transport/src/test/java/io/netty/channel/nio/NioSocketChannelTest.java b/transport/src/test/java/io/netty/channel/nio/NioSocketChannelTest.java index 3d5b47dfd4..fc75dda475 100644 --- a/transport/src/test/java/io/netty/channel/nio/NioSocketChannelTest.java +++ b/transport/src/test/java/io/netty/channel/nio/NioSocketChannelTest.java @@ -15,16 +15,26 @@ */ package io.netty.channel.nio; +import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.CharsetUtil; import io.netty.util.NetUtil; +import io.netty.util.internal.ThreadLocalRandom; import org.junit.Test; import java.io.DataInput; @@ -35,6 +45,7 @@ import java.net.Socket; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.util.Queue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import static org.hamcrest.CoreMatchers.*; @@ -141,4 +152,92 @@ public class NioSocketChannelTest { group.shutdownGracefully().sync(); } } + + // Test for https://github.com/netty/netty/issues/4805 + @Test(timeout = 3000) + public void testChannelReRegisterReadSameEventLoop() throws Exception { + testChannelReRegisterRead(true); + } + + @Test(timeout = 3000) + public void testChannelReRegisterReadDifferentEventLoop() throws Exception { + testChannelReRegisterRead(false); + } + + private static void testChannelReRegisterRead(final boolean sameEventLoop) throws Exception { + final EventLoopGroup group = new NioEventLoopGroup(2); + final CountDownLatch latch = new CountDownLatch(1); + + // Just some random bytes + byte[] bytes = new byte[1024]; + ThreadLocalRandom.current().nextBytes(bytes); + + Channel sc = null; + Channel cc = null; + ServerBootstrap b = new ServerBootstrap(); + try { + b.group(group) + .channel(NioServerSocketChannel.class) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new SimpleChannelInboundHandler() { + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) { + // We was able to read something from the Channel after reregister. + latch.countDown(); + } + + @Override + public void channelActive(final ChannelHandlerContext ctx) throws Exception { + final EventLoop loop = group.next(); + if (sameEventLoop) { + deregister(ctx, loop); + } else { + loop.execute(new Runnable() { + @Override + public void run() { + deregister(ctx, loop); + } + }); + } + } + + private void deregister(ChannelHandlerContext ctx, final EventLoop loop) { + // As soon as the channel becomes active re-register it to another + // EventLoop. After this is done we should still receive the data that + // was written to the channel. + ctx.deregister().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture cf) { + Channel channel = cf.channel(); + assertNotSame(loop, channel.eventLoop()); + group.next().register(channel); + } + }); + } + }); + } + }); + + sc = b.bind(0).syncUninterruptibly().channel(); + + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(group).channel(NioSocketChannel.class); + bootstrap.handler(new ChannelInboundHandlerAdapter()); + cc = bootstrap.connect(sc.localAddress()).syncUninterruptibly().channel(); + cc.writeAndFlush(Unpooled.wrappedBuffer(bytes)).syncUninterruptibly(); + latch.await(); + } finally { + if (cc != null) { + cc.close(); + } + if (sc != null) { + sc.close(); + } + group.shutdownGracefully(); + } + } }