diff --git a/pom.xml b/pom.xml index d9ff998..a8dce09 100644 --- a/pom.xml +++ b/pom.xml @@ -101,7 +101,7 @@ org.junit.jupiter junit-jupiter-api - 5.8.0-M1 + 5.8.2 org.hamcrest @@ -112,12 +112,12 @@ org.junit.jupiter junit-jupiter-engine - 5.8.0-M1 + 5.8.2 org.junit.jupiter junit-jupiter-params - 5.8.0-M1 + 5.8.2 @@ -184,6 +184,7 @@ org.apache.lucene lucene-test-framework 9.0.0 + test org.jetbrains @@ -534,7 +535,7 @@ io.soabase.record-builder record-builder-processor - 32 + 33 @@ -583,10 +584,11 @@ org.junit.jupiter junit-jupiter-engine - 5.8.0-M1 + 5.8.2 + false --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --enable-native-access=ALL-UNNAMED diff --git a/src/main/java/it/cavallium/dbengine/MetricUtils.java b/src/main/java/it/cavallium/dbengine/MetricUtils.java index d3e7e83..7ba4418 100644 --- a/src/main/java/it/cavallium/dbengine/MetricUtils.java +++ b/src/main/java/it/cavallium/dbengine/MetricUtils.java @@ -6,12 +6,15 @@ import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodType; import java.util.List; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * Netty5 hides some metrics. This utility class can read them. */ public class MetricUtils { + private static final Logger LOG = LogManager.getLogger(MetricUtils.class); private static final MethodHandle GET_ARENA_METRICS; static { @@ -24,12 +27,16 @@ public class MetricUtils { var pooledBufferClass = Class.forName("io.netty5.buffer.api.pool.PooledBufferAllocatorMetric"); // Find the handle of the method handle = lookup.findVirtual(pooledBufferClass, "arenaMetrics", MethodType.methodType(List.class)); - } catch (NoSuchMethodException | IllegalAccessException | ClassNotFoundException ignored) { - + } catch (NoSuchMethodException | IllegalAccessException | ClassNotFoundException ex) { + logMetricsNotAccessible(ex); } GET_ARENA_METRICS = handle; } + private static void logMetricsNotAccessible(Throwable ex) { + LOG.debug("Failed to open pooled buffer allocator metrics", ex); + } + /** * Get the metrics of each pool arena of a pooled allocator * @param allocator Pooled allocator diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java index 20ad694..6a53def 100644 --- a/src/main/java/module-info.java +++ b/src/main/java/module-info.java @@ -15,6 +15,8 @@ module dbengine { exports it.cavallium.dbengine.client.query; exports it.cavallium.dbengine.database.memory; exports it.cavallium.dbengine.netty; + opens it.cavallium.dbengine.database.remote; + exports it.cavallium.dbengine; requires org.jetbrains.annotations; requires reactor.core; requires com.google.common; @@ -35,6 +37,8 @@ module dbengine { requires java.compiler; requires org.apache.lucene.analysis.common; requires org.apache.lucene.misc; + requires org.apache.lucene.codecs; + requires org.apache.lucene.backward_codecs; requires lucene.relevance; requires io.netty.buffer; requires io.netty.transport; @@ -49,5 +53,4 @@ module dbengine { requires io.netty.incubator.codec.classes.quic; requires org.apache.lucene.queryparser; requires reactor.netty.incubator.quic; - } \ No newline at end of file diff --git a/src/test/java/it/cavallium/dbengine/DbTestUtils.java b/src/test/java/it/cavallium/dbengine/DbTestUtils.java index 66d110a..07ac83a 100644 --- a/src/test/java/it/cavallium/dbengine/DbTestUtils.java +++ b/src/test/java/it/cavallium/dbengine/DbTestUtils.java @@ -41,10 +41,10 @@ public class DbTestUtils { return "0123456789".repeat(1024); } - public record TestAllocator(PooledBufferAllocator allocator) {} + public record TestAllocator(TestAllocatorImpl allocator) {} public static TestAllocator newAllocator() { - return new TestAllocator(new PooledBufferAllocator(MemoryManager.instance(), true, 1, 8192, 9, 0, 0, true)); + return new TestAllocator(TestAllocatorImpl.create()); } public static void destroyAllocator(TestAllocator testAllocator) { @@ -52,20 +52,10 @@ public class DbTestUtils { } @SuppressWarnings("SameParameterValue") - private static long getActiveAllocations(PooledBufferAllocator allocator, boolean printStats) { - allocator.trimCurrentThreadCache(); - var metrics = MetricUtils.getPoolArenaMetrics(allocator); - int allocations = 0; - int deallocations = 0; - int activeAllocations = 0; - for (PoolArenaMetric metric : metrics) { - allocations += metric.numAllocations(); - deallocations += metric.numDeallocations(); - activeAllocations += metric.numActiveAllocations(); - } + private static long getActiveAllocations(TestAllocatorImpl allocator, boolean printStats) { + long activeAllocations = allocator.getActiveAllocations(); if (printStats) { - System.out.println("allocations=" + allocations + ", deallocations=" + deallocations - + ", activeAllocations=" + activeAllocations); + System.out.println("activeAllocations=" + activeAllocations); } return activeAllocations; } @@ -102,7 +92,7 @@ public class DbTestUtils { return canUse; } - public static void ensureNoLeaks(PooledBufferAllocator allocator, boolean printStats, boolean useClassicException) { + public static void ensureNoLeaks(TestAllocatorImpl allocator, boolean printStats, boolean useClassicException) { if (allocator != null) { var allocs = getActiveAllocations(allocator, printStats); if (useClassicException) { @@ -203,7 +193,7 @@ public class DbTestUtils { ); } - public static DatabaseMapDictionaryHashed tempDatabaseMapDictionaryHashMap( + public static DatabaseMapDictionaryHashed tempDatabaseMapDictionaryHashMap( LLDictionary dictionary) { return DatabaseMapDictionaryHashed.simple(dictionary, Serializer.UTF8_SERIALIZER, diff --git a/src/test/java/it/cavallium/dbengine/TestAllocatorImpl.java b/src/test/java/it/cavallium/dbengine/TestAllocatorImpl.java new file mode 100644 index 0000000..9f04aef --- /dev/null +++ b/src/test/java/it/cavallium/dbengine/TestAllocatorImpl.java @@ -0,0 +1,144 @@ +package it.cavallium.dbengine; + +import static io.netty5.buffer.api.internal.Statics.allocatorClosedException; +import static io.netty5.buffer.api.internal.Statics.standardDrop; + +import io.netty5.buffer.api.AllocationType; +import io.netty5.buffer.api.AllocatorControl; +import io.netty5.buffer.api.Buffer; +import io.netty5.buffer.api.BufferAllocator; +import io.netty5.buffer.api.Drop; +import io.netty5.buffer.api.MemoryManager; +import io.netty5.buffer.api.StandardAllocationTypes; +import io.netty5.buffer.api.internal.Statics; +import io.netty5.buffer.api.pool.PooledBufferAllocator; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.Function; +import java.util.function.Supplier; + +public class TestAllocatorImpl implements BufferAllocator, AllocatorControl { + + private final TestMemoryManager manager; + private final AllocationType allocationType = StandardAllocationTypes.ON_HEAP; + private volatile boolean closed; + + private TestAllocatorImpl(TestMemoryManager testMemoryManager) { + this.manager = testMemoryManager; + } + + public static TestAllocatorImpl create() { + return new TestAllocatorImpl(new TestMemoryManager(MemoryManager.instance())); + } + + @Override + public boolean isPooling() { + return false; + } + + @Override + public AllocationType getAllocationType() { + return allocationType; + } + + @Override + public Buffer allocate(int size) { + if (closed) { + throw allocatorClosedException(); + } + Statics.assertValidBufferSize(size); + return manager.allocateShared(this, size, Statics.standardDrop(manager), allocationType); + } + + @Override + public Supplier constBufferSupplier(byte[] bytes) { + if (closed) { + throw allocatorClosedException(); + } + Buffer constantBuffer = manager.allocateShared( + this, bytes.length, standardDrop(manager), allocationType); + constantBuffer.writeBytes(bytes).makeReadOnly(); + return () -> manager.allocateConstChild(constantBuffer); + } + + @Override + public void close() { + closed = true; + } + + public long getActiveAllocations() { + return this.manager.getActiveAllocations(); + } + + @Override + public BufferAllocator getAllocator() { + return this; + } + + private static class TestMemoryManager implements MemoryManager { + + private final MemoryManager instance; + private final LongAdder activeAllocations = new LongAdder(); + + public TestMemoryManager(MemoryManager instance) { + this.instance = instance; + } + + @Override + public Buffer allocateShared(AllocatorControl allocatorControl, + long size, + Function, Drop> dropDecorator, + AllocationType allocationType) { + return instance.allocateShared(allocatorControl, size, this::createDrop, allocationType); + } + + @Override + public Buffer allocateConstChild(Buffer readOnlyConstParent) { + return instance.allocateConstChild(readOnlyConstParent); + } + + @Override + public Object unwrapRecoverableMemory(Buffer buf) { + return instance.unwrapRecoverableMemory(buf); + } + + @Override + public Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemory, Drop drop) { + return instance.recoverMemory(allocatorControl, recoverableMemory, drop); + } + + @Override + public Object sliceMemory(Object memory, int offset, int length) { + return instance.sliceMemory(memory, offset, length); + } + + @Override + public String implementationName() { + return instance.implementationName(); + } + + private Drop createDrop(Drop drop) { + activeAllocations.increment(); + return new Drop<>() { + @Override + public void drop(Buffer obj) { + activeAllocations.decrement(); + drop.drop(obj); + } + + @Override + public Drop fork() { + return createDrop(drop.fork()); + } + + @Override + public void attach(Buffer obj) { + drop.attach(obj); + } + }; + } + + public long getActiveAllocations() { + return activeAllocations.longValue(); + } + } +}