A new ChannelHandler that allows the user to control the flow of messages if upstream handlers emit more than one event for each read()


Some handlers such as HttpObjectDecoder can emit more than one event per read()
which leads to problems in downstream handlers that expect only one event and hope
that ChannelConfig#setAutoRead(false) prevents further events being sent while they're
processing the one they've just received.


A new handler called FlowControlHandler that feeds off read() and isAutoRead() and acts
as a holding buffer if auto reading gets turned off and more events arrive while auto reading
is off.


Fixes issues such as #4895.
This commit is contained in:
Roger Kapsi 2016-04-05 10:44:32 -04:00 committed by Scott Mitchell
parent 04e33fd2d8
commit 5eb0127c2a
3 changed files with 656 additions and 0 deletions

View File

@ -0,0 +1,247 @@
* Copyright 2016 The Netty Project
* The Netty Project licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
package io.netty.handler.flow;
import java.util.ArrayDeque;
import java.util.Queue;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
* The {@link FlowControlHandler} ensures that only one message per {@code read()} is sent downstream.
* Classes such as {@link ByteToMessageDecoder} or {@link MessageToByteEncoder} are free to emit as
* many events as they like for any given input. A channel's auto reading configuration doesn't usually
* apply in these scenarios. This is causing problems in downstream {@link ChannelHandler}s that would
* like to hold subsequent events while they're processing one event. It's a common problem with the
* {@code HttpObjectDecoder} that will very often fire a {@code HttpRequest} that is immediately followed
* by a {@code LastHttpContent} event.
* <pre>
* {@link ChannelPipeline} pipeline = ...;
* pipeline.addLast(new HttpServerCodec());
* pipeline.addLast(new {@link FlowControlHandler}());
* pipeline.addLast(new MyExampleHandler());
* class MyExampleHandler extends {@link ChannelInboundHandlerAdapter} {
* @Override
* public void channelRead({@link ChannelHandlerContext} ctx, Object msg) {
* if (msg instanceof HttpRequest) {
* ctx.channel().config().setAutoRead(false);
* // The FlowControlHandler will hold any subsequent events that
* // were emitted by HttpObjectDecoder until auto reading is turned
* // back on or Channel#read() is being called.
* }
* }
* }
* </pre>
* @see ChannelConfig#setAutoRead(boolean)
public class FlowControlHandler extends ChannelDuplexHandler {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(FlowControlHandler.class);
private final boolean releaseMessages;
private RecyclableArrayDeque queue;
private ChannelConfig config;
private boolean shouldConsume;
public FlowControlHandler() {
public FlowControlHandler(boolean releaseMessages) {
this.releaseMessages = releaseMessages;
* Returns a copy of the underlying {@link Queue}. This method exists for
* testing, debugging and inspection purposes and it is not Thread safe!
Queue<Object> queue() {
RecyclableArrayDeque queue = this.queue;
if (queue == null) {
return new ArrayDeque<Object>(0);
return new ArrayDeque<Object>(queue);
* Releases all messages and destroys the {@link Queue}.
private void destroy() {
if (queue != null) {
if (!queue.isEmpty()) {
logger.trace("Non-empty queue: {}", queue);
if (releaseMessages) {
Object msg;
while ((msg = queue.poll()) != null) {
queue = null;
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
config = ctx.channel().config();
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
public void read(ChannelHandlerContext ctx) throws Exception {
if (dequeue(ctx, 1) == 0) {
// It seems no messages were consumed. We need to read() some
// messages from upstream and once one arrives it need to be
// relayed to downstream to keep the flow going.
shouldConsume = true;
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (queue == null) {
queue = RecyclableArrayDeque.newInstance();
// We just received one message. Do we need to relay it regardless
// of the auto reading configuration? The answer is yes if this
// method was called as a result of a prior read() call.
int minConsume = shouldConsume ? 1 : 0;
shouldConsume = false;
dequeue(ctx, minConsume);
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// Don't relay completion events from upstream as they
// make no sense in this context. See dequeue() where
// a new set of completion events is being produced.
* Dequeues one or many (or none) messages depending on the channel's auto
* reading state and returns the number of messages that were consumed from
* the internal queue.
* The {@code minConsume} argument is used to force {@code dequeue()} into
* consuming that number of messages regardless of the channel's auto
* reading configuration.
* @see #read(ChannelHandlerContext)
* @see #channelRead(ChannelHandlerContext, Object)
private int dequeue(ChannelHandlerContext ctx, int minConsume) {
if (queue != null) {
int consumed = 0;
Object msg;
while ((consumed < minConsume) || config.isAutoRead()) {
msg = queue.poll();
if (msg == null) {
// We're firing a completion event every time one (or more)
// messages were consumed and the queue ended up being drained
// to an empty state.
if (queue.isEmpty() && consumed > 0) {
return consumed;
return 0;
* A recyclable {@link ArrayDeque}.
private static final class RecyclableArrayDeque extends ArrayDeque<Object> {
private static final long serialVersionUID = 0L;
* A value of {@code 2} should be a good choice for most scenarios.
private static final int DEFAULT_NUM_ELEMENTS = 2;
private static final Recycler<RecyclableArrayDeque> RECYCLER = new Recycler<RecyclableArrayDeque>() {
protected RecyclableArrayDeque newObject(Handle<RecyclableArrayDeque> handle) {
return new RecyclableArrayDeque(DEFAULT_NUM_ELEMENTS, handle);
public static RecyclableArrayDeque newInstance() {
return RECYCLER.get();
private final Handle<RecyclableArrayDeque> handle;
private RecyclableArrayDeque(int numElements, Handle<RecyclableArrayDeque> handle) {
this.handle = handle;
public void recycle() {

View File

@ -0,0 +1,20 @@
* Copyright 2016 The Netty Project
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
* Package to control the flow of messages.
package io.netty.handler.flow;

View File

@ -0,0 +1,389 @@
* Copyright 2016 The Netty Project
* The Netty Project licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
package io.netty.handler.flow;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.ReferenceCountUtil;
public class FlowControlHandlerTest {
private static EventLoopGroup GROUP;
public static void init() {
GROUP = new NioEventLoopGroup();
public static void destroy() {
* The {@link OneByteToThreeStringsDecoder} decodes this {@code byte[]} into three messages.
private static ByteBuf newOneMessage() {
return Unpooled.wrappedBuffer(new byte[]{ 1 });
private static Channel newServer(final boolean autoRead, final ChannelHandler... handlers) {
assertTrue(handlers.length >= 1);
ServerBootstrap serverBootstrap = new ServerBootstrap();
.childOption(ChannelOption.AUTO_READ, autoRead)
.childHandler(new ChannelInitializer<Channel>() {
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new OneByteToThreeStringsDecoder());
return serverBootstrap.bind(0)
private static Channel newClient(SocketAddress server) {
Bootstrap bootstrap = new Bootstrap();
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)
.handler(new ChannelInboundHandlerAdapter() {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
fail("In this test the client is never receiving a message from the server.");
return bootstrap.connect(server)
* This test demonstrates the default behavior if auto reading
* is turned on from the get-go and you're trying to turn it off
* once you've received your first message.
* NOTE: This test waits for the client to disconnect which is
* interpreted as the signal that all {@code byte}s have been
* transferred to the server.
public void testAutoReadingOn() throws Exception {
final CountDownLatch latch = new CountDownLatch(3);
ChannelInboundHandlerAdapter handler = new ChannelInboundHandlerAdapter() {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// We're turning off auto reading in the hope that no
// new messages are being sent but that is not true.
Channel server = newServer(true, handler);
Channel client = newClient(server.localAddress());
try {
// We received three messages even through auto reading
// was turned off after we received the first message.
assertTrue(latch.await(1L, TimeUnit.SECONDS));
} finally {
* This test demonstrates the default behavior if auto reading
* is turned off from the get-go and you're calling read() in
* the hope that only one message will be returned.
* NOTE: This test waits for the client to disconnect which is
* interpreted as the signal that all {@code byte}s have been
* transferred to the server.
public void testAutoReadingOff() throws Exception {
final Exchanger<Channel> peerRef = new Exchanger<Channel>();
final CountDownLatch latch = new CountDownLatch(3);
ChannelInboundHandlerAdapter handler = new ChannelInboundHandlerAdapter() {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
peerRef.exchange(ctx.channel(), 1L, TimeUnit.SECONDS);
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Channel server = newServer(false, handler);
Channel client = newClient(server.localAddress());
try {
// The client connection on the server side
Channel peer = peerRef.exchange(null, 1L, TimeUnit.SECONDS);
// Write the message
// Read the message
// We received all three messages but hoped that only one
// message was read because auto reading was off and we
// invoked the read() method only once.
assertTrue(latch.await(1L, TimeUnit.SECONDS));
} finally {
* The {@link FlowControlHandler} will simply pass-through all messages
* if auto reading is on and remains on.
public void testFlowAutoReadOn() throws Exception {
final CountDownLatch latch = new CountDownLatch(3);
ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
FlowControlHandler flow = new FlowControlHandler();
Channel server = newServer(true, flow, handler);
Channel client = newClient(server.localAddress());
try {
// Write the message
// We should receive 3 messages
assertTrue(latch.await(1L, TimeUnit.SECONDS));
} finally {
* The {@link FlowControlHandler} will pass down messages one by one
* if {@link ChannelConfig#setAutoRead(boolean)} is being toggled.
public void testFlowToggleAutoRead() throws Exception {
final Exchanger<Channel> peerRef = new Exchanger<Channel>();
final AtomicReference<CountDownLatch> latchRef
= new AtomicReference<CountDownLatch>(new CountDownLatch(1));
final AtomicInteger counter = new AtomicInteger();
ChannelInboundHandlerAdapter handler = new ChannelInboundHandlerAdapter() {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
peerRef.exchange(ctx.channel(), 1L, TimeUnit.SECONDS);
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// Disable auto reading after each message
CountDownLatch latch = latchRef.get();
FlowControlHandler flow = new FlowControlHandler();
Channel server = newServer(true, flow, handler);
Channel client = newClient(server.localAddress());
try {
// The client connection on the server side
Channel peer = peerRef.exchange(null, 1L, TimeUnit.SECONDS);
// channelRead(1)
assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS));
assertEquals(1, counter.get());
assertThat(flow.queue(), IsIterableContainingInOrder.<Object>contains("2", "3"));
// channelRead(2)
latchRef.set(new CountDownLatch(1));
assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS));
assertEquals(2, counter.get());
assertThat(flow.queue(), IsIterableContainingInOrder.<Object>contains("3"));
// channelRead(3)
latchRef.set(new CountDownLatch(1));
assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS));
assertEquals(3, counter.get());
} finally {
* The {@link FlowControlHandler} will pass down messages one by one
* if auto reading is off and the user is calling {@code read()} on
* their own.
public void testFlowAutoReadOff() throws Exception {
final Exchanger<Channel> peerRef = new Exchanger<Channel>();
final AtomicReference<CountDownLatch> latchRef
= new AtomicReference<CountDownLatch>(new CountDownLatch(1));
final AtomicInteger counter = new AtomicInteger();
ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
peerRef.exchange(ctx.channel(), 1L, TimeUnit.SECONDS);
public void channelRead(ChannelHandlerContext ctx, Object msg) {
CountDownLatch latch = latchRef.get();
FlowControlHandler flow = new FlowControlHandler();
Channel server = newServer(false, flow, handler);
Channel client = newClient(server.localAddress());
try {
// The client connection on the server side
Channel peer = peerRef.exchange(null, 1L, TimeUnit.SECONDS);
// Write the message
// channelRead(1)
assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS));
assertEquals(1, counter.get());
assertThat(flow.queue(), IsIterableContainingInOrder.<Object>contains("2", "3"));
// channelRead(2)
latchRef.set(new CountDownLatch(1));
assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS));
assertEquals(2, counter.get());
assertThat(flow.queue(), IsIterableContainingInOrder.<Object>contains("3"));
// channelRead(3)
latchRef.set(new CountDownLatch(1));
assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS));
assertEquals(3, counter.get());
} finally {
* This is a fictional message decoder. It decodes each {@code byte}
* into three strings.
private static final class OneByteToThreeStringsDecoder extends ByteToMessageDecoder {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
for (int i = 0; i < in.readableBytes(); i++) {