Fix too many requests

This commit is contained in:
Andrea Cavalli 2024-10-02 18:00:54 +02:00
parent 1afa4c183a
commit b4c610be08

View File

@ -10,17 +10,14 @@ import com.google.protobuf.Empty;
import com.google.protobuf.UnsafeByteOperations; import com.google.protobuf.UnsafeByteOperations;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.stub.*; import io.grpc.stub.*;
import it.cavallium.rockserver.core.common.*; import it.cavallium.rockserver.core.common.*;
import it.cavallium.rockserver.core.common.ColumnSchema; import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.KVBatch; import it.cavallium.rockserver.core.common.KVBatch;
import it.cavallium.rockserver.core.common.PutBatchMode; import it.cavallium.rockserver.core.common.PutBatchMode;
import it.cavallium.rockserver.core.common.RequestType.RequestChanged; import it.cavallium.rockserver.core.common.RequestType.RequestChanged;
import it.cavallium.rockserver.core.common.RequestType.RequestCurrent;
import it.cavallium.rockserver.core.common.RequestType.RequestDelta; import it.cavallium.rockserver.core.common.RequestType.RequestDelta;
import it.cavallium.rockserver.core.common.RequestType.RequestExists; import it.cavallium.rockserver.core.common.RequestType.RequestExists;
import it.cavallium.rockserver.core.common.RequestType.RequestForUpdate;
import it.cavallium.rockserver.core.common.RequestType.RequestGet; import it.cavallium.rockserver.core.common.RequestType.RequestGet;
import it.cavallium.rockserver.core.common.RequestType.RequestMulti; import it.cavallium.rockserver.core.common.RequestType.RequestMulti;
import it.cavallium.rockserver.core.common.RequestType.RequestNothing; import it.cavallium.rockserver.core.common.RequestType.RequestNothing;
@ -35,7 +32,6 @@ import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBS
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceFutureStub; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceFutureStub;
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceStub; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceStub;
import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntArrayList;
import java.io.IOException;
import java.lang.foreign.Arena; import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment; import java.lang.foreign.MemorySegment;
import java.net.URI; import java.net.URI;
@ -46,7 +42,6 @@ import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function; import java.util.function.Function;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
@ -274,6 +269,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
var responseobserver = new ClientResponseObserver<PutBatchRequest, Empty>() { var responseobserver = new ClientResponseObserver<PutBatchRequest, Empty>() {
private ClientCallStreamObserver<PutBatchRequest> requestStream; private ClientCallStreamObserver<PutBatchRequest> requestStream;
private Subscription subscription; private Subscription subscription;
private int sendingRequests = 0;
@Override @Override
public void beforeStart(ClientCallStreamObserver<PutBatchRequest> requestStream) { public void beforeStart(ClientCallStreamObserver<PutBatchRequest> requestStream) {
@ -292,11 +288,17 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
@Override @Override
public void onNext(KVBatch batch) { public void onNext(KVBatch batch) {
sendingRequests--;
var request = PutBatchRequest.newBuilder(); var request = PutBatchRequest.newBuilder();
request.setData(mapKVBatch(batch)); request.setData(mapKVBatch(batch));
requestStream.onNext(request.build()); requestStream.onNext(request.build());
request = null;
batch = null;
if (requestStream.isReady()) { if (requestStream.isReady()) {
subscription.request(1); if (sendingRequests == 0) {
sendingRequests++;
subscription.request(1);
}
} }
} }
@ -331,8 +333,11 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
@Override @Override
public void run() { public void run() {
// Start generating values from where we left off on a non-gRPC thread. if (sendingRequests == 0) {
subscription.request(1); // Start generating values from where we left off on a non-gRPC thread.
sendingRequests++;
subscription.request(1);
}
} }
}); });
} }