diff --git a/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java b/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java
new file mode 100644
index 0000000000..1c07cd0711
--- /dev/null
+++ b/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java
@@ -0,0 +1,247 @@
+/*
+ * Copyright 2016 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.netty.handler.flow;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.MessageToByteEncoder;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.internal.logging.InternalLogger;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+
+/**
+ * The {@link FlowControlHandler} ensures that only one message per {@code read()} is sent downstream.
+ *
+ * Classes such as {@link ByteToMessageDecoder} or {@link MessageToByteEncoder} are free to emit as
+ * many events as they like for any given input. A channel's auto reading configuration doesn't usually
+ * apply in these scenarios. This is causing problems in downstream {@link ChannelHandler}s that would
+ * like to hold subsequent events while they're processing one event. It's a common problem with the
+ * {@code HttpObjectDecoder} that will very often fire a {@code HttpRequest} that is immediately followed
+ * by a {@code LastHttpContent} event.
+ *
+ *
+ * {@link ChannelPipeline} pipeline = ...;
+ *
+ * pipeline.addLast(new HttpServerCodec());
+ * pipeline.addLast(new {@link FlowControlHandler}());
+ *
+ * pipeline.addLast(new MyExampleHandler());
+ *
+ * class MyExampleHandler extends {@link ChannelInboundHandlerAdapter} {
+ * @Override
+ * public void channelRead({@link ChannelHandlerContext} ctx, Object msg) {
+ * if (msg instanceof HttpRequest) {
+ * ctx.channel().config().setAutoRead(false);
+ *
+ * // The FlowControlHandler will hold any subsequent events that
+ * // were emitted by HttpObjectDecoder until auto reading is turned
+ * // back on or Channel#read() is being called.
+ * }
+ * }
+ * }
+ *
+ *
+ * @see ChannelConfig#setAutoRead(boolean)
+ */
+public class FlowControlHandler extends ChannelDuplexHandler {
+ private static final InternalLogger logger = InternalLoggerFactory.getInstance(FlowControlHandler.class);
+
+ private final boolean releaseMessages;
+
+ private RecyclableArrayDeque queue;
+
+ private ChannelConfig config;
+
+ private boolean shouldConsume;
+
+ public FlowControlHandler() {
+ this(true);
+ }
+
+ public FlowControlHandler(boolean releaseMessages) {
+ this.releaseMessages = releaseMessages;
+ }
+
+ /**
+ * Returns a copy of the underlying {@link Queue}. This method exists for
+ * testing, debugging and inspection purposes and it is not Thread safe!
+ */
+ Queue queue() {
+ RecyclableArrayDeque queue = this.queue;
+
+ if (queue == null) {
+ return new ArrayDeque(0);
+ }
+
+ return new ArrayDeque(queue);
+ }
+
+ /**
+ * Releases all messages and destroys the {@link Queue}.
+ */
+ private void destroy() {
+ if (queue != null) {
+
+ if (!queue.isEmpty()) {
+ logger.trace("Non-empty queue: {}", queue);
+
+ if (releaseMessages) {
+ Object msg;
+ while ((msg = queue.poll()) != null) {
+ ReferenceCountUtil.safeRelease(msg);
+ }
+ }
+ }
+
+ queue.recycle();
+ queue = null;
+ }
+ }
+
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ config = ctx.channel().config();
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ destroy();
+ ctx.fireChannelInactive();
+ }
+
+ @Override
+ public void read(ChannelHandlerContext ctx) throws Exception {
+ if (dequeue(ctx, 1) == 0) {
+ // It seems no messages were consumed. We need to read() some
+ // messages from upstream and once one arrives it need to be
+ // relayed to downstream to keep the flow going.
+ shouldConsume = true;
+ ctx.read();
+ }
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ if (queue == null) {
+ queue = RecyclableArrayDeque.newInstance();
+ }
+
+ queue.offer(msg);
+
+ // We just received one message. Do we need to relay it regardless
+ // of the auto reading configuration? The answer is yes if this
+ // method was called as a result of a prior read() call.
+ int minConsume = shouldConsume ? 1 : 0;
+ shouldConsume = false;
+
+ dequeue(ctx, minConsume);
+ }
+
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+ // Don't relay completion events from upstream as they
+ // make no sense in this context. See dequeue() where
+ // a new set of completion events is being produced.
+ }
+
+ /**
+ * Dequeues one or many (or none) messages depending on the channel's auto
+ * reading state and returns the number of messages that were consumed from
+ * the internal queue.
+ *
+ * The {@code minConsume} argument is used to force {@code dequeue()} into
+ * consuming that number of messages regardless of the channel's auto
+ * reading configuration.
+ *
+ * @see #read(ChannelHandlerContext)
+ * @see #channelRead(ChannelHandlerContext, Object)
+ */
+ private int dequeue(ChannelHandlerContext ctx, int minConsume) {
+ if (queue != null) {
+
+ int consumed = 0;
+
+ Object msg;
+ while ((consumed < minConsume) || config.isAutoRead()) {
+ msg = queue.poll();
+ if (msg == null) {
+ break;
+ }
+
+ ++consumed;
+ ctx.fireChannelRead(msg);
+ }
+
+ // We're firing a completion event every time one (or more)
+ // messages were consumed and the queue ended up being drained
+ // to an empty state.
+ if (queue.isEmpty() && consumed > 0) {
+ ctx.fireChannelReadComplete();
+ }
+
+ return consumed;
+ }
+
+ return 0;
+ }
+
+ /**
+ * A recyclable {@link ArrayDeque}.
+ */
+ private static final class RecyclableArrayDeque extends ArrayDeque {
+
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * A value of {@code 2} should be a good choice for most scenarios.
+ */
+ private static final int DEFAULT_NUM_ELEMENTS = 2;
+
+ private static final Recycler RECYCLER = new Recycler() {
+ @Override
+ protected RecyclableArrayDeque newObject(Handle handle) {
+ return new RecyclableArrayDeque(DEFAULT_NUM_ELEMENTS, handle);
+ }
+ };
+
+ public static RecyclableArrayDeque newInstance() {
+ return RECYCLER.get();
+ }
+
+ private final Handle handle;
+
+ private RecyclableArrayDeque(int numElements, Handle handle) {
+ super(numElements);
+ this.handle = handle;
+ }
+
+ public void recycle() {
+ clear();
+ handle.recycle(this);
+ }
+ }
+}
diff --git a/handler/src/main/java/io/netty/handler/flow/package-info.java b/handler/src/main/java/io/netty/handler/flow/package-info.java
new file mode 100644
index 0000000000..2e5c763d38
--- /dev/null
+++ b/handler/src/main/java/io/netty/handler/flow/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2016 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Package to control the flow of messages.
+ */
+package io.netty.handler.flow;
diff --git a/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java
new file mode 100644
index 0000000000..26305a0d08
--- /dev/null
+++ b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java
@@ -0,0 +1,389 @@
+/*
+ * Copyright 2016 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.netty.handler.flow;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Exchanger;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.hamcrest.collection.IsIterableContainingInOrder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.util.ReferenceCountUtil;
+
+public class FlowControlHandlerTest {
+ private static EventLoopGroup GROUP;
+
+ @BeforeClass
+ public static void init() {
+ GROUP = new NioEventLoopGroup();
+ }
+
+ @AfterClass
+ public static void destroy() {
+ GROUP.shutdownGracefully();
+ }
+
+ /**
+ * The {@link OneByteToThreeStringsDecoder} decodes this {@code byte[]} into three messages.
+ */
+ private static ByteBuf newOneMessage() {
+ return Unpooled.wrappedBuffer(new byte[]{ 1 });
+ }
+
+ private static Channel newServer(final boolean autoRead, final ChannelHandler... handlers) {
+ assertTrue(handlers.length >= 1);
+
+ ServerBootstrap serverBootstrap = new ServerBootstrap();
+ serverBootstrap.group(GROUP)
+ .channel(NioServerSocketChannel.class)
+ .childOption(ChannelOption.AUTO_READ, autoRead)
+ .childHandler(new ChannelInitializer() {
+ @Override
+ protected void initChannel(Channel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast(new OneByteToThreeStringsDecoder());
+ pipeline.addLast(handlers);
+ }
+ });
+
+ return serverBootstrap.bind(0)
+ .syncUninterruptibly()
+ .channel();
+ }
+
+ private static Channel newClient(SocketAddress server) {
+ Bootstrap bootstrap = new Bootstrap();
+
+ bootstrap.group(GROUP)
+ .channel(NioSocketChannel.class)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)
+ .handler(new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ fail("In this test the client is never receiving a message from the server.");
+ }
+ });
+
+ return bootstrap.connect(server)
+ .syncUninterruptibly()
+ .channel();
+ }
+
+ /**
+ * This test demonstrates the default behavior if auto reading
+ * is turned on from the get-go and you're trying to turn it off
+ * once you've received your first message.
+ *
+ * NOTE: This test waits for the client to disconnect which is
+ * interpreted as the signal that all {@code byte}s have been
+ * transferred to the server.
+ */
+ @Test
+ public void testAutoReadingOn() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(3);
+
+ ChannelInboundHandlerAdapter handler = new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ ReferenceCountUtil.release(msg);
+ // We're turning off auto reading in the hope that no
+ // new messages are being sent but that is not true.
+ ctx.channel().config().setAutoRead(false);
+
+ latch.countDown();
+ }
+ };
+
+ Channel server = newServer(true, handler);
+ Channel client = newClient(server.localAddress());
+
+ try {
+ client.writeAndFlush(newOneMessage())
+ .syncUninterruptibly();
+
+ // We received three messages even through auto reading
+ // was turned off after we received the first message.
+ assertTrue(latch.await(1L, TimeUnit.SECONDS));
+ } finally {
+ client.close();
+ server.close();
+ }
+ }
+
+ /**
+ * This test demonstrates the default behavior if auto reading
+ * is turned off from the get-go and you're calling read() in
+ * the hope that only one message will be returned.
+ *
+ * NOTE: This test waits for the client to disconnect which is
+ * interpreted as the signal that all {@code byte}s have been
+ * transferred to the server.
+ */
+ @Test
+ public void testAutoReadingOff() throws Exception {
+ final Exchanger peerRef = new Exchanger();
+ final CountDownLatch latch = new CountDownLatch(3);
+
+ ChannelInboundHandlerAdapter handler = new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ peerRef.exchange(ctx.channel(), 1L, TimeUnit.SECONDS);
+ ctx.fireChannelActive();
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ ReferenceCountUtil.release(msg);
+ latch.countDown();
+ }
+ };
+
+ Channel server = newServer(false, handler);
+ Channel client = newClient(server.localAddress());
+
+ try {
+ // The client connection on the server side
+ Channel peer = peerRef.exchange(null, 1L, TimeUnit.SECONDS);
+
+ // Write the message
+ client.writeAndFlush(newOneMessage())
+ .syncUninterruptibly();
+
+ // Read the message
+ peer.read();
+
+ // We received all three messages but hoped that only one
+ // message was read because auto reading was off and we
+ // invoked the read() method only once.
+ assertTrue(latch.await(1L, TimeUnit.SECONDS));
+ } finally {
+ client.close();
+ server.close();
+ }
+ }
+
+ /**
+ * The {@link FlowControlHandler} will simply pass-through all messages
+ * if auto reading is on and remains on.
+ */
+ @Test
+ public void testFlowAutoReadOn() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(3);
+
+ ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() {
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ latch.countDown();
+ }
+ };
+
+ FlowControlHandler flow = new FlowControlHandler();
+ Channel server = newServer(true, flow, handler);
+ Channel client = newClient(server.localAddress());
+ try {
+ // Write the message
+ client.writeAndFlush(newOneMessage())
+ .syncUninterruptibly();
+
+ // We should receive 3 messages
+ assertTrue(latch.await(1L, TimeUnit.SECONDS));
+ assertTrue(flow.queue().isEmpty());
+ } finally {
+ client.close();
+ server.close();
+ }
+ }
+
+ /**
+ * The {@link FlowControlHandler} will pass down messages one by one
+ * if {@link ChannelConfig#setAutoRead(boolean)} is being toggled.
+ */
+ @Test
+ public void testFlowToggleAutoRead() throws Exception {
+ final Exchanger peerRef = new Exchanger();
+ final AtomicReference latchRef
+ = new AtomicReference(new CountDownLatch(1));
+
+ final AtomicInteger counter = new AtomicInteger();
+
+ ChannelInboundHandlerAdapter handler = new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ peerRef.exchange(ctx.channel(), 1L, TimeUnit.SECONDS);
+ ctx.fireChannelActive();
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ ReferenceCountUtil.release(msg);
+
+ // Disable auto reading after each message
+ counter.incrementAndGet();
+ ctx.channel().config().setAutoRead(false);
+
+ CountDownLatch latch = latchRef.get();
+ latch.countDown();
+ }
+ };
+
+ FlowControlHandler flow = new FlowControlHandler();
+ Channel server = newServer(true, flow, handler);
+ Channel client = newClient(server.localAddress());
+ try {
+ // The client connection on the server side
+ Channel peer = peerRef.exchange(null, 1L, TimeUnit.SECONDS);
+
+ client.writeAndFlush(newOneMessage())
+ .syncUninterruptibly();
+
+ // channelRead(1)
+ assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS));
+ assertFalse(peer.config().isAutoRead());
+ assertEquals(1, counter.get());
+ assertThat(flow.queue(), IsIterableContainingInOrder.contains("2", "3"));
+
+ // channelRead(2)
+ latchRef.set(new CountDownLatch(1));
+ peer.config().setAutoRead(true);
+ assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS));
+ assertFalse(peer.config().isAutoRead());
+ assertEquals(2, counter.get());
+ assertThat(flow.queue(), IsIterableContainingInOrder.contains("3"));
+
+ // channelRead(3)
+ latchRef.set(new CountDownLatch(1));
+ peer.config().setAutoRead(true);
+ assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS));
+ assertFalse(peer.config().isAutoRead());
+ assertEquals(3, counter.get());
+ assertTrue(flow.queue().isEmpty());
+ } finally {
+ client.close();
+ server.close();
+ }
+ }
+
+ /**
+ * The {@link FlowControlHandler} will pass down messages one by one
+ * if auto reading is off and the user is calling {@code read()} on
+ * their own.
+ */
+ @Test
+ public void testFlowAutoReadOff() throws Exception {
+ final Exchanger peerRef = new Exchanger();
+ final AtomicReference latchRef
+ = new AtomicReference(new CountDownLatch(1));
+ final AtomicInteger counter = new AtomicInteger();
+
+ ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() {
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ peerRef.exchange(ctx.channel(), 1L, TimeUnit.SECONDS);
+ ctx.fireChannelActive();
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ counter.incrementAndGet();
+
+ CountDownLatch latch = latchRef.get();
+ latch.countDown();
+ }
+ };
+
+ FlowControlHandler flow = new FlowControlHandler();
+ Channel server = newServer(false, flow, handler);
+ Channel client = newClient(server.localAddress());
+ try {
+ // The client connection on the server side
+ Channel peer = peerRef.exchange(null, 1L, TimeUnit.SECONDS);
+
+ // Write the message
+ client.writeAndFlush(newOneMessage())
+ .syncUninterruptibly();
+
+ // channelRead(1)
+ peer.read();
+ assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS));
+ assertEquals(1, counter.get());
+ assertThat(flow.queue(), IsIterableContainingInOrder.contains("2", "3"));
+
+ // channelRead(2)
+ latchRef.set(new CountDownLatch(1));
+ peer.read();
+ assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS));
+ assertEquals(2, counter.get());
+ assertThat(flow.queue(), IsIterableContainingInOrder.contains("3"));
+
+ // channelRead(3)
+ latchRef.set(new CountDownLatch(1));
+ peer.read();
+ assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS));
+ assertEquals(3, counter.get());
+ assertTrue(flow.queue().isEmpty());
+ } finally {
+ client.close();
+ server.close();
+ }
+ }
+
+ /**
+ * This is a fictional message decoder. It decodes each {@code byte}
+ * into three strings.
+ */
+ private static final class OneByteToThreeStringsDecoder extends ByteToMessageDecoder {
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
+ for (int i = 0; i < in.readableBytes(); i++) {
+ out.add("1");
+ out.add("2");
+ out.add("3");
+ }
+ in.readerIndex(in.readableBytes());
+ }
+ }
+}