From 11e8163aa9d29074d5002da662c5284e82ecd0d4 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 6 Oct 2015 13:37:52 +0200 Subject: [PATCH] [#4284] Forward decoded messages more frequently Motivation: At the moment we only forward decoded messages that were added the out List once the full decode loop was completed. This has the affect that resources may not be released as fast as possible and as an application may incounter higher latency if the user triggeres a writeAndFlush(...) as a result of the decoded messages. Modifications: - forward decoded messages after each decode call Result: Forwarding decoded messages through the pipeline in a more eager fashion. --- .../handler/codec/ByteToMessageDecoder.java | 27 ++++++++++----- .../netty/handler/codec/ReplayingDecoder.java | 12 +++++-- .../util/internal/RecyclableArrayList.java | 33 ++++++++++++++++--- 3 files changed, 57 insertions(+), 15 deletions(-) diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index 9e745b4376..ef071243b3 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -259,11 +259,8 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter } int size = out.size(); - decodeWasNull = size == 0; - - for (int i = 0; i < size; i ++) { - ctx.fireChannelRead(out.get(i)); - } + decodeWasNull = !out.insertSinceRecycled(); + fireChannelRead(ctx, out, size); out.recycle(); } } else { @@ -271,6 +268,15 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter } } + /** + * Get {@code numElements} out of the {@link List} and forward these through the pipeline. + */ + static void fireChannelRead(ChannelHandlerContext ctx, List msgs, int numElements) { + for (int i = 0; i < numElements; i ++) { + ctx.fireChannelRead(msgs.get(i)); + } + } + @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { numReads = 0; @@ -318,9 +324,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter cumulation = null; } int size = out.size(); - for (int i = 0; i < size; i++) { - ctx.fireChannelRead(out.get(i)); - } + fireChannelRead(ctx, out, size); if (size > 0) { // Something was read, call fireChannelReadComplete() ctx.fireChannelReadComplete(); @@ -345,6 +349,13 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter try { while (in.isReadable()) { int outSize = out.size(); + + if (outSize > 0) { + fireChannelRead(ctx, out, outSize); + out.clear(); + outSize = 0; + } + int oldInputLength = in.readableBytes(); decode(ctx, in, out); diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java index f43e11b882..8b9fbed6df 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java @@ -341,11 +341,10 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { cumulation.release(); cumulation = null; } + int size = out.size(); - for (int i = 0; i < size; i++) { - ctx.fireChannelRead(out.get(i)); - } if (size > 0) { + fireChannelRead(ctx, out, size); // Something was read, call fireChannelReadComplete() ctx.fireChannelReadComplete(); } @@ -364,6 +363,13 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { while (in.isReadable()) { int oldReaderIndex = checkpoint = in.readerIndex(); int outSize = out.size(); + + if (outSize > 0) { + fireChannelRead(ctx, out, outSize); + out.clear(); + outSize = 0; + } + S oldState = state; int oldInputLength = in.readableBytes(); try { diff --git a/common/src/main/java/io/netty/util/internal/RecyclableArrayList.java b/common/src/main/java/io/netty/util/internal/RecyclableArrayList.java index 38847bd279..b81b0a08d2 100644 --- a/common/src/main/java/io/netty/util/internal/RecyclableArrayList.java +++ b/common/src/main/java/io/netty/util/internal/RecyclableArrayList.java @@ -40,6 +40,8 @@ public final class RecyclableArrayList extends ArrayList { } }; + private boolean insertSinceRecycled; + /** * Create a new empty {@link RecyclableArrayList} instance */ @@ -70,13 +72,21 @@ public final class RecyclableArrayList extends ArrayList { @Override public boolean addAll(Collection c) { checkNullElements(c); - return super.addAll(c); + if (super.addAll(c)) { + insertSinceRecycled = true; + return true; + } + return false; } @Override public boolean addAll(int index, Collection c) { checkNullElements(c); - return super.addAll(index, c); + if (super.addAll(index, c)) { + insertSinceRecycled = true; + return true; + } + return false; } private static void checkNullElements(Collection c) { @@ -103,7 +113,11 @@ public final class RecyclableArrayList extends ArrayList { if (element == null) { throw new NullPointerException("element"); } - return super.add(element); + if (super.add(element)) { + insertSinceRecycled = true; + return true; + } + return false; } @Override @@ -112,6 +126,7 @@ public final class RecyclableArrayList extends ArrayList { throw new NullPointerException("element"); } super.add(index, element); + insertSinceRecycled = true; } @Override @@ -119,7 +134,16 @@ public final class RecyclableArrayList extends ArrayList { if (element == null) { throw new NullPointerException("element"); } - return super.set(index, element); + Object old = super.set(index, element); + insertSinceRecycled = true; + return old; + } + + /** + * Returns {@code true} if any elements where added or set. This will be reset once {@link #recycle()} was called. + */ + public boolean insertSinceRecycled() { + return insertSinceRecycled; } /** @@ -127,6 +151,7 @@ public final class RecyclableArrayList extends ArrayList { */ public boolean recycle() { clear(); + insertSinceRecycled = false; return RECYCLER.recycle(this, handle); } }