Make it possible to extend composite buffers after creation

Motivation:
Composite buffers are uniquely positioned to be able to extend their underlying storage relatively cheaply.
This fact is relied upon in a couple of buffer use cases within Netty, that we wish to support.

Modification:
Add a static `extend` method to Allocator, so that the CompositeBuf class can remain internal.
The `extend` method inserts the extension buffer at the end of the composite buffer as if it had been included from the start.
This involves checking offsets and byte order invariants.
We also require that the composite buffer be in an owned state.

Result:
It's now possible to extend a composite buffer with a specific buffer, after the composite buffer has been created.
This commit is contained in:
Chris Vest 2020-12-03 17:48:28 +01:00
parent b0da25d888
commit 236097e081
3 changed files with 334 additions and 11 deletions

View File

@ -111,6 +111,27 @@ public interface Allocator extends AutoCloseable {
return new CompositeBuf(this, bufs);
}
/**
* Extend the given composite buffer with the given extension buffer.
* This works as if the extension had originally been included at the end of the list of constituent buffers when
* the composite buffer was created.
* The composite buffer is modified in-place.
*
* @see #compose(Buf...)
* @param composite The composite buffer (from a prior {@link #compose(Buf...)} call) to extend with the given
* extension buffer.
* @param extension The buffer to extend the composite buffer with.
*/
static void extend(Buf composite, Buf extension) {
if (composite.getClass() != CompositeBuf.class) {
throw new IllegalArgumentException(
"Expected the first buffer to be a composite buffer, " +
"but it is a " + composite.getClass() + " buffer: " + composite + '.');
}
CompositeBuf buf = (CompositeBuf) composite;
buf.extendWith(extension);
}
/**
* Close this allocator, freeing all of its internal resources. It is not specified if the allocator can still be
* used after this method has been called on it.

View File

@ -19,6 +19,7 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.NoSuchElementException;
import java.util.Objects;
final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
/**
@ -64,6 +65,16 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
throw new IllegalArgumentException("Constituent buffers have inconsistent byte order.");
}
}
}
this.bufs = bufs;
computeBufferOffsets();
tornBufAccessors = new TornBufAccessors(this);
}
private void computeBufferOffsets() {
if (bufs.length > 0) {
int woff = 0;
int roff = 0;
boolean woffMidpoint = false;
for (Buf buf : bufs) {
if (buf.writableBytes() == 0) {
@ -73,7 +84,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
woffMidpoint = true;
} else if (buf.writerOffset() != 0) {
throw new IllegalArgumentException(
"The given buffers cannot be composed because they have an unwritten gap: " +
"The given buffers cannot be composed because they leave an unwritten gap: " +
Arrays.toString(bufs) + '.');
}
}
@ -86,19 +97,17 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
roffMidpoint = true;
} else if (buf.readerOffset() != 0) {
throw new IllegalArgumentException(
"The given buffers cannot be composed because they have an unread gap: " +
"The given buffers cannot be composed because they leave an unread gap: " +
Arrays.toString(bufs) + '.');
}
}
assert roff <= woff:
"The given buffers place the read offset ahead of the write offset: " + Arrays.toString(bufs) + '.';
// Commit computed offsets.
this.woff = woff;
this.roff = roff;
}
this.bufs = bufs;
computeBufferOffsets();
tornBufAccessors = new TornBufAccessors(this);
}
private void computeBufferOffsets() {
offsets = new int[bufs.length];
long cap = 0;
for (int i = 0; i < bufs.length; i++) {
@ -129,7 +138,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
@Override
public ByteOrder order() {
return bufs[0].order();
return bufs.length > 0? bufs[0].order() : ByteOrder.nativeOrder();
}
@Override
@ -481,12 +490,42 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
long newSize = capacity() + (long) size;
Allocator.checkSize(newSize);
int growth = size - writableBytes();
bufs = Arrays.copyOf(bufs, bufs.length + 1);
bufs[bufs.length - 1] = allocator.allocate(growth);
computeBufferOffsets();
Buf extension = bufs.length == 0? allocator.allocate(growth) : allocator.allocate(growth, order());
unsafeExtendWith(extension);
}
}
void extendWith(Buf extension) {
Objects.requireNonNull(extension, "Extension buffer cannot be null.");
if (!isOwned()) {
throw new IllegalStateException("This buffer cannot be extended because it is not in an owned state.");
}
if (extension == this) {
throw new IllegalArgumentException("This buffer cannot be extended with itself.");
}
if (bufs.length > 0 && extension.order() != order()) {
throw new IllegalArgumentException(
"This buffer uses " + order() + " byte order, and cannot be extended with " +
"a buffer that uses " + extension.order() + " byte order.");
}
long newSize = capacity() + (long) extension.capacity();
Allocator.checkSize(newSize);
Buf[] restoreTemp = bufs; // We need this to restore our buffer array, in case offset computations fail.
try {
unsafeExtendWith(extension.acquire());
} catch (Exception e) {
bufs = restoreTemp;
throw e;
}
}
private void unsafeExtendWith(Buf extension) {
bufs = Arrays.copyOf(bufs, bufs.length + 1);
bufs[bufs.length - 1] = extension;
computeBufferOffsets();
}
// <editor-fold defaultstate="collapsed" desc="Primitive accessors.">
@Override
public byte readByte() {

View File

@ -1512,6 +1512,40 @@ public class BufTest {
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void ensureWritableOnCompositeBuffersMustRespectExistingBigEndianByteOrder(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator()) {
Buf composite;
try (Buf a = allocator.allocate(4, ByteOrder.BIG_ENDIAN)) {
composite = allocator.compose(a);
}
try (composite) {
composite.writeInt(0x01020304);
composite.ensureWritable(4);
composite.writeInt(0x05060708);
assertEquals(0x0102030405060708L, composite.readLong());
}
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void ensureWritableOnCompositeBuffersMustRespectExistingLittleEndianByteOrder(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator()) {
Buf composite;
try (Buf a = allocator.allocate(4, ByteOrder.LITTLE_ENDIAN)) {
composite = allocator.compose(a);
}
try (composite) {
composite.writeInt(0x05060708);
composite.ensureWritable(4);
composite.writeInt(0x01020304);
assertEquals(0x0102030405060708L, composite.readLong());
}
}
}
@ParameterizedTest
@MethodSource("allocators")
public void pooledBuffersMustResetStateBeforeReuse(Fixture fixture) {
@ -1544,6 +1578,235 @@ public class BufTest {
}
}
@Test
public void emptyCompositeBufferMustUseNativeByteOrder() {
try (Allocator allocator = Allocator.heap();
Buf composite = allocator.compose()) {
assertThat(composite.order()).isEqualTo(ByteOrder.nativeOrder());
}
}
@Test
public void extendOnNonCompositeBufferMustThrow() {
try (Allocator allocator = Allocator.heap();
Buf a = allocator.allocate(8);
Buf b = allocator.allocate(8)) {
var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(a, b));
assertThat(exc).hasMessageContaining("Expected").hasMessageContaining("composite");
}
}
@Test
public void extendingNonOwnedCompositeBufferMustThrow() {
try (Allocator allocator = Allocator.heap();
Buf a = allocator.allocate(8);
Buf b = allocator.allocate(8);
Buf composed = allocator.compose(a)) {
try (Buf ignore = composed.acquire()) {
var exc = assertThrows(IllegalStateException.class, () -> Allocator.extend(composed, b));
assertThat(exc).hasMessageContaining("owned");
}
}
}
@Test
public void extendingCompositeBufferWithItselfMustThrow() {
try (Allocator allocator = Allocator.heap()) {
Buf composite;
try (Buf a = allocator.allocate(8)) {
composite = allocator.compose(a);
}
try (composite) {
var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, composite));
assertThat(exc).hasMessageContaining("itself");
}
}
try (Allocator allocator = Allocator.heap();
Buf composite = allocator.compose()) {
var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, composite));
assertThat(exc).hasMessageContaining("itself");
}
}
@Test
public void extendingCompositeBufferWithNullMustThrow() {
try (Allocator allocator = Allocator.heap();
Buf composite = allocator.compose()) {
assertThrows(NullPointerException.class, () -> Allocator.extend(composite, null));
}
}
@Test
public void extendingCompositeBufferMustIncreaseCapacityByGivenBigEndianBuffer() {
try (Allocator allocator = Allocator.heap();
Buf composite = allocator.compose()) {
assertThat(composite.capacity()).isZero();
try (Buf buf = allocator.allocate(8, ByteOrder.BIG_ENDIAN)) {
Allocator.extend(composite, buf);
}
assertThat(composite.capacity()).isEqualTo(8);
composite.writeLong(0x0102030405060708L);
assertThat(composite.readLong()).isEqualTo(0x0102030405060708L);
}
}
@Test
public void extendingCompositeBufferMustIncreaseCapacityByGivenLittleEndianBuffer() {
try (Allocator allocator = Allocator.heap();
Buf composite = allocator.compose()) {
assertThat(composite.capacity()).isZero();
try (Buf buf = allocator.allocate(8, ByteOrder.LITTLE_ENDIAN)) {
Allocator.extend(composite, buf);
}
assertThat(composite.capacity()).isEqualTo(8);
composite.writeLong(0x0102030405060708L);
assertThat(composite.readLong()).isEqualTo(0x0102030405060708L);
}
}
@Test
public void extendingBigEndianCompositeBufferMustThrowIfExtensionIsLittleEndian() {
try (Allocator allocator = Allocator.heap()) {
Buf composite;
try (Buf a = allocator.allocate(8, ByteOrder.BIG_ENDIAN)) {
composite = allocator.compose(a);
}
try (composite) {
try (Buf b = allocator.allocate(8, ByteOrder.LITTLE_ENDIAN)) {
var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, b));
assertThat(exc).hasMessageContaining("byte order");
}
}
}
}
@Test
public void extendingLittleEndianCompositeBufferMustThrowIfExtensionIsBigEndian() {
try (Allocator allocator = Allocator.heap()) {
Buf composite;
try (Buf a = allocator.allocate(8, ByteOrder.LITTLE_ENDIAN)) {
composite = allocator.compose(a);
}
try (composite) {
try (Buf b = allocator.allocate(8, ByteOrder.BIG_ENDIAN)) {
var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, b));
assertThat(exc).hasMessageContaining("byte order");
}
}
}
}
@Test
public void emptyCompositeBufferMustAllowExtendingWithBufferWithBigEndianByteOrder() {
try (Allocator allocator = Allocator.heap()) {
try (Buf composite = allocator.compose()) {
try (Buf b = allocator.allocate(8, ByteOrder.BIG_ENDIAN)) {
Allocator.extend(composite, b);
assertThat(composite.order()).isEqualTo(ByteOrder.BIG_ENDIAN);
}
}
}
}
@Test
public void emptyCompositeBufferMustAllowExtendingWithBufferWithLittleEndianByteOrder() {
try (Allocator allocator = Allocator.heap()) {
try (Buf composite = allocator.compose()) {
try (Buf b = allocator.allocate(8, ByteOrder.LITTLE_ENDIAN)) {
Allocator.extend(composite, b);
assertThat(composite.order()).isEqualTo(ByteOrder.LITTLE_ENDIAN);
}
}
}
}
@Test
public void whenExtendingCompositeBufferWithWriteOffsetAtCapacityExtensionWriteOffsetCanBeNonZero() {
try (Allocator allocator = Allocator.heap()) {
Buf composite;
try (Buf a = allocator.allocate(8)) {
composite = allocator.compose(a);
}
try (composite) {
composite.writeLong(0);
try (Buf b = allocator.allocate(8)) {
b.writeInt(1);
Allocator.extend(composite, b);
assertThat(composite.capacity()).isEqualTo(16);
assertThat(composite.writerOffset()).isEqualTo(12);
}
}
}
}
@Test
public void whenExtendingCompositeBufferWithWriteOffsetLessThanCapacityExtensionWriteOffsetMustZero() {
try (Allocator allocator = Allocator.heap()) {
Buf composite;
try (Buf a = allocator.allocate(8)) {
composite = allocator.compose(a);
}
try (composite) {
composite.writeInt(0);
try (Buf b = allocator.allocate(8)) {
b.writeInt(1);
var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, b));
assertThat(exc).hasMessageContaining("unwritten gap");
b.writerOffset(0);
Allocator.extend(composite, b);
assertThat(composite.capacity()).isEqualTo(16);
assertThat(composite.writerOffset()).isEqualTo(4);
}
}
}
}
@Test
public void whenExtendingCompositeBufferWithReadOffsetAtCapacityExtensionReadOffsetCanBeNonZero() {
try (Allocator allocator = Allocator.heap()) {
Buf composite;
try (Buf a = allocator.allocate(8)) {
composite = allocator.compose(a);
}
try (composite) {
composite.writeLong(0);
composite.readLong();
try (Buf b = allocator.allocate(8)) {
b.writeInt(1);
b.readInt();
Allocator.extend(composite, b);
assertThat(composite.capacity()).isEqualTo(16);
assertThat(composite.writerOffset()).isEqualTo(12);
}
}
}
}
@Test
public void whenExtendingCompositeBufferWithReadOffsetLessThanCapacityExtensionReadOffsetMustZero() {
try (Allocator allocator = Allocator.heap()) {
Buf composite;
try (Buf a = allocator.allocate(8)) {
composite = allocator.compose(a);
}
try (composite) {
composite.writeLong(0);
composite.readInt();
try (Buf b = allocator.allocate(8)) {
b.writeInt(1);
b.readInt();
var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, b));
assertThat(exc).hasMessageContaining("unread gap");
b.readerOffset(0);
Allocator.extend(composite, b);
assertThat(composite.capacity()).isEqualTo(16);
assertThat(composite.writerOffset()).isEqualTo(12);
assertThat(composite.readerOffset()).isEqualTo(4);
}
}
}
}
// <editor-fold defaultstate="collapsed" desc="Primitive accessors tests.">
@ParameterizedTest
@MethodSource("allocators")