Test getRange

This commit is contained in:
Andrea Cavalli 2024-10-22 14:45:44 +02:00
parent c6b4e62d74
commit f1ece117e1
8 changed files with 187 additions and 86 deletions

View File

@ -1,38 +0,0 @@
package it.cavallium.rockserver.core.client;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
class CollectListMappedStreamObserver<T, U> extends CompletableFuture<List<U>> implements StreamObserver<T> {
private final Function<T, U> mapper;
private final List<U> list;
public CollectListMappedStreamObserver(Function<T, U> mapper) {
this.mapper = mapper;
this.list = new ArrayList<>();
}
public CollectListMappedStreamObserver(Function<T, U> mapper, int size) {
this.mapper = mapper;
this.list = new ArrayList<>(size);
}
@Override
public void onNext(T t) {
this.list.add(mapper.apply(t));
}
@Override
public void onError(Throwable throwable) {
this.completeExceptionally(throwable);
}
@Override
public void onCompleted() {
this.complete(this.list);
}
}

View File

@ -1,34 +0,0 @@
package it.cavallium.rockserver.core.client;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
class CollectListStreamObserver<T> extends CompletableFuture<List<T>> implements StreamObserver<T> {
private final List<T> list;
public CollectListStreamObserver() {
this.list = new ArrayList<>();
}
public CollectListStreamObserver(int size) {
this.list = new ArrayList<>(size);
}
@Override
public void onNext(T t) {
this.list.add(t);
}
@Override
public void onError(Throwable throwable) {
this.completeExceptionally(throwable);
}
@Override
public void onCompleted() {
this.complete(this.list);
}
}

View File

@ -179,4 +179,14 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
public <T> T reduceRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestReduceRange<? super KV, T> requestType, long timeoutMs) throws RocksDBException {
return db.reduceRange(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs);
}
@Override
public <T> Stream<T> getRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestGetRange<? super KV, T> requestType, long timeoutMs) throws RocksDBException {
return db.getRange(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs);
}
@Override
public <T> Publisher<T> getRangeAsync(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange<? super KV, T> requestType, long timeoutMs) throws RocksDBException {
return db.getRangeAsync(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs);
}
}

View File

@ -386,12 +386,11 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
case RequestNothing<?> _ -> toResponse(this.futureStub.subsequent(request), _ -> null);
case RequestExists<?> _ ->
(CompletableFuture<T>) toResponse(this.futureStub.subsequentExists(request), PreviousPresence::getPresent);
case RequestMulti<?> _ -> {
CollectListMappedStreamObserver<KV, MemorySegment> responseObserver
= new CollectListMappedStreamObserver<>(kv -> mapByteString(kv.getValue()));
this.asyncStub.subsequentMultiGet(request, responseObserver);
yield (CompletableFuture<T>) responseObserver;
}
case RequestMulti<?> _ ->
(CompletableFuture<T>) this.reactiveStub.subsequentMultiGet(request)
.map(kv -> mapByteString(kv.getValue()))
.collectList()
.toFuture();
};
}

View File

@ -18,7 +18,8 @@ public sealed interface RequestType<METHOD_DATA_TYPE, RESULT_TYPE> {
MULTI(new RequestMulti()),
CHANGED(new RequestChanged()),
PREVIOUS_PRESENCE(new RequestPreviousPresence()),
FIRST_AND_LAST(new RequestGetFirstAndLast());
FIRST_AND_LAST(new RequestGetFirstAndLast()),
ALL_IN_RANGE(new RequestGetAllInRange());
private final RequestType requestType;
@ -221,13 +222,13 @@ public sealed interface RequestType<METHOD_DATA_TYPE, RESULT_TYPE> {
}
}
record RequestGetAllInRange<T>() implements RequestGetRange<T, FirstAndLast<T>> {
record RequestGetAllInRange<T>() implements RequestGetRange<T, T> {
private static final RequestGetAllInRange<Object> INSTANCE = new RequestGetAllInRange<>();
@Override
public RequestTypeId getRequestTypeId() {
return RequestTypeId.FIRST_AND_LAST;
return RequestTypeId.ALL_IN_RANGE;
}
}
}

View File

@ -31,6 +31,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.cliffc.high_scale_lib.NonBlockingHashMapLong;
import org.github.gestalt.config.exceptions.GestaltException;
@ -45,6 +46,8 @@ import org.rocksdb.RocksDBException;
import org.rocksdb.Status.Code;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
@ -52,6 +55,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
public static final long MAX_TRANSACTION_DURATION_MS = 10_000L;
private static final boolean USE_FAST_GET = true;
private static final byte[] COLUMN_SCHEMAS_COLUMN = "_column_schemas_".getBytes(StandardCharsets.UTF_8);
private static final KV NO_MORE_RESULTS = new KV(new Keys(), null);
private final Logger logger;
private final @Nullable Path path;
private final TransactionalDB db;
@ -1022,6 +1026,104 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
}
}
@Override
public <T> Stream<T> getRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestGetRange<? super KV, T> requestType, long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException {
return Flux
.from(this.getRangeAsync(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs))
.toStream();
}
/** See: {@link it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandStream.GetRange}. */
public <T> Publisher<T> getRangeAsync(Arena arena,
long transactionId,
long columnId,
@Nullable Keys startKeysInclusive,
@Nullable Keys endKeysExclusive,
boolean reverse,
RequestType.RequestGetRange<? super KV, T> requestType,
long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException {
record Resources(ColumnInstance col, ReadOptions ro, AbstractSlice<?> startKeySlice,
AbstractSlice<?> endKeySlice, RocksIterator it) {
public void close() {
ro.close();
startKeySlice.close();
endKeySlice.close();
it.close();
}
}
return Flux.using(() -> {
var col = getColumn(columnId);
if (requestType instanceof RequestType.RequestGetAllInRange<?>) {
if (col.hasBuckets()) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.UNSUPPORTED_COLUMN_TYPE,
"Can't get the range elements of a column with buckets");
}
}
var ro = new ReadOptions();
try {
MemorySegment calculatedStartKey = startKeysInclusive != null ? col.calculateKey(arena, startKeysInclusive.keys()) : null;
MemorySegment calculatedEndKey = endKeysExclusive != null ? col.calculateKey(arena, endKeysExclusive.keys()) : null;
var startKeySlice = calculatedStartKey != null ? toDirectSlice(calculatedStartKey) : null;
try {
var endKeySlice = calculatedEndKey != null ? toDirectSlice(calculatedEndKey) : null;
try {
if (startKeysInclusive != null) {
ro.setIterateLowerBound(startKeySlice);
}
if (endKeySlice != null) {
ro.setIterateUpperBound(endKeySlice);
}
RocksIterator it;
if (transactionId > 0L) {
//noinspection resource
it = getTransaction(transactionId, false).val().getIterator(ro, col.cfh());
} else {
it = db.get().newIterator(col.cfh(), ro);
}
return new Resources(col, ro, startKeySlice, endKeySlice, it);
} catch (Throwable ex) {
if (endKeySlice != null) endKeySlice.close();
throw ex;
}
} catch (Throwable ex) {
if (startKeySlice != null) startKeySlice.close();
throw ex;
}
} catch (Throwable ex) {
ro.close();
throw ex;
}
}, res -> Flux.<T, RocksIterator>generate(() -> {
if (!reverse) {
res.it.seekToFirst();
} else {
res.it.seekToLast();
}
return res.it;
}, (it, sink) -> {
if (!it.isValid()) {
sink.complete();
} else {
var calculatedKey = toMemorySegment(arena, it.key());
var calculatedValue = res.col.schema().hasValue() ? toMemorySegment(it.value()) : MemorySegment.NULL;
//noinspection unchecked
sink.next((T) decodeKVNoBuckets(arena, res.col, calculatedKey, calculatedValue));
if (!reverse) {
res.it.next();
} else {
res.it.prev();
}
}
return it;
}), Resources::close)
.subscribeOn(Schedulers.boundedElastic())
.doFirst(ops::beginOp)
.doFinally(_ -> ops.endOp());
}
private MemorySegment dbGet(Tx tx,
ColumnInstance col,
Arena arena,

View File

@ -205,7 +205,7 @@ public class GrpcServer extends Server {
return mapKVBatch(Arena.ofAuto(), batch.getEntriesCount(), batch::getEntries);
});
return Mono.fromFuture(asyncApi.putBatchAsync(initialRequest.getColumnId(), batches, mode));
return Mono.fromFuture(() -> asyncApi.putBatchAsync(initialRequest.getColumnId(), batches, mode));
} else if (firstSignal.isOnComplete()) {
return Mono.just(RocksDBException.of(
RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "No initial request"));
@ -489,6 +489,22 @@ public class GrpcServer extends Server {
});
}
@Override
public Flux<KV> getAllInRange(GetRangeRequest request) {
var arena = Arena.ofAuto();
return Flux
.from(asyncApi.getRangeAsync(arena,
request.getTransactionId(),
request.getColumnId(),
mapKeys(arena, request.getStartKeysInclusiveCount(), request::getStartKeysInclusive),
mapKeys(arena, request.getEndKeysExclusiveCount(), request::getEndKeysExclusive),
request.getReverse(),
RequestType.allInRange(),
request.getTimeoutMs()
))
.map(GrpcServerImpl::unmapKV);
}
private static void closeArenaSafe(Arena autoArena) {
if (autoArena != null) {
try {

View File

@ -7,10 +7,7 @@ import it.cavallium.rockserver.core.common.*;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.objects.ObjectList;
import java.lang.foreign.Arena;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import org.jetbrains.annotations.NotNull;
@ -18,6 +15,7 @@ import org.junit.jupiter.api.*;
import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
@ -409,6 +407,53 @@ abstract class EmbeddedDBTest {
}
}
@Test
void getRangeAll() {
int initIndex = 1;
int count = 5;
var rangeInitKey = getKVSequence().get(initIndex);
var rangeEndKeyExcl = getKVSequence().get(initIndex + count);
var rangeEndKeyIncl = getKVSequence().get(initIndex + count - 1);
if (getSchemaVarKeys().isEmpty()) {
var results = db.getRange(arena, 0, colId, rangeInitKey.keys(), rangeEndKeyExcl.keys(), false, RequestType.allInRange(), 1000).toList();
Assertions.assertEquals(0, results.size(), "Results count must be 0");
fillSomeKeys();
boolean reverse = false;
while (true) {
results = db.getRange(arena, 0, colId, rangeInitKey.keys(), rangeEndKeyExcl.keys(), reverse, RequestType.allInRange(), 1000).toList();
var expectedResults = getKVSequence().stream().skip(initIndex).limit(count).collect(Collectors.toCollection(ArrayList::new));
if (reverse) {
Collections.reverse(expectedResults);
}
assert expectedResults.size() == count;
Assertions.assertEquals(count, results.size(), "Results count is not as expected. Reverse = " + reverse);
for (int i = 0; i < expectedResults.size(); i++) {
var currentI = results.get(i);
var expectedI = expectedResults.get(i);
Assertions.assertEquals(expectedI, currentI, "Element at index " + i + " mismatch. Reverse = " + reverse);
}
if (!reverse) {
reverse = true;
} else {
break;
}
}
} else {
Assertions.assertThrowsExactly(RocksDBException.class, () -> {
db.getRange(arena, 0, colId, rangeInitKey.keys(), rangeEndKeyExcl.keys(), false, RequestType.allInRange(), 1000)
.toList();
});
}
}
@Test
void putBatchSST() {
@NotNull Publisher<@NotNull KVBatch> batchPublisher = new Publisher<KVBatch>() {