Add PutMulti

This commit is contained in:
Andrea Cavalli 2024-03-29 20:41:58 +01:00
parent 5851c233b1
commit 5b87231c26
8 changed files with 1534 additions and 178 deletions

View File

@ -15,6 +15,7 @@ import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.net.URI;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@ -107,6 +108,16 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
return db.put(arena, transactionOrUpdateId, columnId, keys, value, requestType);
}
@Override
public <T> List<T> putMulti(Arena arena,
long transactionOrUpdateId,
long columnId,
@NotNull List<@NotNull MemorySegment @NotNull []> keys,
@NotNull List<@NotNull MemorySegment> values,
RequestPut<? super MemorySegment, T> requestType) throws RocksDBException {
return db.putMulti(arena, transactionOrUpdateId, columnId, keys, values, requestType);
}
@Override
public <T> T get(Arena arena,
long transactionOrUpdateId,

View File

@ -4,6 +4,7 @@ import it.cavallium.rockserver.core.common.RequestType.RequestGet;
import it.cavallium.rockserver.core.common.RequestType.RequestPut;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.util.List;
import java.util.concurrent.CompletionStage;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -151,6 +152,31 @@ public sealed interface RocksDBAPICommand<R> {
return api.putAsync(arena, transactionOrUpdateId, columnId, keys, value, requestType);
}
}
/**
* Put multiple elements into the specified positions
* @param arena arena
* @param transactionOrUpdateId transaction id, update id, or 0
* @param columnId column id
* @param keys multiple lists of column keys
* @param values multiple values, or null if not needed
* @param requestType the request type determines which type of data will be returned.
*/
record PutMulti<T>(Arena arena, long transactionOrUpdateId, long columnId,
@NotNull List<@NotNull MemorySegment @NotNull []> keys,
@NotNull List<@NotNull MemorySegment> values,
RequestPut<? super MemorySegment, T> requestType) implements RocksDBAPICommand<List<T>> {
@Override
public List<T> handleSync(RocksDBSyncAPI api) {
return api.putMulti(arena, transactionOrUpdateId, columnId, keys, values, requestType);
}
@Override
public CompletionStage<List<T>> handleAsync(RocksDBAsyncAPI api) {
return api.putMultiAsync(arena, transactionOrUpdateId, columnId, keys, values, requestType);
}
}
/**
* Get an element from the specified position

View File

@ -12,10 +12,12 @@ import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetColumnId;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenIterator;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenTransaction;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Put;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutMulti;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.SeekTo;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Subsequent;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.jetbrains.annotations.NotNull;
@ -63,6 +65,16 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler {
return requestAsync(new Put<>(arena, transactionOrUpdateId, columnId, keys, value, requestType));
}
/** See: {@link PutMulti}. */
default <T> CompletionStage<List<T>> putMultiAsync(Arena arena,
long transactionOrUpdateId,
long columnId,
@NotNull List<@NotNull MemorySegment @NotNull []> keys,
@NotNull List<@NotNull MemorySegment> values,
RequestPut<? super MemorySegment, T> requestType) throws RocksDBException {
return requestAsync(new PutMulti<>(arena, transactionOrUpdateId, columnId, keys, values, requestType));
}
/** See: {@link Get}. */
default <T> CompletionStage<T> getAsync(Arena arena,
long transactionOrUpdateId,

View File

@ -15,10 +15,12 @@ import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetColumnId;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenIterator;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenTransaction;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Put;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutMulti;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.SeekTo;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Subsequent;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.util.List;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -64,6 +66,16 @@ public interface RocksDBSyncAPI extends RocksDBSyncAPIRequestHandler {
return requestSync(new Put<>(arena, transactionOrUpdateId, columnId, keys, value, requestType));
}
/** See: {@link PutMulti}. */
default <T> List<T> putMulti(Arena arena,
long transactionOrUpdateId,
long columnId,
@NotNull List<@NotNull MemorySegment @NotNull []> keys,
@NotNull List<@NotNull MemorySegment> values,
RequestPut<? super MemorySegment, T> requestType) throws RocksDBException {
return requestSync(new PutMulti<>(arena, transactionOrUpdateId, columnId, keys, values, requestType));
}
/** See: {@link Get}. */
default <T> T get(Arena arena,
long transactionOrUpdateId,

View File

@ -7,6 +7,7 @@ import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithoutValue;
import it.cavallium.rockserver.core.common.RequestType;
import it.cavallium.rockserver.core.common.RequestType.RequestGet;
import it.cavallium.rockserver.core.common.RequestType.RequestNothing;
import it.cavallium.rockserver.core.common.RequestType.RequestPut;
import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.Delta;
@ -31,6 +32,8 @@ import java.lang.foreign.MemorySegment;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@ -326,6 +329,26 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
}
}
@Override
public <T> List<T> putMulti(Arena arena,
long transactionOrUpdateId,
long columnId,
@NotNull List<@NotNull MemorySegment @NotNull []> keys,
@NotNull List<@NotNull MemorySegment> values,
RequestPut<? super MemorySegment, T> requestType) throws it.cavallium.rockserver.core.common.RocksDBException {
if (keys.size() != values.size()) {
throw new IllegalArgumentException("keys length is different than values length: " + keys.size() + " != " + values.size());
}
List<T> responses = requestType instanceof RequestType.RequestNothing<?> ? null : new ArrayList<>(keys.size());
for (int i = 0; i < keys.size(); i++) {
var result = put(arena, transactionOrUpdateId, columnId, keys.get(i), values.get(i), requestType);
if (responses != null) {
responses.add(result);
}
}
return responses != null ? responses : List.of();
}
/**
* @param txConsumer this can be called multiple times, if the optimistic transaction failed
*/

View File

@ -49,6 +49,10 @@ public class ThriftServer extends Server {
}
}
private static @NotNull List<@NotNull MemorySegment[]> keysToRecords(Arena arena, @NotNull List<@NotNull List< @NotNull ByteBuffer>> keysMulti) {
return keysMulti.stream().map(keys -> keysToRecord(arena, keys)).toList();
}
private static MemorySegment[] keysToRecord(Arena arena, List<@NotNull ByteBuffer> keys) {
if (keys == null) {
return null;
@ -62,6 +66,10 @@ public class ThriftServer extends Server {
return result;
}
private static @NotNull List<@NotNull MemorySegment> keyToRecords(Arena arena, @NotNull List<@NotNull ByteBuffer> keyMulti) {
return keyMulti.stream().map(key -> keyToRecord(arena, key)).toList();
}
private static @NotNull MemorySegment keyToRecord(Arena arena, @NotNull ByteBuffer key) {
if (key.isDirect()) {
return MemorySegment.ofBuffer(key);
@ -119,6 +127,22 @@ public class ThriftServer extends Server {
};
}
private static BiConsumer<? super List<Void>, ? super Throwable> handleResultListWithArena(Arena arena,
AsyncMethodCallback<Void> resultHandler) {
return (result, error) -> {
arena.close();
if (error != null) {
if (error instanceof Exception ex) {
resultHandler.onError(ex);
} else {
resultHandler.onError(new Exception(error));
}
} else {
resultHandler.onComplete(null);
}
};
}
private static OptionalBinary mapResult(MemorySegment memorySegment) {
var result = new OptionalBinary();
return memorySegment != null ? result.setValue(memorySegment.asByteBuffer()) : result;
@ -200,6 +224,18 @@ public class ThriftServer extends Server {
.whenComplete(handleResultWithArena(arena, resultHandler));
}
@Override
public void putMulti(long transactionOrUpdateId,
long columnId,
List<List<ByteBuffer>> keysMulti,
List<ByteBuffer> valueMulti,
AsyncMethodCallback<Void> resultHandler) {
var arena = Arena.ofShared();
client.getAsyncApi()
.putMultiAsync(arena, transactionOrUpdateId, columnId, keysToRecords(arena, keysMulti), keyToRecords(arena, valueMulti), RequestType.none())
.whenComplete(handleResultListWithArena(arena, resultHandler));
}
@Override
public void putGetPrevious(long transactionOrUpdateId,
long columnId,

View File

@ -56,6 +56,8 @@ service RocksDB {
void put(1: required i64 transactionOrUpdateId, 2: required i64 columnId, 3: required list<binary> keys, 4: required binary value),
void putMulti(1: required i64 transactionOrUpdateId, 2: required i64 columnId, 3: required list<list<binary>> keysMulti, 4: required list<binary> valueMulti),
OptionalBinary putGetPrevious(1: required i64 transactionOrUpdateId, 2: required i64 columnId, 3: required list<binary> keys, 4: required binary value),
Delta putGetDelta(1: required i64 transactionOrUpdateId, 2: required i64 columnId, 3: required list<binary> keys, 4: required binary value),