From 236097e0810c411c1483eff6a08b7fbfbc0f17a5 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Thu, 3 Dec 2020 17:48:28 +0100 Subject: [PATCH] Make it possible to extend composite buffers after creation Motivation: Composite buffers are uniquely positioned to be able to extend their underlying storage relatively cheaply. This fact is relied upon in a couple of buffer use cases within Netty, that we wish to support. Modification: Add a static `extend` method to Allocator, so that the CompositeBuf class can remain internal. The `extend` method inserts the extension buffer at the end of the composite buffer as if it had been included from the start. This involves checking offsets and byte order invariants. We also require that the composite buffer be in an owned state. Result: It's now possible to extend a composite buffer with a specific buffer, after the composite buffer has been created. --- .../java/io/netty/buffer/api/Allocator.java | 21 ++ .../io/netty/buffer/api/CompositeBuf.java | 61 +++- .../java/io/netty/buffer/api/BufTest.java | 263 ++++++++++++++++++ 3 files changed, 334 insertions(+), 11 deletions(-) diff --git a/src/main/java/io/netty/buffer/api/Allocator.java b/src/main/java/io/netty/buffer/api/Allocator.java index e0e81bc..ef70175 100644 --- a/src/main/java/io/netty/buffer/api/Allocator.java +++ b/src/main/java/io/netty/buffer/api/Allocator.java @@ -111,6 +111,27 @@ public interface Allocator extends AutoCloseable { return new CompositeBuf(this, bufs); } + /** + * Extend the given composite buffer with the given extension buffer. + * This works as if the extension had originally been included at the end of the list of constituent buffers when + * the composite buffer was created. + * The composite buffer is modified in-place. + * + * @see #compose(Buf...) + * @param composite The composite buffer (from a prior {@link #compose(Buf...)} call) to extend with the given + * extension buffer. + * @param extension The buffer to extend the composite buffer with. + */ + static void extend(Buf composite, Buf extension) { + if (composite.getClass() != CompositeBuf.class) { + throw new IllegalArgumentException( + "Expected the first buffer to be a composite buffer, " + + "but it is a " + composite.getClass() + " buffer: " + composite + '.'); + } + CompositeBuf buf = (CompositeBuf) composite; + buf.extendWith(extension); + } + /** * Close this allocator, freeing all of its internal resources. It is not specified if the allocator can still be * used after this method has been called on it. diff --git a/src/main/java/io/netty/buffer/api/CompositeBuf.java b/src/main/java/io/netty/buffer/api/CompositeBuf.java index d7da05e..3492b0e 100644 --- a/src/main/java/io/netty/buffer/api/CompositeBuf.java +++ b/src/main/java/io/netty/buffer/api/CompositeBuf.java @@ -19,6 +19,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.Arrays; import java.util.NoSuchElementException; +import java.util.Objects; final class CompositeBuf extends RcSupport implements Buf { /** @@ -64,6 +65,16 @@ final class CompositeBuf extends RcSupport implements Buf { throw new IllegalArgumentException("Constituent buffers have inconsistent byte order."); } } + } + this.bufs = bufs; + computeBufferOffsets(); + tornBufAccessors = new TornBufAccessors(this); + } + + private void computeBufferOffsets() { + if (bufs.length > 0) { + int woff = 0; + int roff = 0; boolean woffMidpoint = false; for (Buf buf : bufs) { if (buf.writableBytes() == 0) { @@ -73,7 +84,7 @@ final class CompositeBuf extends RcSupport implements Buf { woffMidpoint = true; } else if (buf.writerOffset() != 0) { throw new IllegalArgumentException( - "The given buffers cannot be composed because they have an unwritten gap: " + + "The given buffers cannot be composed because they leave an unwritten gap: " + Arrays.toString(bufs) + '.'); } } @@ -86,19 +97,17 @@ final class CompositeBuf extends RcSupport implements Buf { roffMidpoint = true; } else if (buf.readerOffset() != 0) { throw new IllegalArgumentException( - "The given buffers cannot be composed because they have an unread gap: " + + "The given buffers cannot be composed because they leave an unread gap: " + Arrays.toString(bufs) + '.'); } } assert roff <= woff: "The given buffers place the read offset ahead of the write offset: " + Arrays.toString(bufs) + '.'; + // Commit computed offsets. + this.woff = woff; + this.roff = roff; } - this.bufs = bufs; - computeBufferOffsets(); - tornBufAccessors = new TornBufAccessors(this); - } - private void computeBufferOffsets() { offsets = new int[bufs.length]; long cap = 0; for (int i = 0; i < bufs.length; i++) { @@ -129,7 +138,7 @@ final class CompositeBuf extends RcSupport implements Buf { @Override public ByteOrder order() { - return bufs[0].order(); + return bufs.length > 0? bufs[0].order() : ByteOrder.nativeOrder(); } @Override @@ -481,12 +490,42 @@ final class CompositeBuf extends RcSupport implements Buf { long newSize = capacity() + (long) size; Allocator.checkSize(newSize); int growth = size - writableBytes(); - bufs = Arrays.copyOf(bufs, bufs.length + 1); - bufs[bufs.length - 1] = allocator.allocate(growth); - computeBufferOffsets(); + Buf extension = bufs.length == 0? allocator.allocate(growth) : allocator.allocate(growth, order()); + unsafeExtendWith(extension); } } + void extendWith(Buf extension) { + Objects.requireNonNull(extension, "Extension buffer cannot be null."); + if (!isOwned()) { + throw new IllegalStateException("This buffer cannot be extended because it is not in an owned state."); + } + if (extension == this) { + throw new IllegalArgumentException("This buffer cannot be extended with itself."); + } + if (bufs.length > 0 && extension.order() != order()) { + throw new IllegalArgumentException( + "This buffer uses " + order() + " byte order, and cannot be extended with " + + "a buffer that uses " + extension.order() + " byte order."); + } + long newSize = capacity() + (long) extension.capacity(); + Allocator.checkSize(newSize); + + Buf[] restoreTemp = bufs; // We need this to restore our buffer array, in case offset computations fail. + try { + unsafeExtendWith(extension.acquire()); + } catch (Exception e) { + bufs = restoreTemp; + throw e; + } + } + + private void unsafeExtendWith(Buf extension) { + bufs = Arrays.copyOf(bufs, bufs.length + 1); + bufs[bufs.length - 1] = extension; + computeBufferOffsets(); + } + // @Override public byte readByte() { diff --git a/src/test/java/io/netty/buffer/api/BufTest.java b/src/test/java/io/netty/buffer/api/BufTest.java index 1add288..70d027f 100644 --- a/src/test/java/io/netty/buffer/api/BufTest.java +++ b/src/test/java/io/netty/buffer/api/BufTest.java @@ -1512,6 +1512,40 @@ public class BufTest { } } + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void ensureWritableOnCompositeBuffersMustRespectExistingBigEndianByteOrder(Fixture fixture) { + try (Allocator allocator = fixture.createAllocator()) { + Buf composite; + try (Buf a = allocator.allocate(4, ByteOrder.BIG_ENDIAN)) { + composite = allocator.compose(a); + } + try (composite) { + composite.writeInt(0x01020304); + composite.ensureWritable(4); + composite.writeInt(0x05060708); + assertEquals(0x0102030405060708L, composite.readLong()); + } + } + } + + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void ensureWritableOnCompositeBuffersMustRespectExistingLittleEndianByteOrder(Fixture fixture) { + try (Allocator allocator = fixture.createAllocator()) { + Buf composite; + try (Buf a = allocator.allocate(4, ByteOrder.LITTLE_ENDIAN)) { + composite = allocator.compose(a); + } + try (composite) { + composite.writeInt(0x05060708); + composite.ensureWritable(4); + composite.writeInt(0x01020304); + assertEquals(0x0102030405060708L, composite.readLong()); + } + } + } + @ParameterizedTest @MethodSource("allocators") public void pooledBuffersMustResetStateBeforeReuse(Fixture fixture) { @@ -1544,6 +1578,235 @@ public class BufTest { } } + @Test + public void emptyCompositeBufferMustUseNativeByteOrder() { + try (Allocator allocator = Allocator.heap(); + Buf composite = allocator.compose()) { + assertThat(composite.order()).isEqualTo(ByteOrder.nativeOrder()); + } + } + + @Test + public void extendOnNonCompositeBufferMustThrow() { + try (Allocator allocator = Allocator.heap(); + Buf a = allocator.allocate(8); + Buf b = allocator.allocate(8)) { + var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(a, b)); + assertThat(exc).hasMessageContaining("Expected").hasMessageContaining("composite"); + } + } + + @Test + public void extendingNonOwnedCompositeBufferMustThrow() { + try (Allocator allocator = Allocator.heap(); + Buf a = allocator.allocate(8); + Buf b = allocator.allocate(8); + Buf composed = allocator.compose(a)) { + try (Buf ignore = composed.acquire()) { + var exc = assertThrows(IllegalStateException.class, () -> Allocator.extend(composed, b)); + assertThat(exc).hasMessageContaining("owned"); + } + } + } + + @Test + public void extendingCompositeBufferWithItselfMustThrow() { + try (Allocator allocator = Allocator.heap()) { + Buf composite; + try (Buf a = allocator.allocate(8)) { + composite = allocator.compose(a); + } + try (composite) { + var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, composite)); + assertThat(exc).hasMessageContaining("itself"); + } + } + try (Allocator allocator = Allocator.heap(); + Buf composite = allocator.compose()) { + var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, composite)); + assertThat(exc).hasMessageContaining("itself"); + } + } + + @Test + public void extendingCompositeBufferWithNullMustThrow() { + try (Allocator allocator = Allocator.heap(); + Buf composite = allocator.compose()) { + assertThrows(NullPointerException.class, () -> Allocator.extend(composite, null)); + } + } + + @Test + public void extendingCompositeBufferMustIncreaseCapacityByGivenBigEndianBuffer() { + try (Allocator allocator = Allocator.heap(); + Buf composite = allocator.compose()) { + assertThat(composite.capacity()).isZero(); + try (Buf buf = allocator.allocate(8, ByteOrder.BIG_ENDIAN)) { + Allocator.extend(composite, buf); + } + assertThat(composite.capacity()).isEqualTo(8); + composite.writeLong(0x0102030405060708L); + assertThat(composite.readLong()).isEqualTo(0x0102030405060708L); + } + } + + @Test + public void extendingCompositeBufferMustIncreaseCapacityByGivenLittleEndianBuffer() { + try (Allocator allocator = Allocator.heap(); + Buf composite = allocator.compose()) { + assertThat(composite.capacity()).isZero(); + try (Buf buf = allocator.allocate(8, ByteOrder.LITTLE_ENDIAN)) { + Allocator.extend(composite, buf); + } + assertThat(composite.capacity()).isEqualTo(8); + composite.writeLong(0x0102030405060708L); + assertThat(composite.readLong()).isEqualTo(0x0102030405060708L); + } + } + + @Test + public void extendingBigEndianCompositeBufferMustThrowIfExtensionIsLittleEndian() { + try (Allocator allocator = Allocator.heap()) { + Buf composite; + try (Buf a = allocator.allocate(8, ByteOrder.BIG_ENDIAN)) { + composite = allocator.compose(a); + } + try (composite) { + try (Buf b = allocator.allocate(8, ByteOrder.LITTLE_ENDIAN)) { + var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, b)); + assertThat(exc).hasMessageContaining("byte order"); + } + } + } + } + + @Test + public void extendingLittleEndianCompositeBufferMustThrowIfExtensionIsBigEndian() { + try (Allocator allocator = Allocator.heap()) { + Buf composite; + try (Buf a = allocator.allocate(8, ByteOrder.LITTLE_ENDIAN)) { + composite = allocator.compose(a); + } + try (composite) { + try (Buf b = allocator.allocate(8, ByteOrder.BIG_ENDIAN)) { + var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, b)); + assertThat(exc).hasMessageContaining("byte order"); + } + } + } + } + + @Test + public void emptyCompositeBufferMustAllowExtendingWithBufferWithBigEndianByteOrder() { + try (Allocator allocator = Allocator.heap()) { + try (Buf composite = allocator.compose()) { + try (Buf b = allocator.allocate(8, ByteOrder.BIG_ENDIAN)) { + Allocator.extend(composite, b); + assertThat(composite.order()).isEqualTo(ByteOrder.BIG_ENDIAN); + } + } + } + } + + @Test + public void emptyCompositeBufferMustAllowExtendingWithBufferWithLittleEndianByteOrder() { + try (Allocator allocator = Allocator.heap()) { + try (Buf composite = allocator.compose()) { + try (Buf b = allocator.allocate(8, ByteOrder.LITTLE_ENDIAN)) { + Allocator.extend(composite, b); + assertThat(composite.order()).isEqualTo(ByteOrder.LITTLE_ENDIAN); + } + } + } + } + + @Test + public void whenExtendingCompositeBufferWithWriteOffsetAtCapacityExtensionWriteOffsetCanBeNonZero() { + try (Allocator allocator = Allocator.heap()) { + Buf composite; + try (Buf a = allocator.allocate(8)) { + composite = allocator.compose(a); + } + try (composite) { + composite.writeLong(0); + try (Buf b = allocator.allocate(8)) { + b.writeInt(1); + Allocator.extend(composite, b); + assertThat(composite.capacity()).isEqualTo(16); + assertThat(composite.writerOffset()).isEqualTo(12); + } + } + } + } + + @Test + public void whenExtendingCompositeBufferWithWriteOffsetLessThanCapacityExtensionWriteOffsetMustZero() { + try (Allocator allocator = Allocator.heap()) { + Buf composite; + try (Buf a = allocator.allocate(8)) { + composite = allocator.compose(a); + } + try (composite) { + composite.writeInt(0); + try (Buf b = allocator.allocate(8)) { + b.writeInt(1); + var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, b)); + assertThat(exc).hasMessageContaining("unwritten gap"); + b.writerOffset(0); + Allocator.extend(composite, b); + assertThat(composite.capacity()).isEqualTo(16); + assertThat(composite.writerOffset()).isEqualTo(4); + } + } + } + } + + @Test + public void whenExtendingCompositeBufferWithReadOffsetAtCapacityExtensionReadOffsetCanBeNonZero() { + try (Allocator allocator = Allocator.heap()) { + Buf composite; + try (Buf a = allocator.allocate(8)) { + composite = allocator.compose(a); + } + try (composite) { + composite.writeLong(0); + composite.readLong(); + try (Buf b = allocator.allocate(8)) { + b.writeInt(1); + b.readInt(); + Allocator.extend(composite, b); + assertThat(composite.capacity()).isEqualTo(16); + assertThat(composite.writerOffset()).isEqualTo(12); + } + } + } + } + + @Test + public void whenExtendingCompositeBufferWithReadOffsetLessThanCapacityExtensionReadOffsetMustZero() { + try (Allocator allocator = Allocator.heap()) { + Buf composite; + try (Buf a = allocator.allocate(8)) { + composite = allocator.compose(a); + } + try (composite) { + composite.writeLong(0); + composite.readInt(); + try (Buf b = allocator.allocate(8)) { + b.writeInt(1); + b.readInt(); + var exc = assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, b)); + assertThat(exc).hasMessageContaining("unread gap"); + b.readerOffset(0); + Allocator.extend(composite, b); + assertThat(composite.capacity()).isEqualTo(16); + assertThat(composite.writerOffset()).isEqualTo(12); + assertThat(composite.readerOffset()).isEqualTo(4); + } + } + } + } + // @ParameterizedTest @MethodSource("allocators")