Add GetRange request, with FirstAndLast mode

This commit is contained in:
Andrea Cavalli 2024-10-18 16:58:27 +02:00
parent 97cf151afb
commit 397b9e0353
17 changed files with 449 additions and 41 deletions

View File

@ -168,4 +168,9 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
@NotNull RequestType.RequestIterate<? super MemorySegment, T> requestType) throws RocksDBException {
return db.subsequent(arena, iterationId, skipCount, takeCount, requestType);
}
@Override
public <T> T getRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestGetRange<? super KV, T> requestType, long timeoutMs) throws RocksDBException {
return db.getRange(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs);
}
}

View File

@ -9,18 +9,16 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.protobuf.UnsafeByteOperations;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.*;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerDomainSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.unix.DomainSocketAddress;
import it.cavallium.rockserver.core.common.*;
import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.FirstAndLast;
import it.cavallium.rockserver.core.common.KVBatch;
import it.cavallium.rockserver.core.common.PutBatchMode;
import it.cavallium.rockserver.core.common.RequestType.RequestChanged;
@ -36,16 +34,17 @@ import it.cavallium.rockserver.core.common.Utils.HostAndPort;
import it.cavallium.rockserver.core.common.api.proto.*;
import it.cavallium.rockserver.core.common.api.proto.ColumnHashType;
import it.cavallium.rockserver.core.common.api.proto.Delta;
import it.cavallium.rockserver.core.common.api.proto.KV;
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceBlockingStub;
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceFutureStub;
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceStub;
import it.unimi.dsi.fastutil.ints.Int2ObjectFunction;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.net.UnixDomainSocketAddress;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
@ -63,6 +62,8 @@ import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static it.cavallium.rockserver.core.common.Utils.toMemorySegment;
public class GrpcConnection extends BaseConnection implements RocksDBAPI {
private static final Logger LOG = LoggerFactory.getLogger(GrpcConnection.class);
@ -505,6 +506,26 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
};
}
@SuppressWarnings("unchecked")
@Override
public <T> CompletableFuture<T> getRangeAsync(Arena arena, long transactionId, long columnId, @NotNull Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange<? super it.cavallium.rockserver.core.common.KV, T> requestType, long timeoutMs) throws RocksDBException {
var request = GetRangeRequest.newBuilder()
.setTransactionId(transactionId)
.setColumnId(columnId)
.addAllStartKeysInclusive(mapKeys(startKeysInclusive))
.addAllEndKeysExclusive(mapKeys(endKeysExclusive))
.setReverse(reverse)
.setTimeoutMs(timeoutMs)
.build();
return (CompletableFuture<T>) switch (requestType) {
case RequestType.RequestGetFirstAndLast<?> _ ->
toResponse(this.futureStub.getRangeFirstAndLast(request), result -> new FirstAndLast<>(
result.hasFirst() ? mapKV(arena, result.getFirst()) : null,
result.hasLast() ? mapKV(arena, result.getLast()) : null
));
};
}
private static it.cavallium.rockserver.core.common.Delta<MemorySegment> mapDelta(Delta x) {
return new it.cavallium.rockserver.core.common.Delta<>(
x.hasPrevious() ? mapByteString(x.getPrevious()) : null,
@ -555,6 +576,21 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
.build();
}
private static it.cavallium.rockserver.core.common.KV mapKV(Arena arena, @NotNull KV entry) {
return new it.cavallium.rockserver.core.common.KV(
mapKeys(arena, entry.getKeysCount(), entry::getKeys),
toMemorySegment(arena, entry.getValue())
);
}
private static Keys mapKeys(Arena arena, int count, Int2ObjectFunction<ByteString> keyGetterAt) {
var segments = new MemorySegment[count];
for (int i = 0; i < count; i++) {
segments[i] = toMemorySegment(arena, keyGetterAt.apply(i));
}
return new Keys(segments);
}
private static Iterable<? extends ByteString> mapKeys(Keys keys) {
if (keys == null) return List.of();
return Iterables.transform(Arrays.asList(keys.keys()), k -> UnsafeByteOperations.unsafeWrap(k.asByteBuffer()));

View File

@ -0,0 +1,6 @@
package it.cavallium.rockserver.core.common;
import org.jetbrains.annotations.Nullable;
public record FirstAndLast<T>(@Nullable T first, @Nullable T last) {
}

View File

@ -0,0 +1,30 @@
package it.cavallium.rockserver.core.common;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.lang.foreign.MemorySegment;
import java.util.Objects;
public record KV(@NotNull Keys keys, @Nullable MemorySegment value) {
@Override
public String toString() {
return "KV{" + keys + "=" + (value != null ? Utils.toPrettyString(value) : "null") + '}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
KV kv = (KV) o;
return Objects.equals(keys, kv.keys) && Utils.valueEquals(value, kv.value);
}
@Override
public int hashCode() {
int hash = 7;
hash = 31 * hash + Objects.hashCode(keys);
hash = 31 * hash + Utils.valueHash(value);
return hash;
}
}

View File

@ -1,6 +1,7 @@
package it.cavallium.rockserver.core.common;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.util.Arrays;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
@ -21,11 +22,25 @@ public record Keys(@NotNull MemorySegment @NotNull ... keys) {
return false;
}
Keys keys1 = (Keys) o;
return Arrays.equals(keys, keys1.keys);
if (keys.length != keys1.keys.length) {
return false;
}
for (int i = 0; i < keys.length; i++) {
var k1 = keys[i];
var k2 = keys1.keys[i];
if (!Utils.valueEquals(k1, k2)) {
return false;
}
}
return true;
}
@Override
public int hashCode() {
return Arrays.hashCode(keys);
int hash = 7;
for (@NotNull MemorySegment key : keys) {
hash = hash * 31 + Utils.valueHash(key);
}
return hash;
}
}

View File

@ -1,6 +1,8 @@
package it.cavallium.rockserver.core.common;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -16,7 +18,8 @@ public sealed interface RequestType<METHOD_DATA_TYPE, RESULT_TYPE> {
DELTA(new RequestDelta()),
MULTI(new RequestMulti()),
CHANGED(new RequestChanged()),
PREVIOUS_PRESENCE(new RequestPreviousPresence());
PREVIOUS_PRESENCE(new RequestPreviousPresence()),
FIRST_AND_LAST(new RequestGetFirstAndLast());
private final RequestType requestType;
@ -91,6 +94,11 @@ public sealed interface RequestType<METHOD_DATA_TYPE, RESULT_TYPE> {
return (RequestPreviousPresence<T>) RequestPreviousPresence.INSTANCE;
}
@SuppressWarnings("unchecked")
static <T> RequestGetFirstAndLast<T> firstAndLast() {
return (RequestGetFirstAndLast<T>) RequestGetFirstAndLast.INSTANCE;
}
@SuppressWarnings("unchecked")
static <T> RequestNothing<T> none() {
return (RequestNothing<T>) RequestNothing.INSTANCE;
@ -102,6 +110,8 @@ public sealed interface RequestType<METHOD_DATA_TYPE, RESULT_TYPE> {
sealed interface RequestGet<T, U> extends RequestType<T, U> {}
sealed interface RequestGetRange<T, U> extends RequestType<T, U> {}
sealed interface RequestIterate<T, U> extends RequestType<T, U> {}
record RequestNothing<T>() implements RequestPut<T, Void>, RequestPatch<T, Void>, RequestIterate<T, Void>,
@ -194,4 +204,14 @@ public sealed interface RequestType<METHOD_DATA_TYPE, RESULT_TYPE> {
return RequestTypeId.PREVIOUS_PRESENCE;
}
}
record RequestGetFirstAndLast<T>() implements RequestGetRange<T, FirstAndLast<T>> {
private static final RequestGetFirstAndLast<Object> INSTANCE = new RequestGetFirstAndLast<>();
@Override
public RequestTypeId getRequestTypeId() {
return RequestTypeId.FIRST_AND_LAST;
}
}
}

View File

@ -5,7 +5,7 @@ import it.cavallium.rockserver.core.common.RequestType.RequestPut;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -385,4 +385,54 @@ public sealed interface RocksDBAPICommand<R> {
}
}
/**
* Get some values in a range
* <p>
* Returns the result
*
* @param arena arena
* @param transactionId transaction id, or 0
* @param columnId column id
* @param startKeysInclusive start keys, inclusive. [] means "the beginning"
* @param endKeysExclusive end keys, exclusive. Null means "the end"
* @param reverse if true, seek in reverse direction
* @param requestType the request type determines which type of data will be returned.
* @param timeoutMs timeout in milliseconds
*/
record GetRange<T>(Arena arena,
long transactionId,
long columnId,
@Nullable Keys startKeysInclusive,
@Nullable Keys endKeysExclusive,
boolean reverse,
RequestType.RequestGetRange<? super KV, T> requestType,
long timeoutMs) implements RocksDBAPICommand<T> {
@Override
public T handleSync(RocksDBSyncAPI api) {
return api.getRange(arena,
transactionId,
columnId,
startKeysInclusive,
endKeysExclusive,
reverse,
requestType,
timeoutMs
);
}
@Override
public CompletableFuture<T> handleAsync(RocksDBAsyncAPI api) {
return api.getRangeAsync(arena,
transactionId,
columnId,
startKeysInclusive,
endKeysExclusive,
reverse,
requestType,
timeoutMs
);
}
}
}

View File

@ -9,7 +9,7 @@ import it.cavallium.rockserver.core.common.RocksDBAPICommand.CreateColumn;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.DeleteColumn;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Get;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetColumnId;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenIterator;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetRange;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenTransaction;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Put;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutMulti;
@ -22,6 +22,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler {
@ -77,7 +78,7 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler {
/** See: {@link PutBatch}. */
default CompletableFuture<Void> putBatchAsync(long columnId,
@NotNull org.reactivestreams.Publisher<@NotNull KVBatch> batchPublisher,
@NotNull Publisher<@NotNull KVBatch> batchPublisher,
@NotNull PutBatchMode mode) throws RocksDBException {
return requestAsync(new PutBatch(columnId, batchPublisher, mode));
}
@ -99,7 +100,7 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler {
@Nullable Keys endKeysExclusive,
boolean reverse,
long timeoutMs) throws RocksDBException {
return requestAsync(new OpenIterator(arena,
return requestAsync(new RocksDBAPICommand.OpenIterator(arena,
transactionId,
columnId,
startKeysInclusive,
@ -127,4 +128,24 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler {
@NotNull RequestType.RequestIterate<? super MemorySegment, T> requestType) throws RocksDBException {
return requestAsync(new Subsequent<>(arena, iterationId, skipCount, takeCount, requestType));
}
/** See: {@link GetRange}. */
default <T> CompletableFuture<T> getRangeAsync(Arena arena,
long transactionId,
long columnId,
@Nullable Keys startKeysInclusive,
@Nullable Keys endKeysExclusive,
boolean reverse,
RequestType.RequestGetRange<? super KV, T> requestType,
long timeoutMs) throws RocksDBException {
return requestAsync(new RocksDBAPICommand.GetRange<>(arena,
transactionId,
columnId,
startKeysInclusive,
endKeysExclusive,
reverse,
requestType,
timeoutMs
));
}
}

View File

@ -31,7 +31,9 @@ public class RocksDBException extends RuntimeException {
SST_WRITE_2,
SST_WRITE_3,
SST_WRITE_4,
SST_GET_SIZE_FAILED
SST_GET_SIZE_FAILED,
UNSUPPORTED_COLUMN_TYPE,
NOT_IMPLEMENTED
}
public static RocksDBException of(RocksDBErrorType errorUniqueId, String message) {

View File

@ -9,6 +9,7 @@ import it.cavallium.rockserver.core.common.RocksDBAPICommand.CreateColumn;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.DeleteColumn;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Get;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetColumnId;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetRange;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenIterator;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenTransaction;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Put;
@ -119,4 +120,16 @@ public interface RocksDBSyncAPI extends RocksDBSyncAPIRequestHandler {
@NotNull RequestType.RequestIterate<? super MemorySegment, T> requestType) throws RocksDBException {
return requestSync(new Subsequent<>(arena, iterationId, skipCount, takeCount, requestType));
}
/** See: {@link GetRange}. */
default <T> T getRange(Arena arena,
long transactionId,
long columnId,
@Nullable Keys startKeysInclusive,
@Nullable Keys endKeysExclusive,
boolean reverse,
@NotNull RequestType.RequestGetRange<? super KV, T> requestType,
long timeoutMs) throws RocksDBException {
return requestSync(new GetRange<>(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs));
}
}

View File

@ -1,24 +1,5 @@
package it.cavallium.rockserver.core.common;
import it.cavallium.rockserver.core.common.RequestType.RequestGet;
import it.cavallium.rockserver.core.common.RequestType.RequestPut;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseFailedUpdate;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseIterator;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseTransaction;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.CreateColumn;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.DeleteColumn;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Get;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetColumnId;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenIterator;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenTransaction;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Put;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.SeekTo;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Subsequent;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public interface RocksDBSyncAPIRequestHandler {
default <R> R requestSync(RocksDBAPICommand<R> req) {

View File

@ -8,6 +8,7 @@ import com.google.protobuf.ByteString;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
@ -157,6 +158,16 @@ public class Utils {
== -1;
}
public static int valueHash(MemorySegment value) {
value = requireNonNullElse(value, NULL);
int hash = 7;
var len = value.byteSize();
for (long i = 0; i < len; i++) {
hash = hash * 31 + value.get(ValueLayout.JAVA_BYTE, i);
}
return hash;
}
public static HostAndPort parseHostAndPort(URI uri) {
return new HostAndPort(uri.getHost(), parsePort(uri));
}

View File

@ -31,6 +31,7 @@ public record ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema, int fi
public static final OfChar BIG_ENDIAN_CHAR_UNALIGNED = OfByte.JAVA_CHAR_UNALIGNED.withOrder(ByteOrder.BIG_ENDIAN);
public static final OfInt BIG_ENDIAN_INT_UNALIGNED = OfByte.JAVA_INT_UNALIGNED.withOrder(ByteOrder.BIG_ENDIAN);
private static final MemorySegment[] EMPTY_MEMORY_SEGMENT_ARRAY = new MemorySegment[0];
public ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema) {
this(cfh, schema, calculateFinalKeySizeBytes(schema));
@ -79,6 +80,36 @@ public record ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema, int fi
return finalKey;
}
/**
* @param bucketValue pass this parameter only if the columnInstance has variable-length keys
*/
@NotNull
public MemorySegment[] decodeKeys(Arena arena, MemorySegment calculatedKey, @Nullable MemorySegment bucketValue) {
validateFinalKeySize(calculatedKey);
MemorySegment[] finalKeys;
if (calculatedKey == MemorySegment.NULL) {
finalKeys = EMPTY_MEMORY_SEGMENT_ARRAY;
} else if (!hasBuckets()) {
if (schema.keysCount() == 1) {
finalKeys = new MemorySegment[] {calculatedKey};
} else {
finalKeys = new MemorySegment[schema.keysCount()];
long offsetBytes = 0;
for (int i = 0; i < schema.keysCount(); i++) {
var keyLength = schema.key(i);
var finalKey = finalKeys[i] = arena.allocate(keyLength);
MemorySegment.copy(calculatedKey, offsetBytes, finalKey, 0, keyLength);
offsetBytes += keyLength;
}
}
} else {
// todo: implement
throw RocksDBException.of(RocksDBErrorType.NOT_IMPLEMENTED, "Unsupported bucket columns, implement them");
}
validateKeyCount(finalKeys);
return finalKeys;
}
private MemorySegment computeKeyAt(Arena arena, int i, MemorySegment[] keys) {
if (i < schema.keysCount() - schema.variableLengthKeysCount()) {
if (keys[i].byteSize() != schema.key(i)) {

View File

@ -901,7 +901,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
//noinspection resource
it = getTransaction(transactionId, false).val().getIterator(ro, col.cfh());
} else {
it = db.get().newIterator(col.cfh());
it = db.get().newIterator(col.cfh(), ro);
}
var itEntry = new REntry<>(it, new RocksDBObjects(ro));
return FastRandomUtils.allocateNewValue(its, itEntry, 1, Long.MAX_VALUE);
@ -947,11 +947,86 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
}
}
@SuppressWarnings("unchecked")
@Override
public <T> T getRange(Arena arena,
long transactionId,
long columnId,
@Nullable Keys startKeysInclusive,
@Nullable Keys endKeysExclusive,
boolean reverse,
RequestType.@NotNull RequestGetRange<? super KV, T> requestType,
long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException {
ops.beginOp();
try {
var col = getColumn(columnId);
if (requestType instanceof RequestType.RequestGetFirstAndLast<?>) {
if (col.hasBuckets()) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.UNSUPPORTED_COLUMN_TYPE,
"Can't get the first and last range element of a column with buckets");
}
}
try (var ro = new ReadOptions()) {
MemorySegment calculatedStartKey = startKeysInclusive != null ? col.calculateKey(arena, startKeysInclusive.keys()) : null;
MemorySegment calculatedEndKey = endKeysExclusive != null ? col.calculateKey(arena, endKeysExclusive.keys()) : null;
try (var startKeySlice = calculatedStartKey != null ? toDirectSlice(calculatedStartKey) : null;
var endKeySlice = calculatedEndKey != null ? toDirectSlice(calculatedEndKey) : null) {
if (startKeysInclusive != null) {
ro.setIterateLowerBound(startKeySlice);
}
if (endKeySlice != null) {
ro.setIterateUpperBound(endKeySlice);
}
RocksIterator it;
if (transactionId > 0L) {
//noinspection resource
it = getTransaction(transactionId, false).val().getIterator(ro, col.cfh());
} else {
it = db.get().newIterator(col.cfh(), ro);
}
try (it) {
return (T) switch (requestType) {
case RequestType.RequestGetFirstAndLast<?> _ -> {
if (!reverse) {
it.seekToFirst();
} else {
it.seekToLast();
}
if (!it.isValid()) {
yield new FirstAndLast<>(null, null);
}
var calculatedKey = toMemorySegment(arena, it.key());
var calculatedValue = col.schema().hasValue() ? toMemorySegment(it.value()) : MemorySegment.NULL;
var first = decodeKVNoBuckets(arena, col, calculatedKey, calculatedValue);
if (!reverse) {
it.seekToLast();
} else {
it.seekToFirst();
}
calculatedKey = toMemorySegment(arena, it.key());
calculatedValue = col.schema().hasValue() ? toMemorySegment(it.value()) : MemorySegment.NULL;
var last = decodeKVNoBuckets(arena, col, calculatedKey, calculatedValue);
yield new FirstAndLast<>(first, last);
}
};
}
}
}
} finally {
ops.endOp();
}
}
private MemorySegment dbGet(Tx tx,
ColumnInstance col,
Arena arena,
ReadOptions readOptions,
MemorySegment calculatedKey) throws RocksDBException {
ColumnInstance col,
Arena arena,
ReadOptions readOptions,
MemorySegment calculatedKey) throws RocksDBException {
if (tx != null) {
byte[] previousRawBucketByteArray;
if (tx.isFromGetForUpdate()) {
@ -1039,4 +1114,20 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
public DatabaseConfig getConfig() {
return config;
}
private AbstractSlice<?> toDirectSlice(MemorySegment calculatedKey) {
return new DirectSlice(calculatedKey.asByteBuffer(), (int) calculatedKey.byteSize());
}
private KV decodeKVNoBuckets(Arena arena, ColumnInstance col, MemorySegment calculatedKey, MemorySegment calculatedValue) {
var keys = col.decodeKeys(arena, calculatedKey, calculatedValue);
return new KV(new Keys(keys), calculatedValue);
}
private KV decodeKV(Arena arena, ColumnInstance col, MemorySegment calculatedKey, MemorySegment calculatedValue) {
var keys = col.decodeKeys(arena, calculatedKey, calculatedValue);
// todo: implement
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.NOT_IMPLEMENTED,
"Bucket column type not implemented, implement them");
}
}

View File

@ -4,6 +4,7 @@ import static it.cavallium.rockserver.core.common.Utils.toMemorySegment;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.protobuf.UnsafeByteOperations;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.NettyServerBuilder;
@ -17,7 +18,6 @@ import io.netty.channel.epoll.EpollServerDomainSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.util.NettyRuntime;
import it.cavallium.rockserver.core.client.RocksDBConnection;
import it.cavallium.rockserver.core.common.*;
import it.cavallium.rockserver.core.common.ColumnHashType;
@ -35,6 +35,8 @@ import it.cavallium.rockserver.core.common.RequestType.RequestPrevious;
import it.cavallium.rockserver.core.common.RequestType.RequestPreviousPresence;
import it.cavallium.rockserver.core.common.api.proto.*;
import it.cavallium.rockserver.core.common.api.proto.Delta;
import it.cavallium.rockserver.core.common.api.proto.FirstAndLast;
import it.cavallium.rockserver.core.common.api.proto.KV;
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceImplBase;
import it.unimi.dsi.fastutil.ints.Int2IntFunction;
import it.unimi.dsi.fastutil.ints.Int2ObjectFunction;
@ -45,9 +47,7 @@ import it.unimi.dsi.fastutil.objects.ObjectList;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnixDomainSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionException;
@ -56,6 +56,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
@ -698,6 +700,33 @@ public class GrpcServer extends Server {
});
}
@Override
public void getRangeFirstAndLast(GetRangeRequest request, StreamObserver<FirstAndLast> responseObserver) {
executor.execute(() -> {
try {
try (var arena = Arena.ofConfined()) {
it.cavallium.rockserver.core.common.FirstAndLast<it.cavallium.rockserver.core.common.KV> firstAndLast
= api.getRange(arena,
request.getTransactionId(),
request.getColumnId(),
mapKeys(arena, request.getStartKeysInclusiveCount(), request::getStartKeysInclusive),
mapKeys(arena, request.getEndKeysExclusiveCount(), request::getEndKeysExclusive),
request.getReverse(),
RequestType.firstAndLast(),
request.getTimeoutMs()
);
responseObserver.onNext(FirstAndLast.newBuilder()
.setFirst(unmapKV(firstAndLast.first()))
.setLast(unmapKV(firstAndLast.last()))
.build());
}
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
}
});
}
private static void closeArenaSafe(Arena autoArena) {
if (autoArena != null) {
try {
@ -710,6 +739,27 @@ public class GrpcServer extends Server {
// mappers
private static KV unmapKV(it.cavallium.rockserver.core.common.KV kv) {
if (kv == null) return null;
return KV.newBuilder()
.addAllKeys(unmapKeys(kv.keys()))
.setValue(unmapValue(kv.value()))
.build();
}
private static List<ByteString> unmapKeys(@NotNull Keys keys) {
var result = new ArrayList<ByteString>(keys.keys().length);
for (@NotNull MemorySegment key : keys.keys()) {
result.add(UnsafeByteOperations.unsafeWrap(key.asByteBuffer()));
}
return result;
}
private static ByteString unmapValue(@Nullable MemorySegment value) {
if (value == null) return null;
return UnsafeByteOperations.unsafeWrap(value.asByteBuffer());
}
private static ColumnSchema mapColumnSchema(it.cavallium.rockserver.core.common.api.proto.ColumnSchema schema) {
return ColumnSchema.of(mapKeysLength(schema.getFixedKeysCount(), schema::getFixedKeys),
mapVariableTailKeys(schema.getVariableTailKeysCount(), schema::getVariableTailKeys),

View File

@ -94,6 +94,9 @@ message SeekToRequest {int64 iterationId = 1; repeated bytes keys = 2;}
message SubsequentRequest {int64 iterationId = 1; int64 skipCount = 2; int64 takeCount = 3;}
message GetRangeRequest {int64 transactionId = 1; int64 columnId = 2; repeated bytes startKeysInclusive = 3; repeated bytes endKeysExclusive = 4; bool reverse = 5; int64 timeoutMs = 6;}
message FirstAndLast {optional KV first = 1; optional KV last = 2;}
service RocksDBService {
rpc openTransaction(OpenTransactionRequest) returns (OpenTransactionResponse);
rpc closeTransaction(CloseTransactionRequest) returns (CloseTransactionResponse);
@ -121,4 +124,5 @@ service RocksDBService {
rpc subsequent(SubsequentRequest) returns (google.protobuf.Empty);
rpc subsequentExists(SubsequentRequest) returns (PreviousPresence);
rpc subsequentMultiGet(SubsequentRequest) returns (stream KV);
rpc getRangeFirstAndLast(GetRangeRequest) returns (FirstAndLast);
}

View File

@ -14,15 +14,16 @@ import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.*;
import java.io.IOException;
import java.lang.foreign.MemorySegment;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@TestMethodOrder(MethodOrderer.MethodName.class)
abstract class EmbeddedDBTest {
protected EmbeddedConnection db;
@ -89,6 +90,25 @@ abstract class EmbeddedDBTest {
});
}
/**
* @return a sorted sequence of k-v pairs
*/
protected List<KV> getKVSequence() {
var result = new ArrayList<KV>();
for (int i = 0; i < Byte.MAX_VALUE; i++) {
result.add(new KV(getKeyI(i), getValueI(i)));
}
return result;
}
protected KV getKVSequenceFirst() {
return getKVSequence().getFirst();
}
protected KV getKVSequenceLast() {
return getKVSequence().getLast();
}
protected boolean getHasValues() {
return true;
}
@ -363,6 +383,28 @@ abstract class EmbeddedDBTest {
}
}
@Test
void getRangeFirstAndLast() {
var firstKey = getKVSequenceFirst().keys();
var lastKey = getKVSequenceLast().keys();
var prevLastKV = getKVSequence().get(getKVSequence().size() - 2);
if (getSchemaVarKeys().isEmpty()) {
FirstAndLast<KV> firstAndLast = db.getRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000);
Assertions.assertNull(firstAndLast.first(), "First should be empty because the db is empty");
Assertions.assertNull(firstAndLast.last(), "Last should be empty because the db is empty");
fillSomeKeys();
firstAndLast = db.getRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000);
Assertions.assertEquals(getKVSequenceFirst(), firstAndLast.first(), "First key mismatch");
Assertions.assertEquals(prevLastKV, firstAndLast.last(), "Last key mismatch");
} else {
Assertions.assertThrowsExactly(RocksDBException.class, () -> {
db.getRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000);
});
}
}
@Test
void putBatchSST() {
@NotNull Publisher<@NotNull KVBatch> batchPublisher = new Publisher<KVBatch>() {