Merge pull request #20 from netty/always-cleaner

Buffers always have a cleaner attached
This commit is contained in:
Chris Vest 2020-12-30 14:51:29 +01:00 committed by GitHub
commit 9afad3a578
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 14 additions and 46 deletions

View File

@ -150,14 +150,10 @@ public interface Allocator extends AutoCloseable {
} }
static Allocator heap() { static Allocator heap() {
return new ManagedAllocator(MemoryManager.getHeapMemoryManager(), null); return new ManagedAllocator(MemoryManager.getHeapMemoryManager(), Statics.CLEANER);
} }
static Allocator direct() { static Allocator direct() {
return new ManagedAllocator(MemoryManager.getNativeMemoryManager(), null);
}
static Allocator directWithCleaner() {
return new ManagedAllocator(MemoryManager.getNativeMemoryManager(), Statics.CLEANER); return new ManagedAllocator(MemoryManager.getNativeMemoryManager(), Statics.CLEANER);
} }
@ -168,13 +164,4 @@ public interface Allocator extends AutoCloseable {
static Allocator pooledDirect() { static Allocator pooledDirect() {
return new SizeClassedMemoryPool(MemoryManager.getNativeMemoryManager()); return new SizeClassedMemoryPool(MemoryManager.getNativeMemoryManager());
} }
static Allocator pooledDirectWithCleaner() {
return new SizeClassedMemoryPool(MemoryManager.getNativeMemoryManager()) {
@Override
protected Drop<Buf> getDrop() {
return new NativeMemoryCleanerDrop(this, getMemoryManager(), super.getDrop());
}
};
}
} }

View File

@ -24,16 +24,16 @@ import static io.netty.buffer.api.Statics.CLEANER;
import static io.netty.buffer.api.Statics.findVarHandle; import static io.netty.buffer.api.Statics.findVarHandle;
import static java.lang.invoke.MethodHandles.lookup; import static java.lang.invoke.MethodHandles.lookup;
class NativeMemoryCleanerDrop implements Drop<Buf> { class CleanerPooledDrop implements Drop<Buf> {
private static final VarHandle CLEANABLE = private static final VarHandle CLEANABLE =
findVarHandle(lookup(), NativeMemoryCleanerDrop.class, "cleanable", GatedCleanable.class); findVarHandle(lookup(), CleanerPooledDrop.class, "cleanable", GatedCleanable.class);
private final SizeClassedMemoryPool pool; private final SizeClassedMemoryPool pool;
private final MemoryManager manager; private final MemoryManager manager;
private final Drop<Buf> delegate; private final Drop<Buf> delegate;
@SuppressWarnings("unused") @SuppressWarnings("unused")
private volatile GatedCleanable cleanable; private volatile GatedCleanable cleanable;
NativeMemoryCleanerDrop(SizeClassedMemoryPool pool, MemoryManager manager, CleanerPooledDrop(SizeClassedMemoryPool pool, MemoryManager manager,
Drop<Buf> delegate) { Drop<Buf> delegate) {
this.pool = pool; this.pool = pool;
this.manager = manager; this.manager = manager;

View File

@ -59,7 +59,7 @@ class SizeClassedMemoryPool implements Allocator, AllocatorControl, Drop<Buf> {
} }
protected Drop<Buf> getDrop() { protected Drop<Buf> getDrop() {
return this; return new CleanerPooledDrop(this, getMemoryManager(), this);
} }
@Override @Override

View File

@ -68,11 +68,9 @@ public class BufTest {
static List<Fixture> initialAllocators() { static List<Fixture> initialAllocators() {
return List.of( return List.of(
new Fixture("heap", Allocator::heap, HEAP), new Fixture("heap", Allocator::heap, HEAP),
new Fixture("direct", Allocator::direct, DIRECT), new Fixture("direct", Allocator::direct, DIRECT, CLEANER),
new Fixture("directWithCleaner", Allocator::directWithCleaner, DIRECT, CLEANER),
new Fixture("pooledHeap", Allocator::pooledHeap, POOLED, HEAP), new Fixture("pooledHeap", Allocator::pooledHeap, POOLED, HEAP),
new Fixture("pooledDirect", Allocator::pooledDirect, POOLED, DIRECT), new Fixture("pooledDirect", Allocator::pooledDirect, POOLED, DIRECT, CLEANER));
new Fixture("pooledDirectWithCleaner", Allocator::pooledDirectWithCleaner, POOLED, DIRECT, CLEANER));
} }
static Stream<Fixture> nonSliceAllocators() { static Stream<Fixture> nonSliceAllocators() {
@ -87,18 +85,10 @@ public class BufTest {
return fixtureCombinations().filter(Fixture::isDirect); return fixtureCombinations().filter(Fixture::isDirect);
} }
static Stream<Fixture> directWithCleanerAllocators() { static Stream<Fixture> directPooledAllocators() {
return fixtureCombinations().filter(f -> f.isDirect() && f.isCleaner());
}
static Stream<Fixture> directPooledWithCleanerAllocators() {
return fixtureCombinations().filter(f -> f.isDirect() && f.isCleaner() && f.isPooled()); return fixtureCombinations().filter(f -> f.isDirect() && f.isCleaner() && f.isPooled());
} }
static Stream<Fixture> poolingAllocators() {
return fixtureCombinations().filter(f -> f.isPooled());
}
private static Stream<Fixture> fixtureCombinations() { private static Stream<Fixture> fixtureCombinations() {
Fixture[] fxs = fixtures; Fixture[] fxs = fixtures;
if (fxs != null) { if (fxs != null) {
@ -1496,7 +1486,7 @@ public class BufTest {
class CleanerTests { class CleanerTests {
@Disabled("Precise native memory accounting does not work since recent panama-foreign changes.") @Disabled("Precise native memory accounting does not work since recent panama-foreign changes.")
@ParameterizedTest @ParameterizedTest
@MethodSource("io.netty.buffer.api.BufTest#directWithCleanerAllocators") @MethodSource("io.netty.buffer.api.BufTest#directAllocators")
public void bufferMustBeClosedByCleaner(Fixture fixture) throws InterruptedException { public void bufferMustBeClosedByCleaner(Fixture fixture) throws InterruptedException {
var allocator = fixture.createAllocator(); var allocator = fixture.createAllocator();
allocator.close(); allocator.close();
@ -1518,7 +1508,7 @@ public class BufTest {
@Disabled("Precise native memory accounting does not work since recent panama-foreign changes.") @Disabled("Precise native memory accounting does not work since recent panama-foreign changes.")
@ParameterizedTest @ParameterizedTest
@MethodSource("io.netty.buffer.api.BufTest#directPooledWithCleanerAllocators") @MethodSource("io.netty.buffer.api.BufTest#directPooledAllocators")
public void buffersMustBeReusedByPoolingAllocatorEvenWhenDroppedByCleanerInsteadOfExplicitly(Fixture fixture) public void buffersMustBeReusedByPoolingAllocatorEvenWhenDroppedByCleanerInsteadOfExplicitly(Fixture fixture)
throws InterruptedException { throws InterruptedException {
try (var allocator = fixture.createAllocator()) { try (var allocator = fixture.createAllocator()) {

View File

@ -42,9 +42,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
@State(Scope.Benchmark) @State(Scope.Benchmark)
public class MemorySegmentClosedByCleanerBenchmark { public class MemorySegmentClosedByCleanerBenchmark {
private static final Allocator direct = Allocator.direct(); private static final Allocator direct = Allocator.direct();
private static final Allocator withCleaner = Allocator.directWithCleaner();
private static final Allocator directPooled = Allocator.pooledDirect(); private static final Allocator directPooled = Allocator.pooledDirect();
private static final Allocator pooledWithCleaner = Allocator.pooledDirectWithCleaner();
@Param({"heavy", "light"}) @Param({"heavy", "light"})
public String workload; public String workload;
@ -77,19 +75,12 @@ public class MemorySegmentClosedByCleanerBenchmark {
@Benchmark @Benchmark
public Buf cleanerClose() throws Exception { public Buf cleanerClose() throws Exception {
return process(withCleaner.allocate(256)); return process(direct.allocate(256));
} }
@Benchmark @Benchmark
public Buf cleanerClosePooled() throws Exception { public Buf cleanerClosePooled() throws Exception {
return process(pooledWithCleaner.allocate(256)); return process(directPooled.allocate(256));
}
@Benchmark
public Buf pooledWithCleanerExplicitClose() throws Exception {
try (Buf buf = pooledWithCleaner.allocate(256)) {
return process(buf);
}
} }
private Buf process(Buf buffer) throws Exception { private Buf process(Buf buffer) throws Exception {

View File

@ -22,7 +22,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
public final class AsyncExample { public final class AsyncExample {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
try (Allocator allocator = Allocator.pooledDirectWithCleaner(); try (Allocator allocator = Allocator.pooledDirect();
Buf startBuf = allocator.allocate(16)) { Buf startBuf = allocator.allocate(16)) {
startBuf.writeLong(threadId()); startBuf.writeLong(threadId());