From 65295dbf0388537c9b543793a4957a229ef774c6 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 28 May 2022 14:34:35 +0200 Subject: [PATCH] Implement NettyMetrics --- .../database/disk/AbstractRocksDBColumn.java | 8 +- .../disk/LLLocalDatabaseConnection.java | 5 - .../disk/LLLocalKeyValueDatabase.java | 5 +- .../memory/LLMemoryDatabaseConnection.java | 5 - .../dbengine/netty/JMXNettyMonitoring.java | 66 ---------- .../netty/JMXNettyMonitoringMBean.java | 24 ---- .../netty/JMXNettyMonitoringManager.java | 64 ---------- .../netty/JMXPoolArenaNettyMonitoring.java | 115 ------------------ .../JMXPoolArenaNettyMonitoringMBean.java | 44 ------- .../netty/JMXPooledNettyMonitoring.java | 53 -------- .../dbengine/netty/NettyMetrics.java | 52 ++++++++ src/main/java/module-info.java | 1 + 12 files changed, 61 insertions(+), 381 deletions(-) delete mode 100644 src/main/java/it/cavallium/dbengine/netty/JMXNettyMonitoring.java delete mode 100644 src/main/java/it/cavallium/dbengine/netty/JMXNettyMonitoringMBean.java delete mode 100644 src/main/java/it/cavallium/dbengine/netty/JMXNettyMonitoringManager.java delete mode 100644 src/main/java/it/cavallium/dbengine/netty/JMXPoolArenaNettyMonitoring.java delete mode 100644 src/main/java/it/cavallium/dbengine/netty/JMXPoolArenaNettyMonitoringMBean.java delete mode 100644 src/main/java/it/cavallium/dbengine/netty/JMXPooledNettyMonitoring.java create mode 100644 src/main/java/it/cavallium/dbengine/netty/NettyMetrics.java diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index 599e1c0..394727c 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -3,6 +3,8 @@ package it.cavallium.dbengine.database.disk; import static io.netty5.buffer.api.StandardAllocationTypes.OFF_HEAP; import static it.cavallium.dbengine.database.LLUtils.INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES; import static it.cavallium.dbengine.database.LLUtils.isReadOnlyDirect; +import static java.lang.Boolean.parseBoolean; +import static java.lang.System.getProperty; import static java.util.Objects.requireNonNull; import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithValue; import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithoutValue; @@ -57,6 +59,8 @@ public sealed abstract class AbstractRocksDBColumn implements * Default: true */ private static final boolean USE_DIRECT_BUFFER_BOUNDS = true; + private static final boolean WORKAROUND_MAY_EXIST_FAKE_ZERO + = parseBoolean(getProperty("it.cavallium.dbengine.workaround_may_exist_fake_zero", "false")); private static final byte[] NO_DATA = new byte[0]; protected static final UpdateAtomicResult RESULT_NOTHING = new UpdateAtomicResultNothing(); @@ -385,7 +389,7 @@ public sealed abstract class AbstractRocksDBColumn implements case kExistsWithoutValue: { if (keyMayExistState == kExistsWithoutValue) { isKExistsWithoutValue = true; - } else { + } else if (WORKAROUND_MAY_EXIST_FAKE_ZERO) { // todo: "size == 0 || resultWritable.limit() == 0" is checked because keyMayExist is broken, // and sometimes it returns an empty array, as if it exists if (size == 0 || resultWritable.limit() == 0) { @@ -455,7 +459,7 @@ public sealed abstract class AbstractRocksDBColumn implements if (db.keyMayExist(cfh, readOptions, keyArray, data)) { // todo: "data.getValue().length > 0" is checked because keyMayExist is broken, and sometimes it // returns an empty array, as if it exists - if (data.getValue() != null && data.getValue().length > 0) { + if (data.getValue() != null && (!WORKAROUND_MAY_EXIST_FAKE_ZERO || data.getValue().length > 0)) { readValueFoundWithBloomCacheBufferSize.record(data.getValue().length); return LLUtils.fromByteArray(alloc, data.getValue()); } else { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java index ee3d64b..f550bc8 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java @@ -6,7 +6,6 @@ import it.cavallium.dbengine.database.LLDatabaseConnection; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.lucene.LuceneHacks; import it.cavallium.dbengine.lucene.LuceneRocksDBManager; -import it.cavallium.dbengine.netty.JMXNettyMonitoringManager; import it.cavallium.dbengine.rpc.current.data.Column; import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers; @@ -26,10 +25,6 @@ import reactor.core.scheduler.Schedulers; public class LLLocalDatabaseConnection implements LLDatabaseConnection { - static { - JMXNettyMonitoringManager.initialize(); - } - private final AtomicBoolean connected = new AtomicBoolean(); private final BufferAllocator allocator; private final MeterRegistry meterRegistry; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 2626516..07d99a4 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -338,8 +338,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .setPartitionFilters(columnOptions.partitionFilters().orElse(false)) // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters .setIndexType(columnOptions.partitionFilters().orElse(false) ? IndexType.kTwoLevelIndexSearch : IndexType.kBinarySearch) - //todo: replace with kxxhash3 - .setChecksumType(ChecksumType.kCRC32c) + .setChecksumType(ChecksumType.kXXH3) // Spinning disks: 64KiB to 256KiB (also 512KiB). SSDs: 16KiB // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html @@ -428,7 +427,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } else { this.dbRScheduler = Schedulers.newBoundedElastic(threadCap, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, - new ShortNamedThreadFactory("db-write-" + name).setDaemon(true).withGroup(new ThreadGroup("database-read")), + new ShortNamedThreadFactory("db-read-" + name).setDaemon(true).withGroup(new ThreadGroup("database-read")), 60 ); } diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java index 364d27d..29e9ee4 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java @@ -8,7 +8,6 @@ import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.disk.LLLocalLuceneIndex; import it.cavallium.dbengine.database.disk.LLTempHugePqEnv; import it.cavallium.dbengine.lucene.LuceneHacks; -import it.cavallium.dbengine.netty.JMXNettyMonitoringManager; import it.cavallium.dbengine.rpc.current.data.ByteBuffersDirectory; import it.cavallium.dbengine.rpc.current.data.Column; import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; @@ -26,10 +25,6 @@ import reactor.core.scheduler.Schedulers; public class LLMemoryDatabaseConnection implements LLDatabaseConnection { - static { - JMXNettyMonitoringManager.initialize(); - } - private final AtomicBoolean connected = new AtomicBoolean(); private final BufferAllocator allocator; private final MeterRegistry meterRegistry; diff --git a/src/main/java/it/cavallium/dbengine/netty/JMXNettyMonitoring.java b/src/main/java/it/cavallium/dbengine/netty/JMXNettyMonitoring.java deleted file mode 100644 index 5fae52f..0000000 --- a/src/main/java/it/cavallium/dbengine/netty/JMXNettyMonitoring.java +++ /dev/null @@ -1,66 +0,0 @@ -package it.cavallium.dbengine.netty; - -import io.netty5.buffer.api.pool.BufferAllocatorMetric; - -public class JMXNettyMonitoring implements JMXNettyMonitoringMBean { - - private final String name; - protected final boolean direct; - private final BufferAllocatorMetric metric; - - public JMXNettyMonitoring(String name, boolean direct, BufferAllocatorMetric metric) { - this.name = name; - this.direct = direct; - this.metric = metric; - } - - @Override - public String getName() { - return name; - } - - @Override - public Long getHeapUsed() { - return direct ? 0 : metric.usedMemory(); - } - - @Override - public Long getDirectUsed() { - return direct ? metric.usedMemory() : 0; - } - - @Override - public Integer getNumHeapArenas() { - return null; - } - - @Override - public Integer getNumDirectArenas() { - return null; - } - - @Override - public Integer getNumThreadLocalCachesArenas() { - return null; - } - - @Override - public Integer getTinyCacheSize() { - return null; - } - - @Override - public Integer getSmallCacheSize() { - return null; - } - - @Override - public Integer getNormalCacheSize() { - return null; - } - - @Override - public Integer getChunkSize() { - return null; - } -} diff --git a/src/main/java/it/cavallium/dbengine/netty/JMXNettyMonitoringMBean.java b/src/main/java/it/cavallium/dbengine/netty/JMXNettyMonitoringMBean.java deleted file mode 100644 index 3d7a536..0000000 --- a/src/main/java/it/cavallium/dbengine/netty/JMXNettyMonitoringMBean.java +++ /dev/null @@ -1,24 +0,0 @@ -package it.cavallium.dbengine.netty; - -public interface JMXNettyMonitoringMBean { - - String getName(); - - Long getHeapUsed(); - - Long getDirectUsed(); - - Integer getNumHeapArenas(); - - Integer getNumDirectArenas(); - - Integer getNumThreadLocalCachesArenas(); - - Integer getTinyCacheSize(); - - Integer getSmallCacheSize(); - - Integer getNormalCacheSize(); - - Integer getChunkSize(); -} diff --git a/src/main/java/it/cavallium/dbengine/netty/JMXNettyMonitoringManager.java b/src/main/java/it/cavallium/dbengine/netty/JMXNettyMonitoringManager.java deleted file mode 100644 index 9b4f440..0000000 --- a/src/main/java/it/cavallium/dbengine/netty/JMXNettyMonitoringManager.java +++ /dev/null @@ -1,64 +0,0 @@ -package it.cavallium.dbengine.netty; - -import io.netty5.buffer.api.BufferAllocator; -import io.netty5.buffer.api.DefaultBufferAllocators; -import it.cavallium.dbengine.MetricUtils; -import io.netty5.buffer.api.pool.PooledBufferAllocator; -import java.lang.management.ManagementFactory; -import java.util.concurrent.atomic.AtomicInteger; -import javax.management.InstanceAlreadyExistsException; -import javax.management.MBeanRegistrationException; -import javax.management.MBeanServer; -import javax.management.MalformedObjectNameException; -import javax.management.NotCompliantMBeanException; -import javax.management.ObjectName; -import javax.management.StandardMBean; - -public class JMXNettyMonitoringManager { - - private final AtomicInteger nextArenaId = new AtomicInteger(); - private static JMXNettyMonitoringManager instance; - - private final MBeanServer platformMBeanServer; - - private JMXNettyMonitoringManager() { - this.platformMBeanServer = ManagementFactory.getPlatformMBeanServer(); - } - - public static void initialize() { - var instance = getInstance(); - instance.register("global", DefaultBufferAllocators.preferredAllocator()); - } - - public synchronized static JMXNettyMonitoringManager getInstance() { - if (instance == null) { - instance = new JMXNettyMonitoringManager(); - } - return instance; - } - - public void register(String name, BufferAllocator allocator) { - try { - name = name.replaceAll("[^\\p{IsAlphabetic}\\p{IsDigit}_]", "_"); - String type; - StandardMBean mbean; - if (allocator instanceof PooledBufferAllocator pooledAllocator) { - for (var arenaMetric : MetricUtils.getPoolArenaMetrics(pooledAllocator)) { - String arenaType = pooledAllocator.isDirectBufferPooled() ? "direct" : "heap"; - var jmx = new JMXPoolArenaNettyMonitoring(arenaMetric); - mbean = new StandardMBean(jmx, JMXPoolArenaNettyMonitoringMBean.class); - ObjectName botObjectName = new ObjectName("io.netty.stats:name=PoolArena,type=" + arenaType + ",arenaId=" + nextArenaId.getAndIncrement()); - platformMBeanServer.registerMBean(mbean, botObjectName); - } - var jmx = new JMXPooledNettyMonitoring(name, pooledAllocator); - type = "pooled"; - mbean = new StandardMBean(jmx, JMXNettyMonitoringMBean.class); - ObjectName botObjectName = new ObjectName("io.netty.stats:name=ByteBufAllocator,allocatorName=" + name + ",type=" + type); - platformMBeanServer.registerMBean(mbean, botObjectName); - } - - } catch (MalformedObjectNameException | NotCompliantMBeanException | InstanceAlreadyExistsException | MBeanRegistrationException e) { - throw new RuntimeException(e); - } - } -} diff --git a/src/main/java/it/cavallium/dbengine/netty/JMXPoolArenaNettyMonitoring.java b/src/main/java/it/cavallium/dbengine/netty/JMXPoolArenaNettyMonitoring.java deleted file mode 100644 index 13e15cb..0000000 --- a/src/main/java/it/cavallium/dbengine/netty/JMXPoolArenaNettyMonitoring.java +++ /dev/null @@ -1,115 +0,0 @@ -package it.cavallium.dbengine.netty; - - -import io.netty5.buffer.api.pool.PoolArenaMetric; - -public class JMXPoolArenaNettyMonitoring implements JMXPoolArenaNettyMonitoringMBean { - - private final PoolArenaMetric metric; - - public JMXPoolArenaNettyMonitoring(PoolArenaMetric metric) { - this.metric = metric; - } - - @Override - public Integer getNumThreadCaches() { - return metric.numThreadCaches(); - } - - @Deprecated - @Override - public Integer getNumTinySubpages() { - return 0; - } - - @Override - public Integer getNumSmallSubpages() { - return metric.numSmallSubpages(); - } - - @Override - public Integer getNumChunkLists() { - return metric.numChunkLists(); - } - - @Override - public Long getNumAllocations() { - return metric.numAllocations(); - } - - @Override - public Long getNumTinyAllocations() { - return 0L; - } - - @Override - public Long getNumSmallAllocations() { - return metric.numSmallAllocations(); - } - - @Override - public Long getNumNormalAllocations() { - return metric.numNormalAllocations(); - } - - @Override - public Long getNumHugeAllocations() { - return metric.numHugeAllocations(); - } - - @Override - public Long getNumDeallocations() { - return metric.numDeallocations(); - } - - @Override - public Long getNumTinyDeallocations() { - return 0L; - } - - @Override - public Long getNumSmallDeallocations() { - return metric.numSmallDeallocations(); - } - - @Override - public Long getNumNormalDeallocations() { - return metric.numNormalDeallocations(); - } - - @Override - public Long getNumHugeDeallocations() { - return metric.numHugeDeallocations(); - } - - @Override - public Long getNumActiveAllocations() { - return metric.numActiveAllocations(); - } - - @Deprecated - @Override - public Long getNumActiveTinyAllocations() { - return 0L; - } - - @Override - public Long getNumActiveSmallAllocations() { - return metric.numActiveSmallAllocations(); - } - - @Override - public Long getNumActiveNormalAllocations() { - return metric.numActiveNormalAllocations(); - } - - @Override - public Long getNumActiveHugeAllocations() { - return metric.numActiveHugeAllocations(); - } - - @Override - public Long getNumActiveBytes() { - return metric.numActiveBytes(); - } -} diff --git a/src/main/java/it/cavallium/dbengine/netty/JMXPoolArenaNettyMonitoringMBean.java b/src/main/java/it/cavallium/dbengine/netty/JMXPoolArenaNettyMonitoringMBean.java deleted file mode 100644 index 049d848..0000000 --- a/src/main/java/it/cavallium/dbengine/netty/JMXPoolArenaNettyMonitoringMBean.java +++ /dev/null @@ -1,44 +0,0 @@ -package it.cavallium.dbengine.netty; - -public interface JMXPoolArenaNettyMonitoringMBean { - - Integer getNumThreadCaches(); - - Integer getNumTinySubpages(); - - Integer getNumSmallSubpages(); - - Integer getNumChunkLists(); - - Long getNumAllocations(); - - Long getNumTinyAllocations(); - - Long getNumSmallAllocations(); - - Long getNumNormalAllocations(); - - Long getNumHugeAllocations(); - - Long getNumDeallocations(); - - Long getNumTinyDeallocations(); - - Long getNumSmallDeallocations(); - - Long getNumNormalDeallocations(); - - Long getNumHugeDeallocations(); - - Long getNumActiveAllocations(); - - Long getNumActiveTinyAllocations(); - - Long getNumActiveSmallAllocations(); - - Long getNumActiveNormalAllocations(); - - Long getNumActiveHugeAllocations(); - - Long getNumActiveBytes(); -} diff --git a/src/main/java/it/cavallium/dbengine/netty/JMXPooledNettyMonitoring.java b/src/main/java/it/cavallium/dbengine/netty/JMXPooledNettyMonitoring.java deleted file mode 100644 index fef5072..0000000 --- a/src/main/java/it/cavallium/dbengine/netty/JMXPooledNettyMonitoring.java +++ /dev/null @@ -1,53 +0,0 @@ -package it.cavallium.dbengine.netty; - -import io.netty5.buffer.api.pool.BufferAllocatorMetric; -import io.netty5.buffer.api.pool.PooledBufferAllocator; -import java.lang.reflect.Field; - -public class JMXPooledNettyMonitoring extends JMXNettyMonitoring implements JMXNettyMonitoringMBean { - - private final PooledBufferAllocator alloc; - private final BufferAllocatorMetric metric; - - public JMXPooledNettyMonitoring(String name, PooledBufferAllocator alloc) { - super(name, alloc.isDirectBufferPooled(), alloc.metric()); - this.alloc = alloc; - this.metric = alloc.metric(); - } - - @Override - public Integer getNumHeapArenas() { - return direct ? 0 : alloc.numArenas(); - } - - @Override - public Integer getNumDirectArenas() { - return direct ? alloc.numArenas() : 0; - } - - @Override - public Integer getNumThreadLocalCachesArenas() { - return 0; - } - - @Deprecated - @Override - public Integer getTinyCacheSize() { - return 0; - } - - @Override - public Integer getSmallCacheSize() { - return 0; - } - - @Override - public Integer getNormalCacheSize() { - return 0; - } - - @Override - public Integer getChunkSize() { - return 0; - } -} diff --git a/src/main/java/it/cavallium/dbengine/netty/NettyMetrics.java b/src/main/java/it/cavallium/dbengine/netty/NettyMetrics.java new file mode 100644 index 0000000..0418ed0 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/netty/NettyMetrics.java @@ -0,0 +1,52 @@ +package it.cavallium.dbengine.netty; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.binder.MeterBinder; +import io.netty5.buffer.api.BufferAllocator; +import io.netty5.buffer.api.pool.BufferAllocatorMetric; +import io.netty5.buffer.api.pool.BufferAllocatorMetricProvider; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.jetbrains.annotations.NotNull; + +public class NettyMetrics implements MeterBinder { + + private final BufferAllocator allocator; + private final String resourceName; + private final String allocatorName; + private final Map extraTags; + + public NettyMetrics(BufferAllocator allocator, + String resourceName, + String allocatorName, + Map extraTags) { + this.allocator = allocator; + this.resourceName = resourceName; + this.allocatorName = allocatorName; + this.extraTags = extraTags; + } + + @Override + public void bindTo(@NotNull MeterRegistry registry) { + var direct = allocator.getAllocationType().isDirect(); + var pooling = allocator.isPooling(); + List tags = new ArrayList<>(); + tags.add(Tag.of("resource", resourceName)); + tags.add(Tag.of("allocator", allocatorName)); + tags.add(Tag.of("type", direct ? "direct" : "heap")); + tags.add(Tag.of("pooling-mode", pooling ? "pooled" : "unpooled")); + extraTags.forEach((key, value) -> tags.add(Tag.of(key, value))); + + if (allocator instanceof BufferAllocatorMetricProvider metricProvider) { + var metric = metricProvider.metric(); + registry.gauge("netty.chunk.size", tags, metric, BufferAllocatorMetric::chunkSize); + registry.gauge("netty.num.arenas", tags, metric, BufferAllocatorMetric::numArenas); + registry.gauge("netty.num.thread.local.caches", tags, metric, BufferAllocatorMetric::numThreadLocalCaches); + registry.gauge("netty.small.cache.size", tags, metric, BufferAllocatorMetric::smallCacheSize); + registry.gauge("netty.normal.cache.size", tags, metric, BufferAllocatorMetric::normalCacheSize); + } + } + +} diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java index 1f66aa2..97c1e05 100644 --- a/src/main/java/module-info.java +++ b/src/main/java/module-info.java @@ -1,4 +1,5 @@ module dbengine { + uses io.netty5.buffer.api.pool.BufferAllocatorMetricProvider; exports it.cavallium.dbengine.lucene; exports it.cavallium.dbengine.database; exports it.cavallium.dbengine.rpc.current.data;