diff --git a/src/main/java/io/netty/buffer/api/CompositeBuf.java b/src/main/java/io/netty/buffer/api/CompositeBuf.java index b4011b8..46b95da 100644 --- a/src/main/java/io/netty/buffer/api/CompositeBuf.java +++ b/src/main/java/io/netty/buffer/api/CompositeBuf.java @@ -21,7 +21,11 @@ import io.netty.buffer.api.ComponentProcessor.WritableComponentProcessor; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.Arrays; +import java.util.Collections; +import java.util.IdentityHashMap; import java.util.Objects; +import java.util.Set; +import java.util.stream.Stream; final class CompositeBuf extends RcSupport implements Buf { /** @@ -62,8 +66,25 @@ final class CompositeBuf extends RcSupport implements Buf { // This restriction guarantees that methods like countComponents, forEachReadable and forEachWritable, // will never overflow their component counts. // Allocating a new array unconditionally also prevents external modification of the array. - // TODO if any buffer is itself a composite buffer, then we should unwrap its sub-buffers - return Arrays.stream(bufs).filter(b -> b.capacity() > 0).toArray(Buf[]::new); + bufs = Arrays.stream(bufs) + .filter(b -> b.capacity() > 0) + .flatMap(CompositeBuf::flattenBuffer) + .toArray(Buf[]::new); + // Make sure there are no duplicates among the buffers. + Set duplicatesCheck = Collections.newSetFromMap(new IdentityHashMap<>()); + duplicatesCheck.addAll(Arrays.asList(bufs)); + if (duplicatesCheck.size() < bufs.length) { + throw new IllegalArgumentException( + "Cannot create composite buffer with duplicate constituent buffer components."); + } + return bufs; + } + + private static Stream flattenBuffer(Buf buf) { + if (buf instanceof CompositeBuf) { + return Stream.of(((CompositeBuf) buf).bufs); + } + return Stream.of(buf); } private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop drop) { @@ -616,9 +637,6 @@ final class CompositeBuf extends RcSupport implements Buf { if (!isOwned()) { throw new IllegalStateException("This buffer cannot be extended because it is not in an owned state."); } - if (extension == this) { - throw new IllegalArgumentException("This buffer cannot be extended with itself."); - } if (bufs.length > 0 && extension.order() != order()) { throw new IllegalArgumentException( "This buffer uses " + order() + " byte order, and cannot be extended with " + @@ -639,14 +657,38 @@ final class CompositeBuf extends RcSupport implements Buf { // overflow in their component counters. return; } - // TODO if extension is itself a composite buffer, then we should extend ourselves by all of the sub-buffers long newSize = capacity() + extensionCapacity; Allocator.checkSize(newSize); Buf[] restoreTemp = bufs; // We need this to restore our buffer array, in case offset computations fail. try { - unsafeExtendWith(extension.acquire()); + if (extension instanceof CompositeBuf) { + // If the extension is itself a composite buffer, then extend this one by all of the constituent + // component buffers. + CompositeBuf compositeExtension = (CompositeBuf) extension; + Buf[] addedBuffers = compositeExtension.bufs; + Set duplicatesCheck = Collections.newSetFromMap(new IdentityHashMap<>()); + duplicatesCheck.addAll(Arrays.asList(bufs)); + duplicatesCheck.addAll(Arrays.asList(addedBuffers)); + if (duplicatesCheck.size() < bufs.length + addedBuffers.length) { + throw extensionDuplicatesException(); + } + for (Buf addedBuffer : addedBuffers) { + addedBuffer.acquire(); + } + int extendAtIndex = bufs.length; + bufs = Arrays.copyOf(bufs, extendAtIndex + addedBuffers.length); + System.arraycopy(addedBuffers, 0, bufs, extendAtIndex, addedBuffers.length); + computeBufferOffsets(); + } else { + for (Buf buf : restoreTemp) { + if (buf == extension) { + throw extensionDuplicatesException(); + } + } + unsafeExtendWith(extension.acquire()); + } if (restoreTemp.length == 0) { order = extension.order(); readOnly = extension.readOnly(); @@ -657,6 +699,12 @@ final class CompositeBuf extends RcSupport implements Buf { } } + private static IllegalArgumentException extensionDuplicatesException() { + return new IllegalArgumentException( + "The composite buffer cannot be extended with the given extension," + + " as it would cause the buffer to have duplicate constituent buffers."); + } + private void unsafeExtendWith(Buf extension) { bufs = Arrays.copyOf(bufs, bufs.length + 1); bufs[bufs.length - 1] = extension; diff --git a/src/test/java/io/netty/buffer/api/BufTest.java b/src/test/java/io/netty/buffer/api/BufTest.java index e5c4bf5..db8fd28 100644 --- a/src/test/java/io/netty/buffer/api/BufTest.java +++ b/src/test/java/io/netty/buffer/api/BufTest.java @@ -879,7 +879,7 @@ public class BufTest { } @ParameterizedTest - @MethodSource("allocators") + @MethodSource("nonCompositeAllocators") public void acquireComposingAndSlicingMustIncrementBorrows(Fixture fixture) { try (Allocator allocator = fixture.createAllocator(); Buf buf = allocator.allocate(8)) { @@ -1628,6 +1628,53 @@ public class BufTest { } } + @Test + public void compositeBuffersCannotHaveDuplicateComponents() { + try (Allocator allocator = Allocator.heap(); + Buf a = allocator.allocate(4)) { + var e = assertThrows(IllegalArgumentException.class, () -> allocator.compose(a, a)); + assertThat(e).hasMessageContaining("duplicate"); + + try (Buf composite = allocator.compose(a)) { + a.close(); + try { + e = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, a)); + assertThat(e).hasMessageContaining("duplicate"); + } finally { + a.acquire(); + } + } + } + } + + @Test + public void compositeBufferMustNotBeAllowedToContainThemselves() { + try (Allocator allocator = Allocator.heap()) { + Buf a = allocator.allocate(4); + Buf buf = allocator.compose(a); + try (buf; a) { + a.close(); + try { + assertThrows(IllegalArgumentException.class, () -> Allocator.extend(buf, buf)); + assertTrue(buf.isOwned()); + try (Buf composite = allocator.compose(buf)) { + // the composing increments the reference count of constituent buffers... + // counter-act this so it can be extended: + a.close(); // buf is now owned so it can be extended. + try { + assertThrows(IllegalArgumentException.class, () -> Allocator.extend(buf, composite)); + } finally { + a.acquire(); // restore the reference count to align with our try-with-resources structure. + } + } + assertTrue(buf.isOwned()); + } finally { + a.acquire(); + } + } + } + } + @ParameterizedTest @MethodSource("allocators") public void ensureWritableMustThrowForBorrowedBuffers(Fixture fixture) { @@ -1852,13 +1899,32 @@ public class BufTest { } try (composite) { var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, composite)); - assertThat(exc).hasMessageContaining("itself"); + assertThat(exc).hasMessageContaining("cannot be extended"); } } + } + + @Test + public void extendingWithZeroCapacityBufferHasNoEffect() { try (Allocator allocator = Allocator.heap(); Buf composite = allocator.compose()) { - var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, composite)); - assertThat(exc).hasMessageContaining("itself"); + Allocator.extend(composite, composite); + assertThat(composite.capacity()).isZero(); + assertThat(composite.countComponents()).isZero(); + } + try (Allocator allocator = Allocator.heap()) { + Buf a = allocator.allocate(1); + Buf composite = allocator.compose(a); + a.close(); + assertTrue(composite.isOwned()); + assertThat(composite.capacity()).isOne(); + assertThat(composite.countComponents()).isOne(); + try (Buf b = allocator.compose()) { + Allocator.extend(composite, b); + } + assertTrue(composite.isOwned()); + assertThat(composite.capacity()).isOne(); + assertThat(composite.countComponents()).isOne(); } }