Fix performance regression which was caused by calling the fireInboundBufferUpdated() a way to often

* Also use a ThreadLocal for the decoder buffer to safe space as it is cheap
This commit is contained in:
Norman Maurer 2013-04-03 17:07:52 +02:00
parent c3559ddbda
commit 9828267165
2 changed files with 111 additions and 112 deletions

View File

@ -45,18 +45,23 @@ public abstract class ByteToMessageDecoder
private volatile boolean singleDecode; private volatile boolean singleDecode;
private boolean decodeWasNull; private boolean decodeWasNull;
private MessageBuf<Object> decoderOutput;
private static final ThreadLocal<MessageBuf<Object>> decoderOutput =
new ThreadLocal<MessageBuf<Object>>() {
@Override
protected MessageBuf<Object> initialValue() {
return Unpooled.messageBuffer();
}
};
@Override @Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception { public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
decoderOutput = Unpooled.messageBuffer();
return super.newInboundBuffer(ctx); return super.newInboundBuffer(ctx);
} }
@Override @Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception { public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
super.freeInboundBuffer(ctx); super.freeInboundBuffer(ctx);
decoderOutput.release();
} }
/** /**
@ -130,64 +135,60 @@ public abstract class ByteToMessageDecoder
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in) { protected void callDecode(ChannelHandlerContext ctx, ByteBuf in) {
boolean wasNull = false; boolean wasNull = false;
boolean decoded = false; boolean decoded = false;
MessageBuf<Object> out = decoderOutput(); MessageBuf<Object> out = decoderOutput();
assert out.isEmpty(); assert out.isEmpty();
while (in.isReadable()) { try {
try { while (in.isReadable()) {
int outSize = out.size(); try {
int oldInputLength = in.readableBytes(); int outSize = out.size();
decode(ctx, in, out); int oldInputLength = in.readableBytes();
if (outSize == out.size()) { decode(ctx, in, out);
wasNull = true; if (outSize == out.size()) {
wasNull = true;
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
wasNull = false;
if (oldInputLength == in.readableBytes()) { if (oldInputLength == in.readableBytes()) {
throw new IllegalStateException(
"decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break; break;
}
} catch (Throwable t) {
if (t instanceof CodecException) {
throw (CodecException) t;
} else { } else {
continue; throw new DecoderException(t);
} }
} }
wasNull = false;
if (oldInputLength == in.readableBytes()) {
throw new IllegalStateException(
"decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
} catch (Throwable t) {
if (t instanceof CodecException) {
throw (CodecException) t;
} else {
throw new DecoderException(t);
}
} finally {
for (;;) {
Object msg = out.poll();
if (msg == null) {
break;
}
decoded = true;
ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg);
}
if (decoded) {
decoded = false;
ctx.fireInboundBufferUpdated();
}
} }
} } finally {
for (;;) {
Object msg = out.poll();
if (msg == null) {
break;
}
decoded = true;
ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg);
}
if (decoded) { if (decoded) {
decodeWasNull = false; decodeWasNull = false;
ctx.fireInboundBufferUpdated(); ctx.fireInboundBufferUpdated();
} else { } else {
if (wasNull) { if (wasNull) {
decodeWasNull = true; decodeWasNull = true;
}
} }
} }
} }
@ -217,7 +218,7 @@ public abstract class ByteToMessageDecoder
} }
final MessageBuf<Object> decoderOutput() { final MessageBuf<Object> decoderOutput() {
return decoderOutput; return decoderOutput.get();
} }
} }

View File

@ -406,77 +406,75 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
@Override @Override
protected void callDecode(ChannelHandlerContext ctx, ByteBuf buf) { protected void callDecode(ChannelHandlerContext ctx, ByteBuf buf) {
boolean wasNull = false; boolean wasNull = false;
ByteBuf in = cumulation; ByteBuf in = cumulation;
MessageBuf<Object> out = decoderOutput(); MessageBuf<Object> out = decoderOutput();
boolean decoded = false; boolean decoded = false;
while (in.isReadable()) {
try { assert out.isEmpty();
int oldReaderIndex = checkpoint = in.readerIndex();
int outSize = out.size(); try {
S oldState = state; while (in.isReadable()) {
try { try {
decode(ctx, replayable, out); int oldReaderIndex = checkpoint = in.readerIndex();
if (outSize == out.size()) { int outSize = out.size();
wasNull = true; S oldState = state;
if (oldReaderIndex == in.readerIndex() && oldState == state) { try {
throw new IllegalStateException( decode(ctx, replayable, out);
"null cannot be returned if no data is consumed and state didn't change."); if (outSize == out.size()) {
} else { wasNull = true;
// Previous data has been discarded or caused state transition. if (oldReaderIndex == in.readerIndex() && oldState == state) {
// Probably it is reading on. throw new IllegalStateException(
continue; "null cannot be returned if no data is consumed and state didn't change.");
} else {
// Previous data has been discarded or caused state transition.
// Probably it is reading on.
continue;
}
}
} catch (Signal replay) {
replay.expect(REPLAY);
// Return to the checkpoint (or oldPosition) and retry.
int checkpoint = this.checkpoint;
if (checkpoint >= 0) {
in.readerIndex(checkpoint);
} else {
// Called by cleanup() - no need to maintain the readerIndex
// anymore because the buffer has been released already.
} }
}
} catch (Signal replay) {
replay.expect(REPLAY);
// Return to the checkpoint (or oldPosition) and retry.
int checkpoint = this.checkpoint;
if (checkpoint >= 0) {
in.readerIndex(checkpoint);
} else {
// Called by cleanup() - no need to maintain the readerIndex
// anymore because the buffer has been released already.
}
break;
}
wasNull = false;
if (oldReaderIndex == in.readerIndex() && oldState == state) {
throw new IllegalStateException(
"decode() method must consume at least one byte " +
"if it returned a decoded message (caused by: " +
getClass() + ')');
}
} catch (Throwable t) {
if (t instanceof CodecException) {
throw (CodecException) t;
} else {
throw new DecoderException(t);
}
} finally {
for (;;) {
Object msg = out.poll();
if (msg == null) {
break; break;
} }
decoded = true; wasNull = false;
ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg);
} if (oldReaderIndex == in.readerIndex() && oldState == state) {
if (decoded) { throw new IllegalStateException(
decoded = false; "decode() method must consume at least one byte " +
ctx.fireInboundBufferUpdated(); "if it returned a decoded message (caused by: " +
getClass() + ')');
}
} catch (Throwable t) {
if (t instanceof CodecException) {
throw (CodecException) t;
} else {
throw new DecoderException(t);
}
} }
} }
} } finally {
for (;;) {
if (decoded) { Object msg = out.poll();
decodeWasNull = false; if (msg == null) {
ctx.fireInboundBufferUpdated(); break;
} else { }
if (wasNull) { decoded = true;
decodeWasNull = true; ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg);
}
if (decoded) {
decodeWasNull = false;
ctx.fireInboundBufferUpdated();
} else {
if (wasNull) {
decodeWasNull = true;
}
} }
} }
} }