[#4284] Forward decoded messages more frequently

Motivation:

At the moment we only forward decoded messages that were added the out List once the full decode loop was completed. This has the affect that resources may not be released as fast as possible and as an application may incounter higher latency if the user triggeres a writeAndFlush(...) as a result of the decoded messages.

Modifications:

- forward decoded messages after each decode call

Result:

Forwarding decoded messages through the pipeline in a more eager fashion.
This commit is contained in:
Norman Maurer 2015-10-06 13:37:52 +02:00
parent 80de5fa9c9
commit 99dfc9ea79
3 changed files with 57 additions and 15 deletions

View File

@ -259,11 +259,8 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
} }
int size = out.size(); int size = out.size();
decodeWasNull = size == 0; decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
for (int i = 0; i < size; i ++) {
ctx.fireChannelRead(out.get(i));
}
out.recycle(); out.recycle();
} }
} else { } else {
@ -271,6 +268,15 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
} }
} }
/**
* Get {@code numElements} out of the {@link List} and forward these through the pipeline.
*/
static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
for (int i = 0; i < numElements; i ++) {
ctx.fireChannelRead(msgs.get(i));
}
}
@Override @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
numReads = 0; numReads = 0;
@ -318,9 +324,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
cumulation = null; cumulation = null;
} }
int size = out.size(); int size = out.size();
for (int i = 0; i < size; i++) { fireChannelRead(ctx, out, size);
ctx.fireChannelRead(out.get(i));
}
if (size > 0) { if (size > 0) {
// Something was read, call fireChannelReadComplete() // Something was read, call fireChannelReadComplete()
ctx.fireChannelReadComplete(); ctx.fireChannelReadComplete();
@ -345,6 +349,13 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
try { try {
while (in.isReadable()) { while (in.isReadable()) {
int outSize = out.size(); int outSize = out.size();
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
outSize = 0;
}
int oldInputLength = in.readableBytes(); int oldInputLength = in.readableBytes();
decode(ctx, in, out); decode(ctx, in, out);

View File

@ -341,11 +341,10 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
cumulation.release(); cumulation.release();
cumulation = null; cumulation = null;
} }
int size = out.size(); int size = out.size();
for (int i = 0; i < size; i++) {
ctx.fireChannelRead(out.get(i));
}
if (size > 0) { if (size > 0) {
fireChannelRead(ctx, out, size);
// Something was read, call fireChannelReadComplete() // Something was read, call fireChannelReadComplete()
ctx.fireChannelReadComplete(); ctx.fireChannelReadComplete();
} }
@ -364,6 +363,13 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
while (in.isReadable()) { while (in.isReadable()) {
int oldReaderIndex = checkpoint = in.readerIndex(); int oldReaderIndex = checkpoint = in.readerIndex();
int outSize = out.size(); int outSize = out.size();
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
outSize = 0;
}
S oldState = state; S oldState = state;
int oldInputLength = in.readableBytes(); int oldInputLength = in.readableBytes();
try { try {

View File

@ -40,6 +40,8 @@ public final class RecyclableArrayList extends ArrayList<Object> {
} }
}; };
private boolean insertSinceRecycled;
/** /**
* Create a new empty {@link RecyclableArrayList} instance * Create a new empty {@link RecyclableArrayList} instance
*/ */
@ -70,13 +72,21 @@ public final class RecyclableArrayList extends ArrayList<Object> {
@Override @Override
public boolean addAll(Collection<?> c) { public boolean addAll(Collection<?> c) {
checkNullElements(c); checkNullElements(c);
return super.addAll(c); if (super.addAll(c)) {
insertSinceRecycled = true;
return true;
}
return false;
} }
@Override @Override
public boolean addAll(int index, Collection<?> c) { public boolean addAll(int index, Collection<?> c) {
checkNullElements(c); checkNullElements(c);
return super.addAll(index, c); if (super.addAll(index, c)) {
insertSinceRecycled = true;
return true;
}
return false;
} }
private static void checkNullElements(Collection<?> c) { private static void checkNullElements(Collection<?> c) {
@ -103,7 +113,11 @@ public final class RecyclableArrayList extends ArrayList<Object> {
if (element == null) { if (element == null) {
throw new NullPointerException("element"); throw new NullPointerException("element");
} }
return super.add(element); if (super.add(element)) {
insertSinceRecycled = true;
return true;
}
return false;
} }
@Override @Override
@ -112,6 +126,7 @@ public final class RecyclableArrayList extends ArrayList<Object> {
throw new NullPointerException("element"); throw new NullPointerException("element");
} }
super.add(index, element); super.add(index, element);
insertSinceRecycled = true;
} }
@Override @Override
@ -119,7 +134,16 @@ public final class RecyclableArrayList extends ArrayList<Object> {
if (element == null) { if (element == null) {
throw new NullPointerException("element"); throw new NullPointerException("element");
} }
return super.set(index, element); Object old = super.set(index, element);
insertSinceRecycled = true;
return old;
}
/**
* Returns {@code true} if any elements where added or set. This will be reset once {@link #recycle()} was called.
*/
public boolean insertSinceRecycled() {
return insertSinceRecycled;
} }
/** /**
@ -127,6 +151,7 @@ public final class RecyclableArrayList extends ArrayList<Object> {
*/ */
public boolean recycle() { public boolean recycle() {
clear(); clear();
insertSinceRecycled = false;
return RECYCLER.recycle(this, handle); return RECYCLER.recycle(this, handle);
} }
} }