[#4284] Forward decoded messages more frequently
Motivation: At the moment we only forward decoded messages that were added the out List once the full decode loop was completed. This has the affect that resources may not be released as fast as possible and as an application may incounter higher latency if the user triggeres a writeAndFlush(...) as a result of the decoded messages. Modifications: - forward decoded messages after each decode call Result: Forwarding decoded messages through the pipeline in a more eager fashion.
This commit is contained in:
parent
207d0ab2ab
commit
a605fa4411
@ -260,11 +260,8 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
int size = out.size();
|
int size = out.size();
|
||||||
decodeWasNull = size == 0;
|
decodeWasNull = !out.insertSinceRecycled();
|
||||||
|
fireChannelRead(ctx, out, size);
|
||||||
for (int i = 0; i < size; i ++) {
|
|
||||||
ctx.fireChannelRead(out.get(i));
|
|
||||||
}
|
|
||||||
out.recycle();
|
out.recycle();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -272,6 +269,15 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get {@code numElements} out of the {@link List} and forward these through the pipeline.
|
||||||
|
*/
|
||||||
|
static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
|
||||||
|
for (int i = 0; i < numElements; i ++) {
|
||||||
|
ctx.fireChannelRead(msgs.get(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||||
numReads = 0;
|
numReads = 0;
|
||||||
@ -319,9 +325,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
|
|||||||
cumulation = null;
|
cumulation = null;
|
||||||
}
|
}
|
||||||
int size = out.size();
|
int size = out.size();
|
||||||
for (int i = 0; i < size; i++) {
|
fireChannelRead(ctx, out, size);
|
||||||
ctx.fireChannelRead(out.get(i));
|
|
||||||
}
|
|
||||||
if (size > 0) {
|
if (size > 0) {
|
||||||
// Something was read, call fireChannelReadComplete()
|
// Something was read, call fireChannelReadComplete()
|
||||||
ctx.fireChannelReadComplete();
|
ctx.fireChannelReadComplete();
|
||||||
@ -346,6 +350,13 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
|
|||||||
try {
|
try {
|
||||||
while (in.isReadable()) {
|
while (in.isReadable()) {
|
||||||
int outSize = out.size();
|
int outSize = out.size();
|
||||||
|
|
||||||
|
if (outSize > 0) {
|
||||||
|
fireChannelRead(ctx, out, outSize);
|
||||||
|
out.clear();
|
||||||
|
outSize = 0;
|
||||||
|
}
|
||||||
|
|
||||||
int oldInputLength = in.readableBytes();
|
int oldInputLength = in.readableBytes();
|
||||||
decode(ctx, in, out);
|
decode(ctx, in, out);
|
||||||
|
|
||||||
|
@ -341,11 +341,10 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
|
|||||||
cumulation.release();
|
cumulation.release();
|
||||||
cumulation = null;
|
cumulation = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
int size = out.size();
|
int size = out.size();
|
||||||
for (int i = 0; i < size; i++) {
|
|
||||||
ctx.fireChannelRead(out.get(i));
|
|
||||||
}
|
|
||||||
if (size > 0) {
|
if (size > 0) {
|
||||||
|
fireChannelRead(ctx, out, size);
|
||||||
// Something was read, call fireChannelReadComplete()
|
// Something was read, call fireChannelReadComplete()
|
||||||
ctx.fireChannelReadComplete();
|
ctx.fireChannelReadComplete();
|
||||||
}
|
}
|
||||||
@ -364,6 +363,13 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
|
|||||||
while (in.isReadable()) {
|
while (in.isReadable()) {
|
||||||
int oldReaderIndex = checkpoint = in.readerIndex();
|
int oldReaderIndex = checkpoint = in.readerIndex();
|
||||||
int outSize = out.size();
|
int outSize = out.size();
|
||||||
|
|
||||||
|
if (outSize > 0) {
|
||||||
|
fireChannelRead(ctx, out, outSize);
|
||||||
|
out.clear();
|
||||||
|
outSize = 0;
|
||||||
|
}
|
||||||
|
|
||||||
S oldState = state;
|
S oldState = state;
|
||||||
int oldInputLength = in.readableBytes();
|
int oldInputLength = in.readableBytes();
|
||||||
try {
|
try {
|
||||||
|
@ -40,6 +40,8 @@ public final class RecyclableArrayList extends ArrayList<Object> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
private boolean insertSinceRecycled;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new empty {@link RecyclableArrayList} instance
|
* Create a new empty {@link RecyclableArrayList} instance
|
||||||
*/
|
*/
|
||||||
@ -70,13 +72,21 @@ public final class RecyclableArrayList extends ArrayList<Object> {
|
|||||||
@Override
|
@Override
|
||||||
public boolean addAll(Collection<?> c) {
|
public boolean addAll(Collection<?> c) {
|
||||||
checkNullElements(c);
|
checkNullElements(c);
|
||||||
return super.addAll(c);
|
if (super.addAll(c)) {
|
||||||
|
insertSinceRecycled = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean addAll(int index, Collection<?> c) {
|
public boolean addAll(int index, Collection<?> c) {
|
||||||
checkNullElements(c);
|
checkNullElements(c);
|
||||||
return super.addAll(index, c);
|
if (super.addAll(index, c)) {
|
||||||
|
insertSinceRecycled = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void checkNullElements(Collection<?> c) {
|
private static void checkNullElements(Collection<?> c) {
|
||||||
@ -103,7 +113,11 @@ public final class RecyclableArrayList extends ArrayList<Object> {
|
|||||||
if (element == null) {
|
if (element == null) {
|
||||||
throw new NullPointerException("element");
|
throw new NullPointerException("element");
|
||||||
}
|
}
|
||||||
return super.add(element);
|
if (super.add(element)) {
|
||||||
|
insertSinceRecycled = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -112,6 +126,7 @@ public final class RecyclableArrayList extends ArrayList<Object> {
|
|||||||
throw new NullPointerException("element");
|
throw new NullPointerException("element");
|
||||||
}
|
}
|
||||||
super.add(index, element);
|
super.add(index, element);
|
||||||
|
insertSinceRecycled = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -119,7 +134,16 @@ public final class RecyclableArrayList extends ArrayList<Object> {
|
|||||||
if (element == null) {
|
if (element == null) {
|
||||||
throw new NullPointerException("element");
|
throw new NullPointerException("element");
|
||||||
}
|
}
|
||||||
return super.set(index, element);
|
Object old = super.set(index, element);
|
||||||
|
insertSinceRecycled = true;
|
||||||
|
return old;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns {@code true} if any elements where added or set. This will be reset once {@link #recycle()} was called.
|
||||||
|
*/
|
||||||
|
public boolean insertSinceRecycled() {
|
||||||
|
return insertSinceRecycled;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -127,6 +151,7 @@ public final class RecyclableArrayList extends ArrayList<Object> {
|
|||||||
*/
|
*/
|
||||||
public boolean recycle() {
|
public boolean recycle() {
|
||||||
clear();
|
clear();
|
||||||
|
insertSinceRecycled = false;
|
||||||
return RECYCLER.recycle(this, handle);
|
return RECYCLER.recycle(this, handle);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user