reactive GRPC connection

This commit is contained in:
Andrea Cavalli 2024-10-22 00:45:46 +02:00
parent b8b552cb18
commit c6b4e62d74
2 changed files with 58 additions and 174 deletions

View File

@ -6,11 +6,9 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.protobuf.UnsafeByteOperations;
import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.*;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
@ -35,7 +33,6 @@ import it.cavallium.rockserver.core.common.api.proto.*;
import it.cavallium.rockserver.core.common.api.proto.ColumnHashType;
import it.cavallium.rockserver.core.common.api.proto.Delta;
import it.cavallium.rockserver.core.common.api.proto.KV;
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceBlockingStub;
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceFutureStub;
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceStub;
import it.unimi.dsi.fastutil.ints.Int2ObjectFunction;
@ -53,15 +50,15 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import static it.cavallium.rockserver.core.common.Utils.toMemorySegment;
@ -70,7 +67,6 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
private static final Logger LOG = LoggerFactory.getLogger(GrpcConnection.class);
private static final Executor DIRECT_EXECUTOR = MoreExecutors.directExecutor();
private final ManagedChannel channel;
private final RocksDBServiceBlockingStub blockingStub;
private final RocksDBServiceStub asyncStub;
private final RocksDBServiceFutureStub futureStub;
private final ReactorRocksDBServiceGrpc.ReactorRocksDBServiceStub reactiveStub;
@ -100,7 +96,6 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
.channelType(NioSocketChannel.class);
}
this.channel = channelBuilder.build();
this.blockingStub = RocksDBServiceGrpc.newBlockingStub(channel);
this.asyncStub = RocksDBServiceGrpc.newStub(channel);
this.futureStub = RocksDBServiceGrpc.newFutureStub(channel);
this.reactiveStub = ReactorRocksDBServiceGrpc.newReactorStub(channel);
@ -227,7 +222,8 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
};
}
@Override
@SuppressWarnings("unchecked")
@Override
public <T> CompletableFuture<List<T>> putMultiAsync(Arena arena,
long transactionOrUpdateId,
long columnId,
@ -240,23 +236,9 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
+ count + " != " + allValues.size());
}
CompletableFuture<List<T>> responseObserver;
if (requestType instanceof RequestType.RequestNothing<?> && transactionOrUpdateId == 0L) {
return putBatchAsync(columnId, subscriber -> {
var sub = new Subscription() {
@Override
public void request(long l) {
}
@Override
public void cancel() {
}
};
subscriber.onSubscribe(sub);
subscriber.onNext(new KVBatch(allKeys, allValues));
}, PutBatchMode.WRITE_BATCH).thenApply(_ -> List.of());
return putBatchAsync(columnId, Flux.just(new KVBatch(allKeys, allValues)), PutBatchMode.WRITE_BATCH)
.thenApply(_ -> List.of());
}
var initialRequest = PutMultiRequest.newBuilder()
@ -266,154 +248,46 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
.build())
.build();
StreamObserver<PutMultiRequest> requestPublisher = switch (requestType) {
case RequestNothing<?> _ -> {
var thisResponseObserver = new CollectListStreamObserver<Empty>(0);
//noinspection unchecked
responseObserver = (CompletableFuture<List<T>>) (CompletableFuture<?>) thisResponseObserver;
yield this.asyncStub.putMulti(thisResponseObserver);
}
case RequestPrevious<?> _ -> {
var thisResponseObserver = new CollectListMappedStreamObserver<Previous, @Nullable MemorySegment>(
GrpcConnection::mapPrevious, count);
//noinspection unchecked
responseObserver = (CompletableFuture<List<T>>) (CompletableFuture<?>) thisResponseObserver;
yield this.asyncStub.putMultiGetPrevious(thisResponseObserver);
}
case RequestDelta<?> _ -> {
var thisResponseObserver = new CollectListMappedStreamObserver<>(GrpcConnection::mapDelta, count);
//noinspection unchecked
responseObserver = (CompletableFuture<List<T>>) (CompletableFuture<?>) thisResponseObserver;
yield this.asyncStub.putMultiGetDelta(thisResponseObserver);
}
case RequestChanged<?> _ -> {
var thisResponseObserver = new CollectListMappedStreamObserver<>(Changed::getChanged, count);
//noinspection unchecked
responseObserver = (CompletableFuture<List<T>>) (CompletableFuture<?>) thisResponseObserver;
yield this.asyncStub.putMultiGetChanged(thisResponseObserver);
}
case RequestPreviousPresence<?> _ -> {
var thisResponseObserver = new CollectListMappedStreamObserver<>(PreviousPresence::getPresent, count);
//noinspection unchecked
responseObserver = (CompletableFuture<List<T>>) (CompletableFuture<?>) thisResponseObserver;
yield this.asyncStub.putMultiGetPreviousPresence(thisResponseObserver);
}
};
Mono<PutMultiRequest> initialRequestMono = Mono.just(initialRequest);
Flux<PutMultiRequest> dataRequestsFlux = Flux.fromIterable(() -> GrpcConnection
.map(allKeys.iterator(), allValues.iterator(), (keys, value) -> PutMultiRequest.newBuilder()
.setData(mapKV(keys, value))
.build()));
var inputRequests = initialRequestMono.concatWith(dataRequestsFlux);
requestPublisher.onNext(initialRequest);
var it1 = allKeys.iterator();
var it2 = allValues.iterator();
while (it1.hasNext()) {
var keys = it1.next();
var value = it2.next();
requestPublisher.onNext(PutMultiRequest.newBuilder()
.setData(mapKV(keys, value))
.build());
}
return responseObserver;
return (CompletableFuture<List<T>>) (switch (requestType) {
case RequestNothing<?> _ ->
this.reactiveStub.putMulti(inputRequests)
.ignoreElement()
.toFuture();
case RequestPrevious<?> _ ->
this.reactiveStub.putMultiGetPrevious(inputRequests)
.collect(() -> new ArrayList<@Nullable MemorySegment>(),
(list, value) -> list.add(GrpcConnection.mapPrevious(value)))
.toFuture();
case RequestDelta<?> _ ->
this.reactiveStub.putMultiGetDelta(inputRequests)
.map(GrpcConnection::mapDelta)
.collectList()
.toFuture();
case RequestChanged<?> _ ->
this.reactiveStub.putMultiGetChanged(inputRequests)
.map(Changed::getChanged)
.collectList()
.toFuture();
case RequestPreviousPresence<?> _ ->
this.reactiveStub.putMultiGetPreviousPresence(inputRequests)
.map(PreviousPresence::getPresent)
.collectList()
.toFuture();
});
}
@Override
public CompletableFuture<Void> putBatchAsync(long columnId,
@NotNull Publisher<@NotNull KVBatch> batchPublisher,
@NotNull PutBatchMode mode) throws RocksDBException {
var cf = new CompletableFuture<Void>();
var responseobserver = new ClientResponseObserver<PutBatchRequest, Empty>() {
private ClientCallStreamObserver<PutBatchRequest> requestStream;
private Subscription subscription;
private int sendingRequests = 0;
@Override
public void beforeStart(ClientCallStreamObserver<PutBatchRequest> requestStream) {
this.requestStream = requestStream;
// Set up manual flow control for the response stream. It feels backwards to configure the response
// stream's flow control using the request stream's observer, but this is the way it is.
requestStream.disableAutoRequestWithInitial(1);
var subscriber = new Subscriber<KVBatch>() {
private volatile boolean finalized;
@Override
public void onSubscribe(Subscription subscription2) {
subscription = subscription2;
}
@Override
public void onNext(KVBatch batch) {
sendingRequests--;
var request = PutBatchRequest.newBuilder();
request.setData(mapKVBatch(batch));
requestStream.onNext(request.build());
request = null;
batch = null;
if (requestStream.isReady()) {
if (sendingRequests == 0) {
sendingRequests++;
subscription.request(1);
}
}
}
@Override
public void onError(Throwable throwable) {
this.finalized = true;
requestStream.onError(throwable);
}
@Override
public void onComplete() {
this.finalized = true;
requestStream.onCompleted();
}
};
batchPublisher.subscribe(subscriber);
// Set up a back-pressure-aware producer for the request stream. The onReadyHandler will be invoked
// when the consuming side has enough buffer space to receive more messages.
//
// Messages are serialized into a transport-specific transmit buffer. Depending on the size of this buffer,
// MANY messages may be buffered, however, they haven't yet been sent to the server. The server must call
// request() to pull a buffered message from the client.
//
// Note: the onReadyHandler's invocation is serialized on the same thread pool as the incoming
// StreamObserver's onNext(), onError(), and onComplete() handlers. Blocking the onReadyHandler will prevent
// additional messages from being processed by the incoming StreamObserver. The onReadyHandler must return
// in a timely manner or else message processing throughput will suffer.
requestStream.setOnReadyHandler(new Runnable() {
@Override
public void run() {
if (sendingRequests == 0) {
// Start generating values from where we left off on a non-gRPC thread.
sendingRequests++;
subscription.request(1);
}
}
});
}
@Override
public void onNext(Empty empty) {}
@Override
public void onError(Throwable throwable) {
cf.completeExceptionally(throwable);
}
@Override
public void onCompleted() {
cf.complete(null);
}
};
var requestStream = asyncStub.putBatch(responseobserver);
requestStream.onNext(PutBatchRequest.newBuilder()
var initialRequest = Mono.just(PutBatchRequest.newBuilder()
.setInitialRequest(PutBatchInitialRequest.newBuilder()
.setColumnId(columnId)
.setMode(switch (mode) {
@ -424,8 +298,13 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
})
.build())
.build());
return cf;
var nextRequests = Flux.from(batchPublisher).map(batch -> {
var request = PutBatchRequest.newBuilder();
request.setData(mapKVBatch(batch));
return request.build();
});
var inputFlux = initialRequest.concatWith(nextRequests);
return reactiveStub.putBatch(inputFlux).then().toFuture();
}
@SuppressWarnings("unchecked")
@ -560,6 +439,18 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
);
}
public static <A, B, C> Iterator<C> map(Iterator<A> a, Iterator<B> b, BiFunction<A, B, C> f) {
return new Iterator<>() {
public boolean hasNext() {
return a.hasNext() && b.hasNext(); // This uses the shorter of the two `Iterator`s.
}
public C next() {
return f.apply(a.next(), b.next());
}
};
}
@Nullable
private static MemorySegment mapPrevious(Previous x) {
return x.hasPrevious() ? mapByteString(x.getPrevious()) : null;

View File

@ -37,7 +37,6 @@ import it.cavallium.rockserver.core.common.api.proto.*;
import it.cavallium.rockserver.core.common.api.proto.Delta;
import it.cavallium.rockserver.core.common.api.proto.FirstAndLast;
import it.cavallium.rockserver.core.common.api.proto.KV;
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceImplBase;
import it.unimi.dsi.fastutil.ints.Int2IntFunction;
import it.unimi.dsi.fastutil.ints.Int2ObjectFunction;
import it.unimi.dsi.fastutil.ints.IntArrayList;
@ -53,15 +52,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;