diff --git a/pom.xml b/pom.xml index 03460ee..899f8a0 100644 --- a/pom.xml +++ b/pom.xml @@ -172,7 +172,7 @@ io.projectreactor reactor-core - 3.6.4 + 3.6.8 com.salesforce.servicelibs diff --git a/src/main/java/it/cavallium/rockserver/core/common/RequestType.java b/src/main/java/it/cavallium/rockserver/core/common/RequestType.java index ddc8073..fdef751 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RequestType.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RequestType.java @@ -19,7 +19,8 @@ public sealed interface RequestType { CHANGED(new RequestChanged()), PREVIOUS_PRESENCE(new RequestPreviousPresence()), FIRST_AND_LAST(new RequestGetFirstAndLast()), - ALL_IN_RANGE(new RequestGetAllInRange()); + ALL_IN_RANGE(new RequestGetAllInRange()), + ENTRIES_COUNT(new RequestEntriesCount()); private final RequestType requestType; @@ -99,6 +100,11 @@ public sealed interface RequestType { return (RequestGetFirstAndLast) RequestGetFirstAndLast.INSTANCE; } + @SuppressWarnings("unchecked") + static RequestEntriesCount entriesCount() { + return (RequestEntriesCount) RequestEntriesCount.INSTANCE; + } + @SuppressWarnings("unchecked") static RequestGetAllInRange allInRange() { return (RequestGetAllInRange) RequestGetAllInRange.INSTANCE; @@ -222,6 +228,16 @@ public sealed interface RequestType { } } + record RequestEntriesCount() implements RequestReduceRange { + + private static final RequestEntriesCount INSTANCE = new RequestEntriesCount<>(); + + @Override + public RequestTypeId getRequestTypeId() { + return RequestTypeId.ENTRIES_COUNT; + } + } + record RequestGetAllInRange() implements RequestGetRange { private static final RequestGetAllInRange INSTANCE = new RequestGetAllInRange<>(); diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java index 0b09c62..9baf225 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java @@ -33,7 +33,8 @@ public class RocksDBException extends RuntimeException { SST_WRITE_4, SST_GET_SIZE_FAILED, UNSUPPORTED_COLUMN_TYPE, - NOT_IMPLEMENTED + NOT_IMPLEMENTED, + GET_PROPERTY_ERROR } public static RocksDBException of(RocksDBErrorType errorUniqueId, String message) { diff --git a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java index 5536a1c..de9aa40 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java @@ -6,8 +6,10 @@ import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithValue; import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithoutValue; import it.cavallium.rockserver.core.common.*; +import it.cavallium.rockserver.core.common.RequestType.RequestEntriesCount; import it.cavallium.rockserver.core.common.RequestType.RequestGet; import it.cavallium.rockserver.core.common.RequestType.RequestPut; +import it.cavallium.rockserver.core.common.RocksDBException; import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType; import it.cavallium.rockserver.core.config.*; import it.cavallium.rockserver.core.impl.rocksdb.*; @@ -42,7 +44,6 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.rocksdb.*; -import org.rocksdb.RocksDBException; import org.rocksdb.Status.Code; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,7 +105,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { var columnSchemasColumnDescriptor = new ColumnFamilyDescriptor(COLUMN_SCHEMAS_COLUMN); try { columnSchemasColumnDescriptorHandle = db.get().createColumnFamily(columnSchemasColumnDescriptor); - } catch (RocksDBException e) { + } catch (org.rocksdb.RocksDBException e) { throw new IOException("Cannot create system column", e); } } else { @@ -186,7 +187,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { } logger.info("Registered column: " + column); return id; - } catch (RocksDBException e) { + } catch (org.rocksdb.RocksDBException e) { throw new RuntimeException(e); } } @@ -201,7 +202,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { String name; try { name = new String(col.cfh().getName(), StandardCharsets.UTF_8); - } catch (RocksDBException e) { + } catch (org.rocksdb.RocksDBException e) { throw new RuntimeException(e); } // Unregister the column name from the index avoiding race conditions @@ -282,7 +283,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { } else { // Transaction not found if (commit) { - throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.TX_NOT_FOUND, "Transaction not found: " + transactionId); + throw RocksDBException.of(RocksDBErrorType.TX_NOT_FOUND, "Transaction not found: " + transactionId); } else { return true; } @@ -308,7 +309,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { // Close the transaction operation ops.endOp(); return true; - } catch (RocksDBException e) { + } catch (org.rocksdb.RocksDBException e) { // Close the transaction operation ops.endOp(); throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COMMIT_FAILED, "Transaction close failed"); @@ -323,11 +324,11 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { this.closeTransaction(updateId, false); } - private boolean commitTxOptimistically(@NotNull Tx tx) throws RocksDBException { + private boolean commitTxOptimistically(@NotNull Tx tx) throws org.rocksdb.RocksDBException { try { tx.val().commit(); return true; - } catch (RocksDBException ex) { + } catch (org.rocksdb.RocksDBException ex) { var status = ex.getStatus() != null ? ex.getStatus().getCode() : Code.Ok; if (status == Code.Busy || status == Code.TryAgain) { return false; @@ -365,7 +366,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { byte[] value = encodeColumnSchema(schema); db.get().put(columnSchemasColumnDescriptorHandle, key, value); return registerColumn(new ColumnInstance(cf, schema)); - } catch (RocksDBException | GestaltException e) { + } catch (org.rocksdb.RocksDBException | GestaltException e) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COLUMN_CREATE_FAIL, e); } } @@ -384,7 +385,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { try { db.get().dropColumnFamily(col.cfh()); unregisterColumn(columnId).close(); - } catch (RocksDBException e) { + } catch (org.rocksdb.RocksDBException e) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COLUMN_DELETE_FAIL, e); } } @@ -606,7 +607,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { return SSTWriter.open(tempSSTsPath, db, col, columnConifg, forceNoOptions, ingestBehind, refs); } catch (IOException ex) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.SST_WRITE_2, ex); - } catch (RocksDBException ex) { + } catch (org.rocksdb.RocksDBException ex) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.SST_WRITE_3, ex); } } @@ -710,7 +711,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { var k = Utils.toByteArray(calculatedKey); var v = Utils.toByteArray(bucket.toSegment(arena)); ((Tx) dbWriter).val().put(col.cfh(), k, v); - } catch (RocksDBException e) { + } catch (org.rocksdb.RocksDBException e) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_1, e); } } else { @@ -720,7 +721,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { byte[] previousValueByteArray; previousValueByteArray = ((Tx) dbWriter).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); previousValue = transformResultValue(col, toMemorySegment(arena, previousValueByteArray)); - } catch (RocksDBException e) { + } catch (org.rocksdb.RocksDBException e) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e); } } else if (RequestType.requiresGettingPreviousPresence(callback)) { @@ -730,7 +731,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { byte[] previousValueByteArray; previousValueByteArray = ((Tx) dbWriter).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); previousValue = previousValueByteArray != null ? MemorySegment.NULL : null; - } catch (RocksDBException e) { + } catch (org.rocksdb.RocksDBException e) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e); } } else { @@ -845,7 +846,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { foundValue = null; } existsValue = foundValue != null; - } catch (RocksDBException e) { + } catch (org.rocksdb.RocksDBException e) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.GET_1, e); } } else { @@ -855,7 +856,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { try (var readOptions = new ReadOptions()) { foundValue = dbGet(tx, col, arena, readOptions, calculatedKey); existsValue = foundValue != null; - } catch (RocksDBException e) { + } catch (org.rocksdb.RocksDBException e) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e); } } else if (callback instanceof RequestType.RequestExists) { @@ -965,10 +966,11 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { try { var col = getColumn(columnId); - if (requestType instanceof RequestType.RequestGetFirstAndLast) { + if (requestType instanceof RequestType.RequestGetFirstAndLast + || requestType instanceof RequestType.RequestEntriesCount) { if (col.hasBuckets()) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.UNSUPPORTED_COLUMN_TYPE, - "Can't get the first and last range element of a column with buckets"); + "Can't execute this request type on a column with buckets"); } } @@ -993,6 +995,29 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { } try (it) { return (T) switch (requestType) { + case RequestEntriesCount _ -> { + long count = 0; + it.seekToFirst(); + if (calculatedStartKey != null || calculatedEndKey != null) { + while (it.isValid()) { + count++; + it.next(); + } + yield count; + } else { + Map props = null; + try { + props = db.get().getPropertiesOfAllTables(col.cfh()); + } catch (org.rocksdb.RocksDBException e) { + throw RocksDBException.of(RocksDBErrorType.GET_PROPERTY_ERROR, e); + } + long entries = 0; + for (TableProperties tableProperties : props.values()) { + entries += tableProperties.getNumEntries(); + } + yield entries; + } + } case RequestType.RequestGetFirstAndLast _ -> { if (!reverse) { it.seekToFirst(); @@ -1128,7 +1153,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { ColumnInstance col, Arena arena, ReadOptions readOptions, - MemorySegment calculatedKey) throws RocksDBException { + MemorySegment calculatedKey) throws org.rocksdb.RocksDBException { if (tx != null) { byte[] previousRawBucketByteArray; if (tx.isFromGetForUpdate()) { @@ -1150,7 +1175,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { @Nullable private MemorySegment dbGetDirect(Arena arena, ColumnFamilyHandle cfh, ReadOptions readOptions, MemorySegment calculatedKey) - throws RocksDBException { + throws org.rocksdb.RocksDBException { // Get the key nio buffer to pass to RocksDB ByteBuffer keyNioBuffer = calculatedKey.asByteBuffer();