From 5eb0127c2a0baa4bf178d244db919c9fb3a7d36a Mon Sep 17 00:00:00 2001 From: Roger Kapsi Date: Tue, 5 Apr 2016 10:44:32 -0400 Subject: [PATCH] A new ChannelHandler that allows the user to control the flow of messages if upstream handlers emit more than one event for each read() Motivation: Some handlers such as HttpObjectDecoder can emit more than one event per read() which leads to problems in downstream handlers that expect only one event and hope that ChannelConfig#setAutoRead(false) prevents further events being sent while they're processing the one they've just received. Modifications: A new handler called FlowControlHandler that feeds off read() and isAutoRead() and acts as a holding buffer if auto reading gets turned off and more events arrive while auto reading is off. Result: Fixes issues such as #4895. --- .../handler/flow/FlowControlHandler.java | 247 +++++++++++ .../io/netty/handler/flow/package-info.java | 20 + .../handler/flow/FlowControlHandlerTest.java | 389 ++++++++++++++++++ 3 files changed, 656 insertions(+) create mode 100644 handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java create mode 100644 handler/src/main/java/io/netty/handler/flow/package-info.java create mode 100644 handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java diff --git a/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java b/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java new file mode 100644 index 0000000000..1c07cd0711 --- /dev/null +++ b/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java @@ -0,0 +1,247 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.handler.flow; + +import java.util.ArrayDeque; +import java.util.Queue; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.util.Recycler; +import io.netty.util.Recycler.Handle; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +/** + * The {@link FlowControlHandler} ensures that only one message per {@code read()} is sent downstream. + * + * Classes such as {@link ByteToMessageDecoder} or {@link MessageToByteEncoder} are free to emit as + * many events as they like for any given input. A channel's auto reading configuration doesn't usually + * apply in these scenarios. This is causing problems in downstream {@link ChannelHandler}s that would + * like to hold subsequent events while they're processing one event. It's a common problem with the + * {@code HttpObjectDecoder} that will very often fire a {@code HttpRequest} that is immediately followed + * by a {@code LastHttpContent} event. + * + *
+ * {@link ChannelPipeline} pipeline = ...;
+ *
+ * pipeline.addLast(new HttpServerCodec());
+ * pipeline.addLast(new {@link FlowControlHandler}());
+ *
+ * pipeline.addLast(new MyExampleHandler());
+ *
+ * class MyExampleHandler extends {@link ChannelInboundHandlerAdapter} {
+ *   @Override
+ *   public void channelRead({@link ChannelHandlerContext} ctx, Object msg) {
+ *     if (msg instanceof HttpRequest) {
+ *       ctx.channel().config().setAutoRead(false);
+ *
+ *       // The FlowControlHandler will hold any subsequent events that
+ *       // were emitted by HttpObjectDecoder until auto reading is turned
+ *       // back on or Channel#read() is being called.
+ *     }
+ *   }
+ * }
+ * 
+ * + * @see ChannelConfig#setAutoRead(boolean) + */ +public class FlowControlHandler extends ChannelDuplexHandler { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(FlowControlHandler.class); + + private final boolean releaseMessages; + + private RecyclableArrayDeque queue; + + private ChannelConfig config; + + private boolean shouldConsume; + + public FlowControlHandler() { + this(true); + } + + public FlowControlHandler(boolean releaseMessages) { + this.releaseMessages = releaseMessages; + } + + /** + * Returns a copy of the underlying {@link Queue}. This method exists for + * testing, debugging and inspection purposes and it is not Thread safe! + */ + Queue queue() { + RecyclableArrayDeque queue = this.queue; + + if (queue == null) { + return new ArrayDeque(0); + } + + return new ArrayDeque(queue); + } + + /** + * Releases all messages and destroys the {@link Queue}. + */ + private void destroy() { + if (queue != null) { + + if (!queue.isEmpty()) { + logger.trace("Non-empty queue: {}", queue); + + if (releaseMessages) { + Object msg; + while ((msg = queue.poll()) != null) { + ReferenceCountUtil.safeRelease(msg); + } + } + } + + queue.recycle(); + queue = null; + } + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + config = ctx.channel().config(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + destroy(); + ctx.fireChannelInactive(); + } + + @Override + public void read(ChannelHandlerContext ctx) throws Exception { + if (dequeue(ctx, 1) == 0) { + // It seems no messages were consumed. We need to read() some + // messages from upstream and once one arrives it need to be + // relayed to downstream to keep the flow going. + shouldConsume = true; + ctx.read(); + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (queue == null) { + queue = RecyclableArrayDeque.newInstance(); + } + + queue.offer(msg); + + // We just received one message. Do we need to relay it regardless + // of the auto reading configuration? The answer is yes if this + // method was called as a result of a prior read() call. + int minConsume = shouldConsume ? 1 : 0; + shouldConsume = false; + + dequeue(ctx, minConsume); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + // Don't relay completion events from upstream as they + // make no sense in this context. See dequeue() where + // a new set of completion events is being produced. + } + + /** + * Dequeues one or many (or none) messages depending on the channel's auto + * reading state and returns the number of messages that were consumed from + * the internal queue. + * + * The {@code minConsume} argument is used to force {@code dequeue()} into + * consuming that number of messages regardless of the channel's auto + * reading configuration. + * + * @see #read(ChannelHandlerContext) + * @see #channelRead(ChannelHandlerContext, Object) + */ + private int dequeue(ChannelHandlerContext ctx, int minConsume) { + if (queue != null) { + + int consumed = 0; + + Object msg; + while ((consumed < minConsume) || config.isAutoRead()) { + msg = queue.poll(); + if (msg == null) { + break; + } + + ++consumed; + ctx.fireChannelRead(msg); + } + + // We're firing a completion event every time one (or more) + // messages were consumed and the queue ended up being drained + // to an empty state. + if (queue.isEmpty() && consumed > 0) { + ctx.fireChannelReadComplete(); + } + + return consumed; + } + + return 0; + } + + /** + * A recyclable {@link ArrayDeque}. + */ + private static final class RecyclableArrayDeque extends ArrayDeque { + + private static final long serialVersionUID = 0L; + + /** + * A value of {@code 2} should be a good choice for most scenarios. + */ + private static final int DEFAULT_NUM_ELEMENTS = 2; + + private static final Recycler RECYCLER = new Recycler() { + @Override + protected RecyclableArrayDeque newObject(Handle handle) { + return new RecyclableArrayDeque(DEFAULT_NUM_ELEMENTS, handle); + } + }; + + public static RecyclableArrayDeque newInstance() { + return RECYCLER.get(); + } + + private final Handle handle; + + private RecyclableArrayDeque(int numElements, Handle handle) { + super(numElements); + this.handle = handle; + } + + public void recycle() { + clear(); + handle.recycle(this); + } + } +} diff --git a/handler/src/main/java/io/netty/handler/flow/package-info.java b/handler/src/main/java/io/netty/handler/flow/package-info.java new file mode 100644 index 0000000000..2e5c763d38 --- /dev/null +++ b/handler/src/main/java/io/netty/handler/flow/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * Package to control the flow of messages. + */ +package io.netty.handler.flow; diff --git a/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java new file mode 100644 index 0000000000..26305a0d08 --- /dev/null +++ b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java @@ -0,0 +1,389 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.handler.flow; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.net.SocketAddress; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Exchanger; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.hamcrest.collection.IsIterableContainingInOrder; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.util.ReferenceCountUtil; + +public class FlowControlHandlerTest { + private static EventLoopGroup GROUP; + + @BeforeClass + public static void init() { + GROUP = new NioEventLoopGroup(); + } + + @AfterClass + public static void destroy() { + GROUP.shutdownGracefully(); + } + + /** + * The {@link OneByteToThreeStringsDecoder} decodes this {@code byte[]} into three messages. + */ + private static ByteBuf newOneMessage() { + return Unpooled.wrappedBuffer(new byte[]{ 1 }); + } + + private static Channel newServer(final boolean autoRead, final ChannelHandler... handlers) { + assertTrue(handlers.length >= 1); + + ServerBootstrap serverBootstrap = new ServerBootstrap(); + serverBootstrap.group(GROUP) + .channel(NioServerSocketChannel.class) + .childOption(ChannelOption.AUTO_READ, autoRead) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new OneByteToThreeStringsDecoder()); + pipeline.addLast(handlers); + } + }); + + return serverBootstrap.bind(0) + .syncUninterruptibly() + .channel(); + } + + private static Channel newClient(SocketAddress server) { + Bootstrap bootstrap = new Bootstrap(); + + bootstrap.group(GROUP) + .channel(NioSocketChannel.class) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000) + .handler(new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + fail("In this test the client is never receiving a message from the server."); + } + }); + + return bootstrap.connect(server) + .syncUninterruptibly() + .channel(); + } + + /** + * This test demonstrates the default behavior if auto reading + * is turned on from the get-go and you're trying to turn it off + * once you've received your first message. + * + * NOTE: This test waits for the client to disconnect which is + * interpreted as the signal that all {@code byte}s have been + * transferred to the server. + */ + @Test + public void testAutoReadingOn() throws Exception { + final CountDownLatch latch = new CountDownLatch(3); + + ChannelInboundHandlerAdapter handler = new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ReferenceCountUtil.release(msg); + // We're turning off auto reading in the hope that no + // new messages are being sent but that is not true. + ctx.channel().config().setAutoRead(false); + + latch.countDown(); + } + }; + + Channel server = newServer(true, handler); + Channel client = newClient(server.localAddress()); + + try { + client.writeAndFlush(newOneMessage()) + .syncUninterruptibly(); + + // We received three messages even through auto reading + // was turned off after we received the first message. + assertTrue(latch.await(1L, TimeUnit.SECONDS)); + } finally { + client.close(); + server.close(); + } + } + + /** + * This test demonstrates the default behavior if auto reading + * is turned off from the get-go and you're calling read() in + * the hope that only one message will be returned. + * + * NOTE: This test waits for the client to disconnect which is + * interpreted as the signal that all {@code byte}s have been + * transferred to the server. + */ + @Test + public void testAutoReadingOff() throws Exception { + final Exchanger peerRef = new Exchanger(); + final CountDownLatch latch = new CountDownLatch(3); + + ChannelInboundHandlerAdapter handler = new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + peerRef.exchange(ctx.channel(), 1L, TimeUnit.SECONDS); + ctx.fireChannelActive(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ReferenceCountUtil.release(msg); + latch.countDown(); + } + }; + + Channel server = newServer(false, handler); + Channel client = newClient(server.localAddress()); + + try { + // The client connection on the server side + Channel peer = peerRef.exchange(null, 1L, TimeUnit.SECONDS); + + // Write the message + client.writeAndFlush(newOneMessage()) + .syncUninterruptibly(); + + // Read the message + peer.read(); + + // We received all three messages but hoped that only one + // message was read because auto reading was off and we + // invoked the read() method only once. + assertTrue(latch.await(1L, TimeUnit.SECONDS)); + } finally { + client.close(); + server.close(); + } + } + + /** + * The {@link FlowControlHandler} will simply pass-through all messages + * if auto reading is on and remains on. + */ + @Test + public void testFlowAutoReadOn() throws Exception { + final CountDownLatch latch = new CountDownLatch(3); + + ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + latch.countDown(); + } + }; + + FlowControlHandler flow = new FlowControlHandler(); + Channel server = newServer(true, flow, handler); + Channel client = newClient(server.localAddress()); + try { + // Write the message + client.writeAndFlush(newOneMessage()) + .syncUninterruptibly(); + + // We should receive 3 messages + assertTrue(latch.await(1L, TimeUnit.SECONDS)); + assertTrue(flow.queue().isEmpty()); + } finally { + client.close(); + server.close(); + } + } + + /** + * The {@link FlowControlHandler} will pass down messages one by one + * if {@link ChannelConfig#setAutoRead(boolean)} is being toggled. + */ + @Test + public void testFlowToggleAutoRead() throws Exception { + final Exchanger peerRef = new Exchanger(); + final AtomicReference latchRef + = new AtomicReference(new CountDownLatch(1)); + + final AtomicInteger counter = new AtomicInteger(); + + ChannelInboundHandlerAdapter handler = new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + peerRef.exchange(ctx.channel(), 1L, TimeUnit.SECONDS); + ctx.fireChannelActive(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ReferenceCountUtil.release(msg); + + // Disable auto reading after each message + counter.incrementAndGet(); + ctx.channel().config().setAutoRead(false); + + CountDownLatch latch = latchRef.get(); + latch.countDown(); + } + }; + + FlowControlHandler flow = new FlowControlHandler(); + Channel server = newServer(true, flow, handler); + Channel client = newClient(server.localAddress()); + try { + // The client connection on the server side + Channel peer = peerRef.exchange(null, 1L, TimeUnit.SECONDS); + + client.writeAndFlush(newOneMessage()) + .syncUninterruptibly(); + + // channelRead(1) + assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS)); + assertFalse(peer.config().isAutoRead()); + assertEquals(1, counter.get()); + assertThat(flow.queue(), IsIterableContainingInOrder.contains("2", "3")); + + // channelRead(2) + latchRef.set(new CountDownLatch(1)); + peer.config().setAutoRead(true); + assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS)); + assertFalse(peer.config().isAutoRead()); + assertEquals(2, counter.get()); + assertThat(flow.queue(), IsIterableContainingInOrder.contains("3")); + + // channelRead(3) + latchRef.set(new CountDownLatch(1)); + peer.config().setAutoRead(true); + assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS)); + assertFalse(peer.config().isAutoRead()); + assertEquals(3, counter.get()); + assertTrue(flow.queue().isEmpty()); + } finally { + client.close(); + server.close(); + } + } + + /** + * The {@link FlowControlHandler} will pass down messages one by one + * if auto reading is off and the user is calling {@code read()} on + * their own. + */ + @Test + public void testFlowAutoReadOff() throws Exception { + final Exchanger peerRef = new Exchanger(); + final AtomicReference latchRef + = new AtomicReference(new CountDownLatch(1)); + final AtomicInteger counter = new AtomicInteger(); + + ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + peerRef.exchange(ctx.channel(), 1L, TimeUnit.SECONDS); + ctx.fireChannelActive(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + counter.incrementAndGet(); + + CountDownLatch latch = latchRef.get(); + latch.countDown(); + } + }; + + FlowControlHandler flow = new FlowControlHandler(); + Channel server = newServer(false, flow, handler); + Channel client = newClient(server.localAddress()); + try { + // The client connection on the server side + Channel peer = peerRef.exchange(null, 1L, TimeUnit.SECONDS); + + // Write the message + client.writeAndFlush(newOneMessage()) + .syncUninterruptibly(); + + // channelRead(1) + peer.read(); + assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS)); + assertEquals(1, counter.get()); + assertThat(flow.queue(), IsIterableContainingInOrder.contains("2", "3")); + + // channelRead(2) + latchRef.set(new CountDownLatch(1)); + peer.read(); + assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS)); + assertEquals(2, counter.get()); + assertThat(flow.queue(), IsIterableContainingInOrder.contains("3")); + + // channelRead(3) + latchRef.set(new CountDownLatch(1)); + peer.read(); + assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS)); + assertEquals(3, counter.get()); + assertTrue(flow.queue().isEmpty()); + } finally { + client.close(); + server.close(); + } + } + + /** + * This is a fictional message decoder. It decodes each {@code byte} + * into three strings. + */ + private static final class OneByteToThreeStringsDecoder extends ByteToMessageDecoder { + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + for (int i = 0; i < in.readableBytes(); i++) { + out.add("1"); + out.add("2"); + out.add("3"); + } + in.readerIndex(in.readableBytes()); + } + } +}