Add support for closing BBuf/MemorySegment via Cleaner

Motivation:
 Keeping track of refcounts is hard for client code.
 It is much easier to rely on the GC for cleanup.
Modification:
 Add a new pool, native allocator implementation, that uses the Cleaner API to prevent leaks.
 Native memory accounting, which is not built into the JDK for MemorySegments, has also been added to the allocators so the Cleaner based implementation can be tested.
Result:
 We have a leak resistant allocator for BBuf.
This commit is contained in:
Chris Vest 2020-08-10 12:15:02 +02:00
parent c63403199e
commit 8b0ea44f02
9 changed files with 142 additions and 26 deletions

View File

@ -5,7 +5,6 @@ import jdk.incubator.foreign.MemorySegment;
import static io.netty.buffer.b2.BBuf.*;
public interface Allocator extends AutoCloseable {
BBuf allocate(long size);
@Override
@ -26,13 +25,15 @@ public interface Allocator extends AutoCloseable {
return new Allocator() {
@Override
public BBuf allocate(long size) {
return new BBuf(MemorySegment.allocateNative(size), SEGMENT_CLOSE);
var segment = MemorySegment.allocateNative(size);
Statics.MEM_USAGE_NATIVE.add(size);
return new BBuf(segment, SEGMENT_CLOSE_NATIVE);
}
};
}
static Allocator pooledHeap() {
return new SizeClassedMemoryPool() {
return new SizeClassedMemoryPool(false) {
@Override
protected MemorySegment createMemorySegment(long size) {
return MemorySegment.ofArray(new byte[Math.toIntExact(size)]);
@ -41,11 +42,28 @@ public interface Allocator extends AutoCloseable {
}
static Allocator pooledDirect() {
return new SizeClassedMemoryPool() {
return new SizeClassedMemoryPool(true) {
@Override
protected MemorySegment createMemorySegment(long size) {
return MemorySegment.allocateNative(size);
}
};
}
static Allocator pooledDirectWithCleaner() {
return new SizeClassedMemoryPool(true) {
@Override
protected MemorySegment createMemorySegment(long size) {
return MemorySegment.allocateNative(size);
}
@Override
protected BBuf createBBuf(MemorySegment segment) {
var drop = new NativeMemoryCleanerDrop();
var buf = new BBuf(segment, drop);
drop.accept(buf);
return buf;
}
};
}
}

View File

@ -2,17 +2,19 @@ package io.netty.buffer.b2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.internal.PlatformDependent;
import jdk.incubator.foreign.MemoryAccess;
import jdk.incubator.foreign.MemoryAddress;
import jdk.incubator.foreign.MemorySegment;
import java.lang.invoke.VarHandle;
import static io.netty.buffer.b2.Statics.*;
public class BBuf extends Rc<BBuf> {
static final Drop<BBuf> NO_DROP = buf -> {};
static final Drop<BBuf> SEGMENT_CLOSE = buf -> buf.segment.close();
private final MemorySegment segment;
static final Drop<BBuf> SEGMENT_CLOSE_NATIVE = buf -> {
buf.segment.close();
MEM_USAGE_NATIVE.add(-buf.segment.byteSize());
};
final MemorySegment segment;
private long read;
private long write;
@ -82,13 +84,11 @@ public class BBuf extends Rc<BBuf> {
@Override
protected BBuf prepareSend() {
BBuf outer = this;
MemorySegment transferSegment = segment.withOwnerThread(Lazy.TRANSFER_OWNER);
MemorySegment transferSegment = segment.withOwnerThread(TRANSFER_OWNER);
return new BBuf(transferSegment, NO_DROP) {
@Override
protected BBuf copy(Thread recipient, Drop<BBuf> drop) {
Object scope = PlatformDependent.getObject(transferSegment, Lazy.SCOPE);
PlatformDependent.putObject(scope, Lazy.OWNER, recipient);
VarHandle.fullFence();
overwriteMemorySegmentOwner(transferSegment, recipient);
BBuf copy = new BBuf(transferSegment, drop);
copy.read = outer.read;
copy.write = outer.write;
@ -96,11 +96,4 @@ public class BBuf extends Rc<BBuf> {
}
};
}
private static class Lazy {
@SuppressWarnings("InstantiatingAThreadWithDefaultRunMethod")
private static final Thread TRANSFER_OWNER = new Thread("ByteBuf Transfer Owner");
private static final long SCOPE = Statics.fieldOffset("jdk.internal.foreign.AbstractMemorySegmentImpl", "scope");
private static final long OWNER = Statics.fieldOffset("jdk.internal.foreign.MemoryScope", "owner");
}
}

View File

@ -1,6 +1,12 @@
package io.netty.buffer.b2;
import java.util.function.Consumer;
@FunctionalInterface
public interface Drop<T extends Rc<T>> {
public interface Drop<T extends Rc<T>> extends Consumer<T> {
void drop(T obj);
@Override
default void accept(T t) {
}
}

View File

@ -0,0 +1,38 @@
package io.netty.buffer.b2;
import java.lang.invoke.VarHandle;
import java.lang.ref.Cleaner;
import java.lang.ref.Cleaner.Cleanable;
import static io.netty.buffer.b2.Statics.*;
import static java.lang.invoke.MethodHandles.*;
class NativeMemoryCleanerDrop implements Drop<BBuf> {
private static final Cleaner CLEANER = Cleaner.create();
private static final VarHandle CLEANABLE =
findVarHandle(lookup(), NativeMemoryCleanerDrop.class, "cleanable", Cleanable.class);
@SuppressWarnings("unused")
private volatile Cleanable cleanable;
@Override
public void drop(BBuf buf) {
Cleanable c = (Cleanable) CLEANABLE.getAndSet(this, null);
if (c != null) {
c.clean();
}
}
@Override
public void accept(BBuf buf) {
drop(null); // Unregister old cleanable, if any, to avoid uncontrolled build-up.
var segment = buf.segment;
cleanable = CLEANER.register(this, () -> {
if (segment.isAlive()) {
// Clear owner so we can close from cleaner thread.
overwriteMemorySegmentOwner(segment, null);
segment.close();
MEM_USAGE_NATIVE.add(-segment.byteSize());
}
});
}
}

View File

@ -46,6 +46,7 @@ class RendezvousSend<T extends Rc<T>> implements Send<T> {
}
recipientLatch.await();
incoming = outgoing.copy(recipient, drop);
drop.accept(incoming);
sentLatch.countDown();
}
}

View File

@ -11,11 +11,13 @@ import static java.lang.invoke.MethodHandles.*;
abstract class SizeClassedMemoryPool implements Allocator, Drop<BBuf> {
private static final VarHandle CLOSE = Statics.findVarHandle(lookup(), SizeClassedMemoryPool.class, "closed", boolean.class);
private final ConcurrentHashMap<Long, ConcurrentLinkedQueue<Send<BBuf>>> pool;
private final Drop<BBuf> disposer;
@SuppressWarnings("unused")
private volatile boolean closed;
protected SizeClassedMemoryPool() {
protected SizeClassedMemoryPool(boolean allocatesNativeMemory) {
pool = new ConcurrentHashMap<>();
disposer = allocatesNativeMemory ? BBuf.SEGMENT_CLOSE_NATIVE : BBuf.SEGMENT_CLOSE;
}
@Override
@ -25,7 +27,17 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop<BBuf> {
if (send != null) {
return send.receive();
}
return new BBuf(createMemorySegment(size), this);
var segment = createMemorySegment(size);
Statics.MEM_USAGE_NATIVE.add(size);
return createBBuf(segment);
}
protected BBuf createBBuf(MemorySegment segment) {
return new BBuf(segment, getDrop());
}
protected SizeClassedMemoryPool getDrop() {
return this;
}
protected abstract MemorySegment createMemorySegment(long size);
@ -58,7 +70,7 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop<BBuf> {
return pool.computeIfAbsent(size, k -> new ConcurrentLinkedQueue<>());
}
private static void dispose(BBuf buf) {
BBuf.SEGMENT_CLOSE.drop(buf);
private void dispose(BBuf buf) {
disposer.drop(buf);
}
}

View File

@ -1,12 +1,20 @@
package io.netty.buffer.b2;
import io.netty.util.internal.PlatformDependent;
import jdk.incubator.foreign.MemorySegment;
import java.lang.invoke.MethodHandles.Lookup;
import java.lang.invoke.VarHandle;
import java.lang.reflect.Field;
import java.util.concurrent.atomic.LongAdder;
interface Statics {
@SuppressWarnings("InstantiatingAThreadWithDefaultRunMethod")
Thread TRANSFER_OWNER = new Thread("ByteBuf Transfer Owner");
long SCOPE = fieldOffset("jdk.internal.foreign.AbstractMemorySegmentImpl", "scope");
long OWNER = fieldOffset("jdk.internal.foreign.MemoryScope", "owner");
LongAdder MEM_USAGE_NATIVE = new LongAdder();
static VarHandle findVarHandle(Lookup lookup, Class<?> recv, String name, Class<?> type) {
try {
return lookup.findVarHandle(recv, name, type);
@ -24,4 +32,10 @@ interface Statics {
throw new ExceptionInInitializerError(e);
}
}
static void overwriteMemorySegmentOwner(MemorySegment segment, Thread newOwner) {
Object scope = PlatformDependent.getObject(segment, SCOPE);
PlatformDependent.putObject(scope, OWNER, newOwner);
VarHandle.fullFence(); // Attempt to force visibility of overwritten final fields.
}
}

View File

@ -1,6 +1,5 @@
package io.netty.buffer.b2;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import static io.netty.buffer.b2.Statics.*;
@ -23,6 +22,8 @@ class TransferSend<T extends Rc<T>> implements Send<T> {
if (!RECEIVED.compareAndSet(this, false, true)) {
throw new IllegalStateException("This object has already been received.");
}
return outgoing.copy(Thread.currentThread(), drop);
var copy = outgoing.copy(Thread.currentThread(), drop);
drop.accept(copy);
return copy;
}
}

View File

@ -0,0 +1,33 @@
package io.netty.buffer.b2;
import org.junit.Test;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
public class PooledDirectBBufWithCleanerTest extends BBufTest {
@Override
protected Allocator createAllocator() {
return Allocator.pooledDirectWithCleaner();
}
@Test
public void bufferMustBeClosedByCleaner() throws InterruptedException {
var allocator = createAllocator();
double sumOfMemoryDataPoints = 0;
allocator.close();
int iterations = 100;
int allocationSize = 1024;
for (int i = 0; i < iterations; i++) {
allocateAndForget(allocator, allocationSize);
System.gc();
sumOfMemoryDataPoints += Statics.MEM_USAGE_NATIVE.sum();
}
double meanMemoryUsage = sumOfMemoryDataPoints / iterations;
assertThat(meanMemoryUsage, lessThan(allocationSize * 5.0));
}
protected void allocateAndForget(Allocator allocator, long size) {
allocator.allocate(size);
}
}