Add metrics

This commit is contained in:
Andrea Cavalli 2024-10-30 12:19:50 +01:00
parent 302ae92fc1
commit f419f92662
13 changed files with 303 additions and 46 deletions

35
pom.xml
View File

@ -68,6 +68,17 @@
</snapshotRepository>
</distributionManagement>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-bom</artifactId>
<version>1.12.4</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.rocksdb</groupId>
@ -190,6 +201,28 @@
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-influx</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-jmx</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>4.5.10</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rx-java3</artifactId>
<version>4.5.10</version>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
@ -200,7 +233,7 @@
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.10.1</version>
<version>5.10.3</version>
<scope>test</scope>
</dependency>
</dependencies>

View File

@ -32,6 +32,13 @@ module rockserver.core {
requires reactor.core;
requires reactor.grpc.stub;
requires java.annotation;
requires micrometer.core;
requires micrometer.registry.jmx;
requires micrometer.registry.influx;
requires java.net.http;
requires io.vertx.core;
requires vertx.rx.java3;
requires io.reactivex.rxjava3;
exports it.cavallium.rockserver.core.client;
exports it.cavallium.rockserver.core.common;

View File

@ -27,9 +27,16 @@ module rockserver.core {
requires io.netty.transport.classes.epoll;
requires org.reactivestreams;
requires io.netty.transport.unix.common;
requires reactor.core;
requires reactor.grpc.stub;
requires java.annotation;
requires reactor.core;
requires reactor.grpc.stub;
requires java.annotation;
requires micrometer.core;
requires micrometer.registry.jmx;
requires micrometer.registry.influx;
requires java.net.http;
requires io.vertx.core;
requires vertx.rx.java3;
requires io.reactivex.rxjava3;
exports it.cavallium.rockserver.core.client;
exports it.cavallium.rockserver.core.common;

View File

@ -29,8 +29,11 @@ public class ConfigPrinter {
return """
{
"parallelism": %s,
"metrics": %s,
"global": %s
}""".formatted(stringifyParallelism(o.parallelism()), stringifyGlobalDatabase(o.global()));
}""".formatted(stringifyParallelism(o.parallelism()),
stringifyMetrics(o.metrics()),
stringifyGlobalDatabase(o.global()));
}
public static String stringifyLevel(ColumnLevelConfig o) throws GestaltException {
@ -104,6 +107,46 @@ public class ConfigPrinter {
);
}
public static String stringifyMetrics(MetricsConfig o) throws GestaltException {
return """
{
"influx": %s,
"jmx": %s
}\
""".formatted(stringifyInflux(o.influx()),
stringifyJmx(o.jmx())
);
}
public static String stringifyInflux(InfluxMetricsConfig o) throws GestaltException {
return """
{
"enabled": %b,
"url": "%s",
"bucket": "%s",
"user": "%s",
"token": "%s",
"org": "%s",
"allow-insecure-certificates": %b
}\
""".formatted(o.enabled(),
o.url(),
o.bucket(),
o.user(),
o.token(),
o.org(),
o.allowInsecureCertificates()
);
}
public static String stringifyJmx(JmxMetricsConfig o) throws GestaltException {
return """
{
"enabled": %b
}\
""".formatted(o.enabled());
}
private static String stringifyVolume(VolumeConfig o) throws GestaltException {
return """
{

View File

@ -7,4 +7,6 @@ public interface DatabaseConfig {
GlobalDatabaseConfig global() throws GestaltException;
ParallelismConfig parallelism() throws GestaltException;
MetricsConfig metrics() throws GestaltException;
}

View File

@ -16,28 +16,21 @@ public interface GlobalDatabaseConfig {
boolean allowRocksdbMemoryMapping() throws GestaltException;
@Nullable
Integer maximumOpenFiles() throws GestaltException;
@Nullable Integer maximumOpenFiles() throws GestaltException;
boolean optimistic() throws GestaltException;
@Nullable
DataSize blockCache() throws GestaltException;
@Nullable DataSize blockCache() throws GestaltException;
@Nullable
DataSize writeBufferManager() throws GestaltException;
@Nullable DataSize writeBufferManager() throws GestaltException;
@Nullable
Path logPath() throws GestaltException;
@Nullable Path logPath() throws GestaltException;
@Nullable
Path walPath() throws GestaltException;
@Nullable Path walPath() throws GestaltException;
@Nullable
Path tempSstPath() throws GestaltException;
@Nullable Path tempSstPath() throws GestaltException;
@Nullable
Duration delayWalFlushDuration() throws GestaltException;
@Nullable Duration delayWalFlushDuration() throws GestaltException;
boolean absoluteConsistency() throws GestaltException;
@ -48,6 +41,7 @@ public interface GlobalDatabaseConfig {
VolumeConfig[] volumes() throws GestaltException;
FallbackColumnConfig fallbackColumnOptions() throws GestaltException;
NamedColumnConfig[] columnOptions() throws GestaltException;
}

View File

@ -0,0 +1,24 @@
package it.cavallium.rockserver.core.config;
import java.time.Duration;
import org.github.gestalt.config.exceptions.GestaltException;
import org.jetbrains.annotations.Nullable;
import java.nio.file.Path;
public interface InfluxMetricsConfig {
boolean enabled() throws GestaltException;
@Nullable String url() throws GestaltException;
@Nullable String bucket() throws GestaltException;
@Nullable String user() throws GestaltException;
@Nullable String token() throws GestaltException;
@Nullable String org() throws GestaltException;
@Nullable Boolean allowInsecureCertificates() throws GestaltException;
}

View File

@ -0,0 +1,13 @@
package it.cavallium.rockserver.core.config;
import java.time.Duration;
import org.github.gestalt.config.exceptions.GestaltException;
import org.jetbrains.annotations.Nullable;
import java.nio.file.Path;
public interface JmxMetricsConfig {
boolean enabled() throws GestaltException;
}

View File

@ -0,0 +1,11 @@
package it.cavallium.rockserver.core.config;
import org.github.gestalt.config.exceptions.GestaltException;
public interface MetricsConfig {
InfluxMetricsConfig influx() throws GestaltException;
JmxMetricsConfig jmx() throws GestaltException;
}

View File

@ -5,12 +5,32 @@ import static it.cavallium.rockserver.core.impl.ColumnInstance.BIG_ENDIAN_BYTES;
import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithValue;
import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithoutValue;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import it.cavallium.rockserver.core.common.*;
import it.cavallium.rockserver.core.common.RequestType.RequestEntriesCount;
import it.cavallium.rockserver.core.common.RequestType.RequestGet;
import it.cavallium.rockserver.core.common.RequestType.RequestPut;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseFailedUpdate;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseIterator;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseTransaction;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CreateColumn;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.DeleteColumn;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Get;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.GetColumnId;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.OpenIterator;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.OpenTransaction;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Put;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.PutBatch;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.PutMulti;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.ReduceRange;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.SeekTo;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Subsequent;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandStream.GetRange;
import it.cavallium.rockserver.core.common.RocksDBException;
import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType;
import it.cavallium.rockserver.core.common.api.proto.OpenTransactionRequest;
import it.cavallium.rockserver.core.config.*;
import it.cavallium.rockserver.core.impl.rocksdb.*;
import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.TransactionalOptions;
@ -28,11 +48,13 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
@ -49,8 +71,6 @@ import org.rocksdb.Status.Code;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable {
@ -58,7 +78,6 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
public static final long MAX_TRANSACTION_DURATION_MS = 10_000L;
private static final boolean USE_FAST_GET = true;
private static final byte[] COLUMN_SCHEMAS_COLUMN = "_column_schemas_".getBytes(StandardCharsets.UTF_8);
private static final KV NO_MORE_RESULTS = new KV(new Keys(), null);
private final Logger logger;
private final @Nullable Path path;
private final TransactionalDB db;
@ -75,10 +94,31 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
private final DatabaseConfig config;
private final RocksDBObjects refs;
private final @Nullable Cache cache;
private final MetricsManager metrics;
private final String name;
private final List<Meter> meters = new ArrayList<>();
private final Timer loadTimer;
private final Timer openTransactionTimer;
private final Timer closeTransactionTimer;
private final Timer closeFailedUpdateTimer;
private final Timer createColumnTimer;
private final Timer deleteColumnTimer;
private final Timer getColumnIdTimer;
private final Timer putTimer;
private final Timer putMulti;
private final Timer putBatchTimer;
private final Timer getTimer;
private final Timer openIteratorTimer;
private final Timer closeIteratorTimer;
private final Timer seekToTimer;
private final Timer subsequentTimer;
private final Timer reduceRangeTimer;
private final Timer getRangeTimer;
private Path tempSSTsPath;
public EmbeddedDB(@Nullable Path path, String name, @Nullable Path embeddedConfigPath) throws IOException {
this.path = path;
this.name = name;
this.logger = LoggerFactory.getLogger("db." + name);
this.columns = new NonBlockingHashMapLong<>();
this.txs = new NonBlockingHashMapLong<>();
@ -86,6 +126,27 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
this.columnNamesIndex = new ConcurrentHashMap<>();
this.ops = new SafeShutdown();
DatabaseConfig config = ConfigParser.parse(embeddedConfigPath);
this.metrics = new MetricsManager(config);
this.loadTimer = createTimer(Tags.empty());
this.openTransactionTimer = createActionTimer(OpenTransaction.class);
this.closeTransactionTimer = createActionTimer(CloseTransaction.class);
this.closeFailedUpdateTimer = createActionTimer(CloseFailedUpdate.class);
this.createColumnTimer = createActionTimer(CreateColumn.class);
this.deleteColumnTimer = createActionTimer(DeleteColumn.class);
this.getColumnIdTimer = createActionTimer(GetColumnId.class);
this.putTimer = createActionTimer(Put.class);
this.putMulti = createActionTimer(PutMulti.class);
this.putBatchTimer = createActionTimer(PutBatch.class);
this.getTimer = createActionTimer(Get.class);
this.openIteratorTimer = createActionTimer(OpenIterator.class);
this.closeIteratorTimer = createActionTimer(CloseIterator.class);
this.seekToTimer = createActionTimer(SeekTo.class);
this.subsequentTimer = createActionTimer(Subsequent.class);
this.reduceRangeTimer = createActionTimer(ReduceRange.class);
this.getRangeTimer = createActionTimer(GetRange.class);
var beforeLoad = Instant.now();
this.config = config;
var loadedDb = RocksDBLoader.load(path, config, logger);
this.db = loadedDb.db();
@ -139,6 +200,25 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
if (Boolean.parseBoolean(System.getProperty("rockserver.core.print-config", "true"))) {
logger.info("Database configuration: {}", ConfigPrinter.stringify(config));
}
var afterLoad = Instant.now();
loadTimer.record(Duration.between(beforeLoad, afterLoad));
}
private Timer createActionTimer(Class<? extends RocksDBAPICommand> className) {
return createTimer(Tags.of("action", className.getSimpleName()));
}
private Timer createTimer(Tags tags) {
var t = Timer
.builder("rocksdb.operation.timer")
.publishPercentiles(0.3, 0.5, 0.95)
.publishPercentileHistogram()
.tag("database", this.name)
.tags(tags)
.register(metrics.getRegistry());
meters.add(t);
return t;
}
private ColumnSchema decodeColumnSchema(byte[] value) {
@ -262,6 +342,12 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
if (path == null) {
Utils.deleteDirectory(db.getPath());
}
for (Meter meter : meters) {
meter.close();
}
if (metrics != null) {
metrics.close();
}
} catch (TimeoutException e) {
logger.error("Some operations lasted more than 10 seconds, forcing database shutdown...");
}
@ -451,6 +537,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
@NotNull Keys keys,
@NotNull MemorySegment value,
RequestPut<? super MemorySegment, T> requestType) throws it.cavallium.rockserver.core.common.RocksDBException {
var start = System.nanoTime();
ops.beginOp();
try {
// Column id
@ -469,6 +556,8 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex);
} finally {
ops.endOp();
var end = System.nanoTime();
putTimer.record(end - start, TimeUnit.NANOSECONDS);
}
}
@ -823,28 +912,34 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
long columnId,
Keys keys,
RequestGet<? super MemorySegment, T> requestType) throws it.cavallium.rockserver.core.common.RocksDBException {
// Column id
var col = getColumn(columnId);
Tx tx = transactionOrUpdateId != 0 ? getTransaction(transactionOrUpdateId, true) : null;
long updateId;
if (requestType instanceof RequestType.RequestForUpdate<?>) {
if (tx == null) {
tx = openTransactionInternal(MAX_TRANSACTION_DURATION_MS, true);
updateId = allocateTransactionInternal(tx);
} else {
updateId = transactionOrUpdateId;
}
} else {
updateId = 0;
}
var start = System.nanoTime();
try {
return get(arena, tx, updateId, col, keys, requestType);
} catch (Throwable ex) {
if (updateId != 0 && tx.isFromGetForUpdate()) {
closeTransaction(updateId, false);
// Column id
var col = getColumn(columnId);
Tx tx = transactionOrUpdateId != 0 ? getTransaction(transactionOrUpdateId, true) : null;
long updateId;
if (requestType instanceof RequestType.RequestForUpdate<?>) {
if (tx == null) {
tx = openTransactionInternal(MAX_TRANSACTION_DURATION_MS, true);
updateId = allocateTransactionInternal(tx);
} else {
updateId = transactionOrUpdateId;
}
} else {
updateId = 0;
}
throw ex;
try {
return get(arena, tx, updateId, col, keys, requestType);
} catch (Throwable ex) {
if (updateId != 0 && tx.isFromGetForUpdate()) {
closeTransaction(updateId, false);
}
throw ex;
}
} finally {
var end = System.nanoTime();
getTimer.record(end - start, TimeUnit.NANOSECONDS);
}
}

View File

@ -3,4 +3,5 @@ package it.cavallium.rockserver.core.impl;
public interface InternalConnection {
RWScheduler getScheduler();
}

View File

@ -1,4 +1,24 @@
database: {
metrics: {
influx: {
enabled: false
url: ""
bucket: ""
user: ""
token: ""
org: ""
allow-insecure-certificates: true
}
jmx: {
enabled: true
}
}
parallelism: {
# Maximum read tasks in parallel
read: 30
# Maximum write tasks in parallel
write: 10
}
global: {
# Keep false unless you have a legacy database
enable-column-bug: false

View File

@ -29,9 +29,16 @@ module rockserver.core {
requires io.netty.transport.classes.epoll;
requires org.reactivestreams;
requires io.netty.transport.unix.common;
requires reactor.core;
requires reactor.grpc.stub;
requires java.annotation;
requires reactor.core;
requires reactor.grpc.stub;
requires java.annotation;
requires micrometer.core;
requires micrometer.registry.jmx;
requires micrometer.registry.influx;
requires java.net.http;
requires io.vertx.core;
requires vertx.rx.java3;
requires io.reactivex.rxjava3;
exports it.cavallium.rockserver.core.client;
exports it.cavallium.rockserver.core.common;