Get entries count

This commit is contained in:
Andrea Cavalli 2024-10-23 16:25:02 +02:00
parent 9725686ad6
commit d4ae772d80
4 changed files with 65 additions and 23 deletions

View File

@ -172,7 +172,7 @@
<dependency> <dependency>
<groupId>io.projectreactor</groupId> <groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId> <artifactId>reactor-core</artifactId>
<version>3.6.4</version> <version>3.6.8</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.salesforce.servicelibs</groupId> <groupId>com.salesforce.servicelibs</groupId>

View File

@ -19,7 +19,8 @@ public sealed interface RequestType<METHOD_DATA_TYPE, RESULT_TYPE> {
CHANGED(new RequestChanged()), CHANGED(new RequestChanged()),
PREVIOUS_PRESENCE(new RequestPreviousPresence()), PREVIOUS_PRESENCE(new RequestPreviousPresence()),
FIRST_AND_LAST(new RequestGetFirstAndLast()), FIRST_AND_LAST(new RequestGetFirstAndLast()),
ALL_IN_RANGE(new RequestGetAllInRange()); ALL_IN_RANGE(new RequestGetAllInRange()),
ENTRIES_COUNT(new RequestEntriesCount());
private final RequestType requestType; private final RequestType requestType;
@ -99,6 +100,11 @@ public sealed interface RequestType<METHOD_DATA_TYPE, RESULT_TYPE> {
return (RequestGetFirstAndLast<T>) RequestGetFirstAndLast.INSTANCE; return (RequestGetFirstAndLast<T>) RequestGetFirstAndLast.INSTANCE;
} }
@SuppressWarnings("unchecked")
static <T> RequestEntriesCount<T> entriesCount() {
return (RequestEntriesCount<T>) RequestEntriesCount.INSTANCE;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
static <T> RequestGetAllInRange<T> allInRange() { static <T> RequestGetAllInRange<T> allInRange() {
return (RequestGetAllInRange<T>) RequestGetAllInRange.INSTANCE; return (RequestGetAllInRange<T>) RequestGetAllInRange.INSTANCE;
@ -222,6 +228,16 @@ public sealed interface RequestType<METHOD_DATA_TYPE, RESULT_TYPE> {
} }
} }
record RequestEntriesCount<T>() implements RequestReduceRange<T, Long> {
private static final RequestEntriesCount<Object> INSTANCE = new RequestEntriesCount<>();
@Override
public RequestTypeId getRequestTypeId() {
return RequestTypeId.ENTRIES_COUNT;
}
}
record RequestGetAllInRange<T>() implements RequestGetRange<T, T> { record RequestGetAllInRange<T>() implements RequestGetRange<T, T> {
private static final RequestGetAllInRange<Object> INSTANCE = new RequestGetAllInRange<>(); private static final RequestGetAllInRange<Object> INSTANCE = new RequestGetAllInRange<>();

View File

@ -33,7 +33,8 @@ public class RocksDBException extends RuntimeException {
SST_WRITE_4, SST_WRITE_4,
SST_GET_SIZE_FAILED, SST_GET_SIZE_FAILED,
UNSUPPORTED_COLUMN_TYPE, UNSUPPORTED_COLUMN_TYPE,
NOT_IMPLEMENTED NOT_IMPLEMENTED,
GET_PROPERTY_ERROR
} }
public static RocksDBException of(RocksDBErrorType errorUniqueId, String message) { public static RocksDBException of(RocksDBErrorType errorUniqueId, String message) {

View File

@ -6,8 +6,10 @@ import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithValue;
import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithoutValue; import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithoutValue;
import it.cavallium.rockserver.core.common.*; import it.cavallium.rockserver.core.common.*;
import it.cavallium.rockserver.core.common.RequestType.RequestEntriesCount;
import it.cavallium.rockserver.core.common.RequestType.RequestGet; import it.cavallium.rockserver.core.common.RequestType.RequestGet;
import it.cavallium.rockserver.core.common.RequestType.RequestPut; import it.cavallium.rockserver.core.common.RequestType.RequestPut;
import it.cavallium.rockserver.core.common.RocksDBException;
import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType; import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType;
import it.cavallium.rockserver.core.config.*; import it.cavallium.rockserver.core.config.*;
import it.cavallium.rockserver.core.impl.rocksdb.*; import it.cavallium.rockserver.core.impl.rocksdb.*;
@ -42,7 +44,6 @@ import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import org.rocksdb.*; import org.rocksdb.*;
import org.rocksdb.RocksDBException;
import org.rocksdb.Status.Code; import org.rocksdb.Status.Code;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -104,7 +105,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
var columnSchemasColumnDescriptor = new ColumnFamilyDescriptor(COLUMN_SCHEMAS_COLUMN); var columnSchemasColumnDescriptor = new ColumnFamilyDescriptor(COLUMN_SCHEMAS_COLUMN);
try { try {
columnSchemasColumnDescriptorHandle = db.get().createColumnFamily(columnSchemasColumnDescriptor); columnSchemasColumnDescriptorHandle = db.get().createColumnFamily(columnSchemasColumnDescriptor);
} catch (RocksDBException e) { } catch (org.rocksdb.RocksDBException e) {
throw new IOException("Cannot create system column", e); throw new IOException("Cannot create system column", e);
} }
} else { } else {
@ -186,7 +187,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
} }
logger.info("Registered column: " + column); logger.info("Registered column: " + column);
return id; return id;
} catch (RocksDBException e) { } catch (org.rocksdb.RocksDBException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
@ -201,7 +202,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
String name; String name;
try { try {
name = new String(col.cfh().getName(), StandardCharsets.UTF_8); name = new String(col.cfh().getName(), StandardCharsets.UTF_8);
} catch (RocksDBException e) { } catch (org.rocksdb.RocksDBException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
// Unregister the column name from the index avoiding race conditions // Unregister the column name from the index avoiding race conditions
@ -282,7 +283,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
} else { } else {
// Transaction not found // Transaction not found
if (commit) { if (commit) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.TX_NOT_FOUND, "Transaction not found: " + transactionId); throw RocksDBException.of(RocksDBErrorType.TX_NOT_FOUND, "Transaction not found: " + transactionId);
} else { } else {
return true; return true;
} }
@ -308,7 +309,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
// Close the transaction operation // Close the transaction operation
ops.endOp(); ops.endOp();
return true; return true;
} catch (RocksDBException e) { } catch (org.rocksdb.RocksDBException e) {
// Close the transaction operation // Close the transaction operation
ops.endOp(); ops.endOp();
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COMMIT_FAILED, "Transaction close failed"); throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COMMIT_FAILED, "Transaction close failed");
@ -323,11 +324,11 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
this.closeTransaction(updateId, false); this.closeTransaction(updateId, false);
} }
private boolean commitTxOptimistically(@NotNull Tx tx) throws RocksDBException { private boolean commitTxOptimistically(@NotNull Tx tx) throws org.rocksdb.RocksDBException {
try { try {
tx.val().commit(); tx.val().commit();
return true; return true;
} catch (RocksDBException ex) { } catch (org.rocksdb.RocksDBException ex) {
var status = ex.getStatus() != null ? ex.getStatus().getCode() : Code.Ok; var status = ex.getStatus() != null ? ex.getStatus().getCode() : Code.Ok;
if (status == Code.Busy || status == Code.TryAgain) { if (status == Code.Busy || status == Code.TryAgain) {
return false; return false;
@ -365,7 +366,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
byte[] value = encodeColumnSchema(schema); byte[] value = encodeColumnSchema(schema);
db.get().put(columnSchemasColumnDescriptorHandle, key, value); db.get().put(columnSchemasColumnDescriptorHandle, key, value);
return registerColumn(new ColumnInstance(cf, schema)); return registerColumn(new ColumnInstance(cf, schema));
} catch (RocksDBException | GestaltException e) { } catch (org.rocksdb.RocksDBException | GestaltException e) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COLUMN_CREATE_FAIL, e); throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COLUMN_CREATE_FAIL, e);
} }
} }
@ -384,7 +385,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
try { try {
db.get().dropColumnFamily(col.cfh()); db.get().dropColumnFamily(col.cfh());
unregisterColumn(columnId).close(); unregisterColumn(columnId).close();
} catch (RocksDBException e) { } catch (org.rocksdb.RocksDBException e) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COLUMN_DELETE_FAIL, e); throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COLUMN_DELETE_FAIL, e);
} }
} }
@ -606,7 +607,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
return SSTWriter.open(tempSSTsPath, db, col, columnConifg, forceNoOptions, ingestBehind, refs); return SSTWriter.open(tempSSTsPath, db, col, columnConifg, forceNoOptions, ingestBehind, refs);
} catch (IOException ex) { } catch (IOException ex) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.SST_WRITE_2, ex); throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.SST_WRITE_2, ex);
} catch (RocksDBException ex) { } catch (org.rocksdb.RocksDBException ex) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.SST_WRITE_3, ex); throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.SST_WRITE_3, ex);
} }
} }
@ -710,7 +711,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
var k = Utils.toByteArray(calculatedKey); var k = Utils.toByteArray(calculatedKey);
var v = Utils.toByteArray(bucket.toSegment(arena)); var v = Utils.toByteArray(bucket.toSegment(arena));
((Tx) dbWriter).val().put(col.cfh(), k, v); ((Tx) dbWriter).val().put(col.cfh(), k, v);
} catch (RocksDBException e) { } catch (org.rocksdb.RocksDBException e) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_1, e); throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_1, e);
} }
} else { } else {
@ -720,7 +721,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
byte[] previousValueByteArray; byte[] previousValueByteArray;
previousValueByteArray = ((Tx) dbWriter).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); previousValueByteArray = ((Tx) dbWriter).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true);
previousValue = transformResultValue(col, toMemorySegment(arena, previousValueByteArray)); previousValue = transformResultValue(col, toMemorySegment(arena, previousValueByteArray));
} catch (RocksDBException e) { } catch (org.rocksdb.RocksDBException e) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e); throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e);
} }
} else if (RequestType.requiresGettingPreviousPresence(callback)) { } else if (RequestType.requiresGettingPreviousPresence(callback)) {
@ -730,7 +731,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
byte[] previousValueByteArray; byte[] previousValueByteArray;
previousValueByteArray = ((Tx) dbWriter).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); previousValueByteArray = ((Tx) dbWriter).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true);
previousValue = previousValueByteArray != null ? MemorySegment.NULL : null; previousValue = previousValueByteArray != null ? MemorySegment.NULL : null;
} catch (RocksDBException e) { } catch (org.rocksdb.RocksDBException e) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e); throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e);
} }
} else { } else {
@ -845,7 +846,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
foundValue = null; foundValue = null;
} }
existsValue = foundValue != null; existsValue = foundValue != null;
} catch (RocksDBException e) { } catch (org.rocksdb.RocksDBException e) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.GET_1, e); throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.GET_1, e);
} }
} else { } else {
@ -855,7 +856,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
try (var readOptions = new ReadOptions()) { try (var readOptions = new ReadOptions()) {
foundValue = dbGet(tx, col, arena, readOptions, calculatedKey); foundValue = dbGet(tx, col, arena, readOptions, calculatedKey);
existsValue = foundValue != null; existsValue = foundValue != null;
} catch (RocksDBException e) { } catch (org.rocksdb.RocksDBException e) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e); throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e);
} }
} else if (callback instanceof RequestType.RequestExists<?>) { } else if (callback instanceof RequestType.RequestExists<?>) {
@ -965,10 +966,11 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
try { try {
var col = getColumn(columnId); var col = getColumn(columnId);
if (requestType instanceof RequestType.RequestGetFirstAndLast<?>) { if (requestType instanceof RequestType.RequestGetFirstAndLast<?>
|| requestType instanceof RequestType.RequestEntriesCount<?>) {
if (col.hasBuckets()) { if (col.hasBuckets()) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.UNSUPPORTED_COLUMN_TYPE, throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.UNSUPPORTED_COLUMN_TYPE,
"Can't get the first and last range element of a column with buckets"); "Can't execute this request type on a column with buckets");
} }
} }
@ -993,6 +995,29 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
} }
try (it) { try (it) {
return (T) switch (requestType) { return (T) switch (requestType) {
case RequestEntriesCount<?> _ -> {
long count = 0;
it.seekToFirst();
if (calculatedStartKey != null || calculatedEndKey != null) {
while (it.isValid()) {
count++;
it.next();
}
yield count;
} else {
Map<String, TableProperties> props = null;
try {
props = db.get().getPropertiesOfAllTables(col.cfh());
} catch (org.rocksdb.RocksDBException e) {
throw RocksDBException.of(RocksDBErrorType.GET_PROPERTY_ERROR, e);
}
long entries = 0;
for (TableProperties tableProperties : props.values()) {
entries += tableProperties.getNumEntries();
}
yield entries;
}
}
case RequestType.RequestGetFirstAndLast<?> _ -> { case RequestType.RequestGetFirstAndLast<?> _ -> {
if (!reverse) { if (!reverse) {
it.seekToFirst(); it.seekToFirst();
@ -1128,7 +1153,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
ColumnInstance col, ColumnInstance col,
Arena arena, Arena arena,
ReadOptions readOptions, ReadOptions readOptions,
MemorySegment calculatedKey) throws RocksDBException { MemorySegment calculatedKey) throws org.rocksdb.RocksDBException {
if (tx != null) { if (tx != null) {
byte[] previousRawBucketByteArray; byte[] previousRawBucketByteArray;
if (tx.isFromGetForUpdate()) { if (tx.isFromGetForUpdate()) {
@ -1150,7 +1175,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
@Nullable @Nullable
private MemorySegment dbGetDirect(Arena arena, ColumnFamilyHandle cfh, ReadOptions readOptions, MemorySegment calculatedKey) private MemorySegment dbGetDirect(Arena arena, ColumnFamilyHandle cfh, ReadOptions readOptions, MemorySegment calculatedKey)
throws RocksDBException { throws org.rocksdb.RocksDBException {
// Get the key nio buffer to pass to RocksDB // Get the key nio buffer to pass to RocksDB
ByteBuffer keyNioBuffer = calculatedKey.asByteBuffer(); ByteBuffer keyNioBuffer = calculatedKey.asByteBuffer();