Make cleaner in pooledDirectWithCleaner return segments to pool instead of deallocating

Motivation:
 Allocating memory is expensive, which is why we pool our allocations.
 The cleaner, when used with pooled memory, should then return that memory to the pool instead of deallocating it.
 The purpose of the cleaner is, after all, to avoid having to track the reference counts so precisely.

Modification:
 The NativeMemoryCleanerDrop is now able to either recover lost memory segments, when a buffer wasn't closed explicitly
 and is being cleaned by the GC, or return the buffer to the pool via ordinary drop.
 The GatedCleanable has been added, because buffer ownership transfer involves generating new memory segments,
 and in those cases we need to invalidate the old cleanables without deallocating the tracked memory segment or returning
 it to the pool more than once.

Result:
 The pooledDirectWithCleaner allocator is now able to reuse memory segments, even when their references are forgotten and
 they processed by the cleaner thread.
This commit is contained in:
Chris Vest 2020-09-25 12:15:45 +02:00
parent 0055837b75
commit 1aa439991a
9 changed files with 128 additions and 47 deletions

View File

@ -57,7 +57,7 @@ public interface Allocator extends AutoCloseable {
@Override
public BBuf allocate(long size) {
checkSize(size);
var segment = MemorySegment.ofArray(new byte[Math.toIntExact(size)]);
var segment = allocateHeap(size);
return new BBuf(segment, SEGMENT_CLOSE);
}
};
@ -68,48 +68,58 @@ public interface Allocator extends AutoCloseable {
@Override
public Buf allocate(long size) {
checkSize(size);
var segment = MemorySegment.allocateNative(size);
Statics.MEM_USAGE_NATIVE.add(size);
return new BBuf(segment, SEGMENT_CLOSE_NATIVE);
var segment = allocateNative(size);
return new BBuf(segment, SEGMENT_CLOSE);
}
};
}
static Allocator pooledHeap() {
return new SizeClassedMemoryPool(false) {
return new SizeClassedMemoryPool() {
@Override
protected MemorySegment createMemorySegment(long size) {
checkSize(size);
return MemorySegment.ofArray(new byte[Math.toIntExact(size)]).withOwnerThread(null);
return allocateHeap(size).withOwnerThread(null);
}
};
}
static Allocator pooledDirect() {
return new SizeClassedMemoryPool(true) {
return new SizeClassedMemoryPool() {
@Override
protected MemorySegment createMemorySegment(long size) {
checkSize(size);
return MemorySegment.allocateNative(size).withOwnerThread(null);
return allocateNative(size).withOwnerThread(null);
}
};
}
static Allocator pooledDirectWithCleaner() {
return new SizeClassedMemoryPool(true) {
return new SizeClassedMemoryPool() {
@Override
protected MemorySegment createMemorySegment(long size) {
checkSize(size);
return MemorySegment.allocateNative(size).withOwnerThread(null);
return allocateNative(size).withOwnerThread(null);
}
@Override
protected BBuf createBBuf(MemorySegment segment) {
var drop = new NativeMemoryCleanerDrop();
var drop = new NativeMemoryCleanerDrop(this, getDrop());
var buf = new BBuf(segment, drop);
drop.accept(buf);
return buf;
}
};
}
private static MemorySegment allocateHeap(long size) {
return MemorySegment.ofArray(new byte[Math.toIntExact(size)]);
}
private static MemorySegment allocateNative(long size) {
var segment = MemorySegment.allocateNative(size)
.withCleanupAction(Statics.getCleanupAction(size));
Statics.MEM_USAGE_NATIVE.add(size);
return segment;
}
}

View File

@ -17,15 +17,10 @@ package io.netty.buffer.b2;
import jdk.incubator.foreign.MemorySegment;
import static io.netty.buffer.b2.Statics.*;
import static jdk.incubator.foreign.MemoryAccess.*;
class BBuf extends RcSupport<Buf, BBuf> implements Buf {
static final Drop<BBuf> SEGMENT_CLOSE = buf -> buf.seg.close();
static final Drop<BBuf> SEGMENT_CLOSE_NATIVE = buf -> {
buf.seg.close();
MEM_USAGE_NATIVE.add(-buf.seg.byteSize());
};
final MemorySegment seg;
private int roff;
private int woff;
@ -95,6 +90,7 @@ class BBuf extends RcSupport<Buf, BBuf> implements Buf {
}
// ### CODEGEN START primitive accessors implementation
// <editor-fold defaultstate="collapsed" desc="Generated primitive accessors implementation.">
@Override
public byte readByte() {
@ -729,6 +725,7 @@ class BBuf extends RcSupport<Buf, BBuf> implements Buf {
setDoubleAtOffset_LE(seg, woff, value);
return this;
}
// </editor-fold>
// ### CODEGEN END primitive accessors implementation
@Override

View File

@ -96,6 +96,7 @@ public interface Buf extends Rc<Buf> {
long getNativeAddress();
// ### CODEGEN START primitive accessors interface
// <editor-fold defaultstate="collapsed" desc="Generated primitive accessors interface.">
/**
* Get the byte value at the current {@link Buf#readerIndex()},
@ -1086,5 +1087,6 @@ public interface Buf extends Rc<Buf> {
* greater than or equal to {@link Buf#capacity()} minus {@link Double#BYTES}.
*/
Buf writeDoubleLE(int woff, double value);
// </editor-fold>
// ### CODEGEN END primitive accessors interface
}

View File

@ -16,22 +16,29 @@
package io.netty.buffer.b2;
import java.lang.invoke.VarHandle;
import java.lang.ref.Cleaner;
import java.lang.ref.Cleaner.Cleanable;
import java.lang.ref.WeakReference;
import java.util.concurrent.atomic.AtomicBoolean;
import static io.netty.buffer.b2.Statics.*;
import static java.lang.invoke.MethodHandles.*;
class NativeMemoryCleanerDrop implements Drop<BBuf> {
private static final Cleaner CLEANER = Cleaner.create();
private static final VarHandle CLEANABLE =
findVarHandle(lookup(), NativeMemoryCleanerDrop.class, "cleanable", Cleanable.class);
findVarHandle(lookup(), NativeMemoryCleanerDrop.class, "cleanable", GatedCleanable.class);
private final SizeClassedMemoryPool pool;
private final Drop<BBuf> delegate;
@SuppressWarnings("unused")
private volatile Cleanable cleanable;
private volatile GatedCleanable cleanable;
NativeMemoryCleanerDrop(SizeClassedMemoryPool pool, Drop<BBuf> delegate) {
this.pool = pool;
this.delegate = delegate;
}
@Override
public void drop(BBuf buf) {
Cleanable c = (Cleanable) CLEANABLE.getAndSet(this, null);
GatedCleanable c = (GatedCleanable) CLEANABLE.getAndSet(this, null);
if (c != null) {
c.clean();
}
@ -39,14 +46,46 @@ class NativeMemoryCleanerDrop implements Drop<BBuf> {
@Override
public void accept(BBuf buf) {
drop(null); // Unregister old cleanable, if any, to avoid uncontrolled build-up.
var segment = buf.seg;
cleanable = CLEANER.register(this, () -> {
if (segment.isAlive()) {
// TODO return segment to pool, or call out to external drop, instead of closing it directly.
segment.close();
MEM_USAGE_NATIVE.add(-segment.byteSize());
// Unregister old cleanable, if any, to avoid uncontrolled build-up.
GatedCleanable c = (GatedCleanable) CLEANABLE.getAndSet(this, null);
if (c != null) {
c.disable();
c.clean();
}
var pool = this.pool;
var seg = buf.seg;
var delegate = this.delegate;
WeakReference<BBuf> ref = new WeakReference<>(buf);
AtomicBoolean gate = new AtomicBoolean();
cleanable = new GatedCleanable(gate, CLEANER.register(this, () -> {
if (gate.compareAndSet(false, true)) {
BBuf b = ref.get();
if (b == null) {
pool.recoverLostSegment(seg);
} else {
delegate.drop(b);
}
}
});
}));
}
private static class GatedCleanable implements Cleanable {
private final AtomicBoolean gate;
private final Cleanable cleanable;
GatedCleanable(AtomicBoolean gate, Cleanable cleanable) {
this.gate = gate;
this.cleanable = cleanable;
}
public void disable() {
gate.set(true);
}
@Override
public void clean() {
cleanable.clean();
}
}
}

View File

@ -28,13 +28,11 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop<BBuf> {
private static final VarHandle CLOSE = Statics.findVarHandle(
lookup(), SizeClassedMemoryPool.class, "closed", boolean.class);
private final ConcurrentHashMap<Long, ConcurrentLinkedQueue<Send<Buf>>> pool;
private final Drop<BBuf> disposer;
@SuppressWarnings("unused")
private volatile boolean closed;
protected SizeClassedMemoryPool(boolean allocatesNativeMemory) {
protected SizeClassedMemoryPool() {
pool = new ConcurrentHashMap<>();
disposer = allocatesNativeMemory ? BBuf.SEGMENT_CLOSE_NATIVE : BBuf.SEGMENT_CLOSE;
}
@Override
@ -45,7 +43,6 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop<BBuf> {
return send.receive();
}
var segment = createMemorySegment(size);
Statics.MEM_USAGE_NATIVE.add(size);
return createBBuf(segment);
}
@ -86,18 +83,22 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop<BBuf> {
var sizeClassPool = getSizeClassPool(buf.capacity());
sizeClassPool.offer(buf.send());
if (closed) {
var send = sizeClassPool.poll();
if (send != null) {
Send<Buf> send;
while ((send = sizeClassPool.poll()) != null) {
dispose(send.receive());
}
}
}
void recoverLostSegment(MemorySegment segment) {
createBBuf(segment).close();
}
private ConcurrentLinkedQueue<Send<Buf>> getSizeClassPool(long size) {
return pool.computeIfAbsent(size, k -> new ConcurrentLinkedQueue<>());
}
private void dispose(Buf buf) {
disposer.drop((BBuf) buf);
private static void dispose(Buf buf) {
BBuf.SEGMENT_CLOSE.drop((BBuf) buf);
}
}

View File

@ -17,10 +17,14 @@ package io.netty.buffer.b2;
import java.lang.invoke.MethodHandles.Lookup;
import java.lang.invoke.VarHandle;
import java.lang.ref.Cleaner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
interface Statics {
Cleaner CLEANER = Cleaner.create();
LongAdder MEM_USAGE_NATIVE = new LongAdder();
ConcurrentHashMap<Long,Runnable> CLEANUP_ACTIONS = new ConcurrentHashMap<>();
static VarHandle findVarHandle(Lookup lookup, Class<?> recv, String name, Class<?> type) {
try {
@ -29,4 +33,8 @@ interface Statics {
throw new ExceptionInInitializerError(e);
}
}
static Runnable getCleanupAction(long size) {
return CLEANUP_ACTIONS.computeIfAbsent(size, s -> () -> MEM_USAGE_NATIVE.add(-s));
}
}

View File

@ -296,6 +296,7 @@ public abstract class BBufTest {
}
// ### CODEGEN START primitive accessors tests
// <editor-fold defaultstate="collapsed" desc="Generated primitive accessors tests.">
@Test
public void relativeReadOfByteMustNotBoundsCheckWhenReadOffsetAndSizeIsEqualToWriteOffset() {
@ -3948,6 +3949,7 @@ public abstract class BBufTest {
assertEquals((byte) 0x02, buf.readByte());
assertEquals((byte) 0x01, buf.readByte());
}
// </editor-fold>
// ### CODEGEN END primitive accessors tests
private static void assertEquals(byte expected, byte actual) {

View File

@ -503,16 +503,18 @@ public final class Codegen {
private static Function<String, Stream<String>> processLines() {
return new Function<String, Stream<String>>() {
final Pattern codegenStart = Pattern.compile("^\\s*// ### CODEGEN START (.*)$");
final Pattern codegenEnd = Pattern.compile("^\\s*// ### CODEGEN END (.*)$");
final Pattern codegenStart = Pattern.compile("^(\\s*// )### CODEGEN START (.*)$");
final Pattern codegenEnd = Pattern.compile("^(\\s*// )### CODEGEN END (.*)$");
boolean inCodeGenRegion;
@Override
public Stream<String> apply(String line) {
if (inCodeGenRegion) {
if (codegenEnd.matcher(line).find()) {
var matcher = codegenEnd.matcher(line);
if (matcher.find()) {
inCodeGenRegion = false;
return Stream.of(line);
String regionEnd = matcher.group(1) + "</editor-fold>";
return Stream.of(regionEnd, line);
}
return Stream.empty();
}
@ -520,10 +522,13 @@ public final class Codegen {
var matcher = codegenStart.matcher(line);
Stream<String> generator = Stream.empty();
if (matcher.find()) {
String region = matcher.group(1);
String region = matcher.group(2);
var generatorSupplier = REGION_GENERATORS.get(region);
if (generatorSupplier != null) {
generator = generatorSupplier.get();
String regionStart =
matcher.group(1) + "<editor-fold defaultstate=\"collapsed\" desc=\"Generated " +
region + ".\">";
generator = Stream.concat(Stream.of(regionStart), generatorSupplier.get());
inCodeGenRegion = true;
}
}

View File

@ -29,17 +29,34 @@ public class PooledDirectBBufWithCleanerTest extends DirectBBufTest {
@Test
public void bufferMustBeClosedByCleaner() throws InterruptedException {
var allocator = createAllocator();
double sumOfMemoryDataPoints = 0;
allocator.close();
int iterations = 100;
int allocationSize = 1024;
for (int i = 0; i < iterations; i++) {
allocateAndForget(allocator, allocationSize);
System.gc();
sumOfMemoryDataPoints += Statics.MEM_USAGE_NATIVE.sum();
System.runFinalization();
}
var sum = Statics.MEM_USAGE_NATIVE.sum();
var totalAllocated = (long) allocationSize * iterations;
assertThat(sum, lessThan(totalAllocated));
}
@Test
public void buffersMustBeReusedByPoolingAllocatorEvenWhenDroppedByCleanerInsteadOfExplicitly()
throws InterruptedException {
try (var allocator = createAllocator()) {
int iterations = 100;
int allocationSize = 1024;
for (int i = 0; i < iterations; i++) {
allocateAndForget(allocator, allocationSize);
System.gc();
System.runFinalization();
}
var sum = Statics.MEM_USAGE_NATIVE.sum();
var totalAllocated = (long) allocationSize * iterations;
assertThat(sum, lessThan(totalAllocated));
}
double meanMemoryUsage = sumOfMemoryDataPoints / iterations;
assertThat(meanMemoryUsage, lessThan(allocationSize * 5.0));
}
protected void allocateAndForget(Allocator allocator, long size) {