[#1812] Rework ByteToMessageDecoder.channelRead(..) method to allow for inlining

This commit is contained in:
Norman Maurer 2013-10-23 13:55:53 +02:00
parent 3de7a0bf76
commit 5a844d0bd6

View File

@ -49,6 +49,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
ByteBuf cumulation; ByteBuf cumulation;
private boolean singleDecode; private boolean singleDecode;
private boolean decodeWasNull; private boolean decodeWasNull;
private boolean first;
protected ByteToMessageDecoder() { protected ByteToMessageDecoder() {
if (getClass().isAnnotationPresent(Sharable.class)) { if (getClass().isAnnotationPresent(Sharable.class)) {
@ -121,64 +122,57 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
RecyclableArrayList out = RecyclableArrayList.newInstance(); if (msg instanceof ByteBuf) {
try { RecyclableArrayList out = RecyclableArrayList.newInstance();
if (msg instanceof ByteBuf) { try {
ByteBuf data = (ByteBuf) msg; ByteBuf data = (ByteBuf) msg;
if (cumulation == null) { first = cumulation == null;
if (first) {
cumulation = data; cumulation = data;
try {
callDecode(ctx, cumulation, out);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
cumulation.release();
cumulation = null;
}
}
} else { } else {
try { if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) {
if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) { expandCumulation(ctx, data.readableBytes());
ByteBuf oldCumulation = cumulation;
cumulation = ctx.alloc().buffer(oldCumulation.readableBytes() + data.readableBytes());
cumulation.writeBytes(oldCumulation);
oldCumulation.release();
}
cumulation.writeBytes(data);
callDecode(ctx, cumulation, out);
} finally {
if (cumulation != null) {
if (!cumulation.isReadable()) {
cumulation.release();
cumulation = null;
} else {
cumulation.discardSomeReadBytes();
}
}
data.release();
} }
cumulation.writeBytes(data);
data.release();
} }
} else { callDecode(ctx, cumulation, out);
out.add(msg); } catch (DecoderException e) {
} throw e;
} catch (DecoderException e) { } catch (Throwable t) {
throw e; throw new DecoderException(t);
} catch (Throwable t) { } finally {
throw new DecoderException(t); if (cumulation != null && !cumulation.isReadable()) {
} finally { cumulation.release();
int size = out.size(); cumulation = null;
if (size == 0) { }
decodeWasNull = true; int size = out.size();
} else { decodeWasNull = size == 0;
for (int i = 0; i < size; i ++) { for (int i = 0; i < size; i ++) {
ctx.fireChannelRead(out.get(i)); ctx.fireChannelRead(out.get(i));
} }
out.recycle();
} }
out.recycle(); } else {
ctx.fireChannelRead(msg);
} }
} }
private void expandCumulation(ChannelHandlerContext ctx, int readable) {
ByteBuf oldCumulation = cumulation;
cumulation = ctx.alloc().buffer(oldCumulation.readableBytes() + readable);
cumulation.writeBytes(oldCumulation);
oldCumulation.release();
}
@Override @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (cumulation != null && !first) {
// discard some bytes if possible to make more room in the
// buffer
cumulation.discardSomeReadBytes();
}
if (decodeWasNull) { if (decodeWasNull) {
decodeWasNull = false; decodeWasNull = false;
if (!ctx.channel().config().isAutoRead()) { if (!ctx.channel().config().isAutoRead()) {