[#1812] Rework ByteToMessageDecoder.channelRead(..) method to allow for inlining
This commit is contained in:
parent
544d68b396
commit
8930cefab8
@ -49,6 +49,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
||||
ByteBuf cumulation;
|
||||
private boolean singleDecode;
|
||||
private boolean decodeWasNull;
|
||||
private boolean first;
|
||||
|
||||
protected ByteToMessageDecoder() {
|
||||
if (getClass().isAnnotationPresent(Sharable.class)) {
|
||||
@ -121,64 +122,57 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
RecyclableArrayList out = RecyclableArrayList.newInstance();
|
||||
try {
|
||||
if (msg instanceof ByteBuf) {
|
||||
if (msg instanceof ByteBuf) {
|
||||
RecyclableArrayList out = RecyclableArrayList.newInstance();
|
||||
try {
|
||||
ByteBuf data = (ByteBuf) msg;
|
||||
if (cumulation == null) {
|
||||
first = cumulation == null;
|
||||
if (first) {
|
||||
cumulation = data;
|
||||
try {
|
||||
callDecode(ctx, cumulation, out);
|
||||
} finally {
|
||||
if (cumulation != null && !cumulation.isReadable()) {
|
||||
cumulation.release();
|
||||
cumulation = null;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) {
|
||||
ByteBuf oldCumulation = cumulation;
|
||||
cumulation = ctx.alloc().buffer(oldCumulation.readableBytes() + data.readableBytes());
|
||||
cumulation.writeBytes(oldCumulation);
|
||||
oldCumulation.release();
|
||||
}
|
||||
cumulation.writeBytes(data);
|
||||
callDecode(ctx, cumulation, out);
|
||||
} finally {
|
||||
if (cumulation != null) {
|
||||
if (!cumulation.isReadable()) {
|
||||
cumulation.release();
|
||||
cumulation = null;
|
||||
} else {
|
||||
cumulation.discardSomeReadBytes();
|
||||
}
|
||||
}
|
||||
data.release();
|
||||
if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) {
|
||||
expandCumulation(ctx, data.readableBytes());
|
||||
}
|
||||
cumulation.writeBytes(data);
|
||||
data.release();
|
||||
}
|
||||
} else {
|
||||
out.add(msg);
|
||||
}
|
||||
} catch (DecoderException e) {
|
||||
throw e;
|
||||
} catch (Throwable t) {
|
||||
throw new DecoderException(t);
|
||||
} finally {
|
||||
int size = out.size();
|
||||
if (size == 0) {
|
||||
decodeWasNull = true;
|
||||
} else {
|
||||
callDecode(ctx, cumulation, out);
|
||||
} catch (DecoderException e) {
|
||||
throw e;
|
||||
} catch (Throwable t) {
|
||||
throw new DecoderException(t);
|
||||
} finally {
|
||||
if (cumulation != null && !cumulation.isReadable()) {
|
||||
cumulation.release();
|
||||
cumulation = null;
|
||||
}
|
||||
int size = out.size();
|
||||
decodeWasNull = size == 0;
|
||||
|
||||
for (int i = 0; i < size; i ++) {
|
||||
ctx.fireChannelRead(out.get(i));
|
||||
}
|
||||
out.recycle();
|
||||
}
|
||||
out.recycle();
|
||||
} else {
|
||||
ctx.fireChannelRead(msg);
|
||||
}
|
||||
}
|
||||
|
||||
private void expandCumulation(ChannelHandlerContext ctx, int readable) {
|
||||
ByteBuf oldCumulation = cumulation;
|
||||
cumulation = ctx.alloc().buffer(oldCumulation.readableBytes() + readable);
|
||||
cumulation.writeBytes(oldCumulation);
|
||||
oldCumulation.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||
if (cumulation != null && !first) {
|
||||
// discard some bytes if possible to make more room in the
|
||||
// buffer
|
||||
cumulation.discardSomeReadBytes();
|
||||
}
|
||||
if (decodeWasNull) {
|
||||
decodeWasNull = false;
|
||||
if (!ctx.channel().config().isAutoRead()) {
|
||||
|
Loading…
Reference in New Issue
Block a user