Other ByteToMessageDecoder streamlining (#9891)

Motivation

This PR is a reduced-scope replacement for #8931. It doesn't include the
changes related to how/when discarding read bytes is done, which we plan
to address in subsequent updates.

Modifications

- Avoid copying bytes in COMPOSITE_CUMULATOR in all cases, performing a
shallow copy where necessary; also guard against (unusual) case where
input buffer is composite with writer index != capacity
- Ensure we don't pass a non-contiguous buffer when MERGE_CUMULATOR is
used
- Manually inline some calls to ByteBuf#writeBytes(...) to eliminate
redundant checks and reduce stack depth

Also includes prior minor review comments from @trustin

Result

More correct handling of merge/composite cases and
more efficient handling of composite case.
This commit is contained in:
Nick Hill 2019-12-22 23:54:31 -08:00 committed by Norman Maurer
parent 7f241f1f3c
commit 1d22fbd78c
2 changed files with 47 additions and 32 deletions

View File

@ -80,6 +80,11 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
public static final Cumulator MERGE_CUMULATOR = new Cumulator() { public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override @Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
if (!cumulation.isReadable() && in.isContiguous()) {
// If cumulation is empty and input buffer is contiguous, use it directly
cumulation.release();
return in;
}
try { try {
final int required = in.readableBytes(); final int required = in.readableBytes();
if (required > cumulation.maxWritableBytes() || if (required > cumulation.maxWritableBytes() ||
@ -91,7 +96,9 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
// assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe. // assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
return expandCumulation(alloc, cumulation, in); return expandCumulation(alloc, cumulation, in);
} }
return cumulation.writeBytes(in); cumulation.writeBytes(in, in.readerIndex(), required);
in.readerIndex(in.writerIndex());
return cumulation;
} finally { } finally {
// We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
// for whatever release (for example because of OutOfMemoryError) // for whatever release (for example because of OutOfMemoryError)
@ -108,31 +115,33 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() { public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
@Override @Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
if (!cumulation.isReadable()) {
cumulation.release();
return in;
}
CompositeByteBuf composite = null;
try { try {
if (cumulation.refCnt() > 1) { if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
// Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the
// user use slice().retain() or duplicate().retain().
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
return expandCumulation(alloc, cumulation, in);
}
final CompositeByteBuf composite;
if (cumulation instanceof CompositeByteBuf) {
composite = (CompositeByteBuf) cumulation; composite = (CompositeByteBuf) cumulation;
// Writer index must equal capacity if we are going to "write"
// new components to the end
if (composite.writerIndex() != composite.capacity()) {
composite.capacity(composite.writerIndex());
}
} else { } else {
composite = alloc.compositeBuffer(MAX_VALUE); composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation);
composite.addComponent(true, cumulation);
} }
composite.addComponent(true, in); composite.addFlattenedComponents(true, in);
in = null; in = null;
return composite; return composite;
} finally { } finally {
if (in != null) { if (in != null) {
// We must release if the ownership was not transferred as otherwise it may produce a leak if // We must release if the ownership was not transferred as otherwise it may produce a leak
// writeBytes(...) throw for whatever release (for example because of OutOfMemoryError).
in.release(); in.release();
// Also release any new buffer allocated if we're not returning it
if (composite != null && composite != cumulation) {
composite.release();
}
} }
} }
} }
@ -261,13 +270,9 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
if (msg instanceof ByteBuf) { if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance(); CodecOutputList out = CodecOutputList.newInstance();
try { try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null; first = cumulation == null;
if (first) { cumulation = cumulator.cumulate(ctx.alloc(),
cumulation = data; first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out); callDecode(ctx, cumulation, out);
} catch (DecoderException e) { } catch (DecoderException e) {
throw e; throw e;
@ -517,13 +522,18 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
} }
} }
private static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) { static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity( int oldBytes = oldCumulation.readableBytes();
oldCumulation.readableBytes() + in.readableBytes(), MAX_VALUE)); int newBytes = in.readableBytes();
int totalBytes = oldBytes + newBytes;
ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity(totalBytes, MAX_VALUE));
ByteBuf toRelease = newCumulation; ByteBuf toRelease = newCumulation;
try { try {
newCumulation.writeBytes(oldCumulation); // This avoids redundant checks and stack depth compared to calling writeBytes(...)
newCumulation.writeBytes(in); newCumulation.setBytes(0, oldCumulation, oldCumulation.readerIndex(), oldBytes)
.setBytes(oldBytes, in, in.readerIndex(), newBytes)
.writerIndex(totalBytes);
in.readerIndex(in.writerIndex());
toRelease = oldCumulation; toRelease = oldCumulation;
return newCumulation; return newCumulation;
} finally { } finally {

View File

@ -325,11 +325,11 @@ public class ByteToMessageDecoderTest {
} }
@Override @Override
public ByteBuf writeBytes(ByteBuf src) { public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
if (--untilFailure <= 0) { if (--untilFailure <= 0) {
throw error; throw error;
} }
return super.writeBytes(src); return super.setBytes(index, src, srcIndex, length);
} }
Error writeError() { Error writeError() {
@ -340,6 +340,7 @@ public class ByteToMessageDecoderTest {
@Test @Test
public void releaseWhenMergeCumulateThrows() { public void releaseWhenMergeCumulateThrows() {
WriteFailingByteBuf oldCumulation = new WriteFailingByteBuf(1, 64); WriteFailingByteBuf oldCumulation = new WriteFailingByteBuf(1, 64);
oldCumulation.writeZero(1);
ByteBuf in = Unpooled.buffer().writeZero(12); ByteBuf in = Unpooled.buffer().writeZero(12);
Throwable thrown = null; Throwable thrown = null;
@ -363,7 +364,7 @@ public class ByteToMessageDecoderTest {
} }
private void releaseWhenMergeCumulateThrowsInExpand(int untilFailure, boolean shouldFail) { private void releaseWhenMergeCumulateThrowsInExpand(int untilFailure, boolean shouldFail) {
ByteBuf oldCumulation = UnpooledByteBufAllocator.DEFAULT.heapBuffer(8, 8); ByteBuf oldCumulation = UnpooledByteBufAllocator.DEFAULT.heapBuffer(8, 8).writeZero(1);
final WriteFailingByteBuf newCumulation = new WriteFailingByteBuf(untilFailure, 16); final WriteFailingByteBuf newCumulation = new WriteFailingByteBuf(untilFailure, 16);
ByteBufAllocator allocator = new AbstractByteBufAllocator(false) { ByteBufAllocator allocator = new AbstractByteBufAllocator(false) {
@ -415,7 +416,11 @@ public class ByteToMessageDecoderTest {
public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) { public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
throw error; throw error;
} }
}; @Override
public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex, ByteBuf buffer) {
throw error;
}
}.writeZero(1);
ByteBuf in = Unpooled.buffer().writeZero(12); ByteBuf in = Unpooled.buffer().writeZero(12);
try { try {
ByteToMessageDecoder.COMPOSITE_CUMULATOR.cumulate(UnpooledByteBufAllocator.DEFAULT, cumulation, in); ByteToMessageDecoder.COMPOSITE_CUMULATOR.cumulate(UnpooledByteBufAllocator.DEFAULT, cumulation, in);