diff --git a/pom.xml b/pom.xml index 4811ca8..2fb5259 100644 --- a/pom.xml +++ b/pom.xml @@ -68,6 +68,17 @@ + + + + io.micrometer + micrometer-bom + 1.12.4 + pom + import + + + org.rocksdb @@ -190,6 +201,28 @@ javax.annotation-api 1.3.2 + + io.micrometer + micrometer-core + + + io.micrometer + micrometer-registry-influx + + + io.micrometer + micrometer-registry-jmx + + + io.vertx + vertx-core + 4.5.10 + + + io.vertx + vertx-rx-java3 + 4.5.10 + org.lz4 @@ -200,7 +233,7 @@ org.junit.jupiter junit-jupiter - 5.10.1 + 5.10.3 test diff --git a/src/fatjar/java/module-info.java b/src/fatjar/java/module-info.java index c8ee4f3..fd46e6a 100644 --- a/src/fatjar/java/module-info.java +++ b/src/fatjar/java/module-info.java @@ -32,6 +32,13 @@ module rockserver.core { requires reactor.core; requires reactor.grpc.stub; requires java.annotation; + requires micrometer.core; + requires micrometer.registry.jmx; + requires micrometer.registry.influx; + requires java.net.http; + requires io.vertx.core; + requires vertx.rx.java3; + requires io.reactivex.rxjava3; exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.common; diff --git a/src/library/java/module-info.java b/src/library/java/module-info.java index 6e34027..be835ff 100644 --- a/src/library/java/module-info.java +++ b/src/library/java/module-info.java @@ -27,9 +27,16 @@ module rockserver.core { requires io.netty.transport.classes.epoll; requires org.reactivestreams; requires io.netty.transport.unix.common; - requires reactor.core; - requires reactor.grpc.stub; - requires java.annotation; + requires reactor.core; + requires reactor.grpc.stub; + requires java.annotation; + requires micrometer.core; + requires micrometer.registry.jmx; + requires micrometer.registry.influx; + requires java.net.http; + requires io.vertx.core; + requires vertx.rx.java3; + requires io.reactivex.rxjava3; exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.common; diff --git a/src/main/java/it/cavallium/rockserver/core/config/ConfigPrinter.java b/src/main/java/it/cavallium/rockserver/core/config/ConfigPrinter.java index df207d0..f5ad604 100644 --- a/src/main/java/it/cavallium/rockserver/core/config/ConfigPrinter.java +++ b/src/main/java/it/cavallium/rockserver/core/config/ConfigPrinter.java @@ -29,8 +29,11 @@ public class ConfigPrinter { return """ { "parallelism": %s, + "metrics": %s, "global": %s - }""".formatted(stringifyParallelism(o.parallelism()), stringifyGlobalDatabase(o.global())); + }""".formatted(stringifyParallelism(o.parallelism()), + stringifyMetrics(o.metrics()), + stringifyGlobalDatabase(o.global())); } public static String stringifyLevel(ColumnLevelConfig o) throws GestaltException { @@ -104,6 +107,46 @@ public class ConfigPrinter { ); } + public static String stringifyMetrics(MetricsConfig o) throws GestaltException { + return """ + { + "influx": %s, + "jmx": %s + }\ + """.formatted(stringifyInflux(o.influx()), + stringifyJmx(o.jmx()) + ); + } + + public static String stringifyInflux(InfluxMetricsConfig o) throws GestaltException { + return """ + { + "enabled": %b, + "url": "%s", + "bucket": "%s", + "user": "%s", + "token": "%s", + "org": "%s", + "allow-insecure-certificates": %b + }\ + """.formatted(o.enabled(), + o.url(), + o.bucket(), + o.user(), + o.token(), + o.org(), + o.allowInsecureCertificates() + ); + } + + public static String stringifyJmx(JmxMetricsConfig o) throws GestaltException { + return """ + { + "enabled": %b + }\ + """.formatted(o.enabled()); + } + private static String stringifyVolume(VolumeConfig o) throws GestaltException { return """ { diff --git a/src/main/java/it/cavallium/rockserver/core/config/DatabaseConfig.java b/src/main/java/it/cavallium/rockserver/core/config/DatabaseConfig.java index 6c20d9d..abdb01b 100644 --- a/src/main/java/it/cavallium/rockserver/core/config/DatabaseConfig.java +++ b/src/main/java/it/cavallium/rockserver/core/config/DatabaseConfig.java @@ -7,4 +7,6 @@ public interface DatabaseConfig { GlobalDatabaseConfig global() throws GestaltException; ParallelismConfig parallelism() throws GestaltException; + + MetricsConfig metrics() throws GestaltException; } diff --git a/src/main/java/it/cavallium/rockserver/core/config/GlobalDatabaseConfig.java b/src/main/java/it/cavallium/rockserver/core/config/GlobalDatabaseConfig.java index 8d9887e..99cd7ee 100644 --- a/src/main/java/it/cavallium/rockserver/core/config/GlobalDatabaseConfig.java +++ b/src/main/java/it/cavallium/rockserver/core/config/GlobalDatabaseConfig.java @@ -16,28 +16,21 @@ public interface GlobalDatabaseConfig { boolean allowRocksdbMemoryMapping() throws GestaltException; - @Nullable - Integer maximumOpenFiles() throws GestaltException; + @Nullable Integer maximumOpenFiles() throws GestaltException; boolean optimistic() throws GestaltException; - @Nullable - DataSize blockCache() throws GestaltException; + @Nullable DataSize blockCache() throws GestaltException; - @Nullable - DataSize writeBufferManager() throws GestaltException; + @Nullable DataSize writeBufferManager() throws GestaltException; - @Nullable - Path logPath() throws GestaltException; + @Nullable Path logPath() throws GestaltException; - @Nullable - Path walPath() throws GestaltException; + @Nullable Path walPath() throws GestaltException; - @Nullable - Path tempSstPath() throws GestaltException; + @Nullable Path tempSstPath() throws GestaltException; - @Nullable - Duration delayWalFlushDuration() throws GestaltException; + @Nullable Duration delayWalFlushDuration() throws GestaltException; boolean absoluteConsistency() throws GestaltException; @@ -48,6 +41,7 @@ public interface GlobalDatabaseConfig { VolumeConfig[] volumes() throws GestaltException; FallbackColumnConfig fallbackColumnOptions() throws GestaltException; + NamedColumnConfig[] columnOptions() throws GestaltException; } diff --git a/src/main/java/it/cavallium/rockserver/core/config/InfluxMetricsConfig.java b/src/main/java/it/cavallium/rockserver/core/config/InfluxMetricsConfig.java new file mode 100644 index 0000000..89b216c --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/config/InfluxMetricsConfig.java @@ -0,0 +1,24 @@ +package it.cavallium.rockserver.core.config; + +import java.time.Duration; +import org.github.gestalt.config.exceptions.GestaltException; +import org.jetbrains.annotations.Nullable; + +import java.nio.file.Path; + +public interface InfluxMetricsConfig { + + boolean enabled() throws GestaltException; + + @Nullable String url() throws GestaltException; + + @Nullable String bucket() throws GestaltException; + + @Nullable String user() throws GestaltException; + + @Nullable String token() throws GestaltException; + + @Nullable String org() throws GestaltException; + + @Nullable Boolean allowInsecureCertificates() throws GestaltException; +} diff --git a/src/main/java/it/cavallium/rockserver/core/config/JmxMetricsConfig.java b/src/main/java/it/cavallium/rockserver/core/config/JmxMetricsConfig.java new file mode 100644 index 0000000..41985ef --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/config/JmxMetricsConfig.java @@ -0,0 +1,13 @@ +package it.cavallium.rockserver.core.config; + +import java.time.Duration; +import org.github.gestalt.config.exceptions.GestaltException; +import org.jetbrains.annotations.Nullable; + +import java.nio.file.Path; + +public interface JmxMetricsConfig { + + boolean enabled() throws GestaltException; + +} diff --git a/src/main/java/it/cavallium/rockserver/core/config/MetricsConfig.java b/src/main/java/it/cavallium/rockserver/core/config/MetricsConfig.java new file mode 100644 index 0000000..3752ee6 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/config/MetricsConfig.java @@ -0,0 +1,11 @@ +package it.cavallium.rockserver.core.config; + +import org.github.gestalt.config.exceptions.GestaltException; + +public interface MetricsConfig { + + InfluxMetricsConfig influx() throws GestaltException; + + JmxMetricsConfig jmx() throws GestaltException; + +} diff --git a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java index e130d02..dc3ff78 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java @@ -5,12 +5,32 @@ import static it.cavallium.rockserver.core.impl.ColumnInstance.BIG_ENDIAN_BYTES; import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithValue; import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithoutValue; +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.Timer; import it.cavallium.rockserver.core.common.*; import it.cavallium.rockserver.core.common.RequestType.RequestEntriesCount; import it.cavallium.rockserver.core.common.RequestType.RequestGet; import it.cavallium.rockserver.core.common.RequestType.RequestPut; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseFailedUpdate; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseIterator; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseTransaction; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CreateColumn; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.DeleteColumn; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Get; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.GetColumnId; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.OpenIterator; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.OpenTransaction; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Put; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.PutBatch; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.PutMulti; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.ReduceRange; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.SeekTo; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Subsequent; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandStream.GetRange; import it.cavallium.rockserver.core.common.RocksDBException; import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType; +import it.cavallium.rockserver.core.common.api.proto.OpenTransactionRequest; import it.cavallium.rockserver.core.config.*; import it.cavallium.rockserver.core.impl.rocksdb.*; import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.TransactionalOptions; @@ -28,11 +48,13 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Duration; +import java.time.Instant; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Stream; @@ -49,8 +71,6 @@ import org.rocksdb.Status.Code; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable { @@ -58,7 +78,6 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable public static final long MAX_TRANSACTION_DURATION_MS = 10_000L; private static final boolean USE_FAST_GET = true; private static final byte[] COLUMN_SCHEMAS_COLUMN = "_column_schemas_".getBytes(StandardCharsets.UTF_8); - private static final KV NO_MORE_RESULTS = new KV(new Keys(), null); private final Logger logger; private final @Nullable Path path; private final TransactionalDB db; @@ -75,10 +94,31 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable private final DatabaseConfig config; private final RocksDBObjects refs; private final @Nullable Cache cache; + private final MetricsManager metrics; + private final String name; + private final List meters = new ArrayList<>(); + private final Timer loadTimer; + private final Timer openTransactionTimer; + private final Timer closeTransactionTimer; + private final Timer closeFailedUpdateTimer; + private final Timer createColumnTimer; + private final Timer deleteColumnTimer; + private final Timer getColumnIdTimer; + private final Timer putTimer; + private final Timer putMulti; + private final Timer putBatchTimer; + private final Timer getTimer; + private final Timer openIteratorTimer; + private final Timer closeIteratorTimer; + private final Timer seekToTimer; + private final Timer subsequentTimer; + private final Timer reduceRangeTimer; + private final Timer getRangeTimer; private Path tempSSTsPath; public EmbeddedDB(@Nullable Path path, String name, @Nullable Path embeddedConfigPath) throws IOException { this.path = path; + this.name = name; this.logger = LoggerFactory.getLogger("db." + name); this.columns = new NonBlockingHashMapLong<>(); this.txs = new NonBlockingHashMapLong<>(); @@ -86,6 +126,27 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable this.columnNamesIndex = new ConcurrentHashMap<>(); this.ops = new SafeShutdown(); DatabaseConfig config = ConfigParser.parse(embeddedConfigPath); + + this.metrics = new MetricsManager(config); + this.loadTimer = createTimer(Tags.empty()); + this.openTransactionTimer = createActionTimer(OpenTransaction.class); + this.closeTransactionTimer = createActionTimer(CloseTransaction.class); + this.closeFailedUpdateTimer = createActionTimer(CloseFailedUpdate.class); + this.createColumnTimer = createActionTimer(CreateColumn.class); + this.deleteColumnTimer = createActionTimer(DeleteColumn.class); + this.getColumnIdTimer = createActionTimer(GetColumnId.class); + this.putTimer = createActionTimer(Put.class); + this.putMulti = createActionTimer(PutMulti.class); + this.putBatchTimer = createActionTimer(PutBatch.class); + this.getTimer = createActionTimer(Get.class); + this.openIteratorTimer = createActionTimer(OpenIterator.class); + this.closeIteratorTimer = createActionTimer(CloseIterator.class); + this.seekToTimer = createActionTimer(SeekTo.class); + this.subsequentTimer = createActionTimer(Subsequent.class); + this.reduceRangeTimer = createActionTimer(ReduceRange.class); + this.getRangeTimer = createActionTimer(GetRange.class); + + var beforeLoad = Instant.now(); this.config = config; var loadedDb = RocksDBLoader.load(path, config, logger); this.db = loadedDb.db(); @@ -139,6 +200,25 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable if (Boolean.parseBoolean(System.getProperty("rockserver.core.print-config", "true"))) { logger.info("Database configuration: {}", ConfigPrinter.stringify(config)); } + var afterLoad = Instant.now(); + + loadTimer.record(Duration.between(beforeLoad, afterLoad)); + } + + private Timer createActionTimer(Class className) { + return createTimer(Tags.of("action", className.getSimpleName())); + } + + private Timer createTimer(Tags tags) { + var t = Timer + .builder("rocksdb.operation.timer") + .publishPercentiles(0.3, 0.5, 0.95) + .publishPercentileHistogram() + .tag("database", this.name) + .tags(tags) + .register(metrics.getRegistry()); + meters.add(t); + return t; } private ColumnSchema decodeColumnSchema(byte[] value) { @@ -262,6 +342,12 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable if (path == null) { Utils.deleteDirectory(db.getPath()); } + for (Meter meter : meters) { + meter.close(); + } + if (metrics != null) { + metrics.close(); + } } catch (TimeoutException e) { logger.error("Some operations lasted more than 10 seconds, forcing database shutdown..."); } @@ -451,6 +537,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable @NotNull Keys keys, @NotNull MemorySegment value, RequestPut requestType) throws it.cavallium.rockserver.core.common.RocksDBException { + var start = System.nanoTime(); ops.beginOp(); try { // Column id @@ -469,6 +556,8 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex); } finally { ops.endOp(); + var end = System.nanoTime(); + putTimer.record(end - start, TimeUnit.NANOSECONDS); } } @@ -823,28 +912,34 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable long columnId, Keys keys, RequestGet requestType) throws it.cavallium.rockserver.core.common.RocksDBException { - // Column id - var col = getColumn(columnId); - Tx tx = transactionOrUpdateId != 0 ? getTransaction(transactionOrUpdateId, true) : null; - long updateId; - if (requestType instanceof RequestType.RequestForUpdate) { - if (tx == null) { - tx = openTransactionInternal(MAX_TRANSACTION_DURATION_MS, true); - updateId = allocateTransactionInternal(tx); - } else { - updateId = transactionOrUpdateId; - } - } else { - updateId = 0; - } - + var start = System.nanoTime(); try { - return get(arena, tx, updateId, col, keys, requestType); - } catch (Throwable ex) { - if (updateId != 0 && tx.isFromGetForUpdate()) { - closeTransaction(updateId, false); + // Column id + var col = getColumn(columnId); + Tx tx = transactionOrUpdateId != 0 ? getTransaction(transactionOrUpdateId, true) : null; + long updateId; + if (requestType instanceof RequestType.RequestForUpdate) { + if (tx == null) { + tx = openTransactionInternal(MAX_TRANSACTION_DURATION_MS, true); + updateId = allocateTransactionInternal(tx); + } else { + updateId = transactionOrUpdateId; + } + } else { + updateId = 0; } - throw ex; + + try { + return get(arena, tx, updateId, col, keys, requestType); + } catch (Throwable ex) { + if (updateId != 0 && tx.isFromGetForUpdate()) { + closeTransaction(updateId, false); + } + throw ex; + } + } finally { + var end = System.nanoTime(); + getTimer.record(end - start, TimeUnit.NANOSECONDS); } } diff --git a/src/main/java/it/cavallium/rockserver/core/impl/InternalConnection.java b/src/main/java/it/cavallium/rockserver/core/impl/InternalConnection.java index 8b21ccc..9b9cde1 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/InternalConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/InternalConnection.java @@ -3,4 +3,5 @@ package it.cavallium.rockserver.core.impl; public interface InternalConnection { RWScheduler getScheduler(); + } diff --git a/src/main/resources/it/cavallium/rockserver/core/resources/default.conf b/src/main/resources/it/cavallium/rockserver/core/resources/default.conf index 43e004f..ab418fa 100644 --- a/src/main/resources/it/cavallium/rockserver/core/resources/default.conf +++ b/src/main/resources/it/cavallium/rockserver/core/resources/default.conf @@ -1,4 +1,24 @@ database: { + metrics: { + influx: { + enabled: false + url: "" + bucket: "" + user: "" + token: "" + org: "" + allow-insecure-certificates: true + } + jmx: { + enabled: true + } + } + parallelism: { + # Maximum read tasks in parallel + read: 30 + # Maximum write tasks in parallel + write: 10 + } global: { # Keep false unless you have a legacy database enable-column-bug: false diff --git a/src/native/java/module-info.java b/src/native/java/module-info.java index 6aba5a5..5b35a82 100644 --- a/src/native/java/module-info.java +++ b/src/native/java/module-info.java @@ -29,9 +29,16 @@ module rockserver.core { requires io.netty.transport.classes.epoll; requires org.reactivestreams; requires io.netty.transport.unix.common; - requires reactor.core; - requires reactor.grpc.stub; - requires java.annotation; + requires reactor.core; + requires reactor.grpc.stub; + requires java.annotation; + requires micrometer.core; + requires micrometer.registry.jmx; + requires micrometer.registry.influx; + requires java.net.http; + requires io.vertx.core; + requires vertx.rx.java3; + requires io.reactivex.rxjava3; exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.common;