Merge pull request #12 from netty/extend-composite

Make it possible to extend composite buffers after creation
This commit is contained in:
Chris Vest 2020-12-08 19:25:32 +01:00 committed by GitHub
commit 3aeebdd058
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 334 additions and 11 deletions

View File

@ -111,6 +111,27 @@ public interface Allocator extends AutoCloseable {
return new CompositeBuf(this, bufs);
}
/**
* Extend the given composite buffer with the given extension buffer.
* This works as if the extension had originally been included at the end of the list of constituent buffers when
* the composite buffer was created.
* The composite buffer is modified in-place.
*
* @see #compose(Buf...)
* @param composite The composite buffer (from a prior {@link #compose(Buf...)} call) to extend with the given
* extension buffer.
* @param extension The buffer to extend the composite buffer with.
*/
static void extend(Buf composite, Buf extension) {
if (composite.getClass() != CompositeBuf.class) {
throw new IllegalArgumentException(
"Expected the first buffer to be a composite buffer, " +
"but it is a " + composite.getClass() + " buffer: " + composite + '.');
}
CompositeBuf buf = (CompositeBuf) composite;
buf.extendWith(extension);
}
/**
* Close this allocator, freeing all of its internal resources. It is not specified if the allocator can still be
* used after this method has been called on it.

View File

@ -19,6 +19,7 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.NoSuchElementException;
import java.util.Objects;
final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
/**
@ -64,6 +65,16 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
throw new IllegalArgumentException("Constituent buffers have inconsistent byte order.");
}
}
}
this.bufs = bufs;
computeBufferOffsets();
tornBufAccessors = new TornBufAccessors(this);
}
private void computeBufferOffsets() {
if (bufs.length > 0) {
int woff = 0;
int roff = 0;
boolean woffMidpoint = false;
for (Buf buf : bufs) {
if (buf.writableBytes() == 0) {
@ -73,7 +84,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
woffMidpoint = true;
} else if (buf.writerOffset() != 0) {
throw new IllegalArgumentException(
"The given buffers cannot be composed because they have an unwritten gap: " +
"The given buffers cannot be composed because they leave an unwritten gap: " +
Arrays.toString(bufs) + '.');
}
}
@ -86,19 +97,17 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
roffMidpoint = true;
} else if (buf.readerOffset() != 0) {
throw new IllegalArgumentException(
"The given buffers cannot be composed because they have an unread gap: " +
"The given buffers cannot be composed because they leave an unread gap: " +
Arrays.toString(bufs) + '.');
}
}
assert roff <= woff:
"The given buffers place the read offset ahead of the write offset: " + Arrays.toString(bufs) + '.';
// Commit computed offsets.
this.woff = woff;
this.roff = roff;
}
this.bufs = bufs;
computeBufferOffsets();
tornBufAccessors = new TornBufAccessors(this);
}
private void computeBufferOffsets() {
offsets = new int[bufs.length];
long cap = 0;
for (int i = 0; i < bufs.length; i++) {
@ -129,7 +138,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
@Override
public ByteOrder order() {
return bufs[0].order();
return bufs.length > 0? bufs[0].order() : ByteOrder.nativeOrder();
}
@Override
@ -481,12 +490,42 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
long newSize = capacity() + (long) size;
Allocator.checkSize(newSize);
int growth = size - writableBytes();
bufs = Arrays.copyOf(bufs, bufs.length + 1);
bufs[bufs.length - 1] = allocator.allocate(growth);
computeBufferOffsets();
Buf extension = bufs.length == 0? allocator.allocate(growth) : allocator.allocate(growth, order());
unsafeExtendWith(extension);
}
}
void extendWith(Buf extension) {
Objects.requireNonNull(extension, "Extension buffer cannot be null.");
if (!isOwned()) {
throw new IllegalStateException("This buffer cannot be extended because it is not in an owned state.");
}
if (extension == this) {
throw new IllegalArgumentException("This buffer cannot be extended with itself.");
}
if (bufs.length > 0 && extension.order() != order()) {
throw new IllegalArgumentException(
"This buffer uses " + order() + " byte order, and cannot be extended with " +
"a buffer that uses " + extension.order() + " byte order.");
}
long newSize = capacity() + (long) extension.capacity();
Allocator.checkSize(newSize);
Buf[] restoreTemp = bufs; // We need this to restore our buffer array, in case offset computations fail.
try {
unsafeExtendWith(extension.acquire());
} catch (Exception e) {
bufs = restoreTemp;
throw e;
}
}
private void unsafeExtendWith(Buf extension) {
bufs = Arrays.copyOf(bufs, bufs.length + 1);
bufs[bufs.length - 1] = extension;
computeBufferOffsets();
}
// <editor-fold defaultstate="collapsed" desc="Primitive accessors.">
@Override
public byte readByte() {

View File

@ -1512,6 +1512,40 @@ public class BufTest {
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void ensureWritableOnCompositeBuffersMustRespectExistingBigEndianByteOrder(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator()) {
Buf composite;
try (Buf a = allocator.allocate(4, ByteOrder.BIG_ENDIAN)) {
composite = allocator.compose(a);
}
try (composite) {
composite.writeInt(0x01020304);
composite.ensureWritable(4);
composite.writeInt(0x05060708);
assertEquals(0x0102030405060708L, composite.readLong());
}
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void ensureWritableOnCompositeBuffersMustRespectExistingLittleEndianByteOrder(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator()) {
Buf composite;
try (Buf a = allocator.allocate(4, ByteOrder.LITTLE_ENDIAN)) {
composite = allocator.compose(a);
}
try (composite) {
composite.writeInt(0x05060708);
composite.ensureWritable(4);
composite.writeInt(0x01020304);
assertEquals(0x0102030405060708L, composite.readLong());
}
}
}
@ParameterizedTest
@MethodSource("allocators")
public void pooledBuffersMustResetStateBeforeReuse(Fixture fixture) {
@ -1544,6 +1578,235 @@ public class BufTest {
}
}
@Test
public void emptyCompositeBufferMustUseNativeByteOrder() {
try (Allocator allocator = Allocator.heap();
Buf composite = allocator.compose()) {
assertThat(composite.order()).isEqualTo(ByteOrder.nativeOrder());
}
}
@Test
public void extendOnNonCompositeBufferMustThrow() {
try (Allocator allocator = Allocator.heap();
Buf a = allocator.allocate(8);
Buf b = allocator.allocate(8)) {
var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(a, b));
assertThat(exc).hasMessageContaining("Expected").hasMessageContaining("composite");
}
}
@Test
public void extendingNonOwnedCompositeBufferMustThrow() {
try (Allocator allocator = Allocator.heap();
Buf a = allocator.allocate(8);
Buf b = allocator.allocate(8);
Buf composed = allocator.compose(a)) {
try (Buf ignore = composed.acquire()) {
var exc = assertThrows(IllegalStateException.class, () -> Allocator.extend(composed, b));
assertThat(exc).hasMessageContaining("owned");
}
}
}
@Test
public void extendingCompositeBufferWithItselfMustThrow() {
try (Allocator allocator = Allocator.heap()) {
Buf composite;
try (Buf a = allocator.allocate(8)) {
composite = allocator.compose(a);
}
try (composite) {
var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, composite));
assertThat(exc).hasMessageContaining("itself");
}
}
try (Allocator allocator = Allocator.heap();
Buf composite = allocator.compose()) {
var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, composite));
assertThat(exc).hasMessageContaining("itself");
}
}
@Test
public void extendingCompositeBufferWithNullMustThrow() {
try (Allocator allocator = Allocator.heap();
Buf composite = allocator.compose()) {
assertThrows(NullPointerException.class, () -> Allocator.extend(composite, null));
}
}
@Test
public void extendingCompositeBufferMustIncreaseCapacityByGivenBigEndianBuffer() {
try (Allocator allocator = Allocator.heap();
Buf composite = allocator.compose()) {
assertThat(composite.capacity()).isZero();
try (Buf buf = allocator.allocate(8, ByteOrder.BIG_ENDIAN)) {
Allocator.extend(composite, buf);
}
assertThat(composite.capacity()).isEqualTo(8);
composite.writeLong(0x0102030405060708L);
assertThat(composite.readLong()).isEqualTo(0x0102030405060708L);
}
}
@Test
public void extendingCompositeBufferMustIncreaseCapacityByGivenLittleEndianBuffer() {
try (Allocator allocator = Allocator.heap();
Buf composite = allocator.compose()) {
assertThat(composite.capacity()).isZero();
try (Buf buf = allocator.allocate(8, ByteOrder.LITTLE_ENDIAN)) {
Allocator.extend(composite, buf);
}
assertThat(composite.capacity()).isEqualTo(8);
composite.writeLong(0x0102030405060708L);
assertThat(composite.readLong()).isEqualTo(0x0102030405060708L);
}
}
@Test
public void extendingBigEndianCompositeBufferMustThrowIfExtensionIsLittleEndian() {
try (Allocator allocator = Allocator.heap()) {
Buf composite;
try (Buf a = allocator.allocate(8, ByteOrder.BIG_ENDIAN)) {
composite = allocator.compose(a);
}
try (composite) {
try (Buf b = allocator.allocate(8, ByteOrder.LITTLE_ENDIAN)) {
var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, b));
assertThat(exc).hasMessageContaining("byte order");
}
}
}
}
@Test
public void extendingLittleEndianCompositeBufferMustThrowIfExtensionIsBigEndian() {
try (Allocator allocator = Allocator.heap()) {
Buf composite;
try (Buf a = allocator.allocate(8, ByteOrder.LITTLE_ENDIAN)) {
composite = allocator.compose(a);
}
try (composite) {
try (Buf b = allocator.allocate(8, ByteOrder.BIG_ENDIAN)) {
var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, b));
assertThat(exc).hasMessageContaining("byte order");
}
}
}
}
@Test
public void emptyCompositeBufferMustAllowExtendingWithBufferWithBigEndianByteOrder() {
try (Allocator allocator = Allocator.heap()) {
try (Buf composite = allocator.compose()) {
try (Buf b = allocator.allocate(8, ByteOrder.BIG_ENDIAN)) {
Allocator.extend(composite, b);
assertThat(composite.order()).isEqualTo(ByteOrder.BIG_ENDIAN);
}
}
}
}
@Test
public void emptyCompositeBufferMustAllowExtendingWithBufferWithLittleEndianByteOrder() {
try (Allocator allocator = Allocator.heap()) {
try (Buf composite = allocator.compose()) {
try (Buf b = allocator.allocate(8, ByteOrder.LITTLE_ENDIAN)) {
Allocator.extend(composite, b);
assertThat(composite.order()).isEqualTo(ByteOrder.LITTLE_ENDIAN);
}
}
}
}
@Test
public void whenExtendingCompositeBufferWithWriteOffsetAtCapacityExtensionWriteOffsetCanBeNonZero() {
try (Allocator allocator = Allocator.heap()) {
Buf composite;
try (Buf a = allocator.allocate(8)) {
composite = allocator.compose(a);
}
try (composite) {
composite.writeLong(0);
try (Buf b = allocator.allocate(8)) {
b.writeInt(1);
Allocator.extend(composite, b);
assertThat(composite.capacity()).isEqualTo(16);
assertThat(composite.writerOffset()).isEqualTo(12);
}
}
}
}
@Test
public void whenExtendingCompositeBufferWithWriteOffsetLessThanCapacityExtensionWriteOffsetMustZero() {
try (Allocator allocator = Allocator.heap()) {
Buf composite;
try (Buf a = allocator.allocate(8)) {
composite = allocator.compose(a);
}
try (composite) {
composite.writeInt(0);
try (Buf b = allocator.allocate(8)) {
b.writeInt(1);
var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, b));
assertThat(exc).hasMessageContaining("unwritten gap");
b.writerOffset(0);
Allocator.extend(composite, b);
assertThat(composite.capacity()).isEqualTo(16);
assertThat(composite.writerOffset()).isEqualTo(4);
}
}
}
}
@Test
public void whenExtendingCompositeBufferWithReadOffsetAtCapacityExtensionReadOffsetCanBeNonZero() {
try (Allocator allocator = Allocator.heap()) {
Buf composite;
try (Buf a = allocator.allocate(8)) {
composite = allocator.compose(a);
}
try (composite) {
composite.writeLong(0);
composite.readLong();
try (Buf b = allocator.allocate(8)) {
b.writeInt(1);
b.readInt();
Allocator.extend(composite, b);
assertThat(composite.capacity()).isEqualTo(16);
assertThat(composite.writerOffset()).isEqualTo(12);
}
}
}
}
@Test
public void whenExtendingCompositeBufferWithReadOffsetLessThanCapacityExtensionReadOffsetMustZero() {
try (Allocator allocator = Allocator.heap()) {
Buf composite;
try (Buf a = allocator.allocate(8)) {
composite = allocator.compose(a);
}
try (composite) {
composite.writeLong(0);
composite.readInt();
try (Buf b = allocator.allocate(8)) {
b.writeInt(1);
b.readInt();
var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, b));
assertThat(exc).hasMessageContaining("unread gap");
b.readerOffset(0);
Allocator.extend(composite, b);
assertThat(composite.capacity()).isEqualTo(16);
assertThat(composite.writerOffset()).isEqualTo(12);
assertThat(composite.readerOffset()).isEqualTo(4);
}
}
}
}
// <editor-fold defaultstate="collapsed" desc="Primitive accessors tests.">
@ParameterizedTest
@MethodSource("allocators")