Re-create allocations tester

This commit is contained in:
Andrea Cavalli 2022-04-11 16:40:55 +02:00
parent 851f73481a
commit 0ca6f4c2c5
5 changed files with 171 additions and 25 deletions

12
pom.xml
View File

@ -101,7 +101,7 @@
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.8.0-M1</version>
<version>5.8.2</version>
<exclusions>
<exclusion>
<groupId>org.hamcrest</groupId>
@ -112,12 +112,12 @@
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.8.0-M1</version>
<version>5.8.2</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>5.8.0-M1</version>
<version>5.8.2</version>
</dependency>
<!-- This will get hamcrest-core automatically -->
<dependency>
@ -184,6 +184,7 @@
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-test-framework</artifactId>
<version>9.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
@ -534,7 +535,7 @@
<annotationProcessorPath>
<groupId>io.soabase.record-builder</groupId>
<artifactId>record-builder-processor</artifactId>
<version>32</version>
<version>33</version>
</annotationProcessorPath>
</annotationProcessorPaths>
<annotationProcessors>
@ -583,10 +584,11 @@
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.8.0-M1</version>
<version>5.8.2</version>
</dependency>
</dependencies>
<configuration>
<useModulePath>false</useModulePath>
<argLine>--add-modules jdk.incubator.foreign -Dforeign.restricted=permit --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --enable-native-access=ALL-UNNAMED</argLine>
<systemProperties>
<property>

View File

@ -6,12 +6,15 @@ import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* Netty5 hides some metrics. This utility class can read them.
*/
public class MetricUtils {
private static final Logger LOG = LogManager.getLogger(MetricUtils.class);
private static final MethodHandle GET_ARENA_METRICS;
static {
@ -24,12 +27,16 @@ public class MetricUtils {
var pooledBufferClass = Class.forName("io.netty5.buffer.api.pool.PooledBufferAllocatorMetric");
// Find the handle of the method
handle = lookup.findVirtual(pooledBufferClass, "arenaMetrics", MethodType.methodType(List.class));
} catch (NoSuchMethodException | IllegalAccessException | ClassNotFoundException ignored) {
} catch (NoSuchMethodException | IllegalAccessException | ClassNotFoundException ex) {
logMetricsNotAccessible(ex);
}
GET_ARENA_METRICS = handle;
}
private static void logMetricsNotAccessible(Throwable ex) {
LOG.debug("Failed to open pooled buffer allocator metrics", ex);
}
/**
* Get the metrics of each pool arena of a pooled allocator
* @param allocator Pooled allocator

View File

@ -15,6 +15,8 @@ module dbengine {
exports it.cavallium.dbengine.client.query;
exports it.cavallium.dbengine.database.memory;
exports it.cavallium.dbengine.netty;
opens it.cavallium.dbengine.database.remote;
exports it.cavallium.dbengine;
requires org.jetbrains.annotations;
requires reactor.core;
requires com.google.common;
@ -35,6 +37,8 @@ module dbengine {
requires java.compiler;
requires org.apache.lucene.analysis.common;
requires org.apache.lucene.misc;
requires org.apache.lucene.codecs;
requires org.apache.lucene.backward_codecs;
requires lucene.relevance;
requires io.netty.buffer;
requires io.netty.transport;
@ -49,5 +53,4 @@ module dbengine {
requires io.netty.incubator.codec.classes.quic;
requires org.apache.lucene.queryparser;
requires reactor.netty.incubator.quic;
}

View File

@ -41,10 +41,10 @@ public class DbTestUtils {
return "0123456789".repeat(1024);
}
public record TestAllocator(PooledBufferAllocator allocator) {}
public record TestAllocator(TestAllocatorImpl allocator) {}
public static TestAllocator newAllocator() {
return new TestAllocator(new PooledBufferAllocator(MemoryManager.instance(), true, 1, 8192, 9, 0, 0, true));
return new TestAllocator(TestAllocatorImpl.create());
}
public static void destroyAllocator(TestAllocator testAllocator) {
@ -52,20 +52,10 @@ public class DbTestUtils {
}
@SuppressWarnings("SameParameterValue")
private static long getActiveAllocations(PooledBufferAllocator allocator, boolean printStats) {
allocator.trimCurrentThreadCache();
var metrics = MetricUtils.getPoolArenaMetrics(allocator);
int allocations = 0;
int deallocations = 0;
int activeAllocations = 0;
for (PoolArenaMetric metric : metrics) {
allocations += metric.numAllocations();
deallocations += metric.numDeallocations();
activeAllocations += metric.numActiveAllocations();
}
private static long getActiveAllocations(TestAllocatorImpl allocator, boolean printStats) {
long activeAllocations = allocator.getActiveAllocations();
if (printStats) {
System.out.println("allocations=" + allocations + ", deallocations=" + deallocations
+ ", activeAllocations=" + activeAllocations);
System.out.println("activeAllocations=" + activeAllocations);
}
return activeAllocations;
}
@ -102,7 +92,7 @@ public class DbTestUtils {
return canUse;
}
public static void ensureNoLeaks(PooledBufferAllocator allocator, boolean printStats, boolean useClassicException) {
public static void ensureNoLeaks(TestAllocatorImpl allocator, boolean printStats, boolean useClassicException) {
if (allocator != null) {
var allocs = getActiveAllocations(allocator, printStats);
if (useClassicException) {
@ -203,7 +193,7 @@ public class DbTestUtils {
);
}
public static <T, U> DatabaseMapDictionaryHashed<String, String, Integer> tempDatabaseMapDictionaryHashMap(
public static DatabaseMapDictionaryHashed<String, String, Integer> tempDatabaseMapDictionaryHashMap(
LLDictionary dictionary) {
return DatabaseMapDictionaryHashed.simple(dictionary,
Serializer.UTF8_SERIALIZER,

View File

@ -0,0 +1,144 @@
package it.cavallium.dbengine;
import static io.netty5.buffer.api.internal.Statics.allocatorClosedException;
import static io.netty5.buffer.api.internal.Statics.standardDrop;
import io.netty5.buffer.api.AllocationType;
import io.netty5.buffer.api.AllocatorControl;
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.MemoryManager;
import io.netty5.buffer.api.StandardAllocationTypes;
import io.netty5.buffer.api.internal.Statics;
import io.netty5.buffer.api.pool.PooledBufferAllocator;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.function.Supplier;
public class TestAllocatorImpl implements BufferAllocator, AllocatorControl {
private final TestMemoryManager manager;
private final AllocationType allocationType = StandardAllocationTypes.ON_HEAP;
private volatile boolean closed;
private TestAllocatorImpl(TestMemoryManager testMemoryManager) {
this.manager = testMemoryManager;
}
public static TestAllocatorImpl create() {
return new TestAllocatorImpl(new TestMemoryManager(MemoryManager.instance()));
}
@Override
public boolean isPooling() {
return false;
}
@Override
public AllocationType getAllocationType() {
return allocationType;
}
@Override
public Buffer allocate(int size) {
if (closed) {
throw allocatorClosedException();
}
Statics.assertValidBufferSize(size);
return manager.allocateShared(this, size, Statics.standardDrop(manager), allocationType);
}
@Override
public Supplier<Buffer> constBufferSupplier(byte[] bytes) {
if (closed) {
throw allocatorClosedException();
}
Buffer constantBuffer = manager.allocateShared(
this, bytes.length, standardDrop(manager), allocationType);
constantBuffer.writeBytes(bytes).makeReadOnly();
return () -> manager.allocateConstChild(constantBuffer);
}
@Override
public void close() {
closed = true;
}
public long getActiveAllocations() {
return this.manager.getActiveAllocations();
}
@Override
public BufferAllocator getAllocator() {
return this;
}
private static class TestMemoryManager implements MemoryManager {
private final MemoryManager instance;
private final LongAdder activeAllocations = new LongAdder();
public TestMemoryManager(MemoryManager instance) {
this.instance = instance;
}
@Override
public Buffer allocateShared(AllocatorControl allocatorControl,
long size,
Function<Drop<Buffer>, Drop<Buffer>> dropDecorator,
AllocationType allocationType) {
return instance.allocateShared(allocatorControl, size, this::createDrop, allocationType);
}
@Override
public Buffer allocateConstChild(Buffer readOnlyConstParent) {
return instance.allocateConstChild(readOnlyConstParent);
}
@Override
public Object unwrapRecoverableMemory(Buffer buf) {
return instance.unwrapRecoverableMemory(buf);
}
@Override
public Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemory, Drop<Buffer> drop) {
return instance.recoverMemory(allocatorControl, recoverableMemory, drop);
}
@Override
public Object sliceMemory(Object memory, int offset, int length) {
return instance.sliceMemory(memory, offset, length);
}
@Override
public String implementationName() {
return instance.implementationName();
}
private Drop<Buffer> createDrop(Drop<Buffer> drop) {
activeAllocations.increment();
return new Drop<>() {
@Override
public void drop(Buffer obj) {
activeAllocations.decrement();
drop.drop(obj);
}
@Override
public Drop<Buffer> fork() {
return createDrop(drop.fork());
}
@Override
public void attach(Buffer obj) {
drop.attach(obj);
}
};
}
public long getActiveAllocations() {
return activeAllocations.longValue();
}
}
}