Merge pull request #25 from netty/composite-flatten

Avoid nesting composite buffers
This commit is contained in:
Chris Vest 2021-01-21 14:13:01 +01:00 committed by GitHub
commit 6e24f5d155
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 125 additions and 11 deletions

View File

@ -21,7 +21,11 @@ import io.netty.buffer.api.ComponentProcessor.WritableComponentProcessor;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
/**
@ -62,8 +66,25 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
// This restriction guarantees that methods like countComponents, forEachReadable and forEachWritable,
// will never overflow their component counts.
// Allocating a new array unconditionally also prevents external modification of the array.
// TODO if any buffer is itself a composite buffer, then we should unwrap its sub-buffers
return Arrays.stream(bufs).filter(b -> b.capacity() > 0).toArray(Buf[]::new);
bufs = Arrays.stream(bufs)
.filter(b -> b.capacity() > 0)
.flatMap(CompositeBuf::flattenBuffer)
.toArray(Buf[]::new);
// Make sure there are no duplicates among the buffers.
Set<Buf> duplicatesCheck = Collections.newSetFromMap(new IdentityHashMap<>());
duplicatesCheck.addAll(Arrays.asList(bufs));
if (duplicatesCheck.size() < bufs.length) {
throw new IllegalArgumentException(
"Cannot create composite buffer with duplicate constituent buffer components.");
}
return bufs;
}
private static Stream<Buf> flattenBuffer(Buf buf) {
if (buf instanceof CompositeBuf) {
return Stream.of(((CompositeBuf) buf).bufs);
}
return Stream.of(buf);
}
private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop<CompositeBuf> drop) {
@ -616,9 +637,6 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
if (!isOwned()) {
throw new IllegalStateException("This buffer cannot be extended because it is not in an owned state.");
}
if (extension == this) {
throw new IllegalArgumentException("This buffer cannot be extended with itself.");
}
if (bufs.length > 0 && extension.order() != order()) {
throw new IllegalArgumentException(
"This buffer uses " + order() + " byte order, and cannot be extended with " +
@ -639,14 +657,38 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
// overflow in their component counters.
return;
}
// TODO if extension is itself a composite buffer, then we should extend ourselves by all of the sub-buffers
long newSize = capacity() + extensionCapacity;
Allocator.checkSize(newSize);
Buf[] restoreTemp = bufs; // We need this to restore our buffer array, in case offset computations fail.
try {
unsafeExtendWith(extension.acquire());
if (extension instanceof CompositeBuf) {
// If the extension is itself a composite buffer, then extend this one by all of the constituent
// component buffers.
CompositeBuf compositeExtension = (CompositeBuf) extension;
Buf[] addedBuffers = compositeExtension.bufs;
Set<Buf> duplicatesCheck = Collections.newSetFromMap(new IdentityHashMap<>());
duplicatesCheck.addAll(Arrays.asList(bufs));
duplicatesCheck.addAll(Arrays.asList(addedBuffers));
if (duplicatesCheck.size() < bufs.length + addedBuffers.length) {
throw extensionDuplicatesException();
}
for (Buf addedBuffer : addedBuffers) {
addedBuffer.acquire();
}
int extendAtIndex = bufs.length;
bufs = Arrays.copyOf(bufs, extendAtIndex + addedBuffers.length);
System.arraycopy(addedBuffers, 0, bufs, extendAtIndex, addedBuffers.length);
computeBufferOffsets();
} else {
for (Buf buf : restoreTemp) {
if (buf == extension) {
throw extensionDuplicatesException();
}
}
unsafeExtendWith(extension.acquire());
}
if (restoreTemp.length == 0) {
order = extension.order();
readOnly = extension.readOnly();
@ -657,6 +699,12 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
}
}
private static IllegalArgumentException extensionDuplicatesException() {
return new IllegalArgumentException(
"The composite buffer cannot be extended with the given extension," +
" as it would cause the buffer to have duplicate constituent buffers.");
}
private void unsafeExtendWith(Buf extension) {
bufs = Arrays.copyOf(bufs, bufs.length + 1);
bufs[bufs.length - 1] = extension;

View File

@ -879,7 +879,7 @@ public class BufTest {
}
@ParameterizedTest
@MethodSource("allocators")
@MethodSource("nonCompositeAllocators")
public void acquireComposingAndSlicingMustIncrementBorrows(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
@ -1628,6 +1628,53 @@ public class BufTest {
}
}
@Test
public void compositeBuffersCannotHaveDuplicateComponents() {
try (Allocator allocator = Allocator.heap();
Buf a = allocator.allocate(4)) {
var e = assertThrows(IllegalArgumentException.class, () -> allocator.compose(a, a));
assertThat(e).hasMessageContaining("duplicate");
try (Buf composite = allocator.compose(a)) {
a.close();
try {
e = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, a));
assertThat(e).hasMessageContaining("duplicate");
} finally {
a.acquire();
}
}
}
}
@Test
public void compositeBufferMustNotBeAllowedToContainThemselves() {
try (Allocator allocator = Allocator.heap()) {
Buf a = allocator.allocate(4);
Buf buf = allocator.compose(a);
try (buf; a) {
a.close();
try {
assertThrows(IllegalArgumentException.class, () -> Allocator.extend(buf, buf));
assertTrue(buf.isOwned());
try (Buf composite = allocator.compose(buf)) {
// the composing increments the reference count of constituent buffers...
// counter-act this so it can be extended:
a.close(); // buf is now owned so it can be extended.
try {
assertThrows(IllegalArgumentException.class, () -> Allocator.extend(buf, composite));
} finally {
a.acquire(); // restore the reference count to align with our try-with-resources structure.
}
}
assertTrue(buf.isOwned());
} finally {
a.acquire();
}
}
}
}
@ParameterizedTest
@MethodSource("allocators")
public void ensureWritableMustThrowForBorrowedBuffers(Fixture fixture) {
@ -1852,13 +1899,32 @@ public class BufTest {
}
try (composite) {
var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, composite));
assertThat(exc).hasMessageContaining("itself");
assertThat(exc).hasMessageContaining("cannot be extended");
}
}
}
@Test
public void extendingWithZeroCapacityBufferHasNoEffect() {
try (Allocator allocator = Allocator.heap();
Buf composite = allocator.compose()) {
var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, composite));
assertThat(exc).hasMessageContaining("itself");
Allocator.extend(composite, composite);
assertThat(composite.capacity()).isZero();
assertThat(composite.countComponents()).isZero();
}
try (Allocator allocator = Allocator.heap()) {
Buf a = allocator.allocate(1);
Buf composite = allocator.compose(a);
a.close();
assertTrue(composite.isOwned());
assertThat(composite.capacity()).isOne();
assertThat(composite.countComponents()).isOne();
try (Buf b = allocator.compose()) {
Allocator.extend(composite, b);
}
assertTrue(composite.isOwned());
assertThat(composite.capacity()).isOne();
assertThat(composite.countComponents()).isOne();
}
}