Implement grpc client connection
This commit is contained in:
parent
2a91716819
commit
044c4e04bd
@ -4,6 +4,7 @@ import static it.cavallium.rockserver.core.client.EmbeddedConnection.PRIVATE_MEM
|
||||
import static java.util.Objects.requireNonNull;
|
||||
|
||||
import it.cavallium.rockserver.core.common.Utils;
|
||||
import it.cavallium.rockserver.core.common.Utils.HostAndPort;
|
||||
import it.cavallium.rockserver.core.impl.rocksdb.RocksDBLoader;
|
||||
import it.cavallium.rockserver.core.server.ServerBuilder;
|
||||
import java.io.IOException;
|
||||
@ -95,6 +96,10 @@ public class Main {
|
||||
case "unix" -> clientBuilder.setUnixSocket(UnixDomainSocketAddress.of(Path.of(databaseUrl.getPath())));
|
||||
case "file" -> clientBuilder.setEmbeddedPath(Path.of((databaseUrl.getAuthority() != null ? databaseUrl.getAuthority() : "") + databaseUrl.getPath()).normalize());
|
||||
case "memory" -> clientBuilder.setEmbeddedInMemory(true);
|
||||
case "http" -> {
|
||||
clientBuilder.setHttpAddress(Utils.parseHostAndPort(databaseUrl));
|
||||
clientBuilder.setUseThrift(false);
|
||||
}
|
||||
case "rocksdb" -> clientBuilder.setAddress(Utils.parseHostAndPort(databaseUrl));
|
||||
case null, default -> throw new IllegalArgumentException("Invalid scheme \"" + databaseUrlScheme + "\" for database url url: " + databaseUrl);
|
||||
}
|
||||
@ -130,7 +135,7 @@ public class Main {
|
||||
switch (thriftListenUrlScheme) {
|
||||
case "unix" -> serverBuilder.setUnixSocket(UnixDomainSocketAddress.of(Path.of(listenUrl.getPath())));
|
||||
case "http" -> {
|
||||
serverBuilder.setHttpAddress(listenUrl.getHost(), Utils.parsePort(listenUrl));
|
||||
serverBuilder.setHttpAddress(new HostAndPort(listenUrl.getHost(), Utils.parsePort(listenUrl)));
|
||||
serverBuilder.setUseThrift(useThrift);
|
||||
}
|
||||
case "rocksdb" -> serverBuilder.setAddress(Utils.parseHostAndPort(listenUrl));
|
||||
|
@ -1,18 +1,20 @@
|
||||
package it.cavallium.rockserver.core.client;
|
||||
|
||||
import it.cavallium.rockserver.core.common.Utils.HostAndPort;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnixDomainSocketAddress;
|
||||
import java.nio.file.Path;
|
||||
|
||||
public class ClientBuilder {
|
||||
|
||||
private InetSocketAddress iNetAddress;
|
||||
private HostAndPort httpAddress;
|
||||
private HostAndPort iNetAddress;
|
||||
private UnixDomainSocketAddress unixAddress;
|
||||
private Path embeddedPath;
|
||||
private String name;
|
||||
private Path embeddedConfig;
|
||||
private boolean embeddedInMemory;
|
||||
private boolean useThrift;
|
||||
|
||||
public void setEmbeddedPath(Path path) {
|
||||
this.embeddedPath = path;
|
||||
@ -26,10 +28,18 @@ public class ClientBuilder {
|
||||
this.unixAddress = address;
|
||||
}
|
||||
|
||||
public void setAddress(InetSocketAddress address) {
|
||||
public void setHttpAddress(HostAndPort httpAddress) {
|
||||
this.httpAddress = httpAddress;
|
||||
}
|
||||
|
||||
public void setAddress(HostAndPort address) {
|
||||
this.iNetAddress = address;
|
||||
}
|
||||
|
||||
public void setUseThrift(boolean useThrift) {
|
||||
this.useThrift = useThrift;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
@ -45,6 +55,12 @@ public class ClientBuilder {
|
||||
return new EmbeddedConnection(embeddedPath, name, embeddedConfig);
|
||||
} else if (unixAddress != null) {
|
||||
throw new UnsupportedOperationException("Not implemented: unix socket");
|
||||
} else if (httpAddress != null) {
|
||||
if (useThrift) {
|
||||
throw new UnsupportedOperationException("Not implemented: thrift http2 address");
|
||||
} else {
|
||||
return new GrpcConnection(name, httpAddress);
|
||||
}
|
||||
} else if (iNetAddress != null) {
|
||||
throw new UnsupportedOperationException("Not implemented: inet address");
|
||||
} else {
|
||||
|
@ -0,0 +1,38 @@
|
||||
package it.cavallium.rockserver.core.client;
|
||||
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.Function;
|
||||
|
||||
class CollectListMappedStreamObserver<T, U> extends CompletableFuture<List<U>> implements StreamObserver<T> {
|
||||
|
||||
private final Function<T, U> mapper;
|
||||
private final List<U> list;
|
||||
|
||||
public CollectListMappedStreamObserver(Function<T, U> mapper) {
|
||||
this.mapper = mapper;
|
||||
this.list = new ArrayList<>();
|
||||
}
|
||||
|
||||
public CollectListMappedStreamObserver(Function<T, U> mapper, int size) {
|
||||
this.mapper = mapper;
|
||||
this.list = new ArrayList<>(size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(T t) {
|
||||
this.list.add(mapper.apply(t));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
this.completeExceptionally(throwable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
this.complete(this.list);
|
||||
}
|
||||
}
|
@ -0,0 +1,34 @@
|
||||
package it.cavallium.rockserver.core.client;
|
||||
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
class CollectListStreamObserver<T> extends CompletableFuture<List<T>> implements StreamObserver<T> {
|
||||
|
||||
private final List<T> list;
|
||||
|
||||
public CollectListStreamObserver() {
|
||||
this.list = new ArrayList<>();
|
||||
}
|
||||
|
||||
public CollectListStreamObserver(int size) {
|
||||
this.list = new ArrayList<>(size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(T t) {
|
||||
this.list.add(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
this.completeExceptionally(throwable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
this.complete(this.list);
|
||||
}
|
||||
}
|
@ -0,0 +1,468 @@
|
||||
package it.cavallium.rockserver.core.client;
|
||||
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.Empty;
|
||||
import com.google.protobuf.UnsafeByteOperations;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import it.cavallium.rockserver.core.common.ColumnSchema;
|
||||
import it.cavallium.rockserver.core.common.Keys;
|
||||
import it.cavallium.rockserver.core.common.RequestType;
|
||||
import it.cavallium.rockserver.core.common.RequestType.RequestChanged;
|
||||
import it.cavallium.rockserver.core.common.RequestType.RequestCurrent;
|
||||
import it.cavallium.rockserver.core.common.RequestType.RequestDelta;
|
||||
import it.cavallium.rockserver.core.common.RequestType.RequestExists;
|
||||
import it.cavallium.rockserver.core.common.RequestType.RequestForUpdate;
|
||||
import it.cavallium.rockserver.core.common.RequestType.RequestGet;
|
||||
import it.cavallium.rockserver.core.common.RequestType.RequestMulti;
|
||||
import it.cavallium.rockserver.core.common.RequestType.RequestNothing;
|
||||
import it.cavallium.rockserver.core.common.RequestType.RequestPrevious;
|
||||
import it.cavallium.rockserver.core.common.RequestType.RequestPreviousPresence;
|
||||
import it.cavallium.rockserver.core.common.RequestType.RequestPut;
|
||||
import it.cavallium.rockserver.core.common.RocksDBAPI;
|
||||
import it.cavallium.rockserver.core.common.RocksDBAPICommand;
|
||||
import it.cavallium.rockserver.core.common.RocksDBAsyncAPI;
|
||||
import it.cavallium.rockserver.core.common.RocksDBException;
|
||||
import it.cavallium.rockserver.core.common.RocksDBSyncAPI;
|
||||
import it.cavallium.rockserver.core.common.UpdateContext;
|
||||
import it.cavallium.rockserver.core.common.Utils.HostAndPort;
|
||||
import it.cavallium.rockserver.core.common.api.proto.Changed;
|
||||
import it.cavallium.rockserver.core.common.api.proto.CloseFailedUpdateRequest;
|
||||
import it.cavallium.rockserver.core.common.api.proto.CloseIteratorRequest;
|
||||
import it.cavallium.rockserver.core.common.api.proto.CloseTransactionRequest;
|
||||
import it.cavallium.rockserver.core.common.api.proto.CloseTransactionResponse;
|
||||
import it.cavallium.rockserver.core.common.api.proto.ColumnHashType;
|
||||
import it.cavallium.rockserver.core.common.api.proto.CreateColumnRequest;
|
||||
import it.cavallium.rockserver.core.common.api.proto.CreateColumnResponse;
|
||||
import it.cavallium.rockserver.core.common.api.proto.DeleteColumnRequest;
|
||||
import it.cavallium.rockserver.core.common.api.proto.Delta;
|
||||
import it.cavallium.rockserver.core.common.api.proto.GetColumnIdRequest;
|
||||
import it.cavallium.rockserver.core.common.api.proto.GetColumnIdResponse;
|
||||
import it.cavallium.rockserver.core.common.api.proto.GetRequest;
|
||||
import it.cavallium.rockserver.core.common.api.proto.GetResponse;
|
||||
import it.cavallium.rockserver.core.common.api.proto.KV;
|
||||
import it.cavallium.rockserver.core.common.api.proto.OpenIteratorRequest;
|
||||
import it.cavallium.rockserver.core.common.api.proto.OpenIteratorResponse;
|
||||
import it.cavallium.rockserver.core.common.api.proto.OpenTransactionRequest;
|
||||
import it.cavallium.rockserver.core.common.api.proto.OpenTransactionResponse;
|
||||
import it.cavallium.rockserver.core.common.api.proto.Previous;
|
||||
import it.cavallium.rockserver.core.common.api.proto.PreviousPresence;
|
||||
import it.cavallium.rockserver.core.common.api.proto.PutMultiInitialRequest;
|
||||
import it.cavallium.rockserver.core.common.api.proto.PutMultiRequest;
|
||||
import it.cavallium.rockserver.core.common.api.proto.PutRequest;
|
||||
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc;
|
||||
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceBlockingStub;
|
||||
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceFutureStub;
|
||||
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceStub;
|
||||
import it.cavallium.rockserver.core.common.api.proto.SeekToRequest;
|
||||
import it.cavallium.rockserver.core.common.api.proto.SubsequentRequest;
|
||||
import it.unimi.dsi.fastutil.ints.IntArrayList;
|
||||
import java.lang.foreign.Arena;
|
||||
import java.lang.foreign.MemorySegment;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.function.Function;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
public class GrpcConnection extends BaseConnection implements RocksDBAPI {
|
||||
|
||||
private static final Executor DIRECT_EXECUTOR = MoreExecutors.directExecutor();
|
||||
private final ManagedChannel channel;
|
||||
private final RocksDBServiceBlockingStub blockingStub;
|
||||
private final RocksDBServiceStub asyncStub;
|
||||
private final RocksDBServiceFutureStub futureStub;
|
||||
private final URI address;
|
||||
|
||||
public GrpcConnection(String name, HostAndPort address) {
|
||||
super(name);
|
||||
var channelBuilder = ManagedChannelBuilder
|
||||
.forTarget(address.toString())
|
||||
.usePlaintext();
|
||||
this.channel = channelBuilder.build();
|
||||
this.blockingStub = RocksDBServiceGrpc.newBlockingStub(channel);
|
||||
this.asyncStub = RocksDBServiceGrpc.newStub(channel);
|
||||
this.futureStub = RocksDBServiceGrpc.newFutureStub(channel);
|
||||
this.address = URI.create("http://" + address.host() + ":" + address.port());
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI getUrl() {
|
||||
return address;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RocksDBSyncAPI getSyncApi() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RocksDBAsyncAPI getAsyncApi() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R> R requestSync(RocksDBAPICommand<R> req) {
|
||||
var asyncResponse = requestAsync(req);
|
||||
return asyncResponse
|
||||
.toCompletableFuture()
|
||||
.join();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<Long> openTransactionAsync(long timeoutMs) throws RocksDBException {
|
||||
var request = OpenTransactionRequest.newBuilder()
|
||||
.setTimeoutMs(timeoutMs)
|
||||
.build();
|
||||
return toResponse(this.futureStub.openTransaction(request), OpenTransactionResponse::getTransactionId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<Boolean> closeTransactionAsync(long transactionId, boolean commit) throws RocksDBException {
|
||||
var request = CloseTransactionRequest.newBuilder()
|
||||
.setTransactionId(transactionId)
|
||||
.setCommit(commit)
|
||||
.build();
|
||||
return toResponse(this.futureStub.closeTransaction(request), CloseTransactionResponse::getSuccessful);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<Void> closeFailedUpdateAsync(long updateId) throws RocksDBException {
|
||||
var request = CloseFailedUpdateRequest.newBuilder()
|
||||
.setUpdateId(updateId)
|
||||
.build();
|
||||
return toResponse(this.futureStub.closeFailedUpdate(request), _ -> null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<Long> createColumnAsync(String name, @NotNull ColumnSchema schema) throws RocksDBException {
|
||||
var request = CreateColumnRequest.newBuilder()
|
||||
.setName(name)
|
||||
.setSchema(mapColumnSchema(schema))
|
||||
.build();
|
||||
return toResponse(this.futureStub.createColumn(request), CreateColumnResponse::getColumnId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<Void> deleteColumnAsync(long columnId) throws RocksDBException {
|
||||
var request = DeleteColumnRequest.newBuilder()
|
||||
.setColumnId(columnId)
|
||||
.build();
|
||||
return toResponse(this.futureStub.deleteColumn(request), _ -> null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<Long> getColumnIdAsync(@NotNull String name) throws RocksDBException {
|
||||
var request = GetColumnIdRequest.newBuilder()
|
||||
.setName(name)
|
||||
.build();
|
||||
return toResponse(this.futureStub.getColumnId(request), GetColumnIdResponse::getColumnId);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <T> CompletionStage<T> putAsync(Arena arena,
|
||||
long transactionOrUpdateId,
|
||||
long columnId,
|
||||
@NotNull Keys keys,
|
||||
@NotNull MemorySegment value,
|
||||
RequestPut<? super MemorySegment, T> requestType) throws RocksDBException {
|
||||
var request = PutRequest.newBuilder()
|
||||
.setTransactionOrUpdateId(transactionOrUpdateId)
|
||||
.setColumnId(columnId)
|
||||
.setData(mapKV(keys, value))
|
||||
.build();
|
||||
return (CompletionStage<T>) switch (requestType) {
|
||||
case RequestNothing<?> _ -> toResponse(this.futureStub.put(request), _ -> null);
|
||||
case RequestPrevious<?> _ ->
|
||||
toResponse(this.futureStub.putGetPrevious(request), GrpcConnection::mapPrevious);
|
||||
case RequestDelta<?> _ ->
|
||||
toResponse(this.futureStub.putGetDelta(request), GrpcConnection::mapDelta);
|
||||
case RequestChanged<?> _ ->
|
||||
toResponse(this.futureStub.putGetChanged(request), Changed::getChanged);
|
||||
case RequestType.RequestPreviousPresence<?> _ ->
|
||||
toResponse(this.futureStub.putGetPreviousPresence(request), PreviousPresence::getPresent);
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletionStage<List<T>> putMultiAsync(Arena arena,
|
||||
long transactionOrUpdateId,
|
||||
long columnId,
|
||||
@NotNull List<@NotNull Keys> allKeys,
|
||||
@NotNull List<@NotNull MemorySegment> allValues,
|
||||
RequestPut<? super MemorySegment, T> requestType) throws RocksDBException {
|
||||
var count = allKeys.size();
|
||||
if (count != allValues.size()) {
|
||||
throw new IllegalArgumentException("Keys length is different than values length! "
|
||||
+ count + " != " + allValues.size());
|
||||
}
|
||||
|
||||
var initialRequest = PutMultiRequest.newBuilder()
|
||||
.setInitialRequest(PutMultiInitialRequest.newBuilder()
|
||||
.setTransactionOrUpdateId(transactionOrUpdateId)
|
||||
.setColumnId(columnId)
|
||||
.build())
|
||||
.build();
|
||||
|
||||
CompletableFuture<List<T>> responseObserver;
|
||||
|
||||
StreamObserver<PutMultiRequest> requestPublisher = switch (requestType) {
|
||||
case RequestNothing<?> _ -> {
|
||||
var thisResponseObserver = new CollectListStreamObserver<Empty>(0);
|
||||
//noinspection unchecked
|
||||
responseObserver = (CompletableFuture<List<T>>) (CompletableFuture<?>) thisResponseObserver;
|
||||
yield this.asyncStub.putMulti(thisResponseObserver);
|
||||
}
|
||||
case RequestPrevious<?> _ -> {
|
||||
var thisResponseObserver = new CollectListMappedStreamObserver<Previous, @Nullable MemorySegment>(
|
||||
GrpcConnection::mapPrevious, count);
|
||||
//noinspection unchecked
|
||||
responseObserver = (CompletableFuture<List<T>>) (CompletableFuture<?>) thisResponseObserver;
|
||||
yield this.asyncStub.putMultiGetPrevious(thisResponseObserver);
|
||||
}
|
||||
case RequestDelta<?> _ -> {
|
||||
var thisResponseObserver = new CollectListMappedStreamObserver<>(GrpcConnection::mapDelta, count);
|
||||
//noinspection unchecked
|
||||
responseObserver = (CompletableFuture<List<T>>) (CompletableFuture<?>) thisResponseObserver;
|
||||
yield this.asyncStub.putMultiGetDelta(thisResponseObserver);
|
||||
}
|
||||
case RequestChanged<?> _ -> {
|
||||
var thisResponseObserver = new CollectListMappedStreamObserver<>(Changed::getChanged, count);
|
||||
//noinspection unchecked
|
||||
responseObserver = (CompletableFuture<List<T>>) (CompletableFuture<?>) thisResponseObserver;
|
||||
yield this.asyncStub.putMultiGetChanged(thisResponseObserver);
|
||||
}
|
||||
case RequestPreviousPresence<?> _ -> {
|
||||
var thisResponseObserver = new CollectListMappedStreamObserver<>(PreviousPresence::getPresent, count);
|
||||
//noinspection unchecked
|
||||
responseObserver = (CompletableFuture<List<T>>) (CompletableFuture<?>) thisResponseObserver;
|
||||
yield this.asyncStub.putMultiGetPreviousPresence(thisResponseObserver);
|
||||
}
|
||||
};
|
||||
|
||||
requestPublisher.onNext(initialRequest);
|
||||
|
||||
var it1 = allKeys.iterator();
|
||||
var it2 = allValues.iterator();
|
||||
|
||||
while (it1.hasNext()) {
|
||||
var keys = it1.next();
|
||||
var value = it2.next();
|
||||
requestPublisher.onNext(PutMultiRequest.newBuilder()
|
||||
.setData(mapKV(keys, value))
|
||||
.build());
|
||||
}
|
||||
|
||||
return responseObserver;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <T> CompletionStage<T> getAsync(Arena arena,
|
||||
long transactionOrUpdateId,
|
||||
long columnId,
|
||||
@NotNull Keys keys,
|
||||
RequestGet<? super MemorySegment, T> requestType) throws RocksDBException {
|
||||
var request = GetRequest.newBuilder()
|
||||
.setTransactionOrUpdateId(transactionOrUpdateId)
|
||||
.setColumnId(columnId)
|
||||
.addAllKeys(mapKeys(keys))
|
||||
.build();
|
||||
if (requestType instanceof RequestType.RequestForUpdate<?>) {
|
||||
return toResponse(this.futureStub.getForUpdate(request), x -> (T) new UpdateContext<>(
|
||||
x.hasPrevious() ? mapByteString(x.getPrevious()) : null,
|
||||
x.getUpdateId()
|
||||
));
|
||||
} else {
|
||||
return toResponse(this.futureStub.get(request), x -> switch (requestType) {
|
||||
case RequestNothing<?> _ -> null;
|
||||
case RequestType.RequestCurrent<?> _ -> x.hasValue() ? (T) mapByteString(x.getValue()) : null;
|
||||
case RequestType.RequestForUpdate<?> _ -> throw new IllegalStateException();
|
||||
case RequestType.RequestExists<?> _ -> (T) (Boolean) x.hasValue();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<Long> openIteratorAsync(Arena arena,
|
||||
long transactionId,
|
||||
long columnId,
|
||||
@NotNull Keys startKeysInclusive,
|
||||
@Nullable Keys endKeysExclusive,
|
||||
boolean reverse,
|
||||
long timeoutMs) throws RocksDBException {
|
||||
var request = OpenIteratorRequest.newBuilder()
|
||||
.setTransactionId(transactionId)
|
||||
.setColumnId(columnId)
|
||||
.addAllStartKeysInclusive(mapKeys(startKeysInclusive))
|
||||
.addAllEndKeysExclusive(mapKeys(endKeysExclusive))
|
||||
.setReverse(reverse)
|
||||
.setTimeoutMs(timeoutMs)
|
||||
.build();
|
||||
return toResponse(this.futureStub.openIterator(request), OpenIteratorResponse::getIteratorId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<Void> closeIteratorAsync(long iteratorId) throws RocksDBException {
|
||||
var request = CloseIteratorRequest.newBuilder()
|
||||
.setIteratorId(iteratorId)
|
||||
.build();
|
||||
return toResponse(this.futureStub.closeIterator(request), _ -> null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<Void> seekToAsync(Arena arena, long iterationId, @NotNull Keys keys) throws RocksDBException {
|
||||
var request = SeekToRequest.newBuilder()
|
||||
.setIterationId(iterationId)
|
||||
.addAllKeys(mapKeys(keys))
|
||||
.build();
|
||||
return toResponse(this.futureStub.seekTo(request), _ -> null);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <T> CompletionStage<T> subsequentAsync(Arena arena,
|
||||
long iterationId,
|
||||
long skipCount,
|
||||
long takeCount,
|
||||
@NotNull RequestType.RequestIterate<? super MemorySegment, T> requestType) throws RocksDBException {
|
||||
var request = SubsequentRequest.newBuilder()
|
||||
.setIterationId(iterationId)
|
||||
.setSkipCount(skipCount)
|
||||
.setTakeCount(takeCount)
|
||||
.build();
|
||||
return switch (requestType) {
|
||||
case RequestNothing<?> _ -> toResponse(this.futureStub.subsequent(request), _ -> null);
|
||||
case RequestExists<?> _ ->
|
||||
(CompletionStage<T>) toResponse(this.futureStub.subsequentExists(request), PreviousPresence::getPresent);
|
||||
case RequestMulti<?> _ -> {
|
||||
CollectListMappedStreamObserver<KV, MemorySegment> responseObserver
|
||||
= new CollectListMappedStreamObserver<>(kv -> mapByteString(kv.getValue()));
|
||||
this.asyncStub.subsequentMultiGet(request, responseObserver);
|
||||
yield (CompletionStage<T>) responseObserver;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static it.cavallium.rockserver.core.common.Delta<MemorySegment> mapDelta(Delta x) {
|
||||
return new it.cavallium.rockserver.core.common.Delta<>(
|
||||
x.hasPrevious() ? mapByteString(x.getPrevious()) : null,
|
||||
x.hasCurrent() ? mapByteString(x.getCurrent()) : null
|
||||
);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private static MemorySegment mapPrevious(Previous x) {
|
||||
return x.hasPrevious() ? mapByteString(x.getPrevious()) : null;
|
||||
}
|
||||
|
||||
private static MemorySegment mapByteString(ByteString data) {
|
||||
return data != null ? MemorySegment.ofBuffer(data.asReadOnlyByteBuffer()) : null;
|
||||
}
|
||||
|
||||
private static KV mapKV(@NotNull Keys keys, @NotNull MemorySegment value) {
|
||||
return KV.newBuilder()
|
||||
.addAllKeys(mapKeys(keys))
|
||||
.setValue(mapValue(value))
|
||||
.build();
|
||||
}
|
||||
|
||||
private static Iterable<? extends ByteString> mapKeys(Keys keys) {
|
||||
if (keys == null) return List.of();
|
||||
return Iterables.transform(Arrays.asList(keys.keys()), k -> UnsafeByteOperations.unsafeWrap(k.asByteBuffer()));
|
||||
}
|
||||
|
||||
private static ByteString mapValue(@NotNull MemorySegment value) {
|
||||
return UnsafeByteOperations.unsafeWrap(value.asByteBuffer());
|
||||
}
|
||||
|
||||
private static it.cavallium.rockserver.core.common.api.proto.ColumnSchema mapColumnSchema(@NotNull ColumnSchema schema) {
|
||||
return it.cavallium.rockserver.core.common.api.proto.ColumnSchema.newBuilder()
|
||||
.addAllFixedKeys(mapFixedKeys(schema))
|
||||
.addAllVariableTailKeys(mapVariableTailKeys(schema))
|
||||
.setHasValue(schema.hasValue())
|
||||
.build();
|
||||
}
|
||||
|
||||
private static Iterable<Integer> mapFixedKeys(@NotNull ColumnSchema schema) {
|
||||
var result = new IntArrayList(schema.fixedLengthKeysCount());
|
||||
for (int i = 0; i < schema.fixedLengthKeysCount(); i++) {
|
||||
result.add(schema.key(i));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private static Iterable<ColumnHashType> mapVariableTailKeys(@NotNull ColumnSchema schema) {
|
||||
var result = new ArrayList<ColumnHashType>(schema.variableTailKeys().size());
|
||||
for (it.cavallium.rockserver.core.common.ColumnHashType variableTailKey : schema.variableTailKeys()) {
|
||||
result.add(switch (variableTailKey) {
|
||||
case XXHASH32 -> ColumnHashType.XXHASH32;
|
||||
case XXHASH8 -> ColumnHashType.XXHASH8;
|
||||
case ALLSAME8 -> ColumnHashType.ALLSAME8;
|
||||
});
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private static <T, U> CompletableFuture<U> toResponse(ListenableFuture<T> listenableFuture, Function<T, U> mapper) {
|
||||
var cf = new CompletableFuture<U>() {
|
||||
@Override
|
||||
public boolean cancel(boolean mayInterruptIfRunning) {
|
||||
boolean cancelled = listenableFuture.cancel(mayInterruptIfRunning);
|
||||
super.cancel(cancelled);
|
||||
return cancelled;
|
||||
}
|
||||
};
|
||||
|
||||
Futures.addCallback(listenableFuture, new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(T result) {
|
||||
cf.complete(mapper.apply(result));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(@NotNull Throwable t) {
|
||||
cf.completeExceptionally(t);
|
||||
}
|
||||
}, DIRECT_EXECUTOR);
|
||||
|
||||
return cf;
|
||||
}
|
||||
|
||||
private static <T> CompletableFuture<T> toResponse(ListenableFuture<T> listenableFuture) {
|
||||
var cf = new CompletableFuture<T>() {
|
||||
@Override
|
||||
public boolean cancel(boolean mayInterruptIfRunning) {
|
||||
boolean cancelled = listenableFuture.cancel(mayInterruptIfRunning);
|
||||
super.cancel(cancelled);
|
||||
return cancelled;
|
||||
}
|
||||
};
|
||||
|
||||
Futures.addCallback(listenableFuture, new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(T result) {
|
||||
cf.complete(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(@NotNull Throwable t) {
|
||||
cf.completeExceptionally(t);
|
||||
}
|
||||
}, DIRECT_EXECUTOR);
|
||||
|
||||
return cf;
|
||||
}
|
||||
}
|
@ -7,8 +7,6 @@ import static java.util.Objects.requireNonNullElse;
|
||||
import java.io.IOException;
|
||||
import java.lang.foreign.Arena;
|
||||
import java.lang.foreign.MemorySegment;
|
||||
import java.lang.foreign.ValueLayout;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
@ -149,8 +147,8 @@ public class Utils {
|
||||
== -1;
|
||||
}
|
||||
|
||||
public static InetSocketAddress parseHostAndPort(URI uri) {
|
||||
return new InetSocketAddress(uri.getHost(), parsePort(uri));
|
||||
public static HostAndPort parseHostAndPort(URI uri) {
|
||||
return new HostAndPort(uri.getHost(), parsePort(uri));
|
||||
}
|
||||
|
||||
public static int parsePort(URI uri) {
|
||||
@ -166,4 +164,6 @@ public class Utils {
|
||||
var b = s.toArray(BIG_ENDIAN_BYTES);
|
||||
return HexFormat.of().formatHex(b);
|
||||
}
|
||||
|
||||
public record HostAndPort(String host, int port) {}
|
||||
}
|
||||
|
@ -408,6 +408,11 @@ public class GrpcServer extends Server {
|
||||
responseObserver));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subsequentMultiGet(SubsequentRequest request, StreamObserver<KV> responseObserver) {
|
||||
subsequentMultiPage(request, responseObserver, 0);
|
||||
}
|
||||
|
||||
public void subsequentMultiPage(SubsequentRequest request, StreamObserver<KV> responseObserver, int pageIndex) {
|
||||
final long pageSize = 16L;
|
||||
if (request.getTakeCount() > pageIndex * pageSize) {
|
||||
@ -438,11 +443,6 @@ public class GrpcServer extends Server {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subsequentMultiGet(SubsequentRequest request, StreamObserver<KV> responseObserver) {
|
||||
subsequentMultiPage(request, responseObserver, 0);
|
||||
}
|
||||
|
||||
// mappers
|
||||
|
||||
private static ColumnSchema mapColumnSchema(it.cavallium.rockserver.core.common.api.proto.ColumnSchema schema) {
|
||||
|
@ -3,6 +3,7 @@ package it.cavallium.rockserver.core.server;
|
||||
import it.cavallium.rockserver.core.client.ClientBuilder;
|
||||
import it.cavallium.rockserver.core.client.EmbeddedConnection;
|
||||
import it.cavallium.rockserver.core.client.RocksDBConnection;
|
||||
import it.cavallium.rockserver.core.common.Utils.HostAndPort;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnixDomainSocketAddress;
|
||||
@ -10,10 +11,9 @@ import java.nio.file.Path;
|
||||
|
||||
public class ServerBuilder {
|
||||
|
||||
private InetSocketAddress iNetAddress;
|
||||
private HostAndPort iNetAddress;
|
||||
private UnixDomainSocketAddress unixAddress;
|
||||
private String http2Host;
|
||||
private int http2Port;
|
||||
private HostAndPort http2Address;
|
||||
private boolean useThrift;
|
||||
private RocksDBConnection client;
|
||||
|
||||
@ -21,13 +21,12 @@ public class ServerBuilder {
|
||||
this.unixAddress = address;
|
||||
}
|
||||
|
||||
public void setAddress(InetSocketAddress address) {
|
||||
public void setAddress(HostAndPort address) {
|
||||
this.iNetAddress = address;
|
||||
}
|
||||
|
||||
public void setHttpAddress(String host, int port) {
|
||||
this.http2Host = host;
|
||||
this.http2Port = port;
|
||||
public void setHttpAddress(HostAndPort address) {
|
||||
this.http2Address = address;
|
||||
}
|
||||
|
||||
public void setUseThrift(boolean useThrift) {
|
||||
@ -39,11 +38,11 @@ public class ServerBuilder {
|
||||
}
|
||||
|
||||
public Server build() throws IOException {
|
||||
if (http2Host != null) {
|
||||
if (http2Address != null) {
|
||||
if (useThrift) {
|
||||
return new ThriftServer(client, http2Host, http2Port);
|
||||
return new ThriftServer(client, http2Address.host(), http2Address.port());
|
||||
} else {
|
||||
return new GrpcServer(client, http2Host, http2Port);
|
||||
return new GrpcServer(client, http2Address.host(), http2Address.port());
|
||||
}
|
||||
} else if (unixAddress != null) {
|
||||
throw new UnsupportedOperationException("Not implemented: unix socket");
|
||||
|
@ -20,6 +20,7 @@ module rockserver.core {
|
||||
requires io.jstach.rainbowgum.pattern;
|
||||
requires org.graalvm.nativeimage;
|
||||
requires io.netty.common;
|
||||
requires proto.google.common.protos;
|
||||
|
||||
exports it.cavallium.rockserver.core.client;
|
||||
exports it.cavallium.rockserver.core.common;
|
||||
|
@ -90,9 +90,13 @@ service RocksDBService {
|
||||
rpc put(PutRequest) returns (google.protobuf.Empty);
|
||||
rpc putMulti(stream PutMultiRequest) returns (google.protobuf.Empty);
|
||||
rpc putGetPrevious(PutRequest) returns (Previous);
|
||||
rpc putMultiGetPrevious(stream PutMultiRequest) returns (stream Previous);
|
||||
rpc putGetDelta(PutRequest) returns (Delta);
|
||||
rpc putMultiGetDelta(stream PutMultiRequest) returns (stream Delta);
|
||||
rpc putGetChanged(PutRequest) returns (Changed);
|
||||
rpc putMultiGetChanged(stream PutMultiRequest) returns (stream Changed);
|
||||
rpc putGetPreviousPresence(PutRequest) returns (PreviousPresence);
|
||||
rpc putMultiGetPreviousPresence(stream PutMultiRequest) returns (stream PreviousPresence);
|
||||
rpc get(GetRequest) returns (GetResponse);
|
||||
rpc getForUpdate(GetRequest) returns (UpdateBegin);
|
||||
rpc exists(GetRequest) returns (PreviousPresence);
|
||||
|
Loading…
Reference in New Issue
Block a user