diff --git a/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java b/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java index be8621f..8e4e785 100644 --- a/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java +++ b/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java @@ -45,6 +45,7 @@ class CleanerPooledDrop implements Drop { GatedCleanable c = (GatedCleanable) CLEANABLE.getAndSet(this, null); if (c != null) { c.clean(); + delegate.drop(buf); } } @@ -57,24 +58,38 @@ class CleanerPooledDrop implements Drop { c.clean(); } - var pool = this.pool; var mem = manager.unwrapRecoverableMemory(buf); - var delegate = this.delegate; - WeakReference ref = new WeakReference<>(buf); + WeakReference ref = new WeakReference<>(this); AtomicBoolean gate = new AtomicBoolean(true); - cleanable = new GatedCleanable(gate, CLEANER.register(this, () -> { - if (gate.getAndSet(false)) { - Buffer b = ref.get(); - if (b == null) { - pool.recoverMemory(mem); - } else { - delegate.drop(b); - } - } - })); + cleanable = new GatedCleanable(gate, CLEANER.register(this, new CleanAction(pool, mem, ref, gate))); } - private static class GatedCleanable implements Cleanable { + private static final class CleanAction implements Runnable { + private final SizeClassedMemoryPool pool; + private final Object mem; + private final WeakReference ref; + private final AtomicBoolean gate; + + private CleanAction(SizeClassedMemoryPool pool, Object mem, WeakReference ref, + AtomicBoolean gate) { + this.pool = pool; + this.mem = mem; + this.ref = ref; + this.gate = gate; + } + + @Override + public void run() { + if (gate.getAndSet(false)) { + var monitored = ref.get(); + if (monitored == null) { + pool.recoverMemory(mem); + } + } + } + } + + private static final class GatedCleanable implements Cleanable { private final AtomicBoolean gate; private final Cleanable cleanable; diff --git a/src/main/java/io/netty/buffer/api/MemoryManager.java b/src/main/java/io/netty/buffer/api/MemoryManager.java index c3e6dfd..024d2df 100644 --- a/src/main/java/io/netty/buffer/api/MemoryManager.java +++ b/src/main/java/io/netty/buffer/api/MemoryManager.java @@ -34,5 +34,6 @@ public interface MemoryManager { Buffer allocateShared(AllocatorControl allo, long size, Drop drop, Cleaner cleaner); Drop drop(); Object unwrapRecoverableMemory(Buffer buf); + int capacityOfRecoverableMemory(Object memory); Buffer recoverMemory(Object recoverableMemory, Drop drop); } diff --git a/src/main/java/io/netty/buffer/api/RcSupport.java b/src/main/java/io/netty/buffer/api/RcSupport.java index 2332342..48ddbaf 100644 --- a/src/main/java/io/netty/buffer/api/RcSupport.java +++ b/src/main/java/io/netty/buffer/api/RcSupport.java @@ -37,7 +37,7 @@ public abstract class RcSupport, T extends RcSupport> impl @Override public final I acquire() { if (acquires < 0) { - throw attachTrace(new IllegalStateException("Resource is closed.")); + throw attachTrace(new IllegalStateException("This resource is closed: " + this + '.')); } if (acquires == Integer.MAX_VALUE) { throw new IllegalStateException("Cannot acquire more references; counter would overflow."); diff --git a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java index 08a6949..5b336c4 100644 --- a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java +++ b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java @@ -28,7 +28,7 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop>> pool; + private final ConcurrentHashMap> pool; @SuppressWarnings("unused") private volatile boolean closed; @@ -41,9 +41,9 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop send = sizeClassPool.poll(); - if (send != null) { - return send.receive() + Object memory = sizeClassPool.poll(); + if (memory != null) { + return recoverMemoryIntoBuffer(memory) .reset() .readOnly(false) .fill((byte) 0) @@ -71,10 +71,10 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop(4); pool.forEach((k, v) -> { - Send send; - while ((send = v.poll()) != null) { + Object memory; + while ((memory = v.poll()) != null) { try { - send.receive().close(); + dispose(recoverMemoryIntoBuffer(memory)); } catch (Exception e) { capturedExceptions.add(e); } @@ -94,12 +94,13 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop send; - while ((send = sizeClassPool.poll()) != null) { - send.receive().close(); + Object memory; + while ((memory = sizeClassPool.poll()) != null) { + dispose(recoverMemoryIntoBuffer(memory)); } } } @@ -107,27 +108,28 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop send = sizeClassPool.poll(); - Buffer untetheredBuf; - if (send != null) { - var transfer = (TransferSend) send; - var owned = transfer.unsafeUnwrapOwned(); - untetheredBuf = owned.transferOwnership(NO_OP_DROP); - } else { - untetheredBuf = createBuf(size, NO_OP_DROP); + Object memory = sizeClassPool.poll(); + if (memory == null) { + Buffer untetheredBuf = createBuf(size, NO_OP_DROP); + memory = manager.unwrapRecoverableMemory(untetheredBuf); } - return manager.unwrapRecoverableMemory(untetheredBuf); + return memory; } @Override public void recoverMemory(Object memory) { - var drop = getDrop(); - var buf = manager.recoverMemory(memory, drop); - drop.attach(buf); + Buffer buf = recoverMemoryIntoBuffer(memory); buf.close(); } - private ConcurrentLinkedQueue> getSizeClassPool(int size) { + private Buffer recoverMemoryIntoBuffer(Object memory) { + var drop = getDrop(); + var buf = manager.recoverMemory(memory, drop); + drop.attach(buf); + return buf; + } + + private ConcurrentLinkedQueue getSizeClassPool(int size) { return pool.computeIfAbsent(size, k -> new ConcurrentLinkedQueue<>()); } diff --git a/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java b/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java index 797970b..db7b359 100644 --- a/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java +++ b/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java @@ -34,7 +34,7 @@ public abstract class AbstractMemorySegmentManager implements MemoryManager { if (cleaner != null) { segment = segment.registerCleaner(cleaner); } - return new MemSegBuffer(segment, convert(drop), alloc); + return new MemSegBuffer(segment, segment, convert(drop), alloc); } @Override @@ -43,7 +43,7 @@ public abstract class AbstractMemorySegmentManager implements MemoryManager { if (cleaner != null) { segment = segment.registerCleaner(cleaner); } - return new MemSegBuffer(segment, convert(drop), alloc); + return new MemSegBuffer(segment, segment, convert(drop), alloc); } protected abstract MemorySegment createSegment(long size); @@ -59,6 +59,11 @@ public abstract class AbstractMemorySegmentManager implements MemoryManager { return b.recoverableMemory(); } + @Override + public int capacityOfRecoverableMemory(Object memory) { + return ((RecoverableMemory) memory).capacity(); + } + @Override public Buffer recoverMemory(Object recoverableMemory, Drop drop) { var recovery = (RecoverableMemory) recoverableMemory; diff --git a/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java b/src/main/java/io/netty/buffer/api/memseg/ArcDrop.java similarity index 69% rename from src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java rename to src/main/java/io/netty/buffer/api/memseg/ArcDrop.java index 4a72f07..3e8bad5 100644 --- a/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java +++ b/src/main/java/io/netty/buffer/api/memseg/ArcDrop.java @@ -20,33 +20,47 @@ import io.netty.buffer.api.Drop; import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; -class BifurcatedDrop implements Drop { +final class ArcDrop implements Drop { private static final VarHandle COUNT; static { try { - COUNT = MethodHandles.lookup().findVarHandle(BifurcatedDrop.class, "count", int.class); + COUNT = MethodHandles.lookup().findVarHandle(ArcDrop.class, "count", int.class); } catch (Exception e) { throw new ExceptionInInitializerError(e); } } - private final MemSegBuffer originalBuf; private final Drop delegate; @SuppressWarnings("FieldMayBeFinal") private volatile int count; - BifurcatedDrop(MemSegBuffer originalBuf, Drop delegate) { - this.originalBuf = originalBuf; + ArcDrop(Drop delegate) { this.delegate = delegate; - count = 2; // These are created by buffer bifurcation, so we initially have 2 references to this drop. + count = 1; } - void increment() { + static Drop wrap(Drop drop) { + if (drop.getClass() == ArcDrop.class) { + return drop; + } + return new ArcDrop(drop); + } + + static Drop acquire(Drop drop) { + if (drop.getClass() == ArcDrop.class) { + ((ArcDrop) drop).increment(); + return drop; + } + return new ArcDrop(drop); + } + + ArcDrop increment() { int c; do { c = count; checkValidState(c); } while (!COUNT.compareAndSet(this, c, c + 1)); + return this; } @Override @@ -59,10 +73,8 @@ class BifurcatedDrop implements Drop { checkValidState(c); } while (!COUNT.compareAndSet(this, c, n)); if (n == 0) { - delegate.attach(originalBuf); - delegate.drop(originalBuf); + delegate.drop(buf); } - buf.makeInaccessible(); } @Override @@ -70,6 +82,14 @@ class BifurcatedDrop implements Drop { delegate.attach(obj); } + boolean isOwned() { + return count <= 1; + } + + int countBorrows() { + return count - 1; + } + Drop unwrap() { return delegate; } diff --git a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java index 547a8d8..bf0d17a 100644 --- a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java +++ b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java @@ -59,31 +59,58 @@ class MemSegBuffer extends RcSupport implements Buffer, Re CLOSED_SEGMENT = MemorySegment.ofArray(new byte[0]); CLOSED_SEGMENT.close(); SEGMENT_CLOSE = buf -> { - try (var ignore = buf.seg) { - buf.makeInaccessible(); - } + buf.base.close(); }; } private final AllocatorControl alloc; - private final boolean isSendable; + private MemorySegment base; private MemorySegment seg; private MemorySegment wseg; private ByteOrder order; private int roff; private int woff; - MemSegBuffer(MemorySegment segmet, Drop drop, AllocatorControl alloc) { - this(segmet, drop, alloc, true); + MemSegBuffer(MemorySegment baseSegment, MemorySegment viewSegment, Drop drop, AllocatorControl alloc) { + super(new MakeInaccisbleOnDrop(ArcDrop.wrap(drop))); + this.alloc = alloc; + base = baseSegment; + seg = viewSegment; + wseg = viewSegment; + order = ByteOrder.nativeOrder(); } - private MemSegBuffer(MemorySegment segment, Drop drop, AllocatorControl alloc, boolean isSendable) { - super(drop); - this.alloc = alloc; - seg = segment; - wseg = segment; - this.isSendable = isSendable; - order = ByteOrder.nativeOrder(); + private static final class MakeInaccisbleOnDrop implements Drop { + final Drop delegate; + + private MakeInaccisbleOnDrop(Drop delegate) { + this.delegate = delegate; + } + + @Override + public void drop(MemSegBuffer buf) { + try { + delegate.drop(buf); + } finally { + buf.makeInaccessible(); + } + } + + @Override + public void attach(MemSegBuffer buf) { + delegate.attach(buf); + } + } + + @Override + protected Drop unsafeGetDrop() { + MakeInaccisbleOnDrop drop = (MakeInaccisbleOnDrop) super.unsafeGetDrop(); + return drop.delegate; + } + + @Override + protected void unsafeSetDrop(Drop replacement) { + super.unsafeSetDrop(new MakeInaccisbleOnDrop(replacement)); } @Override @@ -233,14 +260,12 @@ class MemSegBuffer extends RcSupport implements Buffer, Re if (length < 0) { throw new IllegalArgumentException("Length cannot be negative: " + length + '.'); } + if (!isAccessible()) { + throw new IllegalStateException("This buffer is closed: " + this + '.'); + } var slice = seg.asSlice(offset, length); - acquire(); - Drop drop = b -> { - close(); - b.makeInaccessible(); - }; - var sendable = false; // Sending implies ownership change, which we can't do for slices. - return new MemSegBuffer(slice, drop, alloc, sendable) + Drop drop = ArcDrop.acquire(unsafeGetDrop()); + return new MemSegBuffer(base, slice, drop, alloc) .writerOffset(length) .order(order()) .readOnly(readOnly()); @@ -482,19 +507,22 @@ class MemSegBuffer extends RcSupport implements Buffer, Re // Release old memory segment: var drop = unsafeGetDrop(); - if (drop instanceof BifurcatedDrop) { - // Disconnect from the bifurcated drop, since we'll get our own fresh memory segment. + if (drop instanceof ArcDrop) { + // Disconnect from the current arc drop, since we'll get our own fresh memory segment. int roff = this.roff; int woff = this.woff; drop.drop(this); - drop = ((BifurcatedDrop) drop).unwrap(); - unsafeSetDrop(drop); + while (drop instanceof ArcDrop) { + drop = ((ArcDrop) drop).unwrap(); + } + unsafeSetDrop(new ArcDrop(drop)); this.roff = roff; this.woff = woff; } else { alloc.recoverMemory(recoverableMemory()); } + base = newSegment; seg = newSegment; wseg = newSegment; drop.attach(this); @@ -505,19 +533,10 @@ class MemSegBuffer extends RcSupport implements Buffer, Re if (!isOwned()) { throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned.")); } - var drop = unsafeGetDrop(); - if (seg.ownerThread() != null) { - seg = seg.share(); - drop.attach(this); - } - if (drop instanceof BifurcatedDrop) { - ((BifurcatedDrop) drop).increment(); - } else { - drop = new BifurcatedDrop(new MemSegBuffer(seg, drop, alloc), drop); - unsafeSetDrop(drop); - } + var drop = (ArcDrop) unsafeGetDrop(); + unsafeSetDrop(new ArcDrop(drop)); var bifurcatedSeg = seg.asSlice(0, woff); - var bifurcatedBuf = new MemSegBuffer(bifurcatedSeg, drop, alloc); + var bifurcatedBuf = new MemSegBuffer(base, bifurcatedSeg, new ArcDrop(drop.increment()), alloc); bifurcatedBuf.woff = woff; bifurcatedBuf.roff = roff; bifurcatedBuf.order(order); @@ -1052,11 +1071,12 @@ class MemSegBuffer extends RcSupport implements Buffer, Re var readOnly = readOnly(); boolean isConfined = seg.ownerThread() == null; MemorySegment transferSegment = isConfined? seg : seg.share(); + MemorySegment base = this.base; makeInaccessible(); return new Owned() { @Override public MemSegBuffer transferOwnership(Drop drop) { - MemSegBuffer copy = new MemSegBuffer(transferSegment, drop, alloc); + MemSegBuffer copy = new MemSegBuffer(base, transferSegment, drop, alloc); copy.order = order; copy.roff = roff; copy.woff = woff; @@ -1067,6 +1087,7 @@ class MemSegBuffer extends RcSupport implements Buffer, Re } void makeInaccessible() { + base = CLOSED_SEGMENT; seg = CLOSED_SEGMENT; wseg = CLOSED_SEGMENT; roff = 0; @@ -1074,17 +1095,13 @@ class MemSegBuffer extends RcSupport implements Buffer, Re } @Override - protected IllegalStateException notSendableException() { - if (!isSendable) { - return new IllegalStateException( - "Cannot send() this buffer. This buffer might be a slice of another buffer."); - } - return super.notSendableException(); + public boolean isOwned() { + return super.isOwned() && ((ArcDrop) unsafeGetDrop()).isOwned(); } @Override - public boolean isOwned() { - return isSendable && super.isOwned(); + public int countBorrows() { + return super.countBorrows() + ((ArcDrop) unsafeGetDrop()).countBorrows(); } private void checkRead(int index, int size) { @@ -1153,7 +1170,7 @@ class MemSegBuffer extends RcSupport implements Buffer, Re } Object recoverableMemory() { - return new RecoverableMemory(seg, alloc); + return new RecoverableMemory(base, alloc); } // @@ -1226,7 +1243,11 @@ class MemSegBuffer extends RcSupport implements Buffer, Re } Buffer recover(Drop drop) { - return new MemSegBuffer(segment, drop, alloc); + return new MemSegBuffer(segment, segment, ArcDrop.acquire(drop), alloc); + } + + int capacity() { + return (int) segment.byteSize(); } } } diff --git a/src/test/java/io/netty/buffer/api/BufferTest.java b/src/test/java/io/netty/buffer/api/BufferTest.java index 2f4d998..82de3fe 100644 --- a/src/test/java/io/netty/buffer/api/BufferTest.java +++ b/src/test/java/io/netty/buffer/api/BufferTest.java @@ -926,6 +926,51 @@ public class BufferTest { } } + @ParameterizedTest + @MethodSource("nonCompositeAllocators") + public void acquireComposingAndSlicingMustIncrementBorrowsWithData(Fixture fixture) { + try (BufferAllocator allocator = fixture.createAllocator(); + Buffer buf = allocator.allocate(8)) { + buf.writeByte((byte) 1); + int borrows = buf.countBorrows(); + try (Buffer ignored = buf.acquire()) { + assertEquals(borrows + 1, buf.countBorrows()); + try (Buffer slice = buf.slice()) { + assertEquals(1, slice.capacity()); + int sliceBorrows = slice.countBorrows(); + assertEquals(borrows + 2, buf.countBorrows()); + try (Buffer ignored1 = Buffer.compose(allocator, buf, slice)) { + assertEquals(borrows + 3, buf.countBorrows()); + assertEquals(sliceBorrows + 1, slice.countBorrows()); + } + assertEquals(sliceBorrows, slice.countBorrows()); + assertEquals(borrows + 2, buf.countBorrows()); + } + assertEquals(borrows + 1, buf.countBorrows()); + } + assertEquals(borrows, buf.countBorrows()); + } + } + + @Disabled + @ParameterizedTest + @MethodSource("allocators") + public void sliceMustBecomeOwnedOnSourceBufferClose(Fixture fixture) { + try (BufferAllocator allocator = fixture.createAllocator()) { + Buffer buf = allocator.allocate(8); + buf.writeInt(42); + try (Buffer slice = buf.slice()) { + buf.close(); + assertFalse(buf.isAccessible()); + assertTrue(slice.isOwned()); + try (Buffer receive = slice.send().receive()) { + assertTrue(receive.isOwned()); + assertFalse(slice.isAccessible()); + } + } + } + } + @ParameterizedTest @MethodSource("allocators") void copyIntoByteArray(Fixture fixture) {