diff --git a/pom.xml b/pom.xml
index 63fd1a2..3181318 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,6 +119,12 @@
io.grpc
grpc-netty
${grpc.version}
+
+
+ com.google.code.findbugs
+ jsr305
+
+
io.netty
@@ -140,11 +146,23 @@
io.grpc
grpc-protobuf
${grpc.version}
+
+
+ com.google.code.findbugs
+ jsr305
+
+
io.grpc
grpc-stub
${grpc.version}
+
+
+ com.google.code.findbugs
+ jsr305
+
+
org.reactivestreams
@@ -156,6 +174,18 @@
reactor-core
3.6.4
+
+ com.salesforce.servicelibs
+ reactor-grpc-stub
+ 1.2.4
+ provided
+
+
+ com.google.code.findbugs
+ jsr305
+
+
+
org.lz4
@@ -244,6 +274,15 @@
com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
grpc-java
io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+
+
+ reactor-grpc
+ com.salesforce.servicelibs
+ reactor-grpc
+ 1.2.4
+ com.salesforce.reactorgrpc.ReactorGrpcGenerator
+
+
diff --git a/src/fatjar/java/module-info.java b/src/fatjar/java/module-info.java
index cef924d..8827a36 100644
--- a/src/fatjar/java/module-info.java
+++ b/src/fatjar/java/module-info.java
@@ -13,7 +13,6 @@ module rockserver.core {
requires io.grpc.protobuf;
requires io.grpc.stub;
requires io.grpc;
- requires jsr305;
requires com.google.common;
requires io.grpc.netty;
requires io.jstach.rainbowgum;
@@ -31,6 +30,8 @@ module rockserver.core {
requires org.reactivestreams;
requires io.netty.transport.unix.common;
requires reactor.core;
+ requires reactor.grpc.stub;
+ requires java.annotation;
exports it.cavallium.rockserver.core.client;
exports it.cavallium.rockserver.core.common;
diff --git a/src/library/java/module-info.java b/src/library/java/module-info.java
index bb8c172..626ad40 100644
--- a/src/library/java/module-info.java
+++ b/src/library/java/module-info.java
@@ -13,7 +13,6 @@ module rockserver.core {
requires io.grpc.protobuf;
requires io.grpc.stub;
requires io.grpc;
- requires jsr305;
requires com.google.common;
requires io.grpc.netty;
requires io.netty.common;
@@ -28,6 +27,8 @@ module rockserver.core {
requires io.netty.transport.classes.epoll;
requires org.reactivestreams;
requires io.netty.transport.unix.common;
+ requires reactor.grpc.stub;
+ requires java.annotation;
exports it.cavallium.rockserver.core.client;
exports it.cavallium.rockserver.core.common;
diff --git a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java
index 67693c3..e289b60 100644
--- a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java
+++ b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java
@@ -88,7 +88,7 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
}
@Override
- public RS requestSync(RocksDBAPICommand req) {
+ public RS requestSync(RocksDBAPICommand req) {
return req.handleSync(this);
}
@@ -176,7 +176,7 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
}
@Override
- public T reduceRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestGetRange super KV, T> requestType, long timeoutMs) throws RocksDBException {
+ public T reduceRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestReduceRange super KV, T> requestType, long timeoutMs) throws RocksDBException {
return db.reduceRange(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs);
}
}
diff --git a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java
index 33b2aa8..45461ea 100644
--- a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java
+++ b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java
@@ -73,6 +73,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
private final RocksDBServiceBlockingStub blockingStub;
private final RocksDBServiceStub asyncStub;
private final RocksDBServiceFutureStub futureStub;
+ private final ReactorRocksDBServiceGrpc.ReactorRocksDBServiceStub reactiveStub;
private final URI address;
private GrpcConnection(String name, SocketAddress socketAddress, URI address) {
@@ -102,6 +103,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
this.blockingStub = RocksDBServiceGrpc.newBlockingStub(channel);
this.asyncStub = RocksDBServiceGrpc.newStub(channel);
this.futureStub = RocksDBServiceGrpc.newFutureStub(channel);
+ this.reactiveStub = ReactorRocksDBServiceGrpc.newReactorStub(channel);
this.address = address;
}
@@ -515,8 +517,8 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
}
@SuppressWarnings("unchecked")
- @Override
- public CompletableFuture reduceRangeAsync(Arena arena, long transactionId, long columnId, @NotNull Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange super it.cavallium.rockserver.core.common.KV, T> requestType, long timeoutMs) throws RocksDBException {
+ @Override
+ public CompletableFuture reduceRangeAsync(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestReduceRange super it.cavallium.rockserver.core.common.KV, T> requestType, long timeoutMs) throws RocksDBException {
var request = GetRangeRequest.newBuilder()
.setTransactionId(transactionId)
.setColumnId(columnId)
@@ -527,16 +529,28 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
.build();
return (CompletableFuture) switch (requestType) {
case RequestType.RequestGetFirstAndLast> _ ->
- toResponse(this.futureStub.getRangeFirstAndLast(request), result -> new FirstAndLast<>(
+ toResponse(this.futureStub.reduceRangeFirstAndLast(request), result -> new FirstAndLast<>(
result.hasFirst() ? mapKV(arena, result.getFirst()) : null,
result.hasLast() ? mapKV(arena, result.getLast()) : null
));
};
}
- @Override
- public Publisher getRangeStream(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange super it.cavallium.rockserver.core.common.KV, T> requestType, long timeoutMs) throws RocksDBException {
- // todo: implement
+ @SuppressWarnings("unchecked")
+ @Override
+ public Publisher getRangeAsync(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange super it.cavallium.rockserver.core.common.KV, T> requestType, long timeoutMs) throws RocksDBException {
+ var request = GetRangeRequest.newBuilder()
+ .setTransactionId(transactionId)
+ .setColumnId(columnId)
+ .addAllStartKeysInclusive(mapKeys(startKeysInclusive))
+ .addAllEndKeysExclusive(mapKeys(endKeysExclusive))
+ .setReverse(reverse)
+ .setTimeoutMs(timeoutMs)
+ .build();
+ return (Publisher) switch (requestType) {
+ case RequestType.RequestGetAllInRange> _ -> reactiveStub.getAllInRange(request)
+ .map(kv -> mapKV(arena, kv));
+ };
}
private static it.cavallium.rockserver.core.common.Delta mapDelta(Delta x) {
diff --git a/src/main/java/it/cavallium/rockserver/core/client/LoggingClient.java b/src/main/java/it/cavallium/rockserver/core/client/LoggingClient.java
index 8966efc..c6c190c 100644
--- a/src/main/java/it/cavallium/rockserver/core/client/LoggingClient.java
+++ b/src/main/java/it/cavallium/rockserver/core/client/LoggingClient.java
@@ -6,11 +6,12 @@ import it.cavallium.rockserver.core.common.RocksDBSyncAPI;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.function.Supplier;
-import java.util.logging.Level;
+
+import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+import reactor.core.publisher.Flux;
public class LoggingClient implements RocksDBConnection {
@@ -55,9 +56,9 @@ public class LoggingClient implements RocksDBConnection {
}
@Override
- public R requestSync(RocksDBAPICommand req) {
+ public SYNC_RESULT requestSync(RocksDBAPICommand req) {
logger.trace("Request input (sync): {}", req);
- R result;
+ SYNC_RESULT result;
try {
result = syncApi.requestSync(req);
} catch (Throwable e) {
@@ -77,16 +78,37 @@ public class LoggingClient implements RocksDBConnection {
this.asyncApi = asyncApi;
}
- @Override
- public CompletableFuture requestAsync(RocksDBAPICommand req) {
- logger.trace("Request input (async): {}", req);
- return asyncApi.requestAsync(req).whenComplete((result, e) -> {
- if (e != null) {
- logger.trace("Request failed: {} Error: {}", req, e.getMessage());
- } else {
- logger.trace("Request executed: {} Result: {}", req, result);
- }
- });
+ @SuppressWarnings("unchecked")
+ @Override
+ public ASYNC_RESULT requestAsync(RocksDBAPICommand req) {
+ if (!logger.isEnabledForLevel(Level.TRACE)) {
+ return asyncApi.requestAsync(req);
+ } else {
+ logger.trace("Request input (async): {}", req);
+ var r = asyncApi.requestAsync(req);
+ return switch (req) {
+ case RocksDBAPICommand.RocksDBAPICommandSingle> _ ->
+ (ASYNC_RESULT) ((CompletableFuture>) r).whenComplete((result, e) -> {
+ if (e != null) {
+ logger.trace("Request failed: {} Error: {}", req, e.getMessage());
+ } else {
+ logger.trace("Request executed: {} Result: {}", req, result);
+ }
+ });
+ case RocksDBAPICommand.RocksDBAPICommandStream> _ ->
+ (ASYNC_RESULT) Flux.from((Publisher>) r).doOnEach(signal -> {
+ if (signal.isOnNext()) {
+ logger.trace("Request: {} Partial result: {}", req, signal);
+ } else if (signal.isOnError()) {
+ var e = signal.getThrowable();
+ assert e != null;
+ logger.trace("Request failed: {} Error: {}", req, e.getMessage());
+ } else if (signal.isOnComplete()) {
+ logger.trace("Request executed: {} Result: terminated successfully", req);
+ }
+ });
+ };
+ }
}
}
}
diff --git a/src/main/java/it/cavallium/rockserver/core/common/RequestType.java b/src/main/java/it/cavallium/rockserver/core/common/RequestType.java
index e5a21f9..73b9a57 100644
--- a/src/main/java/it/cavallium/rockserver/core/common/RequestType.java
+++ b/src/main/java/it/cavallium/rockserver/core/common/RequestType.java
@@ -1,7 +1,6 @@
package it.cavallium.rockserver.core.common;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -99,6 +98,11 @@ public sealed interface RequestType {
return (RequestGetFirstAndLast) RequestGetFirstAndLast.INSTANCE;
}
+ @SuppressWarnings("unchecked")
+ static RequestGetAllInRange allInRange() {
+ return (RequestGetAllInRange) RequestGetAllInRange.INSTANCE;
+ }
+
@SuppressWarnings("unchecked")
static RequestNothing none() {
return (RequestNothing) RequestNothing.INSTANCE;
@@ -110,6 +114,8 @@ public sealed interface RequestType {
sealed interface RequestGet extends RequestType {}
+ sealed interface RequestReduceRange extends RequestType {}
+
sealed interface RequestGetRange extends RequestType {}
sealed interface RequestIterate extends RequestType {}
@@ -205,7 +211,7 @@ public sealed interface RequestType {
}
}
- record RequestGetFirstAndLast() implements RequestGetRange> {
+ record RequestGetFirstAndLast() implements RequestReduceRange> {
private static final RequestGetFirstAndLast