Partial commit

This commit is contained in:
Andrea Cavalli 2024-10-18 18:03:17 +02:00
parent 397b9e0353
commit 9e06f9b9c2
13 changed files with 619 additions and 491 deletions

View File

@ -151,6 +151,11 @@
<artifactId>reactive-streams</artifactId> <artifactId>reactive-streams</artifactId>
<version>1.0.4</version> <version>1.0.4</version>
</dependency> </dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.6.4</version>
</dependency>
<dependency> <dependency>
<groupId>org.lz4</groupId> <groupId>org.lz4</groupId>

View File

@ -30,6 +30,7 @@ module rockserver.core {
requires io.netty.transport.classes.epoll; requires io.netty.transport.classes.epoll;
requires org.reactivestreams; requires org.reactivestreams;
requires io.netty.transport.unix.common; requires io.netty.transport.unix.common;
requires reactor.core;
exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.client;
exports it.cavallium.rockserver.core.common; exports it.cavallium.rockserver.core.common;

View File

@ -15,9 +15,14 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
@ -83,17 +88,18 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
} }
@Override @Override
public <R> R requestSync(RocksDBAPICommand<R> req) { public <R, RS> RS requestSync(RocksDBAPICommand<R, RS, ?> req) {
return req.handleSync(this); return req.handleSync(this);
} }
@SuppressWarnings("unchecked")
@Override @Override
public <R> CompletableFuture<R> requestAsync(RocksDBAPICommand<R> req) { public <R, RS, RA> RA requestAsync(RocksDBAPICommand<R, RS, RA> req) {
if (req instanceof RocksDBAPICommand.PutBatch putBatch) { return (RA) switch (req) {
//noinspection unchecked case RocksDBAPICommand.RocksDBAPICommandSingle.PutBatch putBatch -> this.putBatchAsync(putBatch.columnId(), putBatch.batchPublisher(), putBatch.mode());
return (CompletableFuture<R>) this.putBatchAsync(putBatch.columnId(), putBatch.batchPublisher(), putBatch.mode()); case RocksDBAPICommand.RocksDBAPICommandSingle<?> _ -> CompletableFuture.supplyAsync(() -> req.handleSync(this), exeuctor);
} case RocksDBAPICommand.RocksDBAPICommandStream<?> _ -> throw RocksDBException.of(RocksDBException.RocksDBErrorType.NOT_IMPLEMENTED, "The request of type " + req.getClass().getName() + " is not implemented in class " + this.getClass().getName());
return CompletableFuture.supplyAsync(() -> req.handleSync(this), exeuctor); };
} }
@Override @Override
@ -170,7 +176,7 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
} }
@Override @Override
public <T> T getRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestGetRange<? super KV, T> requestType, long timeoutMs) throws RocksDBException { public <T> T reduceRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestGetRange<? super KV, T> requestType, long timeoutMs) throws RocksDBException {
return db.getRange(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs); return db.reduceRange(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs);
} }
} }

View File

@ -61,6 +61,7 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import static it.cavallium.rockserver.core.common.Utils.toMemorySegment; import static it.cavallium.rockserver.core.common.Utils.toMemorySegment;
@ -133,12 +134,19 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
return this; return this;
} }
@SuppressWarnings("unchecked")
@Override @Override
public <R> R requestSync(RocksDBAPICommand<R> req) { public <R, RS, RA> RS requestSync(RocksDBAPICommand<R, RS, RA> req) {
var asyncResponse = req.handleAsync(this); return (RS) switch (req) {
return asyncResponse case RocksDBAPICommand.RocksDBAPICommandSingle<?> _ -> {
.toCompletableFuture() var asyncResponse = (CompletableFuture<R>) req.handleAsync(this);
.join(); yield asyncResponse.join();
}
case RocksDBAPICommand.RocksDBAPICommandStream<?> _ -> {
var asyncResponse = (Publisher<R>) req.handleAsync(this);
yield Flux.from(asyncResponse).toStream();
}
};
} }
@Override @Override
@ -508,7 +516,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public <T> CompletableFuture<T> getRangeAsync(Arena arena, long transactionId, long columnId, @NotNull Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange<? super it.cavallium.rockserver.core.common.KV, T> requestType, long timeoutMs) throws RocksDBException { public <T> CompletableFuture<T> reduceRangeAsync(Arena arena, long transactionId, long columnId, @NotNull Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange<? super it.cavallium.rockserver.core.common.KV, T> requestType, long timeoutMs) throws RocksDBException {
var request = GetRangeRequest.newBuilder() var request = GetRangeRequest.newBuilder()
.setTransactionId(transactionId) .setTransactionId(transactionId)
.setColumnId(columnId) .setColumnId(columnId)
@ -526,6 +534,11 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
}; };
} }
@Override
public <T> Publisher<T> getRangeStream(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange<? super it.cavallium.rockserver.core.common.KV, T> requestType, long timeoutMs) throws RocksDBException {
// todo: implement
}
private static it.cavallium.rockserver.core.common.Delta<MemorySegment> mapDelta(Delta x) { private static it.cavallium.rockserver.core.common.Delta<MemorySegment> mapDelta(Delta x) {
return new it.cavallium.rockserver.core.common.Delta<>( return new it.cavallium.rockserver.core.common.Delta<>(
x.hasPrevious() ? mapByteString(x.getPrevious()) : null, x.hasPrevious() ? mapByteString(x.getPrevious()) : null,

View File

@ -7,13 +7,19 @@ import java.lang.foreign.MemorySegment;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
public sealed interface RocksDBAPICommand<R> { public sealed interface RocksDBAPICommand<RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_RESULT> {
SYNC_RESULT handleSync(RocksDBSyncAPI api);
ASYNC_RESULT handleAsync(RocksDBAsyncAPI api);
sealed interface RocksDBAPICommandSingle<R> extends RocksDBAPICommand<R, R, CompletableFuture<R>> {
R handleSync(RocksDBSyncAPI api);
CompletionStage<R> handleAsync(RocksDBAsyncAPI api);
/** /**
* Open a transaction * Open a transaction
@ -22,7 +28,7 @@ public sealed interface RocksDBAPICommand<R> {
* *
* @param timeoutMs timeout in milliseconds * @param timeoutMs timeout in milliseconds
*/ */
record OpenTransaction(long timeoutMs) implements RocksDBAPICommand<Long> { record OpenTransaction(long timeoutMs) implements RocksDBAPICommandSingle<Long> {
@Override @Override
public Long handleSync(RocksDBSyncAPI api) { public Long handleSync(RocksDBSyncAPI api) {
@ -30,7 +36,7 @@ public sealed interface RocksDBAPICommand<R> {
} }
@Override @Override
public CompletionStage<Long> handleAsync(RocksDBAsyncAPI api) { public CompletableFuture<Long> handleAsync(RocksDBAsyncAPI api) {
return api.openTransactionAsync(timeoutMs); return api.openTransactionAsync(timeoutMs);
} }
@ -43,7 +49,7 @@ public sealed interface RocksDBAPICommand<R> {
* @param transactionId transaction id to close * @param transactionId transaction id to close
* @param commit true to commit the transaction, false to rollback it * @param commit true to commit the transaction, false to rollback it
*/ */
record CloseTransaction(long transactionId, boolean commit) implements RocksDBAPICommand<Boolean> { record CloseTransaction(long transactionId, boolean commit) implements RocksDBAPICommandSingle<Boolean> {
@Override @Override
public Boolean handleSync(RocksDBSyncAPI api) { public Boolean handleSync(RocksDBSyncAPI api) {
@ -51,7 +57,7 @@ public sealed interface RocksDBAPICommand<R> {
} }
@Override @Override
public CompletionStage<Boolean> handleAsync(RocksDBAsyncAPI api) { public CompletableFuture<Boolean> handleAsync(RocksDBAsyncAPI api) {
return api.closeTransactionAsync(transactionId, commit); return api.closeTransactionAsync(transactionId, commit);
} }
@ -61,7 +67,7 @@ public sealed interface RocksDBAPICommand<R> {
* *
* @param updateId update id to close * @param updateId update id to close
*/ */
record CloseFailedUpdate(long updateId) implements RocksDBAPICommand<Void> { record CloseFailedUpdate(long updateId) implements RocksDBAPICommandSingle<Void> {
@Override @Override
public Void handleSync(RocksDBSyncAPI api) { public Void handleSync(RocksDBSyncAPI api) {
@ -70,7 +76,7 @@ public sealed interface RocksDBAPICommand<R> {
} }
@Override @Override
public CompletionStage<Void> handleAsync(RocksDBAsyncAPI api) { public CompletableFuture<Void> handleAsync(RocksDBAsyncAPI api) {
return api.closeFailedUpdateAsync(updateId); return api.closeFailedUpdateAsync(updateId);
} }
@ -83,7 +89,7 @@ public sealed interface RocksDBAPICommand<R> {
* @param name column name * @param name column name
* @param schema column key-value schema * @param schema column key-value schema
*/ */
record CreateColumn(String name, @NotNull ColumnSchema schema) implements RocksDBAPICommand<Long> { record CreateColumn(String name, @NotNull ColumnSchema schema) implements RocksDBAPICommandSingle<Long> {
@Override @Override
public Long handleSync(RocksDBSyncAPI api) { public Long handleSync(RocksDBSyncAPI api) {
@ -91,7 +97,7 @@ public sealed interface RocksDBAPICommand<R> {
} }
@Override @Override
public CompletionStage<Long> handleAsync(RocksDBAsyncAPI api) { public CompletableFuture<Long> handleAsync(RocksDBAsyncAPI api) {
return api.createColumnAsync(name, schema); return api.createColumnAsync(name, schema);
} }
@ -100,7 +106,7 @@ public sealed interface RocksDBAPICommand<R> {
* Delete a column * Delete a column
* @param columnId column id * @param columnId column id
*/ */
record DeleteColumn(long columnId) implements RocksDBAPICommand<Void> { record DeleteColumn(long columnId) implements RocksDBAPICommandSingle<Void> {
@Override @Override
public Void handleSync(RocksDBSyncAPI api) { public Void handleSync(RocksDBSyncAPI api) {
@ -109,7 +115,7 @@ public sealed interface RocksDBAPICommand<R> {
} }
@Override @Override
public CompletionStage<Void> handleAsync(RocksDBAsyncAPI api) { public CompletableFuture<Void> handleAsync(RocksDBAsyncAPI api) {
return api.deleteColumnAsync(columnId); return api.deleteColumnAsync(columnId);
} }
@ -121,7 +127,7 @@ public sealed interface RocksDBAPICommand<R> {
* *
* @param name column name * @param name column name
*/ */
record GetColumnId(@NotNull String name) implements RocksDBAPICommand<Long> { record GetColumnId(@NotNull String name) implements RocksDBAPICommandSingle<Long> {
@Override @Override
public Long handleSync(RocksDBSyncAPI api) { public Long handleSync(RocksDBSyncAPI api) {
@ -129,7 +135,7 @@ public sealed interface RocksDBAPICommand<R> {
} }
@Override @Override
public CompletionStage<Long> handleAsync(RocksDBAsyncAPI api) { public CompletableFuture<Long> handleAsync(RocksDBAsyncAPI api) {
return api.getColumnIdAsync(name); return api.getColumnIdAsync(name);
} }
@ -148,7 +154,7 @@ public sealed interface RocksDBAPICommand<R> {
long columnId, long columnId,
Keys keys, Keys keys,
@NotNull MemorySegment value, @NotNull MemorySegment value,
RequestPut<? super MemorySegment, T> requestType) implements RocksDBAPICommand<T> { RequestPut<? super MemorySegment, T> requestType) implements RocksDBAPICommandSingle<T> {
@Override @Override
public T handleSync(RocksDBSyncAPI api) { public T handleSync(RocksDBSyncAPI api) {
@ -156,7 +162,7 @@ public sealed interface RocksDBAPICommand<R> {
} }
@Override @Override
public CompletionStage<T> handleAsync(RocksDBAsyncAPI api) { public CompletableFuture<T> handleAsync(RocksDBAsyncAPI api) {
return api.putAsync(arena, transactionOrUpdateId, columnId, keys, value, requestType); return api.putAsync(arena, transactionOrUpdateId, columnId, keys, value, requestType);
} }
@ -185,7 +191,7 @@ public sealed interface RocksDBAPICommand<R> {
record PutMulti<T>(Arena arena, long transactionOrUpdateId, long columnId, record PutMulti<T>(Arena arena, long transactionOrUpdateId, long columnId,
@NotNull List<Keys> keys, @NotNull List<Keys> keys,
@NotNull List<@NotNull MemorySegment> values, @NotNull List<@NotNull MemorySegment> values,
RequestPut<? super MemorySegment, T> requestType) implements RocksDBAPICommand<List<T>> { RequestPut<? super MemorySegment, T> requestType) implements RocksDBAPICommandSingle<List<T>> {
@Override @Override
public List<T> handleSync(RocksDBSyncAPI api) { public List<T> handleSync(RocksDBSyncAPI api) {
@ -193,7 +199,7 @@ public sealed interface RocksDBAPICommand<R> {
} }
@Override @Override
public CompletionStage<List<T>> handleAsync(RocksDBAsyncAPI api) { public CompletableFuture<List<T>> handleAsync(RocksDBAsyncAPI api) {
return api.putMultiAsync(arena, transactionOrUpdateId, columnId, keys, values, requestType); return api.putMultiAsync(arena, transactionOrUpdateId, columnId, keys, values, requestType);
} }
@ -223,7 +229,7 @@ public sealed interface RocksDBAPICommand<R> {
*/ */
record PutBatch(long columnId, record PutBatch(long columnId,
@NotNull org.reactivestreams.Publisher<@NotNull KVBatch> batchPublisher, @NotNull org.reactivestreams.Publisher<@NotNull KVBatch> batchPublisher,
@NotNull PutBatchMode mode) implements RocksDBAPICommand<Void> { @NotNull PutBatchMode mode) implements RocksDBAPICommandSingle<Void> {
@Override @Override
public Void handleSync(RocksDBSyncAPI api) { public Void handleSync(RocksDBSyncAPI api) {
@ -232,7 +238,7 @@ public sealed interface RocksDBAPICommand<R> {
} }
@Override @Override
public CompletionStage<Void> handleAsync(RocksDBAsyncAPI api) { public CompletableFuture<Void> handleAsync(RocksDBAsyncAPI api) {
return api.putBatchAsync(columnId, batchPublisher, mode); return api.putBatchAsync(columnId, batchPublisher, mode);
} }
@ -257,7 +263,7 @@ public sealed interface RocksDBAPICommand<R> {
long transactionOrUpdateId, long transactionOrUpdateId,
long columnId, long columnId,
Keys keys, Keys keys,
RequestGet<? super MemorySegment, T> requestType) implements RocksDBAPICommand<T> { RequestGet<? super MemorySegment, T> requestType) implements RocksDBAPICommandSingle<T> {
@Override @Override
public T handleSync(RocksDBSyncAPI api) { public T handleSync(RocksDBSyncAPI api) {
@ -265,7 +271,7 @@ public sealed interface RocksDBAPICommand<R> {
} }
@Override @Override
public CompletionStage<T> handleAsync(RocksDBAsyncAPI api) { public CompletableFuture<T> handleAsync(RocksDBAsyncAPI api) {
return api.getAsync(arena, transactionOrUpdateId, columnId, keys, requestType); return api.getAsync(arena, transactionOrUpdateId, columnId, keys, requestType);
} }
@ -300,7 +306,7 @@ public sealed interface RocksDBAPICommand<R> {
Keys startKeysInclusive, Keys startKeysInclusive,
@Nullable Keys endKeysExclusive, @Nullable Keys endKeysExclusive,
boolean reverse, boolean reverse,
long timeoutMs) implements RocksDBAPICommand<Long> { long timeoutMs) implements RocksDBAPICommandSingle<Long> {
@Override @Override
public Long handleSync(RocksDBSyncAPI api) { public Long handleSync(RocksDBSyncAPI api) {
@ -308,7 +314,7 @@ public sealed interface RocksDBAPICommand<R> {
} }
@Override @Override
public CompletionStage<Long> handleAsync(RocksDBAsyncAPI api) { public CompletableFuture<Long> handleAsync(RocksDBAsyncAPI api) {
return api.openIteratorAsync(arena, return api.openIteratorAsync(arena,
transactionId, transactionId,
columnId, columnId,
@ -324,7 +330,7 @@ public sealed interface RocksDBAPICommand<R> {
* Close an iterator * Close an iterator
* @param iteratorId iterator id * @param iteratorId iterator id
*/ */
record CloseIterator(long iteratorId) implements RocksDBAPICommand<Void> { record CloseIterator(long iteratorId) implements RocksDBAPICommandSingle<Void> {
@Override @Override
public Void handleSync(RocksDBSyncAPI api) { public Void handleSync(RocksDBSyncAPI api) {
@ -333,7 +339,7 @@ public sealed interface RocksDBAPICommand<R> {
} }
@Override @Override
public CompletionStage<Void> handleAsync(RocksDBAsyncAPI api) { public CompletableFuture<Void> handleAsync(RocksDBAsyncAPI api) {
return api.closeIteratorAsync(iteratorId); return api.closeIteratorAsync(iteratorId);
} }
@ -344,8 +350,7 @@ public sealed interface RocksDBAPICommand<R> {
* @param iterationId iteration id * @param iterationId iteration id
* @param keys keys, inclusive. [] means "the beginning" * @param keys keys, inclusive. [] means "the beginning"
*/ */
record SeekTo(Arena arena, long iterationId, Keys keys) implements record SeekTo(Arena arena, long iterationId, Keys keys) implements RocksDBAPICommandSingle<Void> {
RocksDBAPICommand<Void> {
@Override @Override
public Void handleSync(RocksDBSyncAPI api) { public Void handleSync(RocksDBSyncAPI api) {
@ -354,7 +359,7 @@ public sealed interface RocksDBAPICommand<R> {
} }
@Override @Override
public CompletionStage<Void> handleAsync(RocksDBAsyncAPI api) { public CompletableFuture<Void> handleAsync(RocksDBAsyncAPI api) {
return api.seekToAsync(arena, iterationId, keys); return api.seekToAsync(arena, iterationId, keys);
} }
@ -372,7 +377,7 @@ public sealed interface RocksDBAPICommand<R> {
long skipCount, long skipCount,
long takeCount, long takeCount,
@NotNull RequestType.RequestIterate<? super MemorySegment, T> requestType) @NotNull RequestType.RequestIterate<? super MemorySegment, T> requestType)
implements RocksDBAPICommand<T> { implements RocksDBAPICommandSingle<T> {
@Override @Override
public T handleSync(RocksDBSyncAPI api) { public T handleSync(RocksDBSyncAPI api) {
@ -380,11 +385,64 @@ public sealed interface RocksDBAPICommand<R> {
} }
@Override @Override
public CompletionStage<T> handleAsync(RocksDBAsyncAPI api) { public CompletableFuture<T> handleAsync(RocksDBAsyncAPI api) {
return api.subsequentAsync(arena, iterationId, skipCount, takeCount, requestType); return api.subsequentAsync(arena, iterationId, skipCount, takeCount, requestType);
} }
} }
/**
* Reduce values in a range
* <p>
* Returns the result
*
* @param arena arena
* @param transactionId transaction id, or 0
* @param columnId column id
* @param startKeysInclusive start keys, inclusive. [] means "the beginning"
* @param endKeysExclusive end keys, exclusive. Null means "the end"
* @param reverse if true, seek in reverse direction
* @param requestType the request type determines which type of data will be returned.
* @param timeoutMs timeout in milliseconds
*/
record ReduceRange<T>(Arena arena,
long transactionId,
long columnId,
@Nullable Keys startKeysInclusive,
@Nullable Keys endKeysExclusive,
boolean reverse,
RequestType.RequestGetRange<? super KV, T> requestType,
long timeoutMs) implements RocksDBAPICommandSingle<T> {
@Override
public T handleSync(RocksDBSyncAPI api) {
return api.reduceRange(arena,
transactionId,
columnId,
startKeysInclusive,
endKeysExclusive,
reverse,
requestType,
timeoutMs
);
}
@Override
public CompletableFuture<T> handleAsync(RocksDBAsyncAPI api) {
return api.reduceRangeAsync(arena,
transactionId,
columnId,
startKeysInclusive,
endKeysExclusive,
reverse,
requestType,
timeoutMs
);
}
}
}
sealed interface RocksDBAPICommandStream<R> extends RocksDBAPICommand<R, Stream<R>, Publisher<R>> {
/** /**
* Get some values in a range * Get some values in a range
* <p> * <p>
@ -406,10 +464,10 @@ public sealed interface RocksDBAPICommand<R> {
@Nullable Keys endKeysExclusive, @Nullable Keys endKeysExclusive,
boolean reverse, boolean reverse,
RequestType.RequestGetRange<? super KV, T> requestType, RequestType.RequestGetRange<? super KV, T> requestType,
long timeoutMs) implements RocksDBAPICommand<T> { long timeoutMs) implements RocksDBAPICommandStream<T> {
@Override @Override
public T handleSync(RocksDBSyncAPI api) { public Stream<T> handleSync(RocksDBSyncAPI api) {
return api.getRange(arena, return api.getRange(arena,
transactionId, transactionId,
columnId, columnId,
@ -422,7 +480,7 @@ public sealed interface RocksDBAPICommand<R> {
} }
@Override @Override
public CompletableFuture<T> handleAsync(RocksDBAsyncAPI api) { public Publisher<T> handleAsync(RocksDBAsyncAPI api) {
return api.getRangeAsync(arena, return api.getRangeAsync(arena,
transactionId, transactionId,
columnId, columnId,
@ -435,4 +493,5 @@ public sealed interface RocksDBAPICommand<R> {
} }
} }
}
} }

View File

@ -2,20 +2,21 @@ package it.cavallium.rockserver.core.common;
import it.cavallium.rockserver.core.common.RequestType.RequestGet; import it.cavallium.rockserver.core.common.RequestType.RequestGet;
import it.cavallium.rockserver.core.common.RequestType.RequestPut; import it.cavallium.rockserver.core.common.RequestType.RequestPut;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseFailedUpdate; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseFailedUpdate;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseIterator; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseIterator;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseTransaction; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseTransaction;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.CreateColumn; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CreateColumn;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.DeleteColumn; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.DeleteColumn;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Get; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Get;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetColumnId; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.GetColumnId;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetRange; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.ReduceRange;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenTransaction; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.OpenIterator;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Put; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.OpenTransaction;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutMulti; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Put;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutBatch; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.PutMulti;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.SeekTo; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.PutBatch;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Subsequent; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.SeekTo;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Subsequent;
import java.lang.foreign.Arena; import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment; import java.lang.foreign.MemorySegment;
import java.util.List; import java.util.List;
@ -100,7 +101,7 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler {
@Nullable Keys endKeysExclusive, @Nullable Keys endKeysExclusive,
boolean reverse, boolean reverse,
long timeoutMs) throws RocksDBException { long timeoutMs) throws RocksDBException {
return requestAsync(new RocksDBAPICommand.OpenIterator(arena, return requestAsync(new OpenIterator(arena,
transactionId, transactionId,
columnId, columnId,
startKeysInclusive, startKeysInclusive,
@ -129,8 +130,8 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler {
return requestAsync(new Subsequent<>(arena, iterationId, skipCount, takeCount, requestType)); return requestAsync(new Subsequent<>(arena, iterationId, skipCount, takeCount, requestType));
} }
/** See: {@link GetRange}. */ /** See: {@link ReduceRange}. */
default <T> CompletableFuture<T> getRangeAsync(Arena arena, default <T> CompletableFuture<T> reduceRangeAsync(Arena arena,
long transactionId, long transactionId,
long columnId, long columnId,
@Nullable Keys startKeysInclusive, @Nullable Keys startKeysInclusive,
@ -138,7 +139,7 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler {
boolean reverse, boolean reverse,
RequestType.RequestGetRange<? super KV, T> requestType, RequestType.RequestGetRange<? super KV, T> requestType,
long timeoutMs) throws RocksDBException { long timeoutMs) throws RocksDBException {
return requestAsync(new RocksDBAPICommand.GetRange<>(arena, return requestAsync(new ReduceRange<>(arena,
transactionId, transactionId,
columnId, columnId,
startKeysInclusive, startKeysInclusive,
@ -148,4 +149,17 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler {
timeoutMs timeoutMs
)); ));
} }
/** See: {@link GetRange}. */
default <T> Publisher<T> getRangeAsync(Arena arena,
long transactionId,
long columnId,
@Nullable Keys startKeysInclusive,
@Nullable Keys endKeysExclusive,
boolean reverse,
RequestType.RequestGetRange<? super KV, T> requestType,
long timeoutMs) throws RocksDBException {
throw RocksDBException.of(RocksDBException.RocksDBErrorType.NOT_IMPLEMENTED,
"GetRangeStream is not implemented");
}
} }

View File

@ -1,10 +1,20 @@
package it.cavallium.rockserver.core.common; package it.cavallium.rockserver.core.common;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
public interface RocksDBAsyncAPIRequestHandler { public interface RocksDBAsyncAPIRequestHandler {
default <R> CompletableFuture<R> requestAsync(RocksDBAPICommand<R> req) { @SuppressWarnings("unchecked")
return CompletableFuture.failedFuture(new UnsupportedOperationException("Unsupported request type: " + req)); default <R, RS, RA> RA requestAsync(RocksDBAPICommand<R, RS, RA> req) {
return (RA) switch (req) {
case RocksDBAPICommand.RocksDBAPICommandStream<?> _ ->
(Publisher<R>) subscriber ->
subscriber.onError(new UnsupportedOperationException("Unsupported request type: " + req));
case RocksDBAPICommand.RocksDBAPICommandSingle<?> _ ->
CompletableFuture.<R>failedFuture(new UnsupportedOperationException("Unsupported request type: " + req));
};
} }
} }

View File

@ -2,24 +2,27 @@ package it.cavallium.rockserver.core.common;
import it.cavallium.rockserver.core.common.RequestType.RequestGet; import it.cavallium.rockserver.core.common.RequestType.RequestGet;
import it.cavallium.rockserver.core.common.RequestType.RequestPut; import it.cavallium.rockserver.core.common.RequestType.RequestPut;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseFailedUpdate; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseFailedUpdate;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseIterator; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseIterator;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseTransaction; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseTransaction;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.CreateColumn; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CreateColumn;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.DeleteColumn; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.DeleteColumn;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Get; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Get;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetColumnId; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.GetColumnId;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetRange; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.ReduceRange;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenIterator; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.OpenIterator;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenTransaction; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.OpenTransaction;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Put; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Put;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutMulti; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.PutMulti;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutBatch; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.PutBatch;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.SeekTo; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.SeekTo;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Subsequent; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Subsequent;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandStream.GetRange;
import java.lang.foreign.Arena; import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment; import java.lang.foreign.MemorySegment;
import java.util.List; import java.util.List;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
@ -121,8 +124,20 @@ public interface RocksDBSyncAPI extends RocksDBSyncAPIRequestHandler {
return requestSync(new Subsequent<>(arena, iterationId, skipCount, takeCount, requestType)); return requestSync(new Subsequent<>(arena, iterationId, skipCount, takeCount, requestType));
} }
/** See: {@link ReduceRange}. */
default <T> T reduceRange(Arena arena,
long transactionId,
long columnId,
@Nullable Keys startKeysInclusive,
@Nullable Keys endKeysExclusive,
boolean reverse,
@NotNull RequestType.RequestGetRange<? super KV, T> requestType,
long timeoutMs) throws RocksDBException {
return requestSync(new ReduceRange<>(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs));
}
/** See: {@link GetRange}. */ /** See: {@link GetRange}. */
default <T> T getRange(Arena arena, default <T> Stream<T> getRange(Arena arena,
long transactionId, long transactionId,
long columnId, long columnId,
@Nullable Keys startKeysInclusive, @Nullable Keys startKeysInclusive,

View File

@ -2,7 +2,7 @@ package it.cavallium.rockserver.core.common;
public interface RocksDBSyncAPIRequestHandler { public interface RocksDBSyncAPIRequestHandler {
default <R> R requestSync(RocksDBAPICommand<R> req) { default <R, RS, RA> RS requestSync(RocksDBAPICommand<R, RS, RA> req) {
throw new UnsupportedOperationException("Unsupported request type: " + req); throw new UnsupportedOperationException("Unsupported request type: " + req);
} }
} }

View File

@ -949,7 +949,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public <T> T getRange(Arena arena, public <T> T reduceRange(Arena arena,
long transactionId, long transactionId,
long columnId, long columnId,
@Nullable Keys startKeysInclusive, @Nullable Keys startKeysInclusive,

View File

@ -706,7 +706,7 @@ public class GrpcServer extends Server {
try { try {
try (var arena = Arena.ofConfined()) { try (var arena = Arena.ofConfined()) {
it.cavallium.rockserver.core.common.FirstAndLast<it.cavallium.rockserver.core.common.KV> firstAndLast it.cavallium.rockserver.core.common.FirstAndLast<it.cavallium.rockserver.core.common.KV> firstAndLast
= api.getRange(arena, = api.reduceRange(arena,
request.getTransactionId(), request.getTransactionId(),
request.getColumnId(), request.getColumnId(),
mapKeys(arena, request.getStartKeysInclusiveCount(), request::getStartKeysInclusive), mapKeys(arena, request.getStartKeysInclusiveCount(), request::getStartKeysInclusive),

View File

@ -125,4 +125,5 @@ service RocksDBService {
rpc subsequentExists(SubsequentRequest) returns (PreviousPresence); rpc subsequentExists(SubsequentRequest) returns (PreviousPresence);
rpc subsequentMultiGet(SubsequentRequest) returns (stream KV); rpc subsequentMultiGet(SubsequentRequest) returns (stream KV);
rpc getRangeFirstAndLast(GetRangeRequest) returns (FirstAndLast); rpc getRangeFirstAndLast(GetRangeRequest) returns (FirstAndLast);
rpc getRangeStream(GetRangeRequest) returns (stream KV);
} }

View File

@ -384,23 +384,27 @@ abstract class EmbeddedDBTest {
} }
@Test @Test
void getRangeFirstAndLast() { void reduceRangeFirstAndLast() {
var firstKey = getKVSequenceFirst().keys(); var firstKey = getKVSequenceFirst().keys();
var lastKey = getKVSequenceLast().keys(); var lastKey = getKVSequenceLast().keys();
var prevLastKV = getKVSequence().get(getKVSequence().size() - 2); var prevLastKV = getKVSequence().get(getKVSequence().size() - 2);
if (getSchemaVarKeys().isEmpty()) { if (getSchemaVarKeys().isEmpty()) {
FirstAndLast<KV> firstAndLast = db.getRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000); FirstAndLast<KV> firstAndLast = db.reduceRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000);
Assertions.assertNull(firstAndLast.first(), "First should be empty because the db is empty"); Assertions.assertNull(firstAndLast.first(), "First should be empty because the db is empty");
Assertions.assertNull(firstAndLast.last(), "Last should be empty because the db is empty"); Assertions.assertNull(firstAndLast.last(), "Last should be empty because the db is empty");
fillSomeKeys(); fillSomeKeys();
firstAndLast = db.getRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000); firstAndLast = db.reduceRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000);
Assertions.assertEquals(getKVSequenceFirst(), firstAndLast.first(), "First key mismatch"); Assertions.assertEquals(getKVSequenceFirst(), firstAndLast.first(), "First key mismatch");
Assertions.assertEquals(prevLastKV, firstAndLast.last(), "Last key mismatch"); Assertions.assertEquals(prevLastKV, firstAndLast.last(), "Last key mismatch");
firstAndLast = db.reduceRange(arena, 0, colId, firstKey, firstKey, false, RequestType.firstAndLast(), 1000);
Assertions.assertNull(firstAndLast.first(), "First should be empty because the range is empty");
Assertions.assertNull(firstAndLast.last(), "Last should be empty because the range is empty");
} else { } else {
Assertions.assertThrowsExactly(RocksDBException.class, () -> { Assertions.assertThrowsExactly(RocksDBException.class, () -> {
db.getRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000); db.reduceRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000);
}); });
} }
} }