diff --git a/buffer/src/main/java/io/netty/buffer/b2/Allocator.java b/buffer/src/main/java/io/netty/buffer/b2/Allocator.java index 34a7c53..4cfe2b1 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/Allocator.java +++ b/buffer/src/main/java/io/netty/buffer/b2/Allocator.java @@ -5,7 +5,6 @@ import jdk.incubator.foreign.MemorySegment; import static io.netty.buffer.b2.BBuf.*; public interface Allocator extends AutoCloseable { - BBuf allocate(long size); @Override @@ -26,13 +25,15 @@ public interface Allocator extends AutoCloseable { return new Allocator() { @Override public BBuf allocate(long size) { - return new BBuf(MemorySegment.allocateNative(size), SEGMENT_CLOSE); + var segment = MemorySegment.allocateNative(size); + Statics.MEM_USAGE_NATIVE.add(size); + return new BBuf(segment, SEGMENT_CLOSE_NATIVE); } }; } static Allocator pooledHeap() { - return new SizeClassedMemoryPool() { + return new SizeClassedMemoryPool(false) { @Override protected MemorySegment createMemorySegment(long size) { return MemorySegment.ofArray(new byte[Math.toIntExact(size)]); @@ -41,11 +42,28 @@ public interface Allocator extends AutoCloseable { } static Allocator pooledDirect() { - return new SizeClassedMemoryPool() { + return new SizeClassedMemoryPool(true) { @Override protected MemorySegment createMemorySegment(long size) { return MemorySegment.allocateNative(size); } }; } + + static Allocator pooledDirectWithCleaner() { + return new SizeClassedMemoryPool(true) { + @Override + protected MemorySegment createMemorySegment(long size) { + return MemorySegment.allocateNative(size); + } + + @Override + protected BBuf createBBuf(MemorySegment segment) { + var drop = new NativeMemoryCleanerDrop(); + var buf = new BBuf(segment, drop); + drop.accept(buf); + return buf; + } + }; + } } diff --git a/buffer/src/main/java/io/netty/buffer/b2/BBuf.java b/buffer/src/main/java/io/netty/buffer/b2/BBuf.java index fb43aeb..b499dcc 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/BBuf.java +++ b/buffer/src/main/java/io/netty/buffer/b2/BBuf.java @@ -2,17 +2,19 @@ package io.netty.buffer.b2; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.util.internal.PlatformDependent; import jdk.incubator.foreign.MemoryAccess; -import jdk.incubator.foreign.MemoryAddress; import jdk.incubator.foreign.MemorySegment; -import java.lang.invoke.VarHandle; +import static io.netty.buffer.b2.Statics.*; public class BBuf extends Rc { static final Drop NO_DROP = buf -> {}; static final Drop SEGMENT_CLOSE = buf -> buf.segment.close(); - private final MemorySegment segment; + static final Drop SEGMENT_CLOSE_NATIVE = buf -> { + buf.segment.close(); + MEM_USAGE_NATIVE.add(-buf.segment.byteSize()); + }; + final MemorySegment segment; private long read; private long write; @@ -82,13 +84,11 @@ public class BBuf extends Rc { @Override protected BBuf prepareSend() { BBuf outer = this; - MemorySegment transferSegment = segment.withOwnerThread(Lazy.TRANSFER_OWNER); + MemorySegment transferSegment = segment.withOwnerThread(TRANSFER_OWNER); return new BBuf(transferSegment, NO_DROP) { @Override protected BBuf copy(Thread recipient, Drop drop) { - Object scope = PlatformDependent.getObject(transferSegment, Lazy.SCOPE); - PlatformDependent.putObject(scope, Lazy.OWNER, recipient); - VarHandle.fullFence(); + overwriteMemorySegmentOwner(transferSegment, recipient); BBuf copy = new BBuf(transferSegment, drop); copy.read = outer.read; copy.write = outer.write; @@ -96,11 +96,4 @@ public class BBuf extends Rc { } }; } - - private static class Lazy { - @SuppressWarnings("InstantiatingAThreadWithDefaultRunMethod") - private static final Thread TRANSFER_OWNER = new Thread("ByteBuf Transfer Owner"); - private static final long SCOPE = Statics.fieldOffset("jdk.internal.foreign.AbstractMemorySegmentImpl", "scope"); - private static final long OWNER = Statics.fieldOffset("jdk.internal.foreign.MemoryScope", "owner"); - } } diff --git a/buffer/src/main/java/io/netty/buffer/b2/Drop.java b/buffer/src/main/java/io/netty/buffer/b2/Drop.java index 9bab448..5e8e26e 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/Drop.java +++ b/buffer/src/main/java/io/netty/buffer/b2/Drop.java @@ -1,6 +1,12 @@ package io.netty.buffer.b2; +import java.util.function.Consumer; + @FunctionalInterface -public interface Drop> { +public interface Drop> extends Consumer { void drop(T obj); + + @Override + default void accept(T t) { + } } diff --git a/buffer/src/main/java/io/netty/buffer/b2/NativeMemoryCleanerDrop.java b/buffer/src/main/java/io/netty/buffer/b2/NativeMemoryCleanerDrop.java new file mode 100644 index 0000000..9464d54 --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/b2/NativeMemoryCleanerDrop.java @@ -0,0 +1,38 @@ +package io.netty.buffer.b2; + +import java.lang.invoke.VarHandle; +import java.lang.ref.Cleaner; +import java.lang.ref.Cleaner.Cleanable; + +import static io.netty.buffer.b2.Statics.*; +import static java.lang.invoke.MethodHandles.*; + +class NativeMemoryCleanerDrop implements Drop { + private static final Cleaner CLEANER = Cleaner.create(); + private static final VarHandle CLEANABLE = + findVarHandle(lookup(), NativeMemoryCleanerDrop.class, "cleanable", Cleanable.class); + @SuppressWarnings("unused") + private volatile Cleanable cleanable; + + @Override + public void drop(BBuf buf) { + Cleanable c = (Cleanable) CLEANABLE.getAndSet(this, null); + if (c != null) { + c.clean(); + } + } + + @Override + public void accept(BBuf buf) { + drop(null); // Unregister old cleanable, if any, to avoid uncontrolled build-up. + var segment = buf.segment; + cleanable = CLEANER.register(this, () -> { + if (segment.isAlive()) { + // Clear owner so we can close from cleaner thread. + overwriteMemorySegmentOwner(segment, null); + segment.close(); + MEM_USAGE_NATIVE.add(-segment.byteSize()); + } + }); + } +} diff --git a/buffer/src/main/java/io/netty/buffer/b2/RendezvousSend.java b/buffer/src/main/java/io/netty/buffer/b2/RendezvousSend.java index b1e45ec..f8664c4 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/RendezvousSend.java +++ b/buffer/src/main/java/io/netty/buffer/b2/RendezvousSend.java @@ -46,6 +46,7 @@ class RendezvousSend> implements Send { } recipientLatch.await(); incoming = outgoing.copy(recipient, drop); + drop.accept(incoming); sentLatch.countDown(); } } diff --git a/buffer/src/main/java/io/netty/buffer/b2/SizeClassedMemoryPool.java b/buffer/src/main/java/io/netty/buffer/b2/SizeClassedMemoryPool.java index 70852b1..bd9bced 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/SizeClassedMemoryPool.java +++ b/buffer/src/main/java/io/netty/buffer/b2/SizeClassedMemoryPool.java @@ -11,11 +11,13 @@ import static java.lang.invoke.MethodHandles.*; abstract class SizeClassedMemoryPool implements Allocator, Drop { private static final VarHandle CLOSE = Statics.findVarHandle(lookup(), SizeClassedMemoryPool.class, "closed", boolean.class); private final ConcurrentHashMap>> pool; + private final Drop disposer; @SuppressWarnings("unused") private volatile boolean closed; - protected SizeClassedMemoryPool() { + protected SizeClassedMemoryPool(boolean allocatesNativeMemory) { pool = new ConcurrentHashMap<>(); + disposer = allocatesNativeMemory ? BBuf.SEGMENT_CLOSE_NATIVE : BBuf.SEGMENT_CLOSE; } @Override @@ -25,7 +27,17 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop { if (send != null) { return send.receive(); } - return new BBuf(createMemorySegment(size), this); + var segment = createMemorySegment(size); + Statics.MEM_USAGE_NATIVE.add(size); + return createBBuf(segment); + } + + protected BBuf createBBuf(MemorySegment segment) { + return new BBuf(segment, getDrop()); + } + + protected SizeClassedMemoryPool getDrop() { + return this; } protected abstract MemorySegment createMemorySegment(long size); @@ -58,7 +70,7 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop { return pool.computeIfAbsent(size, k -> new ConcurrentLinkedQueue<>()); } - private static void dispose(BBuf buf) { - BBuf.SEGMENT_CLOSE.drop(buf); + private void dispose(BBuf buf) { + disposer.drop(buf); } } diff --git a/buffer/src/main/java/io/netty/buffer/b2/Statics.java b/buffer/src/main/java/io/netty/buffer/b2/Statics.java index 64a1c9d..6cf4814 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/Statics.java +++ b/buffer/src/main/java/io/netty/buffer/b2/Statics.java @@ -1,12 +1,20 @@ package io.netty.buffer.b2; import io.netty.util.internal.PlatformDependent; +import jdk.incubator.foreign.MemorySegment; import java.lang.invoke.MethodHandles.Lookup; import java.lang.invoke.VarHandle; import java.lang.reflect.Field; +import java.util.concurrent.atomic.LongAdder; interface Statics { + @SuppressWarnings("InstantiatingAThreadWithDefaultRunMethod") + Thread TRANSFER_OWNER = new Thread("ByteBuf Transfer Owner"); + long SCOPE = fieldOffset("jdk.internal.foreign.AbstractMemorySegmentImpl", "scope"); + long OWNER = fieldOffset("jdk.internal.foreign.MemoryScope", "owner"); + LongAdder MEM_USAGE_NATIVE = new LongAdder(); + static VarHandle findVarHandle(Lookup lookup, Class recv, String name, Class type) { try { return lookup.findVarHandle(recv, name, type); @@ -24,4 +32,10 @@ interface Statics { throw new ExceptionInInitializerError(e); } } + + static void overwriteMemorySegmentOwner(MemorySegment segment, Thread newOwner) { + Object scope = PlatformDependent.getObject(segment, SCOPE); + PlatformDependent.putObject(scope, OWNER, newOwner); + VarHandle.fullFence(); // Attempt to force visibility of overwritten final fields. + } } diff --git a/buffer/src/main/java/io/netty/buffer/b2/TransferSend.java b/buffer/src/main/java/io/netty/buffer/b2/TransferSend.java index 029bf22..bd14820 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/TransferSend.java +++ b/buffer/src/main/java/io/netty/buffer/b2/TransferSend.java @@ -1,6 +1,5 @@ package io.netty.buffer.b2; -import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; import static io.netty.buffer.b2.Statics.*; @@ -23,6 +22,8 @@ class TransferSend> implements Send { if (!RECEIVED.compareAndSet(this, false, true)) { throw new IllegalStateException("This object has already been received."); } - return outgoing.copy(Thread.currentThread(), drop); + var copy = outgoing.copy(Thread.currentThread(), drop); + drop.accept(copy); + return copy; } } diff --git a/buffer/src/test/java/io/netty/buffer/b2/PooledDirectBBufWithCleanerTest.java b/buffer/src/test/java/io/netty/buffer/b2/PooledDirectBBufWithCleanerTest.java new file mode 100644 index 0000000..3670a3e --- /dev/null +++ b/buffer/src/test/java/io/netty/buffer/b2/PooledDirectBBufWithCleanerTest.java @@ -0,0 +1,33 @@ +package io.netty.buffer.b2; + +import org.junit.Test; + +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.*; + +public class PooledDirectBBufWithCleanerTest extends BBufTest { + @Override + protected Allocator createAllocator() { + return Allocator.pooledDirectWithCleaner(); + } + + @Test + public void bufferMustBeClosedByCleaner() throws InterruptedException { + var allocator = createAllocator(); + double sumOfMemoryDataPoints = 0; + allocator.close(); + int iterations = 100; + int allocationSize = 1024; + for (int i = 0; i < iterations; i++) { + allocateAndForget(allocator, allocationSize); + System.gc(); + sumOfMemoryDataPoints += Statics.MEM_USAGE_NATIVE.sum(); + } + double meanMemoryUsage = sumOfMemoryDataPoints / iterations; + assertThat(meanMemoryUsage, lessThan(allocationSize * 5.0)); + } + + protected void allocateAndForget(Allocator allocator, long size) { + allocator.allocate(size); + } +}