diff --git a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java index 82d4133..8acc2c1 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java @@ -10,17 +10,14 @@ import com.google.protobuf.Empty; import com.google.protobuf.UnsafeByteOperations; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; -import io.grpc.Status; import io.grpc.stub.*; import it.cavallium.rockserver.core.common.*; import it.cavallium.rockserver.core.common.ColumnSchema; import it.cavallium.rockserver.core.common.KVBatch; import it.cavallium.rockserver.core.common.PutBatchMode; import it.cavallium.rockserver.core.common.RequestType.RequestChanged; -import it.cavallium.rockserver.core.common.RequestType.RequestCurrent; import it.cavallium.rockserver.core.common.RequestType.RequestDelta; import it.cavallium.rockserver.core.common.RequestType.RequestExists; -import it.cavallium.rockserver.core.common.RequestType.RequestForUpdate; import it.cavallium.rockserver.core.common.RequestType.RequestGet; import it.cavallium.rockserver.core.common.RequestType.RequestMulti; import it.cavallium.rockserver.core.common.RequestType.RequestNothing; @@ -35,7 +32,6 @@ import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBS import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceFutureStub; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceStub; import it.unimi.dsi.fastutil.ints.IntArrayList; -import java.io.IOException; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; import java.net.URI; @@ -46,7 +42,6 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -274,6 +269,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { var responseobserver = new ClientResponseObserver() { private ClientCallStreamObserver requestStream; private Subscription subscription; + private int sendingRequests = 0; @Override public void beforeStart(ClientCallStreamObserver requestStream) { @@ -292,11 +288,17 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { @Override public void onNext(KVBatch batch) { + sendingRequests--; var request = PutBatchRequest.newBuilder(); request.setData(mapKVBatch(batch)); requestStream.onNext(request.build()); + request = null; + batch = null; if (requestStream.isReady()) { - subscription.request(1); + if (sendingRequests == 0) { + sendingRequests++; + subscription.request(1); + } } } @@ -331,8 +333,11 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { @Override public void run() { - // Start generating values from where we left off on a non-gRPC thread. - subscription.request(1); + if (sendingRequests == 0) { + // Start generating values from where we left off on a non-gRPC thread. + sendingRequests++; + subscription.request(1); + } } }); }