From d34f225b55607ad62afba7dd9889f3a23941263a Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 23 Oct 2024 20:16:48 +0200 Subject: [PATCH] Bugfixes --- .../core/client/EmbeddedConnection.java | 6 ++-- .../core/client/GrpcConnection.java | 3 ++ .../core/common/RocksDBAsyncAPI.java | 11 ++++++-- .../rockserver/core/impl/EmbeddedDB.java | 28 +++++++++---------- .../rockserver/core/server/GrpcServer.java | 27 +++++++++++++++++- src/main/proto/rocksdb.proto | 2 ++ 6 files changed, 56 insertions(+), 21 deletions(-) diff --git a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java index d21b53c..9f7b1d5 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java @@ -15,14 +15,11 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.stream.Stream; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { @@ -97,6 +94,7 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { public RA requestAsync(RocksDBAPICommand req) { return (RA) switch (req) { case RocksDBAPICommand.RocksDBAPICommandSingle.PutBatch putBatch -> this.putBatchAsync(putBatch.columnId(), putBatch.batchPublisher(), putBatch.mode()); + case RocksDBAPICommand.RocksDBAPICommandStream.GetRange getRange -> this.getRangeAsync(getRange.arena(), getRange.transactionId(), getRange.columnId(), getRange.startKeysInclusive(), getRange.endKeysExclusive(), getRange.reverse(), getRange.requestType(), getRange.timeoutMs()); case RocksDBAPICommand.RocksDBAPICommandSingle _ -> CompletableFuture.supplyAsync(() -> req.handleSync(this), exeuctor); case RocksDBAPICommand.RocksDBAPICommandStream _ -> throw RocksDBException.of(RocksDBException.RocksDBErrorType.NOT_IMPLEMENTED, "The request of type " + req.getClass().getName() + " is not implemented in class " + this.getClass().getName()); }; @@ -187,6 +185,6 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { @Override public Publisher getRangeAsync(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange requestType, long timeoutMs) throws RocksDBException { - return db.getRangeAsync(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs); + return db.getRangeAsyncInternal(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs); } } diff --git a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java index e1bc50b..66400ef 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java @@ -411,6 +411,9 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { result.hasFirst() ? mapKV(arena, result.getFirst()) : null, result.hasLast() ? mapKV(arena, result.getLast()) : null )); + case RequestType.RequestEntriesCount _ -> + toResponse(this.futureStub.reduceRangeEntriesCount(request), EntriesCount::getCount); + default -> throw new UnsupportedOperationException(); }; } diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java index fabeae3..3b34041 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java @@ -160,7 +160,14 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler { boolean reverse, RequestType.RequestGetRange requestType, long timeoutMs) throws RocksDBException { - throw RocksDBException.of(RocksDBException.RocksDBErrorType.NOT_IMPLEMENTED, - "GetRangeStream is not implemented"); + return requestAsync(new GetRange<>(arena, + transactionId, + columnId, + startKeysInclusive, + endKeysExclusive, + reverse, + requestType, + timeoutMs + )); } } diff --git a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java index de9aa40..2d736ac 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java @@ -975,11 +975,11 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { } try (var ro = new ReadOptions()) { - MemorySegment calculatedStartKey = startKeysInclusive != null ? col.calculateKey(arena, startKeysInclusive.keys()) : null; - MemorySegment calculatedEndKey = endKeysExclusive != null ? col.calculateKey(arena, endKeysExclusive.keys()) : null; + MemorySegment calculatedStartKey = startKeysInclusive != null && startKeysInclusive.keys().length > 0 ? col.calculateKey(arena, startKeysInclusive.keys()) : null; + MemorySegment calculatedEndKey = endKeysExclusive != null && endKeysExclusive.keys().length > 0 ? col.calculateKey(arena, endKeysExclusive.keys()) : null; try (var startKeySlice = calculatedStartKey != null ? toDirectSlice(calculatedStartKey) : null; var endKeySlice = calculatedEndKey != null ? toDirectSlice(calculatedEndKey) : null) { - if (startKeysInclusive != null) { + if (startKeySlice != null) { ro.setIterateLowerBound(startKeySlice); } if (endKeySlice != null) { @@ -996,16 +996,16 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { try (it) { return (T) switch (requestType) { case RequestEntriesCount _ -> { - long count = 0; - it.seekToFirst(); - if (calculatedStartKey != null || calculatedEndKey != null) { + if (calculatedStartKey != null || calculatedEndKey != null || path == null) { + long count = 0; + it.seekToFirst(); while (it.isValid()) { count++; it.next(); } yield count; } else { - Map props = null; + Map props ; try { props = db.get().getPropertiesOfAllTables(col.cfh()); } catch (org.rocksdb.RocksDBException e) { @@ -1054,12 +1054,12 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { @Override public Stream getRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestGetRange requestType, long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException { return Flux - .from(this.getRangeAsync(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs)) + .from(this.getRangeAsyncInternal(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs)) .toStream(); } /** See: {@link it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandStream.GetRange}. */ - public Publisher getRangeAsync(Arena arena, + public Publisher getRangeAsyncInternal(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @@ -1071,8 +1071,8 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { AbstractSlice endKeySlice, RocksIterator it) { public void close() { ro.close(); - startKeySlice.close(); - endKeySlice.close(); + if (startKeySlice != null) startKeySlice.close(); + if (endKeySlice != null) endKeySlice.close(); it.close(); } } @@ -1088,13 +1088,13 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { var ro = new ReadOptions(); try { - MemorySegment calculatedStartKey = startKeysInclusive != null ? col.calculateKey(arena, startKeysInclusive.keys()) : null; - MemorySegment calculatedEndKey = endKeysExclusive != null ? col.calculateKey(arena, endKeysExclusive.keys()) : null; + MemorySegment calculatedStartKey = startKeysInclusive != null && startKeysInclusive.keys().length > 0 ? col.calculateKey(arena, startKeysInclusive.keys()) : null; + MemorySegment calculatedEndKey = endKeysExclusive != null && endKeysExclusive.keys().length > 0 ? col.calculateKey(arena, endKeysExclusive.keys()) : null; var startKeySlice = calculatedStartKey != null ? toDirectSlice(calculatedStartKey) : null; try { var endKeySlice = calculatedEndKey != null ? toDirectSlice(calculatedEndKey) : null; try { - if (startKeysInclusive != null) { + if (startKeySlice != null) { ro.setIterateLowerBound(startKeySlice); } if (endKeySlice != null) { diff --git a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java index a1e0ddb..cd673a1 100644 --- a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java +++ b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java @@ -489,6 +489,25 @@ public class GrpcServer extends Server { }); } + @Override + public Mono reduceRangeEntriesCount(GetRangeRequest request) { + return executeSync(() -> { + try (var arena = Arena.ofConfined()) { + long entriesCount + = api.reduceRange(arena, + request.getTransactionId(), + request.getColumnId(), + mapKeys(arena, request.getStartKeysInclusiveCount(), request::getStartKeysInclusive), + mapKeys(arena, request.getEndKeysExclusiveCount(), request::getEndKeysExclusive), + request.getReverse(), + RequestType.entriesCount(), + request.getTimeoutMs() + ); + return EntriesCount.newBuilder().setCount(entriesCount).build(); + } + }); + } + @Override public Flux getAllInRange(GetRangeRequest request) { var arena = Arena.ofAuto(); @@ -502,7 +521,13 @@ public class GrpcServer extends Server { RequestType.allInRange(), request.getTimeoutMs() )) - .map(GrpcServerImpl::unmapKV); + .map(GrpcServerImpl::unmapKV) + .onErrorResume(ex -> { + if (!(ex instanceof RocksDBException)) { + LOG.error("Unexpected error during request: {}", request, ex); + } + return Mono.error(ex); + }); } private static void closeArenaSafe(Arena autoArena) { diff --git a/src/main/proto/rocksdb.proto b/src/main/proto/rocksdb.proto index 920eb6a..140580d 100644 --- a/src/main/proto/rocksdb.proto +++ b/src/main/proto/rocksdb.proto @@ -96,6 +96,7 @@ message SubsequentRequest {int64 iterationId = 1; int64 skipCount = 2; int64 tak message GetRangeRequest {int64 transactionId = 1; int64 columnId = 2; repeated bytes startKeysInclusive = 3; repeated bytes endKeysExclusive = 4; bool reverse = 5; int64 timeoutMs = 6;} message FirstAndLast {optional KV first = 1; optional KV last = 2;} +message EntriesCount {int64 count = 1;} service RocksDBService { rpc openTransaction(OpenTransactionRequest) returns (OpenTransactionResponse); @@ -125,5 +126,6 @@ service RocksDBService { rpc subsequentExists(SubsequentRequest) returns (PreviousPresence); rpc subsequentMultiGet(SubsequentRequest) returns (stream KV); rpc reduceRangeFirstAndLast(GetRangeRequest) returns (FirstAndLast); + rpc reduceRangeEntriesCount(GetRangeRequest) returns (EntriesCount); rpc getAllInRange(GetRangeRequest) returns (stream KV); }