Add GetAllInRange, RequestReduceRange, use reactive grpc server

This commit is contained in:
Andrea Cavalli 2024-10-21 13:22:31 +02:00
parent 9e06f9b9c2
commit b8b552cb18
14 changed files with 450 additions and 576 deletions

39
pom.xml
View File

@ -119,6 +119,12 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.netty</groupId>
@ -140,11 +146,23 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
@ -156,6 +174,18 @@
<artifactId>reactor-core</artifactId>
<version>3.6.4</version>
</dependency>
<dependency>
<groupId>com.salesforce.servicelibs</groupId>
<artifactId>reactor-grpc-stub</artifactId>
<version>1.2.4</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
@ -244,6 +274,15 @@
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
<protocPlugins>
<protocPlugin>
<id>reactor-grpc</id>
<groupId>com.salesforce.servicelibs</groupId>
<artifactId>reactor-grpc</artifactId>
<version>1.2.4</version>
<mainClass>com.salesforce.reactorgrpc.ReactorGrpcGenerator</mainClass>
</protocPlugin>
</protocPlugins>
</configuration>
<executions>
<execution>

View File

@ -13,7 +13,6 @@ module rockserver.core {
requires io.grpc.protobuf;
requires io.grpc.stub;
requires io.grpc;
requires jsr305;
requires com.google.common;
requires io.grpc.netty;
requires io.jstach.rainbowgum;
@ -31,6 +30,8 @@ module rockserver.core {
requires org.reactivestreams;
requires io.netty.transport.unix.common;
requires reactor.core;
requires reactor.grpc.stub;
requires java.annotation;
exports it.cavallium.rockserver.core.client;
exports it.cavallium.rockserver.core.common;

View File

@ -13,7 +13,6 @@ module rockserver.core {
requires io.grpc.protobuf;
requires io.grpc.stub;
requires io.grpc;
requires jsr305;
requires com.google.common;
requires io.grpc.netty;
requires io.netty.common;
@ -28,6 +27,8 @@ module rockserver.core {
requires io.netty.transport.classes.epoll;
requires org.reactivestreams;
requires io.netty.transport.unix.common;
requires reactor.grpc.stub;
requires java.annotation;
exports it.cavallium.rockserver.core.client;
exports it.cavallium.rockserver.core.common;

View File

@ -88,7 +88,7 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
}
@Override
public <R, RS> RS requestSync(RocksDBAPICommand<R, RS, ?> req) {
public <R, RS, RA> RS requestSync(RocksDBAPICommand<R, RS, RA> req) {
return req.handleSync(this);
}
@ -176,7 +176,7 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
}
@Override
public <T> T reduceRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestGetRange<? super KV, T> requestType, long timeoutMs) throws RocksDBException {
public <T> T reduceRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestReduceRange<? super KV, T> requestType, long timeoutMs) throws RocksDBException {
return db.reduceRange(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs);
}
}

View File

@ -73,6 +73,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
private final RocksDBServiceBlockingStub blockingStub;
private final RocksDBServiceStub asyncStub;
private final RocksDBServiceFutureStub futureStub;
private final ReactorRocksDBServiceGrpc.ReactorRocksDBServiceStub reactiveStub;
private final URI address;
private GrpcConnection(String name, SocketAddress socketAddress, URI address) {
@ -102,6 +103,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
this.blockingStub = RocksDBServiceGrpc.newBlockingStub(channel);
this.asyncStub = RocksDBServiceGrpc.newStub(channel);
this.futureStub = RocksDBServiceGrpc.newFutureStub(channel);
this.reactiveStub = ReactorRocksDBServiceGrpc.newReactorStub(channel);
this.address = address;
}
@ -516,7 +518,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
@SuppressWarnings("unchecked")
@Override
public <T> CompletableFuture<T> reduceRangeAsync(Arena arena, long transactionId, long columnId, @NotNull Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange<? super it.cavallium.rockserver.core.common.KV, T> requestType, long timeoutMs) throws RocksDBException {
public <T> CompletableFuture<T> reduceRangeAsync(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestReduceRange<? super it.cavallium.rockserver.core.common.KV, T> requestType, long timeoutMs) throws RocksDBException {
var request = GetRangeRequest.newBuilder()
.setTransactionId(transactionId)
.setColumnId(columnId)
@ -527,16 +529,28 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
.build();
return (CompletableFuture<T>) switch (requestType) {
case RequestType.RequestGetFirstAndLast<?> _ ->
toResponse(this.futureStub.getRangeFirstAndLast(request), result -> new FirstAndLast<>(
toResponse(this.futureStub.reduceRangeFirstAndLast(request), result -> new FirstAndLast<>(
result.hasFirst() ? mapKV(arena, result.getFirst()) : null,
result.hasLast() ? mapKV(arena, result.getLast()) : null
));
};
}
@SuppressWarnings("unchecked")
@Override
public <T> Publisher<T> getRangeStream(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange<? super it.cavallium.rockserver.core.common.KV, T> requestType, long timeoutMs) throws RocksDBException {
// todo: implement
public <T> Publisher<T> getRangeAsync(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange<? super it.cavallium.rockserver.core.common.KV, T> requestType, long timeoutMs) throws RocksDBException {
var request = GetRangeRequest.newBuilder()
.setTransactionId(transactionId)
.setColumnId(columnId)
.addAllStartKeysInclusive(mapKeys(startKeysInclusive))
.addAllEndKeysExclusive(mapKeys(endKeysExclusive))
.setReverse(reverse)
.setTimeoutMs(timeoutMs)
.build();
return (Publisher<T>) switch (requestType) {
case RequestType.RequestGetAllInRange<?> _ -> reactiveStub.getAllInRange(request)
.map(kv -> mapKV(arena, kv));
};
}
private static it.cavallium.rockserver.core.common.Delta<MemorySegment> mapDelta(Delta x) {

View File

@ -6,11 +6,12 @@ import it.cavallium.rockserver.core.common.RocksDBSyncAPI;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import java.util.logging.Level;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import reactor.core.publisher.Flux;
public class LoggingClient implements RocksDBConnection {
@ -55,9 +56,9 @@ public class LoggingClient implements RocksDBConnection {
}
@Override
public <R> R requestSync(RocksDBAPICommand<R> req) {
public <RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_RESULT> SYNC_RESULT requestSync(RocksDBAPICommand<RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_RESULT> req) {
logger.trace("Request input (sync): {}", req);
R result;
SYNC_RESULT result;
try {
result = syncApi.requestSync(req);
} catch (Throwable e) {
@ -77,16 +78,37 @@ public class LoggingClient implements RocksDBConnection {
this.asyncApi = asyncApi;
}
@SuppressWarnings("unchecked")
@Override
public <R> CompletableFuture<R> requestAsync(RocksDBAPICommand<R> req) {
public <RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_RESULT> ASYNC_RESULT requestAsync(RocksDBAPICommand<RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_RESULT> req) {
if (!logger.isEnabledForLevel(Level.TRACE)) {
return asyncApi.requestAsync(req);
} else {
logger.trace("Request input (async): {}", req);
return asyncApi.requestAsync(req).whenComplete((result, e) -> {
var r = asyncApi.requestAsync(req);
return switch (req) {
case RocksDBAPICommand.RocksDBAPICommandSingle<?> _ ->
(ASYNC_RESULT) ((CompletableFuture<?>) r).whenComplete((result, e) -> {
if (e != null) {
logger.trace("Request failed: {} Error: {}", req, e.getMessage());
} else {
logger.trace("Request executed: {} Result: {}", req, result);
}
});
case RocksDBAPICommand.RocksDBAPICommandStream<?> _ ->
(ASYNC_RESULT) Flux.from((Publisher<?>) r).doOnEach(signal -> {
if (signal.isOnNext()) {
logger.trace("Request: {} Partial result: {}", req, signal);
} else if (signal.isOnError()) {
var e = signal.getThrowable();
assert e != null;
logger.trace("Request failed: {} Error: {}", req, e.getMessage());
} else if (signal.isOnComplete()) {
logger.trace("Request executed: {} Result: terminated successfully", req);
}
});
};
}
}
}
}

View File

@ -1,7 +1,6 @@
package it.cavallium.rockserver.core.common;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -99,6 +98,11 @@ public sealed interface RequestType<METHOD_DATA_TYPE, RESULT_TYPE> {
return (RequestGetFirstAndLast<T>) RequestGetFirstAndLast.INSTANCE;
}
@SuppressWarnings("unchecked")
static <T> RequestGetAllInRange<T> allInRange() {
return (RequestGetAllInRange<T>) RequestGetAllInRange.INSTANCE;
}
@SuppressWarnings("unchecked")
static <T> RequestNothing<T> none() {
return (RequestNothing<T>) RequestNothing.INSTANCE;
@ -110,6 +114,8 @@ public sealed interface RequestType<METHOD_DATA_TYPE, RESULT_TYPE> {
sealed interface RequestGet<T, U> extends RequestType<T, U> {}
sealed interface RequestReduceRange<T, U> extends RequestType<T, U> {}
sealed interface RequestGetRange<T, U> extends RequestType<T, U> {}
sealed interface RequestIterate<T, U> extends RequestType<T, U> {}
@ -205,7 +211,7 @@ public sealed interface RequestType<METHOD_DATA_TYPE, RESULT_TYPE> {
}
}
record RequestGetFirstAndLast<T>() implements RequestGetRange<T, FirstAndLast<T>> {
record RequestGetFirstAndLast<T>() implements RequestReduceRange<T, FirstAndLast<T>> {
private static final RequestGetFirstAndLast<Object> INSTANCE = new RequestGetFirstAndLast<>();
@ -214,4 +220,14 @@ public sealed interface RequestType<METHOD_DATA_TYPE, RESULT_TYPE> {
return RequestTypeId.FIRST_AND_LAST;
}
}
record RequestGetAllInRange<T>() implements RequestGetRange<T, FirstAndLast<T>> {
private static final RequestGetAllInRange<Object> INSTANCE = new RequestGetAllInRange<>();
@Override
public RequestTypeId getRequestTypeId() {
return RequestTypeId.FIRST_AND_LAST;
}
}
}

View File

@ -410,7 +410,7 @@ public sealed interface RocksDBAPICommand<RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_R
@Nullable Keys startKeysInclusive,
@Nullable Keys endKeysExclusive,
boolean reverse,
RequestType.RequestGetRange<? super KV, T> requestType,
RequestType.RequestReduceRange<? super KV, T> requestType,
long timeoutMs) implements RocksDBAPICommandSingle<T> {
@Override

View File

@ -17,6 +17,7 @@ import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSi
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.PutBatch;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.SeekTo;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Subsequent;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandStream.GetRange;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.util.List;
@ -137,7 +138,7 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler {
@Nullable Keys startKeysInclusive,
@Nullable Keys endKeysExclusive,
boolean reverse,
RequestType.RequestGetRange<? super KV, T> requestType,
RequestType.RequestReduceRange<? super KV, T> requestType,
long timeoutMs) throws RocksDBException {
return requestAsync(new ReduceRange<>(arena,
transactionId,

View File

@ -131,7 +131,7 @@ public interface RocksDBSyncAPI extends RocksDBSyncAPIRequestHandler {
@Nullable Keys startKeysInclusive,
@Nullable Keys endKeysExclusive,
boolean reverse,
@NotNull RequestType.RequestGetRange<? super KV, T> requestType,
@NotNull RequestType.RequestReduceRange<? super KV, T> requestType,
long timeoutMs) throws RocksDBException {
return requestSync(new ReduceRange<>(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs));
}

View File

@ -955,7 +955,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
@Nullable Keys startKeysInclusive,
@Nullable Keys endKeysExclusive,
boolean reverse,
RequestType.@NotNull RequestGetRange<? super KV, T> requestType,
RequestType.@NotNull RequestReduceRange<? super KV, T> requestType,
long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException {
ops.beginOp();
try {

View File

@ -48,8 +48,10 @@ import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -62,6 +64,11 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class GrpcServer extends Server {
@ -69,7 +76,7 @@ public class GrpcServer extends Server {
private final GrpcServerImpl grpc;
private final EventLoopGroup elg;
private final ExecutorService executor;
private final Scheduler executor;
private final io.grpc.Server server;
public GrpcServer(RocksDBConnection client, SocketAddress socketAddress) throws IOException {
@ -85,7 +92,7 @@ public class GrpcServer extends Server {
channelType = NioServerSocketChannel.class;
}
this.elg = elg;
this.executor = Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors() * 2);
this.executor = Schedulers.newBoundedElastic(Runtime.getRuntime().availableProcessors() * 2, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "server-db-executor");
this.server = NettyServerBuilder
.forAddress(socketAddress)
.bossEventLoopGroup(elg)
@ -104,7 +111,7 @@ public class GrpcServer extends Server {
server.start();
}
private final class GrpcServerImpl extends RocksDBServiceImplBase {
private final class GrpcServerImpl extends ReactorRocksDBServiceGrpc.RocksDBServiceImplBase {
private final RocksDBAsyncAPI asyncApi;
private final RocksDBSyncAPI api;
@ -116,93 +123,58 @@ public class GrpcServer extends Server {
// functions
@Override
public void openTransaction(OpenTransactionRequest request,
StreamObserver<OpenTransactionResponse> responseObserver) {
executor.execute(() -> {
try {
public Mono<OpenTransactionResponse> openTransaction(OpenTransactionRequest request) {
return executeSync(() -> {
var txId = api.openTransaction(request.getTimeoutMs());
responseObserver.onNext(OpenTransactionResponse.newBuilder().setTransactionId(txId).build());
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
}
return OpenTransactionResponse.newBuilder().setTransactionId(txId).build();
});
}
@Override
public void closeTransaction(CloseTransactionRequest request,
StreamObserver<CloseTransactionResponse> responseObserver) {
executor.execute(() -> {
try {
public Mono<CloseTransactionResponse> closeTransaction(CloseTransactionRequest request) {
return executeSync(() -> {
var committed = api.closeTransaction(request.getTransactionId(), request.getCommit());
var response = CloseTransactionResponse.newBuilder().setSuccessful(committed).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
}
return CloseTransactionResponse.newBuilder().setSuccessful(committed).build();
});
}
@Override
public void closeFailedUpdate(CloseFailedUpdateRequest request, StreamObserver<Empty> responseObserver) {
executor.execute(() -> {
try {
public Mono<Empty> closeFailedUpdate(CloseFailedUpdateRequest request) {
return executeSync(() -> {
api.closeFailedUpdate(request.getUpdateId());
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
}
return Empty.getDefaultInstance();
});
}
@Override
public void createColumn(CreateColumnRequest request, StreamObserver<CreateColumnResponse> responseObserver) {
executor.execute(() -> {
try {
public Mono<CreateColumnResponse> createColumn(CreateColumnRequest request) {
return executeSync(() -> {
var colId = api.createColumn(request.getName(), mapColumnSchema(request.getSchema()));
var response = CreateColumnResponse.newBuilder().setColumnId(colId).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
}
return CreateColumnResponse.newBuilder().setColumnId(colId).build();
});
}
@Override
public void deleteColumn(DeleteColumnRequest request, StreamObserver<Empty> responseObserver) {
executor.execute(() -> {
try {
public Mono<Empty> deleteColumn(DeleteColumnRequest request) {
return executeSync(() -> {
api.deleteColumn(request.getColumnId());
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
}
return Empty.getDefaultInstance();
});
}
@Override
public void getColumnId(GetColumnIdRequest request, StreamObserver<GetColumnIdResponse> responseObserver) {
executor.execute(() -> {
try {
public Mono<GetColumnIdResponse> getColumnId(GetColumnIdRequest request) {
return executeSync(() -> {
var colId = api.getColumnId(request.getName());
var response = GetColumnIdResponse.newBuilder().setColumnId(colId).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
}
return GetColumnIdResponse.newBuilder().setColumnId(colId).build();
});
}
@Override
public void put(PutRequest request, StreamObserver<Empty> responseObserver) {
executor.execute(() -> {
try {
public Mono<Empty> put(PutRequest request) {
return executeSync(() -> {
try (var arena = Arena.ofConfined()) {
api.put(arena,
request.getTransactionOrUpdateId(),
@ -212,209 +184,81 @@ public class GrpcServer extends Server {
new RequestNothing<>()
);
}
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
}
return Empty.getDefaultInstance();
});
}
@Override
public StreamObserver<PutBatchRequest> putBatch(StreamObserver<Empty> responseObserver) {
final ServerCallStreamObserver<Empty> serverCallStreamObserver =
(ServerCallStreamObserver<Empty>) responseObserver;
serverCallStreamObserver.disableAutoRequest();
serverCallStreamObserver.request(1);
var requestObserver = new StreamObserver<PutBatchRequest>() {
enum State {
BEFORE_INITIAL_REQUEST,
RECEIVING_DATA,
RECEIVED_ALL
public Mono<Empty> putBatch(Flux<PutBatchRequest> request) {
return request.switchOnFirst((firstSignal, nextRequests) -> {
if (firstSignal.isOnNext()) {
var firstValue = firstSignal.get();
assert firstValue != null;
if (!firstValue.hasInitialRequest()) {
return Mono.<Empty>error(RocksDBException.of(
RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "Missing initial request"));
}
private final ExecutorService sstExecutor = Executors.newSingleThreadExecutor();
final AtomicInteger pendingRequests = new AtomicInteger();
State state = State.BEFORE_INITIAL_REQUEST;
private PutBatchInitialRequest initialRequest;
private Subscriber<? super KVBatch> putBatchInputsSubscriber;
@Override
public void onNext(PutBatchRequest putBatchRequest) {
if (state == State.BEFORE_INITIAL_REQUEST) {
if (!putBatchRequest.hasInitialRequest()) {
serverCallStreamObserver.onError(RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "Missing initial request"));
}
initialRequest = putBatchRequest.getInitialRequest();
try {
asyncApi.putBatchAsync(initialRequest.getColumnId(),
subscriber2 -> {
putBatchInputsSubscriber = subscriber2;
subscriber2.onSubscribe(new Subscription() {
@Override
public void request(long l) {
serverCallStreamObserver.request(Math.toIntExact(l));
}
@Override
public void cancel() {
serverCallStreamObserver.onError(new IOException("Cancelled"));
}
});
},
switch (initialRequest.getMode()) {
var initialRequest = firstValue.getInitialRequest();
var mode = switch (initialRequest.getMode()) {
case WRITE_BATCH -> PutBatchMode.WRITE_BATCH;
case WRITE_BATCH_NO_WAL -> PutBatchMode.WRITE_BATCH_NO_WAL;
case SST_INGESTION -> PutBatchMode.SST_INGESTION;
case SST_INGEST_BEHIND -> PutBatchMode.SST_INGEST_BEHIND;
case UNRECOGNIZED -> throw new UnsupportedOperationException("Unrecognized request \"mode\"");
}
).whenComplete((_, ex) -> {
if (ex != null) {
handleError(serverCallStreamObserver, ex);
} else {
serverCallStreamObserver.onNext(Empty.getDefaultInstance());
serverCallStreamObserver.onCompleted();
}
});
} catch (Throwable ex) {
handleError(serverCallStreamObserver, ex);
}
state = State.RECEIVING_DATA;
} else if (state == State.RECEIVING_DATA) {
pendingRequests.incrementAndGet();
var kvBatch = putBatchRequest.getData();
sstExecutor.execute(() -> {
try {
try (var arena = Arena.ofConfined()) {
putBatchInputsSubscriber.onNext(mapKVBatch(arena, kvBatch.getEntriesCount(), kvBatch::getEntries));
}
checkCompleted(true);
} catch (Throwable ex) {
putBatchInputsSubscriber.onError(ex);
}
});
} else {
serverCallStreamObserver.onError(RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "Invalid request"));
}
}
@Override
public void onError(Throwable throwable) {
sstExecutor.execute(() -> {
state = State.RECEIVED_ALL;
doFinally();
if (putBatchInputsSubscriber != null) {
putBatchInputsSubscriber.onError(throwable);
} else {
serverCallStreamObserver.onError(throwable);
}
});
}
@Override
public void onCompleted() {
sstExecutor.execute(() -> {
if (state == State.BEFORE_INITIAL_REQUEST) {
serverCallStreamObserver.onError(RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "Missing initial request"));
} else if (state == State.RECEIVING_DATA) {
state = State.RECEIVED_ALL;
checkCompleted(false);
} else {
putBatchInputsSubscriber.onError(RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_UNKNOWN_ERROR, "Unknown state during onComplete: " + state));
}
});
}
private void checkCompleted(boolean requestDone) {
if ((requestDone ? pendingRequests.decrementAndGet() : pendingRequests.get()) == 0
&& state == State.RECEIVED_ALL) {
doFinally();
putBatchInputsSubscriber.onComplete();
}
}
private void doFinally() {
sstExecutor.shutdown();
}
};
return requestObserver;
var batches = nextRequests.map(putBatchRequest -> {
var batch = putBatchRequest.getData();
return mapKVBatch(Arena.ofAuto(), batch.getEntriesCount(), batch::getEntries);
});
return Mono.fromFuture(asyncApi.putBatchAsync(initialRequest.getColumnId(), batches, mode));
} else if (firstSignal.isOnComplete()) {
return Mono.just(RocksDBException.of(
RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "No initial request"));
} else {
return nextRequests;
}
}).then(Mono.just(Empty.getDefaultInstance()));
}
@Override
public StreamObserver<PutMultiRequest> putMulti(StreamObserver<Empty> responseObserver) {
return new StreamObserver<>() {
private boolean initialRequestDone = false;
private long requestsCount = 0;
private boolean requestsCountFinalized;
private final AtomicLong processedRequestsCount = new AtomicLong();
private PutMultiInitialRequest initialRequest;
public Mono<Empty> putMulti(Flux<PutMultiRequest> request) {
return request.switchOnFirst((firstSignal, nextRequests) -> {
if (firstSignal.isOnNext()) {
var firstValue = firstSignal.get();
assert firstValue != null;
if (!firstValue.hasInitialRequest()) {
return Mono.<Empty>error(RocksDBException.of(
RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "Missing initial request"));
}
var initialRequest = firstValue.getInitialRequest();
@Override
public void onNext(PutMultiRequest request) {
switch (request.getPutMultiRequestTypeCase()) {
case INITIALREQUEST -> {
if (initialRequestDone) {
throw new UnsupportedOperationException("Initial request already done!");
}
this.initialRequest = request.getInitialRequest();
this.initialRequestDone = true;
}
case DATA -> {
if (!initialRequestDone) {
throw new UnsupportedOperationException("Initial request already done!");
}
++requestsCount;
executor.execute(() -> {
try {
return nextRequests
.publishOn(executor)
.doOnNext(putRequest -> {
var data = putRequest.getData();
try (var arena = Arena.ofConfined()) {
api.put(arena,
initialRequest.getTransactionOrUpdateId(),
initialRequest.getColumnId(),
mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys),
toMemorySegment(arena, request.getData().getValue()),
mapKeys(arena, data.getKeysCount(), data::getKeys),
toMemorySegment(arena, data.getValue()),
new RequestNothing<>());
}
} catch (RocksDBException ex) {
handleError(responseObserver, ex);
return;
}
var newProcessedRequestCount = processedRequestsCount.incrementAndGet();
if (requestsCountFinalized) {
if (newProcessedRequestCount == requestsCount) {
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
}
});
} else if (firstSignal.isOnComplete()) {
return Mono.just(RocksDBException.of(
RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "No initial request"));
} else {
return nextRequests;
}
case null, default ->
throw new UnsupportedOperationException("Unsupported operation: "
+ request.getPutMultiRequestTypeCase());
}
}).then(Mono.just(Empty.getDefaultInstance()));
}
@Override
public void onError(Throwable t) {
responseObserver.onError(t);
}
@Override
public void onCompleted() {
requestsCountFinalized = true;
if (requestsCount == 0) {
responseObserver.onCompleted();
}
}
};
}
@Override
public void putGetPrevious(PutRequest request, StreamObserver<Previous> responseObserver) {
executor.execute(() -> {
try {
public Mono<Previous> putGetPrevious(PutRequest request) {
return executeSync(() -> {
try (var arena = Arena.ofConfined()) {
var prev = api.put(arena,
request.getTransactionOrUpdateId(),
@ -427,20 +271,14 @@ public class GrpcServer extends Server {
if (prev != null) {
prevBuilder.setPrevious(ByteString.copyFrom(prev.asByteBuffer()));
}
var response = prevBuilder.build();
responseObserver.onNext(response);
}
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
return prevBuilder.build();
}
});
}
@Override
public void putGetDelta(PutRequest request, StreamObserver<Delta> responseObserver) {
executor.execute(() -> {
try {
public Mono<Delta> putGetDelta(PutRequest request) {
return executeSync(() -> {
try (var arena = Arena.ofConfined()) {
var delta = api.put(arena,
request.getTransactionOrUpdateId(),
@ -456,20 +294,14 @@ public class GrpcServer extends Server {
if (delta.current() != null) {
deltaBuilder.setCurrent(ByteString.copyFrom(delta.current().asByteBuffer()));
}
var response = deltaBuilder.build();
responseObserver.onNext(response);
}
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
return deltaBuilder.build();
}
});
}
@Override
public void putGetChanged(PutRequest request, StreamObserver<Changed> responseObserver) {
executor.execute(() -> {
try {
public Mono<Changed> putGetChanged(PutRequest request) {
return executeSync(() -> {
try (var arena = Arena.ofConfined()) {
var changed = api.put(arena,
request.getTransactionOrUpdateId(),
@ -478,20 +310,14 @@ public class GrpcServer extends Server {
toMemorySegment(arena, request.getData().getValue()),
new RequestChanged<>()
);
var response = Changed.newBuilder().setChanged(changed).build();
responseObserver.onNext(response);
}
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
return Changed.newBuilder().setChanged(changed).build();
}
});
}
@Override
public void putGetPreviousPresence(PutRequest request, StreamObserver<PreviousPresence> responseObserver) {
executor.execute(() -> {
try {
public Mono<PreviousPresence> putGetPreviousPresence(PutRequest request) {
return executeSync(() -> {
try (var arena = Arena.ofConfined()) {
var present = api.put(arena,
request.getTransactionOrUpdateId(),
@ -500,20 +326,14 @@ public class GrpcServer extends Server {
toMemorySegment(arena, request.getData().getValue()),
new RequestPreviousPresence<>()
);
var response = PreviousPresence.newBuilder().setPresent(present).build();
responseObserver.onNext(response);
}
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
return PreviousPresence.newBuilder().setPresent(present).build();
}
});
}
@Override
public void get(GetRequest request, StreamObserver<GetResponse> responseObserver) {
executor.execute(() -> {
try {
public Mono<GetResponse> get(GetRequest request) {
return executeSync(() -> {
try (var arena = Arena.ofConfined()) {
var current = api.get(arena,
request.getTransactionOrUpdateId(),
@ -525,20 +345,14 @@ public class GrpcServer extends Server {
if (current != null) {
responseBuilder.setValue(ByteString.copyFrom(current.asByteBuffer()));
}
var response = responseBuilder.build();
responseObserver.onNext(response);
}
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
return responseBuilder.build();
}
});
}
@Override
public void getForUpdate(GetRequest request, StreamObserver<UpdateBegin> responseObserver) {
executor.execute(() -> {
try {
public Mono<UpdateBegin> getForUpdate(GetRequest request) {
return executeSync(() -> {
try (var arena = Arena.ofConfined()) {
var forUpdate = api.get(arena,
request.getTransactionOrUpdateId(),
@ -551,20 +365,14 @@ public class GrpcServer extends Server {
if (forUpdate.previous() != null) {
responseBuilder.setPrevious(ByteString.copyFrom(forUpdate.previous().asByteBuffer()));
}
var response = responseBuilder.build();
responseObserver.onNext(response);
}
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
return responseBuilder.build();
}
});
}
@Override
public void exists(GetRequest request, StreamObserver<PreviousPresence> responseObserver) {
executor.execute(() -> {
try {
public Mono<PreviousPresence> exists(GetRequest request) {
return executeSync(() -> {
try (var arena = Arena.ofConfined()) {
var exists = api.get(arena,
request.getTransactionOrUpdateId(),
@ -572,19 +380,14 @@ public class GrpcServer extends Server {
mapKeys(arena, request.getKeysCount(), request::getKeys),
new RequestExists<>()
);
responseObserver.onNext(PreviousPresence.newBuilder().setPresent(exists).build());
}
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
return PreviousPresence.newBuilder().setPresent(exists).build();
}
});
}
@Override
public void openIterator(OpenIteratorRequest request, StreamObserver<OpenIteratorResponse> responseObserver) {
executor.execute(() -> {
try {
public Mono<OpenIteratorResponse> openIterator(OpenIteratorRequest request) {
return executeSync(() -> {
try (var arena = Arena.ofConfined()) {
var iteratorId = api.openIterator(arena,
request.getTransactionId(),
@ -594,84 +397,58 @@ public class GrpcServer extends Server {
request.getReverse(),
request.getTimeoutMs()
);
responseObserver.onNext(OpenIteratorResponse.newBuilder().setIteratorId(iteratorId).build());
}
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
return OpenIteratorResponse.newBuilder().setIteratorId(iteratorId).build();
}
});
}
@Override
public void closeIterator(CloseIteratorRequest request, StreamObserver<Empty> responseObserver) {
executor.execute(() -> {
try {
public Mono<Empty> closeIterator(CloseIteratorRequest request) {
return executeSync(() -> {
api.closeIterator(request.getIteratorId());
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
}
return Empty.getDefaultInstance();
});
}
@Override
public void seekTo(SeekToRequest request, StreamObserver<Empty> responseObserver) {
executor.execute(() -> {
try {
public Mono<Empty> seekTo(SeekToRequest request) {
return executeSync(() -> {
try (var arena = Arena.ofConfined()) {
api.seekTo(arena, request.getIterationId(), mapKeys(arena, request.getKeysCount(), request::getKeys));
}
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
}
return Empty.getDefaultInstance();
});
}
@Override
public void subsequent(SubsequentRequest request, StreamObserver<Empty> responseObserver) {
executor.execute(() -> {
try {
public Mono<Empty> subsequent(SubsequentRequest request) {
return executeSync(() -> {
try (var arena = Arena.ofConfined()) {
api.subsequent(arena, request.getIterationId(),
request.getSkipCount(),
request.getTakeCount(),
new RequestNothing<>());
}
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
}
return Empty.getDefaultInstance();
});
}
@Override
public void subsequentExists(SubsequentRequest request, StreamObserver<PreviousPresence> responseObserver) {
executor.execute(() -> {
try {
public Mono<PreviousPresence> subsequentExists(SubsequentRequest request) {
return executeSync(() -> {
try (var arena = Arena.ofConfined()) {
var exists = api.subsequent(arena, request.getIterationId(),
request.getSkipCount(),
request.getTakeCount(),
new RequestExists<>());
var response = PreviousPresence.newBuilder().setPresent(exists).build();
responseObserver.onNext(response);
}
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
return PreviousPresence.newBuilder().setPresent(exists).build();
}
});
}
@Override
public void subsequentMultiGet(SubsequentRequest request, StreamObserver<KV> responseObserver) {
executor.execute(() -> {
try {
public Flux<KV> subsequentMultiGet(SubsequentRequest request) {
return Flux.create(emitter -> {
try (var arena = Arena.ofConfined()) {
int pageIndex = 0;
final long pageSize = 16L;
@ -685,7 +462,7 @@ public class GrpcServer extends Server {
for (MemorySegment entry : response) {
Keys keys = null; // todo: implement
MemorySegment value = entry;
responseObserver.onNext(KV.newBuilder()
emitter.next(KV.newBuilder()
.addAllKeys(null) // todo: implement
.setValue(ByteString.copyFrom(value.asByteBuffer()))
.build());
@ -693,17 +470,13 @@ public class GrpcServer extends Server {
pageIndex++;
}
}
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
}
});
emitter.complete();
}, FluxSink.OverflowStrategy.BUFFER);
}
@Override
public void getRangeFirstAndLast(GetRangeRequest request, StreamObserver<FirstAndLast> responseObserver) {
executor.execute(() -> {
try {
public Mono<FirstAndLast> reduceRangeFirstAndLast(GetRangeRequest request) {
return executeSync(() -> {
try (var arena = Arena.ofConfined()) {
it.cavallium.rockserver.core.common.FirstAndLast<it.cavallium.rockserver.core.common.KV> firstAndLast
= api.reduceRange(arena,
@ -715,14 +488,10 @@ public class GrpcServer extends Server {
RequestType.firstAndLast(),
request.getTimeoutMs()
);
responseObserver.onNext(FirstAndLast.newBuilder()
return FirstAndLast.newBuilder()
.setFirst(unmapKV(firstAndLast.first()))
.setLast(unmapKV(firstAndLast.last()))
.build());
}
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
.build();
}
});
}
@ -737,6 +506,12 @@ public class GrpcServer extends Server {
}
}
// utils
private <T> Mono<T> executeSync(Callable<T> callable) {
return Mono.fromCallable(callable).subscribeOn(executor);
}
// mappers
private static KV unmapKV(it.cavallium.rockserver.core.common.KV kv) {
@ -859,7 +634,11 @@ public class GrpcServer extends Server {
throw new RuntimeException(e);
}
elg.close();
executor.close();
executor.disposeGracefully().timeout(Duration.ofMinutes(2)).onErrorResume(ex -> {
LOG.error("Grpc server executor shutdown timed out, terminating...", ex);
executor.dispose();
return Mono.empty();
}).block();
super.close();
}
}

View File

@ -124,6 +124,6 @@ service RocksDBService {
rpc subsequent(SubsequentRequest) returns (google.protobuf.Empty);
rpc subsequentExists(SubsequentRequest) returns (PreviousPresence);
rpc subsequentMultiGet(SubsequentRequest) returns (stream KV);
rpc getRangeFirstAndLast(GetRangeRequest) returns (FirstAndLast);
rpc getRangeStream(GetRangeRequest) returns (stream KV);
rpc reduceRangeFirstAndLast(GetRangeRequest) returns (FirstAndLast);
rpc getAllInRange(GetRangeRequest) returns (stream KV);
}

View File

@ -13,7 +13,6 @@ module rockserver.core {
requires io.grpc.protobuf;
requires io.grpc.stub;
requires io.grpc;
requires jsr305;
requires com.google.common;
requires io.grpc.netty;
requires io.jstach.rainbowgum;
@ -30,6 +29,8 @@ module rockserver.core {
requires io.netty.transport.classes.epoll;
requires org.reactivestreams;
requires io.netty.transport.unix.common;
requires reactor.grpc.stub;
requires java.annotation;
exports it.cavallium.rockserver.core.client;
exports it.cavallium.rockserver.core.common;