Dequeue all cached messages and destroy the queue instance after removing the FlowControlHandler from channel pipeline. (#10304)

Motivation:

The `FlowControlHandler` may cache the received messages in a queue in order to do the flow control. However, if this handler is manually removed from pipeline during runtime, those cached messages might not be passed to the next channel handler forever.

Modification:

Dequeue all these cached messages and call `ChannelHandlerContext.fireChannelRead(...)` in `handlerRemoved(...)` method.

Result:
Avoid losing the received messages.
This commit is contained in:
feijermu 2020-05-19 15:46:12 +08:00 committed by Norman Maurer
parent bdb5e20d99
commit 9fe5f6f958
2 changed files with 67 additions and 0 deletions

View File

@ -117,6 +117,14 @@ public class FlowControlHandler implements ChannelHandler {
config = ctx.channel().config();
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
if (!isQueueEmpty()) {
dequeue(ctx, queue.size());
}
destroy();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
destroy();

View File

@ -477,6 +477,65 @@ public class FlowControlHandlerTest {
assertFalse(channel.finish());
}
@Test
public void testRemoveFlowControl() throws Exception {
final CountDownLatch latch = new CountDownLatch(3);
ChannelHandler handler = new ChannelHandler() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
//do the first read
ctx.read();
ctx.fireChannelActive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
latch.countDown();
ctx.fireChannelRead(msg);
}
};
FlowControlHandler flow = new FlowControlHandler() {
private int num;
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
++num;
if (num >= 3) {
//We have received 3 messages. Remove myself later
final ChannelHandler handler = this;
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
ctx.pipeline().remove(handler);
}
});
}
}
};
ChannelHandler tail = new ChannelHandler() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//consume this msg
ReferenceCountUtil.release(msg);
}
};
Channel server = newServer(false /* no auto read */, flow, handler, tail);
Channel client = newClient(server.localAddress());
try {
// Write one message
client.writeAndFlush(newOneMessage()).sync();
// We should receive 3 messages
assertTrue(latch.await(1L, SECONDS));
assertTrue(flow.isQueueEmpty());
} finally {
client.close();
server.close();
}
}
/**
* This is a fictional message decoder. It decodes each {@code byte}
* into three strings.