[#4284] Forward decoded messages more frequently

Motivation:

At the moment we only forward decoded messages that were added the out List once the full decode loop was completed. This has the affect that resources may not be released as fast as possible and as an application may incounter higher latency if the user triggeres a writeAndFlush(...) as a result of the decoded messages.

Modifications:

- forward decoded messages after each decode call

Result:

Forwarding decoded messages through the pipeline in a more eager fashion.
This commit is contained in:
Norman Maurer 2015-10-06 13:37:52 +02:00
parent 66c3c58d3e
commit 11e8163aa9
3 changed files with 57 additions and 15 deletions

View File

@ -259,11 +259,8 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
}
int size = out.size();
decodeWasNull = size == 0;
for (int i = 0; i < size; i ++) {
ctx.fireChannelRead(out.get(i));
}
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
@ -271,6 +268,15 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
}
}
/**
* Get {@code numElements} out of the {@link List} and forward these through the pipeline.
*/
static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
for (int i = 0; i < numElements; i ++) {
ctx.fireChannelRead(msgs.get(i));
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
numReads = 0;
@ -318,9 +324,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
cumulation = null;
}
int size = out.size();
for (int i = 0; i < size; i++) {
ctx.fireChannelRead(out.get(i));
}
fireChannelRead(ctx, out, size);
if (size > 0) {
// Something was read, call fireChannelReadComplete()
ctx.fireChannelReadComplete();
@ -345,6 +349,13 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
try {
while (in.isReadable()) {
int outSize = out.size();
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
outSize = 0;
}
int oldInputLength = in.readableBytes();
decode(ctx, in, out);

View File

@ -341,11 +341,10 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
cumulation.release();
cumulation = null;
}
int size = out.size();
for (int i = 0; i < size; i++) {
ctx.fireChannelRead(out.get(i));
}
if (size > 0) {
fireChannelRead(ctx, out, size);
// Something was read, call fireChannelReadComplete()
ctx.fireChannelReadComplete();
}
@ -364,6 +363,13 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
while (in.isReadable()) {
int oldReaderIndex = checkpoint = in.readerIndex();
int outSize = out.size();
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
outSize = 0;
}
S oldState = state;
int oldInputLength = in.readableBytes();
try {

View File

@ -40,6 +40,8 @@ public final class RecyclableArrayList extends ArrayList<Object> {
}
};
private boolean insertSinceRecycled;
/**
* Create a new empty {@link RecyclableArrayList} instance
*/
@ -70,13 +72,21 @@ public final class RecyclableArrayList extends ArrayList<Object> {
@Override
public boolean addAll(Collection<?> c) {
checkNullElements(c);
return super.addAll(c);
if (super.addAll(c)) {
insertSinceRecycled = true;
return true;
}
return false;
}
@Override
public boolean addAll(int index, Collection<?> c) {
checkNullElements(c);
return super.addAll(index, c);
if (super.addAll(index, c)) {
insertSinceRecycled = true;
return true;
}
return false;
}
private static void checkNullElements(Collection<?> c) {
@ -103,7 +113,11 @@ public final class RecyclableArrayList extends ArrayList<Object> {
if (element == null) {
throw new NullPointerException("element");
}
return super.add(element);
if (super.add(element)) {
insertSinceRecycled = true;
return true;
}
return false;
}
@Override
@ -112,6 +126,7 @@ public final class RecyclableArrayList extends ArrayList<Object> {
throw new NullPointerException("element");
}
super.add(index, element);
insertSinceRecycled = true;
}
@Override
@ -119,7 +134,16 @@ public final class RecyclableArrayList extends ArrayList<Object> {
if (element == null) {
throw new NullPointerException("element");
}
return super.set(index, element);
Object old = super.set(index, element);
insertSinceRecycled = true;
return old;
}
/**
* Returns {@code true} if any elements where added or set. This will be reset once {@link #recycle()} was called.
*/
public boolean insertSinceRecycled() {
return insertSinceRecycled;
}
/**
@ -127,6 +151,7 @@ public final class RecyclableArrayList extends ArrayList<Object> {
*/
public boolean recycle() {
clear();
insertSinceRecycled = false;
return RECYCLER.recycle(this, handle);
}
}