Avoid nesting composite buffers

Motivation:
There is no reason that composite buffers should nest when composed.
Instead, when composite buffers are used to compose or extend other composite buffers, we should unwrap them and copy the references to their constituent buffers.

Modification:
Composite buffers now always unwrap and flatten themselves when they participate in composition or extension of other composite buffers.

Result:
Composite buffers are now always guaranteed* to contain a single level of non-composed leaf buffers.

*assuming no other unknown buffer-wrapping buffer type is in the mix.
This commit is contained in:
Chris Vest 2021-01-18 16:06:53 +01:00
parent 202dd54ff2
commit 14d55c3e0b
2 changed files with 125 additions and 11 deletions

View File

@ -21,7 +21,11 @@ import io.netty.buffer.api.ComponentProcessor.WritableComponentProcessor;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
/**
@ -62,8 +66,25 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
// This restriction guarantees that methods like countComponents, forEachReadable and forEachWritable,
// will never overflow their component counts.
// Allocating a new array unconditionally also prevents external modification of the array.
// TODO if any buffer is itself a composite buffer, then we should unwrap its sub-buffers
return Arrays.stream(bufs).filter(b -> b.capacity() > 0).toArray(Buf[]::new);
bufs = Arrays.stream(bufs)
.filter(b -> b.capacity() > 0)
.flatMap(CompositeBuf::flattenBuffer)
.toArray(Buf[]::new);
// Make sure there are no duplicates among the buffers.
Set<Buf> duplicatesCheck = Collections.newSetFromMap(new IdentityHashMap<>());
duplicatesCheck.addAll(Arrays.asList(bufs));
if (duplicatesCheck.size() < bufs.length) {
throw new IllegalArgumentException(
"Cannot create composite buffer with duplicate constituent buffer components.");
}
return bufs;
}
private static Stream<Buf> flattenBuffer(Buf buf) {
if (buf instanceof CompositeBuf) {
return Stream.of(((CompositeBuf) buf).bufs);
}
return Stream.of(buf);
}
private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop<CompositeBuf> drop) {
@ -616,9 +637,6 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
if (!isOwned()) {
throw new IllegalStateException("This buffer cannot be extended because it is not in an owned state.");
}
if (extension == this) {
throw new IllegalArgumentException("This buffer cannot be extended with itself.");
}
if (bufs.length > 0 && extension.order() != order()) {
throw new IllegalArgumentException(
"This buffer uses " + order() + " byte order, and cannot be extended with " +
@ -639,14 +657,38 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
// overflow in their component counters.
return;
}
// TODO if extension is itself a composite buffer, then we should extend ourselves by all of the sub-buffers
long newSize = capacity() + extensionCapacity;
Allocator.checkSize(newSize);
Buf[] restoreTemp = bufs; // We need this to restore our buffer array, in case offset computations fail.
try {
unsafeExtendWith(extension.acquire());
if (extension instanceof CompositeBuf) {
// If the extension is itself a composite buffer, then extend this one by all of the constituent
// component buffers.
CompositeBuf compositeExtension = (CompositeBuf) extension;
Buf[] addedBuffers = compositeExtension.bufs;
Set<Buf> duplicatesCheck = Collections.newSetFromMap(new IdentityHashMap<>());
duplicatesCheck.addAll(Arrays.asList(bufs));
duplicatesCheck.addAll(Arrays.asList(addedBuffers));
if (duplicatesCheck.size() < bufs.length + addedBuffers.length) {
throw extensionDuplicatesException();
}
for (Buf addedBuffer : addedBuffers) {
addedBuffer.acquire();
}
int extendAtIndex = bufs.length;
bufs = Arrays.copyOf(bufs, extendAtIndex + addedBuffers.length);
System.arraycopy(addedBuffers, 0, bufs, extendAtIndex, addedBuffers.length);
computeBufferOffsets();
} else {
for (Buf buf : restoreTemp) {
if (buf == extension) {
throw extensionDuplicatesException();
}
}
unsafeExtendWith(extension.acquire());
}
if (restoreTemp.length == 0) {
order = extension.order();
readOnly = extension.readOnly();
@ -657,6 +699,12 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
}
}
private static IllegalArgumentException extensionDuplicatesException() {
return new IllegalArgumentException(
"The composite buffer cannot be extended with the given extension," +
" as it would cause the buffer to have duplicate constituent buffers.");
}
private void unsafeExtendWith(Buf extension) {
bufs = Arrays.copyOf(bufs, bufs.length + 1);
bufs[bufs.length - 1] = extension;

View File

@ -879,7 +879,7 @@ public class BufTest {
}
@ParameterizedTest
@MethodSource("allocators")
@MethodSource("nonCompositeAllocators")
public void acquireComposingAndSlicingMustIncrementBorrows(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
@ -1628,6 +1628,53 @@ public class BufTest {
}
}
@Test
public void compositeBuffersCannotHaveDuplicateComponents() {
try (Allocator allocator = Allocator.heap();
Buf a = allocator.allocate(4)) {
var e = assertThrows(IllegalArgumentException.class, () -> allocator.compose(a, a));
assertThat(e).hasMessageContaining("duplicate");
try (Buf composite = allocator.compose(a)) {
a.close();
try {
e = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, a));
assertThat(e).hasMessageContaining("duplicate");
} finally {
a.acquire();
}
}
}
}
@Test
public void compositeBufferMustNotBeAllowedToContainThemselves() {
try (Allocator allocator = Allocator.heap()) {
Buf a = allocator.allocate(4);
Buf buf = allocator.compose(a);
try (buf; a) {
a.close();
try {
assertThrows(IllegalArgumentException.class, () -> Allocator.extend(buf, buf));
assertTrue(buf.isOwned());
try (Buf composite = allocator.compose(buf)) {
// the composing increments the reference count of constituent buffers...
// counter-act this so it can be extended:
a.close(); // buf is now owned so it can be extended.
try {
assertThrows(IllegalArgumentException.class, () -> Allocator.extend(buf, composite));
} finally {
a.acquire(); // restore the reference count to align with our try-with-resources structure.
}
}
assertTrue(buf.isOwned());
} finally {
a.acquire();
}
}
}
}
@ParameterizedTest
@MethodSource("allocators")
public void ensureWritableMustThrowForBorrowedBuffers(Fixture fixture) {
@ -1852,13 +1899,32 @@ public class BufTest {
}
try (composite) {
var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, composite));
assertThat(exc).hasMessageContaining("itself");
assertThat(exc).hasMessageContaining("cannot be extended");
}
}
}
@Test
public void extendingWithZeroCapacityBufferHasNoEffect() {
try (Allocator allocator = Allocator.heap();
Buf composite = allocator.compose()) {
var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, composite));
assertThat(exc).hasMessageContaining("itself");
Allocator.extend(composite, composite);
assertThat(composite.capacity()).isZero();
assertThat(composite.countComponents()).isZero();
}
try (Allocator allocator = Allocator.heap()) {
Buf a = allocator.allocate(1);
Buf composite = allocator.compose(a);
a.close();
assertTrue(composite.isOwned());
assertThat(composite.capacity()).isOne();
assertThat(composite.countComponents()).isOne();
try (Buf b = allocator.compose()) {
Allocator.extend(composite, b);
}
assertTrue(composite.isOwned());
assertThat(composite.capacity()).isOne();
assertThat(composite.countComponents()).isOne();
}
}