### Motivation: FlowControllerHandler currently may swell read-complete events in some situations. ### Modification: * Fire read-complete event from flow controller, when it previously was swallowed * New unit test to cover this case ### Result: Fixes #9667: FlowControllerHandler swallows read-complete event when auto-read is disabled
This commit is contained in:
parent
a77fe9333d
commit
73f5c8384e
@ -154,9 +154,13 @@ public class FlowControlHandler extends ChannelDuplexHandler {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||||
// Don't relay completion events from upstream as they
|
if (isQueueEmpty()) {
|
||||||
// make no sense in this context. See dequeue() where
|
ctx.fireChannelReadComplete();
|
||||||
// a new set of completion events is being produced.
|
} else {
|
||||||
|
// Don't relay completion events from upstream as they
|
||||||
|
// make no sense in this context. See dequeue() where
|
||||||
|
// a new set of completion events is being produced.
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -29,10 +29,13 @@ import io.netty.channel.ChannelInitializer;
|
|||||||
import io.netty.channel.ChannelOption;
|
import io.netty.channel.ChannelOption;
|
||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
import io.netty.channel.EventLoopGroup;
|
import io.netty.channel.EventLoopGroup;
|
||||||
|
import io.netty.channel.embedded.EmbeddedChannel;
|
||||||
import io.netty.channel.nio.NioEventLoopGroup;
|
import io.netty.channel.nio.NioEventLoopGroup;
|
||||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||||
|
import io.netty.handler.timeout.IdleStateEvent;
|
||||||
|
import io.netty.handler.timeout.IdleStateHandler;
|
||||||
import io.netty.util.ReferenceCountUtil;
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
@ -40,13 +43,14 @@ import org.junit.Test;
|
|||||||
|
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Queue;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Exchanger;
|
import java.util.concurrent.Exchanger;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import static java.util.concurrent.TimeUnit.*;
|
import static java.util.concurrent.TimeUnit.*;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.*;
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
|
|
||||||
public class FlowControlHandlerTest {
|
public class FlowControlHandlerTest {
|
||||||
private static EventLoopGroup GROUP;
|
private static EventLoopGroup GROUP;
|
||||||
@ -77,7 +81,7 @@ public class FlowControlHandlerTest {
|
|||||||
.childOption(ChannelOption.AUTO_READ, autoRead)
|
.childOption(ChannelOption.AUTO_READ, autoRead)
|
||||||
.childHandler(new ChannelInitializer<Channel>() {
|
.childHandler(new ChannelInitializer<Channel>() {
|
||||||
@Override
|
@Override
|
||||||
protected void initChannel(Channel ch) throws Exception {
|
protected void initChannel(Channel ch) {
|
||||||
ChannelPipeline pipeline = ch.pipeline();
|
ChannelPipeline pipeline = ch.pipeline();
|
||||||
pipeline.addLast(new OneByteToThreeStringsDecoder());
|
pipeline.addLast(new OneByteToThreeStringsDecoder());
|
||||||
pipeline.addLast(handlers);
|
pipeline.addLast(handlers);
|
||||||
@ -419,13 +423,69 @@ public class FlowControlHandlerTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSwallowedReadComplete() throws Exception {
|
||||||
|
final long delayMillis = 100;
|
||||||
|
final Queue<IdleStateEvent> userEvents = new LinkedBlockingQueue<IdleStateEvent>();
|
||||||
|
final EmbeddedChannel channel = new EmbeddedChannel(false, false,
|
||||||
|
new FlowControlHandler(),
|
||||||
|
new IdleStateHandler(delayMillis, 0, 0, MILLISECONDS),
|
||||||
|
new ChannelInboundHandlerAdapter() {
|
||||||
|
@Override
|
||||||
|
public void channelActive(ChannelHandlerContext ctx) {
|
||||||
|
ctx.fireChannelActive();
|
||||||
|
ctx.read();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||||
|
ctx.fireChannelRead(msg);
|
||||||
|
ctx.read();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelReadComplete(ChannelHandlerContext ctx) {
|
||||||
|
ctx.fireChannelReadComplete();
|
||||||
|
ctx.read();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
|
||||||
|
if (evt instanceof IdleStateEvent) {
|
||||||
|
userEvents.add((IdleStateEvent) evt);
|
||||||
|
}
|
||||||
|
ctx.fireUserEventTriggered(evt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
channel.config().setAutoRead(false);
|
||||||
|
assertFalse(channel.config().isAutoRead());
|
||||||
|
|
||||||
|
channel.register();
|
||||||
|
|
||||||
|
// Reset read timeout by some message
|
||||||
|
assertTrue(channel.writeInbound(Unpooled.EMPTY_BUFFER));
|
||||||
|
channel.flushInbound();
|
||||||
|
assertEquals(Unpooled.EMPTY_BUFFER, channel.readInbound());
|
||||||
|
|
||||||
|
// Emulate 'no more messages in NIO channel' on the next read attempt.
|
||||||
|
channel.flushInbound();
|
||||||
|
assertNull(channel.readInbound());
|
||||||
|
|
||||||
|
Thread.sleep(delayMillis);
|
||||||
|
channel.runPendingTasks();
|
||||||
|
assertEquals(IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT, userEvents.poll());
|
||||||
|
assertFalse(channel.finish());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is a fictional message decoder. It decodes each {@code byte}
|
* This is a fictional message decoder. It decodes each {@code byte}
|
||||||
* into three strings.
|
* into three strings.
|
||||||
*/
|
*/
|
||||||
private static final class OneByteToThreeStringsDecoder extends ByteToMessageDecoder {
|
private static final class OneByteToThreeStringsDecoder extends ByteToMessageDecoder {
|
||||||
@Override
|
@Override
|
||||||
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
|
||||||
for (int i = 0; i < in.readableBytes(); i++) {
|
for (int i = 0; i < in.readableBytes(); i++) {
|
||||||
out.add("1");
|
out.add("1");
|
||||||
out.add("2");
|
out.add("2");
|
||||||
|
Loading…
Reference in New Issue
Block a user