From 253b6cb9190b146fe045216cb79da8ad1843c775 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Mon, 15 Mar 2021 16:42:56 +0100 Subject: [PATCH] Allow slices to obtain ownership when parent is closed Motivation: It is kind of a weird internal and hidden state, that slices were special. For instance, slices could not be sent, and they could never obtain ownership. This means buffers from slices behaved differently from allocated buffers. In doing so, they violated both the principle that magic should stay hidden, and the principle of consistent behaviour. Modification: - The special reference-counting drop implementation that was added to support bifurcation, has been renamed to ArcDrop (for atomic reference counting). - The ArcDrop is then used throughout the MemSegBuffer implementation to account for every instance where multiple buffers reference the same memory, e.g. slices and the like. - Borrows of a buffer is then the sum of borrows from the buffer itself, and its ArcDrop. - Ownership is thus tied to both the buffer itself being owned, and the ArcDrop being in an owned state. - SizeClassedMemoryPool is changed to pool recoverable memory instead of sends, because the sends could come from slices. - We also take care to keep around a "base" memory segment, so that we don't return memory segment slices to the memory pool (doing so would leak the memory from the parent segment that is not part of the slice). - CleanerPooledDrop now keeps a weak reference to itself, rather than the buffer, which is more correct anyway, but now also required because we cannot rely on the buffer reference the cleaner was created with. - The CleanerPooledDrop now takes care to drop the buffer that is actually passed to it, rather than what it was referencing from some earlier point. - MemoryManager can now disclose the size of recoverable memory, so that SizeClassedMemoryPool can pick the correct size pool to return memory to. It cannot rely on the passed down buffer instance for this, because that buffer might have been a slice. Result: It is now possible for slices to obtain ownership when their parent buffer is closed. --- .../netty/buffer/api/CleanerPooledDrop.java | 43 ++++--- .../io/netty/buffer/api/MemoryManager.java | 1 + .../java/io/netty/buffer/api/RcSupport.java | 2 +- .../buffer/api/SizeClassedMemoryPool.java | 52 ++++---- .../memseg/AbstractMemorySegmentManager.java | 9 +- .../{BifurcatedDrop.java => ArcDrop.java} | 40 ++++-- .../netty/buffer/api/memseg/MemSegBuffer.java | 115 +++++++++++------- .../java/io/netty/buffer/api/BufferTest.java | 45 +++++++ 8 files changed, 208 insertions(+), 99 deletions(-) rename src/main/java/io/netty/buffer/api/memseg/{BifurcatedDrop.java => ArcDrop.java} (69%) diff --git a/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java b/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java index be8621f..8e4e785 100644 --- a/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java +++ b/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java @@ -45,6 +45,7 @@ class CleanerPooledDrop implements Drop { GatedCleanable c = (GatedCleanable) CLEANABLE.getAndSet(this, null); if (c != null) { c.clean(); + delegate.drop(buf); } } @@ -57,24 +58,38 @@ class CleanerPooledDrop implements Drop { c.clean(); } - var pool = this.pool; var mem = manager.unwrapRecoverableMemory(buf); - var delegate = this.delegate; - WeakReference ref = new WeakReference<>(buf); + WeakReference ref = new WeakReference<>(this); AtomicBoolean gate = new AtomicBoolean(true); - cleanable = new GatedCleanable(gate, CLEANER.register(this, () -> { - if (gate.getAndSet(false)) { - Buffer b = ref.get(); - if (b == null) { - pool.recoverMemory(mem); - } else { - delegate.drop(b); - } - } - })); + cleanable = new GatedCleanable(gate, CLEANER.register(this, new CleanAction(pool, mem, ref, gate))); } - private static class GatedCleanable implements Cleanable { + private static final class CleanAction implements Runnable { + private final SizeClassedMemoryPool pool; + private final Object mem; + private final WeakReference ref; + private final AtomicBoolean gate; + + private CleanAction(SizeClassedMemoryPool pool, Object mem, WeakReference ref, + AtomicBoolean gate) { + this.pool = pool; + this.mem = mem; + this.ref = ref; + this.gate = gate; + } + + @Override + public void run() { + if (gate.getAndSet(false)) { + var monitored = ref.get(); + if (monitored == null) { + pool.recoverMemory(mem); + } + } + } + } + + private static final class GatedCleanable implements Cleanable { private final AtomicBoolean gate; private final Cleanable cleanable; diff --git a/src/main/java/io/netty/buffer/api/MemoryManager.java b/src/main/java/io/netty/buffer/api/MemoryManager.java index c3e6dfd..024d2df 100644 --- a/src/main/java/io/netty/buffer/api/MemoryManager.java +++ b/src/main/java/io/netty/buffer/api/MemoryManager.java @@ -34,5 +34,6 @@ public interface MemoryManager { Buffer allocateShared(AllocatorControl allo, long size, Drop drop, Cleaner cleaner); Drop drop(); Object unwrapRecoverableMemory(Buffer buf); + int capacityOfRecoverableMemory(Object memory); Buffer recoverMemory(Object recoverableMemory, Drop drop); } diff --git a/src/main/java/io/netty/buffer/api/RcSupport.java b/src/main/java/io/netty/buffer/api/RcSupport.java index 2332342..48ddbaf 100644 --- a/src/main/java/io/netty/buffer/api/RcSupport.java +++ b/src/main/java/io/netty/buffer/api/RcSupport.java @@ -37,7 +37,7 @@ public abstract class RcSupport, T extends RcSupport> impl @Override public final I acquire() { if (acquires < 0) { - throw attachTrace(new IllegalStateException("Resource is closed.")); + throw attachTrace(new IllegalStateException("This resource is closed: " + this + '.')); } if (acquires == Integer.MAX_VALUE) { throw new IllegalStateException("Cannot acquire more references; counter would overflow."); diff --git a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java index 08a6949..5b336c4 100644 --- a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java +++ b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java @@ -28,7 +28,7 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop>> pool; + private final ConcurrentHashMap> pool; @SuppressWarnings("unused") private volatile boolean closed; @@ -41,9 +41,9 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop send = sizeClassPool.poll(); - if (send != null) { - return send.receive() + Object memory = sizeClassPool.poll(); + if (memory != null) { + return recoverMemoryIntoBuffer(memory) .reset() .readOnly(false) .fill((byte) 0) @@ -71,10 +71,10 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop(4); pool.forEach((k, v) -> { - Send send; - while ((send = v.poll()) != null) { + Object memory; + while ((memory = v.poll()) != null) { try { - send.receive().close(); + dispose(recoverMemoryIntoBuffer(memory)); } catch (Exception e) { capturedExceptions.add(e); } @@ -94,12 +94,13 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop send; - while ((send = sizeClassPool.poll()) != null) { - send.receive().close(); + Object memory; + while ((memory = sizeClassPool.poll()) != null) { + dispose(recoverMemoryIntoBuffer(memory)); } } } @@ -107,27 +108,28 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop send = sizeClassPool.poll(); - Buffer untetheredBuf; - if (send != null) { - var transfer = (TransferSend) send; - var owned = transfer.unsafeUnwrapOwned(); - untetheredBuf = owned.transferOwnership(NO_OP_DROP); - } else { - untetheredBuf = createBuf(size, NO_OP_DROP); + Object memory = sizeClassPool.poll(); + if (memory == null) { + Buffer untetheredBuf = createBuf(size, NO_OP_DROP); + memory = manager.unwrapRecoverableMemory(untetheredBuf); } - return manager.unwrapRecoverableMemory(untetheredBuf); + return memory; } @Override public void recoverMemory(Object memory) { - var drop = getDrop(); - var buf = manager.recoverMemory(memory, drop); - drop.attach(buf); + Buffer buf = recoverMemoryIntoBuffer(memory); buf.close(); } - private ConcurrentLinkedQueue> getSizeClassPool(int size) { + private Buffer recoverMemoryIntoBuffer(Object memory) { + var drop = getDrop(); + var buf = manager.recoverMemory(memory, drop); + drop.attach(buf); + return buf; + } + + private ConcurrentLinkedQueue getSizeClassPool(int size) { return pool.computeIfAbsent(size, k -> new ConcurrentLinkedQueue<>()); } diff --git a/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java b/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java index 797970b..db7b359 100644 --- a/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java +++ b/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java @@ -34,7 +34,7 @@ public abstract class AbstractMemorySegmentManager implements MemoryManager { if (cleaner != null) { segment = segment.registerCleaner(cleaner); } - return new MemSegBuffer(segment, convert(drop), alloc); + return new MemSegBuffer(segment, segment, convert(drop), alloc); } @Override @@ -43,7 +43,7 @@ public abstract class AbstractMemorySegmentManager implements MemoryManager { if (cleaner != null) { segment = segment.registerCleaner(cleaner); } - return new MemSegBuffer(segment, convert(drop), alloc); + return new MemSegBuffer(segment, segment, convert(drop), alloc); } protected abstract MemorySegment createSegment(long size); @@ -59,6 +59,11 @@ public abstract class AbstractMemorySegmentManager implements MemoryManager { return b.recoverableMemory(); } + @Override + public int capacityOfRecoverableMemory(Object memory) { + return ((RecoverableMemory) memory).capacity(); + } + @Override public Buffer recoverMemory(Object recoverableMemory, Drop drop) { var recovery = (RecoverableMemory) recoverableMemory; diff --git a/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java b/src/main/java/io/netty/buffer/api/memseg/ArcDrop.java similarity index 69% rename from src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java rename to src/main/java/io/netty/buffer/api/memseg/ArcDrop.java index 4a72f07..3e8bad5 100644 --- a/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java +++ b/src/main/java/io/netty/buffer/api/memseg/ArcDrop.java @@ -20,33 +20,47 @@ import io.netty.buffer.api.Drop; import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; -class BifurcatedDrop implements Drop { +final class ArcDrop implements Drop { private static final VarHandle COUNT; static { try { - COUNT = MethodHandles.lookup().findVarHandle(BifurcatedDrop.class, "count", int.class); + COUNT = MethodHandles.lookup().findVarHandle(ArcDrop.class, "count", int.class); } catch (Exception e) { throw new ExceptionInInitializerError(e); } } - private final MemSegBuffer originalBuf; private final Drop delegate; @SuppressWarnings("FieldMayBeFinal") private volatile int count; - BifurcatedDrop(MemSegBuffer originalBuf, Drop delegate) { - this.originalBuf = originalBuf; + ArcDrop(Drop delegate) { this.delegate = delegate; - count = 2; // These are created by buffer bifurcation, so we initially have 2 references to this drop. + count = 1; } - void increment() { + static Drop wrap(Drop drop) { + if (drop.getClass() == ArcDrop.class) { + return drop; + } + return new ArcDrop(drop); + } + + static Drop acquire(Drop drop) { + if (drop.getClass() == ArcDrop.class) { + ((ArcDrop) drop).increment(); + return drop; + } + return new ArcDrop(drop); + } + + ArcDrop increment() { int c; do { c = count; checkValidState(c); } while (!COUNT.compareAndSet(this, c, c + 1)); + return this; } @Override @@ -59,10 +73,8 @@ class BifurcatedDrop implements Drop { checkValidState(c); } while (!COUNT.compareAndSet(this, c, n)); if (n == 0) { - delegate.attach(originalBuf); - delegate.drop(originalBuf); + delegate.drop(buf); } - buf.makeInaccessible(); } @Override @@ -70,6 +82,14 @@ class BifurcatedDrop implements Drop { delegate.attach(obj); } + boolean isOwned() { + return count <= 1; + } + + int countBorrows() { + return count - 1; + } + Drop unwrap() { return delegate; } diff --git a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java index 547a8d8..bf0d17a 100644 --- a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java +++ b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java @@ -59,31 +59,58 @@ class MemSegBuffer extends RcSupport implements Buffer, Re CLOSED_SEGMENT = MemorySegment.ofArray(new byte[0]); CLOSED_SEGMENT.close(); SEGMENT_CLOSE = buf -> { - try (var ignore = buf.seg) { - buf.makeInaccessible(); - } + buf.base.close(); }; } private final AllocatorControl alloc; - private final boolean isSendable; + private MemorySegment base; private MemorySegment seg; private MemorySegment wseg; private ByteOrder order; private int roff; private int woff; - MemSegBuffer(MemorySegment segmet, Drop drop, AllocatorControl alloc) { - this(segmet, drop, alloc, true); + MemSegBuffer(MemorySegment baseSegment, MemorySegment viewSegment, Drop drop, AllocatorControl alloc) { + super(new MakeInaccisbleOnDrop(ArcDrop.wrap(drop))); + this.alloc = alloc; + base = baseSegment; + seg = viewSegment; + wseg = viewSegment; + order = ByteOrder.nativeOrder(); } - private MemSegBuffer(MemorySegment segment, Drop drop, AllocatorControl alloc, boolean isSendable) { - super(drop); - this.alloc = alloc; - seg = segment; - wseg = segment; - this.isSendable = isSendable; - order = ByteOrder.nativeOrder(); + private static final class MakeInaccisbleOnDrop implements Drop { + final Drop delegate; + + private MakeInaccisbleOnDrop(Drop delegate) { + this.delegate = delegate; + } + + @Override + public void drop(MemSegBuffer buf) { + try { + delegate.drop(buf); + } finally { + buf.makeInaccessible(); + } + } + + @Override + public void attach(MemSegBuffer buf) { + delegate.attach(buf); + } + } + + @Override + protected Drop unsafeGetDrop() { + MakeInaccisbleOnDrop drop = (MakeInaccisbleOnDrop) super.unsafeGetDrop(); + return drop.delegate; + } + + @Override + protected void unsafeSetDrop(Drop replacement) { + super.unsafeSetDrop(new MakeInaccisbleOnDrop(replacement)); } @Override @@ -233,14 +260,12 @@ class MemSegBuffer extends RcSupport implements Buffer, Re if (length < 0) { throw new IllegalArgumentException("Length cannot be negative: " + length + '.'); } + if (!isAccessible()) { + throw new IllegalStateException("This buffer is closed: " + this + '.'); + } var slice = seg.asSlice(offset, length); - acquire(); - Drop drop = b -> { - close(); - b.makeInaccessible(); - }; - var sendable = false; // Sending implies ownership change, which we can't do for slices. - return new MemSegBuffer(slice, drop, alloc, sendable) + Drop drop = ArcDrop.acquire(unsafeGetDrop()); + return new MemSegBuffer(base, slice, drop, alloc) .writerOffset(length) .order(order()) .readOnly(readOnly()); @@ -482,19 +507,22 @@ class MemSegBuffer extends RcSupport implements Buffer, Re // Release old memory segment: var drop = unsafeGetDrop(); - if (drop instanceof BifurcatedDrop) { - // Disconnect from the bifurcated drop, since we'll get our own fresh memory segment. + if (drop instanceof ArcDrop) { + // Disconnect from the current arc drop, since we'll get our own fresh memory segment. int roff = this.roff; int woff = this.woff; drop.drop(this); - drop = ((BifurcatedDrop) drop).unwrap(); - unsafeSetDrop(drop); + while (drop instanceof ArcDrop) { + drop = ((ArcDrop) drop).unwrap(); + } + unsafeSetDrop(new ArcDrop(drop)); this.roff = roff; this.woff = woff; } else { alloc.recoverMemory(recoverableMemory()); } + base = newSegment; seg = newSegment; wseg = newSegment; drop.attach(this); @@ -505,19 +533,10 @@ class MemSegBuffer extends RcSupport implements Buffer, Re if (!isOwned()) { throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned.")); } - var drop = unsafeGetDrop(); - if (seg.ownerThread() != null) { - seg = seg.share(); - drop.attach(this); - } - if (drop instanceof BifurcatedDrop) { - ((BifurcatedDrop) drop).increment(); - } else { - drop = new BifurcatedDrop(new MemSegBuffer(seg, drop, alloc), drop); - unsafeSetDrop(drop); - } + var drop = (ArcDrop) unsafeGetDrop(); + unsafeSetDrop(new ArcDrop(drop)); var bifurcatedSeg = seg.asSlice(0, woff); - var bifurcatedBuf = new MemSegBuffer(bifurcatedSeg, drop, alloc); + var bifurcatedBuf = new MemSegBuffer(base, bifurcatedSeg, new ArcDrop(drop.increment()), alloc); bifurcatedBuf.woff = woff; bifurcatedBuf.roff = roff; bifurcatedBuf.order(order); @@ -1052,11 +1071,12 @@ class MemSegBuffer extends RcSupport implements Buffer, Re var readOnly = readOnly(); boolean isConfined = seg.ownerThread() == null; MemorySegment transferSegment = isConfined? seg : seg.share(); + MemorySegment base = this.base; makeInaccessible(); return new Owned() { @Override public MemSegBuffer transferOwnership(Drop drop) { - MemSegBuffer copy = new MemSegBuffer(transferSegment, drop, alloc); + MemSegBuffer copy = new MemSegBuffer(base, transferSegment, drop, alloc); copy.order = order; copy.roff = roff; copy.woff = woff; @@ -1067,6 +1087,7 @@ class MemSegBuffer extends RcSupport implements Buffer, Re } void makeInaccessible() { + base = CLOSED_SEGMENT; seg = CLOSED_SEGMENT; wseg = CLOSED_SEGMENT; roff = 0; @@ -1074,17 +1095,13 @@ class MemSegBuffer extends RcSupport implements Buffer, Re } @Override - protected IllegalStateException notSendableException() { - if (!isSendable) { - return new IllegalStateException( - "Cannot send() this buffer. This buffer might be a slice of another buffer."); - } - return super.notSendableException(); + public boolean isOwned() { + return super.isOwned() && ((ArcDrop) unsafeGetDrop()).isOwned(); } @Override - public boolean isOwned() { - return isSendable && super.isOwned(); + public int countBorrows() { + return super.countBorrows() + ((ArcDrop) unsafeGetDrop()).countBorrows(); } private void checkRead(int index, int size) { @@ -1153,7 +1170,7 @@ class MemSegBuffer extends RcSupport implements Buffer, Re } Object recoverableMemory() { - return new RecoverableMemory(seg, alloc); + return new RecoverableMemory(base, alloc); } // @@ -1226,7 +1243,11 @@ class MemSegBuffer extends RcSupport implements Buffer, Re } Buffer recover(Drop drop) { - return new MemSegBuffer(segment, drop, alloc); + return new MemSegBuffer(segment, segment, ArcDrop.acquire(drop), alloc); + } + + int capacity() { + return (int) segment.byteSize(); } } } diff --git a/src/test/java/io/netty/buffer/api/BufferTest.java b/src/test/java/io/netty/buffer/api/BufferTest.java index 2f4d998..82de3fe 100644 --- a/src/test/java/io/netty/buffer/api/BufferTest.java +++ b/src/test/java/io/netty/buffer/api/BufferTest.java @@ -926,6 +926,51 @@ public class BufferTest { } } + @ParameterizedTest + @MethodSource("nonCompositeAllocators") + public void acquireComposingAndSlicingMustIncrementBorrowsWithData(Fixture fixture) { + try (BufferAllocator allocator = fixture.createAllocator(); + Buffer buf = allocator.allocate(8)) { + buf.writeByte((byte) 1); + int borrows = buf.countBorrows(); + try (Buffer ignored = buf.acquire()) { + assertEquals(borrows + 1, buf.countBorrows()); + try (Buffer slice = buf.slice()) { + assertEquals(1, slice.capacity()); + int sliceBorrows = slice.countBorrows(); + assertEquals(borrows + 2, buf.countBorrows()); + try (Buffer ignored1 = Buffer.compose(allocator, buf, slice)) { + assertEquals(borrows + 3, buf.countBorrows()); + assertEquals(sliceBorrows + 1, slice.countBorrows()); + } + assertEquals(sliceBorrows, slice.countBorrows()); + assertEquals(borrows + 2, buf.countBorrows()); + } + assertEquals(borrows + 1, buf.countBorrows()); + } + assertEquals(borrows, buf.countBorrows()); + } + } + + @Disabled + @ParameterizedTest + @MethodSource("allocators") + public void sliceMustBecomeOwnedOnSourceBufferClose(Fixture fixture) { + try (BufferAllocator allocator = fixture.createAllocator()) { + Buffer buf = allocator.allocate(8); + buf.writeInt(42); + try (Buffer slice = buf.slice()) { + buf.close(); + assertFalse(buf.isAccessible()); + assertTrue(slice.isOwned()); + try (Buffer receive = slice.send().receive()) { + assertTrue(receive.isOwned()); + assertFalse(slice.isAccessible()); + } + } + } + } + @ParameterizedTest @MethodSource("allocators") void copyIntoByteArray(Fixture fixture) {