From 77754609840f8ce53c50e45255ebf79378a0e05d Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Thu, 22 Apr 2021 16:57:31 +0200 Subject: [PATCH] Make bifurcate and ensureWritable more flexible This supports more use cases. The ensureWritable method can now amortise its allocation cost by allocating more than what is strictly necessary to satisfy the immediate call. The bifurcate method can now split at a given offset. --- src/main/java/io/netty/buffer/api/Buffer.java | 104 ++++++++++++++++-- .../io/netty/buffer/api/CompositeBuffer.java | 35 ++++-- src/main/java/io/netty/buffer/api/Send.java | 2 +- .../buffer/api/bytebuffer/NioBuffer.java | 28 +++-- .../netty/buffer/api/memseg/MemSegBuffer.java | 28 +++-- .../netty/buffer/api/unsafe/UnsafeBuffer.java | 32 ++++-- .../buffer/api/BufferEnsureWritableTest.java | 24 +++- .../api/BufferReferenceCountingTest.java | 88 +++++++++++++++ .../netty/buffer/api/BufferTestSupport.java | 12 +- 9 files changed, 299 insertions(+), 54 deletions(-) diff --git a/src/main/java/io/netty/buffer/api/Buffer.java b/src/main/java/io/netty/buffer/api/Buffer.java index 81e1d53..b3f382d 100644 --- a/src/main/java/io/netty/buffer/api/Buffer.java +++ b/src/main/java/io/netty/buffer/api/Buffer.java @@ -54,8 +54,6 @@ import java.nio.ByteOrder; * such as with the {@link #getByte(int)} method, * from multiple threads. *

- * Confined buffers will initially be confined to the thread that allocates them. - *

* If a buffer needs to be accessed by a different thread, * then the ownership of that buffer must be sent to that thread. * This can be done with the {@link #send()} method. @@ -101,6 +99,34 @@ import java.nio.ByteOrder; * 0 <= readerOffset <= writerOffset <= capacity * * + *

Slice vs. Bifurcate

+ * + * The {@link #slice()} and {@link #bifurcate()} methods both return new buffers on the memory of the buffer they're + * called on. + * However, there are also important differences between the two, as they are aimed at different use cases that were + * previously (in the {@code ByteBuf} API) covered by {@code slice()} alone. + * + * + * + * These differences means that slicing is mostly suitable for when you temporarily want to share a focused area of a + * buffer. + * Examples of this include doing IO, or decoding a bounded part of a larger message. + * On the other hand, bifurcate is suitable for when you want to hand over a region of a buffer to some other, + * perhaps unknown, piece of code, and relinquish your ownership of that buffer region in the process. + * Examples of include aggregating messages into an accumulator buffer, and sending messages down the pipeline for + * further processing, as bifurcated buffer regions, once their data has been received in its entirety. */ public interface Buffer extends Rc, BufferAccessors { /** @@ -432,7 +458,7 @@ public interface Buffer extends Rc, BufferAccessors { * If this buffer already has the necessary space, then this method returns immediately. * If this buffer does not already have the necessary space, then it will be expanded using the * {@link BufferAllocator} the buffer was created with. - * This method is the same as calling {@link #ensureWritable(int, boolean)} where {@code allowCompaction} is + * This method is the same as calling {@link #ensureWritable(int, int, boolean)} where {@code allowCompaction} is * {@code false}. * * @param size The requested number of bytes of space that should be available for writing. @@ -440,7 +466,7 @@ public interface Buffer extends Rc, BufferAccessors { * or is {@linkplain #readOnly() read-only}. */ default void ensureWritable(int size) { - ensureWritable(size, true); + ensureWritable(size, 1, true); } /** @@ -472,13 +498,17 @@ public interface Buffer extends Rc, BufferAccessors { * * * @param size The requested number of bytes of space that should be available for writing. + * @param minimumGrowth The minimum number of bytes to grow by. If it is determined that memory should be allocated + * and copied, make sure that the new memory allocation is bigger than the old one by at least + * this many bytes. This way, the buffer can grow by more than what is immediately necessary, + * thus amortising the costs of allocating and copying. * @param allowCompaction {@code true} if the method is allowed to modify the * {@linkplain #readerOffset() reader offset} and * {@linkplain #writerOffset() writer offset}, otherwise {@code false}. * @throws IllegalStateException if this buffer is not in an {@linkplain #isOwned() owned} state, - * * or is {@linkplain #readOnly() read-only}. + * or is {@linkplain #readOnly() read-only}. */ - void ensureWritable(int size, boolean allowCompaction); + void ensureWritable(int size, int minimumGrowth, boolean allowCompaction); /** * Returns a slice of this buffer's readable bytes. @@ -492,6 +522,9 @@ public interface Buffer extends Rc, BufferAccessors { *

* The slice is created with a {@linkplain #writerOffset() write offset} equal to the length of the slice, * so that the entire contents of the slice is ready to be read. + *

+ * See the Slice vs. Bifurcate section for details on the difference between slice + * and bifurcate. * * @return A new buffer instance, with independent {@link #readerOffset()} and {@link #writerOffset()}, * that is a view of the readable region of this buffer. @@ -511,6 +544,9 @@ public interface Buffer extends Rc, BufferAccessors { *

* The slice is created with a {@linkplain #writerOffset() write offset} equal to the length of the slice, * so that the entire contents of the slice is ready to be read. + *

+ * See the Slice vs. Bifurcate section for details on the difference between slice + * and bifurcate. * * @return A new buffer instance, with independent {@link #readerOffset()} and {@link #writerOffset()}, * that is a view of the given region of this buffer. @@ -558,10 +594,64 @@ public interface Buffer extends Rc, BufferAccessors { * simply split its internal array in two. *

* Bifurcated buffers support all operations that normal buffers do, including {@link #ensureWritable(int)}. + *

+ * See the Slice vs. Bifurcate section for details on the difference between slice + * and bifurcate. * * @return A new buffer with independent and exclusive ownership over the read and readable bytes from this buffer. */ - Buffer bifurcate(); + default Buffer bifurcate() { + return bifurcate(writerOffset()); + } + + /** + * Split the buffer into two, at the given {@code splitOffset}. + *

+ * The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown. + *

+ * The region of this buffer that precede the {@code splitOffset}, will be captured and returned in a new + * buffer, that will hold its own ownership of that region. This allows the returned buffer to be indepentently + * {@linkplain #send() sent} to other threads. + *

+ * The returned buffer will adopt the {@link #readerOffset()} and {@link #writerOffset()} of this buffer, + * but truncated to fit within the capacity dictated by the {@code splitOffset}. + *

+ * The memory region in the returned buffer will become inaccessible through this buffer. If the + * {@link #readerOffset()} or {@link #writerOffset()} of this buffer lie prior to the {@code splitOffset}, + * then those offsets will be moved forward so they land on offset 0 after the bifurcation. + *

+ * Effectively, the following transformation takes place: + *

{@code
+     *         This buffer:
+     *          +--------------------------------+
+     *         0|               |splitOffset     |cap
+     *          +---------------+----------------+
+     *         /               / \               \
+     *        /               /   \               \
+     *       /               /     \               \
+     *      /               /       \               \
+     *     /               /         \               \
+     *    +---------------+           +---------------+
+     *    |               |cap        |               |cap
+     *    +---------------+           +---------------+
+     *    Returned buffer.            This buffer.
+     * }
+ * When the buffers are in this state, both of the bifurcated parts retain an atomic reference count on the + * underlying memory. This means that shared underlying memory will not be deallocated or returned to a pool, until + * all of the bifurcated parts have been closed. + *

+ * Composite buffers have it a little easier, in that at most only one of the constituent buffers will actually be + * bifurcated. If the split point lands perfectly between two constituent buffers, then a composite buffer can + * simply split its internal array in two. + *

+ * Bifurcated buffers support all operations that normal buffers do, including {@link #ensureWritable(int)}. + *

+ * See the Slice vs. Bifurcate section for details on the difference between slice + * and bifurcate. + * + * @return A new buffer with independent and exclusive ownership over the read and readable bytes from this buffer. + */ + Buffer bifurcate(int splitOffset); /** * Discards the read bytes, and moves the buffer contents to the beginning of the buffer. diff --git a/src/main/java/io/netty/buffer/api/CompositeBuffer.java b/src/main/java/io/netty/buffer/api/CompositeBuffer.java index c986d65..7c72a1c 100644 --- a/src/main/java/io/netty/buffer/api/CompositeBuffer.java +++ b/src/main/java/io/netty/buffer/api/CompositeBuffer.java @@ -606,13 +606,16 @@ final class CompositeBuffer extends RcSupport implement } @Override - public void ensureWritable(int size, boolean allowCompaction) { + public void ensureWritable(int size, int minimumGrowth, boolean allowCompaction) { if (!isOwned()) { throw new IllegalStateException("Buffer is not owned. Only owned buffers can call ensureWritable."); } if (size < 0) { throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.'); } + if (minimumGrowth < 0) { + throw new IllegalArgumentException("The minimum growth cannot be negative: " + minimumGrowth + '.'); + } if (readOnly) { throw bufferIsReadOnly(); } @@ -649,13 +652,20 @@ final class CompositeBuffer extends RcSupport implement // Now we have enough space. return; } + } else if (bufs.length == 1) { + // If we only have a single component buffer, then we can safely compact that in-place. + bufs[0].compact(); + computeBufferOffsets(); + if (writableBytes() >= size) { + // Now we have enough space. + return; + } } } - long newSize = capacity() + (long) size; - BufferAllocator.checkSize(newSize); - int growth = size - writableBytes(); - Buffer extension = bufs.length == 0? allocator.allocate(growth) : allocator.allocate(growth, order()); + int growth = Math.max(size - writableBytes(), minimumGrowth); + BufferAllocator.checkSize(capacity() + (long) growth); + Buffer extension = allocator.allocate(growth, order()); unsafeExtendWith(extension); } @@ -739,7 +749,14 @@ final class CompositeBuffer extends RcSupport implement } @Override - public Buffer bifurcate() { + public Buffer bifurcate(int splitOffset) { + if (splitOffset < 0) { + throw new IllegalArgumentException("The split offset cannot be negative: " + splitOffset + '.'); + } + if (capacity() < splitOffset) { + throw new IllegalArgumentException("The split offset cannot be greater than the buffer capacity, " + + "but the split offset was " + splitOffset + ", and capacity is " + capacity() + '.'); + } if (!isOwned()) { throw new IllegalStateException("Cannot bifurcate a buffer that is not owned."); } @@ -748,12 +765,12 @@ final class CompositeBuffer extends RcSupport implement return new CompositeBuffer(allocator, bufs, unsafeGetDrop(), true).order(order); } - int i = searchOffsets(woff); - int off = woff - offsets[i]; + int i = searchOffsets(splitOffset); + int off = splitOffset - offsets[i]; Buffer[] bifs = Arrays.copyOf(bufs, off == 0? i : 1 + i); bufs = Arrays.copyOfRange(bufs, off == bufs[i].capacity()? 1 + i : i, bufs.length); if (off > 0 && bifs.length > 0 && off < bifs[bifs.length - 1].capacity()) { - bifs[bifs.length - 1] = bufs[0].bifurcate(); + bifs[bifs.length - 1] = bufs[0].bifurcate(off); } computeBufferOffsets(); try { diff --git a/src/main/java/io/netty/buffer/api/Send.java b/src/main/java/io/netty/buffer/api/Send.java index 005c9c5..4e4d62d 100644 --- a/src/main/java/io/netty/buffer/api/Send.java +++ b/src/main/java/io/netty/buffer/api/Send.java @@ -101,7 +101,7 @@ public interface Send> extends Deref { * @param The result type of the mapping function. * @return A new {@link Send} instance that will deliver an object that is the result of the mapping. */ - default > Send map(Class type, Function mapper) { + default > Send map(Class type, Function mapper) { return sending(type, () -> mapper.apply(receive())); } diff --git a/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java b/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java index 7fbb7c6..24f5f27 100644 --- a/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java +++ b/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java @@ -364,7 +364,7 @@ class NioBuffer extends RcSupport implements Buffer, Readable } @Override - public void ensureWritable(int size, boolean allowCompaction) { + public void ensureWritable(int size, int minimumGrowth, boolean allowCompaction) { if (!isOwned()) { throw attachTrace(new IllegalStateException( "Buffer is not owned. Only owned buffers can call ensureWritable.")); @@ -372,6 +372,9 @@ class NioBuffer extends RcSupport implements Buffer, Readable if (size < 0) { throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.'); } + if (minimumGrowth < 0) { + throw new IllegalArgumentException("The minimum growth cannot be negative: " + minimumGrowth + '.'); + } if (rmem != wmem) { throw bufferIsReadOnly(); } @@ -387,7 +390,7 @@ class NioBuffer extends RcSupport implements Buffer, Readable } // Allocate a bigger buffer. - long newSize = capacity() + size - (long) writableBytes(); + long newSize = capacity() + (long) Math.max(size - writableBytes(), minimumGrowth); BufferAllocator.checkSize(newSize); ByteBuffer buffer = (ByteBuffer) control.allocateUntethered(this, (int) newSize); buffer.order(order()); @@ -414,26 +417,33 @@ class NioBuffer extends RcSupport implements Buffer, Readable } @Override - public Buffer bifurcate() { + public Buffer bifurcate(int splitOffset) { + if (splitOffset < 0) { + throw new IllegalArgumentException("The split offset cannot be negative: " + splitOffset + '.'); + } + if (capacity() < splitOffset) { + throw new IllegalArgumentException("The split offset cannot be greater than the buffer capacity, " + + "but the split offset was " + splitOffset + ", and capacity is " + capacity() + '.'); + } if (!isOwned()) { throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned.")); } var drop = (ArcDrop) unsafeGetDrop(); unsafeSetDrop(new ArcDrop<>(drop)); - var bifurcatedBuffer = rmem.slice(0, woff); + var bifurcatedBuffer = rmem.slice(0, splitOffset); // TODO maybe incrementing the existing ArcDrop is enough; maybe we don't need to wrap it in another ArcDrop. var bifurcatedBuf = new NioBuffer(base, bifurcatedBuffer, control, new ArcDrop<>(drop.increment())); - bifurcatedBuf.woff = woff; - bifurcatedBuf.roff = roff; + bifurcatedBuf.woff = Math.min(woff, splitOffset); + bifurcatedBuf.roff = Math.min(roff, splitOffset); bifurcatedBuf.order(order()); boolean readOnly = readOnly(); bifurcatedBuf.readOnly(readOnly); - rmem = rmem.slice(woff, rmem.capacity() - woff); + rmem = rmem.slice(splitOffset, rmem.capacity() - splitOffset); if (!readOnly) { wmem = rmem; } - woff = 0; - roff = 0; + woff = Math.max(woff, splitOffset) - splitOffset; + roff = Math.max(roff, splitOffset) - splitOffset; return bifurcatedBuf; } diff --git a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java index 75d2f99..1f7f16f 100644 --- a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java +++ b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java @@ -490,7 +490,7 @@ class MemSegBuffer extends RcSupport implements Buffer, Re } @Override - public void ensureWritable(int size, boolean allowCompaction) { + public void ensureWritable(int size, int minimumGrowth, boolean allowCompaction) { if (!isOwned()) { throw attachTrace(new IllegalStateException( "Buffer is not owned. Only owned buffers can call ensureWritable.")); @@ -498,6 +498,9 @@ class MemSegBuffer extends RcSupport implements Buffer, Re if (size < 0) { throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.'); } + if (minimumGrowth < 0) { + throw new IllegalArgumentException("The minimum growth cannot be negative: " + minimumGrowth + '.'); + } if (seg != wseg) { throw bufferIsReadOnly(); } @@ -513,7 +516,7 @@ class MemSegBuffer extends RcSupport implements Buffer, Re } // Allocate a bigger buffer. - long newSize = capacity() + size - (long) writableBytes(); + long newSize = capacity() + (long) Math.max(size - writableBytes(), minimumGrowth); BufferAllocator.checkSize(newSize); MemorySegment newSegment = (MemorySegment) alloc.allocateUntethered(this, (int) newSize); @@ -545,25 +548,32 @@ class MemSegBuffer extends RcSupport implements Buffer, Re } @Override - public Buffer bifurcate() { + public Buffer bifurcate(int splitOffset) { + if (splitOffset < 0) { + throw new IllegalArgumentException("The split offset cannot be negative: " + splitOffset + '.'); + } + if (capacity() < splitOffset) { + throw new IllegalArgumentException("The split offset cannot be greater than the buffer capacity, " + + "but the split offset was " + splitOffset + ", and capacity is " + capacity() + '.'); + } if (!isOwned()) { throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned.")); } var drop = (ArcDrop) unsafeGetDrop(); unsafeSetDrop(new ArcDrop<>(drop)); - var bifurcatedSeg = seg.asSlice(0, woff); + var bifurcatedSeg = seg.asSlice(0, splitOffset); var bifurcatedBuf = new MemSegBuffer(base, bifurcatedSeg, new ArcDrop<>(drop.increment()), alloc); - bifurcatedBuf.woff = woff; - bifurcatedBuf.roff = roff; + bifurcatedBuf.woff = Math.min(woff, splitOffset); + bifurcatedBuf.roff = Math.min(roff, splitOffset); bifurcatedBuf.order(order); boolean readOnly = readOnly(); bifurcatedBuf.readOnly(readOnly); - seg = seg.asSlice(woff, seg.byteSize() - woff); + seg = seg.asSlice(splitOffset, seg.byteSize() - splitOffset); if (!readOnly) { wseg = seg; } - woff = 0; - roff = 0; + woff = Math.max(woff, splitOffset) - splitOffset; + roff = Math.max(roff, splitOffset) - splitOffset; return bifurcatedBuf; } diff --git a/src/main/java/io/netty/buffer/api/unsafe/UnsafeBuffer.java b/src/main/java/io/netty/buffer/api/unsafe/UnsafeBuffer.java index 3b824e4..3be6fac 100644 --- a/src/main/java/io/netty/buffer/api/unsafe/UnsafeBuffer.java +++ b/src/main/java/io/netty/buffer/api/unsafe/UnsafeBuffer.java @@ -398,7 +398,7 @@ public class UnsafeBuffer extends RcSupport implements Buf } @Override - public void ensureWritable(int size, boolean allowCompaction) { + public void ensureWritable(int size, int minimumGrowth, boolean allowCompaction) { if (!isOwned()) { throw attachTrace(new IllegalStateException( "Buffer is not owned. Only owned buffers can call ensureWritable.")); @@ -406,6 +406,9 @@ public class UnsafeBuffer extends RcSupport implements Buf if (size < 0) { throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.'); } + if (minimumGrowth < 0) { + throw new IllegalArgumentException("The minimum growth cannot be negative: " + minimumGrowth + '.'); + } if (rsize != wsize) { throw bufferIsReadOnly(); } @@ -421,7 +424,7 @@ public class UnsafeBuffer extends RcSupport implements Buf } // Allocate a bigger buffer. - long newSize = capacity() + size - (long) writableBytes(); + long newSize = capacity() + (long) Math.max(size - writableBytes(), minimumGrowth); BufferAllocator.checkSize(newSize); UnsafeMemory memory = (UnsafeMemory) control.allocateUntethered(this, (int) newSize); @@ -455,27 +458,34 @@ public class UnsafeBuffer extends RcSupport implements Buf } @Override - public Buffer bifurcate() { + public Buffer bifurcate(int splitOffset) { + if (splitOffset < 0) { + throw new IllegalArgumentException("The split offset cannot be negative: " + splitOffset + '.'); + } + if (capacity() < splitOffset) { + throw new IllegalArgumentException("The split offset cannot be greater than the buffer capacity, " + + "but the split offset was " + splitOffset + ", and capacity is " + capacity() + '.'); + } if (!isOwned()) { throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned.")); } var drop = (ArcDrop) unsafeGetDrop(); unsafeSetDrop(new ArcDrop<>(drop)); // TODO maybe incrementing the existing ArcDrop is enough; maybe we don't need to wrap it in another ArcDrop. - var bifurcatedBuf = new UnsafeBuffer(memory, baseOffset, woff, control, new ArcDrop<>(drop.increment())); - bifurcatedBuf.woff = woff; - bifurcatedBuf.roff = roff; + var bifurcatedBuf = new UnsafeBuffer(memory, baseOffset, splitOffset, control, new ArcDrop<>(drop.increment())); + bifurcatedBuf.woff = Math.min(woff, splitOffset); + bifurcatedBuf.roff = Math.min(roff, splitOffset); bifurcatedBuf.order(order()); boolean readOnly = readOnly(); bifurcatedBuf.readOnly(readOnly); - rsize -= woff; - baseOffset += woff; - address += woff; + rsize -= splitOffset; + baseOffset += splitOffset; + address += splitOffset; if (!readOnly) { wsize = rsize; } - woff = 0; - roff = 0; + woff = Math.max(woff, splitOffset) - splitOffset; + roff = Math.max(roff, splitOffset) - splitOffset; return bifurcatedBuf; } diff --git a/src/test/java/io/netty/buffer/api/BufferEnsureWritableTest.java b/src/test/java/io/netty/buffer/api/BufferEnsureWritableTest.java index 4d24bf3..eb436bb 100644 --- a/src/test/java/io/netty/buffer/api/BufferEnsureWritableTest.java +++ b/src/test/java/io/netty/buffer/api/BufferEnsureWritableTest.java @@ -53,8 +53,8 @@ public class BufferEnsureWritableTest extends BufferTestSupport { @MethodSource("allocators") public void ensureWritableMustThrowIfRequestedSizeWouldGrowBeyondMaxAllowed(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); - Buffer buf = allocator.allocate(512)) { - assertThrows(IllegalArgumentException.class, () -> buf.ensureWritable(Integer.MAX_VALUE - 8)); + Buffer buf = allocator.allocate(8)) { + assertThrows(IllegalArgumentException.class, () -> buf.ensureWritable(Integer.MAX_VALUE - 7)); } } @@ -130,15 +130,31 @@ public class BufferEnsureWritableTest extends BufferTestSupport { while (buf.readableBytes() > 0) { buf.readByte(); } - buf.ensureWritable(4, true); + buf.ensureWritable(4, 4, true); buf.writeInt(42); assertThat(buf.capacity()).isEqualTo(64); buf.writerOffset(60).readerOffset(60); - buf.ensureWritable(8, true); + buf.ensureWritable(8, 8, true); buf.writeLong(42); // Don't assert the capacity on this one, because single-component // composite buffers may choose to allocate rather than compact. } } + + @ParameterizedTest + @MethodSource("allocators") + public void ensureWritableWithLargeMinimumGrowthMustGrowByAtLeastThatMuch(Fixture fixture) { + try (BufferAllocator allocator = fixture.createAllocator(); + Buffer buf = allocator.allocate(16)) { + buf.writeLong(0).writeInt(0); + buf.readLong(); + buf.readInt(); // Compaction is now possible as well. + buf.ensureWritable(8, 32, true); // We don't need to allocate. + assertThat(buf.capacity()).isEqualTo(16); + buf.writeByte((byte) 1); + buf.ensureWritable(16, 32, true); // Now we DO need to allocate, because we can't compact. + assertThat(buf.capacity()).isEqualTo(16 /* existing capacity */ + 32 /* minimum growth */); + } + } } diff --git a/src/test/java/io/netty/buffer/api/BufferReferenceCountingTest.java b/src/test/java/io/netty/buffer/api/BufferReferenceCountingTest.java index 5d8f27f..a8974fd 100644 --- a/src/test/java/io/netty/buffer/api/BufferReferenceCountingTest.java +++ b/src/test/java/io/netty/buffer/api/BufferReferenceCountingTest.java @@ -401,6 +401,26 @@ public class BufferReferenceCountingTest extends BufferTestSupport { } } + @ParameterizedTest + @MethodSource("allocators") + public void bifurcateWithNegativeOffsetMustThrow(Fixture fixture) { + try (BufferAllocator allocator = fixture.createAllocator(); + Buffer buf = allocator.allocate(8)) { + buf.bifurcate(0).close(); + assertThrows(IllegalArgumentException.class, () -> buf.bifurcate(-1)); + } + } + + @ParameterizedTest + @MethodSource("allocators") + public void bifurcateWithOversizedOffsetMustThrow(Fixture fixture) { + try (BufferAllocator allocator = fixture.createAllocator(); + Buffer buf = allocator.allocate(8)) { + assertThrows(IllegalArgumentException.class, () -> buf.bifurcate(9)); + buf.bifurcate(8).close(); + } + } + @ParameterizedTest @MethodSource("allocators") public void bifurcateOfNonOwnedBufferMustThrow(Fixture fixture) { @@ -414,6 +434,53 @@ public class BufferReferenceCountingTest extends BufferTestSupport { } } + @ParameterizedTest + @MethodSource("allocators") + public void bifurcateOnOffsetOfNonOwnedBufferMustThrow(Fixture fixture) { + try (BufferAllocator allocator = fixture.createAllocator(); + Buffer buf = allocator.allocate(8)) { + try (Buffer acquired = buf.acquire()) { + var exc = assertThrows(IllegalStateException.class, () -> acquired.bifurcate(4)); + assertThat(exc).hasMessageContaining("owned"); + } + } + } + + @ParameterizedTest + @MethodSource("allocators") + public void bifurcateOnOffsetMustTruncateGreaterOffsets(Fixture fixture) { + try (BufferAllocator allocator = fixture.createAllocator(); + Buffer buf = allocator.allocate(8)) { + buf.writeInt(0x01020304); + buf.writeByte((byte) 0x05); + buf.readInt(); + try (Buffer bif = buf.bifurcate(2)) { + assertThat(buf.readerOffset()).isEqualTo(2); + assertThat(buf.writerOffset()).isEqualTo(3); + + assertThat(bif.readerOffset()).isEqualTo(2); + assertThat(bif.writerOffset()).isEqualTo(2); + } + } + } + + @ParameterizedTest + @MethodSource("allocators") + public void bifurcateOnOffsetMustExtendLesserOffsets(Fixture fixture) { + try (BufferAllocator allocator = fixture.createAllocator(); + Buffer buf = allocator.allocate(8)) { + buf.writeInt(0x01020304); + buf.readInt(); + try (Buffer bif = buf.bifurcate(6)) { + assertThat(buf.readerOffset()).isEqualTo(0); + assertThat(buf.writerOffset()).isEqualTo(0); + + assertThat(bif.readerOffset()).isEqualTo(4); + assertThat(bif.writerOffset()).isEqualTo(4); + } + } + } + @ParameterizedTest @MethodSource("allocators") public void bifurcatedPartMustContainFirstHalfOfBuffer(Fixture fixture) { @@ -507,6 +574,27 @@ public class BufferReferenceCountingTest extends BufferTestSupport { } } + @ParameterizedTest + @MethodSource("allocators") + public void mustBePossibleToBifurcateOwnedSlices(Fixture fixture) { + try (BufferAllocator allocator = fixture.createAllocator()) { + Buffer buf = allocator.allocate(16).order(BIG_ENDIAN); + buf.writeLong(0x0102030405060708L); + try (Buffer slice = buf.slice()) { + buf.close(); + assertTrue(slice.isOwned()); + try (Buffer bifurcate = slice.bifurcate(4)) { + bifurcate.reset().ensureWritable(Long.BYTES); + slice.reset().ensureWritable(Long.BYTES); + assertThat(bifurcate.capacity()).isEqualTo(Long.BYTES); + assertThat(slice.capacity()).isEqualTo(Long.BYTES); + assertThat(bifurcate.getLong(0)).isEqualTo(0x01020304_00000000L); + assertThat(slice.getLong(0)).isEqualTo(0x05060708_00000000L); + } + } + } + } + @ParameterizedTest @MethodSource("allocators") public void bifurcatedBufferMustHaveSameByteOrderAsParent(Fixture fixture) { diff --git a/src/test/java/io/netty/buffer/api/BufferTestSupport.java b/src/test/java/io/netty/buffer/api/BufferTestSupport.java index ca6bd14..7e56591 100644 --- a/src/test/java/io/netty/buffer/api/BufferTestSupport.java +++ b/src/test/java/io/netty/buffer/api/BufferTestSupport.java @@ -92,13 +92,13 @@ public abstract class BufferTestSupport { if ("nosample".equalsIgnoreCase(sampleSetting)) { return fixture -> true; } - Instant today = Instant.now().truncatedTo(ChronoUnit.DAYS); + Instant today = Instant.now().truncatedTo(ChronoUnit.DAYS); // New seed every day. SplittableRandom rng = new SplittableRandom(today.hashCode()); AtomicInteger counter = new AtomicInteger(); return fixture -> { - boolean res = counter.getAndIncrement() < 1 || rng.nextInt(0, 100) <= 2; - return res; - }; // Filter out 97% of tests. + // Filter out 95% of tests. + return counter.getAndIncrement() < 1 || rng.nextInt(0, 100) < 5; + }; } static Fixture[] allocators() { @@ -977,6 +977,10 @@ public abstract class BufferTestSupport { return bs; } + public static void assertEquals(Buffer expected, Buffer actual) { + assertThat(toByteArray(actual)).containsExactly(toByteArray(expected)); + } + public static void assertEquals(byte expected, byte actual) { if (expected != actual) { fail(String.format("expected: %1$s (0x%1$X) but was: %2$s (0x%2$X)", expected, actual));