diff --git a/src/main/java/it/cavallium/rockserver/core/client/CollectListMappedStreamObserver.java b/src/main/java/it/cavallium/rockserver/core/client/CollectListMappedStreamObserver.java deleted file mode 100644 index aa58652..0000000 --- a/src/main/java/it/cavallium/rockserver/core/client/CollectListMappedStreamObserver.java +++ /dev/null @@ -1,38 +0,0 @@ -package it.cavallium.rockserver.core.client; - -import io.grpc.stub.StreamObserver; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; - -class CollectListMappedStreamObserver extends CompletableFuture> implements StreamObserver { - - private final Function mapper; - private final List list; - - public CollectListMappedStreamObserver(Function mapper) { - this.mapper = mapper; - this.list = new ArrayList<>(); - } - - public CollectListMappedStreamObserver(Function mapper, int size) { - this.mapper = mapper; - this.list = new ArrayList<>(size); - } - - @Override - public void onNext(T t) { - this.list.add(mapper.apply(t)); - } - - @Override - public void onError(Throwable throwable) { - this.completeExceptionally(throwable); - } - - @Override - public void onCompleted() { - this.complete(this.list); - } -} diff --git a/src/main/java/it/cavallium/rockserver/core/client/CollectListStreamObserver.java b/src/main/java/it/cavallium/rockserver/core/client/CollectListStreamObserver.java deleted file mode 100644 index ef83b41..0000000 --- a/src/main/java/it/cavallium/rockserver/core/client/CollectListStreamObserver.java +++ /dev/null @@ -1,34 +0,0 @@ -package it.cavallium.rockserver.core.client; - -import io.grpc.stub.StreamObserver; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; - -class CollectListStreamObserver extends CompletableFuture> implements StreamObserver { - - private final List list; - - public CollectListStreamObserver() { - this.list = new ArrayList<>(); - } - - public CollectListStreamObserver(int size) { - this.list = new ArrayList<>(size); - } - - @Override - public void onNext(T t) { - this.list.add(t); - } - - @Override - public void onError(Throwable throwable) { - this.completeExceptionally(throwable); - } - - @Override - public void onCompleted() { - this.complete(this.list); - } -} diff --git a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java index e289b60..d21b53c 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java @@ -179,4 +179,14 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { public T reduceRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestReduceRange requestType, long timeoutMs) throws RocksDBException { return db.reduceRange(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs); } + + @Override + public Stream getRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestGetRange requestType, long timeoutMs) throws RocksDBException { + return db.getRange(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs); + } + + @Override + public Publisher getRangeAsync(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange requestType, long timeoutMs) throws RocksDBException { + return db.getRangeAsync(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs); + } } diff --git a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java index a18f176..e1bc50b 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java @@ -386,12 +386,11 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { case RequestNothing _ -> toResponse(this.futureStub.subsequent(request), _ -> null); case RequestExists _ -> (CompletableFuture) toResponse(this.futureStub.subsequentExists(request), PreviousPresence::getPresent); - case RequestMulti _ -> { - CollectListMappedStreamObserver responseObserver - = new CollectListMappedStreamObserver<>(kv -> mapByteString(kv.getValue())); - this.asyncStub.subsequentMultiGet(request, responseObserver); - yield (CompletableFuture) responseObserver; - } + case RequestMulti _ -> + (CompletableFuture) this.reactiveStub.subsequentMultiGet(request) + .map(kv -> mapByteString(kv.getValue())) + .collectList() + .toFuture(); }; } diff --git a/src/main/java/it/cavallium/rockserver/core/common/RequestType.java b/src/main/java/it/cavallium/rockserver/core/common/RequestType.java index 73b9a57..ddc8073 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RequestType.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RequestType.java @@ -18,7 +18,8 @@ public sealed interface RequestType { MULTI(new RequestMulti()), CHANGED(new RequestChanged()), PREVIOUS_PRESENCE(new RequestPreviousPresence()), - FIRST_AND_LAST(new RequestGetFirstAndLast()); + FIRST_AND_LAST(new RequestGetFirstAndLast()), + ALL_IN_RANGE(new RequestGetAllInRange()); private final RequestType requestType; @@ -221,13 +222,13 @@ public sealed interface RequestType { } } - record RequestGetAllInRange() implements RequestGetRange> { + record RequestGetAllInRange() implements RequestGetRange { private static final RequestGetAllInRange INSTANCE = new RequestGetAllInRange<>(); @Override public RequestTypeId getRequestTypeId() { - return RequestTypeId.FIRST_AND_LAST; + return RequestTypeId.ALL_IN_RANGE; } } } diff --git a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java index e808e22..5536a1c 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java @@ -31,6 +31,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; import org.cliffc.high_scale_lib.NonBlockingHashMapLong; import org.github.gestalt.config.exceptions.GestaltException; @@ -45,6 +46,8 @@ import org.rocksdb.RocksDBException; import org.rocksdb.Status.Code; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; public class EmbeddedDB implements RocksDBSyncAPI, Closeable { @@ -52,6 +55,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { public static final long MAX_TRANSACTION_DURATION_MS = 10_000L; private static final boolean USE_FAST_GET = true; private static final byte[] COLUMN_SCHEMAS_COLUMN = "_column_schemas_".getBytes(StandardCharsets.UTF_8); + private static final KV NO_MORE_RESULTS = new KV(new Keys(), null); private final Logger logger; private final @Nullable Path path; private final TransactionalDB db; @@ -1022,6 +1026,104 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { } } + @Override + public Stream getRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestGetRange requestType, long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException { + return Flux + .from(this.getRangeAsync(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs)) + .toStream(); + } + + /** See: {@link it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandStream.GetRange}. */ + public Publisher getRangeAsync(Arena arena, + long transactionId, + long columnId, + @Nullable Keys startKeysInclusive, + @Nullable Keys endKeysExclusive, + boolean reverse, + RequestType.RequestGetRange requestType, + long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException { + record Resources(ColumnInstance col, ReadOptions ro, AbstractSlice startKeySlice, + AbstractSlice endKeySlice, RocksIterator it) { + public void close() { + ro.close(); + startKeySlice.close(); + endKeySlice.close(); + it.close(); + } + } + return Flux.using(() -> { + var col = getColumn(columnId); + + if (requestType instanceof RequestType.RequestGetAllInRange) { + if (col.hasBuckets()) { + throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.UNSUPPORTED_COLUMN_TYPE, + "Can't get the range elements of a column with buckets"); + } + } + + var ro = new ReadOptions(); + try { + MemorySegment calculatedStartKey = startKeysInclusive != null ? col.calculateKey(arena, startKeysInclusive.keys()) : null; + MemorySegment calculatedEndKey = endKeysExclusive != null ? col.calculateKey(arena, endKeysExclusive.keys()) : null; + var startKeySlice = calculatedStartKey != null ? toDirectSlice(calculatedStartKey) : null; + try { + var endKeySlice = calculatedEndKey != null ? toDirectSlice(calculatedEndKey) : null; + try { + if (startKeysInclusive != null) { + ro.setIterateLowerBound(startKeySlice); + } + if (endKeySlice != null) { + ro.setIterateUpperBound(endKeySlice); + } + + RocksIterator it; + if (transactionId > 0L) { + //noinspection resource + it = getTransaction(transactionId, false).val().getIterator(ro, col.cfh()); + } else { + it = db.get().newIterator(col.cfh(), ro); + } + return new Resources(col, ro, startKeySlice, endKeySlice, it); + } catch (Throwable ex) { + if (endKeySlice != null) endKeySlice.close(); + throw ex; + } + } catch (Throwable ex) { + if (startKeySlice != null) startKeySlice.close(); + throw ex; + } + } catch (Throwable ex) { + ro.close(); + throw ex; + } + }, res -> Flux.generate(() -> { + if (!reverse) { + res.it.seekToFirst(); + } else { + res.it.seekToLast(); + } + return res.it; + }, (it, sink) -> { + if (!it.isValid()) { + sink.complete(); + } else { + var calculatedKey = toMemorySegment(arena, it.key()); + var calculatedValue = res.col.schema().hasValue() ? toMemorySegment(it.value()) : MemorySegment.NULL; + //noinspection unchecked + sink.next((T) decodeKVNoBuckets(arena, res.col, calculatedKey, calculatedValue)); + if (!reverse) { + res.it.next(); + } else { + res.it.prev(); + } + } + return it; + }), Resources::close) + .subscribeOn(Schedulers.boundedElastic()) + .doFirst(ops::beginOp) + .doFinally(_ -> ops.endOp()); + } + private MemorySegment dbGet(Tx tx, ColumnInstance col, Arena arena, diff --git a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java index 2305ce8..a1e0ddb 100644 --- a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java +++ b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java @@ -205,7 +205,7 @@ public class GrpcServer extends Server { return mapKVBatch(Arena.ofAuto(), batch.getEntriesCount(), batch::getEntries); }); - return Mono.fromFuture(asyncApi.putBatchAsync(initialRequest.getColumnId(), batches, mode)); + return Mono.fromFuture(() -> asyncApi.putBatchAsync(initialRequest.getColumnId(), batches, mode)); } else if (firstSignal.isOnComplete()) { return Mono.just(RocksDBException.of( RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "No initial request")); @@ -489,6 +489,22 @@ public class GrpcServer extends Server { }); } + @Override + public Flux getAllInRange(GetRangeRequest request) { + var arena = Arena.ofAuto(); + return Flux + .from(asyncApi.getRangeAsync(arena, + request.getTransactionId(), + request.getColumnId(), + mapKeys(arena, request.getStartKeysInclusiveCount(), request::getStartKeysInclusive), + mapKeys(arena, request.getEndKeysExclusiveCount(), request::getEndKeysExclusive), + request.getReverse(), + RequestType.allInRange(), + request.getTimeoutMs() + )) + .map(GrpcServerImpl::unmapKV); + } + private static void closeArenaSafe(Arena autoArena) { if (autoArena != null) { try { diff --git a/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java b/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java index 1a311a7..d63287b 100644 --- a/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java +++ b/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java @@ -7,10 +7,7 @@ import it.cavallium.rockserver.core.common.*; import it.unimi.dsi.fastutil.ints.IntList; import it.unimi.dsi.fastutil.objects.ObjectList; import java.lang.foreign.Arena; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; +import java.util.*; import java.util.concurrent.ThreadLocalRandom; import org.jetbrains.annotations.NotNull; @@ -18,6 +15,7 @@ import org.junit.jupiter.api.*; import java.io.IOException; import java.lang.foreign.MemorySegment; +import java.util.stream.Collectors; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -409,6 +407,53 @@ abstract class EmbeddedDBTest { } } + @Test + void getRangeAll() { + int initIndex = 1; + int count = 5; + + var rangeInitKey = getKVSequence().get(initIndex); + var rangeEndKeyExcl = getKVSequence().get(initIndex + count); + var rangeEndKeyIncl = getKVSequence().get(initIndex + count - 1); + if (getSchemaVarKeys().isEmpty()) { + var results = db.getRange(arena, 0, colId, rangeInitKey.keys(), rangeEndKeyExcl.keys(), false, RequestType.allInRange(), 1000).toList(); + Assertions.assertEquals(0, results.size(), "Results count must be 0"); + + fillSomeKeys(); + + boolean reverse = false; + while (true) { + results = db.getRange(arena, 0, colId, rangeInitKey.keys(), rangeEndKeyExcl.keys(), reverse, RequestType.allInRange(), 1000).toList(); + + var expectedResults = getKVSequence().stream().skip(initIndex).limit(count).collect(Collectors.toCollection(ArrayList::new)); + if (reverse) { + Collections.reverse(expectedResults); + } + assert expectedResults.size() == count; + + Assertions.assertEquals(count, results.size(), "Results count is not as expected. Reverse = " + reverse); + + for (int i = 0; i < expectedResults.size(); i++) { + var currentI = results.get(i); + var expectedI = expectedResults.get(i); + Assertions.assertEquals(expectedI, currentI, "Element at index " + i + " mismatch. Reverse = " + reverse); + } + + + if (!reverse) { + reverse = true; + } else { + break; + } + } + } else { + Assertions.assertThrowsExactly(RocksDBException.class, () -> { + db.getRange(arena, 0, colId, rangeInitKey.keys(), rangeEndKeyExcl.keys(), false, RequestType.allInRange(), 1000) + .toList(); + }); + } + } + @Test void putBatchSST() { @NotNull Publisher<@NotNull KVBatch> batchPublisher = new Publisher() {