From 253b6cb9190b146fe045216cb79da8ad1843c775 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Mon, 15 Mar 2021 16:42:56 +0100 Subject: [PATCH 1/5] Allow slices to obtain ownership when parent is closed Motivation: It is kind of a weird internal and hidden state, that slices were special. For instance, slices could not be sent, and they could never obtain ownership. This means buffers from slices behaved differently from allocated buffers. In doing so, they violated both the principle that magic should stay hidden, and the principle of consistent behaviour. Modification: - The special reference-counting drop implementation that was added to support bifurcation, has been renamed to ArcDrop (for atomic reference counting). - The ArcDrop is then used throughout the MemSegBuffer implementation to account for every instance where multiple buffers reference the same memory, e.g. slices and the like. - Borrows of a buffer is then the sum of borrows from the buffer itself, and its ArcDrop. - Ownership is thus tied to both the buffer itself being owned, and the ArcDrop being in an owned state. - SizeClassedMemoryPool is changed to pool recoverable memory instead of sends, because the sends could come from slices. - We also take care to keep around a "base" memory segment, so that we don't return memory segment slices to the memory pool (doing so would leak the memory from the parent segment that is not part of the slice). - CleanerPooledDrop now keeps a weak reference to itself, rather than the buffer, which is more correct anyway, but now also required because we cannot rely on the buffer reference the cleaner was created with. - The CleanerPooledDrop now takes care to drop the buffer that is actually passed to it, rather than what it was referencing from some earlier point. - MemoryManager can now disclose the size of recoverable memory, so that SizeClassedMemoryPool can pick the correct size pool to return memory to. It cannot rely on the passed down buffer instance for this, because that buffer might have been a slice. Result: It is now possible for slices to obtain ownership when their parent buffer is closed. --- .../netty/buffer/api/CleanerPooledDrop.java | 43 ++++--- .../io/netty/buffer/api/MemoryManager.java | 1 + .../java/io/netty/buffer/api/RcSupport.java | 2 +- .../buffer/api/SizeClassedMemoryPool.java | 52 ++++---- .../memseg/AbstractMemorySegmentManager.java | 9 +- .../{BifurcatedDrop.java => ArcDrop.java} | 40 ++++-- .../netty/buffer/api/memseg/MemSegBuffer.java | 115 +++++++++++------- .../java/io/netty/buffer/api/BufferTest.java | 45 +++++++ 8 files changed, 208 insertions(+), 99 deletions(-) rename src/main/java/io/netty/buffer/api/memseg/{BifurcatedDrop.java => ArcDrop.java} (69%) diff --git a/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java b/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java index be8621f..8e4e785 100644 --- a/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java +++ b/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java @@ -45,6 +45,7 @@ class CleanerPooledDrop implements Drop { GatedCleanable c = (GatedCleanable) CLEANABLE.getAndSet(this, null); if (c != null) { c.clean(); + delegate.drop(buf); } } @@ -57,24 +58,38 @@ class CleanerPooledDrop implements Drop { c.clean(); } - var pool = this.pool; var mem = manager.unwrapRecoverableMemory(buf); - var delegate = this.delegate; - WeakReference ref = new WeakReference<>(buf); + WeakReference ref = new WeakReference<>(this); AtomicBoolean gate = new AtomicBoolean(true); - cleanable = new GatedCleanable(gate, CLEANER.register(this, () -> { - if (gate.getAndSet(false)) { - Buffer b = ref.get(); - if (b == null) { - pool.recoverMemory(mem); - } else { - delegate.drop(b); - } - } - })); + cleanable = new GatedCleanable(gate, CLEANER.register(this, new CleanAction(pool, mem, ref, gate))); } - private static class GatedCleanable implements Cleanable { + private static final class CleanAction implements Runnable { + private final SizeClassedMemoryPool pool; + private final Object mem; + private final WeakReference ref; + private final AtomicBoolean gate; + + private CleanAction(SizeClassedMemoryPool pool, Object mem, WeakReference ref, + AtomicBoolean gate) { + this.pool = pool; + this.mem = mem; + this.ref = ref; + this.gate = gate; + } + + @Override + public void run() { + if (gate.getAndSet(false)) { + var monitored = ref.get(); + if (monitored == null) { + pool.recoverMemory(mem); + } + } + } + } + + private static final class GatedCleanable implements Cleanable { private final AtomicBoolean gate; private final Cleanable cleanable; diff --git a/src/main/java/io/netty/buffer/api/MemoryManager.java b/src/main/java/io/netty/buffer/api/MemoryManager.java index c3e6dfd..024d2df 100644 --- a/src/main/java/io/netty/buffer/api/MemoryManager.java +++ b/src/main/java/io/netty/buffer/api/MemoryManager.java @@ -34,5 +34,6 @@ public interface MemoryManager { Buffer allocateShared(AllocatorControl allo, long size, Drop drop, Cleaner cleaner); Drop drop(); Object unwrapRecoverableMemory(Buffer buf); + int capacityOfRecoverableMemory(Object memory); Buffer recoverMemory(Object recoverableMemory, Drop drop); } diff --git a/src/main/java/io/netty/buffer/api/RcSupport.java b/src/main/java/io/netty/buffer/api/RcSupport.java index 2332342..48ddbaf 100644 --- a/src/main/java/io/netty/buffer/api/RcSupport.java +++ b/src/main/java/io/netty/buffer/api/RcSupport.java @@ -37,7 +37,7 @@ public abstract class RcSupport, T extends RcSupport> impl @Override public final I acquire() { if (acquires < 0) { - throw attachTrace(new IllegalStateException("Resource is closed.")); + throw attachTrace(new IllegalStateException("This resource is closed: " + this + '.')); } if (acquires == Integer.MAX_VALUE) { throw new IllegalStateException("Cannot acquire more references; counter would overflow."); diff --git a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java index 08a6949..5b336c4 100644 --- a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java +++ b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java @@ -28,7 +28,7 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop>> pool; + private final ConcurrentHashMap> pool; @SuppressWarnings("unused") private volatile boolean closed; @@ -41,9 +41,9 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop send = sizeClassPool.poll(); - if (send != null) { - return send.receive() + Object memory = sizeClassPool.poll(); + if (memory != null) { + return recoverMemoryIntoBuffer(memory) .reset() .readOnly(false) .fill((byte) 0) @@ -71,10 +71,10 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop(4); pool.forEach((k, v) -> { - Send send; - while ((send = v.poll()) != null) { + Object memory; + while ((memory = v.poll()) != null) { try { - send.receive().close(); + dispose(recoverMemoryIntoBuffer(memory)); } catch (Exception e) { capturedExceptions.add(e); } @@ -94,12 +94,13 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop send; - while ((send = sizeClassPool.poll()) != null) { - send.receive().close(); + Object memory; + while ((memory = sizeClassPool.poll()) != null) { + dispose(recoverMemoryIntoBuffer(memory)); } } } @@ -107,27 +108,28 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop send = sizeClassPool.poll(); - Buffer untetheredBuf; - if (send != null) { - var transfer = (TransferSend) send; - var owned = transfer.unsafeUnwrapOwned(); - untetheredBuf = owned.transferOwnership(NO_OP_DROP); - } else { - untetheredBuf = createBuf(size, NO_OP_DROP); + Object memory = sizeClassPool.poll(); + if (memory == null) { + Buffer untetheredBuf = createBuf(size, NO_OP_DROP); + memory = manager.unwrapRecoverableMemory(untetheredBuf); } - return manager.unwrapRecoverableMemory(untetheredBuf); + return memory; } @Override public void recoverMemory(Object memory) { - var drop = getDrop(); - var buf = manager.recoverMemory(memory, drop); - drop.attach(buf); + Buffer buf = recoverMemoryIntoBuffer(memory); buf.close(); } - private ConcurrentLinkedQueue> getSizeClassPool(int size) { + private Buffer recoverMemoryIntoBuffer(Object memory) { + var drop = getDrop(); + var buf = manager.recoverMemory(memory, drop); + drop.attach(buf); + return buf; + } + + private ConcurrentLinkedQueue getSizeClassPool(int size) { return pool.computeIfAbsent(size, k -> new ConcurrentLinkedQueue<>()); } diff --git a/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java b/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java index 797970b..db7b359 100644 --- a/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java +++ b/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java @@ -34,7 +34,7 @@ public abstract class AbstractMemorySegmentManager implements MemoryManager { if (cleaner != null) { segment = segment.registerCleaner(cleaner); } - return new MemSegBuffer(segment, convert(drop), alloc); + return new MemSegBuffer(segment, segment, convert(drop), alloc); } @Override @@ -43,7 +43,7 @@ public abstract class AbstractMemorySegmentManager implements MemoryManager { if (cleaner != null) { segment = segment.registerCleaner(cleaner); } - return new MemSegBuffer(segment, convert(drop), alloc); + return new MemSegBuffer(segment, segment, convert(drop), alloc); } protected abstract MemorySegment createSegment(long size); @@ -59,6 +59,11 @@ public abstract class AbstractMemorySegmentManager implements MemoryManager { return b.recoverableMemory(); } + @Override + public int capacityOfRecoverableMemory(Object memory) { + return ((RecoverableMemory) memory).capacity(); + } + @Override public Buffer recoverMemory(Object recoverableMemory, Drop drop) { var recovery = (RecoverableMemory) recoverableMemory; diff --git a/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java b/src/main/java/io/netty/buffer/api/memseg/ArcDrop.java similarity index 69% rename from src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java rename to src/main/java/io/netty/buffer/api/memseg/ArcDrop.java index 4a72f07..3e8bad5 100644 --- a/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java +++ b/src/main/java/io/netty/buffer/api/memseg/ArcDrop.java @@ -20,33 +20,47 @@ import io.netty.buffer.api.Drop; import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; -class BifurcatedDrop implements Drop { +final class ArcDrop implements Drop { private static final VarHandle COUNT; static { try { - COUNT = MethodHandles.lookup().findVarHandle(BifurcatedDrop.class, "count", int.class); + COUNT = MethodHandles.lookup().findVarHandle(ArcDrop.class, "count", int.class); } catch (Exception e) { throw new ExceptionInInitializerError(e); } } - private final MemSegBuffer originalBuf; private final Drop delegate; @SuppressWarnings("FieldMayBeFinal") private volatile int count; - BifurcatedDrop(MemSegBuffer originalBuf, Drop delegate) { - this.originalBuf = originalBuf; + ArcDrop(Drop delegate) { this.delegate = delegate; - count = 2; // These are created by buffer bifurcation, so we initially have 2 references to this drop. + count = 1; } - void increment() { + static Drop wrap(Drop drop) { + if (drop.getClass() == ArcDrop.class) { + return drop; + } + return new ArcDrop(drop); + } + + static Drop acquire(Drop drop) { + if (drop.getClass() == ArcDrop.class) { + ((ArcDrop) drop).increment(); + return drop; + } + return new ArcDrop(drop); + } + + ArcDrop increment() { int c; do { c = count; checkValidState(c); } while (!COUNT.compareAndSet(this, c, c + 1)); + return this; } @Override @@ -59,10 +73,8 @@ class BifurcatedDrop implements Drop { checkValidState(c); } while (!COUNT.compareAndSet(this, c, n)); if (n == 0) { - delegate.attach(originalBuf); - delegate.drop(originalBuf); + delegate.drop(buf); } - buf.makeInaccessible(); } @Override @@ -70,6 +82,14 @@ class BifurcatedDrop implements Drop { delegate.attach(obj); } + boolean isOwned() { + return count <= 1; + } + + int countBorrows() { + return count - 1; + } + Drop unwrap() { return delegate; } diff --git a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java index 547a8d8..bf0d17a 100644 --- a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java +++ b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java @@ -59,31 +59,58 @@ class MemSegBuffer extends RcSupport implements Buffer, Re CLOSED_SEGMENT = MemorySegment.ofArray(new byte[0]); CLOSED_SEGMENT.close(); SEGMENT_CLOSE = buf -> { - try (var ignore = buf.seg) { - buf.makeInaccessible(); - } + buf.base.close(); }; } private final AllocatorControl alloc; - private final boolean isSendable; + private MemorySegment base; private MemorySegment seg; private MemorySegment wseg; private ByteOrder order; private int roff; private int woff; - MemSegBuffer(MemorySegment segmet, Drop drop, AllocatorControl alloc) { - this(segmet, drop, alloc, true); + MemSegBuffer(MemorySegment baseSegment, MemorySegment viewSegment, Drop drop, AllocatorControl alloc) { + super(new MakeInaccisbleOnDrop(ArcDrop.wrap(drop))); + this.alloc = alloc; + base = baseSegment; + seg = viewSegment; + wseg = viewSegment; + order = ByteOrder.nativeOrder(); } - private MemSegBuffer(MemorySegment segment, Drop drop, AllocatorControl alloc, boolean isSendable) { - super(drop); - this.alloc = alloc; - seg = segment; - wseg = segment; - this.isSendable = isSendable; - order = ByteOrder.nativeOrder(); + private static final class MakeInaccisbleOnDrop implements Drop { + final Drop delegate; + + private MakeInaccisbleOnDrop(Drop delegate) { + this.delegate = delegate; + } + + @Override + public void drop(MemSegBuffer buf) { + try { + delegate.drop(buf); + } finally { + buf.makeInaccessible(); + } + } + + @Override + public void attach(MemSegBuffer buf) { + delegate.attach(buf); + } + } + + @Override + protected Drop unsafeGetDrop() { + MakeInaccisbleOnDrop drop = (MakeInaccisbleOnDrop) super.unsafeGetDrop(); + return drop.delegate; + } + + @Override + protected void unsafeSetDrop(Drop replacement) { + super.unsafeSetDrop(new MakeInaccisbleOnDrop(replacement)); } @Override @@ -233,14 +260,12 @@ class MemSegBuffer extends RcSupport implements Buffer, Re if (length < 0) { throw new IllegalArgumentException("Length cannot be negative: " + length + '.'); } + if (!isAccessible()) { + throw new IllegalStateException("This buffer is closed: " + this + '.'); + } var slice = seg.asSlice(offset, length); - acquire(); - Drop drop = b -> { - close(); - b.makeInaccessible(); - }; - var sendable = false; // Sending implies ownership change, which we can't do for slices. - return new MemSegBuffer(slice, drop, alloc, sendable) + Drop drop = ArcDrop.acquire(unsafeGetDrop()); + return new MemSegBuffer(base, slice, drop, alloc) .writerOffset(length) .order(order()) .readOnly(readOnly()); @@ -482,19 +507,22 @@ class MemSegBuffer extends RcSupport implements Buffer, Re // Release old memory segment: var drop = unsafeGetDrop(); - if (drop instanceof BifurcatedDrop) { - // Disconnect from the bifurcated drop, since we'll get our own fresh memory segment. + if (drop instanceof ArcDrop) { + // Disconnect from the current arc drop, since we'll get our own fresh memory segment. int roff = this.roff; int woff = this.woff; drop.drop(this); - drop = ((BifurcatedDrop) drop).unwrap(); - unsafeSetDrop(drop); + while (drop instanceof ArcDrop) { + drop = ((ArcDrop) drop).unwrap(); + } + unsafeSetDrop(new ArcDrop(drop)); this.roff = roff; this.woff = woff; } else { alloc.recoverMemory(recoverableMemory()); } + base = newSegment; seg = newSegment; wseg = newSegment; drop.attach(this); @@ -505,19 +533,10 @@ class MemSegBuffer extends RcSupport implements Buffer, Re if (!isOwned()) { throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned.")); } - var drop = unsafeGetDrop(); - if (seg.ownerThread() != null) { - seg = seg.share(); - drop.attach(this); - } - if (drop instanceof BifurcatedDrop) { - ((BifurcatedDrop) drop).increment(); - } else { - drop = new BifurcatedDrop(new MemSegBuffer(seg, drop, alloc), drop); - unsafeSetDrop(drop); - } + var drop = (ArcDrop) unsafeGetDrop(); + unsafeSetDrop(new ArcDrop(drop)); var bifurcatedSeg = seg.asSlice(0, woff); - var bifurcatedBuf = new MemSegBuffer(bifurcatedSeg, drop, alloc); + var bifurcatedBuf = new MemSegBuffer(base, bifurcatedSeg, new ArcDrop(drop.increment()), alloc); bifurcatedBuf.woff = woff; bifurcatedBuf.roff = roff; bifurcatedBuf.order(order); @@ -1052,11 +1071,12 @@ class MemSegBuffer extends RcSupport implements Buffer, Re var readOnly = readOnly(); boolean isConfined = seg.ownerThread() == null; MemorySegment transferSegment = isConfined? seg : seg.share(); + MemorySegment base = this.base; makeInaccessible(); return new Owned() { @Override public MemSegBuffer transferOwnership(Drop drop) { - MemSegBuffer copy = new MemSegBuffer(transferSegment, drop, alloc); + MemSegBuffer copy = new MemSegBuffer(base, transferSegment, drop, alloc); copy.order = order; copy.roff = roff; copy.woff = woff; @@ -1067,6 +1087,7 @@ class MemSegBuffer extends RcSupport implements Buffer, Re } void makeInaccessible() { + base = CLOSED_SEGMENT; seg = CLOSED_SEGMENT; wseg = CLOSED_SEGMENT; roff = 0; @@ -1074,17 +1095,13 @@ class MemSegBuffer extends RcSupport implements Buffer, Re } @Override - protected IllegalStateException notSendableException() { - if (!isSendable) { - return new IllegalStateException( - "Cannot send() this buffer. This buffer might be a slice of another buffer."); - } - return super.notSendableException(); + public boolean isOwned() { + return super.isOwned() && ((ArcDrop) unsafeGetDrop()).isOwned(); } @Override - public boolean isOwned() { - return isSendable && super.isOwned(); + public int countBorrows() { + return super.countBorrows() + ((ArcDrop) unsafeGetDrop()).countBorrows(); } private void checkRead(int index, int size) { @@ -1153,7 +1170,7 @@ class MemSegBuffer extends RcSupport implements Buffer, Re } Object recoverableMemory() { - return new RecoverableMemory(seg, alloc); + return new RecoverableMemory(base, alloc); } // @@ -1226,7 +1243,11 @@ class MemSegBuffer extends RcSupport implements Buffer, Re } Buffer recover(Drop drop) { - return new MemSegBuffer(segment, drop, alloc); + return new MemSegBuffer(segment, segment, ArcDrop.acquire(drop), alloc); + } + + int capacity() { + return (int) segment.byteSize(); } } } diff --git a/src/test/java/io/netty/buffer/api/BufferTest.java b/src/test/java/io/netty/buffer/api/BufferTest.java index 2f4d998..82de3fe 100644 --- a/src/test/java/io/netty/buffer/api/BufferTest.java +++ b/src/test/java/io/netty/buffer/api/BufferTest.java @@ -926,6 +926,51 @@ public class BufferTest { } } + @ParameterizedTest + @MethodSource("nonCompositeAllocators") + public void acquireComposingAndSlicingMustIncrementBorrowsWithData(Fixture fixture) { + try (BufferAllocator allocator = fixture.createAllocator(); + Buffer buf = allocator.allocate(8)) { + buf.writeByte((byte) 1); + int borrows = buf.countBorrows(); + try (Buffer ignored = buf.acquire()) { + assertEquals(borrows + 1, buf.countBorrows()); + try (Buffer slice = buf.slice()) { + assertEquals(1, slice.capacity()); + int sliceBorrows = slice.countBorrows(); + assertEquals(borrows + 2, buf.countBorrows()); + try (Buffer ignored1 = Buffer.compose(allocator, buf, slice)) { + assertEquals(borrows + 3, buf.countBorrows()); + assertEquals(sliceBorrows + 1, slice.countBorrows()); + } + assertEquals(sliceBorrows, slice.countBorrows()); + assertEquals(borrows + 2, buf.countBorrows()); + } + assertEquals(borrows + 1, buf.countBorrows()); + } + assertEquals(borrows, buf.countBorrows()); + } + } + + @Disabled + @ParameterizedTest + @MethodSource("allocators") + public void sliceMustBecomeOwnedOnSourceBufferClose(Fixture fixture) { + try (BufferAllocator allocator = fixture.createAllocator()) { + Buffer buf = allocator.allocate(8); + buf.writeInt(42); + try (Buffer slice = buf.slice()) { + buf.close(); + assertFalse(buf.isAccessible()); + assertTrue(slice.isOwned()); + try (Buffer receive = slice.send().receive()) { + assertTrue(receive.isOwned()); + assertFalse(slice.isAccessible()); + } + } + } + } + @ParameterizedTest @MethodSource("allocators") void copyIntoByteArray(Fixture fixture) { From 1f4234dfb5d3c2c1e1d7ea2061ca116cf320f60c Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Mon, 15 Mar 2021 16:59:42 +0100 Subject: [PATCH 2/5] Fix checkstyle line length --- .../java/io/netty/buffer/api/memseg/MemSegBuffer.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java index bf0d17a..e0cd90d 100644 --- a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java +++ b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java @@ -71,12 +71,12 @@ class MemSegBuffer extends RcSupport implements Buffer, Re private int roff; private int woff; - MemSegBuffer(MemorySegment baseSegment, MemorySegment viewSegment, Drop drop, AllocatorControl alloc) { + MemSegBuffer(MemorySegment base, MemorySegment view, Drop drop, AllocatorControl alloc) { super(new MakeInaccisbleOnDrop(ArcDrop.wrap(drop))); this.alloc = alloc; - base = baseSegment; - seg = viewSegment; - wseg = viewSegment; + this.base = base; + seg = view; + wseg = view; order = ByteOrder.nativeOrder(); } From 0ccb34ca0801b0f65ec344b055d68d31247620f1 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Tue, 16 Mar 2021 12:11:29 +0100 Subject: [PATCH 3/5] Fix failing ByteBufAdaptorTests and increase adaptor compatibility --- .../java/io/netty/buffer/api/RcSupport.java | 2 +- .../buffer/api/adaptor/ByteBufAdaptor.java | 68 +++++-- .../api/adaptor/ByteBufAdaptorTest.java | 174 ------------------ 3 files changed, 56 insertions(+), 188 deletions(-) diff --git a/src/main/java/io/netty/buffer/api/RcSupport.java b/src/main/java/io/netty/buffer/api/RcSupport.java index 48ddbaf..99a4310 100644 --- a/src/main/java/io/netty/buffer/api/RcSupport.java +++ b/src/main/java/io/netty/buffer/api/RcSupport.java @@ -98,7 +98,7 @@ public abstract class RcSupport, T extends RcSupport> impl */ protected IllegalStateException notSendableException() { return new IllegalStateException( - "Cannot send() a reference counted object with " + acquires + " outstanding acquires: " + this + '.'); + "Cannot send() a reference counted object with " + countBorrows() + " borrows: " + this + '.'); } @Override diff --git a/src/main/java/io/netty/buffer/api/adaptor/ByteBufAdaptor.java b/src/main/java/io/netty/buffer/api/adaptor/ByteBufAdaptor.java index 21dae15..fa0ec00 100644 --- a/src/main/java/io/netty/buffer/api/adaptor/ByteBufAdaptor.java +++ b/src/main/java/io/netty/buffer/api/adaptor/ByteBufAdaptor.java @@ -19,6 +19,8 @@ import io.netty.buffer.ByteBufConvertible; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.DuplicatedByteBuf; +import io.netty.buffer.SlicedByteBuf; import io.netty.buffer.Unpooled; import io.netty.buffer.api.Buffer; import io.netty.buffer.api.BufferAllocator; @@ -957,8 +959,8 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public ByteBuf readSlice(int length) { - ByteBuf slice = readRetainedSlice(length); - release(); + ByteBuf slice = slice(readerIndex(), length); + buffer.readerOffset(buffer.readerOffset() + length); return slice; } @@ -1411,22 +1413,18 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public ByteBuf slice() { - ByteBuf slice = retainedSlice(); - release(); - return slice; + return slice(readerIndex(), readableBytes()); } @Override public ByteBuf retainedSlice() { - checkAccess(); - return wrap(buffer.slice()); + return retainedSlice(readerIndex(), readableBytes()); } @Override public ByteBuf slice(int index, int length) { - ByteBuf slice = retainedSlice(index, length); - release(); - return slice; + checkAccess(); + return new Slice(this, index, length); } @Override @@ -1439,11 +1437,55 @@ public final class ByteBufAdaptor extends ByteBuf { } } + private static final class Slice extends SlicedByteBuf { + private final int indexAdjustment; + private final int lengthAdjustment; + + Slice(ByteBuf buffer, int index, int length) { + super(buffer, index, length); + indexAdjustment = index; + lengthAdjustment = length; + } + + @Override + public ByteBuf retainedDuplicate() { + return new Slice(unwrap().retainedDuplicate(), indexAdjustment, lengthAdjustment); + } + + @Override + public ByteBuf retainedSlice(int index, int length) { + checkIndex(index, length); + return unwrap().retainedSlice(indexAdjustment + index, length); + } + } + + private static final class Duplicate extends DuplicatedByteBuf { + Duplicate(ByteBufAdaptor byteBuf) { + super(byteBuf); + } + + @Override + public ByteBuf duplicate() { + ((ByteBufAdaptor) unwrap()).checkAccess(); + return new Duplicate((ByteBufAdaptor) unwrap()); + } + + @Override + public ByteBuf retainedDuplicate() { + return unwrap().retainedDuplicate(); + } + + @Override + public ByteBuf retainedSlice(int index, int length) { + return unwrap().retainedSlice(index, length); + } + } + @Override public ByteBuf duplicate() { - ByteBuf duplicate = retainedDuplicate(); - release(); - return duplicate; + checkAccess(); + Duplicate duplicatedByteBuf = new Duplicate(this); + return duplicatedByteBuf.setIndex(readerIndex(), writerIndex()); } @Override diff --git a/src/test/java/io/netty/buffer/api/adaptor/ByteBufAdaptorTest.java b/src/test/java/io/netty/buffer/api/adaptor/ByteBufAdaptorTest.java index 16d994e..fd15e0b 100644 --- a/src/test/java/io/netty/buffer/api/adaptor/ByteBufAdaptorTest.java +++ b/src/test/java/io/netty/buffer/api/adaptor/ByteBufAdaptorTest.java @@ -39,36 +39,6 @@ public class ByteBufAdaptorTest extends AbstractByteBufTest { return alloc.buffer(capacity, capacity); } - @Ignore("New buffers not thread-safe like this.") - @Override - public void testSliceReadGatheringByteChannelMultipleThreads() throws Exception { - } - - @Ignore("New buffers not thread-safe like this.") - @Override - public void testDuplicateReadGatheringByteChannelMultipleThreads() throws Exception { - } - - @Ignore("New buffers not thread-safe like this.") - @Override - public void testSliceReadOutputStreamMultipleThreads() throws Exception { - } - - @Ignore("New buffers not thread-safe like this.") - @Override - public void testDuplicateReadOutputStreamMultipleThreads() throws Exception { - } - - @Ignore("New buffers not thread-safe like this.") - @Override - public void testSliceBytesInArrayMultipleThreads() throws Exception { - } - - @Ignore("New buffers not thread-safe like this.") - @Override - public void testDuplicateBytesInArrayMultipleThreads() throws Exception { - } - @Ignore("This test codifies that asking to reading 0 bytes from an empty but unclosed stream should return -1, " + "which is just weird.") @Override @@ -112,152 +82,8 @@ public class ByteBufAdaptorTest extends AbstractByteBufTest { public void testToByteBuffer2() { } - @Ignore("This assumes a single reference count for the memory, but all buffers (views of memory) have " + - "independent reference counts now. Also, this plays tricks with reference that we cannot support.") - @Override - public void testRetainedDuplicateUnreleasable3() { - } - - @Ignore("This assumes a single reference count for the memory, but all buffers (views of memory) have " + - "independent reference counts now. Also, this plays tricks with reference that we cannot support.") - @Override - public void testRetainedDuplicateUnreleasable4() { - } - - @Ignore("This assumes a single reference count for the memory, but all buffers (views of memory) have " + - "independent reference counts now. Also, this plays tricks with reference that we cannot support.") - @Override - public void testRetainedDuplicateAndRetainedSliceContentIsExpected() { - } - - @Ignore("This assumes a single reference count for the memory, but all buffers (views of memory) have " + - "independent reference counts now. Also, this plays tricks with reference that we cannot support.") - @Override - public void testMultipleRetainedSliceReleaseOriginal2() { - } - - @Ignore("This assumes a single reference count for the memory, but all buffers (views of memory) have " + - "independent reference counts now. Also, this plays tricks with reference that we cannot support.") - @Override - public void testMultipleRetainedSliceReleaseOriginal3() { - } - - @Ignore("This assumes a single reference count for the memory, but all buffers (views of memory) have " + - "independent reference counts now. Also, this plays tricks with reference that we cannot support.") - @Override - public void testMultipleRetainedSliceReleaseOriginal4() { - } - - @Ignore("This assumes a single reference count for the memory, but all buffers (views of memory) have " + - "independent reference counts now. Also, this plays tricks with reference that we cannot support.") - @Override - public void testReadRetainedSliceUnreleasable3() { - } - - @Ignore("This assumes a single reference count for the memory, but all buffers (views of memory) have " + - "independent reference counts now. Also, this plays tricks with reference that we cannot support.") - @Override - public void testReadRetainedSliceUnreleasable4() { - } - - @Ignore("This assumes a single reference count for the memory, but all buffers (views of memory) have " + - "independent reference counts now. Also, this plays tricks with reference that we cannot support.") - @Override - public void testRetainedSliceUnreleasable3() { - } - - @Ignore("This assumes a single reference count for the memory, but all buffers (views of memory) have " + - "independent reference counts now. Also, this plays tricks with reference that we cannot support.") - @Override - public void testRetainedSliceUnreleasable4() { - } - - @Ignore("This assumes a single reference count for the memory, but all buffers (views of memory) have " + - "independent reference counts now. Also, this plays tricks with reference that we cannot support.") - @Override - public void testRetainedSliceReleaseOriginal2() { - } - - @Ignore("This assumes a single reference count for the memory, but all buffers (views of memory) have " + - "independent reference counts now. Also, this plays tricks with reference that we cannot support.") - @Override - public void testRetainedSliceReleaseOriginal3() { - } - - @Ignore("This assumes a single reference count for the memory, but all buffers (views of memory) have " + - "independent reference counts now. Also, this plays tricks with reference that we cannot support.") - @Override - public void testRetainedSliceReleaseOriginal4() { - } - - @Ignore("This assumes a single reference count for the memory, but all buffers (views of memory) have " + - "independent reference counts now. Also, this plays tricks with reference that we cannot support.") - @Override - public void testMultipleRetainedDuplicateReleaseOriginal2() { - } - - @Ignore("This assumes a single reference count for the memory, but all buffers (views of memory) have " + - "independent reference counts now. Also, this plays tricks with reference that we cannot support.") - @Override - public void testMultipleRetainedDuplicateReleaseOriginal3() { - } - - @Ignore("This assumes a single reference count for the memory, but all buffers (views of memory) have " + - "independent reference counts now. Also, this plays tricks with reference that we cannot support.") - @Override - public void testMultipleRetainedDuplicateReleaseOriginal4() { - } - - @Ignore("This assumes a single reference count for the memory, but all buffers (views of memory) have " + - "independent reference counts now. Also, this plays tricks with reference that we cannot support.") - @Override - public void testRetainedDuplicateReleaseOriginal2() { - } - - @Ignore("This assumes a single reference count for the memory, but all buffers (views of memory) have " + - "independent reference counts now. Also, this plays tricks with reference that we cannot support.") - @Override - public void testRetainedDuplicateReleaseOriginal3() { - } - - @Ignore("This assumes a single reference count for the memory, but all buffers (views of memory) have " + - "independent reference counts now. Also, this plays tricks with reference that we cannot support.") - @Override - public void testRetainedDuplicateReleaseOriginal4() { - } - @Ignore("No longer allowed to allocate 0 sized buffers, except for composite buffers with no components.") @Override public void testLittleEndianWithExpand() { } - - @Ignore("Test seems to inherently have double-free bug?") - @Override - public void testRetainedSliceAfterReleaseRetainedSliceDuplicate() { - } - - @Ignore("Test seems to inherently have double-free bug?") - @Override - public void testRetainedSliceAfterReleaseRetainedDuplicateSlice() { - } - - @Ignore("Test seems to inherently have double-free bug?") - @Override - public void testSliceAfterReleaseRetainedSliceDuplicate() { - } - - @Ignore("Test seems to inherently have double-free bug?") - @Override - public void testDuplicateAfterReleaseRetainedSliceDuplicate() { - } - - @Ignore("Test seems to inherently have double-free bug?") - @Override - public void testDuplicateAfterReleaseRetainedDuplicateSlice() { - } - - @Ignore("Test seems to inherently have double-free bug?") - @Override - public void testSliceAfterReleaseRetainedDuplicateSlice() { - } } From d40989da78d1c7c961462c2f30f45d18ea97e432 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Tue, 16 Mar 2021 17:20:12 +0100 Subject: [PATCH 4/5] Add toString implementations to all Drop implementations This is helpful when debugging. --- .../io/netty/buffer/api/CleanerPooledDrop.java | 5 +++++ .../io/netty/buffer/api/CompositeBuffer.java | 16 ++++++++++++---- .../netty/buffer/api/SizeClassedMemoryPool.java | 5 +++++ src/main/java/io/netty/buffer/api/Statics.java | 10 +++++++++- .../io/netty/buffer/api/memseg/ArcDrop.java | 10 ++++++++++ .../netty/buffer/api/memseg/MemSegBuffer.java | 17 +++++++++++++++-- 6 files changed, 56 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java b/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java index 8e4e785..5bed302 100644 --- a/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java +++ b/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java @@ -64,6 +64,11 @@ class CleanerPooledDrop implements Drop { cleanable = new GatedCleanable(gate, CLEANER.register(this, new CleanAction(pool, mem, ref, gate))); } + @Override + public String toString() { + return "CleanerPooledDrop(" + delegate + ')'; + } + private static final class CleanAction implements Runnable { private final SizeClassedMemoryPool pool; private final Object mem; diff --git a/src/main/java/io/netty/buffer/api/CompositeBuffer.java b/src/main/java/io/netty/buffer/api/CompositeBuffer.java index 7cfbb11..1a56a58 100644 --- a/src/main/java/io/netty/buffer/api/CompositeBuffer.java +++ b/src/main/java/io/netty/buffer/api/CompositeBuffer.java @@ -31,11 +31,19 @@ final class CompositeBuffer extends RcSupport implement * non-composite copy of the buffer. */ private static final int MAX_CAPACITY = Integer.MAX_VALUE - 8; - private static final Drop COMPOSITE_DROP = buf -> { - for (Buffer b : buf.bufs) { - b.close(); + private static final Drop COMPOSITE_DROP = new Drop() { + @Override + public void drop(CompositeBuffer buf) { + for (Buffer b : buf.bufs) { + b.close(); + } + buf.makeInaccessible(); + } + + @Override + public String toString() { + return "COMPOSITE_DROP"; } - buf.makeInaccessible(); }; private final BufferAllocator allocator; diff --git a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java index 5b336c4..265bd82 100644 --- a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java +++ b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java @@ -105,6 +105,11 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop NO_OP_DROP = buf -> { + Drop NO_OP_DROP = new Drop() { + @Override + public void drop(Buffer obj) { + } + + @Override + public String toString() { + return "NO_OP_DROP"; + } }; static VarHandle findVarHandle(Lookup lookup, Class recv, String name, Class type) { diff --git a/src/main/java/io/netty/buffer/api/memseg/ArcDrop.java b/src/main/java/io/netty/buffer/api/memseg/ArcDrop.java index 3e8bad5..293a88f 100644 --- a/src/main/java/io/netty/buffer/api/memseg/ArcDrop.java +++ b/src/main/java/io/netty/buffer/api/memseg/ArcDrop.java @@ -94,6 +94,16 @@ final class ArcDrop implements Drop { return delegate; } + @Override + public String toString() { + StringBuilder builder = new StringBuilder().append("ArcDrop(").append(count).append(", "); + Drop drop = this; + while ((drop = ((ArcDrop) drop).unwrap()) instanceof ArcDrop) { + builder.append(((ArcDrop) drop).count).append(", "); + } + return builder.append(drop).append(')').toString(); + } + private static void checkValidState(int count) { if (count == 0) { throw new IllegalStateException("Underlying resources have already been freed."); diff --git a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java index e0cd90d..01d3e08 100644 --- a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java +++ b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java @@ -58,8 +58,16 @@ class MemSegBuffer extends RcSupport implements Buffer, Re static { CLOSED_SEGMENT = MemorySegment.ofArray(new byte[0]); CLOSED_SEGMENT.close(); - SEGMENT_CLOSE = buf -> { - buf.base.close(); + SEGMENT_CLOSE = new Drop() { + @Override + public void drop(MemSegBuffer buf) { + buf.base.close(); + } + + @Override + public String toString() { + return "SEGMENT_CLOSE"; + } }; } @@ -100,6 +108,11 @@ class MemSegBuffer extends RcSupport implements Buffer, Re public void attach(MemSegBuffer buf) { delegate.attach(buf); } + + @Override + public String toString() { + return "MemSegDrop(" + delegate + ')'; + } } @Override From de305bd6b99770e1c8b3374dff5a6d8aed0f5ca9 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Tue, 16 Mar 2021 17:22:41 +0100 Subject: [PATCH 5/5] Align slice sendability of composite buffers with that of non-composite buffers This means we no longer need to have tests that are parameterised over non-sliced buffers. --- .../io/netty/buffer/api/CompositeBuffer.java | 82 +++++++------------ .../api/memseg/HeapMemorySegmentManager.java | 13 --- .../java/io/netty/buffer/api/BufferTest.java | 68 +++++++-------- 3 files changed, 59 insertions(+), 104 deletions(-) diff --git a/src/main/java/io/netty/buffer/api/CompositeBuffer.java b/src/main/java/io/netty/buffer/api/CompositeBuffer.java index 1a56a58..8906dab 100644 --- a/src/main/java/io/netty/buffer/api/CompositeBuffer.java +++ b/src/main/java/io/netty/buffer/api/CompositeBuffer.java @@ -48,7 +48,6 @@ final class CompositeBuffer extends RcSupport implement private final BufferAllocator allocator; private final TornBufferAccessors tornBufAccessors; - private final boolean isSendable; private Buffer[] bufs; private int[] offsets; // The offset, for the composite buffer, where each constituent buffer starts. private int capacity; @@ -60,7 +59,7 @@ final class CompositeBuffer extends RcSupport implement private boolean readOnly; CompositeBuffer(BufferAllocator allocator, Deref[] refs) { - this(allocator, true, filterExternalBufs(refs), COMPOSITE_DROP, false); + this(allocator, filterExternalBufs(refs), COMPOSITE_DROP, false); } private static Buffer[] filterExternalBufs(Deref[] refs) { @@ -114,11 +113,10 @@ final class CompositeBuffer extends RcSupport implement return Stream.of(buf); } - private CompositeBuffer(BufferAllocator allocator, boolean isSendable, Buffer[] bufs, Drop drop, + private CompositeBuffer(BufferAllocator allocator, Buffer[] bufs, Drop drop, boolean acquireBufs) { super(drop); this.allocator = allocator; - this.isSendable = isSendable; if (acquireBufs) { for (Buffer buf : bufs) { buf.acquire(); @@ -305,46 +303,31 @@ final class CompositeBuffer extends RcSupport implement offset + ", and length was " + length + '.'); } Buffer choice = (Buffer) chooseBuffer(offset, 0); - Buffer[] slices = null; - acquire(); // Increase reference count of the original composite buffer. - Drop drop = obj -> { - close(); // Decrement the reference count of the original composite buffer. - COMPOSITE_DROP.drop(obj); - }; + Buffer[] slices; - try { - if (length > 0) { - slices = new Buffer[bufs.length]; - int off = subOffset; - int cap = length; - int i; - for (i = searchOffsets(offset); cap > 0; i++) { - var buf = bufs[i]; - int avail = buf.capacity() - off; - slices[i] = buf.slice(off, Math.min(cap, avail)); - cap -= avail; - off = 0; - } - slices = Arrays.copyOf(slices, i); - } else { - // Specialize for length == 0, since we must slice from at least one constituent buffer. - slices = new Buffer[] { choice.slice(subOffset, 0) }; - } - - return new CompositeBuffer(allocator, false, slices, drop, true); - } catch (Throwable throwable) { - // We called acquire prior to the try-clause. We need to undo that if we're not creating a composite buffer: - close(); - throw throwable; - } finally { - if (slices != null) { - for (Buffer slice : slices) { - if (slice != null) { - slice.close(); // Ownership now transfers to the composite buffer. - } - } + if (length > 0) { + slices = new Buffer[bufs.length]; + int off = subOffset; + int cap = length; + int i; + for (i = searchOffsets(offset); cap > 0; i++) { + var buf = bufs[i]; + int avail = buf.capacity() - off; + slices[i] = buf.slice(off, Math.min(cap, avail)); + cap -= avail; + off = 0; } + slices = Arrays.copyOf(slices, i); + } else { + // Specialize for length == 0, since we must slice from at least one constituent buffer. + slices = new Buffer[] { choice.slice(subOffset, 0) }; } + + // Use the constructor that skips filtering out empty buffers, and skips acquiring on the buffers. + // This is important because 1) slice() already acquired the buffers, and 2) if this slice is empty + // then we need to keep holding on to it to prevent this originating composite buffer from getting + // ownership. If it did, its behaviour would be inconsistent with that of a non-composite buffer. + return new CompositeBuffer(allocator, slices, COMPOSITE_DROP, false); } @Override @@ -757,7 +740,7 @@ final class CompositeBuffer extends RcSupport implement } if (bufs.length == 0) { // Bifurcating a zero-length buffer is trivial. - return new CompositeBuffer(allocator, true, bufs, unsafeGetDrop(), true).order(order); + return new CompositeBuffer(allocator, bufs, unsafeGetDrop(), true).order(order); } int i = searchOffsets(woff); @@ -769,7 +752,7 @@ final class CompositeBuffer extends RcSupport implement } computeBufferOffsets(); try { - var compositeBuf = new CompositeBuffer(allocator, true, bifs, unsafeGetDrop(), true); + var compositeBuf = new CompositeBuffer(allocator, bifs, unsafeGetDrop(), true); compositeBuf.order = order; // Preserve byte order even if bifs array is empty. return compositeBuf; } finally { @@ -1172,7 +1155,7 @@ final class CompositeBuffer extends RcSupport implement for (int i = 0; i < sends.length; i++) { received[i] = sends[i].receive(); } - var composite = new CompositeBuffer(allocator, true, received, drop, true); + var composite = new CompositeBuffer(allocator, received, drop, true); composite.readOnly = readOnly; drop.attach(composite); return composite; @@ -1187,18 +1170,9 @@ final class CompositeBuffer extends RcSupport implement closed = true; } - @Override - protected IllegalStateException notSendableException() { - if (!isSendable) { - return new IllegalStateException( - "Cannot send() this buffer. This buffer might be a slice of another buffer."); - } - return super.notSendableException(); - } - @Override public boolean isOwned() { - return isSendable && super.isOwned() && allConstituentsAreOwned(); + return super.isOwned() && allConstituentsAreOwned(); } private boolean allConstituentsAreOwned() { diff --git a/src/main/java/io/netty/buffer/api/memseg/HeapMemorySegmentManager.java b/src/main/java/io/netty/buffer/api/memseg/HeapMemorySegmentManager.java index 01ec996..f62e0de 100644 --- a/src/main/java/io/netty/buffer/api/memseg/HeapMemorySegmentManager.java +++ b/src/main/java/io/netty/buffer/api/memseg/HeapMemorySegmentManager.java @@ -15,8 +15,6 @@ */ package io.netty.buffer.api.memseg; -import io.netty.buffer.api.Buffer; -import io.netty.buffer.api.Drop; import jdk.incubator.foreign.MemorySegment; public class HeapMemorySegmentManager extends AbstractMemorySegmentManager { @@ -29,15 +27,4 @@ public class HeapMemorySegmentManager extends AbstractMemorySegmentManager { protected MemorySegment createSegment(long size) { return MemorySegment.ofArray(new byte[Math.toIntExact(size)]); } - - @Override - public Drop drop() { - return convert(buf -> buf.makeInaccessible()); - } - - @SuppressWarnings({ "unchecked", "UnnecessaryLocalVariable" }) - private static Drop convert(Drop drop) { - Drop tmp = drop; - return (Drop) tmp; - } } diff --git a/src/test/java/io/netty/buffer/api/BufferTest.java b/src/test/java/io/netty/buffer/api/BufferTest.java index 82de3fe..d542abf 100644 --- a/src/test/java/io/netty/buffer/api/BufferTest.java +++ b/src/test/java/io/netty/buffer/api/BufferTest.java @@ -63,8 +63,6 @@ public class BufferTest { private static final Memoize ALL_COMBINATIONS = new Memoize<>( () -> fixtureCombinations().toArray(Fixture[]::new)); - private static final Memoize NON_SLICED = new Memoize<>( - () -> Arrays.stream(ALL_COMBINATIONS.get()).filter(f -> !f.isSlice()).toArray(Fixture[]::new)); private static final Memoize NON_COMPOSITE = new Memoize<>( () -> Arrays.stream(ALL_COMBINATIONS.get()).filter(f -> !f.isComposite()).toArray(Fixture[]::new)); private static final Memoize HEAP_ALLOCS = new Memoize<>( @@ -78,10 +76,6 @@ public class BufferTest { return ALL_COMBINATIONS.get(); } - static Fixture[] nonSliceAllocators() { - return NON_SLICED.get(); - } - static Fixture[] nonCompositeAllocators() { return NON_COMPOSITE.get(); } @@ -349,7 +343,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") void allocateAndSendToThread(Fixture fixture) throws Exception { try (BufferAllocator allocator = fixture.createAllocator()) { ArrayBlockingQueue> queue = new ArrayBlockingQueue<>(10); @@ -369,7 +363,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") void allocateAndSendToThreadViaSyncQueue(Fixture fixture) throws Exception { SynchronousQueue> queue = new SynchronousQueue<>(); Future future = executor.submit(() -> { @@ -388,7 +382,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") void sendMustThrowWhenBufIsAcquired(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(8)) { @@ -403,7 +397,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void originalBufferMustNotBeAccessibleAfterSend(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer orig = allocator.allocate(24)) { @@ -505,7 +499,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void cannotSendMoreThanOnce(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(8)) { @@ -822,7 +816,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") void sendOnSliceWithoutOffsetAndSizeMustThrow(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(8)) { @@ -837,7 +831,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") void sendOnSliceWithOffsetAndSizeMustThrow(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(8)) { @@ -1782,7 +1776,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void ensureWritableMustThrowForNegativeSize(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(8)) { @@ -1791,7 +1785,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void ensureWritableMustThrowIfRequestedSizeWouldGrowBeyondMaxAllowed(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(512)) { @@ -1800,7 +1794,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void ensureWritableMustNotThrowWhenSpaceIsAlreadyAvailable(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(8)) { @@ -1811,7 +1805,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void ensureWritableMustExpandBufferCapacity(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(8)) { @@ -1846,7 +1840,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void mustBeAbleToSliceAfterEnsureWritable(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(4)) { @@ -1861,7 +1855,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void ensureWritableOnCompositeBuffersMustRespectExistingBigEndianByteOrder(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator()) { Buffer composite; @@ -1878,7 +1872,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void ensureWritableOnCompositeBuffersMustRespectExistingLittleEndianByteOrder(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator()) { Buffer composite; @@ -1895,7 +1889,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void ensureWritableWithCompactionMustNotAllocateIfCompactionIsEnough(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(64)) { @@ -2220,7 +2214,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void bifurcateOfNonOwnedBufferMustThrow(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(8)) { @@ -2233,7 +2227,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void bifurcatedPartMustContainFirstHalfOfBuffer(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(16).order(BIG_ENDIAN)) { @@ -2269,7 +2263,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void bifurcatedPartsMustBeIndividuallySendable(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(16).order(BIG_ENDIAN)) { @@ -2298,7 +2292,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void mustBePossibleToBifurcateMoreThanOnce(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(16).order(BIG_ENDIAN)) { @@ -2326,7 +2320,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void bifurcatedBufferMustHaveSameByteOrderAsParent(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(8).order(BIG_ENDIAN)) { @@ -2344,7 +2338,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void ensureWritableOnBifurcatedBuffers(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(8)) { @@ -2363,7 +2357,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void ensureWritableOnBifurcatedBuffersWithOddOffsets(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(10).order(BIG_ENDIAN)) { @@ -2412,7 +2406,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void bifurcatedBuffersMustBeAccessibleInOtherThreads(Fixture fixture) throws Exception { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(8)) { @@ -2432,7 +2426,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void sendMustNotMakeBifurcatedBuffersInaccessible(Fixture fixture) throws Exception { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(16)) { @@ -2456,7 +2450,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void compactMustDiscardReadBytes(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(16, BIG_ENDIAN)) { @@ -2478,7 +2472,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void compactMustThrowForUnownedBuffer(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(8, BIG_ENDIAN)) { @@ -2593,7 +2587,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void readOnlyBufferMustRemainReadOnlyAfterSend(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(8)) { @@ -2658,7 +2652,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void bifurcateOfReadOnlyBufferMustBeReadOnly(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(16)) { @@ -2710,7 +2704,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void compactOnReadOnlyBufferMustThrow(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(8)) { @@ -2720,7 +2714,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void ensureWritableOnReadOnlyBufferMustThrow(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer buf = allocator.allocate(8)) { @@ -2730,7 +2724,7 @@ public class BufferTest { } @ParameterizedTest - @MethodSource("nonSliceAllocators") + @MethodSource("allocators") public void copyIntoOnReadOnlyBufferMustThrow(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); Buffer dest = allocator.allocate(8)) {