diff --git a/buffer/src/main/java/io/netty/buffer/AdvancedLeakAwareCompositeByteBuf.java b/buffer/src/main/java/io/netty/buffer/AdvancedLeakAwareCompositeByteBuf.java index ff93de665c..5b36354561 100644 --- a/buffer/src/main/java/io/netty/buffer/AdvancedLeakAwareCompositeByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/AdvancedLeakAwareCompositeByteBuf.java @@ -939,6 +939,12 @@ final class AdvancedLeakAwareCompositeByteBuf extends SimpleLeakAwareCompositeBy return super.addComponent(increaseWriterIndex, cIndex, buffer); } + @Override + public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex, ByteBuf buffer) { + recordLeakNonRefCountingOperation(leak); + return super.addFlattenedComponents(increaseWriterIndex, buffer); + } + @Override public CompositeByteBuf removeComponent(int cIndex) { recordLeakNonRefCountingOperation(leak); diff --git a/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java b/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java index 40c4ad2da0..60dcd65cf7 100644 --- a/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java @@ -287,7 +287,7 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements c.reposition(components[cIndex - 1].endOffset); } if (increaseWriterIndex) { - writerIndex(writerIndex() + readableBytes); + writerIndex += readableBytes; } return cIndex; } finally { @@ -370,7 +370,7 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements updateComponentOffsets(ci); // only need to do this here for components after the added ones } if (increaseWriterIndex && ci > cIndex && ci <= componentCount) { - writerIndex(writerIndex() + components[ci - 1].endOffset - components[cIndex].offset); + writerIndex += components[ci - 1].endOffset - components[cIndex].offset; } } } @@ -413,6 +413,74 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements return addComponents(false, cIndex, buffers); } + /** + * Add the given {@link ByteBuf} and increase the {@code writerIndex} if {@code increaseWriterIndex} is + * {@code true}. If the provided buffer is a {@link CompositeByteBuf} itself, a "shallow copy" of its + * readable components will be performed. Thus the actual number of new components added may vary + * and in particular will be zero if the provided buffer is not readable. + *

+ * {@link ByteBuf#release()} ownership of {@code buffer} is transferred to this {@link CompositeByteBuf}. + * @param buffer the {@link ByteBuf} to add. {@link ByteBuf#release()} ownership is transferred to this + * {@link CompositeByteBuf}. + */ + public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex, ByteBuf buffer) { + requireNonNull(buffer, "buffer"); + final int ridx = buffer.readerIndex(); + final int widx = buffer.writerIndex(); + if (ridx == widx) { + buffer.release(); + return this; + } + if (!(buffer instanceof CompositeByteBuf)) { + addComponent0(increaseWriterIndex, componentCount, buffer); + consolidateIfNeeded(); + return this; + } + final CompositeByteBuf from = (CompositeByteBuf) buffer; + from.checkIndex(ridx, widx - ridx); + final Component[] fromComponents = from.components; + final int compCountBefore = componentCount; + final int writerIndexBefore = writerIndex; + try { + for (int cidx = from.toComponentIndex0(ridx), newOffset = capacity();; cidx++) { + final Component component = fromComponents[cidx]; + final int compOffset = component.offset; + final int fromIdx = Math.max(ridx, compOffset); + final int toIdx = Math.min(widx, component.endOffset); + final int len = toIdx - fromIdx; + if (len > 0) { // skip empty components + // Note that it's safe to just retain the unwrapped buf here, even in the case + // of PooledSlicedByteBufs - those slices will still be properly released by the + // source Component's free() method. + addComp(componentCount, new Component( + component.buf.retain(), component.idx(fromIdx), newOffset, len, null)); + } + if (widx == toIdx) { + break; + } + newOffset += len; + } + if (increaseWriterIndex) { + writerIndex = writerIndexBefore + (widx - ridx); + } + consolidateIfNeeded(); + buffer.release(); + buffer = null; + return this; + } finally { + if (buffer != null) { + // if we did not succeed, attempt to rollback any components that were added + if (increaseWriterIndex) { + writerIndex = writerIndexBefore; + } + for (int cidx = componentCount - 1; cidx >= compCountBefore; cidx--) { + components[cidx].free(); + removeComp(cidx); + } + } + } + } + // TODO optimize further, similar to ByteBuf[] version // (difference here is that we don't know *always* know precise size increase in advance, // but we do in the most common case that the Iterable is a Collection) @@ -766,9 +834,9 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements removeCompRange(i + 1, size); if (readerIndex() > newCapacity) { - setIndex(newCapacity, newCapacity); - } else if (writerIndex() > newCapacity) { - writerIndex(newCapacity); + setIndex0(newCapacity, newCapacity); + } else if (writerIndex > newCapacity) { + writerIndex = newCapacity; } } return this; @@ -815,6 +883,9 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements } } } + if (size <= 2) { // fast-path for 1 and 2 component count + return size == 1 || offset < components[0].endOffset ? 0 : 1; + } for (int low = 0, high = size; low <= high;) { int mid = low + high >>> 1; Component c = components[mid]; @@ -1678,16 +1749,26 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements } // Remove read components. - int firstComponentId = toComponentIndex0(readerIndex); - for (int i = 0; i < firstComponentId; i ++) { - components[i].free(); + int firstComponentId = 0; + Component c = null; + for (int size = componentCount; firstComponentId < size; firstComponentId++) { + c = components[firstComponentId]; + if (c.endOffset > readerIndex) { + break; + } + c.free(); + } + if (firstComponentId == 0) { + return this; // Nothing to discard + } + Component la = lastAccessed; + if (la != null && la.endOffset < readerIndex) { + lastAccessed = null; } - lastAccessed = null; removeCompRange(0, firstComponentId); // Update indexes and markers. - Component first = components[0]; - int offset = first.offset; + int offset = c.offset; updateComponentOffsets(0); setIndex(readerIndex - offset, writerIndex - offset); return this; @@ -1713,36 +1794,30 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements return this; } - // Remove read components. - int firstComponentId = toComponentIndex0(readerIndex); - for (int i = 0; i < firstComponentId; i ++) { - Component c = components[i]; - c.free(); - if (lastAccessed == c) { - lastAccessed = null; + int firstComponentId = 0; + Component c = null; + for (int size = componentCount; firstComponentId < size; firstComponentId++) { + c = components[firstComponentId]; + if (c.endOffset > readerIndex) { + break; } + c.free(); } - // Remove or replace the first readable component with a new slice. - Component c = components[firstComponentId]; - if (readerIndex == c.endOffset) { - // new slice would be empty, so remove instead - c.free(); - if (lastAccessed == c) { - lastAccessed = null; - } - firstComponentId++; - } else { - int trimmedBytes = readerIndex - c.offset; - c.offset = 0; - c.endOffset -= readerIndex; - c.adjustment += readerIndex; - ByteBuf slice = c.slice; - if (slice != null) { - // We must replace the cached slice with a derived one to ensure that - // it can later be released properly in the case of PooledSlicedByteBuf. - c.slice = slice.slice(trimmedBytes, c.length()); - } + // Replace the first readable component with a new slice. + int trimmedBytes = readerIndex - c.offset; + c.offset = 0; + c.endOffset -= readerIndex; + c.adjustment += readerIndex; + ByteBuf slice = c.slice; + if (slice != null) { + // We must replace the cached slice with a derived one to ensure that + // it can later be released properly in the case of PooledSlicedByteBuf. + c.slice = slice.slice(trimmedBytes, c.length()); + } + Component la = lastAccessed; + if (la != null && la.endOffset < readerIndex) { + lastAccessed = null; } removeCompRange(0, firstComponentId); diff --git a/buffer/src/main/java/io/netty/buffer/WrappedCompositeByteBuf.java b/buffer/src/main/java/io/netty/buffer/WrappedCompositeByteBuf.java index 9550030c48..23a523a2c5 100644 --- a/buffer/src/main/java/io/netty/buffer/WrappedCompositeByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/WrappedCompositeByteBuf.java @@ -548,6 +548,12 @@ class WrappedCompositeByteBuf extends CompositeByteBuf { return this; } + @Override + public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex, ByteBuf buffer) { + wrapped.addFlattenedComponents(increaseWriterIndex, buffer); + return this; + } + @Override public CompositeByteBuf removeComponent(int cIndex) { wrapped.removeComponent(cIndex); diff --git a/buffer/src/test/java/io/netty/buffer/AbstractCompositeByteBufTest.java b/buffer/src/test/java/io/netty/buffer/AbstractCompositeByteBufTest.java index ee2a66ee12..5564392ece 100644 --- a/buffer/src/test/java/io/netty/buffer/AbstractCompositeByteBufTest.java +++ b/buffer/src/test/java/io/netty/buffer/AbstractCompositeByteBufTest.java @@ -1086,6 +1086,71 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest { cbuf.release(); } + @Test + public void testAddFlattenedComponents() { + ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2, 3 }); + CompositeByteBuf newComposite = Unpooled.compositeBuffer() + .addComponent(true, b1) + .addFlattenedComponents(true, b1.retain()) + .addFlattenedComponents(true, Unpooled.EMPTY_BUFFER); + + assertEquals(2, newComposite.numComponents()); + assertEquals(6, newComposite.capacity()); + assertEquals(6, newComposite.writerIndex()); + + // It is important to use a pooled allocator here to ensure + // the slices returned by readRetainedSlice are of type + // PooledSlicedByteBuf, which maintains an independent refcount + // (so that we can be sure to cover this case) + ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer() + .writeBytes(new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); + + // use mixture of slice and retained slice + ByteBuf s1 = buffer.readRetainedSlice(2); + ByteBuf s2 = s1.retainedSlice(0, 2); + ByteBuf s3 = buffer.slice(0, 2).retain(); + ByteBuf s4 = s2.retainedSlice(0, 2); + buffer.release(); + + ByteBuf compositeToAdd = Unpooled.compositeBuffer() + .addComponent(s1) + .addComponent(Unpooled.EMPTY_BUFFER) + .addComponents(s2, s3, s4); + // set readable range to be from middle of first component + // to middle of penultimate component + compositeToAdd.setIndex(1, 5); + + assertEquals(1, compositeToAdd.refCnt()); + assertEquals(1, s4.refCnt()); + + ByteBuf compositeCopy = compositeToAdd.copy(); + + newComposite.addFlattenedComponents(true, compositeToAdd); + + // verify that added range matches + ByteBufUtil.equals(compositeCopy, 0, + newComposite, 6, compositeCopy.readableBytes()); + + // should not include empty component or last component + // (latter outside of the readable range) + assertEquals(5, newComposite.numComponents()); + assertEquals(10, newComposite.capacity()); + assertEquals(10, newComposite.writerIndex()); + + assertEquals(0, compositeToAdd.refCnt()); + // s4 wasn't in added range so should have been jettisoned + assertEquals(0, s4.refCnt()); + assertEquals(1, newComposite.refCnt()); + + // releasing composite should release the remaining components + newComposite.release(); + assertEquals(0, newComposite.refCnt()); + assertEquals(0, s1.refCnt()); + assertEquals(0, s2.refCnt()); + assertEquals(0, s3.refCnt()); + assertEquals(0, b1.refCnt()); + } + @Test public void testIterator() { CompositeByteBuf cbuf = compositeBuffer();