diff --git a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java index 45461ea..a18f176 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java @@ -6,11 +6,9 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; -import com.google.protobuf.Empty; import com.google.protobuf.UnsafeByteOperations; import io.grpc.ManagedChannel; import io.grpc.netty.NettyChannelBuilder; -import io.grpc.stub.*; import io.netty.channel.epoll.EpollDomainSocketChannel; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; @@ -35,7 +33,6 @@ import it.cavallium.rockserver.core.common.api.proto.*; import it.cavallium.rockserver.core.common.api.proto.ColumnHashType; import it.cavallium.rockserver.core.common.api.proto.Delta; import it.cavallium.rockserver.core.common.api.proto.KV; -import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceBlockingStub; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceFutureStub; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceStub; import it.unimi.dsi.fastutil.ints.Int2ObjectFunction; @@ -53,15 +50,15 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import java.util.function.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import static it.cavallium.rockserver.core.common.Utils.toMemorySegment; @@ -70,7 +67,6 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { private static final Logger LOG = LoggerFactory.getLogger(GrpcConnection.class); private static final Executor DIRECT_EXECUTOR = MoreExecutors.directExecutor(); private final ManagedChannel channel; - private final RocksDBServiceBlockingStub blockingStub; private final RocksDBServiceStub asyncStub; private final RocksDBServiceFutureStub futureStub; private final ReactorRocksDBServiceGrpc.ReactorRocksDBServiceStub reactiveStub; @@ -100,7 +96,6 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { .channelType(NioSocketChannel.class); } this.channel = channelBuilder.build(); - this.blockingStub = RocksDBServiceGrpc.newBlockingStub(channel); this.asyncStub = RocksDBServiceGrpc.newStub(channel); this.futureStub = RocksDBServiceGrpc.newFutureStub(channel); this.reactiveStub = ReactorRocksDBServiceGrpc.newReactorStub(channel); @@ -227,7 +222,8 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { }; } - @Override + @SuppressWarnings("unchecked") + @Override public CompletableFuture> putMultiAsync(Arena arena, long transactionOrUpdateId, long columnId, @@ -240,23 +236,9 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { + count + " != " + allValues.size()); } - CompletableFuture> responseObserver; - if (requestType instanceof RequestType.RequestNothing && transactionOrUpdateId == 0L) { - return putBatchAsync(columnId, subscriber -> { - var sub = new Subscription() { - @Override - public void request(long l) { - } - - @Override - public void cancel() { - - } - }; - subscriber.onSubscribe(sub); - subscriber.onNext(new KVBatch(allKeys, allValues)); - }, PutBatchMode.WRITE_BATCH).thenApply(_ -> List.of()); + return putBatchAsync(columnId, Flux.just(new KVBatch(allKeys, allValues)), PutBatchMode.WRITE_BATCH) + .thenApply(_ -> List.of()); } var initialRequest = PutMultiRequest.newBuilder() @@ -266,154 +248,46 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { .build()) .build(); - StreamObserver requestPublisher = switch (requestType) { - case RequestNothing _ -> { - var thisResponseObserver = new CollectListStreamObserver(0); - //noinspection unchecked - responseObserver = (CompletableFuture>) (CompletableFuture) thisResponseObserver; - yield this.asyncStub.putMulti(thisResponseObserver); - } - case RequestPrevious _ -> { - var thisResponseObserver = new CollectListMappedStreamObserver( - GrpcConnection::mapPrevious, count); - //noinspection unchecked - responseObserver = (CompletableFuture>) (CompletableFuture) thisResponseObserver; - yield this.asyncStub.putMultiGetPrevious(thisResponseObserver); - } - case RequestDelta _ -> { - var thisResponseObserver = new CollectListMappedStreamObserver<>(GrpcConnection::mapDelta, count); - //noinspection unchecked - responseObserver = (CompletableFuture>) (CompletableFuture) thisResponseObserver; - yield this.asyncStub.putMultiGetDelta(thisResponseObserver); - } - case RequestChanged _ -> { - var thisResponseObserver = new CollectListMappedStreamObserver<>(Changed::getChanged, count); - //noinspection unchecked - responseObserver = (CompletableFuture>) (CompletableFuture) thisResponseObserver; - yield this.asyncStub.putMultiGetChanged(thisResponseObserver); - } - case RequestPreviousPresence _ -> { - var thisResponseObserver = new CollectListMappedStreamObserver<>(PreviousPresence::getPresent, count); - //noinspection unchecked - responseObserver = (CompletableFuture>) (CompletableFuture) thisResponseObserver; - yield this.asyncStub.putMultiGetPreviousPresence(thisResponseObserver); - } - }; + Mono initialRequestMono = Mono.just(initialRequest); + Flux dataRequestsFlux = Flux.fromIterable(() -> GrpcConnection + .map(allKeys.iterator(), allValues.iterator(), (keys, value) -> PutMultiRequest.newBuilder() + .setData(mapKV(keys, value)) + .build())); + var inputRequests = initialRequestMono.concatWith(dataRequestsFlux); - requestPublisher.onNext(initialRequest); - - var it1 = allKeys.iterator(); - var it2 = allValues.iterator(); - - while (it1.hasNext()) { - var keys = it1.next(); - var value = it2.next(); - requestPublisher.onNext(PutMultiRequest.newBuilder() - .setData(mapKV(keys, value)) - .build()); - } - - return responseObserver; + return (CompletableFuture>) (switch (requestType) { + case RequestNothing _ -> + this.reactiveStub.putMulti(inputRequests) + .ignoreElement() + .toFuture(); + case RequestPrevious _ -> + this.reactiveStub.putMultiGetPrevious(inputRequests) + .collect(() -> new ArrayList<@Nullable MemorySegment>(), + (list, value) -> list.add(GrpcConnection.mapPrevious(value))) + .toFuture(); + case RequestDelta _ -> + this.reactiveStub.putMultiGetDelta(inputRequests) + .map(GrpcConnection::mapDelta) + .collectList() + .toFuture(); + case RequestChanged _ -> + this.reactiveStub.putMultiGetChanged(inputRequests) + .map(Changed::getChanged) + .collectList() + .toFuture(); + case RequestPreviousPresence _ -> + this.reactiveStub.putMultiGetPreviousPresence(inputRequests) + .map(PreviousPresence::getPresent) + .collectList() + .toFuture(); + }); } @Override public CompletableFuture putBatchAsync(long columnId, @NotNull Publisher<@NotNull KVBatch> batchPublisher, @NotNull PutBatchMode mode) throws RocksDBException { - var cf = new CompletableFuture(); - var responseobserver = new ClientResponseObserver() { - private ClientCallStreamObserver requestStream; - private Subscription subscription; - private int sendingRequests = 0; - - @Override - public void beforeStart(ClientCallStreamObserver requestStream) { - this.requestStream = requestStream; - // Set up manual flow control for the response stream. It feels backwards to configure the response - // stream's flow control using the request stream's observer, but this is the way it is. - requestStream.disableAutoRequestWithInitial(1); - - var subscriber = new Subscriber() { - private volatile boolean finalized; - - @Override - public void onSubscribe(Subscription subscription2) { - subscription = subscription2; - } - - @Override - public void onNext(KVBatch batch) { - sendingRequests--; - var request = PutBatchRequest.newBuilder(); - request.setData(mapKVBatch(batch)); - requestStream.onNext(request.build()); - request = null; - batch = null; - if (requestStream.isReady()) { - if (sendingRequests == 0) { - sendingRequests++; - subscription.request(1); - } - } - } - - @Override - public void onError(Throwable throwable) { - this.finalized = true; - requestStream.onError(throwable); - } - - @Override - public void onComplete() { - this.finalized = true; - requestStream.onCompleted(); - } - }; - - - batchPublisher.subscribe(subscriber); - - // Set up a back-pressure-aware producer for the request stream. The onReadyHandler will be invoked - // when the consuming side has enough buffer space to receive more messages. - // - // Messages are serialized into a transport-specific transmit buffer. Depending on the size of this buffer, - // MANY messages may be buffered, however, they haven't yet been sent to the server. The server must call - // request() to pull a buffered message from the client. - // - // Note: the onReadyHandler's invocation is serialized on the same thread pool as the incoming - // StreamObserver's onNext(), onError(), and onComplete() handlers. Blocking the onReadyHandler will prevent - // additional messages from being processed by the incoming StreamObserver. The onReadyHandler must return - // in a timely manner or else message processing throughput will suffer. - requestStream.setOnReadyHandler(new Runnable() { - - @Override - public void run() { - if (sendingRequests == 0) { - // Start generating values from where we left off on a non-gRPC thread. - sendingRequests++; - subscription.request(1); - } - } - }); - } - - @Override - public void onNext(Empty empty) {} - - @Override - public void onError(Throwable throwable) { - cf.completeExceptionally(throwable); - } - - @Override - public void onCompleted() { - cf.complete(null); - } - }; - - var requestStream = asyncStub.putBatch(responseobserver); - - requestStream.onNext(PutBatchRequest.newBuilder() + var initialRequest = Mono.just(PutBatchRequest.newBuilder() .setInitialRequest(PutBatchInitialRequest.newBuilder() .setColumnId(columnId) .setMode(switch (mode) { @@ -424,8 +298,13 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { }) .build()) .build()); - - return cf; + var nextRequests = Flux.from(batchPublisher).map(batch -> { + var request = PutBatchRequest.newBuilder(); + request.setData(mapKVBatch(batch)); + return request.build(); + }); + var inputFlux = initialRequest.concatWith(nextRequests); + return reactiveStub.putBatch(inputFlux).then().toFuture(); } @SuppressWarnings("unchecked") @@ -560,6 +439,18 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { ); } + public static Iterator map(Iterator a, Iterator b, BiFunction f) { + return new Iterator<>() { + public boolean hasNext() { + return a.hasNext() && b.hasNext(); // This uses the shorter of the two `Iterator`s. + } + + public C next() { + return f.apply(a.next(), b.next()); + } + }; + } + @Nullable private static MemorySegment mapPrevious(Previous x) { return x.hasPrevious() ? mapByteString(x.getPrevious()) : null; diff --git a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java index a44d4cc..2305ce8 100644 --- a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java +++ b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java @@ -37,7 +37,6 @@ import it.cavallium.rockserver.core.common.api.proto.*; import it.cavallium.rockserver.core.common.api.proto.Delta; import it.cavallium.rockserver.core.common.api.proto.FirstAndLast; import it.cavallium.rockserver.core.common.api.proto.KV; -import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceImplBase; import it.unimi.dsi.fastutil.ints.Int2IntFunction; import it.unimi.dsi.fastutil.ints.Int2ObjectFunction; import it.unimi.dsi.fastutil.ints.IntArrayList; @@ -53,15 +52,9 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux;