Other ByteToMessageDecoder streamlining (#9891)

Motivation

This PR is a reduced-scope replacement for #8931. It doesn't include the
changes related to how/when discarding read bytes is done, which we plan
to address in subsequent updates.

Modifications

- Avoid copying bytes in COMPOSITE_CUMULATOR in all cases, performing a
shallow copy where necessary; also guard against (unusual) case where
input buffer is composite with writer index != capacity
- Ensure we don't pass a non-contiguous buffer when MERGE_CUMULATOR is
used
- Manually inline some calls to ByteBuf#writeBytes(...) to eliminate
redundant checks and reduce stack depth

Also includes prior minor review comments from @trustin

Result

More correct handling of merge/composite cases and
more efficient handling of composite case.
This commit is contained in:
Nick Hill 2019-12-22 23:54:31 -08:00 committed by Norman Maurer
parent f60c13a57f
commit 019cdca40b
2 changed files with 35 additions and 21 deletions

View File

@ -90,6 +90,11 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
* Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
*/
public static final Cumulator MERGE_CUMULATOR = (alloc, cumulation, in) -> {
if (!cumulation.isReadable() && in.isContiguous()) {
// If cumulation is empty and input buffer is contiguous, use it directly
cumulation.release();
return in;
}
try {
final int required = in.readableBytes();
if (required > cumulation.maxWritableBytes() ||
@ -101,7 +106,9 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
// assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
return expandCumulation(alloc, cumulation, in);
}
return cumulation.writeBytes(in);
cumulation.writeBytes(in, in.readerIndex(), required);
in.readerIndex(in.writerIndex());
return cumulation;
} finally {
// We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
// for whatever release (for example because of OutOfMemoryError)
@ -116,31 +123,33 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
*/
public static final Cumulator COMPOSITE_CUMULATOR = (alloc, cumulation, in) -> {
if (!cumulation.isReadable()) {
cumulation.release();
return in;
}
CompositeByteBuf composite = null;
try {
if (cumulation.refCnt() > 1) {
// Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the
// user use slice().retain() or duplicate().retain().
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
return expandCumulation(alloc, cumulation, in);
}
final CompositeByteBuf composite;
if (cumulation instanceof CompositeByteBuf) {
if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
composite = (CompositeByteBuf) cumulation;
// Writer index must equal capacity if we are going to "write"
// new components to the end
if (composite.writerIndex() != composite.capacity()) {
composite.capacity(composite.writerIndex());
}
} else {
composite = alloc.compositeBuffer(MAX_VALUE);
composite.addComponent(true, cumulation);
composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation);
}
composite.addComponent(true, in);
composite.addFlattenedComponents(true, in);
in = null;
return composite;
} finally {
if (in != null) {
// We must release if the ownership was not transferred as otherwise it may produce a leak if
// writeBytes(...) throw for whatever release (for example because of OutOfMemoryError).
// We must release if the ownership was not transferred as otherwise it may produce a leak
in.release();
// Also release any new buffer allocated if we're not returning it
if (composite != null && composite != cumulation) {
composite.release();
}
}
}
};

View File

@ -324,11 +324,11 @@ public class ByteToMessageDecoderTest {
}
@Override
public ByteBuf writeBytes(ByteBuf src) {
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
if (--untilFailure <= 0) {
throw error;
}
return super.writeBytes(src);
return super.setBytes(index, src, srcIndex, length);
}
Error writeError() {
@ -339,6 +339,7 @@ public class ByteToMessageDecoderTest {
@Test
public void releaseWhenMergeCumulateThrows() {
WriteFailingByteBuf oldCumulation = new WriteFailingByteBuf(1, 64);
oldCumulation.writeZero(1);
ByteBuf in = Unpooled.buffer().writeZero(12);
Throwable thrown = null;
@ -362,7 +363,7 @@ public class ByteToMessageDecoderTest {
}
private void releaseWhenMergeCumulateThrowsInExpand(int untilFailure, boolean shouldFail) {
ByteBuf oldCumulation = UnpooledByteBufAllocator.DEFAULT.heapBuffer(8, 8);
ByteBuf oldCumulation = UnpooledByteBufAllocator.DEFAULT.heapBuffer(8, 8).writeZero(1);
final WriteFailingByteBuf newCumulation = new WriteFailingByteBuf(untilFailure, 16);
ByteBufAllocator allocator = new AbstractByteBufAllocator(false) {
@ -414,7 +415,11 @@ public class ByteToMessageDecoderTest {
public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
throw error;
}
};
@Override
public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex, ByteBuf buffer) {
throw error;
}
}.writeZero(1);
ByteBuf in = Unpooled.buffer().writeZero(12);
try {
ByteToMessageDecoder.COMPOSITE_CUMULATOR.cumulate(UnpooledByteBufAllocator.DEFAULT, cumulation, in);