[#4805] Respect isAutoRead() once re-register Channel
Motivation:
When a channel was registered before and is re-registered we need to respect ChannelConfig.isAutoRead() and so start reading one the registration task completes. This was done "by luck" before 15162202fb
.
Modifications:
Explicit start reading once a Channel was re-registered if isAutoRead() is true.
Result:
Correctly receive data after re-registration completes.
This commit is contained in:
parent
64d1eea608
commit
027e8224e4
@ -442,8 +442,16 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
pipeline.fireChannelRegistered();
|
||||
// Only fire a channelActive if the channel has never been registered. This prevents firing
|
||||
// multiple channel actives if the channel is deregistered and re-registered.
|
||||
if (firstRegistration && isActive()) {
|
||||
if (isActive()) {
|
||||
if (firstRegistration) {
|
||||
pipeline.fireChannelActive();
|
||||
} else if (config().isAutoRead()) {
|
||||
// This channel was registered before and autoRead() is set. This means we need to begin read
|
||||
// again so that we process inbound data.
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/4805
|
||||
beginRead();
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
// Close the channel directly to avoid FD leak.
|
||||
|
@ -15,16 +15,26 @@
|
||||
*/
|
||||
package io.netty.channel.nio;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.util.CharsetUtil;
|
||||
import io.netty.util.NetUtil;
|
||||
import io.netty.util.internal.ThreadLocalRandom;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.DataInput;
|
||||
@ -35,6 +45,7 @@ import java.net.Socket;
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.*;
|
||||
@ -141,4 +152,92 @@ public class NioSocketChannelTest {
|
||||
group.shutdownGracefully().sync();
|
||||
}
|
||||
}
|
||||
|
||||
// Test for https://github.com/netty/netty/issues/4805
|
||||
@Test(timeout = 3000)
|
||||
public void testChannelReRegisterReadSameEventLoop() throws Exception {
|
||||
testChannelReRegisterRead(true);
|
||||
}
|
||||
|
||||
@Test(timeout = 3000)
|
||||
public void testChannelReRegisterReadDifferentEventLoop() throws Exception {
|
||||
testChannelReRegisterRead(false);
|
||||
}
|
||||
|
||||
private static void testChannelReRegisterRead(final boolean sameEventLoop) throws Exception {
|
||||
final EventLoopGroup group = new NioEventLoopGroup(2);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
// Just some random bytes
|
||||
byte[] bytes = new byte[1024];
|
||||
ThreadLocalRandom.current().nextBytes(bytes);
|
||||
|
||||
Channel sc = null;
|
||||
Channel cc = null;
|
||||
ServerBootstrap b = new ServerBootstrap();
|
||||
try {
|
||||
b.group(group)
|
||||
.channel(NioServerSocketChannel.class)
|
||||
.childOption(ChannelOption.SO_KEEPALIVE, true)
|
||||
.childHandler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
protected void initChannel(Channel ch) throws Exception {
|
||||
ChannelPipeline pipeline = ch.pipeline();
|
||||
pipeline.addLast(new SimpleChannelInboundHandler<ByteBuf>() {
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) {
|
||||
// We was able to read something from the Channel after reregister.
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
|
||||
final EventLoop loop = group.next();
|
||||
if (sameEventLoop) {
|
||||
deregister(ctx, loop);
|
||||
} else {
|
||||
loop.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
deregister(ctx, loop);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void deregister(ChannelHandlerContext ctx, final EventLoop loop) {
|
||||
// As soon as the channel becomes active re-register it to another
|
||||
// EventLoop. After this is done we should still receive the data that
|
||||
// was written to the channel.
|
||||
ctx.deregister().addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture cf) {
|
||||
Channel channel = cf.channel();
|
||||
assertNotSame(loop, channel.eventLoop());
|
||||
group.next().register(channel);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
sc = b.bind(0).syncUninterruptibly().channel();
|
||||
|
||||
Bootstrap bootstrap = new Bootstrap();
|
||||
bootstrap.group(group).channel(NioSocketChannel.class);
|
||||
bootstrap.handler(new ChannelInboundHandlerAdapter());
|
||||
cc = bootstrap.connect(sc.localAddress()).syncUninterruptibly().channel();
|
||||
cc.writeAndFlush(Unpooled.wrappedBuffer(bytes)).syncUninterruptibly();
|
||||
latch.await();
|
||||
} finally {
|
||||
if (cc != null) {
|
||||
cc.close();
|
||||
}
|
||||
if (sc != null) {
|
||||
sc.close();
|
||||
}
|
||||
group.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user