Dequeue all cached messages and destroy the queue instance after removing the FlowControlHandler from channel pipeline. (#10304)

Motivation:

The `FlowControlHandler` may cache the received messages in a queue in order to do the flow control. However, if this handler is manually removed from pipeline during runtime, those cached messages might not be passed to the next channel handler forever.

Modification:

Dequeue all these cached messages and call `ChannelHandlerContext.fireChannelRead(...)` in `handlerRemoved(...)` method.

Result:
Avoid losing the received messages.
This commit is contained in:
feijermu 2020-05-19 15:46:12 +08:00 committed by GitHub
parent d1b99b702c
commit f66412c84c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 68 additions and 0 deletions

View File

@ -119,6 +119,15 @@ public class FlowControlHandler extends ChannelDuplexHandler {
config = ctx.channel().config(); config = ctx.channel().config();
} }
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
if (!isQueueEmpty()) {
dequeue(ctx, queue.size());
}
destroy();
}
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
destroy(); destroy();

View File

@ -479,6 +479,65 @@ public class FlowControlHandlerTest {
assertFalse(channel.finish()); assertFalse(channel.finish());
} }
@Test
public void testRemoveFlowControl() throws Exception {
final CountDownLatch latch = new CountDownLatch(3);
ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//do the first read
ctx.read();
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
latch.countDown();
super.channelRead(ctx, msg);
}
};
FlowControlHandler flow = new FlowControlHandler() {
private int num;
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
++num;
if (num >= 3) {
//We have received 3 messages. Remove myself later
final ChannelHandler handler = this;
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
ctx.pipeline().remove(handler);
}
});
}
}
};
ChannelInboundHandlerAdapter tail = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//consume this msg
ReferenceCountUtil.release(msg);
}
};
Channel server = newServer(false /* no auto read */, flow, handler, tail);
Channel client = newClient(server.localAddress());
try {
// Write one message
client.writeAndFlush(newOneMessage()).sync();
// We should receive 3 messages
assertTrue(latch.await(1L, SECONDS));
assertTrue(flow.isQueueEmpty());
} finally {
client.close();
server.close();
}
}
/** /**
* This is a fictional message decoder. It decodes each {@code byte} * This is a fictional message decoder. It decodes each {@code byte}
* into three strings. * into three strings.