diff --git a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java index 96bc779..cbb74f8 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java @@ -168,4 +168,9 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { @NotNull RequestType.RequestIterate requestType) throws RocksDBException { return db.subsequent(arena, iterationId, skipCount, takeCount, requestType); } + + @Override + public T getRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestGetRange requestType, long timeoutMs) throws RocksDBException { + return db.getRange(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs); + } } diff --git a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java index 7a1c0bd..c7a7834 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java @@ -9,18 +9,16 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import com.google.protobuf.UnsafeByteOperations; import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; import io.grpc.netty.NettyChannelBuilder; import io.grpc.stub.*; import io.netty.channel.epoll.EpollDomainSocketChannel; import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.epoll.EpollServerDomainSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.unix.DomainSocketAddress; import it.cavallium.rockserver.core.common.*; import it.cavallium.rockserver.core.common.ColumnSchema; +import it.cavallium.rockserver.core.common.FirstAndLast; import it.cavallium.rockserver.core.common.KVBatch; import it.cavallium.rockserver.core.common.PutBatchMode; import it.cavallium.rockserver.core.common.RequestType.RequestChanged; @@ -36,16 +34,17 @@ import it.cavallium.rockserver.core.common.Utils.HostAndPort; import it.cavallium.rockserver.core.common.api.proto.*; import it.cavallium.rockserver.core.common.api.proto.ColumnHashType; import it.cavallium.rockserver.core.common.api.proto.Delta; +import it.cavallium.rockserver.core.common.api.proto.KV; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceBlockingStub; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceFutureStub; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceStub; +import it.unimi.dsi.fastutil.ints.Int2ObjectFunction; import it.unimi.dsi.fastutil.ints.IntArrayList; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; -import java.net.UnixDomainSocketAddress; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; @@ -63,6 +62,8 @@ import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static it.cavallium.rockserver.core.common.Utils.toMemorySegment; + public class GrpcConnection extends BaseConnection implements RocksDBAPI { private static final Logger LOG = LoggerFactory.getLogger(GrpcConnection.class); @@ -505,6 +506,26 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { }; } + @SuppressWarnings("unchecked") + @Override + public CompletableFuture getRangeAsync(Arena arena, long transactionId, long columnId, @NotNull Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange requestType, long timeoutMs) throws RocksDBException { + var request = GetRangeRequest.newBuilder() + .setTransactionId(transactionId) + .setColumnId(columnId) + .addAllStartKeysInclusive(mapKeys(startKeysInclusive)) + .addAllEndKeysExclusive(mapKeys(endKeysExclusive)) + .setReverse(reverse) + .setTimeoutMs(timeoutMs) + .build(); + return (CompletableFuture) switch (requestType) { + case RequestType.RequestGetFirstAndLast _ -> + toResponse(this.futureStub.getRangeFirstAndLast(request), result -> new FirstAndLast<>( + result.hasFirst() ? mapKV(arena, result.getFirst()) : null, + result.hasLast() ? mapKV(arena, result.getLast()) : null + )); + }; + } + private static it.cavallium.rockserver.core.common.Delta mapDelta(Delta x) { return new it.cavallium.rockserver.core.common.Delta<>( x.hasPrevious() ? mapByteString(x.getPrevious()) : null, @@ -555,6 +576,21 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { .build(); } + private static it.cavallium.rockserver.core.common.KV mapKV(Arena arena, @NotNull KV entry) { + return new it.cavallium.rockserver.core.common.KV( + mapKeys(arena, entry.getKeysCount(), entry::getKeys), + toMemorySegment(arena, entry.getValue()) + ); + } + + private static Keys mapKeys(Arena arena, int count, Int2ObjectFunction keyGetterAt) { + var segments = new MemorySegment[count]; + for (int i = 0; i < count; i++) { + segments[i] = toMemorySegment(arena, keyGetterAt.apply(i)); + } + return new Keys(segments); + } + private static Iterable mapKeys(Keys keys) { if (keys == null) return List.of(); return Iterables.transform(Arrays.asList(keys.keys()), k -> UnsafeByteOperations.unsafeWrap(k.asByteBuffer())); diff --git a/src/main/java/it/cavallium/rockserver/core/common/FirstAndLast.java b/src/main/java/it/cavallium/rockserver/core/common/FirstAndLast.java new file mode 100644 index 0000000..9547983 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/common/FirstAndLast.java @@ -0,0 +1,6 @@ +package it.cavallium.rockserver.core.common; + +import org.jetbrains.annotations.Nullable; + +public record FirstAndLast(@Nullable T first, @Nullable T last) { +} diff --git a/src/main/java/it/cavallium/rockserver/core/common/KV.java b/src/main/java/it/cavallium/rockserver/core/common/KV.java new file mode 100644 index 0000000..00d9ed2 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/common/KV.java @@ -0,0 +1,30 @@ +package it.cavallium.rockserver.core.common; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.lang.foreign.MemorySegment; +import java.util.Objects; + +public record KV(@NotNull Keys keys, @Nullable MemorySegment value) { + @Override + public String toString() { + return "KV{" + keys + "=" + (value != null ? Utils.toPrettyString(value) : "null") + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + KV kv = (KV) o; + return Objects.equals(keys, kv.keys) && Utils.valueEquals(value, kv.value); + } + + @Override + public int hashCode() { + int hash = 7; + hash = 31 * hash + Objects.hashCode(keys); + hash = 31 * hash + Utils.valueHash(value); + return hash; + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/common/Keys.java b/src/main/java/it/cavallium/rockserver/core/common/Keys.java index 14466ee..4ab3924 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/Keys.java +++ b/src/main/java/it/cavallium/rockserver/core/common/Keys.java @@ -1,6 +1,7 @@ package it.cavallium.rockserver.core.common; import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; import java.util.Arrays; import java.util.stream.Collectors; import org.jetbrains.annotations.NotNull; @@ -21,11 +22,25 @@ public record Keys(@NotNull MemorySegment @NotNull ... keys) { return false; } Keys keys1 = (Keys) o; - return Arrays.equals(keys, keys1.keys); + if (keys.length != keys1.keys.length) { + return false; + } + for (int i = 0; i < keys.length; i++) { + var k1 = keys[i]; + var k2 = keys1.keys[i]; + if (!Utils.valueEquals(k1, k2)) { + return false; + } + } + return true; } @Override public int hashCode() { - return Arrays.hashCode(keys); + int hash = 7; + for (@NotNull MemorySegment key : keys) { + hash = hash * 31 + Utils.valueHash(key); + } + return hash; } } diff --git a/src/main/java/it/cavallium/rockserver/core/common/RequestType.java b/src/main/java/it/cavallium/rockserver/core/common/RequestType.java index 5acb6ba..e5a21f9 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RequestType.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RequestType.java @@ -1,6 +1,8 @@ package it.cavallium.rockserver.core.common; import java.util.List; +import java.util.concurrent.CompletableFuture; + import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -16,7 +18,8 @@ public sealed interface RequestType { DELTA(new RequestDelta()), MULTI(new RequestMulti()), CHANGED(new RequestChanged()), - PREVIOUS_PRESENCE(new RequestPreviousPresence()); + PREVIOUS_PRESENCE(new RequestPreviousPresence()), + FIRST_AND_LAST(new RequestGetFirstAndLast()); private final RequestType requestType; @@ -91,6 +94,11 @@ public sealed interface RequestType { return (RequestPreviousPresence) RequestPreviousPresence.INSTANCE; } + @SuppressWarnings("unchecked") + static RequestGetFirstAndLast firstAndLast() { + return (RequestGetFirstAndLast) RequestGetFirstAndLast.INSTANCE; + } + @SuppressWarnings("unchecked") static RequestNothing none() { return (RequestNothing) RequestNothing.INSTANCE; @@ -102,6 +110,8 @@ public sealed interface RequestType { sealed interface RequestGet extends RequestType {} + sealed interface RequestGetRange extends RequestType {} + sealed interface RequestIterate extends RequestType {} record RequestNothing() implements RequestPut, RequestPatch, RequestIterate, @@ -194,4 +204,14 @@ public sealed interface RequestType { return RequestTypeId.PREVIOUS_PRESENCE; } } + + record RequestGetFirstAndLast() implements RequestGetRange> { + + private static final RequestGetFirstAndLast INSTANCE = new RequestGetFirstAndLast<>(); + + @Override + public RequestTypeId getRequestTypeId() { + return RequestTypeId.FIRST_AND_LAST; + } + } } diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java index bc4eaf1..e798a92 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java @@ -5,7 +5,7 @@ import it.cavallium.rockserver.core.common.RequestType.RequestPut; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; import java.util.List; -import java.util.StringJoiner; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -385,4 +385,54 @@ public sealed interface RocksDBAPICommand { } } + /** + * Get some values in a range + *

+ * Returns the result + * + * @param arena arena + * @param transactionId transaction id, or 0 + * @param columnId column id + * @param startKeysInclusive start keys, inclusive. [] means "the beginning" + * @param endKeysExclusive end keys, exclusive. Null means "the end" + * @param reverse if true, seek in reverse direction + * @param requestType the request type determines which type of data will be returned. + * @param timeoutMs timeout in milliseconds + */ + record GetRange(Arena arena, + long transactionId, + long columnId, + @Nullable Keys startKeysInclusive, + @Nullable Keys endKeysExclusive, + boolean reverse, + RequestType.RequestGetRange requestType, + long timeoutMs) implements RocksDBAPICommand { + + @Override + public T handleSync(RocksDBSyncAPI api) { + return api.getRange(arena, + transactionId, + columnId, + startKeysInclusive, + endKeysExclusive, + reverse, + requestType, + timeoutMs + ); + } + + @Override + public CompletableFuture handleAsync(RocksDBAsyncAPI api) { + return api.getRangeAsync(arena, + transactionId, + columnId, + startKeysInclusive, + endKeysExclusive, + reverse, + requestType, + timeoutMs + ); + } + + } } diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java index b1332f2..092f440 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java @@ -9,7 +9,7 @@ import it.cavallium.rockserver.core.common.RocksDBAPICommand.CreateColumn; import it.cavallium.rockserver.core.common.RocksDBAPICommand.DeleteColumn; import it.cavallium.rockserver.core.common.RocksDBAPICommand.Get; import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetColumnId; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenIterator; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetRange; import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenTransaction; import it.cavallium.rockserver.core.common.RocksDBAPICommand.Put; import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutMulti; @@ -22,6 +22,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.reactivestreams.Publisher; public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler { @@ -77,7 +78,7 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler { /** See: {@link PutBatch}. */ default CompletableFuture putBatchAsync(long columnId, - @NotNull org.reactivestreams.Publisher<@NotNull KVBatch> batchPublisher, + @NotNull Publisher<@NotNull KVBatch> batchPublisher, @NotNull PutBatchMode mode) throws RocksDBException { return requestAsync(new PutBatch(columnId, batchPublisher, mode)); } @@ -99,7 +100,7 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler { @Nullable Keys endKeysExclusive, boolean reverse, long timeoutMs) throws RocksDBException { - return requestAsync(new OpenIterator(arena, + return requestAsync(new RocksDBAPICommand.OpenIterator(arena, transactionId, columnId, startKeysInclusive, @@ -127,4 +128,24 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler { @NotNull RequestType.RequestIterate requestType) throws RocksDBException { return requestAsync(new Subsequent<>(arena, iterationId, skipCount, takeCount, requestType)); } + + /** See: {@link GetRange}. */ + default CompletableFuture getRangeAsync(Arena arena, + long transactionId, + long columnId, + @Nullable Keys startKeysInclusive, + @Nullable Keys endKeysExclusive, + boolean reverse, + RequestType.RequestGetRange requestType, + long timeoutMs) throws RocksDBException { + return requestAsync(new RocksDBAPICommand.GetRange<>(arena, + transactionId, + columnId, + startKeysInclusive, + endKeysExclusive, + reverse, + requestType, + timeoutMs + )); + } } diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java index d4591d8..0b09c62 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java @@ -31,7 +31,9 @@ public class RocksDBException extends RuntimeException { SST_WRITE_2, SST_WRITE_3, SST_WRITE_4, - SST_GET_SIZE_FAILED + SST_GET_SIZE_FAILED, + UNSUPPORTED_COLUMN_TYPE, + NOT_IMPLEMENTED } public static RocksDBException of(RocksDBErrorType errorUniqueId, String message) { diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java index 015ee76..f21bacb 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java @@ -9,6 +9,7 @@ import it.cavallium.rockserver.core.common.RocksDBAPICommand.CreateColumn; import it.cavallium.rockserver.core.common.RocksDBAPICommand.DeleteColumn; import it.cavallium.rockserver.core.common.RocksDBAPICommand.Get; import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetColumnId; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetRange; import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenIterator; import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenTransaction; import it.cavallium.rockserver.core.common.RocksDBAPICommand.Put; @@ -119,4 +120,16 @@ public interface RocksDBSyncAPI extends RocksDBSyncAPIRequestHandler { @NotNull RequestType.RequestIterate requestType) throws RocksDBException { return requestSync(new Subsequent<>(arena, iterationId, skipCount, takeCount, requestType)); } + + /** See: {@link GetRange}. */ + default T getRange(Arena arena, + long transactionId, + long columnId, + @Nullable Keys startKeysInclusive, + @Nullable Keys endKeysExclusive, + boolean reverse, + @NotNull RequestType.RequestGetRange requestType, + long timeoutMs) throws RocksDBException { + return requestSync(new GetRange<>(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs)); + } } diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPIRequestHandler.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPIRequestHandler.java index a97bd4c..a388ba0 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPIRequestHandler.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPIRequestHandler.java @@ -1,24 +1,5 @@ package it.cavallium.rockserver.core.common; -import it.cavallium.rockserver.core.common.RequestType.RequestGet; -import it.cavallium.rockserver.core.common.RequestType.RequestPut; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseFailedUpdate; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseIterator; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseTransaction; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.CreateColumn; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.DeleteColumn; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.Get; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetColumnId; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenIterator; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenTransaction; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.Put; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.SeekTo; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.Subsequent; -import java.lang.foreign.Arena; -import java.lang.foreign.MemorySegment; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; - public interface RocksDBSyncAPIRequestHandler { default R requestSync(RocksDBAPICommand req) { diff --git a/src/main/java/it/cavallium/rockserver/core/common/Utils.java b/src/main/java/it/cavallium/rockserver/core/common/Utils.java index d6323db..2d22acf 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/Utils.java +++ b/src/main/java/it/cavallium/rockserver/core/common/Utils.java @@ -8,6 +8,7 @@ import com.google.protobuf.ByteString; import java.io.IOException; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; @@ -157,6 +158,16 @@ public class Utils { == -1; } + public static int valueHash(MemorySegment value) { + value = requireNonNullElse(value, NULL); + int hash = 7; + var len = value.byteSize(); + for (long i = 0; i < len; i++) { + hash = hash * 31 + value.get(ValueLayout.JAVA_BYTE, i); + } + return hash; + } + public static HostAndPort parseHostAndPort(URI uri) { return new HostAndPort(uri.getHost(), parsePort(uri)); } diff --git a/src/main/java/it/cavallium/rockserver/core/impl/ColumnInstance.java b/src/main/java/it/cavallium/rockserver/core/impl/ColumnInstance.java index e0b48b3..f3ceaca 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/ColumnInstance.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/ColumnInstance.java @@ -31,6 +31,7 @@ public record ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema, int fi public static final OfChar BIG_ENDIAN_CHAR_UNALIGNED = OfByte.JAVA_CHAR_UNALIGNED.withOrder(ByteOrder.BIG_ENDIAN); public static final OfInt BIG_ENDIAN_INT_UNALIGNED = OfByte.JAVA_INT_UNALIGNED.withOrder(ByteOrder.BIG_ENDIAN); + private static final MemorySegment[] EMPTY_MEMORY_SEGMENT_ARRAY = new MemorySegment[0]; public ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema) { this(cfh, schema, calculateFinalKeySizeBytes(schema)); @@ -79,6 +80,36 @@ public record ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema, int fi return finalKey; } + /** + * @param bucketValue pass this parameter only if the columnInstance has variable-length keys + */ + @NotNull + public MemorySegment[] decodeKeys(Arena arena, MemorySegment calculatedKey, @Nullable MemorySegment bucketValue) { + validateFinalKeySize(calculatedKey); + MemorySegment[] finalKeys; + if (calculatedKey == MemorySegment.NULL) { + finalKeys = EMPTY_MEMORY_SEGMENT_ARRAY; + } else if (!hasBuckets()) { + if (schema.keysCount() == 1) { + finalKeys = new MemorySegment[] {calculatedKey}; + } else { + finalKeys = new MemorySegment[schema.keysCount()]; + long offsetBytes = 0; + for (int i = 0; i < schema.keysCount(); i++) { + var keyLength = schema.key(i); + var finalKey = finalKeys[i] = arena.allocate(keyLength); + MemorySegment.copy(calculatedKey, offsetBytes, finalKey, 0, keyLength); + offsetBytes += keyLength; + } + } + } else { + // todo: implement + throw RocksDBException.of(RocksDBErrorType.NOT_IMPLEMENTED, "Unsupported bucket columns, implement them"); + } + validateKeyCount(finalKeys); + return finalKeys; + } + private MemorySegment computeKeyAt(Arena arena, int i, MemorySegment[] keys) { if (i < schema.keysCount() - schema.variableLengthKeysCount()) { if (keys[i].byteSize() != schema.key(i)) { diff --git a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java index 2ac9a01..bdd2130 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java @@ -901,7 +901,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { //noinspection resource it = getTransaction(transactionId, false).val().getIterator(ro, col.cfh()); } else { - it = db.get().newIterator(col.cfh()); + it = db.get().newIterator(col.cfh(), ro); } var itEntry = new REntry<>(it, new RocksDBObjects(ro)); return FastRandomUtils.allocateNewValue(its, itEntry, 1, Long.MAX_VALUE); @@ -947,11 +947,86 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { } } + @SuppressWarnings("unchecked") + @Override + public T getRange(Arena arena, + long transactionId, + long columnId, + @Nullable Keys startKeysInclusive, + @Nullable Keys endKeysExclusive, + boolean reverse, + RequestType.@NotNull RequestGetRange requestType, + long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException { + ops.beginOp(); + try { + var col = getColumn(columnId); + + if (requestType instanceof RequestType.RequestGetFirstAndLast) { + if (col.hasBuckets()) { + throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.UNSUPPORTED_COLUMN_TYPE, + "Can't get the first and last range element of a column with buckets"); + } + } + + try (var ro = new ReadOptions()) { + MemorySegment calculatedStartKey = startKeysInclusive != null ? col.calculateKey(arena, startKeysInclusive.keys()) : null; + MemorySegment calculatedEndKey = endKeysExclusive != null ? col.calculateKey(arena, endKeysExclusive.keys()) : null; + try (var startKeySlice = calculatedStartKey != null ? toDirectSlice(calculatedStartKey) : null; + var endKeySlice = calculatedEndKey != null ? toDirectSlice(calculatedEndKey) : null) { + if (startKeysInclusive != null) { + ro.setIterateLowerBound(startKeySlice); + } + if (endKeySlice != null) { + ro.setIterateUpperBound(endKeySlice); + } + + RocksIterator it; + if (transactionId > 0L) { + //noinspection resource + it = getTransaction(transactionId, false).val().getIterator(ro, col.cfh()); + } else { + it = db.get().newIterator(col.cfh(), ro); + } + try (it) { + return (T) switch (requestType) { + case RequestType.RequestGetFirstAndLast _ -> { + if (!reverse) { + it.seekToFirst(); + } else { + it.seekToLast(); + } + if (!it.isValid()) { + yield new FirstAndLast<>(null, null); + } + var calculatedKey = toMemorySegment(arena, it.key()); + var calculatedValue = col.schema().hasValue() ? toMemorySegment(it.value()) : MemorySegment.NULL; + var first = decodeKVNoBuckets(arena, col, calculatedKey, calculatedValue); + + if (!reverse) { + it.seekToLast(); + } else { + it.seekToFirst(); + } + + calculatedKey = toMemorySegment(arena, it.key()); + calculatedValue = col.schema().hasValue() ? toMemorySegment(it.value()) : MemorySegment.NULL; + var last = decodeKVNoBuckets(arena, col, calculatedKey, calculatedValue); + yield new FirstAndLast<>(first, last); + } + }; + } + } + } + } finally { + ops.endOp(); + } + } + private MemorySegment dbGet(Tx tx, - ColumnInstance col, - Arena arena, - ReadOptions readOptions, - MemorySegment calculatedKey) throws RocksDBException { + ColumnInstance col, + Arena arena, + ReadOptions readOptions, + MemorySegment calculatedKey) throws RocksDBException { if (tx != null) { byte[] previousRawBucketByteArray; if (tx.isFromGetForUpdate()) { @@ -1039,4 +1114,20 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { public DatabaseConfig getConfig() { return config; } + + private AbstractSlice toDirectSlice(MemorySegment calculatedKey) { + return new DirectSlice(calculatedKey.asByteBuffer(), (int) calculatedKey.byteSize()); + } + + private KV decodeKVNoBuckets(Arena arena, ColumnInstance col, MemorySegment calculatedKey, MemorySegment calculatedValue) { + var keys = col.decodeKeys(arena, calculatedKey, calculatedValue); + return new KV(new Keys(keys), calculatedValue); + } + + private KV decodeKV(Arena arena, ColumnInstance col, MemorySegment calculatedKey, MemorySegment calculatedValue) { + var keys = col.decodeKeys(arena, calculatedKey, calculatedValue); + // todo: implement + throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.NOT_IMPLEMENTED, + "Bucket column type not implemented, implement them"); + } } diff --git a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java index e4766a8..8d3a550 100644 --- a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java +++ b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java @@ -4,6 +4,7 @@ import static it.cavallium.rockserver.core.common.Utils.toMemorySegment; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; +import com.google.protobuf.UnsafeByteOperations; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.netty.NettyServerBuilder; @@ -17,7 +18,6 @@ import io.netty.channel.epoll.EpollServerDomainSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.unix.DomainSocketAddress; -import io.netty.util.NettyRuntime; import it.cavallium.rockserver.core.client.RocksDBConnection; import it.cavallium.rockserver.core.common.*; import it.cavallium.rockserver.core.common.ColumnHashType; @@ -35,6 +35,8 @@ import it.cavallium.rockserver.core.common.RequestType.RequestPrevious; import it.cavallium.rockserver.core.common.RequestType.RequestPreviousPresence; import it.cavallium.rockserver.core.common.api.proto.*; import it.cavallium.rockserver.core.common.api.proto.Delta; +import it.cavallium.rockserver.core.common.api.proto.FirstAndLast; +import it.cavallium.rockserver.core.common.api.proto.KV; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceImplBase; import it.unimi.dsi.fastutil.ints.Int2IntFunction; import it.unimi.dsi.fastutil.ints.Int2ObjectFunction; @@ -45,9 +47,7 @@ import it.unimi.dsi.fastutil.objects.ObjectList; import java.io.IOException; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; -import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.net.UnixDomainSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletionException; @@ -56,6 +56,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.slf4j.Logger; @@ -698,6 +700,33 @@ public class GrpcServer extends Server { }); } + @Override + public void getRangeFirstAndLast(GetRangeRequest request, StreamObserver responseObserver) { + executor.execute(() -> { + try { + try (var arena = Arena.ofConfined()) { + it.cavallium.rockserver.core.common.FirstAndLast firstAndLast + = api.getRange(arena, + request.getTransactionId(), + request.getColumnId(), + mapKeys(arena, request.getStartKeysInclusiveCount(), request::getStartKeysInclusive), + mapKeys(arena, request.getEndKeysExclusiveCount(), request::getEndKeysExclusive), + request.getReverse(), + RequestType.firstAndLast(), + request.getTimeoutMs() + ); + responseObserver.onNext(FirstAndLast.newBuilder() + .setFirst(unmapKV(firstAndLast.first())) + .setLast(unmapKV(firstAndLast.last())) + .build()); + } + responseObserver.onCompleted(); + } catch (Throwable ex) { + handleError(responseObserver, ex); + } + }); + } + private static void closeArenaSafe(Arena autoArena) { if (autoArena != null) { try { @@ -710,6 +739,27 @@ public class GrpcServer extends Server { // mappers + private static KV unmapKV(it.cavallium.rockserver.core.common.KV kv) { + if (kv == null) return null; + return KV.newBuilder() + .addAllKeys(unmapKeys(kv.keys())) + .setValue(unmapValue(kv.value())) + .build(); + } + + private static List unmapKeys(@NotNull Keys keys) { + var result = new ArrayList(keys.keys().length); + for (@NotNull MemorySegment key : keys.keys()) { + result.add(UnsafeByteOperations.unsafeWrap(key.asByteBuffer())); + } + return result; + } + + private static ByteString unmapValue(@Nullable MemorySegment value) { + if (value == null) return null; + return UnsafeByteOperations.unsafeWrap(value.asByteBuffer()); + } + private static ColumnSchema mapColumnSchema(it.cavallium.rockserver.core.common.api.proto.ColumnSchema schema) { return ColumnSchema.of(mapKeysLength(schema.getFixedKeysCount(), schema::getFixedKeys), mapVariableTailKeys(schema.getVariableTailKeysCount(), schema::getVariableTailKeys), diff --git a/src/main/proto/rocksdb.proto b/src/main/proto/rocksdb.proto index 887fe92..4c59c15 100644 --- a/src/main/proto/rocksdb.proto +++ b/src/main/proto/rocksdb.proto @@ -94,6 +94,9 @@ message SeekToRequest {int64 iterationId = 1; repeated bytes keys = 2;} message SubsequentRequest {int64 iterationId = 1; int64 skipCount = 2; int64 takeCount = 3;} +message GetRangeRequest {int64 transactionId = 1; int64 columnId = 2; repeated bytes startKeysInclusive = 3; repeated bytes endKeysExclusive = 4; bool reverse = 5; int64 timeoutMs = 6;} +message FirstAndLast {optional KV first = 1; optional KV last = 2;} + service RocksDBService { rpc openTransaction(OpenTransactionRequest) returns (OpenTransactionResponse); rpc closeTransaction(CloseTransactionRequest) returns (CloseTransactionResponse); @@ -121,4 +124,5 @@ service RocksDBService { rpc subsequent(SubsequentRequest) returns (google.protobuf.Empty); rpc subsequentExists(SubsequentRequest) returns (PreviousPresence); rpc subsequentMultiGet(SubsequentRequest) returns (stream KV); + rpc getRangeFirstAndLast(GetRangeRequest) returns (FirstAndLast); } diff --git a/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java b/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java index b70a461..4510f91 100644 --- a/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java +++ b/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java @@ -14,15 +14,16 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; import org.jetbrains.annotations.NotNull; -import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.*; import java.io.IOException; import java.lang.foreign.MemorySegment; -import org.junit.jupiter.api.Test; + import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +@TestMethodOrder(MethodOrderer.MethodName.class) abstract class EmbeddedDBTest { protected EmbeddedConnection db; @@ -89,6 +90,25 @@ abstract class EmbeddedDBTest { }); } + /** + * @return a sorted sequence of k-v pairs + */ + protected List getKVSequence() { + var result = new ArrayList(); + for (int i = 0; i < Byte.MAX_VALUE; i++) { + result.add(new KV(getKeyI(i), getValueI(i))); + } + return result; + } + + protected KV getKVSequenceFirst() { + return getKVSequence().getFirst(); + } + + protected KV getKVSequenceLast() { + return getKVSequence().getLast(); + } + protected boolean getHasValues() { return true; } @@ -363,6 +383,28 @@ abstract class EmbeddedDBTest { } } + @Test + void getRangeFirstAndLast() { + var firstKey = getKVSequenceFirst().keys(); + var lastKey = getKVSequenceLast().keys(); + var prevLastKV = getKVSequence().get(getKVSequence().size() - 2); + if (getSchemaVarKeys().isEmpty()) { + FirstAndLast firstAndLast = db.getRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000); + Assertions.assertNull(firstAndLast.first(), "First should be empty because the db is empty"); + Assertions.assertNull(firstAndLast.last(), "Last should be empty because the db is empty"); + + fillSomeKeys(); + + firstAndLast = db.getRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000); + Assertions.assertEquals(getKVSequenceFirst(), firstAndLast.first(), "First key mismatch"); + Assertions.assertEquals(prevLastKV, firstAndLast.last(), "Last key mismatch"); + } else { + Assertions.assertThrowsExactly(RocksDBException.class, () -> { + db.getRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000); + }); + } + } + @Test void putBatchSST() { @NotNull Publisher<@NotNull KVBatch> batchPublisher = new Publisher() {