Buffers from the pooling allocator must be able to return memory to the pool if the buffer objects are leaked.

This commit is contained in:
Chris Vest 2021-05-17 18:08:16 +02:00
parent 03743fca0d
commit dec3756e6d
6 changed files with 118 additions and 8 deletions

View File

@ -47,7 +47,16 @@ public interface AllocatorControl {
* Memory that isn't attached to any particular buffer.
*/
interface UntetheredMemory {
/**
* Produce the recoverable memory object associated with this piece of untethered memory.
* @implNote This method should only be called once, since it might be expensive.
*/
<Memory> Memory memory();
/**
* Produce the drop instance associated with this piece of untethered memory.
* @implNote This method should only be called once, since it might be expensive, or interact with Cleaners.
*/
<BufferType extends Buffer> Drop<BufferType> drop();
}
}

View File

@ -130,6 +130,7 @@ public interface BufferAllocator extends AutoCloseable {
}
static BufferAllocator pooledDirect() {
return new SizeClassedMemoryPool(MemoryManagers.getManagers().getNativeMemoryManager());
return new PooledBufferAllocator(MemoryManagers.getManagers().getNativeMemoryManager());
// return new SizeClassedMemoryPool(MemoryManagers.getManagers().getNativeMemoryManager());
}
}

View File

@ -0,0 +1,81 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.internal;
import io.netty.buffer.api.Drop;
import java.io.Serial;
import java.lang.ref.Cleaner;
import java.util.concurrent.atomic.AtomicReference;
/**
* A drop implementation that delegates to another drop instance, either when called directly, or when it becomes
* cleanable. This ensures that objects are dropped even if they leak.
*/
public final class CleanerDrop<T> implements Drop<T> {
private Cleaner.Cleanable cleanable;
private GatedRunner<T> runner;
/**
* Wrap the given drop instance, and produce a new drop instance that will also call the delegate drop instance if
* it becomes cleanable.
*/
public static <T> Drop<T> wrap(Drop<T> drop) {
CleanerDrop<T> cleanerDrop = new CleanerDrop<>();
GatedRunner<T> runner = new GatedRunner<>(drop);
cleanerDrop.cleanable = Statics.CLEANER.register(cleanerDrop, runner);
cleanerDrop.runner = runner;
return cleanerDrop;
}
private CleanerDrop() {
}
@Override
public void attach(T obj) {
runner.set(obj);
runner.drop.attach(obj);
}
@Override
public void drop(T obj) {
attach(obj);
cleanable.clean();
}
@Override
public String toString() {
return "CleanerDrop(" + runner.drop + ')';
}
private static final class GatedRunner<T> extends AtomicReference<T> implements Runnable {
@Serial
private static final long serialVersionUID = 2685535951915798850L;
final Drop<T> drop;
private GatedRunner(Drop<T> drop) {
this.drop = drop;
}
@Override
public void run() {
T obj = getAndSet(null); // Make absolutely sure we only delegate once.
if (obj != null) {
drop.drop(obj);
}
}
}
}

View File

@ -18,6 +18,7 @@ package io.netty.buffer.api.pool;
import io.netty.buffer.api.AllocatorControl.UntetheredMemory;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Drop;
import io.netty.buffer.api.internal.CleanerDrop;
import java.util.PriorityQueue;
@ -572,7 +573,8 @@ final class PoolChunk implements PoolChunkMetric {
@Override
public <BufferType extends Buffer> Drop<BufferType> drop() {
return (Drop<BufferType>) new PooledDrop(chunk.arena, chunk, threadCache, handle, maxLength);
PooledDrop pooledDrop = new PooledDrop(chunk.arena, chunk, threadCache, handle, maxLength);
return (Drop<BufferType>) CleanerDrop.wrap(pooledDrop);
}
}

View File

@ -19,11 +19,29 @@ import io.netty.buffer.api.memseg.NativeMemorySegmentManager;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.List;
import java.util.stream.Stream;
import static io.netty.buffer.api.MemoryManagers.using;
import static org.assertj.core.api.Assertions.assertThat;
public class BufferCleanerTest extends BufferTestSupport {
@SuppressWarnings("OptionalGetWithoutIsPresent")
static Fixture[] memorySegmentAllocators() {
MemoryManagers managers = MemoryManagers.getAllManagers()
.map(p -> p.get())
.filter(mm -> "MS".equals(mm.toString()))
.findFirst().get();
List<Fixture> initFixtures = initialAllocators().stream().flatMap(f -> {
Stream.Builder<Fixture> builder = Stream.builder();
builder.add(new Fixture(f + "/" + managers, () -> using(managers, f), f.getProperties()));
return builder.build();
}).toList();
return fixtureCombinations(initFixtures).filter(f -> f.isDirect()).toArray(Fixture[]::new);
}
@ParameterizedTest
@MethodSource("directAllocators")
@MethodSource("memorySegmentAllocators")
public void bufferMustBeClosedByCleaner(Fixture fixture) throws InterruptedException {
var initial = NativeMemorySegmentManager.MEM_USAGE_NATIVE.sum();
int allocationSize = 1024;
@ -34,6 +52,7 @@ public class BufferCleanerTest extends BufferTestSupport {
System.runFinalization();
sum = NativeMemorySegmentManager.MEM_USAGE_NATIVE.sum() - initial;
if (sum < allocationSize) {
// The memory must have been cleaned.
return;
}
}
@ -42,7 +61,7 @@ public class BufferCleanerTest extends BufferTestSupport {
private static void allocateAndForget(Fixture fixture, int size) {
var allocator = fixture.createAllocator();
allocator.close();
allocator.allocate(size);
allocator.close();
}
}

View File

@ -59,7 +59,7 @@ public abstract class BufferTestSupport {
private static final Memoize<Fixture[]> INITIAL_NO_CONST = new Memoize<>(
() -> initialFixturesForEachImplementation().stream().filter(f -> !f.isConst()).toArray(Fixture[]::new));
private static final Memoize<Fixture[]> ALL_COMBINATIONS = new Memoize<>(
() -> fixtureCombinations().toArray(Fixture[]::new));
() -> fixtureCombinations(initialFixturesForEachImplementation()).toArray(Fixture[]::new));
private static final Memoize<Fixture[]> ALL_ALLOCATORS = new Memoize<>(
() -> Arrays.stream(ALL_COMBINATIONS.get())
.filter(sample())
@ -160,9 +160,7 @@ public abstract class BufferTestSupport {
return initFixtures;
}
private static Stream<Fixture> fixtureCombinations() {
List<Fixture> initFixtures = initialFixturesForEachImplementation();
static Stream<Fixture> fixtureCombinations(List<Fixture> initFixtures) {
Builder<Fixture> builder = Stream.builder();
initFixtures.forEach(builder);