From b749106c0ca7dac533b700ca61df84f89fefd7a7 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Tue, 8 Dec 2020 19:25:53 +0100 Subject: [PATCH 1/3] Add a Buf.bifurcate method Motivation: There are use cases that involve accumulating data into a buffer, then carving out prefix slices and sending them off on their own journey for further processing. Modification: Add a Buf.bifurcate API, that split a buffer, and its ownership, in two. Internally, the API will inject and maintain an atomically reference counted Drop instance, so that the original memory segment is not released until all bifurcated parts are closed. This works particularly well for composite buffers, where only the buffer (if any) wherein the bifurcation point lands, will actually have its memory split. A composite buffer can otherwise just crack its buffer array in two. Result: We now have a safe way of breaking the single ownership of some memory into multiple parts, that can be sent and owned independently. --- .../io/netty/buffer/api/AllocatorControl.java | 2 +- src/main/java/io/netty/buffer/api/Buf.java | 44 +++ .../io/netty/buffer/api/CompositeBuf.java | 51 ++- src/main/java/io/netty/buffer/api/Drop.java | 8 +- .../buffer/api/NativeMemoryCleanerDrop.java | 2 +- .../java/io/netty/buffer/api/RcSupport.java | 11 +- .../buffer/api/SizeClassedMemoryPool.java | 4 +- .../io/netty/buffer/api/TransferSend.java | 2 +- .../buffer/api/memseg/BifurcatedDrop.java | 61 ++++ .../io/netty/buffer/api/memseg/MemSegBuf.java | 41 ++- .../java/io/netty/buffer/api/BufTest.java | 304 +++++++++++++++--- 11 files changed, 466 insertions(+), 64 deletions(-) create mode 100644 src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java diff --git a/src/main/java/io/netty/buffer/api/AllocatorControl.java b/src/main/java/io/netty/buffer/api/AllocatorControl.java index 25aa6f6..53a1bf3 100644 --- a/src/main/java/io/netty/buffer/api/AllocatorControl.java +++ b/src/main/java/io/netty/buffer/api/AllocatorControl.java @@ -28,7 +28,7 @@ public interface AllocatorControl { * This allows a buffer to implement {@link Buf#ensureWritable(int)} by having new memory allocated to it, * without that memory being attached to some other lifetime. * - * @param originator The buffer that originated the request for an untethered memory allocated. + * @param originator The buffer that originated the request for an untethered memory allocated. * @param size The size of the requested memory allocation, in bytes. * @return A "recoverable memory" object that is the requested allocation. */ diff --git a/src/main/java/io/netty/buffer/api/Buf.java b/src/main/java/io/netty/buffer/api/Buf.java index 946f473..98404a1 100644 --- a/src/main/java/io/netty/buffer/api/Buf.java +++ b/src/main/java/io/netty/buffer/api/Buf.java @@ -373,4 +373,48 @@ public interface Buf extends Rc, BufAccessors { * That is, if {@link #countBorrows()} is not {@code 0}. */ void ensureWritable(int size); + + /** + * Split the buffer into two, at the {@linkplain #writerOffset() write offset} position. + *

+ * The region of this buffer that contain the read and readable bytes, will be captured and returned in a new + * buffer, that will hold its own ownership of that region. This allows the returned buffer to be indepentently + * {@linkplain #send() sent} to other threads. + *

+ * The returned buffer will adopt the {@link #readerOffset()} of this buffer, and have its {@link #writerOffset()} + * and {@link #capacity()} both set to the equal to the write offset of this buffer. + *

+ * The memory region in the returned buffer will become inaccessible through this buffer. This buffer will have its + * capacity reduced by the capacity of the returned buffer, and the read and write offsets of this buffer will both + * become zero, even though their position in memory remain unchanged. + *

+ * Effectively, the following transformation takes place: + *

{@code
+     *         This buffer:
+     *          +------------------------------------------+
+     *         0|   |r/o                  |w/o             |cap
+     *          +---+---------------------+----------------+
+     *         /   /                     / \               \
+     *        /   /                     /   \               \
+     *       /   /                     /     \               \
+     *      /   /                     /       \               \
+     *     /   /                     /         \               \
+     *    +---+---------------------+           +---------------+
+     *    |   |r/o                  |w/o & cap  |r/o & w/o      |cap
+     *    +---+---------------------+           +---------------+
+     *    Returned buffer.                      This buffer.
+     * }
+ * When the buffers are in this state, both of the bifurcated parts retain an atomic reference count on the + * underlying memory. This means that shared underlying memory will not be deallocated or returned to a pool, until + * all of the bifurcated parts have been closed. + *

+ * Composite buffers have it a little easier, in that at most only one of the constituent buffers will actually be + * bifurcated. If the split point lands perfectly between two constituent buffers, then a composite buffer can + * simply split its internal array in two. + *

+ * Bifurcated buffers support all operations that normal buffers do, including {@link #ensureWritable(int)}. + * + * @return A new buffer with independent and exclusive ownership over the read and readable bytes from this buffer. + */ + Buf bifurcate(); } diff --git a/src/main/java/io/netty/buffer/api/CompositeBuf.java b/src/main/java/io/netty/buffer/api/CompositeBuf.java index 0f1346e..22c222c 100644 --- a/src/main/java/io/netty/buffer/api/CompositeBuf.java +++ b/src/main/java/io/netty/buffer/api/CompositeBuf.java @@ -27,12 +27,9 @@ final class CompositeBuf extends RcSupport implements Buf { * non-composite copy of the buffer. */ private static final int MAX_CAPACITY = Integer.MAX_VALUE - 8; - private static final Drop COMPOSITE_DROP = new Drop() { - @Override - public void drop(CompositeBuf obj) { - for (Buf buf : obj.bufs) { - buf.close(); - } + private static final Drop COMPOSITE_DROP = buf -> { + for (Buf b : buf.bufs) { + b.close(); } }; @@ -45,6 +42,7 @@ final class CompositeBuf extends RcSupport implements Buf { private int roff; private int woff; private int subOffset; // The next offset *within* a consituent buffer to read from or write to. + private ByteOrder order; CompositeBuf(Allocator allocator, Buf[] bufs) { this(allocator, true, bufs.clone(), COMPOSITE_DROP); // Clone prevents external modification of array. @@ -64,6 +62,9 @@ final class CompositeBuf extends RcSupport implements Buf { throw new IllegalArgumentException("Constituent buffers have inconsistent byte order."); } } + order = bufs[0].order(); + } else { + order = ByteOrder.nativeOrder(); } this.bufs = bufs; computeBufferOffsets(); @@ -129,6 +130,7 @@ final class CompositeBuf extends RcSupport implements Buf { @Override public Buf order(ByteOrder order) { + this.order = order; for (Buf buf : bufs) { buf.order(order); } @@ -137,7 +139,7 @@ final class CompositeBuf extends RcSupport implements Buf { @Override public ByteOrder order() { - return bufs.length > 0? bufs[0].order() : ByteOrder.nativeOrder(); + return order; } @Override @@ -543,6 +545,9 @@ final class CompositeBuf extends RcSupport implements Buf { "This buffer uses " + order() + " byte order, and cannot be extended with " + "a buffer that uses " + extension.order() + " byte order."); } + if (bufs.length == 0) { + order = extension.order(); + } long newSize = capacity() + (long) extension.capacity(); Allocator.checkSize(newSize); @@ -561,6 +566,36 @@ final class CompositeBuf extends RcSupport implements Buf { computeBufferOffsets(); } + @Override + public Buf bifurcate() { + if (!isOwned()) { + throw new IllegalStateException("Cannot bifurcate a buffer that is not owned."); + } + if (bufs.length == 0) { + // Bifurcating a zero-length buffer is trivial. + return new CompositeBuf(allocator, true, bufs, unsafeGetDrop()).order(order); + } + + int i = searchOffsets(woff); + int off = woff - offsets[i]; + Buf[] bifs = Arrays.copyOf(bufs, off == 0? i : 1 + i); + bufs = Arrays.copyOfRange(bufs, off == bufs[i].capacity()? 1 + i : i, bufs.length); + if (off > 0 && bifs.length > 0 && off < bifs[bifs.length - 1].capacity()) { + bifs[bifs.length - 1] = bufs[0].bifurcate(); + } + computeBufferOffsets(); + try { + var compositeBuf = new CompositeBuf(allocator, true, bifs, unsafeGetDrop()); + compositeBuf.order = order; // Preserve byte order even if bifs array is empty. + return compositeBuf; + } finally { + // Drop our references to the buffers in the bifs array. They belong to the new composite buffer now. + for (Buf bif : bifs) { + bif.close(); + } + } + } + // @Override public byte readByte() { @@ -856,7 +891,7 @@ final class CompositeBuf extends RcSupport implements Buf { received[i] = sends[i].receive(); } var composite = new CompositeBuf(allocator, true, received, drop); - drop.accept(composite); + drop.reconnect(composite); return composite; } }; diff --git a/src/main/java/io/netty/buffer/api/Drop.java b/src/main/java/io/netty/buffer/api/Drop.java index 21fde57..de949c5 100644 --- a/src/main/java/io/netty/buffer/api/Drop.java +++ b/src/main/java/io/netty/buffer/api/Drop.java @@ -15,16 +15,13 @@ */ package io.netty.buffer.api; -import java.util.function.Consumer; - /** * The Drop interface is used by {@link Rc} instances to implement their resource disposal mechanics. The {@link * #drop(Object)} method will be called by the Rc when their last reference is closed. * * @param */ -@FunctionalInterface -public interface Drop extends Consumer { +public interface Drop { /** * Dispose of the resources in the given Rc. * @@ -37,7 +34,6 @@ public interface Drop extends Consumer { * * @param obj The new Rc instance with the new owner. */ - @Override - default void accept(T obj) { + default void reconnect(T obj) { } } diff --git a/src/main/java/io/netty/buffer/api/NativeMemoryCleanerDrop.java b/src/main/java/io/netty/buffer/api/NativeMemoryCleanerDrop.java index 68c4c1f..3f1321a 100644 --- a/src/main/java/io/netty/buffer/api/NativeMemoryCleanerDrop.java +++ b/src/main/java/io/netty/buffer/api/NativeMemoryCleanerDrop.java @@ -49,7 +49,7 @@ class NativeMemoryCleanerDrop implements Drop { } @Override - public void accept(Buf buf) { + public void reconnect(Buf buf) { // Unregister old cleanable, if any, to avoid uncontrolled build-up. GatedCleanable c = (GatedCleanable) CLEANABLE.getAndSet(this, null); if (c != null) { diff --git a/src/main/java/io/netty/buffer/api/RcSupport.java b/src/main/java/io/netty/buffer/api/RcSupport.java index ff75c74..dd39e83 100644 --- a/src/main/java/io/netty/buffer/api/RcSupport.java +++ b/src/main/java/io/netty/buffer/api/RcSupport.java @@ -15,9 +15,12 @@ */ package io.netty.buffer.api; +import java.util.Objects; +import java.util.function.Function; + public abstract class RcSupport, T extends RcSupport> implements Rc { private int acquires; // Closed if negative. - private final Drop drop; + private Drop drop; protected RcSupport(Drop drop) { this.drop = drop; @@ -114,6 +117,12 @@ public abstract class RcSupport, T extends RcSupport> impl return drop; } + protected Drop unsafeExchangeDrop(Drop replacement) { + Objects.requireNonNull(replacement, "Replacement drop cannot be null."); + drop = replacement; + return replacement; + } + @SuppressWarnings("unchecked") private I self() { return (I) this; diff --git a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java index dc547e1..bd24d3d 100644 --- a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java +++ b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java @@ -54,7 +54,7 @@ class SizeClassedMemoryPool implements Allocator, AllocatorControl, Drop { protected Buf createBuf(int size, Drop drop) { var buf = manager.allocateShared(this, size, drop, null); - drop.accept(buf); + drop.reconnect(buf); return buf; } @@ -119,7 +119,7 @@ class SizeClassedMemoryPool implements Allocator, AllocatorControl, Drop { public void recoverMemory(Object memory) { var drop = getDrop(); var buf = manager.recoverMemory(memory, drop); - drop.accept(buf); + drop.reconnect(buf); buf.close(); } diff --git a/src/main/java/io/netty/buffer/api/TransferSend.java b/src/main/java/io/netty/buffer/api/TransferSend.java index 987187b..a21ff73 100644 --- a/src/main/java/io/netty/buffer/api/TransferSend.java +++ b/src/main/java/io/netty/buffer/api/TransferSend.java @@ -37,7 +37,7 @@ class TransferSend, T extends Rc> implements Send { public I receive() { gateReception(); var copy = outgoing.transferOwnership(drop); - drop.accept(copy); + drop.reconnect(copy); return (I) copy; } diff --git a/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java b/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java new file mode 100644 index 0000000..d3523ff --- /dev/null +++ b/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java @@ -0,0 +1,61 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.memseg; + +import io.netty.buffer.api.Drop; + +public class BifurcatedDrop implements Drop { + private final T originalBuf; + private final Drop delegate; + private int count; + private Exception closeTrace; + + public BifurcatedDrop(T originalBuf, Drop delegate) { + this.originalBuf = originalBuf; + this.delegate = delegate; + count = 2; // These are created by buffer bifurcation, so we initially have 2 references to this drop. + } + + public synchronized void increment() { + checkValidState(); + count++; + } + + @Override + public synchronized void drop(T obj) { + checkValidState(); + if (--count == 0) { + closeTrace = new Exception("close: " + delegate); + delegate.reconnect(originalBuf); + delegate.drop(originalBuf); + } + } + + @Override + public void reconnect(T obj) { + delegate.reconnect(obj); + } + + Drop unwrap() { + return delegate; + } + + private void checkValidState() { + if (count == 0) { + throw new IllegalStateException("Underlying resources have already been freed.", closeTrace); + } + } +} diff --git a/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java b/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java index cc5f2fb..9aed640 100644 --- a/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java +++ b/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java @@ -315,12 +315,49 @@ class MemSegBuf extends RcSupport implements Buf { RecoverableMemory recoverableMemory = (RecoverableMemory) alloc.allocateUntethered(this, (int) newSize); var newSegment = recoverableMemory.segment; newSegment.copyFrom(seg); - alloc.recoverMemory(recoverableMemory()); // Release old memory segment. + + // Release old memory segment: + var drop = unsafeGetDrop(); + if (drop instanceof BifurcatedDrop) { + // Disconnect from the bifurcated drop, since we'll get our own fresh memory segment. + drop.drop(this); + drop = ((BifurcatedDrop) drop).unwrap(); + unsafeExchangeDrop(drop); + } else { + alloc.recoverMemory(recoverableMemory()); + } + seg = newSegment; - unsafeGetDrop().accept(this); + drop.reconnect(this); } } + @Override + public Buf bifurcate() { + if (!isOwned()) { + throw new IllegalStateException("Cannot bifurcate a buffer that is not owned."); + } + var drop = unsafeGetDrop(); + if (seg.ownerThread() != null) { + seg = seg.share(); + drop.reconnect(this); + } + if (drop instanceof BifurcatedDrop) { + ((BifurcatedDrop) drop).increment(); + } else { + drop = unsafeExchangeDrop(new BifurcatedDrop(new MemSegBuf(seg, drop, alloc), drop)); + } + var bifurcatedSeg = seg.asSlice(0, woff); + var bifurcatedBuf = new MemSegBuf(bifurcatedSeg, drop, alloc); + bifurcatedBuf.woff = woff; + bifurcatedBuf.roff = roff; + bifurcatedBuf.order(order); + seg = seg.asSlice(woff, seg.byteSize() - woff); + woff = 0; + roff = 0; + return bifurcatedBuf; + } + // @Override public byte readByte() { diff --git a/src/test/java/io/netty/buffer/api/BufTest.java b/src/test/java/io/netty/buffer/api/BufTest.java index 0e4dbba..9295050 100644 --- a/src/test/java/io/netty/buffer/api/BufTest.java +++ b/src/test/java/io/netty/buffer/api/BufTest.java @@ -65,6 +65,16 @@ public class BufTest { return fixtures = fixtureCombinations().toArray(Fixture[]::new); } + static List initialAllocators() { + return List.of( + new Fixture("heap", Allocator::heap, HEAP), + new Fixture("direct", Allocator::direct, DIRECT), + new Fixture("directWithCleaner", Allocator::directWithCleaner, DIRECT, CLEANER), + new Fixture("pooledHeap", Allocator::pooledHeap, POOLED, HEAP), + new Fixture("pooledDirect", Allocator::pooledDirect, POOLED, DIRECT), + new Fixture("pooledDirectWithCleaner", Allocator::pooledDirectWithCleaner, POOLED, DIRECT, CLEANER)); + } + static Stream nonSliceAllocators() { return fixtureCombinations().filter(f -> !f.isSlice()); } @@ -94,13 +104,7 @@ public class BufTest { if (fxs != null) { return Arrays.stream(fxs); } - List initFixtures = List.of( - new Fixture("heap", Allocator::heap, HEAP), - new Fixture("direct", Allocator::direct, DIRECT), - new Fixture("directWithCleaner", Allocator::directWithCleaner, DIRECT, CLEANER), - new Fixture("pooledHeap", Allocator::pooledHeap, POOLED, HEAP), - new Fixture("pooledDirect", Allocator::pooledDirect, POOLED, DIRECT), - new Fixture("pooledDirectWithCleaner", Allocator::pooledDirectWithCleaner, POOLED, DIRECT, CLEANER)); + List initFixtures = initialAllocators(); Builder builder = Stream.builder(); initFixtures.forEach(builder); @@ -192,44 +196,68 @@ public class BufTest { }, COMPOSITE)); } - return builder.build().flatMap(f -> { - // Inject slice versions of everything - Builder andSlices = Stream.builder(); - andSlices.add(f); - andSlices.add(new Fixture(f + ".slice(0, capacity())", () -> { - var allocatorBase = f.get(); - return new Allocator() { - @Override - public Buf allocate(int size) { - try (Buf base = allocatorBase.allocate(size)) { - return base.slice(0, base.capacity()).writerOffset(0); - } - } + return builder.build().flatMap(BufTest::injectBifurcations).flatMap(BufTest::injectSlices); + } - @Override - public void close() { - allocatorBase.close(); - } - }; - }, Properties.SLICE)); - andSlices.add(new Fixture(f + ".slice(1, capacity() - 2)", () -> { - var allocatorBase = f.get(); - return new Allocator() { - @Override - public Buf allocate(int size) { - try (Buf base = allocatorBase.allocate(size + 2)) { - return base.slice(1, size).writerOffset(0); - } + private static Stream injectBifurcations(Fixture f) { + Builder builder = Stream.builder(); + builder.add(f); + builder.add(new Fixture(f + ".bifurcate", () -> { + var allocatorBase = f.get(); + return new Allocator() { + @Override + public Buf allocate(int size) { + try (Buf buf = allocatorBase.allocate(size + 1)) { + buf.writerOffset(size); + return buf.bifurcate().writerOffset(0); } + } - @Override - public void close() { - allocatorBase.close(); + @Override + public void close() { + allocatorBase.close(); + } + }; + }, f.getProperties())); + return builder.build(); + } + + private static Stream injectSlices(Fixture f) { + Builder builder = Stream.builder(); + builder.add(f); + builder.add(new Fixture(f + ".slice(0, capacity())", () -> { + var allocatorBase = f.get(); + return new Allocator() { + @Override + public Buf allocate(int size) { + try (Buf base = allocatorBase.allocate(size)) { + return base.slice(0, base.capacity()).writerOffset(0); } - }; - }, Properties.SLICE)); - return andSlices.build(); - }); + } + + @Override + public void close() { + allocatorBase.close(); + } + }; + }, Properties.SLICE)); + builder.add(new Fixture(f + ".slice(1, capacity() - 2)", () -> { + var allocatorBase = f.get(); + return new Allocator() { + @Override + public Buf allocate(int size) { + try (Buf base = allocatorBase.allocate(size + 2)) { + return base.slice(1, size).writerOffset(0); + } + } + + @Override + public void close() { + allocatorBase.close(); + } + }; + }, Properties.SLICE)); + return builder.build(); } @BeforeAll @@ -342,7 +370,7 @@ public class BufTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("initialAllocators") void mustThrowWhenAllocatingZeroSizedBuffer(Fixture fixture) { try (Allocator allocator = fixture.createAllocator()) { assertThrows(IllegalArgumentException.class, () -> allocator.allocate(0)); @@ -1814,6 +1842,198 @@ public class BufTest { } } + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void bifurcateOfNonOwnedBufferMustThrow(Fixture fixture) { + try (Allocator allocator = fixture.createAllocator(); + Buf buf = allocator.allocate(8)) { + buf.writeInt(1); + try (Buf acquired = buf.acquire()) { + var exc = assertThrows(IllegalStateException.class, () -> acquired.bifurcate()); + assertThat(exc).hasMessageContaining("owned"); + } + } + } + + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void bifurcatedPartMustContainFirstHalfOfBuffer(Fixture fixture) { + try (Allocator allocator = fixture.createAllocator(); + Buf buf = allocator.allocate(16).order(ByteOrder.BIG_ENDIAN)) { + buf.writeLong(0x0102030405060708L); + assertThat(buf.readByte()).isEqualTo((byte) 0x01); + try (Buf bif = buf.bifurcate()) { + // Original buffer: + assertThat(buf.capacity()).isEqualTo(8); + assertThat(buf.readerOffset()).isZero(); + assertThat(buf.writerOffset()).isZero(); + assertThat(buf.readableBytes()).isZero(); + assertThrows(IndexOutOfBoundsException.class, () -> buf.readByte()); + + // Bifurcated part: + assertThat(bif.capacity()).isEqualTo(8); + assertThat(bif.readerOffset()).isOne(); + assertThat(bif.writerOffset()).isEqualTo(8); + assertThat(bif.readableBytes()).isEqualTo(7); + assertThat(bif.readByte()).isEqualTo((byte) 0x02); + assertThat(bif.readInt()).isEqualTo(0x03040506); + assertThat(bif.readByte()).isEqualTo((byte) 0x07); + assertThat(bif.readByte()).isEqualTo((byte) 0x08); + assertThrows(IndexOutOfBoundsException.class, () -> bif.readByte()); + } + + // Bifurcated part does NOT return when closed: + assertThat(buf.capacity()).isEqualTo(8); + assertThat(buf.readerOffset()).isZero(); + assertThat(buf.writerOffset()).isZero(); + assertThat(buf.readableBytes()).isZero(); + assertThrows(IndexOutOfBoundsException.class, () -> buf.readByte()); + } + } + + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void bifurcatedPartsMustBeIndividuallySendable(Fixture fixture) { + try (Allocator allocator = fixture.createAllocator(); + Buf buf = allocator.allocate(16).order(ByteOrder.BIG_ENDIAN)) { + buf.writeLong(0x0102030405060708L); + assertThat(buf.readByte()).isEqualTo((byte) 0x01); + try (Buf sentBif = buf.bifurcate().send().receive()) { + try (Buf sentBuf = buf.send().receive()) { + assertThat(sentBuf.capacity()).isEqualTo(8); + assertThat(sentBuf.readerOffset()).isZero(); + assertThat(sentBuf.writerOffset()).isZero(); + assertThat(sentBuf.readableBytes()).isZero(); + assertThrows(IndexOutOfBoundsException.class, () -> sentBuf.readByte()); + } + + assertThat(sentBif.capacity()).isEqualTo(8); + assertThat(sentBif.readerOffset()).isOne(); + assertThat(sentBif.writerOffset()).isEqualTo(8); + assertThat(sentBif.readableBytes()).isEqualTo(7); + assertThat(sentBif.readByte()).isEqualTo((byte) 0x02); + assertThat(sentBif.readInt()).isEqualTo(0x03040506); + assertThat(sentBif.readByte()).isEqualTo((byte) 0x07); + assertThat(sentBif.readByte()).isEqualTo((byte) 0x08); + assertThrows(IndexOutOfBoundsException.class, () -> sentBif.readByte()); + } + } + } + + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void mustBePossibleToBifurcateMoreThanOnce(Fixture fixture) { + try (Allocator allocator = fixture.createAllocator(); + Buf buf = allocator.allocate(16).order(ByteOrder.BIG_ENDIAN)) { + buf.writeLong(0x0102030405060708L); + try (Buf a = buf.bifurcate()) { + a.writerOffset(4); + try (Buf b = a.bifurcate()) { + assertEquals(0x01020304, b.readInt()); + a.writerOffset(4); + assertEquals(0x05060708, a.readInt()); + assertThrows(IndexOutOfBoundsException.class, () -> b.readByte()); + assertThrows(IndexOutOfBoundsException.class, () -> a.readByte()); + buf.writeLong(0xA1A2A3A4A5A6A7A8L); + buf.writerOffset(4); + try (Buf c = buf.bifurcate()) { + assertEquals(0xA1A2A3A4, c.readInt()); + buf.writerOffset(4); + assertEquals(0xA5A6A7A8, buf.readInt()); + assertThrows(IndexOutOfBoundsException.class, () -> c.readByte()); + assertThrows(IndexOutOfBoundsException.class, () -> buf.readByte()); + } + } + } + } + } + + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void bifurcatedBufferMustHaveSameByteOrderAsParent(Fixture fixture) { + try (Allocator allocator = fixture.createAllocator(); + Buf buf = allocator.allocate(8).order(ByteOrder.BIG_ENDIAN)) { + buf.writeLong(0x0102030405060708L); + try (Buf a = buf.bifurcate()) { + assertThat(a.order()).isEqualTo(ByteOrder.BIG_ENDIAN); + a.order(ByteOrder.LITTLE_ENDIAN); + a.writerOffset(4); + try (Buf b = a.bifurcate()) { + assertThat(b.order()).isEqualTo(ByteOrder.LITTLE_ENDIAN); + assertThat(buf.order()).isEqualTo(ByteOrder.BIG_ENDIAN); + } + } + } + } + + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void ensureWritableOnBifurcatedBuffers(Fixture fixture) { + try (Allocator allocator = fixture.createAllocator(); + Buf buf = allocator.allocate(8)) { + buf.writeLong(0x0102030405060708L); + try (Buf a = buf.bifurcate()) { + assertEquals(0x0102030405060708L, a.readLong()); + a.ensureWritable(8); + a.writeLong(0xA1A2A3A4A5A6A7A8L); + assertEquals(0xA1A2A3A4A5A6A7A8L, a.readLong()); + + buf.ensureWritable(8); + buf.writeLong(0xA1A2A3A4A5A6A7A8L); + assertEquals(0xA1A2A3A4A5A6A7A8L, buf.readLong()); + } + } + } + + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void ensureWritableOnBifurcatedBuffersWithOddOffsets(Fixture fixture) { + try (Allocator allocator = fixture.createAllocator(); + Buf buf = allocator.allocate(10).order(ByteOrder.BIG_ENDIAN)) { + buf.writeLong(0x0102030405060708L); + buf.writeByte((byte) 0x09); + buf.readByte(); + try (Buf a = buf.bifurcate()) { + assertEquals(0x0203040506070809L, a.readLong()); + a.ensureWritable(8); + a.writeLong(0xA1A2A3A4A5A6A7A8L); + assertEquals(0xA1A2A3A4A5A6A7A8L, a.readLong()); + + buf.ensureWritable(8); + buf.writeLong(0xA1A2A3A4A5A6A7A8L); + assertEquals(0xA1A2A3A4A5A6A7A8L, buf.readLong()); + } + } + } + + @Test + public void bifurcateOnEmptyBigEndianCompositeBuffer() { + try (Allocator allocator = Allocator.heap(); + Buf buf = allocator.compose().order(ByteOrder.BIG_ENDIAN)) { + verifyBifurcateEmptyCompositeBuffer(buf); + } + } + + @Test + public void bifurcateOnEmptyLittleEndianCompositeBuffer() { + try (Allocator allocator = Allocator.heap(); + Buf buf = allocator.compose().order(ByteOrder.LITTLE_ENDIAN)) { + verifyBifurcateEmptyCompositeBuffer(buf); + } + } + + private void verifyBifurcateEmptyCompositeBuffer(Buf buf) { + try (Buf a = buf.bifurcate()) { + a.ensureWritable(4); + buf.ensureWritable(4); + a.writeInt(1); + buf.writeInt(2); + assertEquals(1, a.readInt()); + assertEquals(2, buf.readInt()); + assertThat(a.order()).isEqualTo(buf.order()); + } + } + // @ParameterizedTest @MethodSource("allocators") From bb2264ac5b38ac0355274608a4a2a0989f47c41d Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Thu, 10 Dec 2020 12:51:18 +0100 Subject: [PATCH 2/3] Address review comments on bifurcate PR --- .../io/netty/buffer/api/CompositeBuf.java | 10 ++-- src/main/java/io/netty/buffer/api/Drop.java | 3 +- .../buffer/api/NativeMemoryCleanerDrop.java | 2 +- .../java/io/netty/buffer/api/RcSupport.java | 3 +- .../buffer/api/SizeClassedMemoryPool.java | 4 +- .../io/netty/buffer/api/TransferSend.java | 2 +- .../buffer/api/memseg/BifurcatedDrop.java | 50 +++++++++++++------ .../io/netty/buffer/api/memseg/MemSegBuf.java | 4 +- 8 files changed, 50 insertions(+), 28 deletions(-) diff --git a/src/main/java/io/netty/buffer/api/CompositeBuf.java b/src/main/java/io/netty/buffer/api/CompositeBuf.java index 22c222c..7d2577e 100644 --- a/src/main/java/io/netty/buffer/api/CompositeBuf.java +++ b/src/main/java/io/netty/buffer/api/CompositeBuf.java @@ -130,9 +130,11 @@ final class CompositeBuf extends RcSupport implements Buf { @Override public Buf order(ByteOrder order) { - this.order = order; - for (Buf buf : bufs) { - buf.order(order); + if (this.order != order) { + this.order = order; + for (Buf buf : bufs) { + buf.order(order); + } } return this; } @@ -891,7 +893,7 @@ final class CompositeBuf extends RcSupport implements Buf { received[i] = sends[i].receive(); } var composite = new CompositeBuf(allocator, true, received, drop); - drop.reconnect(composite); + drop.attach(composite); return composite; } }; diff --git a/src/main/java/io/netty/buffer/api/Drop.java b/src/main/java/io/netty/buffer/api/Drop.java index de949c5..1658c7e 100644 --- a/src/main/java/io/netty/buffer/api/Drop.java +++ b/src/main/java/io/netty/buffer/api/Drop.java @@ -21,6 +21,7 @@ package io.netty.buffer.api; * * @param */ +@FunctionalInterface public interface Drop { /** * Dispose of the resources in the given Rc. @@ -34,6 +35,6 @@ public interface Drop { * * @param obj The new Rc instance with the new owner. */ - default void reconnect(T obj) { + default void attach(T obj) { } } diff --git a/src/main/java/io/netty/buffer/api/NativeMemoryCleanerDrop.java b/src/main/java/io/netty/buffer/api/NativeMemoryCleanerDrop.java index 3f1321a..1f0a9ef 100644 --- a/src/main/java/io/netty/buffer/api/NativeMemoryCleanerDrop.java +++ b/src/main/java/io/netty/buffer/api/NativeMemoryCleanerDrop.java @@ -49,7 +49,7 @@ class NativeMemoryCleanerDrop implements Drop { } @Override - public void reconnect(Buf buf) { + public void attach(Buf buf) { // Unregister old cleanable, if any, to avoid uncontrolled build-up. GatedCleanable c = (GatedCleanable) CLEANABLE.getAndSet(this, null); if (c != null) { diff --git a/src/main/java/io/netty/buffer/api/RcSupport.java b/src/main/java/io/netty/buffer/api/RcSupport.java index dd39e83..4e59120 100644 --- a/src/main/java/io/netty/buffer/api/RcSupport.java +++ b/src/main/java/io/netty/buffer/api/RcSupport.java @@ -118,8 +118,7 @@ public abstract class RcSupport, T extends RcSupport> impl } protected Drop unsafeExchangeDrop(Drop replacement) { - Objects.requireNonNull(replacement, "Replacement drop cannot be null."); - drop = replacement; + drop = Objects.requireNonNull(replacement, "Replacement drop cannot be null."); return replacement; } diff --git a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java index bd24d3d..5b178ae 100644 --- a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java +++ b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java @@ -54,7 +54,7 @@ class SizeClassedMemoryPool implements Allocator, AllocatorControl, Drop { protected Buf createBuf(int size, Drop drop) { var buf = manager.allocateShared(this, size, drop, null); - drop.reconnect(buf); + drop.attach(buf); return buf; } @@ -119,7 +119,7 @@ class SizeClassedMemoryPool implements Allocator, AllocatorControl, Drop { public void recoverMemory(Object memory) { var drop = getDrop(); var buf = manager.recoverMemory(memory, drop); - drop.reconnect(buf); + drop.attach(buf); buf.close(); } diff --git a/src/main/java/io/netty/buffer/api/TransferSend.java b/src/main/java/io/netty/buffer/api/TransferSend.java index a21ff73..24098f4 100644 --- a/src/main/java/io/netty/buffer/api/TransferSend.java +++ b/src/main/java/io/netty/buffer/api/TransferSend.java @@ -37,7 +37,7 @@ class TransferSend, T extends Rc> implements Send { public I receive() { gateReception(); var copy = outgoing.transferOwnership(drop); - drop.reconnect(copy); + drop.attach(copy); return (I) copy; } diff --git a/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java b/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java index d3523ff..2e17421 100644 --- a/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java +++ b/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java @@ -17,45 +17,65 @@ package io.netty.buffer.api.memseg; import io.netty.buffer.api.Drop; -public class BifurcatedDrop implements Drop { +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; + +class BifurcatedDrop implements Drop { + private static final VarHandle COUNT; + static { + try { + COUNT = MethodHandles.lookup().findVarHandle(BifurcatedDrop.class, "count", int.class); + } catch (Exception e) { + throw new ExceptionInInitializerError(e); + } + } + private final T originalBuf; private final Drop delegate; - private int count; - private Exception closeTrace; + @SuppressWarnings("FieldMayBeFinal") + private volatile int count; - public BifurcatedDrop(T originalBuf, Drop delegate) { + BifurcatedDrop(T originalBuf, Drop delegate) { this.originalBuf = originalBuf; this.delegate = delegate; count = 2; // These are created by buffer bifurcation, so we initially have 2 references to this drop. } - public synchronized void increment() { - checkValidState(); - count++; + void increment() { + int c; + do { + c = count; + checkValidState(c); + } while (!COUNT.compareAndSet(this, c, c + 1)); } @Override public synchronized void drop(T obj) { - checkValidState(); - if (--count == 0) { - closeTrace = new Exception("close: " + delegate); - delegate.reconnect(originalBuf); + int c; + int n; + do { + c = count; + n = c - 1; + checkValidState(c); + } while (!COUNT.compareAndSet(this, c, n)); + if (n == 0) { + delegate.attach(originalBuf); delegate.drop(originalBuf); } } @Override - public void reconnect(T obj) { - delegate.reconnect(obj); + public void attach(T obj) { + delegate.attach(obj); } Drop unwrap() { return delegate; } - private void checkValidState() { + private static void checkValidState(int count) { if (count == 0) { - throw new IllegalStateException("Underlying resources have already been freed.", closeTrace); + throw new IllegalStateException("Underlying resources have already been freed."); } } } diff --git a/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java b/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java index 9aed640..7a11636 100644 --- a/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java +++ b/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java @@ -328,7 +328,7 @@ class MemSegBuf extends RcSupport implements Buf { } seg = newSegment; - drop.reconnect(this); + drop.attach(this); } } @@ -340,7 +340,7 @@ class MemSegBuf extends RcSupport implements Buf { var drop = unsafeGetDrop(); if (seg.ownerThread() != null) { seg = seg.share(); - drop.reconnect(this); + drop.attach(this); } if (drop instanceof BifurcatedDrop) { ((BifurcatedDrop) drop).increment(); From cccec1ae4c6dc0bcc5659431693ee89b5609d1da Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Thu, 10 Dec 2020 14:27:45 +0100 Subject: [PATCH 3/3] Add two more tests for interactions between bifurcation and send --- .../java/io/netty/buffer/api/BufTest.java | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/src/test/java/io/netty/buffer/api/BufTest.java b/src/test/java/io/netty/buffer/api/BufTest.java index 9295050..e043801 100644 --- a/src/test/java/io/netty/buffer/api/BufTest.java +++ b/src/test/java/io/netty/buffer/api/BufTest.java @@ -2034,6 +2034,50 @@ public class BufTest { } } + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void bifurcatedBuffersMustBeAccessibleInOtherThreads(Fixture fixture) throws Exception { + try (Allocator allocator = fixture.createAllocator(); + Buf buf = allocator.allocate(8)) { + buf.writeInt(42); + var send = buf.bifurcate().send(); + var fut = executor.submit(() -> { + try (Buf receive = send.receive()) { + assertEquals(42, receive.readInt()); + receive.readerOffset(0).writerOffset(0).writeInt(24); + assertEquals(24, receive.readInt()); + } + }); + fut.get(); + buf.writeInt(32); + assertEquals(32, buf.readInt()); + } + } + + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void sendMustNotMakeBifurcatedBuffersInaccessible(Fixture fixture) throws Exception { + try (Allocator allocator = fixture.createAllocator(); + Buf buf = allocator.allocate(16)) { + buf.writeInt(64); + var bifA = buf.bifurcate(); + buf.writeInt(42); + var send = buf.bifurcate().send(); + buf.writeInt(72); + var bifB = buf.bifurcate(); + var fut = executor.submit(() -> { + try (Buf receive = send.receive()) { + assertEquals(42, receive.readInt()); + } + }); + fut.get(); + buf.writeInt(32); + assertEquals(32, buf.readInt()); + assertEquals(64, bifA.readInt()); + assertEquals(72, bifB.readInt()); + } + } + // @ParameterizedTest @MethodSource("allocators")