From 044c4e04bd6ff0ae6a3e0633f99baa719ca3f4cc Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 13 Sep 2024 19:40:45 +0200 Subject: [PATCH] Implement grpc client connection --- .../it/cavallium/rockserver/core/Main.java | 7 +- .../rockserver/core/client/ClientBuilder.java | 22 +- .../CollectListMappedStreamObserver.java | 38 ++ .../client/CollectListStreamObserver.java | 34 ++ .../core/client/GrpcConnection.java | 468 ++++++++++++++++++ .../rockserver/core/common/Utils.java | 8 +- .../rockserver/core/server/GrpcServer.java | 10 +- .../rockserver/core/server/ServerBuilder.java | 19 +- src/main/java/module-info.java | 1 + src/main/proto/rocksdb.proto | 4 + 10 files changed, 588 insertions(+), 23 deletions(-) create mode 100644 src/main/java/it/cavallium/rockserver/core/client/CollectListMappedStreamObserver.java create mode 100644 src/main/java/it/cavallium/rockserver/core/client/CollectListStreamObserver.java create mode 100644 src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java diff --git a/src/main/java/it/cavallium/rockserver/core/Main.java b/src/main/java/it/cavallium/rockserver/core/Main.java index fe7fb76..8acc49c 100644 --- a/src/main/java/it/cavallium/rockserver/core/Main.java +++ b/src/main/java/it/cavallium/rockserver/core/Main.java @@ -4,6 +4,7 @@ import static it.cavallium.rockserver.core.client.EmbeddedConnection.PRIVATE_MEM import static java.util.Objects.requireNonNull; import it.cavallium.rockserver.core.common.Utils; +import it.cavallium.rockserver.core.common.Utils.HostAndPort; import it.cavallium.rockserver.core.impl.rocksdb.RocksDBLoader; import it.cavallium.rockserver.core.server.ServerBuilder; import java.io.IOException; @@ -95,6 +96,10 @@ public class Main { case "unix" -> clientBuilder.setUnixSocket(UnixDomainSocketAddress.of(Path.of(databaseUrl.getPath()))); case "file" -> clientBuilder.setEmbeddedPath(Path.of((databaseUrl.getAuthority() != null ? databaseUrl.getAuthority() : "") + databaseUrl.getPath()).normalize()); case "memory" -> clientBuilder.setEmbeddedInMemory(true); + case "http" -> { + clientBuilder.setHttpAddress(Utils.parseHostAndPort(databaseUrl)); + clientBuilder.setUseThrift(false); + } case "rocksdb" -> clientBuilder.setAddress(Utils.parseHostAndPort(databaseUrl)); case null, default -> throw new IllegalArgumentException("Invalid scheme \"" + databaseUrlScheme + "\" for database url url: " + databaseUrl); } @@ -130,7 +135,7 @@ public class Main { switch (thriftListenUrlScheme) { case "unix" -> serverBuilder.setUnixSocket(UnixDomainSocketAddress.of(Path.of(listenUrl.getPath()))); case "http" -> { - serverBuilder.setHttpAddress(listenUrl.getHost(), Utils.parsePort(listenUrl)); + serverBuilder.setHttpAddress(new HostAndPort(listenUrl.getHost(), Utils.parsePort(listenUrl))); serverBuilder.setUseThrift(useThrift); } case "rocksdb" -> serverBuilder.setAddress(Utils.parseHostAndPort(listenUrl)); diff --git a/src/main/java/it/cavallium/rockserver/core/client/ClientBuilder.java b/src/main/java/it/cavallium/rockserver/core/client/ClientBuilder.java index 8818c22..c339362 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/ClientBuilder.java +++ b/src/main/java/it/cavallium/rockserver/core/client/ClientBuilder.java @@ -1,18 +1,20 @@ package it.cavallium.rockserver.core.client; +import it.cavallium.rockserver.core.common.Utils.HostAndPort; import java.io.IOException; -import java.net.InetSocketAddress; import java.net.UnixDomainSocketAddress; import java.nio.file.Path; public class ClientBuilder { - private InetSocketAddress iNetAddress; + private HostAndPort httpAddress; + private HostAndPort iNetAddress; private UnixDomainSocketAddress unixAddress; private Path embeddedPath; private String name; private Path embeddedConfig; private boolean embeddedInMemory; + private boolean useThrift; public void setEmbeddedPath(Path path) { this.embeddedPath = path; @@ -26,10 +28,18 @@ public class ClientBuilder { this.unixAddress = address; } - public void setAddress(InetSocketAddress address) { + public void setHttpAddress(HostAndPort httpAddress) { + this.httpAddress = httpAddress; + } + + public void setAddress(HostAndPort address) { this.iNetAddress = address; } + public void setUseThrift(boolean useThrift) { + this.useThrift = useThrift; + } + public void setName(String name) { this.name = name; } @@ -45,6 +55,12 @@ public class ClientBuilder { return new EmbeddedConnection(embeddedPath, name, embeddedConfig); } else if (unixAddress != null) { throw new UnsupportedOperationException("Not implemented: unix socket"); + } else if (httpAddress != null) { + if (useThrift) { + throw new UnsupportedOperationException("Not implemented: thrift http2 address"); + } else { + return new GrpcConnection(name, httpAddress); + } } else if (iNetAddress != null) { throw new UnsupportedOperationException("Not implemented: inet address"); } else { diff --git a/src/main/java/it/cavallium/rockserver/core/client/CollectListMappedStreamObserver.java b/src/main/java/it/cavallium/rockserver/core/client/CollectListMappedStreamObserver.java new file mode 100644 index 0000000..aa58652 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/client/CollectListMappedStreamObserver.java @@ -0,0 +1,38 @@ +package it.cavallium.rockserver.core.client; + +import io.grpc.stub.StreamObserver; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +class CollectListMappedStreamObserver extends CompletableFuture> implements StreamObserver { + + private final Function mapper; + private final List list; + + public CollectListMappedStreamObserver(Function mapper) { + this.mapper = mapper; + this.list = new ArrayList<>(); + } + + public CollectListMappedStreamObserver(Function mapper, int size) { + this.mapper = mapper; + this.list = new ArrayList<>(size); + } + + @Override + public void onNext(T t) { + this.list.add(mapper.apply(t)); + } + + @Override + public void onError(Throwable throwable) { + this.completeExceptionally(throwable); + } + + @Override + public void onCompleted() { + this.complete(this.list); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/client/CollectListStreamObserver.java b/src/main/java/it/cavallium/rockserver/core/client/CollectListStreamObserver.java new file mode 100644 index 0000000..ef83b41 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/client/CollectListStreamObserver.java @@ -0,0 +1,34 @@ +package it.cavallium.rockserver.core.client; + +import io.grpc.stub.StreamObserver; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +class CollectListStreamObserver extends CompletableFuture> implements StreamObserver { + + private final List list; + + public CollectListStreamObserver() { + this.list = new ArrayList<>(); + } + + public CollectListStreamObserver(int size) { + this.list = new ArrayList<>(size); + } + + @Override + public void onNext(T t) { + this.list.add(t); + } + + @Override + public void onError(Throwable throwable) { + this.completeExceptionally(throwable); + } + + @Override + public void onCompleted() { + this.complete(this.list); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java new file mode 100644 index 0000000..4501fd5 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java @@ -0,0 +1,468 @@ +package it.cavallium.rockserver.core.client; + +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.ByteString; +import com.google.protobuf.Empty; +import com.google.protobuf.UnsafeByteOperations; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; +import it.cavallium.rockserver.core.common.ColumnSchema; +import it.cavallium.rockserver.core.common.Keys; +import it.cavallium.rockserver.core.common.RequestType; +import it.cavallium.rockserver.core.common.RequestType.RequestChanged; +import it.cavallium.rockserver.core.common.RequestType.RequestCurrent; +import it.cavallium.rockserver.core.common.RequestType.RequestDelta; +import it.cavallium.rockserver.core.common.RequestType.RequestExists; +import it.cavallium.rockserver.core.common.RequestType.RequestForUpdate; +import it.cavallium.rockserver.core.common.RequestType.RequestGet; +import it.cavallium.rockserver.core.common.RequestType.RequestMulti; +import it.cavallium.rockserver.core.common.RequestType.RequestNothing; +import it.cavallium.rockserver.core.common.RequestType.RequestPrevious; +import it.cavallium.rockserver.core.common.RequestType.RequestPreviousPresence; +import it.cavallium.rockserver.core.common.RequestType.RequestPut; +import it.cavallium.rockserver.core.common.RocksDBAPI; +import it.cavallium.rockserver.core.common.RocksDBAPICommand; +import it.cavallium.rockserver.core.common.RocksDBAsyncAPI; +import it.cavallium.rockserver.core.common.RocksDBException; +import it.cavallium.rockserver.core.common.RocksDBSyncAPI; +import it.cavallium.rockserver.core.common.UpdateContext; +import it.cavallium.rockserver.core.common.Utils.HostAndPort; +import it.cavallium.rockserver.core.common.api.proto.Changed; +import it.cavallium.rockserver.core.common.api.proto.CloseFailedUpdateRequest; +import it.cavallium.rockserver.core.common.api.proto.CloseIteratorRequest; +import it.cavallium.rockserver.core.common.api.proto.CloseTransactionRequest; +import it.cavallium.rockserver.core.common.api.proto.CloseTransactionResponse; +import it.cavallium.rockserver.core.common.api.proto.ColumnHashType; +import it.cavallium.rockserver.core.common.api.proto.CreateColumnRequest; +import it.cavallium.rockserver.core.common.api.proto.CreateColumnResponse; +import it.cavallium.rockserver.core.common.api.proto.DeleteColumnRequest; +import it.cavallium.rockserver.core.common.api.proto.Delta; +import it.cavallium.rockserver.core.common.api.proto.GetColumnIdRequest; +import it.cavallium.rockserver.core.common.api.proto.GetColumnIdResponse; +import it.cavallium.rockserver.core.common.api.proto.GetRequest; +import it.cavallium.rockserver.core.common.api.proto.GetResponse; +import it.cavallium.rockserver.core.common.api.proto.KV; +import it.cavallium.rockserver.core.common.api.proto.OpenIteratorRequest; +import it.cavallium.rockserver.core.common.api.proto.OpenIteratorResponse; +import it.cavallium.rockserver.core.common.api.proto.OpenTransactionRequest; +import it.cavallium.rockserver.core.common.api.proto.OpenTransactionResponse; +import it.cavallium.rockserver.core.common.api.proto.Previous; +import it.cavallium.rockserver.core.common.api.proto.PreviousPresence; +import it.cavallium.rockserver.core.common.api.proto.PutMultiInitialRequest; +import it.cavallium.rockserver.core.common.api.proto.PutMultiRequest; +import it.cavallium.rockserver.core.common.api.proto.PutRequest; +import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc; +import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceBlockingStub; +import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceFutureStub; +import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceStub; +import it.cavallium.rockserver.core.common.api.proto.SeekToRequest; +import it.cavallium.rockserver.core.common.api.proto.SubsequentRequest; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.function.Function; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +public class GrpcConnection extends BaseConnection implements RocksDBAPI { + + private static final Executor DIRECT_EXECUTOR = MoreExecutors.directExecutor(); + private final ManagedChannel channel; + private final RocksDBServiceBlockingStub blockingStub; + private final RocksDBServiceStub asyncStub; + private final RocksDBServiceFutureStub futureStub; + private final URI address; + + public GrpcConnection(String name, HostAndPort address) { + super(name); + var channelBuilder = ManagedChannelBuilder + .forTarget(address.toString()) + .usePlaintext(); + this.channel = channelBuilder.build(); + this.blockingStub = RocksDBServiceGrpc.newBlockingStub(channel); + this.asyncStub = RocksDBServiceGrpc.newStub(channel); + this.futureStub = RocksDBServiceGrpc.newFutureStub(channel); + this.address = URI.create("http://" + address.host() + ":" + address.port()); + } + + @Override + public URI getUrl() { + return address; + } + + @Override + public RocksDBSyncAPI getSyncApi() { + return this; + } + + @Override + public RocksDBAsyncAPI getAsyncApi() { + return this; + } + + @Override + public R requestSync(RocksDBAPICommand req) { + var asyncResponse = requestAsync(req); + return asyncResponse + .toCompletableFuture() + .join(); + } + + @Override + public CompletionStage openTransactionAsync(long timeoutMs) throws RocksDBException { + var request = OpenTransactionRequest.newBuilder() + .setTimeoutMs(timeoutMs) + .build(); + return toResponse(this.futureStub.openTransaction(request), OpenTransactionResponse::getTransactionId); + } + + @Override + public CompletionStage closeTransactionAsync(long transactionId, boolean commit) throws RocksDBException { + var request = CloseTransactionRequest.newBuilder() + .setTransactionId(transactionId) + .setCommit(commit) + .build(); + return toResponse(this.futureStub.closeTransaction(request), CloseTransactionResponse::getSuccessful); + } + + @Override + public CompletionStage closeFailedUpdateAsync(long updateId) throws RocksDBException { + var request = CloseFailedUpdateRequest.newBuilder() + .setUpdateId(updateId) + .build(); + return toResponse(this.futureStub.closeFailedUpdate(request), _ -> null); + } + + @Override + public CompletionStage createColumnAsync(String name, @NotNull ColumnSchema schema) throws RocksDBException { + var request = CreateColumnRequest.newBuilder() + .setName(name) + .setSchema(mapColumnSchema(schema)) + .build(); + return toResponse(this.futureStub.createColumn(request), CreateColumnResponse::getColumnId); + } + + @Override + public CompletionStage deleteColumnAsync(long columnId) throws RocksDBException { + var request = DeleteColumnRequest.newBuilder() + .setColumnId(columnId) + .build(); + return toResponse(this.futureStub.deleteColumn(request), _ -> null); + } + + @Override + public CompletionStage getColumnIdAsync(@NotNull String name) throws RocksDBException { + var request = GetColumnIdRequest.newBuilder() + .setName(name) + .build(); + return toResponse(this.futureStub.getColumnId(request), GetColumnIdResponse::getColumnId); + } + + @SuppressWarnings("unchecked") + @Override + public CompletionStage putAsync(Arena arena, + long transactionOrUpdateId, + long columnId, + @NotNull Keys keys, + @NotNull MemorySegment value, + RequestPut requestType) throws RocksDBException { + var request = PutRequest.newBuilder() + .setTransactionOrUpdateId(transactionOrUpdateId) + .setColumnId(columnId) + .setData(mapKV(keys, value)) + .build(); + return (CompletionStage) switch (requestType) { + case RequestNothing _ -> toResponse(this.futureStub.put(request), _ -> null); + case RequestPrevious _ -> + toResponse(this.futureStub.putGetPrevious(request), GrpcConnection::mapPrevious); + case RequestDelta _ -> + toResponse(this.futureStub.putGetDelta(request), GrpcConnection::mapDelta); + case RequestChanged _ -> + toResponse(this.futureStub.putGetChanged(request), Changed::getChanged); + case RequestType.RequestPreviousPresence _ -> + toResponse(this.futureStub.putGetPreviousPresence(request), PreviousPresence::getPresent); + }; + } + + @Override + public CompletionStage> putMultiAsync(Arena arena, + long transactionOrUpdateId, + long columnId, + @NotNull List<@NotNull Keys> allKeys, + @NotNull List<@NotNull MemorySegment> allValues, + RequestPut requestType) throws RocksDBException { + var count = allKeys.size(); + if (count != allValues.size()) { + throw new IllegalArgumentException("Keys length is different than values length! " + + count + " != " + allValues.size()); + } + + var initialRequest = PutMultiRequest.newBuilder() + .setInitialRequest(PutMultiInitialRequest.newBuilder() + .setTransactionOrUpdateId(transactionOrUpdateId) + .setColumnId(columnId) + .build()) + .build(); + + CompletableFuture> responseObserver; + + StreamObserver requestPublisher = switch (requestType) { + case RequestNothing _ -> { + var thisResponseObserver = new CollectListStreamObserver(0); + //noinspection unchecked + responseObserver = (CompletableFuture>) (CompletableFuture) thisResponseObserver; + yield this.asyncStub.putMulti(thisResponseObserver); + } + case RequestPrevious _ -> { + var thisResponseObserver = new CollectListMappedStreamObserver( + GrpcConnection::mapPrevious, count); + //noinspection unchecked + responseObserver = (CompletableFuture>) (CompletableFuture) thisResponseObserver; + yield this.asyncStub.putMultiGetPrevious(thisResponseObserver); + } + case RequestDelta _ -> { + var thisResponseObserver = new CollectListMappedStreamObserver<>(GrpcConnection::mapDelta, count); + //noinspection unchecked + responseObserver = (CompletableFuture>) (CompletableFuture) thisResponseObserver; + yield this.asyncStub.putMultiGetDelta(thisResponseObserver); + } + case RequestChanged _ -> { + var thisResponseObserver = new CollectListMappedStreamObserver<>(Changed::getChanged, count); + //noinspection unchecked + responseObserver = (CompletableFuture>) (CompletableFuture) thisResponseObserver; + yield this.asyncStub.putMultiGetChanged(thisResponseObserver); + } + case RequestPreviousPresence _ -> { + var thisResponseObserver = new CollectListMappedStreamObserver<>(PreviousPresence::getPresent, count); + //noinspection unchecked + responseObserver = (CompletableFuture>) (CompletableFuture) thisResponseObserver; + yield this.asyncStub.putMultiGetPreviousPresence(thisResponseObserver); + } + }; + + requestPublisher.onNext(initialRequest); + + var it1 = allKeys.iterator(); + var it2 = allValues.iterator(); + + while (it1.hasNext()) { + var keys = it1.next(); + var value = it2.next(); + requestPublisher.onNext(PutMultiRequest.newBuilder() + .setData(mapKV(keys, value)) + .build()); + } + + return responseObserver; + } + + @SuppressWarnings("unchecked") + @Override + public CompletionStage getAsync(Arena arena, + long transactionOrUpdateId, + long columnId, + @NotNull Keys keys, + RequestGet requestType) throws RocksDBException { + var request = GetRequest.newBuilder() + .setTransactionOrUpdateId(transactionOrUpdateId) + .setColumnId(columnId) + .addAllKeys(mapKeys(keys)) + .build(); + if (requestType instanceof RequestType.RequestForUpdate) { + return toResponse(this.futureStub.getForUpdate(request), x -> (T) new UpdateContext<>( + x.hasPrevious() ? mapByteString(x.getPrevious()) : null, + x.getUpdateId() + )); + } else { + return toResponse(this.futureStub.get(request), x -> switch (requestType) { + case RequestNothing _ -> null; + case RequestType.RequestCurrent _ -> x.hasValue() ? (T) mapByteString(x.getValue()) : null; + case RequestType.RequestForUpdate _ -> throw new IllegalStateException(); + case RequestType.RequestExists _ -> (T) (Boolean) x.hasValue(); + }); + } + } + + @Override + public CompletionStage openIteratorAsync(Arena arena, + long transactionId, + long columnId, + @NotNull Keys startKeysInclusive, + @Nullable Keys endKeysExclusive, + boolean reverse, + long timeoutMs) throws RocksDBException { + var request = OpenIteratorRequest.newBuilder() + .setTransactionId(transactionId) + .setColumnId(columnId) + .addAllStartKeysInclusive(mapKeys(startKeysInclusive)) + .addAllEndKeysExclusive(mapKeys(endKeysExclusive)) + .setReverse(reverse) + .setTimeoutMs(timeoutMs) + .build(); + return toResponse(this.futureStub.openIterator(request), OpenIteratorResponse::getIteratorId); + } + + @Override + public CompletionStage closeIteratorAsync(long iteratorId) throws RocksDBException { + var request = CloseIteratorRequest.newBuilder() + .setIteratorId(iteratorId) + .build(); + return toResponse(this.futureStub.closeIterator(request), _ -> null); + } + + @Override + public CompletionStage seekToAsync(Arena arena, long iterationId, @NotNull Keys keys) throws RocksDBException { + var request = SeekToRequest.newBuilder() + .setIterationId(iterationId) + .addAllKeys(mapKeys(keys)) + .build(); + return toResponse(this.futureStub.seekTo(request), _ -> null); + } + + @SuppressWarnings("unchecked") + @Override + public CompletionStage subsequentAsync(Arena arena, + long iterationId, + long skipCount, + long takeCount, + @NotNull RequestType.RequestIterate requestType) throws RocksDBException { + var request = SubsequentRequest.newBuilder() + .setIterationId(iterationId) + .setSkipCount(skipCount) + .setTakeCount(takeCount) + .build(); + return switch (requestType) { + case RequestNothing _ -> toResponse(this.futureStub.subsequent(request), _ -> null); + case RequestExists _ -> + (CompletionStage) toResponse(this.futureStub.subsequentExists(request), PreviousPresence::getPresent); + case RequestMulti _ -> { + CollectListMappedStreamObserver responseObserver + = new CollectListMappedStreamObserver<>(kv -> mapByteString(kv.getValue())); + this.asyncStub.subsequentMultiGet(request, responseObserver); + yield (CompletionStage) responseObserver; + } + }; + } + + private static it.cavallium.rockserver.core.common.Delta mapDelta(Delta x) { + return new it.cavallium.rockserver.core.common.Delta<>( + x.hasPrevious() ? mapByteString(x.getPrevious()) : null, + x.hasCurrent() ? mapByteString(x.getCurrent()) : null + ); + } + + @Nullable + private static MemorySegment mapPrevious(Previous x) { + return x.hasPrevious() ? mapByteString(x.getPrevious()) : null; + } + + private static MemorySegment mapByteString(ByteString data) { + return data != null ? MemorySegment.ofBuffer(data.asReadOnlyByteBuffer()) : null; + } + + private static KV mapKV(@NotNull Keys keys, @NotNull MemorySegment value) { + return KV.newBuilder() + .addAllKeys(mapKeys(keys)) + .setValue(mapValue(value)) + .build(); + } + + private static Iterable mapKeys(Keys keys) { + if (keys == null) return List.of(); + return Iterables.transform(Arrays.asList(keys.keys()), k -> UnsafeByteOperations.unsafeWrap(k.asByteBuffer())); + } + + private static ByteString mapValue(@NotNull MemorySegment value) { + return UnsafeByteOperations.unsafeWrap(value.asByteBuffer()); + } + + private static it.cavallium.rockserver.core.common.api.proto.ColumnSchema mapColumnSchema(@NotNull ColumnSchema schema) { + return it.cavallium.rockserver.core.common.api.proto.ColumnSchema.newBuilder() + .addAllFixedKeys(mapFixedKeys(schema)) + .addAllVariableTailKeys(mapVariableTailKeys(schema)) + .setHasValue(schema.hasValue()) + .build(); + } + + private static Iterable mapFixedKeys(@NotNull ColumnSchema schema) { + var result = new IntArrayList(schema.fixedLengthKeysCount()); + for (int i = 0; i < schema.fixedLengthKeysCount(); i++) { + result.add(schema.key(i)); + } + return result; + } + + private static Iterable mapVariableTailKeys(@NotNull ColumnSchema schema) { + var result = new ArrayList(schema.variableTailKeys().size()); + for (it.cavallium.rockserver.core.common.ColumnHashType variableTailKey : schema.variableTailKeys()) { + result.add(switch (variableTailKey) { + case XXHASH32 -> ColumnHashType.XXHASH32; + case XXHASH8 -> ColumnHashType.XXHASH8; + case ALLSAME8 -> ColumnHashType.ALLSAME8; + }); + } + return result; + } + + private static CompletableFuture toResponse(ListenableFuture listenableFuture, Function mapper) { + var cf = new CompletableFuture() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + boolean cancelled = listenableFuture.cancel(mayInterruptIfRunning); + super.cancel(cancelled); + return cancelled; + } + }; + + Futures.addCallback(listenableFuture, new FutureCallback<>() { + @Override + public void onSuccess(T result) { + cf.complete(mapper.apply(result)); + } + + @Override + public void onFailure(@NotNull Throwable t) { + cf.completeExceptionally(t); + } + }, DIRECT_EXECUTOR); + + return cf; + } + + private static CompletableFuture toResponse(ListenableFuture listenableFuture) { + var cf = new CompletableFuture() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + boolean cancelled = listenableFuture.cancel(mayInterruptIfRunning); + super.cancel(cancelled); + return cancelled; + } + }; + + Futures.addCallback(listenableFuture, new FutureCallback<>() { + @Override + public void onSuccess(T result) { + cf.complete(result); + } + + @Override + public void onFailure(@NotNull Throwable t) { + cf.completeExceptionally(t); + } + }, DIRECT_EXECUTOR); + + return cf; + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/common/Utils.java b/src/main/java/it/cavallium/rockserver/core/common/Utils.java index 1de1399..8fdd177 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/Utils.java +++ b/src/main/java/it/cavallium/rockserver/core/common/Utils.java @@ -7,8 +7,6 @@ import static java.util.Objects.requireNonNullElse; import java.io.IOException; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; -import java.lang.foreign.ValueLayout; -import java.net.InetSocketAddress; import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; @@ -149,8 +147,8 @@ public class Utils { == -1; } - public static InetSocketAddress parseHostAndPort(URI uri) { - return new InetSocketAddress(uri.getHost(), parsePort(uri)); + public static HostAndPort parseHostAndPort(URI uri) { + return new HostAndPort(uri.getHost(), parsePort(uri)); } public static int parsePort(URI uri) { @@ -166,4 +164,6 @@ public class Utils { var b = s.toArray(BIG_ENDIAN_BYTES); return HexFormat.of().formatHex(b); } + + public record HostAndPort(String host, int port) {} } diff --git a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java index 83ecd8b..b3d2771 100644 --- a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java +++ b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java @@ -408,6 +408,11 @@ public class GrpcServer extends Server { responseObserver)); } + @Override + public void subsequentMultiGet(SubsequentRequest request, StreamObserver responseObserver) { + subsequentMultiPage(request, responseObserver, 0); + } + public void subsequentMultiPage(SubsequentRequest request, StreamObserver responseObserver, int pageIndex) { final long pageSize = 16L; if (request.getTakeCount() > pageIndex * pageSize) { @@ -438,11 +443,6 @@ public class GrpcServer extends Server { } } - @Override - public void subsequentMultiGet(SubsequentRequest request, StreamObserver responseObserver) { - subsequentMultiPage(request, responseObserver, 0); - } - // mappers private static ColumnSchema mapColumnSchema(it.cavallium.rockserver.core.common.api.proto.ColumnSchema schema) { diff --git a/src/main/java/it/cavallium/rockserver/core/server/ServerBuilder.java b/src/main/java/it/cavallium/rockserver/core/server/ServerBuilder.java index 24915c3..d819128 100644 --- a/src/main/java/it/cavallium/rockserver/core/server/ServerBuilder.java +++ b/src/main/java/it/cavallium/rockserver/core/server/ServerBuilder.java @@ -3,6 +3,7 @@ package it.cavallium.rockserver.core.server; import it.cavallium.rockserver.core.client.ClientBuilder; import it.cavallium.rockserver.core.client.EmbeddedConnection; import it.cavallium.rockserver.core.client.RocksDBConnection; +import it.cavallium.rockserver.core.common.Utils.HostAndPort; import java.io.IOException; import java.net.InetSocketAddress; import java.net.UnixDomainSocketAddress; @@ -10,10 +11,9 @@ import java.nio.file.Path; public class ServerBuilder { - private InetSocketAddress iNetAddress; + private HostAndPort iNetAddress; private UnixDomainSocketAddress unixAddress; - private String http2Host; - private int http2Port; + private HostAndPort http2Address; private boolean useThrift; private RocksDBConnection client; @@ -21,13 +21,12 @@ public class ServerBuilder { this.unixAddress = address; } - public void setAddress(InetSocketAddress address) { + public void setAddress(HostAndPort address) { this.iNetAddress = address; } - public void setHttpAddress(String host, int port) { - this.http2Host = host; - this.http2Port = port; + public void setHttpAddress(HostAndPort address) { + this.http2Address = address; } public void setUseThrift(boolean useThrift) { @@ -39,11 +38,11 @@ public class ServerBuilder { } public Server build() throws IOException { - if (http2Host != null) { + if (http2Address != null) { if (useThrift) { - return new ThriftServer(client, http2Host, http2Port); + return new ThriftServer(client, http2Address.host(), http2Address.port()); } else { - return new GrpcServer(client, http2Host, http2Port); + return new GrpcServer(client, http2Address.host(), http2Address.port()); } } else if (unixAddress != null) { throw new UnsupportedOperationException("Not implemented: unix socket"); diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java index a121a61..21d4406 100644 --- a/src/main/java/module-info.java +++ b/src/main/java/module-info.java @@ -20,6 +20,7 @@ module rockserver.core { requires io.jstach.rainbowgum.pattern; requires org.graalvm.nativeimage; requires io.netty.common; + requires proto.google.common.protos; exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.common; diff --git a/src/main/proto/rocksdb.proto b/src/main/proto/rocksdb.proto index 32094c9..8ec8830 100644 --- a/src/main/proto/rocksdb.proto +++ b/src/main/proto/rocksdb.proto @@ -90,9 +90,13 @@ service RocksDBService { rpc put(PutRequest) returns (google.protobuf.Empty); rpc putMulti(stream PutMultiRequest) returns (google.protobuf.Empty); rpc putGetPrevious(PutRequest) returns (Previous); + rpc putMultiGetPrevious(stream PutMultiRequest) returns (stream Previous); rpc putGetDelta(PutRequest) returns (Delta); + rpc putMultiGetDelta(stream PutMultiRequest) returns (stream Delta); rpc putGetChanged(PutRequest) returns (Changed); + rpc putMultiGetChanged(stream PutMultiRequest) returns (stream Changed); rpc putGetPreviousPresence(PutRequest) returns (PreviousPresence); + rpc putMultiGetPreviousPresence(stream PutMultiRequest) returns (stream PreviousPresence); rpc get(GetRequest) returns (GetResponse); rpc getForUpdate(GetRequest) returns (UpdateBegin); rpc exists(GetRequest) returns (PreviousPresence);