This commit is contained in:
Andrea Cavalli 2024-10-23 20:16:48 +02:00
parent d4ae772d80
commit d34f225b55
6 changed files with 56 additions and 21 deletions

View File

@ -15,14 +15,11 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
@ -97,6 +94,7 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
public <R, RS, RA> RA requestAsync(RocksDBAPICommand<R, RS, RA> req) { public <R, RS, RA> RA requestAsync(RocksDBAPICommand<R, RS, RA> req) {
return (RA) switch (req) { return (RA) switch (req) {
case RocksDBAPICommand.RocksDBAPICommandSingle.PutBatch putBatch -> this.putBatchAsync(putBatch.columnId(), putBatch.batchPublisher(), putBatch.mode()); case RocksDBAPICommand.RocksDBAPICommandSingle.PutBatch putBatch -> this.putBatchAsync(putBatch.columnId(), putBatch.batchPublisher(), putBatch.mode());
case RocksDBAPICommand.RocksDBAPICommandStream.GetRange<?> getRange -> this.getRangeAsync(getRange.arena(), getRange.transactionId(), getRange.columnId(), getRange.startKeysInclusive(), getRange.endKeysExclusive(), getRange.reverse(), getRange.requestType(), getRange.timeoutMs());
case RocksDBAPICommand.RocksDBAPICommandSingle<?> _ -> CompletableFuture.supplyAsync(() -> req.handleSync(this), exeuctor); case RocksDBAPICommand.RocksDBAPICommandSingle<?> _ -> CompletableFuture.supplyAsync(() -> req.handleSync(this), exeuctor);
case RocksDBAPICommand.RocksDBAPICommandStream<?> _ -> throw RocksDBException.of(RocksDBException.RocksDBErrorType.NOT_IMPLEMENTED, "The request of type " + req.getClass().getName() + " is not implemented in class " + this.getClass().getName()); case RocksDBAPICommand.RocksDBAPICommandStream<?> _ -> throw RocksDBException.of(RocksDBException.RocksDBErrorType.NOT_IMPLEMENTED, "The request of type " + req.getClass().getName() + " is not implemented in class " + this.getClass().getName());
}; };
@ -187,6 +185,6 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
@Override @Override
public <T> Publisher<T> getRangeAsync(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange<? super KV, T> requestType, long timeoutMs) throws RocksDBException { public <T> Publisher<T> getRangeAsync(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange<? super KV, T> requestType, long timeoutMs) throws RocksDBException {
return db.getRangeAsync(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs); return db.getRangeAsyncInternal(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs);
} }
} }

View File

@ -411,6 +411,9 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
result.hasFirst() ? mapKV(arena, result.getFirst()) : null, result.hasFirst() ? mapKV(arena, result.getFirst()) : null,
result.hasLast() ? mapKV(arena, result.getLast()) : null result.hasLast() ? mapKV(arena, result.getLast()) : null
)); ));
case RequestType.RequestEntriesCount<?> _ ->
toResponse(this.futureStub.reduceRangeEntriesCount(request), EntriesCount::getCount);
default -> throw new UnsupportedOperationException();
}; };
} }

View File

@ -160,7 +160,14 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler {
boolean reverse, boolean reverse,
RequestType.RequestGetRange<? super KV, T> requestType, RequestType.RequestGetRange<? super KV, T> requestType,
long timeoutMs) throws RocksDBException { long timeoutMs) throws RocksDBException {
throw RocksDBException.of(RocksDBException.RocksDBErrorType.NOT_IMPLEMENTED, return requestAsync(new GetRange<>(arena,
"GetRangeStream is not implemented"); transactionId,
columnId,
startKeysInclusive,
endKeysExclusive,
reverse,
requestType,
timeoutMs
));
} }
} }

View File

@ -975,11 +975,11 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
} }
try (var ro = new ReadOptions()) { try (var ro = new ReadOptions()) {
MemorySegment calculatedStartKey = startKeysInclusive != null ? col.calculateKey(arena, startKeysInclusive.keys()) : null; MemorySegment calculatedStartKey = startKeysInclusive != null && startKeysInclusive.keys().length > 0 ? col.calculateKey(arena, startKeysInclusive.keys()) : null;
MemorySegment calculatedEndKey = endKeysExclusive != null ? col.calculateKey(arena, endKeysExclusive.keys()) : null; MemorySegment calculatedEndKey = endKeysExclusive != null && endKeysExclusive.keys().length > 0 ? col.calculateKey(arena, endKeysExclusive.keys()) : null;
try (var startKeySlice = calculatedStartKey != null ? toDirectSlice(calculatedStartKey) : null; try (var startKeySlice = calculatedStartKey != null ? toDirectSlice(calculatedStartKey) : null;
var endKeySlice = calculatedEndKey != null ? toDirectSlice(calculatedEndKey) : null) { var endKeySlice = calculatedEndKey != null ? toDirectSlice(calculatedEndKey) : null) {
if (startKeysInclusive != null) { if (startKeySlice != null) {
ro.setIterateLowerBound(startKeySlice); ro.setIterateLowerBound(startKeySlice);
} }
if (endKeySlice != null) { if (endKeySlice != null) {
@ -996,16 +996,16 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
try (it) { try (it) {
return (T) switch (requestType) { return (T) switch (requestType) {
case RequestEntriesCount<?> _ -> { case RequestEntriesCount<?> _ -> {
if (calculatedStartKey != null || calculatedEndKey != null || path == null) {
long count = 0; long count = 0;
it.seekToFirst(); it.seekToFirst();
if (calculatedStartKey != null || calculatedEndKey != null) {
while (it.isValid()) { while (it.isValid()) {
count++; count++;
it.next(); it.next();
} }
yield count; yield count;
} else { } else {
Map<String, TableProperties> props = null; Map<String, TableProperties> props ;
try { try {
props = db.get().getPropertiesOfAllTables(col.cfh()); props = db.get().getPropertiesOfAllTables(col.cfh());
} catch (org.rocksdb.RocksDBException e) { } catch (org.rocksdb.RocksDBException e) {
@ -1054,12 +1054,12 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
@Override @Override
public <T> Stream<T> getRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestGetRange<? super KV, T> requestType, long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException { public <T> Stream<T> getRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestGetRange<? super KV, T> requestType, long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException {
return Flux return Flux
.from(this.getRangeAsync(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs)) .from(this.getRangeAsyncInternal(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs))
.toStream(); .toStream();
} }
/** See: {@link it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandStream.GetRange}. */ /** See: {@link it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandStream.GetRange}. */
public <T> Publisher<T> getRangeAsync(Arena arena, public <T> Publisher<T> getRangeAsyncInternal(Arena arena,
long transactionId, long transactionId,
long columnId, long columnId,
@Nullable Keys startKeysInclusive, @Nullable Keys startKeysInclusive,
@ -1071,8 +1071,8 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
AbstractSlice<?> endKeySlice, RocksIterator it) { AbstractSlice<?> endKeySlice, RocksIterator it) {
public void close() { public void close() {
ro.close(); ro.close();
startKeySlice.close(); if (startKeySlice != null) startKeySlice.close();
endKeySlice.close(); if (endKeySlice != null) endKeySlice.close();
it.close(); it.close();
} }
} }
@ -1088,13 +1088,13 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
var ro = new ReadOptions(); var ro = new ReadOptions();
try { try {
MemorySegment calculatedStartKey = startKeysInclusive != null ? col.calculateKey(arena, startKeysInclusive.keys()) : null; MemorySegment calculatedStartKey = startKeysInclusive != null && startKeysInclusive.keys().length > 0 ? col.calculateKey(arena, startKeysInclusive.keys()) : null;
MemorySegment calculatedEndKey = endKeysExclusive != null ? col.calculateKey(arena, endKeysExclusive.keys()) : null; MemorySegment calculatedEndKey = endKeysExclusive != null && endKeysExclusive.keys().length > 0 ? col.calculateKey(arena, endKeysExclusive.keys()) : null;
var startKeySlice = calculatedStartKey != null ? toDirectSlice(calculatedStartKey) : null; var startKeySlice = calculatedStartKey != null ? toDirectSlice(calculatedStartKey) : null;
try { try {
var endKeySlice = calculatedEndKey != null ? toDirectSlice(calculatedEndKey) : null; var endKeySlice = calculatedEndKey != null ? toDirectSlice(calculatedEndKey) : null;
try { try {
if (startKeysInclusive != null) { if (startKeySlice != null) {
ro.setIterateLowerBound(startKeySlice); ro.setIterateLowerBound(startKeySlice);
} }
if (endKeySlice != null) { if (endKeySlice != null) {

View File

@ -489,6 +489,25 @@ public class GrpcServer extends Server {
}); });
} }
@Override
public Mono<EntriesCount> reduceRangeEntriesCount(GetRangeRequest request) {
return executeSync(() -> {
try (var arena = Arena.ofConfined()) {
long entriesCount
= api.reduceRange(arena,
request.getTransactionId(),
request.getColumnId(),
mapKeys(arena, request.getStartKeysInclusiveCount(), request::getStartKeysInclusive),
mapKeys(arena, request.getEndKeysExclusiveCount(), request::getEndKeysExclusive),
request.getReverse(),
RequestType.entriesCount(),
request.getTimeoutMs()
);
return EntriesCount.newBuilder().setCount(entriesCount).build();
}
});
}
@Override @Override
public Flux<KV> getAllInRange(GetRangeRequest request) { public Flux<KV> getAllInRange(GetRangeRequest request) {
var arena = Arena.ofAuto(); var arena = Arena.ofAuto();
@ -502,7 +521,13 @@ public class GrpcServer extends Server {
RequestType.allInRange(), RequestType.allInRange(),
request.getTimeoutMs() request.getTimeoutMs()
)) ))
.map(GrpcServerImpl::unmapKV); .map(GrpcServerImpl::unmapKV)
.onErrorResume(ex -> {
if (!(ex instanceof RocksDBException)) {
LOG.error("Unexpected error during request: {}", request, ex);
}
return Mono.error(ex);
});
} }
private static void closeArenaSafe(Arena autoArena) { private static void closeArenaSafe(Arena autoArena) {

View File

@ -96,6 +96,7 @@ message SubsequentRequest {int64 iterationId = 1; int64 skipCount = 2; int64 tak
message GetRangeRequest {int64 transactionId = 1; int64 columnId = 2; repeated bytes startKeysInclusive = 3; repeated bytes endKeysExclusive = 4; bool reverse = 5; int64 timeoutMs = 6;} message GetRangeRequest {int64 transactionId = 1; int64 columnId = 2; repeated bytes startKeysInclusive = 3; repeated bytes endKeysExclusive = 4; bool reverse = 5; int64 timeoutMs = 6;}
message FirstAndLast {optional KV first = 1; optional KV last = 2;} message FirstAndLast {optional KV first = 1; optional KV last = 2;}
message EntriesCount {int64 count = 1;}
service RocksDBService { service RocksDBService {
rpc openTransaction(OpenTransactionRequest) returns (OpenTransactionResponse); rpc openTransaction(OpenTransactionRequest) returns (OpenTransactionResponse);
@ -125,5 +126,6 @@ service RocksDBService {
rpc subsequentExists(SubsequentRequest) returns (PreviousPresence); rpc subsequentExists(SubsequentRequest) returns (PreviousPresence);
rpc subsequentMultiGet(SubsequentRequest) returns (stream KV); rpc subsequentMultiGet(SubsequentRequest) returns (stream KV);
rpc reduceRangeFirstAndLast(GetRangeRequest) returns (FirstAndLast); rpc reduceRangeFirstAndLast(GetRangeRequest) returns (FirstAndLast);
rpc reduceRangeEntriesCount(GetRangeRequest) returns (EntriesCount);
rpc getAllInRange(GetRangeRequest) returns (stream KV); rpc getAllInRange(GetRangeRequest) returns (stream KV);
} }