CompositeByteBuf optimizations and new addFlattenedComponents method (#8939)
Motivation: The CompositeByteBuf discardReadBytes / discardReadComponents methods are currently quite inefficient, including when there are no read components to discard. We would like to call the latter more frequently in ByteToMessageDecoder#COMPOSITE_CUMULATOR. In the same context it would be beneficial to perform a "shallow copy" of a composite buffer (for example when it has a refcount > 1) to avoid having to allocate and copy the contained bytes just to obtain an "independent" cumulation. Modifications: - Optimize discardReadBytes() and discardReadComponents() implementations (start at first comp rather than performing a binary search for the readerIndex). - New addFlattenedComponents(boolean,ByteBuf) method which performs a shallow copy if the provided buffer is also composite and avoids adding any empty buffers, plus unit test. - Other minor optimizations to avoid unnecessary checks. Results: discardReadXX methods are faster, composite buffers can be easily appended without deepening the buffer "tree" or retaining unused components.
This commit is contained in:
parent
4c56e4bad6
commit
1f93bd36b6
@ -939,6 +939,12 @@ final class AdvancedLeakAwareCompositeByteBuf extends SimpleLeakAwareCompositeBy
|
|||||||
return super.addComponent(increaseWriterIndex, cIndex, buffer);
|
return super.addComponent(increaseWriterIndex, cIndex, buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex, ByteBuf buffer) {
|
||||||
|
recordLeakNonRefCountingOperation(leak);
|
||||||
|
return super.addFlattenedComponents(increaseWriterIndex, buffer);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompositeByteBuf removeComponent(int cIndex) {
|
public CompositeByteBuf removeComponent(int cIndex) {
|
||||||
recordLeakNonRefCountingOperation(leak);
|
recordLeakNonRefCountingOperation(leak);
|
||||||
|
@ -287,7 +287,7 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements
|
|||||||
c.reposition(components[cIndex - 1].endOffset);
|
c.reposition(components[cIndex - 1].endOffset);
|
||||||
}
|
}
|
||||||
if (increaseWriterIndex) {
|
if (increaseWriterIndex) {
|
||||||
writerIndex(writerIndex() + readableBytes);
|
writerIndex += readableBytes;
|
||||||
}
|
}
|
||||||
return cIndex;
|
return cIndex;
|
||||||
} finally {
|
} finally {
|
||||||
@ -370,7 +370,7 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements
|
|||||||
updateComponentOffsets(ci); // only need to do this here for components after the added ones
|
updateComponentOffsets(ci); // only need to do this here for components after the added ones
|
||||||
}
|
}
|
||||||
if (increaseWriterIndex && ci > cIndex && ci <= componentCount) {
|
if (increaseWriterIndex && ci > cIndex && ci <= componentCount) {
|
||||||
writerIndex(writerIndex() + components[ci - 1].endOffset - components[cIndex].offset);
|
writerIndex += components[ci - 1].endOffset - components[cIndex].offset;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -413,6 +413,74 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements
|
|||||||
return addComponents(false, cIndex, buffers);
|
return addComponents(false, cIndex, buffers);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add the given {@link ByteBuf} and increase the {@code writerIndex} if {@code increaseWriterIndex} is
|
||||||
|
* {@code true}. If the provided buffer is a {@link CompositeByteBuf} itself, a "shallow copy" of its
|
||||||
|
* readable components will be performed. Thus the actual number of new components added may vary
|
||||||
|
* and in particular will be zero if the provided buffer is not readable.
|
||||||
|
* <p>
|
||||||
|
* {@link ByteBuf#release()} ownership of {@code buffer} is transferred to this {@link CompositeByteBuf}.
|
||||||
|
* @param buffer the {@link ByteBuf} to add. {@link ByteBuf#release()} ownership is transferred to this
|
||||||
|
* {@link CompositeByteBuf}.
|
||||||
|
*/
|
||||||
|
public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex, ByteBuf buffer) {
|
||||||
|
requireNonNull(buffer, "buffer");
|
||||||
|
final int ridx = buffer.readerIndex();
|
||||||
|
final int widx = buffer.writerIndex();
|
||||||
|
if (ridx == widx) {
|
||||||
|
buffer.release();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
if (!(buffer instanceof CompositeByteBuf)) {
|
||||||
|
addComponent0(increaseWriterIndex, componentCount, buffer);
|
||||||
|
consolidateIfNeeded();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
final CompositeByteBuf from = (CompositeByteBuf) buffer;
|
||||||
|
from.checkIndex(ridx, widx - ridx);
|
||||||
|
final Component[] fromComponents = from.components;
|
||||||
|
final int compCountBefore = componentCount;
|
||||||
|
final int writerIndexBefore = writerIndex;
|
||||||
|
try {
|
||||||
|
for (int cidx = from.toComponentIndex0(ridx), newOffset = capacity();; cidx++) {
|
||||||
|
final Component component = fromComponents[cidx];
|
||||||
|
final int compOffset = component.offset;
|
||||||
|
final int fromIdx = Math.max(ridx, compOffset);
|
||||||
|
final int toIdx = Math.min(widx, component.endOffset);
|
||||||
|
final int len = toIdx - fromIdx;
|
||||||
|
if (len > 0) { // skip empty components
|
||||||
|
// Note that it's safe to just retain the unwrapped buf here, even in the case
|
||||||
|
// of PooledSlicedByteBufs - those slices will still be properly released by the
|
||||||
|
// source Component's free() method.
|
||||||
|
addComp(componentCount, new Component(
|
||||||
|
component.buf.retain(), component.idx(fromIdx), newOffset, len, null));
|
||||||
|
}
|
||||||
|
if (widx == toIdx) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
newOffset += len;
|
||||||
|
}
|
||||||
|
if (increaseWriterIndex) {
|
||||||
|
writerIndex = writerIndexBefore + (widx - ridx);
|
||||||
|
}
|
||||||
|
consolidateIfNeeded();
|
||||||
|
buffer.release();
|
||||||
|
buffer = null;
|
||||||
|
return this;
|
||||||
|
} finally {
|
||||||
|
if (buffer != null) {
|
||||||
|
// if we did not succeed, attempt to rollback any components that were added
|
||||||
|
if (increaseWriterIndex) {
|
||||||
|
writerIndex = writerIndexBefore;
|
||||||
|
}
|
||||||
|
for (int cidx = componentCount - 1; cidx >= compCountBefore; cidx--) {
|
||||||
|
components[cidx].free();
|
||||||
|
removeComp(cidx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TODO optimize further, similar to ByteBuf[] version
|
// TODO optimize further, similar to ByteBuf[] version
|
||||||
// (difference here is that we don't know *always* know precise size increase in advance,
|
// (difference here is that we don't know *always* know precise size increase in advance,
|
||||||
// but we do in the most common case that the Iterable is a Collection)
|
// but we do in the most common case that the Iterable is a Collection)
|
||||||
@ -766,9 +834,9 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements
|
|||||||
removeCompRange(i + 1, size);
|
removeCompRange(i + 1, size);
|
||||||
|
|
||||||
if (readerIndex() > newCapacity) {
|
if (readerIndex() > newCapacity) {
|
||||||
setIndex(newCapacity, newCapacity);
|
setIndex0(newCapacity, newCapacity);
|
||||||
} else if (writerIndex() > newCapacity) {
|
} else if (writerIndex > newCapacity) {
|
||||||
writerIndex(newCapacity);
|
writerIndex = newCapacity;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return this;
|
return this;
|
||||||
@ -815,6 +883,9 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (size <= 2) { // fast-path for 1 and 2 component count
|
||||||
|
return size == 1 || offset < components[0].endOffset ? 0 : 1;
|
||||||
|
}
|
||||||
for (int low = 0, high = size; low <= high;) {
|
for (int low = 0, high = size; low <= high;) {
|
||||||
int mid = low + high >>> 1;
|
int mid = low + high >>> 1;
|
||||||
Component c = components[mid];
|
Component c = components[mid];
|
||||||
@ -1678,16 +1749,26 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Remove read components.
|
// Remove read components.
|
||||||
int firstComponentId = toComponentIndex0(readerIndex);
|
int firstComponentId = 0;
|
||||||
for (int i = 0; i < firstComponentId; i ++) {
|
Component c = null;
|
||||||
components[i].free();
|
for (int size = componentCount; firstComponentId < size; firstComponentId++) {
|
||||||
|
c = components[firstComponentId];
|
||||||
|
if (c.endOffset > readerIndex) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
c.free();
|
||||||
|
}
|
||||||
|
if (firstComponentId == 0) {
|
||||||
|
return this; // Nothing to discard
|
||||||
|
}
|
||||||
|
Component la = lastAccessed;
|
||||||
|
if (la != null && la.endOffset < readerIndex) {
|
||||||
|
lastAccessed = null;
|
||||||
}
|
}
|
||||||
lastAccessed = null;
|
|
||||||
removeCompRange(0, firstComponentId);
|
removeCompRange(0, firstComponentId);
|
||||||
|
|
||||||
// Update indexes and markers.
|
// Update indexes and markers.
|
||||||
Component first = components[0];
|
int offset = c.offset;
|
||||||
int offset = first.offset;
|
|
||||||
updateComponentOffsets(0);
|
updateComponentOffsets(0);
|
||||||
setIndex(readerIndex - offset, writerIndex - offset);
|
setIndex(readerIndex - offset, writerIndex - offset);
|
||||||
return this;
|
return this;
|
||||||
@ -1713,36 +1794,30 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove read components.
|
int firstComponentId = 0;
|
||||||
int firstComponentId = toComponentIndex0(readerIndex);
|
Component c = null;
|
||||||
for (int i = 0; i < firstComponentId; i ++) {
|
for (int size = componentCount; firstComponentId < size; firstComponentId++) {
|
||||||
Component c = components[i];
|
c = components[firstComponentId];
|
||||||
c.free();
|
if (c.endOffset > readerIndex) {
|
||||||
if (lastAccessed == c) {
|
break;
|
||||||
lastAccessed = null;
|
|
||||||
}
|
}
|
||||||
|
c.free();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove or replace the first readable component with a new slice.
|
// Replace the first readable component with a new slice.
|
||||||
Component c = components[firstComponentId];
|
int trimmedBytes = readerIndex - c.offset;
|
||||||
if (readerIndex == c.endOffset) {
|
c.offset = 0;
|
||||||
// new slice would be empty, so remove instead
|
c.endOffset -= readerIndex;
|
||||||
c.free();
|
c.adjustment += readerIndex;
|
||||||
if (lastAccessed == c) {
|
ByteBuf slice = c.slice;
|
||||||
lastAccessed = null;
|
if (slice != null) {
|
||||||
}
|
// We must replace the cached slice with a derived one to ensure that
|
||||||
firstComponentId++;
|
// it can later be released properly in the case of PooledSlicedByteBuf.
|
||||||
} else {
|
c.slice = slice.slice(trimmedBytes, c.length());
|
||||||
int trimmedBytes = readerIndex - c.offset;
|
}
|
||||||
c.offset = 0;
|
Component la = lastAccessed;
|
||||||
c.endOffset -= readerIndex;
|
if (la != null && la.endOffset < readerIndex) {
|
||||||
c.adjustment += readerIndex;
|
lastAccessed = null;
|
||||||
ByteBuf slice = c.slice;
|
|
||||||
if (slice != null) {
|
|
||||||
// We must replace the cached slice with a derived one to ensure that
|
|
||||||
// it can later be released properly in the case of PooledSlicedByteBuf.
|
|
||||||
c.slice = slice.slice(trimmedBytes, c.length());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
removeCompRange(0, firstComponentId);
|
removeCompRange(0, firstComponentId);
|
||||||
|
@ -548,6 +548,12 @@ class WrappedCompositeByteBuf extends CompositeByteBuf {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex, ByteBuf buffer) {
|
||||||
|
wrapped.addFlattenedComponents(increaseWriterIndex, buffer);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompositeByteBuf removeComponent(int cIndex) {
|
public CompositeByteBuf removeComponent(int cIndex) {
|
||||||
wrapped.removeComponent(cIndex);
|
wrapped.removeComponent(cIndex);
|
||||||
|
@ -1086,6 +1086,71 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
|
|||||||
cbuf.release();
|
cbuf.release();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddFlattenedComponents() {
|
||||||
|
ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2, 3 });
|
||||||
|
CompositeByteBuf newComposite = Unpooled.compositeBuffer()
|
||||||
|
.addComponent(true, b1)
|
||||||
|
.addFlattenedComponents(true, b1.retain())
|
||||||
|
.addFlattenedComponents(true, Unpooled.EMPTY_BUFFER);
|
||||||
|
|
||||||
|
assertEquals(2, newComposite.numComponents());
|
||||||
|
assertEquals(6, newComposite.capacity());
|
||||||
|
assertEquals(6, newComposite.writerIndex());
|
||||||
|
|
||||||
|
// It is important to use a pooled allocator here to ensure
|
||||||
|
// the slices returned by readRetainedSlice are of type
|
||||||
|
// PooledSlicedByteBuf, which maintains an independent refcount
|
||||||
|
// (so that we can be sure to cover this case)
|
||||||
|
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer()
|
||||||
|
.writeBytes(new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
|
||||||
|
|
||||||
|
// use mixture of slice and retained slice
|
||||||
|
ByteBuf s1 = buffer.readRetainedSlice(2);
|
||||||
|
ByteBuf s2 = s1.retainedSlice(0, 2);
|
||||||
|
ByteBuf s3 = buffer.slice(0, 2).retain();
|
||||||
|
ByteBuf s4 = s2.retainedSlice(0, 2);
|
||||||
|
buffer.release();
|
||||||
|
|
||||||
|
ByteBuf compositeToAdd = Unpooled.compositeBuffer()
|
||||||
|
.addComponent(s1)
|
||||||
|
.addComponent(Unpooled.EMPTY_BUFFER)
|
||||||
|
.addComponents(s2, s3, s4);
|
||||||
|
// set readable range to be from middle of first component
|
||||||
|
// to middle of penultimate component
|
||||||
|
compositeToAdd.setIndex(1, 5);
|
||||||
|
|
||||||
|
assertEquals(1, compositeToAdd.refCnt());
|
||||||
|
assertEquals(1, s4.refCnt());
|
||||||
|
|
||||||
|
ByteBuf compositeCopy = compositeToAdd.copy();
|
||||||
|
|
||||||
|
newComposite.addFlattenedComponents(true, compositeToAdd);
|
||||||
|
|
||||||
|
// verify that added range matches
|
||||||
|
ByteBufUtil.equals(compositeCopy, 0,
|
||||||
|
newComposite, 6, compositeCopy.readableBytes());
|
||||||
|
|
||||||
|
// should not include empty component or last component
|
||||||
|
// (latter outside of the readable range)
|
||||||
|
assertEquals(5, newComposite.numComponents());
|
||||||
|
assertEquals(10, newComposite.capacity());
|
||||||
|
assertEquals(10, newComposite.writerIndex());
|
||||||
|
|
||||||
|
assertEquals(0, compositeToAdd.refCnt());
|
||||||
|
// s4 wasn't in added range so should have been jettisoned
|
||||||
|
assertEquals(0, s4.refCnt());
|
||||||
|
assertEquals(1, newComposite.refCnt());
|
||||||
|
|
||||||
|
// releasing composite should release the remaining components
|
||||||
|
newComposite.release();
|
||||||
|
assertEquals(0, newComposite.refCnt());
|
||||||
|
assertEquals(0, s1.refCnt());
|
||||||
|
assertEquals(0, s2.refCnt());
|
||||||
|
assertEquals(0, s3.refCnt());
|
||||||
|
assertEquals(0, b1.refCnt());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIterator() {
|
public void testIterator() {
|
||||||
CompositeByteBuf cbuf = compositeBuffer();
|
CompositeByteBuf cbuf = compositeBuffer();
|
||||||
|
Loading…
Reference in New Issue
Block a user