From 8b0ea44f02ff1d6ee984d750bc372302af389903 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Mon, 10 Aug 2020 12:15:02 +0200 Subject: [PATCH] Add support for closing BBuf/MemorySegment via Cleaner Motivation: Keeping track of refcounts is hard for client code. It is much easier to rely on the GC for cleanup. Modification: Add a new pool, native allocator implementation, that uses the Cleaner API to prevent leaks. Native memory accounting, which is not built into the JDK for MemorySegments, has also been added to the allocators so the Cleaner based implementation can be tested. Result: We have a leak resistant allocator for BBuf. --- .../java/io/netty/buffer/b2/Allocator.java | 26 +++++++++++-- .../main/java/io/netty/buffer/b2/BBuf.java | 23 ++++------- .../main/java/io/netty/buffer/b2/Drop.java | 8 +++- .../buffer/b2/NativeMemoryCleanerDrop.java | 38 +++++++++++++++++++ .../io/netty/buffer/b2/RendezvousSend.java | 1 + .../buffer/b2/SizeClassedMemoryPool.java | 20 ++++++++-- .../main/java/io/netty/buffer/b2/Statics.java | 14 +++++++ .../java/io/netty/buffer/b2/TransferSend.java | 5 ++- .../b2/PooledDirectBBufWithCleanerTest.java | 33 ++++++++++++++++ 9 files changed, 142 insertions(+), 26 deletions(-) create mode 100644 buffer/src/main/java/io/netty/buffer/b2/NativeMemoryCleanerDrop.java create mode 100644 buffer/src/test/java/io/netty/buffer/b2/PooledDirectBBufWithCleanerTest.java diff --git a/buffer/src/main/java/io/netty/buffer/b2/Allocator.java b/buffer/src/main/java/io/netty/buffer/b2/Allocator.java index 34a7c53..4cfe2b1 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/Allocator.java +++ b/buffer/src/main/java/io/netty/buffer/b2/Allocator.java @@ -5,7 +5,6 @@ import jdk.incubator.foreign.MemorySegment; import static io.netty.buffer.b2.BBuf.*; public interface Allocator extends AutoCloseable { - BBuf allocate(long size); @Override @@ -26,13 +25,15 @@ public interface Allocator extends AutoCloseable { return new Allocator() { @Override public BBuf allocate(long size) { - return new BBuf(MemorySegment.allocateNative(size), SEGMENT_CLOSE); + var segment = MemorySegment.allocateNative(size); + Statics.MEM_USAGE_NATIVE.add(size); + return new BBuf(segment, SEGMENT_CLOSE_NATIVE); } }; } static Allocator pooledHeap() { - return new SizeClassedMemoryPool() { + return new SizeClassedMemoryPool(false) { @Override protected MemorySegment createMemorySegment(long size) { return MemorySegment.ofArray(new byte[Math.toIntExact(size)]); @@ -41,11 +42,28 @@ public interface Allocator extends AutoCloseable { } static Allocator pooledDirect() { - return new SizeClassedMemoryPool() { + return new SizeClassedMemoryPool(true) { @Override protected MemorySegment createMemorySegment(long size) { return MemorySegment.allocateNative(size); } }; } + + static Allocator pooledDirectWithCleaner() { + return new SizeClassedMemoryPool(true) { + @Override + protected MemorySegment createMemorySegment(long size) { + return MemorySegment.allocateNative(size); + } + + @Override + protected BBuf createBBuf(MemorySegment segment) { + var drop = new NativeMemoryCleanerDrop(); + var buf = new BBuf(segment, drop); + drop.accept(buf); + return buf; + } + }; + } } diff --git a/buffer/src/main/java/io/netty/buffer/b2/BBuf.java b/buffer/src/main/java/io/netty/buffer/b2/BBuf.java index fb43aeb..b499dcc 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/BBuf.java +++ b/buffer/src/main/java/io/netty/buffer/b2/BBuf.java @@ -2,17 +2,19 @@ package io.netty.buffer.b2; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.util.internal.PlatformDependent; import jdk.incubator.foreign.MemoryAccess; -import jdk.incubator.foreign.MemoryAddress; import jdk.incubator.foreign.MemorySegment; -import java.lang.invoke.VarHandle; +import static io.netty.buffer.b2.Statics.*; public class BBuf extends Rc { static final Drop NO_DROP = buf -> {}; static final Drop SEGMENT_CLOSE = buf -> buf.segment.close(); - private final MemorySegment segment; + static final Drop SEGMENT_CLOSE_NATIVE = buf -> { + buf.segment.close(); + MEM_USAGE_NATIVE.add(-buf.segment.byteSize()); + }; + final MemorySegment segment; private long read; private long write; @@ -82,13 +84,11 @@ public class BBuf extends Rc { @Override protected BBuf prepareSend() { BBuf outer = this; - MemorySegment transferSegment = segment.withOwnerThread(Lazy.TRANSFER_OWNER); + MemorySegment transferSegment = segment.withOwnerThread(TRANSFER_OWNER); return new BBuf(transferSegment, NO_DROP) { @Override protected BBuf copy(Thread recipient, Drop drop) { - Object scope = PlatformDependent.getObject(transferSegment, Lazy.SCOPE); - PlatformDependent.putObject(scope, Lazy.OWNER, recipient); - VarHandle.fullFence(); + overwriteMemorySegmentOwner(transferSegment, recipient); BBuf copy = new BBuf(transferSegment, drop); copy.read = outer.read; copy.write = outer.write; @@ -96,11 +96,4 @@ public class BBuf extends Rc { } }; } - - private static class Lazy { - @SuppressWarnings("InstantiatingAThreadWithDefaultRunMethod") - private static final Thread TRANSFER_OWNER = new Thread("ByteBuf Transfer Owner"); - private static final long SCOPE = Statics.fieldOffset("jdk.internal.foreign.AbstractMemorySegmentImpl", "scope"); - private static final long OWNER = Statics.fieldOffset("jdk.internal.foreign.MemoryScope", "owner"); - } } diff --git a/buffer/src/main/java/io/netty/buffer/b2/Drop.java b/buffer/src/main/java/io/netty/buffer/b2/Drop.java index 9bab448..5e8e26e 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/Drop.java +++ b/buffer/src/main/java/io/netty/buffer/b2/Drop.java @@ -1,6 +1,12 @@ package io.netty.buffer.b2; +import java.util.function.Consumer; + @FunctionalInterface -public interface Drop> { +public interface Drop> extends Consumer { void drop(T obj); + + @Override + default void accept(T t) { + } } diff --git a/buffer/src/main/java/io/netty/buffer/b2/NativeMemoryCleanerDrop.java b/buffer/src/main/java/io/netty/buffer/b2/NativeMemoryCleanerDrop.java new file mode 100644 index 0000000..9464d54 --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/b2/NativeMemoryCleanerDrop.java @@ -0,0 +1,38 @@ +package io.netty.buffer.b2; + +import java.lang.invoke.VarHandle; +import java.lang.ref.Cleaner; +import java.lang.ref.Cleaner.Cleanable; + +import static io.netty.buffer.b2.Statics.*; +import static java.lang.invoke.MethodHandles.*; + +class NativeMemoryCleanerDrop implements Drop { + private static final Cleaner CLEANER = Cleaner.create(); + private static final VarHandle CLEANABLE = + findVarHandle(lookup(), NativeMemoryCleanerDrop.class, "cleanable", Cleanable.class); + @SuppressWarnings("unused") + private volatile Cleanable cleanable; + + @Override + public void drop(BBuf buf) { + Cleanable c = (Cleanable) CLEANABLE.getAndSet(this, null); + if (c != null) { + c.clean(); + } + } + + @Override + public void accept(BBuf buf) { + drop(null); // Unregister old cleanable, if any, to avoid uncontrolled build-up. + var segment = buf.segment; + cleanable = CLEANER.register(this, () -> { + if (segment.isAlive()) { + // Clear owner so we can close from cleaner thread. + overwriteMemorySegmentOwner(segment, null); + segment.close(); + MEM_USAGE_NATIVE.add(-segment.byteSize()); + } + }); + } +} diff --git a/buffer/src/main/java/io/netty/buffer/b2/RendezvousSend.java b/buffer/src/main/java/io/netty/buffer/b2/RendezvousSend.java index b1e45ec..f8664c4 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/RendezvousSend.java +++ b/buffer/src/main/java/io/netty/buffer/b2/RendezvousSend.java @@ -46,6 +46,7 @@ class RendezvousSend> implements Send { } recipientLatch.await(); incoming = outgoing.copy(recipient, drop); + drop.accept(incoming); sentLatch.countDown(); } } diff --git a/buffer/src/main/java/io/netty/buffer/b2/SizeClassedMemoryPool.java b/buffer/src/main/java/io/netty/buffer/b2/SizeClassedMemoryPool.java index 70852b1..bd9bced 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/SizeClassedMemoryPool.java +++ b/buffer/src/main/java/io/netty/buffer/b2/SizeClassedMemoryPool.java @@ -11,11 +11,13 @@ import static java.lang.invoke.MethodHandles.*; abstract class SizeClassedMemoryPool implements Allocator, Drop { private static final VarHandle CLOSE = Statics.findVarHandle(lookup(), SizeClassedMemoryPool.class, "closed", boolean.class); private final ConcurrentHashMap>> pool; + private final Drop disposer; @SuppressWarnings("unused") private volatile boolean closed; - protected SizeClassedMemoryPool() { + protected SizeClassedMemoryPool(boolean allocatesNativeMemory) { pool = new ConcurrentHashMap<>(); + disposer = allocatesNativeMemory ? BBuf.SEGMENT_CLOSE_NATIVE : BBuf.SEGMENT_CLOSE; } @Override @@ -25,7 +27,17 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop { if (send != null) { return send.receive(); } - return new BBuf(createMemorySegment(size), this); + var segment = createMemorySegment(size); + Statics.MEM_USAGE_NATIVE.add(size); + return createBBuf(segment); + } + + protected BBuf createBBuf(MemorySegment segment) { + return new BBuf(segment, getDrop()); + } + + protected SizeClassedMemoryPool getDrop() { + return this; } protected abstract MemorySegment createMemorySegment(long size); @@ -58,7 +70,7 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop { return pool.computeIfAbsent(size, k -> new ConcurrentLinkedQueue<>()); } - private static void dispose(BBuf buf) { - BBuf.SEGMENT_CLOSE.drop(buf); + private void dispose(BBuf buf) { + disposer.drop(buf); } } diff --git a/buffer/src/main/java/io/netty/buffer/b2/Statics.java b/buffer/src/main/java/io/netty/buffer/b2/Statics.java index 64a1c9d..6cf4814 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/Statics.java +++ b/buffer/src/main/java/io/netty/buffer/b2/Statics.java @@ -1,12 +1,20 @@ package io.netty.buffer.b2; import io.netty.util.internal.PlatformDependent; +import jdk.incubator.foreign.MemorySegment; import java.lang.invoke.MethodHandles.Lookup; import java.lang.invoke.VarHandle; import java.lang.reflect.Field; +import java.util.concurrent.atomic.LongAdder; interface Statics { + @SuppressWarnings("InstantiatingAThreadWithDefaultRunMethod") + Thread TRANSFER_OWNER = new Thread("ByteBuf Transfer Owner"); + long SCOPE = fieldOffset("jdk.internal.foreign.AbstractMemorySegmentImpl", "scope"); + long OWNER = fieldOffset("jdk.internal.foreign.MemoryScope", "owner"); + LongAdder MEM_USAGE_NATIVE = new LongAdder(); + static VarHandle findVarHandle(Lookup lookup, Class recv, String name, Class type) { try { return lookup.findVarHandle(recv, name, type); @@ -24,4 +32,10 @@ interface Statics { throw new ExceptionInInitializerError(e); } } + + static void overwriteMemorySegmentOwner(MemorySegment segment, Thread newOwner) { + Object scope = PlatformDependent.getObject(segment, SCOPE); + PlatformDependent.putObject(scope, OWNER, newOwner); + VarHandle.fullFence(); // Attempt to force visibility of overwritten final fields. + } } diff --git a/buffer/src/main/java/io/netty/buffer/b2/TransferSend.java b/buffer/src/main/java/io/netty/buffer/b2/TransferSend.java index 029bf22..bd14820 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/TransferSend.java +++ b/buffer/src/main/java/io/netty/buffer/b2/TransferSend.java @@ -1,6 +1,5 @@ package io.netty.buffer.b2; -import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; import static io.netty.buffer.b2.Statics.*; @@ -23,6 +22,8 @@ class TransferSend> implements Send { if (!RECEIVED.compareAndSet(this, false, true)) { throw new IllegalStateException("This object has already been received."); } - return outgoing.copy(Thread.currentThread(), drop); + var copy = outgoing.copy(Thread.currentThread(), drop); + drop.accept(copy); + return copy; } } diff --git a/buffer/src/test/java/io/netty/buffer/b2/PooledDirectBBufWithCleanerTest.java b/buffer/src/test/java/io/netty/buffer/b2/PooledDirectBBufWithCleanerTest.java new file mode 100644 index 0000000..3670a3e --- /dev/null +++ b/buffer/src/test/java/io/netty/buffer/b2/PooledDirectBBufWithCleanerTest.java @@ -0,0 +1,33 @@ +package io.netty.buffer.b2; + +import org.junit.Test; + +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.*; + +public class PooledDirectBBufWithCleanerTest extends BBufTest { + @Override + protected Allocator createAllocator() { + return Allocator.pooledDirectWithCleaner(); + } + + @Test + public void bufferMustBeClosedByCleaner() throws InterruptedException { + var allocator = createAllocator(); + double sumOfMemoryDataPoints = 0; + allocator.close(); + int iterations = 100; + int allocationSize = 1024; + for (int i = 0; i < iterations; i++) { + allocateAndForget(allocator, allocationSize); + System.gc(); + sumOfMemoryDataPoints += Statics.MEM_USAGE_NATIVE.sum(); + } + double meanMemoryUsage = sumOfMemoryDataPoints / iterations; + assertThat(meanMemoryUsage, lessThan(allocationSize * 5.0)); + } + + protected void allocateAndForget(Allocator allocator, long size) { + allocator.allocate(size); + } +}