diff --git a/src/main/java/io/netty/buffer/api/AllocatorControl.java b/src/main/java/io/netty/buffer/api/AllocatorControl.java index 25aa6f6..53a1bf3 100644 --- a/src/main/java/io/netty/buffer/api/AllocatorControl.java +++ b/src/main/java/io/netty/buffer/api/AllocatorControl.java @@ -28,7 +28,7 @@ public interface AllocatorControl { * This allows a buffer to implement {@link Buf#ensureWritable(int)} by having new memory allocated to it, * without that memory being attached to some other lifetime. * - * @param originator The buffer that originated the request for an untethered memory allocated. + * @param originator The buffer that originated the request for an untethered memory allocated. * @param size The size of the requested memory allocation, in bytes. * @return A "recoverable memory" object that is the requested allocation. */ diff --git a/src/main/java/io/netty/buffer/api/Buf.java b/src/main/java/io/netty/buffer/api/Buf.java index 946f473..98404a1 100644 --- a/src/main/java/io/netty/buffer/api/Buf.java +++ b/src/main/java/io/netty/buffer/api/Buf.java @@ -373,4 +373,48 @@ public interface Buf extends Rc, BufAccessors { * That is, if {@link #countBorrows()} is not {@code 0}. */ void ensureWritable(int size); + + /** + * Split the buffer into two, at the {@linkplain #writerOffset() write offset} position. + *

+ * The region of this buffer that contain the read and readable bytes, will be captured and returned in a new + * buffer, that will hold its own ownership of that region. This allows the returned buffer to be indepentently + * {@linkplain #send() sent} to other threads. + *

+ * The returned buffer will adopt the {@link #readerOffset()} of this buffer, and have its {@link #writerOffset()} + * and {@link #capacity()} both set to the equal to the write offset of this buffer. + *

+ * The memory region in the returned buffer will become inaccessible through this buffer. This buffer will have its + * capacity reduced by the capacity of the returned buffer, and the read and write offsets of this buffer will both + * become zero, even though their position in memory remain unchanged. + *

+ * Effectively, the following transformation takes place: + *

{@code
+     *         This buffer:
+     *          +------------------------------------------+
+     *         0|   |r/o                  |w/o             |cap
+     *          +---+---------------------+----------------+
+     *         /   /                     / \               \
+     *        /   /                     /   \               \
+     *       /   /                     /     \               \
+     *      /   /                     /       \               \
+     *     /   /                     /         \               \
+     *    +---+---------------------+           +---------------+
+     *    |   |r/o                  |w/o & cap  |r/o & w/o      |cap
+     *    +---+---------------------+           +---------------+
+     *    Returned buffer.                      This buffer.
+     * }
+ * When the buffers are in this state, both of the bifurcated parts retain an atomic reference count on the + * underlying memory. This means that shared underlying memory will not be deallocated or returned to a pool, until + * all of the bifurcated parts have been closed. + *

+ * Composite buffers have it a little easier, in that at most only one of the constituent buffers will actually be + * bifurcated. If the split point lands perfectly between two constituent buffers, then a composite buffer can + * simply split its internal array in two. + *

+ * Bifurcated buffers support all operations that normal buffers do, including {@link #ensureWritable(int)}. + * + * @return A new buffer with independent and exclusive ownership over the read and readable bytes from this buffer. + */ + Buf bifurcate(); } diff --git a/src/main/java/io/netty/buffer/api/CompositeBuf.java b/src/main/java/io/netty/buffer/api/CompositeBuf.java index 0f1346e..7d2577e 100644 --- a/src/main/java/io/netty/buffer/api/CompositeBuf.java +++ b/src/main/java/io/netty/buffer/api/CompositeBuf.java @@ -27,12 +27,9 @@ final class CompositeBuf extends RcSupport implements Buf { * non-composite copy of the buffer. */ private static final int MAX_CAPACITY = Integer.MAX_VALUE - 8; - private static final Drop COMPOSITE_DROP = new Drop() { - @Override - public void drop(CompositeBuf obj) { - for (Buf buf : obj.bufs) { - buf.close(); - } + private static final Drop COMPOSITE_DROP = buf -> { + for (Buf b : buf.bufs) { + b.close(); } }; @@ -45,6 +42,7 @@ final class CompositeBuf extends RcSupport implements Buf { private int roff; private int woff; private int subOffset; // The next offset *within* a consituent buffer to read from or write to. + private ByteOrder order; CompositeBuf(Allocator allocator, Buf[] bufs) { this(allocator, true, bufs.clone(), COMPOSITE_DROP); // Clone prevents external modification of array. @@ -64,6 +62,9 @@ final class CompositeBuf extends RcSupport implements Buf { throw new IllegalArgumentException("Constituent buffers have inconsistent byte order."); } } + order = bufs[0].order(); + } else { + order = ByteOrder.nativeOrder(); } this.bufs = bufs; computeBufferOffsets(); @@ -129,15 +130,18 @@ final class CompositeBuf extends RcSupport implements Buf { @Override public Buf order(ByteOrder order) { - for (Buf buf : bufs) { - buf.order(order); + if (this.order != order) { + this.order = order; + for (Buf buf : bufs) { + buf.order(order); + } } return this; } @Override public ByteOrder order() { - return bufs.length > 0? bufs[0].order() : ByteOrder.nativeOrder(); + return order; } @Override @@ -543,6 +547,9 @@ final class CompositeBuf extends RcSupport implements Buf { "This buffer uses " + order() + " byte order, and cannot be extended with " + "a buffer that uses " + extension.order() + " byte order."); } + if (bufs.length == 0) { + order = extension.order(); + } long newSize = capacity() + (long) extension.capacity(); Allocator.checkSize(newSize); @@ -561,6 +568,36 @@ final class CompositeBuf extends RcSupport implements Buf { computeBufferOffsets(); } + @Override + public Buf bifurcate() { + if (!isOwned()) { + throw new IllegalStateException("Cannot bifurcate a buffer that is not owned."); + } + if (bufs.length == 0) { + // Bifurcating a zero-length buffer is trivial. + return new CompositeBuf(allocator, true, bufs, unsafeGetDrop()).order(order); + } + + int i = searchOffsets(woff); + int off = woff - offsets[i]; + Buf[] bifs = Arrays.copyOf(bufs, off == 0? i : 1 + i); + bufs = Arrays.copyOfRange(bufs, off == bufs[i].capacity()? 1 + i : i, bufs.length); + if (off > 0 && bifs.length > 0 && off < bifs[bifs.length - 1].capacity()) { + bifs[bifs.length - 1] = bufs[0].bifurcate(); + } + computeBufferOffsets(); + try { + var compositeBuf = new CompositeBuf(allocator, true, bifs, unsafeGetDrop()); + compositeBuf.order = order; // Preserve byte order even if bifs array is empty. + return compositeBuf; + } finally { + // Drop our references to the buffers in the bifs array. They belong to the new composite buffer now. + for (Buf bif : bifs) { + bif.close(); + } + } + } + // @Override public byte readByte() { @@ -856,7 +893,7 @@ final class CompositeBuf extends RcSupport implements Buf { received[i] = sends[i].receive(); } var composite = new CompositeBuf(allocator, true, received, drop); - drop.accept(composite); + drop.attach(composite); return composite; } }; diff --git a/src/main/java/io/netty/buffer/api/Drop.java b/src/main/java/io/netty/buffer/api/Drop.java index 21fde57..1658c7e 100644 --- a/src/main/java/io/netty/buffer/api/Drop.java +++ b/src/main/java/io/netty/buffer/api/Drop.java @@ -15,8 +15,6 @@ */ package io.netty.buffer.api; -import java.util.function.Consumer; - /** * The Drop interface is used by {@link Rc} instances to implement their resource disposal mechanics. The {@link * #drop(Object)} method will be called by the Rc when their last reference is closed. @@ -24,7 +22,7 @@ import java.util.function.Consumer; * @param */ @FunctionalInterface -public interface Drop extends Consumer { +public interface Drop { /** * Dispose of the resources in the given Rc. * @@ -37,7 +35,6 @@ public interface Drop extends Consumer { * * @param obj The new Rc instance with the new owner. */ - @Override - default void accept(T obj) { + default void attach(T obj) { } } diff --git a/src/main/java/io/netty/buffer/api/NativeMemoryCleanerDrop.java b/src/main/java/io/netty/buffer/api/NativeMemoryCleanerDrop.java index 68c4c1f..1f0a9ef 100644 --- a/src/main/java/io/netty/buffer/api/NativeMemoryCleanerDrop.java +++ b/src/main/java/io/netty/buffer/api/NativeMemoryCleanerDrop.java @@ -49,7 +49,7 @@ class NativeMemoryCleanerDrop implements Drop { } @Override - public void accept(Buf buf) { + public void attach(Buf buf) { // Unregister old cleanable, if any, to avoid uncontrolled build-up. GatedCleanable c = (GatedCleanable) CLEANABLE.getAndSet(this, null); if (c != null) { diff --git a/src/main/java/io/netty/buffer/api/RcSupport.java b/src/main/java/io/netty/buffer/api/RcSupport.java index ff75c74..4e59120 100644 --- a/src/main/java/io/netty/buffer/api/RcSupport.java +++ b/src/main/java/io/netty/buffer/api/RcSupport.java @@ -15,9 +15,12 @@ */ package io.netty.buffer.api; +import java.util.Objects; +import java.util.function.Function; + public abstract class RcSupport, T extends RcSupport> implements Rc { private int acquires; // Closed if negative. - private final Drop drop; + private Drop drop; protected RcSupport(Drop drop) { this.drop = drop; @@ -114,6 +117,11 @@ public abstract class RcSupport, T extends RcSupport> impl return drop; } + protected Drop unsafeExchangeDrop(Drop replacement) { + drop = Objects.requireNonNull(replacement, "Replacement drop cannot be null."); + return replacement; + } + @SuppressWarnings("unchecked") private I self() { return (I) this; diff --git a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java index dc547e1..5b178ae 100644 --- a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java +++ b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java @@ -54,7 +54,7 @@ class SizeClassedMemoryPool implements Allocator, AllocatorControl, Drop { protected Buf createBuf(int size, Drop drop) { var buf = manager.allocateShared(this, size, drop, null); - drop.accept(buf); + drop.attach(buf); return buf; } @@ -119,7 +119,7 @@ class SizeClassedMemoryPool implements Allocator, AllocatorControl, Drop { public void recoverMemory(Object memory) { var drop = getDrop(); var buf = manager.recoverMemory(memory, drop); - drop.accept(buf); + drop.attach(buf); buf.close(); } diff --git a/src/main/java/io/netty/buffer/api/TransferSend.java b/src/main/java/io/netty/buffer/api/TransferSend.java index 987187b..24098f4 100644 --- a/src/main/java/io/netty/buffer/api/TransferSend.java +++ b/src/main/java/io/netty/buffer/api/TransferSend.java @@ -37,7 +37,7 @@ class TransferSend, T extends Rc> implements Send { public I receive() { gateReception(); var copy = outgoing.transferOwnership(drop); - drop.accept(copy); + drop.attach(copy); return (I) copy; } diff --git a/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java b/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java new file mode 100644 index 0000000..2e17421 --- /dev/null +++ b/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java @@ -0,0 +1,81 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.memseg; + +import io.netty.buffer.api.Drop; + +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; + +class BifurcatedDrop implements Drop { + private static final VarHandle COUNT; + static { + try { + COUNT = MethodHandles.lookup().findVarHandle(BifurcatedDrop.class, "count", int.class); + } catch (Exception e) { + throw new ExceptionInInitializerError(e); + } + } + + private final T originalBuf; + private final Drop delegate; + @SuppressWarnings("FieldMayBeFinal") + private volatile int count; + + BifurcatedDrop(T originalBuf, Drop delegate) { + this.originalBuf = originalBuf; + this.delegate = delegate; + count = 2; // These are created by buffer bifurcation, so we initially have 2 references to this drop. + } + + void increment() { + int c; + do { + c = count; + checkValidState(c); + } while (!COUNT.compareAndSet(this, c, c + 1)); + } + + @Override + public synchronized void drop(T obj) { + int c; + int n; + do { + c = count; + n = c - 1; + checkValidState(c); + } while (!COUNT.compareAndSet(this, c, n)); + if (n == 0) { + delegate.attach(originalBuf); + delegate.drop(originalBuf); + } + } + + @Override + public void attach(T obj) { + delegate.attach(obj); + } + + Drop unwrap() { + return delegate; + } + + private static void checkValidState(int count) { + if (count == 0) { + throw new IllegalStateException("Underlying resources have already been freed."); + } + } +} diff --git a/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java b/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java index cc5f2fb..7a11636 100644 --- a/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java +++ b/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java @@ -315,12 +315,49 @@ class MemSegBuf extends RcSupport implements Buf { RecoverableMemory recoverableMemory = (RecoverableMemory) alloc.allocateUntethered(this, (int) newSize); var newSegment = recoverableMemory.segment; newSegment.copyFrom(seg); - alloc.recoverMemory(recoverableMemory()); // Release old memory segment. + + // Release old memory segment: + var drop = unsafeGetDrop(); + if (drop instanceof BifurcatedDrop) { + // Disconnect from the bifurcated drop, since we'll get our own fresh memory segment. + drop.drop(this); + drop = ((BifurcatedDrop) drop).unwrap(); + unsafeExchangeDrop(drop); + } else { + alloc.recoverMemory(recoverableMemory()); + } + seg = newSegment; - unsafeGetDrop().accept(this); + drop.attach(this); } } + @Override + public Buf bifurcate() { + if (!isOwned()) { + throw new IllegalStateException("Cannot bifurcate a buffer that is not owned."); + } + var drop = unsafeGetDrop(); + if (seg.ownerThread() != null) { + seg = seg.share(); + drop.attach(this); + } + if (drop instanceof BifurcatedDrop) { + ((BifurcatedDrop) drop).increment(); + } else { + drop = unsafeExchangeDrop(new BifurcatedDrop(new MemSegBuf(seg, drop, alloc), drop)); + } + var bifurcatedSeg = seg.asSlice(0, woff); + var bifurcatedBuf = new MemSegBuf(bifurcatedSeg, drop, alloc); + bifurcatedBuf.woff = woff; + bifurcatedBuf.roff = roff; + bifurcatedBuf.order(order); + seg = seg.asSlice(woff, seg.byteSize() - woff); + woff = 0; + roff = 0; + return bifurcatedBuf; + } + // @Override public byte readByte() { diff --git a/src/test/java/io/netty/buffer/api/BufTest.java b/src/test/java/io/netty/buffer/api/BufTest.java index 0e4dbba..e043801 100644 --- a/src/test/java/io/netty/buffer/api/BufTest.java +++ b/src/test/java/io/netty/buffer/api/BufTest.java @@ -65,6 +65,16 @@ public class BufTest { return fixtures = fixtureCombinations().toArray(Fixture[]::new); } + static List initialAllocators() { + return List.of( + new Fixture("heap", Allocator::heap, HEAP), + new Fixture("direct", Allocator::direct, DIRECT), + new Fixture("directWithCleaner", Allocator::directWithCleaner, DIRECT, CLEANER), + new Fixture("pooledHeap", Allocator::pooledHeap, POOLED, HEAP), + new Fixture("pooledDirect", Allocator::pooledDirect, POOLED, DIRECT), + new Fixture("pooledDirectWithCleaner", Allocator::pooledDirectWithCleaner, POOLED, DIRECT, CLEANER)); + } + static Stream nonSliceAllocators() { return fixtureCombinations().filter(f -> !f.isSlice()); } @@ -94,13 +104,7 @@ public class BufTest { if (fxs != null) { return Arrays.stream(fxs); } - List initFixtures = List.of( - new Fixture("heap", Allocator::heap, HEAP), - new Fixture("direct", Allocator::direct, DIRECT), - new Fixture("directWithCleaner", Allocator::directWithCleaner, DIRECT, CLEANER), - new Fixture("pooledHeap", Allocator::pooledHeap, POOLED, HEAP), - new Fixture("pooledDirect", Allocator::pooledDirect, POOLED, DIRECT), - new Fixture("pooledDirectWithCleaner", Allocator::pooledDirectWithCleaner, POOLED, DIRECT, CLEANER)); + List initFixtures = initialAllocators(); Builder builder = Stream.builder(); initFixtures.forEach(builder); @@ -192,44 +196,68 @@ public class BufTest { }, COMPOSITE)); } - return builder.build().flatMap(f -> { - // Inject slice versions of everything - Builder andSlices = Stream.builder(); - andSlices.add(f); - andSlices.add(new Fixture(f + ".slice(0, capacity())", () -> { - var allocatorBase = f.get(); - return new Allocator() { - @Override - public Buf allocate(int size) { - try (Buf base = allocatorBase.allocate(size)) { - return base.slice(0, base.capacity()).writerOffset(0); - } - } + return builder.build().flatMap(BufTest::injectBifurcations).flatMap(BufTest::injectSlices); + } - @Override - public void close() { - allocatorBase.close(); - } - }; - }, Properties.SLICE)); - andSlices.add(new Fixture(f + ".slice(1, capacity() - 2)", () -> { - var allocatorBase = f.get(); - return new Allocator() { - @Override - public Buf allocate(int size) { - try (Buf base = allocatorBase.allocate(size + 2)) { - return base.slice(1, size).writerOffset(0); - } + private static Stream injectBifurcations(Fixture f) { + Builder builder = Stream.builder(); + builder.add(f); + builder.add(new Fixture(f + ".bifurcate", () -> { + var allocatorBase = f.get(); + return new Allocator() { + @Override + public Buf allocate(int size) { + try (Buf buf = allocatorBase.allocate(size + 1)) { + buf.writerOffset(size); + return buf.bifurcate().writerOffset(0); } + } - @Override - public void close() { - allocatorBase.close(); + @Override + public void close() { + allocatorBase.close(); + } + }; + }, f.getProperties())); + return builder.build(); + } + + private static Stream injectSlices(Fixture f) { + Builder builder = Stream.builder(); + builder.add(f); + builder.add(new Fixture(f + ".slice(0, capacity())", () -> { + var allocatorBase = f.get(); + return new Allocator() { + @Override + public Buf allocate(int size) { + try (Buf base = allocatorBase.allocate(size)) { + return base.slice(0, base.capacity()).writerOffset(0); } - }; - }, Properties.SLICE)); - return andSlices.build(); - }); + } + + @Override + public void close() { + allocatorBase.close(); + } + }; + }, Properties.SLICE)); + builder.add(new Fixture(f + ".slice(1, capacity() - 2)", () -> { + var allocatorBase = f.get(); + return new Allocator() { + @Override + public Buf allocate(int size) { + try (Buf base = allocatorBase.allocate(size + 2)) { + return base.slice(1, size).writerOffset(0); + } + } + + @Override + public void close() { + allocatorBase.close(); + } + }; + }, Properties.SLICE)); + return builder.build(); } @BeforeAll @@ -342,7 +370,7 @@ public class BufTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("initialAllocators") void mustThrowWhenAllocatingZeroSizedBuffer(Fixture fixture) { try (Allocator allocator = fixture.createAllocator()) { assertThrows(IllegalArgumentException.class, () -> allocator.allocate(0)); @@ -1814,6 +1842,242 @@ public class BufTest { } } + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void bifurcateOfNonOwnedBufferMustThrow(Fixture fixture) { + try (Allocator allocator = fixture.createAllocator(); + Buf buf = allocator.allocate(8)) { + buf.writeInt(1); + try (Buf acquired = buf.acquire()) { + var exc = assertThrows(IllegalStateException.class, () -> acquired.bifurcate()); + assertThat(exc).hasMessageContaining("owned"); + } + } + } + + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void bifurcatedPartMustContainFirstHalfOfBuffer(Fixture fixture) { + try (Allocator allocator = fixture.createAllocator(); + Buf buf = allocator.allocate(16).order(ByteOrder.BIG_ENDIAN)) { + buf.writeLong(0x0102030405060708L); + assertThat(buf.readByte()).isEqualTo((byte) 0x01); + try (Buf bif = buf.bifurcate()) { + // Original buffer: + assertThat(buf.capacity()).isEqualTo(8); + assertThat(buf.readerOffset()).isZero(); + assertThat(buf.writerOffset()).isZero(); + assertThat(buf.readableBytes()).isZero(); + assertThrows(IndexOutOfBoundsException.class, () -> buf.readByte()); + + // Bifurcated part: + assertThat(bif.capacity()).isEqualTo(8); + assertThat(bif.readerOffset()).isOne(); + assertThat(bif.writerOffset()).isEqualTo(8); + assertThat(bif.readableBytes()).isEqualTo(7); + assertThat(bif.readByte()).isEqualTo((byte) 0x02); + assertThat(bif.readInt()).isEqualTo(0x03040506); + assertThat(bif.readByte()).isEqualTo((byte) 0x07); + assertThat(bif.readByte()).isEqualTo((byte) 0x08); + assertThrows(IndexOutOfBoundsException.class, () -> bif.readByte()); + } + + // Bifurcated part does NOT return when closed: + assertThat(buf.capacity()).isEqualTo(8); + assertThat(buf.readerOffset()).isZero(); + assertThat(buf.writerOffset()).isZero(); + assertThat(buf.readableBytes()).isZero(); + assertThrows(IndexOutOfBoundsException.class, () -> buf.readByte()); + } + } + + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void bifurcatedPartsMustBeIndividuallySendable(Fixture fixture) { + try (Allocator allocator = fixture.createAllocator(); + Buf buf = allocator.allocate(16).order(ByteOrder.BIG_ENDIAN)) { + buf.writeLong(0x0102030405060708L); + assertThat(buf.readByte()).isEqualTo((byte) 0x01); + try (Buf sentBif = buf.bifurcate().send().receive()) { + try (Buf sentBuf = buf.send().receive()) { + assertThat(sentBuf.capacity()).isEqualTo(8); + assertThat(sentBuf.readerOffset()).isZero(); + assertThat(sentBuf.writerOffset()).isZero(); + assertThat(sentBuf.readableBytes()).isZero(); + assertThrows(IndexOutOfBoundsException.class, () -> sentBuf.readByte()); + } + + assertThat(sentBif.capacity()).isEqualTo(8); + assertThat(sentBif.readerOffset()).isOne(); + assertThat(sentBif.writerOffset()).isEqualTo(8); + assertThat(sentBif.readableBytes()).isEqualTo(7); + assertThat(sentBif.readByte()).isEqualTo((byte) 0x02); + assertThat(sentBif.readInt()).isEqualTo(0x03040506); + assertThat(sentBif.readByte()).isEqualTo((byte) 0x07); + assertThat(sentBif.readByte()).isEqualTo((byte) 0x08); + assertThrows(IndexOutOfBoundsException.class, () -> sentBif.readByte()); + } + } + } + + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void mustBePossibleToBifurcateMoreThanOnce(Fixture fixture) { + try (Allocator allocator = fixture.createAllocator(); + Buf buf = allocator.allocate(16).order(ByteOrder.BIG_ENDIAN)) { + buf.writeLong(0x0102030405060708L); + try (Buf a = buf.bifurcate()) { + a.writerOffset(4); + try (Buf b = a.bifurcate()) { + assertEquals(0x01020304, b.readInt()); + a.writerOffset(4); + assertEquals(0x05060708, a.readInt()); + assertThrows(IndexOutOfBoundsException.class, () -> b.readByte()); + assertThrows(IndexOutOfBoundsException.class, () -> a.readByte()); + buf.writeLong(0xA1A2A3A4A5A6A7A8L); + buf.writerOffset(4); + try (Buf c = buf.bifurcate()) { + assertEquals(0xA1A2A3A4, c.readInt()); + buf.writerOffset(4); + assertEquals(0xA5A6A7A8, buf.readInt()); + assertThrows(IndexOutOfBoundsException.class, () -> c.readByte()); + assertThrows(IndexOutOfBoundsException.class, () -> buf.readByte()); + } + } + } + } + } + + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void bifurcatedBufferMustHaveSameByteOrderAsParent(Fixture fixture) { + try (Allocator allocator = fixture.createAllocator(); + Buf buf = allocator.allocate(8).order(ByteOrder.BIG_ENDIAN)) { + buf.writeLong(0x0102030405060708L); + try (Buf a = buf.bifurcate()) { + assertThat(a.order()).isEqualTo(ByteOrder.BIG_ENDIAN); + a.order(ByteOrder.LITTLE_ENDIAN); + a.writerOffset(4); + try (Buf b = a.bifurcate()) { + assertThat(b.order()).isEqualTo(ByteOrder.LITTLE_ENDIAN); + assertThat(buf.order()).isEqualTo(ByteOrder.BIG_ENDIAN); + } + } + } + } + + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void ensureWritableOnBifurcatedBuffers(Fixture fixture) { + try (Allocator allocator = fixture.createAllocator(); + Buf buf = allocator.allocate(8)) { + buf.writeLong(0x0102030405060708L); + try (Buf a = buf.bifurcate()) { + assertEquals(0x0102030405060708L, a.readLong()); + a.ensureWritable(8); + a.writeLong(0xA1A2A3A4A5A6A7A8L); + assertEquals(0xA1A2A3A4A5A6A7A8L, a.readLong()); + + buf.ensureWritable(8); + buf.writeLong(0xA1A2A3A4A5A6A7A8L); + assertEquals(0xA1A2A3A4A5A6A7A8L, buf.readLong()); + } + } + } + + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void ensureWritableOnBifurcatedBuffersWithOddOffsets(Fixture fixture) { + try (Allocator allocator = fixture.createAllocator(); + Buf buf = allocator.allocate(10).order(ByteOrder.BIG_ENDIAN)) { + buf.writeLong(0x0102030405060708L); + buf.writeByte((byte) 0x09); + buf.readByte(); + try (Buf a = buf.bifurcate()) { + assertEquals(0x0203040506070809L, a.readLong()); + a.ensureWritable(8); + a.writeLong(0xA1A2A3A4A5A6A7A8L); + assertEquals(0xA1A2A3A4A5A6A7A8L, a.readLong()); + + buf.ensureWritable(8); + buf.writeLong(0xA1A2A3A4A5A6A7A8L); + assertEquals(0xA1A2A3A4A5A6A7A8L, buf.readLong()); + } + } + } + + @Test + public void bifurcateOnEmptyBigEndianCompositeBuffer() { + try (Allocator allocator = Allocator.heap(); + Buf buf = allocator.compose().order(ByteOrder.BIG_ENDIAN)) { + verifyBifurcateEmptyCompositeBuffer(buf); + } + } + + @Test + public void bifurcateOnEmptyLittleEndianCompositeBuffer() { + try (Allocator allocator = Allocator.heap(); + Buf buf = allocator.compose().order(ByteOrder.LITTLE_ENDIAN)) { + verifyBifurcateEmptyCompositeBuffer(buf); + } + } + + private void verifyBifurcateEmptyCompositeBuffer(Buf buf) { + try (Buf a = buf.bifurcate()) { + a.ensureWritable(4); + buf.ensureWritable(4); + a.writeInt(1); + buf.writeInt(2); + assertEquals(1, a.readInt()); + assertEquals(2, buf.readInt()); + assertThat(a.order()).isEqualTo(buf.order()); + } + } + + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void bifurcatedBuffersMustBeAccessibleInOtherThreads(Fixture fixture) throws Exception { + try (Allocator allocator = fixture.createAllocator(); + Buf buf = allocator.allocate(8)) { + buf.writeInt(42); + var send = buf.bifurcate().send(); + var fut = executor.submit(() -> { + try (Buf receive = send.receive()) { + assertEquals(42, receive.readInt()); + receive.readerOffset(0).writerOffset(0).writeInt(24); + assertEquals(24, receive.readInt()); + } + }); + fut.get(); + buf.writeInt(32); + assertEquals(32, buf.readInt()); + } + } + + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void sendMustNotMakeBifurcatedBuffersInaccessible(Fixture fixture) throws Exception { + try (Allocator allocator = fixture.createAllocator(); + Buf buf = allocator.allocate(16)) { + buf.writeInt(64); + var bifA = buf.bifurcate(); + buf.writeInt(42); + var send = buf.bifurcate().send(); + buf.writeInt(72); + var bifB = buf.bifurcate(); + var fut = executor.submit(() -> { + try (Buf receive = send.receive()) { + assertEquals(42, receive.readInt()); + } + }); + fut.get(); + buf.writeInt(32); + assertEquals(32, buf.readInt()); + assertEquals(64, bifA.readInt()); + assertEquals(72, bifB.readInt()); + } + } + // @ParameterizedTest @MethodSource("allocators")