Add metrics
This commit is contained in:
parent
f419f92662
commit
d4488a5042
@ -30,7 +30,6 @@ import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSi
|
|||||||
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandStream.GetRange;
|
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandStream.GetRange;
|
||||||
import it.cavallium.rockserver.core.common.RocksDBException;
|
import it.cavallium.rockserver.core.common.RocksDBException;
|
||||||
import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType;
|
import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType;
|
||||||
import it.cavallium.rockserver.core.common.api.proto.OpenTransactionRequest;
|
|
||||||
import it.cavallium.rockserver.core.config.*;
|
import it.cavallium.rockserver.core.config.*;
|
||||||
import it.cavallium.rockserver.core.impl.rocksdb.*;
|
import it.cavallium.rockserver.core.impl.rocksdb.*;
|
||||||
import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.TransactionalOptions;
|
import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.TransactionalOptions;
|
||||||
@ -56,6 +55,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.cliffc.high_scale_lib.NonBlockingHashMapLong;
|
import org.cliffc.high_scale_lib.NonBlockingHashMapLong;
|
||||||
@ -105,7 +105,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
|||||||
private final Timer deleteColumnTimer;
|
private final Timer deleteColumnTimer;
|
||||||
private final Timer getColumnIdTimer;
|
private final Timer getColumnIdTimer;
|
||||||
private final Timer putTimer;
|
private final Timer putTimer;
|
||||||
private final Timer putMulti;
|
private final Timer putMultiTimer;
|
||||||
private final Timer putBatchTimer;
|
private final Timer putBatchTimer;
|
||||||
private final Timer getTimer;
|
private final Timer getTimer;
|
||||||
private final Timer openIteratorTimer;
|
private final Timer openIteratorTimer;
|
||||||
@ -136,7 +136,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
|||||||
this.deleteColumnTimer = createActionTimer(DeleteColumn.class);
|
this.deleteColumnTimer = createActionTimer(DeleteColumn.class);
|
||||||
this.getColumnIdTimer = createActionTimer(GetColumnId.class);
|
this.getColumnIdTimer = createActionTimer(GetColumnId.class);
|
||||||
this.putTimer = createActionTimer(Put.class);
|
this.putTimer = createActionTimer(Put.class);
|
||||||
this.putMulti = createActionTimer(PutMulti.class);
|
this.putMultiTimer = createActionTimer(PutMulti.class);
|
||||||
this.putBatchTimer = createActionTimer(PutBatch.class);
|
this.putBatchTimer = createActionTimer(PutBatch.class);
|
||||||
this.getTimer = createActionTimer(Get.class);
|
this.getTimer = createActionTimer(Get.class);
|
||||||
this.openIteratorTimer = createActionTimer(OpenIterator.class);
|
this.openIteratorTimer = createActionTimer(OpenIterator.class);
|
||||||
@ -361,7 +361,13 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long openTransaction(long timeoutMs) {
|
public long openTransaction(long timeoutMs) {
|
||||||
return allocateTransactionInternal(openTransactionInternal(timeoutMs, false));
|
var start = System.nanoTime();
|
||||||
|
try {
|
||||||
|
return allocateTransactionInternal(openTransactionInternal(timeoutMs, false));
|
||||||
|
} finally {
|
||||||
|
var end = System.nanoTime();
|
||||||
|
openTransactionTimer.record(end - start, TimeUnit.NANOSECONDS);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private long allocateTransactionInternal(Tx tx) {
|
private long allocateTransactionInternal(Tx tx) {
|
||||||
@ -383,10 +389,20 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean closeTransaction(long transactionId, boolean commit) {
|
public boolean closeTransaction(long transactionId, boolean commit) {
|
||||||
|
var start = System.nanoTime();
|
||||||
|
try {
|
||||||
|
return closeTransactionInternal(transactionId, commit);
|
||||||
|
} finally {
|
||||||
|
var end = System.nanoTime();
|
||||||
|
closeTransactionTimer.record(end - start, TimeUnit.NANOSECONDS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean closeTransactionInternal(long transactionId, boolean commit) {
|
||||||
var tx = txs.get(transactionId);
|
var tx = txs.get(transactionId);
|
||||||
if (tx != null) {
|
if (tx != null) {
|
||||||
try {
|
try {
|
||||||
var committed = closeTransaction(tx, commit);
|
var committed = closeTransactionInternal(tx, commit);
|
||||||
if (committed) {
|
if (committed) {
|
||||||
txs.remove(transactionId, tx);
|
txs.remove(transactionId, tx);
|
||||||
}
|
}
|
||||||
@ -405,7 +421,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean closeTransaction(@NotNull Tx tx, boolean commit) {
|
private boolean closeTransactionInternal(@NotNull Tx tx, boolean commit) {
|
||||||
ops.beginOp();
|
ops.beginOp();
|
||||||
try {
|
try {
|
||||||
// Transaction found
|
// Transaction found
|
||||||
@ -436,7 +452,13 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void closeFailedUpdate(long updateId) throws it.cavallium.rockserver.core.common.RocksDBException {
|
public void closeFailedUpdate(long updateId) throws it.cavallium.rockserver.core.common.RocksDBException {
|
||||||
this.closeTransaction(updateId, false);
|
var start = System.nanoTime();
|
||||||
|
try {
|
||||||
|
closeTransactionInternal(updateId, false);
|
||||||
|
} finally {
|
||||||
|
var end = System.nanoTime();
|
||||||
|
closeFailedUpdateTimer.record(end - start, TimeUnit.NANOSECONDS);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean commitTxOptimistically(@NotNull Tx tx) throws org.rocksdb.RocksDBException {
|
private boolean commitTxOptimistically(@NotNull Tx tx) throws org.rocksdb.RocksDBException {
|
||||||
@ -454,6 +476,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long createColumn(String name, @NotNull ColumnSchema schema) throws it.cavallium.rockserver.core.common.RocksDBException {
|
public long createColumn(String name, @NotNull ColumnSchema schema) throws it.cavallium.rockserver.core.common.RocksDBException {
|
||||||
|
var start = System.nanoTime();
|
||||||
ops.beginOp();
|
ops.beginOp();
|
||||||
try {
|
try {
|
||||||
synchronized (columnEditLock) {
|
synchronized (columnEditLock) {
|
||||||
@ -488,11 +511,14 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
|||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
ops.endOp();
|
ops.endOp();
|
||||||
|
var end = System.nanoTime();
|
||||||
|
createColumnTimer.record(end - start, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteColumn(long columnId) throws it.cavallium.rockserver.core.common.RocksDBException {
|
public void deleteColumn(long columnId) throws it.cavallium.rockserver.core.common.RocksDBException {
|
||||||
|
var start = System.nanoTime();
|
||||||
ops.beginOp();
|
ops.beginOp();
|
||||||
try {
|
try {
|
||||||
synchronized (columnEditLock) {
|
synchronized (columnEditLock) {
|
||||||
@ -506,17 +532,25 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
|||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
ops.endOp();
|
ops.endOp();
|
||||||
|
var end = System.nanoTime();
|
||||||
|
deleteColumnTimer.record(end - start, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getColumnId(@NotNull String name) {
|
public long getColumnId(@NotNull String name) {
|
||||||
var columnId = getColumnIdOrNull(name);
|
var start = System.nanoTime();
|
||||||
if (columnId == null) {
|
try {
|
||||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COLUMN_NOT_FOUND,
|
var columnId = getColumnIdOrNull(name);
|
||||||
"Column not found: " + name);
|
if (columnId == null) {
|
||||||
} else {
|
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COLUMN_NOT_FOUND,
|
||||||
return columnId;
|
"Column not found: " + name);
|
||||||
|
} else {
|
||||||
|
return columnId;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
var end = System.nanoTime();
|
||||||
|
getColumnIdTimer.record(end - start, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -568,17 +602,23 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
|||||||
@NotNull List<Keys> keys,
|
@NotNull List<Keys> keys,
|
||||||
@NotNull List<@NotNull MemorySegment> values,
|
@NotNull List<@NotNull MemorySegment> values,
|
||||||
RequestPut<? super MemorySegment, T> requestType) throws it.cavallium.rockserver.core.common.RocksDBException {
|
RequestPut<? super MemorySegment, T> requestType) throws it.cavallium.rockserver.core.common.RocksDBException {
|
||||||
if (keys.size() != values.size()) {
|
var start = System.nanoTime();
|
||||||
throw new IllegalArgumentException("keys length is different than values length: " + keys.size() + " != " + values.size());
|
try {
|
||||||
}
|
if (keys.size() != values.size()) {
|
||||||
List<T> responses = requestType instanceof RequestType.RequestNothing<?> ? null : new ArrayList<>(keys.size());
|
throw new IllegalArgumentException("keys length is different than values length: " + keys.size() + " != " + values.size());
|
||||||
for (int i = 0; i < keys.size(); i++) {
|
|
||||||
var result = put(arena, transactionOrUpdateId, columnId, keys.get(i), values.get(i), requestType);
|
|
||||||
if (responses != null) {
|
|
||||||
responses.add(result);
|
|
||||||
}
|
}
|
||||||
|
List<T> responses = requestType instanceof RequestType.RequestNothing<?> ? null : new ArrayList<>(keys.size());
|
||||||
|
for (int i = 0; i < keys.size(); i++) {
|
||||||
|
var result = put(arena, transactionOrUpdateId, columnId, keys.get(i), values.get(i), requestType);
|
||||||
|
if (responses != null) {
|
||||||
|
responses.add(result);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return responses != null ? responses : List.of();
|
||||||
|
} finally {
|
||||||
|
var end = System.nanoTime();
|
||||||
|
putMultiTimer.record(end - start, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
return responses != null ? responses : List.of();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public CompletableFuture<Void> putBatchInternal(long columnId,
|
public CompletableFuture<Void> putBatchInternal(long columnId,
|
||||||
@ -722,24 +762,28 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
|||||||
if (Files.notExists(tempSSTsPath)) {
|
if (Files.notExists(tempSSTsPath)) {
|
||||||
Files.createDirectories(tempSSTsPath);
|
Files.createDirectories(tempSSTsPath);
|
||||||
}
|
}
|
||||||
return SSTWriter.open(tempSSTsPath, db, col, columnConifg, forceNoOptions, ingestBehind, refs);
|
return SSTWriter.open(tempSSTsPath, db, col, columnConifg, forceNoOptions, ingestBehind, refs);
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.SST_WRITE_2, ex);
|
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.SST_WRITE_2, ex);
|
||||||
} catch (org.rocksdb.RocksDBException ex) {
|
} catch (org.rocksdb.RocksDBException ex) {
|
||||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.SST_WRITE_3, ex);
|
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.SST_WRITE_3, ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void putBatch(long columnId,
|
public void putBatch(long columnId,
|
||||||
@NotNull Publisher<@NotNull KVBatch> batchPublisher,
|
@NotNull Publisher<@NotNull KVBatch> batchPublisher,
|
||||||
@NotNull PutBatchMode mode) throws it.cavallium.rockserver.core.common.RocksDBException {
|
@NotNull PutBatchMode mode) throws it.cavallium.rockserver.core.common.RocksDBException {
|
||||||
|
var start = System.nanoTime();
|
||||||
try {
|
try {
|
||||||
putBatchInternal(columnId, batchPublisher, mode).get();
|
putBatchInternal(columnId, batchPublisher, mode).get();
|
||||||
} catch (it.cavallium.rockserver.core.common.RocksDBException ex) {
|
} catch (it.cavallium.rockserver.core.common.RocksDBException ex) {
|
||||||
throw ex;
|
throw ex;
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex);
|
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex);
|
||||||
|
} finally {
|
||||||
|
var end = System.nanoTime();
|
||||||
|
putBatchTimer.record(end - start, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -770,13 +814,13 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
|||||||
do {
|
do {
|
||||||
//noinspection unchecked
|
//noinspection unchecked
|
||||||
result = txConsumer.apply((T) newTx);
|
result = txConsumer.apply((T) newTx);
|
||||||
committed = this.closeTransaction(newTx, true);
|
committed = this.closeTransactionInternal(newTx, true);
|
||||||
if (!committed) {
|
if (!committed) {
|
||||||
Thread.yield();
|
Thread.yield();
|
||||||
}
|
}
|
||||||
} while (!committed);
|
} while (!committed);
|
||||||
} finally {
|
} finally {
|
||||||
this.closeTransaction(newTx, false);
|
this.closeTransactionInternal(newTx, false);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
result = txConsumer.apply(tx);
|
result = txConsumer.apply(tx);
|
||||||
@ -1020,6 +1064,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
|||||||
@Nullable Keys endKeysExclusive,
|
@Nullable Keys endKeysExclusive,
|
||||||
boolean reverse,
|
boolean reverse,
|
||||||
long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException {
|
long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException {
|
||||||
|
var start = System.nanoTime();
|
||||||
// Open an operation that ends when the iterator is closed
|
// Open an operation that ends when the iterator is closed
|
||||||
ops.beginOp();
|
ops.beginOp();
|
||||||
try {
|
try {
|
||||||
@ -1036,29 +1081,37 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
|||||||
return FastRandomUtils.allocateNewValue(its, itEntry, 1, Long.MAX_VALUE);
|
return FastRandomUtils.allocateNewValue(its, itEntry, 1, Long.MAX_VALUE);
|
||||||
} catch (Throwable ex) {
|
} catch (Throwable ex) {
|
||||||
ops.endOp();
|
ops.endOp();
|
||||||
|
var end = System.nanoTime();
|
||||||
|
openIteratorTimer.record(end - start, TimeUnit.NANOSECONDS);
|
||||||
throw ex;
|
throw ex;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void closeIterator(long iteratorId) throws it.cavallium.rockserver.core.common.RocksDBException {
|
public void closeIterator(long iteratorId) throws it.cavallium.rockserver.core.common.RocksDBException {
|
||||||
|
var start = System.nanoTime();
|
||||||
ops.beginOp();
|
ops.beginOp();
|
||||||
try {
|
try {
|
||||||
// Should close the iterator operation
|
// Should close the iterator operation
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
} finally {
|
} finally {
|
||||||
ops.endOp();
|
ops.endOp();
|
||||||
|
var end = System.nanoTime();
|
||||||
|
closeIteratorTimer.record(end - start, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void seekTo(Arena arena, long iterationId, @NotNull Keys keys)
|
public void seekTo(Arena arena, long iterationId, @NotNull Keys keys)
|
||||||
throws it.cavallium.rockserver.core.common.RocksDBException {
|
throws it.cavallium.rockserver.core.common.RocksDBException {
|
||||||
|
var start = System.nanoTime();
|
||||||
ops.beginOp();
|
ops.beginOp();
|
||||||
try {
|
try {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
} finally {
|
} finally {
|
||||||
ops.endOp();
|
ops.endOp();
|
||||||
|
var end = System.nanoTime();
|
||||||
|
seekToTimer.record(end - start, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1068,11 +1121,14 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
|||||||
long skipCount,
|
long skipCount,
|
||||||
long takeCount,
|
long takeCount,
|
||||||
@NotNull RequestType.RequestIterate<? super MemorySegment, T> requestType) throws it.cavallium.rockserver.core.common.RocksDBException {
|
@NotNull RequestType.RequestIterate<? super MemorySegment, T> requestType) throws it.cavallium.rockserver.core.common.RocksDBException {
|
||||||
|
var start = System.nanoTime();
|
||||||
ops.beginOp();
|
ops.beginOp();
|
||||||
try {
|
try {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
} finally {
|
} finally {
|
||||||
ops.endOp();
|
ops.endOp();
|
||||||
|
var end = System.nanoTime();
|
||||||
|
subsequentTimer.record(end - start, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1086,6 +1142,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
|||||||
boolean reverse,
|
boolean reverse,
|
||||||
RequestType.@NotNull RequestReduceRange<? super KV, T> requestType,
|
RequestType.@NotNull RequestReduceRange<? super KV, T> requestType,
|
||||||
long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException {
|
long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException {
|
||||||
|
var start = System.nanoTime();
|
||||||
ops.beginOp();
|
ops.beginOp();
|
||||||
try {
|
try {
|
||||||
var col = getColumn(columnId);
|
var col = getColumn(columnId);
|
||||||
@ -1172,11 +1229,20 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
|||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
ops.endOp();
|
ops.endOp();
|
||||||
|
var end = System.nanoTime();
|
||||||
|
reduceRangeTimer.record(end - start, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> Stream<T> getRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestGetRange<? super KV, T> requestType, long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException {
|
public <T> Stream<T> getRange(Arena arena,
|
||||||
|
long transactionId,
|
||||||
|
long columnId,
|
||||||
|
@Nullable Keys startKeysInclusive,
|
||||||
|
@Nullable Keys endKeysExclusive,
|
||||||
|
boolean reverse,
|
||||||
|
@NotNull RequestType.RequestGetRange<? super KV, T> requestType,
|
||||||
|
long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException {
|
||||||
return Flux
|
return Flux
|
||||||
.from(this.getRangeAsyncInternal(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs))
|
.from(this.getRangeAsyncInternal(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs))
|
||||||
.toStream();
|
.toStream();
|
||||||
@ -1191,6 +1257,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
|||||||
boolean reverse,
|
boolean reverse,
|
||||||
RequestType.RequestGetRange<? super KV, T> requestType,
|
RequestType.RequestGetRange<? super KV, T> requestType,
|
||||||
long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException {
|
long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException {
|
||||||
|
LongAdder totalTime = new LongAdder();
|
||||||
record Resources(ColumnInstance col, ReadOptions ro, AbstractSlice<?> startKeySlice,
|
record Resources(ColumnInstance col, ReadOptions ro, AbstractSlice<?> startKeySlice,
|
||||||
AbstractSlice<?> endKeySlice, RocksIterator it) {
|
AbstractSlice<?> endKeySlice, RocksIterator it) {
|
||||||
public void close() {
|
public void close() {
|
||||||
@ -1201,6 +1268,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
return Flux.using(() -> {
|
return Flux.using(() -> {
|
||||||
|
var initializationStartTime = System.nanoTime();
|
||||||
var col = getColumn(columnId);
|
var col = getColumn(columnId);
|
||||||
|
|
||||||
if (requestType instanceof RequestType.RequestGetAllInRange<?>) {
|
if (requestType instanceof RequestType.RequestGetAllInRange<?>) {
|
||||||
@ -1244,33 +1312,55 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
|||||||
} catch (Throwable ex) {
|
} catch (Throwable ex) {
|
||||||
ro.close();
|
ro.close();
|
||||||
throw ex;
|
throw ex;
|
||||||
|
} finally {
|
||||||
|
totalTime.add(System.nanoTime() - initializationStartTime);
|
||||||
}
|
}
|
||||||
}, res -> Flux.<T, RocksIterator>generate(() -> {
|
}, res -> Flux.<T, RocksIterator>generate(() -> {
|
||||||
|
var seekStartTime = System.nanoTime();
|
||||||
|
try {
|
||||||
|
if (!reverse) {
|
||||||
|
res.it.seekToFirst();
|
||||||
|
} else {
|
||||||
|
res.it.seekToLast();
|
||||||
|
}
|
||||||
|
return res.it;
|
||||||
|
} finally {
|
||||||
|
totalTime.add(System.nanoTime() - seekStartTime);
|
||||||
|
}
|
||||||
|
}, (it, sink) -> {
|
||||||
|
var nextTime = System.nanoTime();
|
||||||
|
try {
|
||||||
|
if (!it.isValid()) {
|
||||||
|
sink.complete();
|
||||||
|
} else {
|
||||||
|
var calculatedKey = toMemorySegment(arena, it.key());
|
||||||
|
var calculatedValue = res.col.schema().hasValue() ? toMemorySegment(it.value()) : MemorySegment.NULL;
|
||||||
if (!reverse) {
|
if (!reverse) {
|
||||||
res.it.seekToFirst();
|
res.it.next();
|
||||||
} else {
|
} else {
|
||||||
res.it.seekToLast();
|
res.it.prev();
|
||||||
}
|
}
|
||||||
return res.it;
|
//noinspection unchecked
|
||||||
}, (it, sink) -> {
|
sink.next((T) decodeKVNoBuckets(arena, res.col, calculatedKey, calculatedValue));
|
||||||
if (!it.isValid()) {
|
}
|
||||||
sink.complete();
|
return it;
|
||||||
} else {
|
} finally {
|
||||||
var calculatedKey = toMemorySegment(arena, it.key());
|
totalTime.add(System.nanoTime() - nextTime);
|
||||||
var calculatedValue = res.col.schema().hasValue() ? toMemorySegment(it.value()) : MemorySegment.NULL;
|
}
|
||||||
if (!reverse) {
|
}), resources -> {
|
||||||
res.it.next();
|
var closeTime = System.nanoTime();
|
||||||
} else {
|
try {
|
||||||
res.it.prev();
|
resources.close();
|
||||||
}
|
} finally {
|
||||||
//noinspection unchecked
|
totalTime.add(System.nanoTime() - closeTime);
|
||||||
sink.next((T) decodeKVNoBuckets(arena, res.col, calculatedKey, calculatedValue));
|
}
|
||||||
}
|
})
|
||||||
return it;
|
.subscribeOn(scheduler.read())
|
||||||
}), Resources::close)
|
.doFirst(ops::beginOp)
|
||||||
.subscribeOn(scheduler.read())
|
.doFinally(_ -> {
|
||||||
.doFirst(ops::beginOp)
|
ops.endOp();
|
||||||
.doFinally(_ -> ops.endOp());
|
getRangeTimer.record(totalTime.sum(), TimeUnit.NANOSECONDS);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private MemorySegment dbGet(Tx tx,
|
private MemorySegment dbGet(Tx tx,
|
||||||
|
@ -0,0 +1,222 @@
|
|||||||
|
package it.cavallium.rockserver.core.impl;
|
||||||
|
|
||||||
|
import io.micrometer.core.instrument.Clock;
|
||||||
|
import io.micrometer.core.instrument.Meter;
|
||||||
|
import io.micrometer.core.instrument.MeterRegistry;
|
||||||
|
import io.micrometer.core.instrument.Timer;
|
||||||
|
import io.micrometer.core.instrument.Timer.Builder;
|
||||||
|
import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics;
|
||||||
|
import io.micrometer.core.instrument.binder.jvm.JvmCompilationMetrics;
|
||||||
|
import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics;
|
||||||
|
import io.micrometer.core.instrument.binder.jvm.JvmHeapPressureMetrics;
|
||||||
|
import io.micrometer.core.instrument.binder.jvm.JvmInfoMetrics;
|
||||||
|
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
|
||||||
|
import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
|
||||||
|
import io.micrometer.core.instrument.binder.system.FileDescriptorMetrics;
|
||||||
|
import io.micrometer.core.instrument.binder.system.ProcessorMetrics;
|
||||||
|
import io.micrometer.core.instrument.binder.system.UptimeMetrics;
|
||||||
|
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
|
||||||
|
import io.micrometer.core.instrument.util.NamedThreadFactory;
|
||||||
|
import io.micrometer.core.ipc.http.HttpUrlConnectionSender;
|
||||||
|
import io.micrometer.influx.InfluxConfig;
|
||||||
|
import io.micrometer.influx.InfluxMeterRegistry;
|
||||||
|
import io.micrometer.jmx.JmxConfig;
|
||||||
|
import io.micrometer.jmx.JmxMeterRegistry;
|
||||||
|
import io.vertx.core.MultiMap;
|
||||||
|
import io.vertx.core.http.HttpClientOptions;
|
||||||
|
import io.vertx.core.http.HttpMethod;
|
||||||
|
import io.vertx.core.http.HttpVersion;
|
||||||
|
import io.vertx.core.http.RequestOptions;
|
||||||
|
import io.vertx.rxjava3.core.Vertx;
|
||||||
|
import io.vertx.rxjava3.core.buffer.Buffer;
|
||||||
|
import io.vertx.rxjava3.core.http.HttpClient;
|
||||||
|
import it.cavallium.rockserver.core.common.RocksDBException;
|
||||||
|
import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType;
|
||||||
|
import it.cavallium.rockserver.core.config.DatabaseConfig;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import org.github.gestalt.config.exceptions.GestaltException;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
public class MetricsManager implements AutoCloseable {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(MetricsManager.class);
|
||||||
|
|
||||||
|
private final JvmGcMetrics gcMetrics;
|
||||||
|
private final JvmHeapPressureMetrics heapPressureMetrics;
|
||||||
|
private final CompositeMeterRegistry compositeRegistry;
|
||||||
|
private final long startTime;
|
||||||
|
private HttpClient httpClient;
|
||||||
|
|
||||||
|
public MetricsManager(DatabaseConfig config) {
|
||||||
|
try {
|
||||||
|
this.startTime = System.currentTimeMillis();
|
||||||
|
compositeRegistry = new CompositeMeterRegistry();
|
||||||
|
if (config.metrics().jmx().enabled()) {
|
||||||
|
try {
|
||||||
|
JmxMeterRegistry jmxMeterRegistry = new JmxMeterRegistry(new JmxConfig() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public @NotNull String prefix() {
|
||||||
|
return "rocksdb-jmx";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public @NotNull String domain() {
|
||||||
|
return "metrics";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String get(@NotNull String s) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}, Clock.SYSTEM);
|
||||||
|
compositeRegistry.add(jmxMeterRegistry);
|
||||||
|
} catch (Throwable ex) {
|
||||||
|
LOG.error("Failed to initialize jmx metrics");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (config.metrics().influx().enabled()) {
|
||||||
|
try {
|
||||||
|
this.httpClient = Vertx.vertx().createHttpClient(new HttpClientOptions()
|
||||||
|
.setTrustAll(config.metrics().influx().allowInsecureCertificates())
|
||||||
|
.setVerifyHost(!config.metrics().influx().allowInsecureCertificates())
|
||||||
|
.setTryUseCompression(true)
|
||||||
|
.setProtocolVersion(HttpVersion.HTTP_2)
|
||||||
|
.setUseAlpn(true)
|
||||||
|
.setConnectTimeout(1000)
|
||||||
|
.setReadIdleTimeout(10000));
|
||||||
|
var influxUrl = config.metrics().influx().url();
|
||||||
|
var bucket = config.metrics().influx().bucket();
|
||||||
|
var userName = config.metrics().influx().user();
|
||||||
|
var token = config.metrics().influx().token();
|
||||||
|
var org = config.metrics().influx().org();
|
||||||
|
var step = Duration.ofMinutes(1);
|
||||||
|
InfluxMeterRegistry influxMeterRegistry = InfluxMeterRegistry
|
||||||
|
.builder(new InfluxConfig() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String uri() {
|
||||||
|
return influxUrl;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String bucket() {
|
||||||
|
return bucket;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String userName() {
|
||||||
|
return userName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String token() {
|
||||||
|
return token;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String org() {
|
||||||
|
return org;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Duration step() {
|
||||||
|
return step;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String get(@NotNull String s) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.clock(Clock.SYSTEM)
|
||||||
|
.httpClient(new HttpUrlConnectionSender() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Response send(Request request) {
|
||||||
|
if (httpClient == null) {
|
||||||
|
return new Response(400, "httpClient is null");
|
||||||
|
}
|
||||||
|
Method method = request.getMethod();
|
||||||
|
var requestOptions = new RequestOptions();
|
||||||
|
requestOptions.setMethod(HttpMethod.valueOf(method.name()));
|
||||||
|
requestOptions.setAbsoluteURI(request.getUrl());
|
||||||
|
requestOptions.setTimeout(10000);
|
||||||
|
MultiMap headers = MultiMap.caseInsensitiveMultiMap();
|
||||||
|
for (Entry<String, String> header : request.getRequestHeaders().entrySet()) {
|
||||||
|
headers.add(header.getKey(), header.getValue());
|
||||||
|
}
|
||||||
|
requestOptions.setHeaders(headers);
|
||||||
|
return httpClient
|
||||||
|
.rxRequest(requestOptions).flatMap(req -> {
|
||||||
|
if (method != Method.GET) {
|
||||||
|
return req.rxSend(Buffer.buffer(request.getEntity()));
|
||||||
|
} else {
|
||||||
|
return req.rxSend();
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.flatMap(response -> {
|
||||||
|
int status = response.statusCode();
|
||||||
|
|
||||||
|
return response.rxBody()
|
||||||
|
.map(body -> new Response(status, body.toString(StandardCharsets.UTF_8)))
|
||||||
|
.onErrorReturn(ex -> new Response(status, ex.toString()));
|
||||||
|
})
|
||||||
|
.onErrorReturn(ex -> new Response(400, ex.toString()))
|
||||||
|
.blockingGet();
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.threadFactory(new NamedThreadFactory("influx-metrics-publisher"))
|
||||||
|
.build();
|
||||||
|
compositeRegistry.add(influxMeterRegistry);
|
||||||
|
} catch (Throwable ex) {
|
||||||
|
LOG.error("Failed to initialize influx metrics");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
this.httpClient = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
new JvmCompilationMetrics().bindTo(compositeRegistry);
|
||||||
|
new JvmMemoryMetrics().bindTo(compositeRegistry);
|
||||||
|
new JvmInfoMetrics().bindTo(compositeRegistry);
|
||||||
|
new ProcessorMetrics().bindTo(compositeRegistry);
|
||||||
|
new ClassLoaderMetrics().bindTo(compositeRegistry);
|
||||||
|
new FileDescriptorMetrics().bindTo(compositeRegistry);
|
||||||
|
new UptimeMetrics().bindTo(compositeRegistry);
|
||||||
|
this.gcMetrics = new JvmGcMetrics();
|
||||||
|
gcMetrics.bindTo(compositeRegistry);
|
||||||
|
new JvmThreadMetrics().bindTo(compositeRegistry);
|
||||||
|
this.heapPressureMetrics = new JvmHeapPressureMetrics();
|
||||||
|
heapPressureMetrics.bindTo(compositeRegistry);
|
||||||
|
|
||||||
|
compositeRegistry.gauge("yotsuba.uptime.millis",
|
||||||
|
this,
|
||||||
|
statsManager -> System.currentTimeMillis() - statsManager.getStartTime()
|
||||||
|
);
|
||||||
|
} catch (GestaltException e) {
|
||||||
|
throw RocksDBException.of(RocksDBErrorType.CONFIG_ERROR, "Failed to parse metrics configuration", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private long getStartTime() {
|
||||||
|
return startTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
if (httpClient != null) {
|
||||||
|
httpClient.rxClose().blockingAwait();
|
||||||
|
}
|
||||||
|
gcMetrics.close();
|
||||||
|
heapPressureMetrics.close();
|
||||||
|
compositeRegistry.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
public MeterRegistry getRegistry() {
|
||||||
|
return compositeRegistry;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user