diff --git a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java index dc3ff78..0df39a7 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java @@ -30,7 +30,6 @@ import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSi import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandStream.GetRange; import it.cavallium.rockserver.core.common.RocksDBException; import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType; -import it.cavallium.rockserver.core.common.api.proto.OpenTransactionRequest; import it.cavallium.rockserver.core.config.*; import it.cavallium.rockserver.core.impl.rocksdb.*; import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.TransactionalOptions; @@ -56,6 +55,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.LongAdder; import java.util.stream.Stream; import org.cliffc.high_scale_lib.NonBlockingHashMapLong; @@ -105,7 +105,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable private final Timer deleteColumnTimer; private final Timer getColumnIdTimer; private final Timer putTimer; - private final Timer putMulti; + private final Timer putMultiTimer; private final Timer putBatchTimer; private final Timer getTimer; private final Timer openIteratorTimer; @@ -136,7 +136,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable this.deleteColumnTimer = createActionTimer(DeleteColumn.class); this.getColumnIdTimer = createActionTimer(GetColumnId.class); this.putTimer = createActionTimer(Put.class); - this.putMulti = createActionTimer(PutMulti.class); + this.putMultiTimer = createActionTimer(PutMulti.class); this.putBatchTimer = createActionTimer(PutBatch.class); this.getTimer = createActionTimer(Get.class); this.openIteratorTimer = createActionTimer(OpenIterator.class); @@ -361,7 +361,13 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable @Override public long openTransaction(long timeoutMs) { - return allocateTransactionInternal(openTransactionInternal(timeoutMs, false)); + var start = System.nanoTime(); + try { + return allocateTransactionInternal(openTransactionInternal(timeoutMs, false)); + } finally { + var end = System.nanoTime(); + openTransactionTimer.record(end - start, TimeUnit.NANOSECONDS); + } } private long allocateTransactionInternal(Tx tx) { @@ -383,10 +389,20 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable @Override public boolean closeTransaction(long transactionId, boolean commit) { + var start = System.nanoTime(); + try { + return closeTransactionInternal(transactionId, commit); + } finally { + var end = System.nanoTime(); + closeTransactionTimer.record(end - start, TimeUnit.NANOSECONDS); + } + } + + private boolean closeTransactionInternal(long transactionId, boolean commit) { var tx = txs.get(transactionId); if (tx != null) { try { - var committed = closeTransaction(tx, commit); + var committed = closeTransactionInternal(tx, commit); if (committed) { txs.remove(transactionId, tx); } @@ -405,7 +421,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable } } - private boolean closeTransaction(@NotNull Tx tx, boolean commit) { + private boolean closeTransactionInternal(@NotNull Tx tx, boolean commit) { ops.beginOp(); try { // Transaction found @@ -436,7 +452,13 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable @Override public void closeFailedUpdate(long updateId) throws it.cavallium.rockserver.core.common.RocksDBException { - this.closeTransaction(updateId, false); + var start = System.nanoTime(); + try { + closeTransactionInternal(updateId, false); + } finally { + var end = System.nanoTime(); + closeFailedUpdateTimer.record(end - start, TimeUnit.NANOSECONDS); + } } private boolean commitTxOptimistically(@NotNull Tx tx) throws org.rocksdb.RocksDBException { @@ -454,6 +476,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable @Override public long createColumn(String name, @NotNull ColumnSchema schema) throws it.cavallium.rockserver.core.common.RocksDBException { + var start = System.nanoTime(); ops.beginOp(); try { synchronized (columnEditLock) { @@ -488,11 +511,14 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable } } finally { ops.endOp(); + var end = System.nanoTime(); + createColumnTimer.record(end - start, TimeUnit.NANOSECONDS); } } @Override public void deleteColumn(long columnId) throws it.cavallium.rockserver.core.common.RocksDBException { + var start = System.nanoTime(); ops.beginOp(); try { synchronized (columnEditLock) { @@ -506,17 +532,25 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable } } finally { ops.endOp(); + var end = System.nanoTime(); + deleteColumnTimer.record(end - start, TimeUnit.NANOSECONDS); } } @Override public long getColumnId(@NotNull String name) { - var columnId = getColumnIdOrNull(name); - if (columnId == null) { - throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COLUMN_NOT_FOUND, - "Column not found: " + name); - } else { - return columnId; + var start = System.nanoTime(); + try { + var columnId = getColumnIdOrNull(name); + if (columnId == null) { + throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COLUMN_NOT_FOUND, + "Column not found: " + name); + } else { + return columnId; + } + } finally { + var end = System.nanoTime(); + getColumnIdTimer.record(end - start, TimeUnit.NANOSECONDS); } } @@ -568,17 +602,23 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable @NotNull List keys, @NotNull List<@NotNull MemorySegment> values, RequestPut requestType) throws it.cavallium.rockserver.core.common.RocksDBException { - if (keys.size() != values.size()) { - throw new IllegalArgumentException("keys length is different than values length: " + keys.size() + " != " + values.size()); - } - List responses = requestType instanceof RequestType.RequestNothing ? null : new ArrayList<>(keys.size()); - for (int i = 0; i < keys.size(); i++) { - var result = put(arena, transactionOrUpdateId, columnId, keys.get(i), values.get(i), requestType); - if (responses != null) { - responses.add(result); + var start = System.nanoTime(); + try { + if (keys.size() != values.size()) { + throw new IllegalArgumentException("keys length is different than values length: " + keys.size() + " != " + values.size()); } + List responses = requestType instanceof RequestType.RequestNothing ? null : new ArrayList<>(keys.size()); + for (int i = 0; i < keys.size(); i++) { + var result = put(arena, transactionOrUpdateId, columnId, keys.get(i), values.get(i), requestType); + if (responses != null) { + responses.add(result); + } + } + return responses != null ? responses : List.of(); + } finally { + var end = System.nanoTime(); + putMultiTimer.record(end - start, TimeUnit.NANOSECONDS); } - return responses != null ? responses : List.of(); } public CompletableFuture putBatchInternal(long columnId, @@ -722,24 +762,28 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable if (Files.notExists(tempSSTsPath)) { Files.createDirectories(tempSSTsPath); } - return SSTWriter.open(tempSSTsPath, db, col, columnConifg, forceNoOptions, ingestBehind, refs); - } catch (IOException ex) { + return SSTWriter.open(tempSSTsPath, db, col, columnConifg, forceNoOptions, ingestBehind, refs); + } catch (IOException ex) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.SST_WRITE_2, ex); - } catch (org.rocksdb.RocksDBException ex) { + } catch (org.rocksdb.RocksDBException ex) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.SST_WRITE_3, ex); - } - } + } + } @Override public void putBatch(long columnId, @NotNull Publisher<@NotNull KVBatch> batchPublisher, @NotNull PutBatchMode mode) throws it.cavallium.rockserver.core.common.RocksDBException { + var start = System.nanoTime(); try { putBatchInternal(columnId, batchPublisher, mode).get(); - } catch (it.cavallium.rockserver.core.common.RocksDBException ex) { + } catch (it.cavallium.rockserver.core.common.RocksDBException ex) { throw ex; } catch (Exception ex) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex); + } finally { + var end = System.nanoTime(); + putBatchTimer.record(end - start, TimeUnit.NANOSECONDS); } } @@ -770,13 +814,13 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable do { //noinspection unchecked result = txConsumer.apply((T) newTx); - committed = this.closeTransaction(newTx, true); + committed = this.closeTransactionInternal(newTx, true); if (!committed) { Thread.yield(); } } while (!committed); } finally { - this.closeTransaction(newTx, false); + this.closeTransactionInternal(newTx, false); } } else { result = txConsumer.apply(tx); @@ -1020,6 +1064,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable @Nullable Keys endKeysExclusive, boolean reverse, long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException { + var start = System.nanoTime(); // Open an operation that ends when the iterator is closed ops.beginOp(); try { @@ -1036,29 +1081,37 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable return FastRandomUtils.allocateNewValue(its, itEntry, 1, Long.MAX_VALUE); } catch (Throwable ex) { ops.endOp(); + var end = System.nanoTime(); + openIteratorTimer.record(end - start, TimeUnit.NANOSECONDS); throw ex; } } @Override public void closeIterator(long iteratorId) throws it.cavallium.rockserver.core.common.RocksDBException { + var start = System.nanoTime(); ops.beginOp(); try { // Should close the iterator operation throw new UnsupportedOperationException(); } finally { ops.endOp(); + var end = System.nanoTime(); + closeIteratorTimer.record(end - start, TimeUnit.NANOSECONDS); } } @Override public void seekTo(Arena arena, long iterationId, @NotNull Keys keys) throws it.cavallium.rockserver.core.common.RocksDBException { + var start = System.nanoTime(); ops.beginOp(); try { throw new UnsupportedOperationException(); } finally { ops.endOp(); + var end = System.nanoTime(); + seekToTimer.record(end - start, TimeUnit.NANOSECONDS); } } @@ -1068,11 +1121,14 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable long skipCount, long takeCount, @NotNull RequestType.RequestIterate requestType) throws it.cavallium.rockserver.core.common.RocksDBException { + var start = System.nanoTime(); ops.beginOp(); try { throw new UnsupportedOperationException(); } finally { ops.endOp(); + var end = System.nanoTime(); + subsequentTimer.record(end - start, TimeUnit.NANOSECONDS); } } @@ -1086,6 +1142,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable boolean reverse, RequestType.@NotNull RequestReduceRange requestType, long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException { + var start = System.nanoTime(); ops.beginOp(); try { var col = getColumn(columnId); @@ -1172,11 +1229,20 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable } } finally { ops.endOp(); + var end = System.nanoTime(); + reduceRangeTimer.record(end - start, TimeUnit.NANOSECONDS); } } @Override - public Stream getRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestGetRange requestType, long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException { + public Stream getRange(Arena arena, + long transactionId, + long columnId, + @Nullable Keys startKeysInclusive, + @Nullable Keys endKeysExclusive, + boolean reverse, + @NotNull RequestType.RequestGetRange requestType, + long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException { return Flux .from(this.getRangeAsyncInternal(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs)) .toStream(); @@ -1191,6 +1257,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable boolean reverse, RequestType.RequestGetRange requestType, long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException { + LongAdder totalTime = new LongAdder(); record Resources(ColumnInstance col, ReadOptions ro, AbstractSlice startKeySlice, AbstractSlice endKeySlice, RocksIterator it) { public void close() { @@ -1201,6 +1268,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable } } return Flux.using(() -> { + var initializationStartTime = System.nanoTime(); var col = getColumn(columnId); if (requestType instanceof RequestType.RequestGetAllInRange) { @@ -1244,33 +1312,55 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable } catch (Throwable ex) { ro.close(); throw ex; + } finally { + totalTime.add(System.nanoTime() - initializationStartTime); } }, res -> Flux.generate(() -> { + var seekStartTime = System.nanoTime(); + try { + if (!reverse) { + res.it.seekToFirst(); + } else { + res.it.seekToLast(); + } + return res.it; + } finally { + totalTime.add(System.nanoTime() - seekStartTime); + } + }, (it, sink) -> { + var nextTime = System.nanoTime(); + try { + if (!it.isValid()) { + sink.complete(); + } else { + var calculatedKey = toMemorySegment(arena, it.key()); + var calculatedValue = res.col.schema().hasValue() ? toMemorySegment(it.value()) : MemorySegment.NULL; if (!reverse) { - res.it.seekToFirst(); + res.it.next(); } else { - res.it.seekToLast(); + res.it.prev(); } - return res.it; - }, (it, sink) -> { - if (!it.isValid()) { - sink.complete(); - } else { - var calculatedKey = toMemorySegment(arena, it.key()); - var calculatedValue = res.col.schema().hasValue() ? toMemorySegment(it.value()) : MemorySegment.NULL; - if (!reverse) { - res.it.next(); - } else { - res.it.prev(); - } - //noinspection unchecked - sink.next((T) decodeKVNoBuckets(arena, res.col, calculatedKey, calculatedValue)); - } - return it; - }), Resources::close) - .subscribeOn(scheduler.read()) - .doFirst(ops::beginOp) - .doFinally(_ -> ops.endOp()); + //noinspection unchecked + sink.next((T) decodeKVNoBuckets(arena, res.col, calculatedKey, calculatedValue)); + } + return it; + } finally { + totalTime.add(System.nanoTime() - nextTime); + } + }), resources -> { + var closeTime = System.nanoTime(); + try { + resources.close(); + } finally { + totalTime.add(System.nanoTime() - closeTime); + } + }) + .subscribeOn(scheduler.read()) + .doFirst(ops::beginOp) + .doFinally(_ -> { + ops.endOp(); + getRangeTimer.record(totalTime.sum(), TimeUnit.NANOSECONDS); + }); } private MemorySegment dbGet(Tx tx, diff --git a/src/main/java/it/cavallium/rockserver/core/impl/MetricsManager.java b/src/main/java/it/cavallium/rockserver/core/impl/MetricsManager.java new file mode 100644 index 0000000..e1ad574 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/MetricsManager.java @@ -0,0 +1,222 @@ +package it.cavallium.rockserver.core.impl; + +import io.micrometer.core.instrument.Clock; +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.Timer.Builder; +import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics; +import io.micrometer.core.instrument.binder.jvm.JvmCompilationMetrics; +import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics; +import io.micrometer.core.instrument.binder.jvm.JvmHeapPressureMetrics; +import io.micrometer.core.instrument.binder.jvm.JvmInfoMetrics; +import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics; +import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics; +import io.micrometer.core.instrument.binder.system.FileDescriptorMetrics; +import io.micrometer.core.instrument.binder.system.ProcessorMetrics; +import io.micrometer.core.instrument.binder.system.UptimeMetrics; +import io.micrometer.core.instrument.composite.CompositeMeterRegistry; +import io.micrometer.core.instrument.util.NamedThreadFactory; +import io.micrometer.core.ipc.http.HttpUrlConnectionSender; +import io.micrometer.influx.InfluxConfig; +import io.micrometer.influx.InfluxMeterRegistry; +import io.micrometer.jmx.JmxConfig; +import io.micrometer.jmx.JmxMeterRegistry; +import io.vertx.core.MultiMap; +import io.vertx.core.http.HttpClientOptions; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpVersion; +import io.vertx.core.http.RequestOptions; +import io.vertx.rxjava3.core.Vertx; +import io.vertx.rxjava3.core.buffer.Buffer; +import io.vertx.rxjava3.core.http.HttpClient; +import it.cavallium.rockserver.core.common.RocksDBException; +import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType; +import it.cavallium.rockserver.core.config.DatabaseConfig; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Map.Entry; +import org.github.gestalt.config.exceptions.GestaltException; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetricsManager implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(MetricsManager.class); + + private final JvmGcMetrics gcMetrics; + private final JvmHeapPressureMetrics heapPressureMetrics; + private final CompositeMeterRegistry compositeRegistry; + private final long startTime; + private HttpClient httpClient; + + public MetricsManager(DatabaseConfig config) { + try { + this.startTime = System.currentTimeMillis(); + compositeRegistry = new CompositeMeterRegistry(); + if (config.metrics().jmx().enabled()) { + try { + JmxMeterRegistry jmxMeterRegistry = new JmxMeterRegistry(new JmxConfig() { + + @Override + public @NotNull String prefix() { + return "rocksdb-jmx"; + } + + @Override + public @NotNull String domain() { + return "metrics"; + } + + @Override + public String get(@NotNull String s) { + return null; + } + }, Clock.SYSTEM); + compositeRegistry.add(jmxMeterRegistry); + } catch (Throwable ex) { + LOG.error("Failed to initialize jmx metrics"); + } + } + if (config.metrics().influx().enabled()) { + try { + this.httpClient = Vertx.vertx().createHttpClient(new HttpClientOptions() + .setTrustAll(config.metrics().influx().allowInsecureCertificates()) + .setVerifyHost(!config.metrics().influx().allowInsecureCertificates()) + .setTryUseCompression(true) + .setProtocolVersion(HttpVersion.HTTP_2) + .setUseAlpn(true) + .setConnectTimeout(1000) + .setReadIdleTimeout(10000)); + var influxUrl = config.metrics().influx().url(); + var bucket = config.metrics().influx().bucket(); + var userName = config.metrics().influx().user(); + var token = config.metrics().influx().token(); + var org = config.metrics().influx().org(); + var step = Duration.ofMinutes(1); + InfluxMeterRegistry influxMeterRegistry = InfluxMeterRegistry + .builder(new InfluxConfig() { + + @Override + public String uri() { + return influxUrl; + } + + @Override + public String bucket() { + return bucket; + } + + @Override + public String userName() { + return userName; + } + + @Override + public String token() { + return token; + } + + @Override + public String org() { + return org; + } + + @Override + public Duration step() { + return step; + } + + @Override + public String get(@NotNull String s) { + return null; + } + }) + .clock(Clock.SYSTEM) + .httpClient(new HttpUrlConnectionSender() { + + @Override + public Response send(Request request) { + if (httpClient == null) { + return new Response(400, "httpClient is null"); + } + Method method = request.getMethod(); + var requestOptions = new RequestOptions(); + requestOptions.setMethod(HttpMethod.valueOf(method.name())); + requestOptions.setAbsoluteURI(request.getUrl()); + requestOptions.setTimeout(10000); + MultiMap headers = MultiMap.caseInsensitiveMultiMap(); + for (Entry header : request.getRequestHeaders().entrySet()) { + headers.add(header.getKey(), header.getValue()); + } + requestOptions.setHeaders(headers); + return httpClient + .rxRequest(requestOptions).flatMap(req -> { + if (method != Method.GET) { + return req.rxSend(Buffer.buffer(request.getEntity())); + } else { + return req.rxSend(); + } + }) + .flatMap(response -> { + int status = response.statusCode(); + + return response.rxBody() + .map(body -> new Response(status, body.toString(StandardCharsets.UTF_8))) + .onErrorReturn(ex -> new Response(status, ex.toString())); + }) + .onErrorReturn(ex -> new Response(400, ex.toString())) + .blockingGet(); + } + }) + .threadFactory(new NamedThreadFactory("influx-metrics-publisher")) + .build(); + compositeRegistry.add(influxMeterRegistry); + } catch (Throwable ex) { + LOG.error("Failed to initialize influx metrics"); + } + } else { + this.httpClient = null; + } + + new JvmCompilationMetrics().bindTo(compositeRegistry); + new JvmMemoryMetrics().bindTo(compositeRegistry); + new JvmInfoMetrics().bindTo(compositeRegistry); + new ProcessorMetrics().bindTo(compositeRegistry); + new ClassLoaderMetrics().bindTo(compositeRegistry); + new FileDescriptorMetrics().bindTo(compositeRegistry); + new UptimeMetrics().bindTo(compositeRegistry); + this.gcMetrics = new JvmGcMetrics(); + gcMetrics.bindTo(compositeRegistry); + new JvmThreadMetrics().bindTo(compositeRegistry); + this.heapPressureMetrics = new JvmHeapPressureMetrics(); + heapPressureMetrics.bindTo(compositeRegistry); + + compositeRegistry.gauge("yotsuba.uptime.millis", + this, + statsManager -> System.currentTimeMillis() - statsManager.getStartTime() + ); + } catch (GestaltException e) { + throw RocksDBException.of(RocksDBErrorType.CONFIG_ERROR, "Failed to parse metrics configuration", e); + } + } + + private long getStartTime() { + return startTime; + } + + @Override + public void close() { + if (httpClient != null) { + httpClient.rxClose().blockingAwait(); + } + gcMetrics.close(); + heapPressureMetrics.close(); + compositeRegistry.close(); + } + + public MeterRegistry getRegistry() { + return compositeRegistry; + } +}