Usable
This commit is contained in:
parent
a97613284c
commit
76b6985d4f
@ -3,8 +3,10 @@ package it.cavallium.dbengine.client;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import it.cavallium.dbengine.database.Column;
|
||||
import it.cavallium.dbengine.database.LLKeyValueDatabase;
|
||||
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
|
||||
import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
|
||||
import it.cavallium.dbengine.database.collections.FixedLengthSerializer;
|
||||
import it.cavallium.dbengine.database.collections.Serializer;
|
||||
import it.cavallium.dbengine.database.collections.SubStageGetterSingleBytes;
|
||||
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
|
||||
import java.nio.file.Path;
|
||||
@ -24,11 +26,12 @@ import reactor.util.function.Tuples;
|
||||
public class Example {
|
||||
|
||||
private static final boolean printPreviousValue = false;
|
||||
private static final int numRepeats = 100000;
|
||||
|
||||
public static void main(String[] args) {
|
||||
System.out.println("Test");
|
||||
testAtPut();
|
||||
testPutValueAndGetPrevious();
|
||||
testPutValue();
|
||||
testPutValue()
|
||||
.subscribeOn(Schedulers.parallel())
|
||||
.blockOptional();
|
||||
@ -43,7 +46,7 @@ public class Example {
|
||||
return test("MapDictionary::at::put (same key, same value)",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ssg, ser))),
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))),
|
||||
tuple -> Mono
|
||||
.defer(() -> Mono
|
||||
.fromRunnable(() -> {
|
||||
@ -57,7 +60,7 @@ public class Example {
|
||||
System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue)));
|
||||
})
|
||||
),
|
||||
100000,
|
||||
numRepeats,
|
||||
tuple -> tuple.getT1().close());
|
||||
}
|
||||
|
||||
@ -70,7 +73,7 @@ public class Example {
|
||||
return test("MapDictionary::putValueAndGetPrevious (same key, same value)",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ssg, ser))),
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))),
|
||||
tuple -> Mono
|
||||
.defer(() -> Mono
|
||||
.fromRunnable(() -> {
|
||||
@ -83,7 +86,7 @@ public class Example {
|
||||
System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue)));
|
||||
})
|
||||
),
|
||||
10000,
|
||||
numRepeats,
|
||||
tuple -> tuple.getT1().close());
|
||||
}
|
||||
|
||||
@ -96,7 +99,7 @@ public class Example {
|
||||
return test("MapDictionary::putValue (same key, same value)",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ssg, ser))),
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))),
|
||||
tuple -> Mono
|
||||
.defer(() -> Mono
|
||||
.fromRunnable(() -> {
|
||||
@ -105,7 +108,82 @@ public class Example {
|
||||
})
|
||||
.then(tuple.getT2().putValue(itemKeyBuffer, newValue))
|
||||
),
|
||||
10000,
|
||||
numRepeats,
|
||||
tuple -> tuple.getT1().close());
|
||||
}
|
||||
|
||||
private static Mono<Void> rangeTestAtPut() {
|
||||
var ser = FixedLengthSerializer.noop(4);
|
||||
var vser = Serializer.noopBytes();
|
||||
var itemKey = new byte[]{0, 1, 2, 3};
|
||||
var newValue = new byte[]{4, 5, 6, 7};
|
||||
var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey);
|
||||
return test("MapDictionaryRange::at::put (same key, same value)",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, vser))),
|
||||
tuple -> Mono
|
||||
.defer(() -> Mono
|
||||
.fromRunnable(() -> {
|
||||
if (printPreviousValue)
|
||||
System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue));
|
||||
})
|
||||
.then(tuple.getT2().at(null, itemKeyBuffer))
|
||||
.flatMap(handle -> handle.setAndGetPrevious(newValue))
|
||||
.doOnSuccess(oldValue -> {
|
||||
if (printPreviousValue)
|
||||
System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue)));
|
||||
})
|
||||
),
|
||||
numRepeats,
|
||||
tuple -> tuple.getT1().close());
|
||||
}
|
||||
|
||||
private static Mono<Void> rangeTestPutValueAndGetPrevious() {
|
||||
var ser = FixedLengthSerializer.noop(4);
|
||||
var vser = Serializer.noopBytes();
|
||||
var itemKey = new byte[]{0, 1, 2, 3};
|
||||
var newValue = new byte[]{4, 5, 6, 7};
|
||||
var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey);
|
||||
return test("MapDictionaryRange::putValueAndGetPrevious (same key, same value)",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, vser))),
|
||||
tuple -> Mono
|
||||
.defer(() -> Mono
|
||||
.fromRunnable(() -> {
|
||||
if (printPreviousValue)
|
||||
System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue));
|
||||
})
|
||||
.then(tuple.getT2().putValueAndGetPrevious(itemKeyBuffer, newValue))
|
||||
.doOnSuccess(oldValue -> {
|
||||
if (printPreviousValue)
|
||||
System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue)));
|
||||
})
|
||||
),
|
||||
numRepeats,
|
||||
tuple -> tuple.getT1().close());
|
||||
}
|
||||
|
||||
private static Mono<Void> rangeTestPutValue() {
|
||||
var ser = FixedLengthSerializer.noop(4);
|
||||
var vser = Serializer.noopBytes();
|
||||
var itemKey = new byte[]{0, 1, 2, 3};
|
||||
var newValue = new byte[]{4, 5, 6, 7};
|
||||
var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey);
|
||||
return test("MapDictionaryRange::putValue (same key, same value)",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, vser))),
|
||||
tuple -> Mono
|
||||
.defer(() -> Mono
|
||||
.fromRunnable(() -> {
|
||||
if (printPreviousValue)
|
||||
System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue));
|
||||
})
|
||||
.then(tuple.getT2().putValue(itemKeyBuffer, newValue))
|
||||
),
|
||||
numRepeats,
|
||||
tuple -> tuple.getT1().close());
|
||||
}
|
||||
|
||||
|
@ -3,223 +3,172 @@ package it.cavallium.dbengine.database.collections;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.LLRange;
|
||||
import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import it.cavallium.dbengine.database.LLDictionaryResultType;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.GroupedFlux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.function.Tuples;
|
||||
|
||||
// todo: implement optimized methods
|
||||
public class DatabaseMapDictionary<T, U, US extends DatabaseStage<U>> implements DatabaseStageMap<T, U, US> {
|
||||
/**
|
||||
* Optimized implementation of "DatabaseMapDictionary with SubStageGetterSingle"
|
||||
*/
|
||||
public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U, DatabaseStageEntry<U>> {
|
||||
|
||||
public static final byte[] EMPTY_BYTES = new byte[0];
|
||||
private final LLDictionary dictionary;
|
||||
private final SubStageGetter<U, US> subStageGetter;
|
||||
private final FixedLengthSerializer<T> keySuffixSerializer;
|
||||
private final byte[] keyPrefix;
|
||||
private final int keySuffixLength;
|
||||
private final int keyExtLength;
|
||||
private final LLRange range;
|
||||
private final Serializer<U> valueSerializer;
|
||||
|
||||
private static byte[] firstKey(byte[] prefixKey, int prefixLength, int suffixLength, int extLength) {
|
||||
return fillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength, (byte) 0x00);
|
||||
protected DatabaseMapDictionary(LLDictionary dictionary, byte[] prefixKey, FixedLengthSerializer<T> keySuffixSerializer, Serializer<U> valueSerializer) {
|
||||
super(dictionary, new SubStageGetterSingle<>(valueSerializer), keySuffixSerializer, prefixKey, 0);
|
||||
this.valueSerializer = valueSerializer;
|
||||
}
|
||||
|
||||
private static byte[] lastKey(byte[] prefixKey, int prefixLength, int suffixLength, int extLength) {
|
||||
return fillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength, (byte) 0xFF);
|
||||
}
|
||||
|
||||
private static byte[] fillKeySuffixAndExt(byte[] prefixKey, int prefixLength, int suffixLength, int extLength, byte fillValue) {
|
||||
assert prefixKey.length == prefixLength;
|
||||
assert suffixLength > 0;
|
||||
assert extLength > 0;
|
||||
byte[] result = Arrays.copyOf(prefixKey, prefixLength + suffixLength + extLength);
|
||||
Arrays.fill(result, prefixLength, result.length, fillValue);
|
||||
return result;
|
||||
}
|
||||
|
||||
private static byte[] firstKey(byte[] prefixKey, byte[] suffixKey, int prefixLength, int suffixLength, int extLength) {
|
||||
return fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0x00);
|
||||
}
|
||||
|
||||
private static byte[] lastKey(byte[] prefixKey, byte[] suffixKey, int prefixLength, int suffixLength, int extLength) {
|
||||
return fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0xFF);
|
||||
}
|
||||
|
||||
private static byte[] fillKeyExt(byte[] prefixKey,
|
||||
byte[] suffixKey,
|
||||
int prefixLength,
|
||||
int suffixLength,
|
||||
int extLength,
|
||||
byte fillValue) {
|
||||
assert prefixKey.length == prefixLength;
|
||||
assert suffixKey.length == suffixLength;
|
||||
assert suffixLength > 0;
|
||||
assert extLength > 0;
|
||||
byte[] result = Arrays.copyOf(prefixKey, prefixLength + suffixLength + extLength);
|
||||
System.arraycopy(suffixKey, 0, result, prefixLength, suffixLength);
|
||||
Arrays.fill(result, prefixLength + suffixLength, result.length, fillValue);
|
||||
return result;
|
||||
}
|
||||
|
||||
public static <T, U> DatabaseMapDictionary<T, U, DatabaseStageEntry<U>> simple(LLDictionary dictionary,
|
||||
SubStageGetterSingle<U> subStageGetter,
|
||||
FixedLengthSerializer<T> keySerializer) {
|
||||
return new DatabaseMapDictionary<>(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, 0);
|
||||
}
|
||||
|
||||
public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionary<T, U, US> deep(LLDictionary dictionary,
|
||||
SubStageGetter<U, US> subStageGetter,
|
||||
public static <T, U> DatabaseMapDictionary<T, U> simple(LLDictionary dictionary,
|
||||
FixedLengthSerializer<T> keySerializer,
|
||||
int keyExtLength) {
|
||||
return new DatabaseMapDictionary<>(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, keyExtLength);
|
||||
Serializer<U> valueSerializer) {
|
||||
return new DatabaseMapDictionary<>(dictionary, EMPTY_BYTES, keySerializer, valueSerializer);
|
||||
}
|
||||
|
||||
public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionary<T, U, US> deepIntermediate(LLDictionary dictionary,
|
||||
SubStageGetter<U, US> subStageGetter,
|
||||
public static <T, U> DatabaseMapDictionary<T, U> tail(LLDictionary dictionary,
|
||||
FixedLengthSerializer<T> keySuffixSerializer,
|
||||
byte[] prefixKey,
|
||||
int keyExtLength) {
|
||||
return new DatabaseMapDictionary<>(dictionary, subStageGetter, keySuffixSerializer, prefixKey, keyExtLength);
|
||||
Serializer<U> valueSerializer,
|
||||
byte[] prefixKey) {
|
||||
return new DatabaseMapDictionary<>(dictionary, prefixKey, keySuffixSerializer, valueSerializer);
|
||||
}
|
||||
|
||||
private DatabaseMapDictionary(LLDictionary dictionary,
|
||||
SubStageGetter<U, US> subStageGetter,
|
||||
FixedLengthSerializer<T> keySuffixSerializer,
|
||||
byte[] prefixKey,
|
||||
int keyExtLength) {
|
||||
this.dictionary = dictionary;
|
||||
this.subStageGetter = subStageGetter;
|
||||
this.keySuffixSerializer = keySuffixSerializer;
|
||||
this.keyPrefix = prefixKey;
|
||||
this.keySuffixLength = keySuffixSerializer.getLength();
|
||||
this.keyExtLength = keyExtLength;
|
||||
byte[] firstKey = firstKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength);
|
||||
byte[] lastKey = lastKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength);
|
||||
this.range = keyPrefix.length == 0 ? LLRange.all() : LLRange.of(firstKey, lastKey);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
private boolean suffixKeyConsistency(int keySuffixLength) {
|
||||
return this.keySuffixLength == keySuffixLength;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
private boolean extKeyConsistency(int keyExtLength) {
|
||||
return this.keyExtLength == keyExtLength;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
private boolean suffixAndExtKeyConsistency(int keySuffixAndExtLength) {
|
||||
return this.keySuffixLength + this.keyExtLength == keySuffixAndExtLength;
|
||||
}
|
||||
|
||||
/**
|
||||
* Keep only suffix and ext
|
||||
*/
|
||||
private byte[] stripPrefix(byte[] key) {
|
||||
return Arrays.copyOfRange(key, this.keyPrefix.length, key.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove ext from suffix
|
||||
*/
|
||||
private byte[] trimSuffix(byte[] keySuffix) {
|
||||
if (keySuffix.length == keySuffixLength)
|
||||
return keySuffix;
|
||||
return Arrays.copyOf(keySuffix, keySuffixLength);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove ext from full key
|
||||
*/
|
||||
private byte[] removeExtFromFullKey(byte[] key) {
|
||||
return Arrays.copyOf(key, keyPrefix.length + keySuffixLength);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add prefix to suffix
|
||||
*/
|
||||
private byte[] toKeyWithoutExt(byte[] suffixKey) {
|
||||
assert suffixKey.length == keySuffixLength;
|
||||
byte[] result = Arrays.copyOf(keyPrefix, keyPrefix.length + keySuffixLength);
|
||||
System.arraycopy(suffixKey, 0, result, keyPrefix.length, keySuffixLength);
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove suffix from keySuffix, returning probably an empty byte array
|
||||
*/
|
||||
private byte[] stripSuffix(byte[] keySuffix) {
|
||||
if (keySuffix.length == this.keySuffixLength)
|
||||
return EMPTY_BYTES;
|
||||
return Arrays.copyOfRange(keySuffix, this.keySuffixLength, keySuffix.length);
|
||||
}
|
||||
|
||||
private LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) {
|
||||
if (snapshot == null) {
|
||||
return null;
|
||||
} else {
|
||||
return snapshot.getSnapshot(dictionary);
|
||||
}
|
||||
}
|
||||
|
||||
private LLRange toExtRange(byte[] keySuffix) {
|
||||
byte[] first = firstKey(keyPrefix, keySuffix, keyPrefix.length, keySuffixLength, keyExtLength);
|
||||
byte[] end = lastKey(keyPrefix, keySuffix, keyPrefix.length, keySuffixLength, keyExtLength);
|
||||
return LLRange.of(first, end);
|
||||
private byte[] toKey(byte[] suffixKey) {
|
||||
assert suffixKeyConsistency(suffixKey.length);
|
||||
byte[] key = Arrays.copyOf(keyPrefix, keyPrefix.length + suffixKey.length);
|
||||
System.arraycopy(suffixKey, 0, key, keyPrefix.length, suffixKey.length);
|
||||
return key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
|
||||
byte[] keySuffixData = serializeSuffix(keySuffix);
|
||||
Flux<byte[]> rangeKeys = this
|
||||
.dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData)
|
||||
);
|
||||
return this.subStageGetter
|
||||
.subStage(dictionary, snapshot, toKeyWithoutExt(keySuffixData), rangeKeys);
|
||||
public Mono<Map<T, U>> get(@Nullable CompositeSnapshot snapshot) {
|
||||
return dictionary
|
||||
.getRange(resolveSnapshot(snapshot), range)
|
||||
.collectMap(entry -> deserializeSuffix(stripPrefix(entry.getKey())), entry -> deserialize(entry.getValue()), HashMap::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot) {
|
||||
Flux<GroupedFlux<byte[], byte[]>> groupedFlux = dictionary
|
||||
public Mono<Map<T, U>> setAndGetPrevious(Map<T, U> value) {
|
||||
return dictionary
|
||||
.setRange(range,
|
||||
Flux
|
||||
.fromIterable(value.entrySet())
|
||||
.map(entry -> Map.entry(serializeSuffix(entry.getKey()), serialize(entry.getValue()))),
|
||||
true
|
||||
)
|
||||
.collectMap(entry -> deserializeSuffix(stripPrefix(entry.getKey())), entry -> deserialize(entry.getValue()), HashMap::new);
|
||||
}
|
||||
|
||||
private Entry<byte[], byte[]> stripPrefix(Entry<byte[], byte[]> entry) {
|
||||
byte[] keySuffix = stripPrefix(entry.getKey());
|
||||
return Map.entry(keySuffix, entry.getValue());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Map<T, U>> clearAndGetPrevious() {
|
||||
return dictionary
|
||||
.setRange(range, Flux.empty(), true)
|
||||
.collectMap(entry -> deserializeSuffix(stripPrefix(entry.getKey())), entry -> deserialize(entry.getValue()), HashMap::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Long> size(@Nullable CompositeSnapshot snapshot, boolean fast) {
|
||||
return dictionary.sizeRange(resolveSnapshot(snapshot), range, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<DatabaseStageEntry<U>> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
|
||||
return Mono.just(new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), Serializer.noopBytes())).map(entry -> new DatabaseSingleMapped<>(entry, valueSerializer));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T keySuffix) {
|
||||
return dictionary.get(resolveSnapshot(snapshot), toKey(serializeSuffix(keySuffix))).map(this::deserialize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> putValue(T keySuffix, U value) {
|
||||
return dictionary.put(toKey(serializeSuffix(keySuffix)), serialize(value), LLDictionaryResultType.VOID).then();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<U> putValueAndGetPrevious(T keySuffix, U value) {
|
||||
return dictionary
|
||||
.put(toKey(serializeSuffix(keySuffix)), serialize(value), LLDictionaryResultType.PREVIOUS_VALUE)
|
||||
.map(this::deserialize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Boolean> putValueAndGetStatus(T keySuffix, U value) {
|
||||
return dictionary
|
||||
.put(toKey(serializeSuffix(keySuffix)), serialize(value), LLDictionaryResultType.VALUE_CHANGED)
|
||||
.map(LLUtils::responseToBoolean);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> remove(T keySuffix) {
|
||||
return dictionary.remove(toKey(serializeSuffix(keySuffix)), LLDictionaryResultType.VOID).then();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<U> removeAndGetPrevious(T keySuffix) {
|
||||
return dictionary
|
||||
.remove(toKey(serializeSuffix(keySuffix)), LLDictionaryResultType.PREVIOUS_VALUE)
|
||||
.map(this::deserialize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Boolean> removeAndGetStatus(T keySuffix) {
|
||||
return dictionary
|
||||
.remove(toKey(serializeSuffix(keySuffix)), LLDictionaryResultType.VALUE_CHANGED)
|
||||
.map(LLUtils::responseToBoolean);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Entry<T, U>> getMulti(@Nullable CompositeSnapshot snapshot, Flux<T> keys) {
|
||||
return dictionary
|
||||
.getMulti(resolveSnapshot(snapshot), keys.map(keySuffix -> toKey(serializeSuffix(keySuffix))))
|
||||
.map(entry -> Map.entry(deserializeSuffix(stripPrefix(entry.getKey())), deserialize(entry.getValue())));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot) {
|
||||
return dictionary
|
||||
.getRangeKeys(resolveSnapshot(snapshot), range)
|
||||
.groupBy(this::removeExtFromFullKey);
|
||||
return groupedFlux
|
||||
.flatMap(rangeKeys -> this.subStageGetter
|
||||
.subStage(dictionary, snapshot, rangeKeys.key(), rangeKeys)
|
||||
.map(us -> Map.entry(this.deserializeSuffix(this.stripPrefix(rangeKeys.key())), us))
|
||||
);
|
||||
.map(keySuffix -> Map.entry(deserializeSuffix(stripPrefix(keySuffix)),
|
||||
new DatabaseSingleMapped<>(new DatabaseSingle<>(dictionary, toKey(stripPrefix(keySuffix)), Serializer.noopBytes()),
|
||||
valueSerializer
|
||||
)
|
||||
));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
|
||||
var newValues = entries
|
||||
.flatMap(entry -> at(null, entry.getKey()).map(us -> Tuples.of(us, entry.getValue())))
|
||||
.flatMap(tuple -> tuple.getT1().set(tuple.getT2()));
|
||||
|
||||
return getAllStages(null)
|
||||
.flatMap(stage -> stage.getValue().get(null).map(val -> Map.entry(stage.getKey(), val)))
|
||||
.concatWith(newValues.then(Mono.empty()));
|
||||
var serializedEntries = entries
|
||||
.map(entry -> Map.entry(toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue())));
|
||||
return dictionary.setRange(range, serializedEntries, true)
|
||||
.map(entry -> Map.entry(deserializeSuffix(stripPrefix(entry.getKey())), deserialize(entry.getValue())));
|
||||
}
|
||||
|
||||
//todo: temporary wrapper. convert the whole class to buffers
|
||||
private T deserializeSuffix(byte[] keySuffix) {
|
||||
var serialized = Unpooled.wrappedBuffer(keySuffix);
|
||||
return keySuffixSerializer.deserialize(serialized);
|
||||
private U deserialize(byte[] bytes) {
|
||||
var serialized = Unpooled.wrappedBuffer(bytes);
|
||||
return valueSerializer.deserialize(serialized);
|
||||
}
|
||||
|
||||
//todo: temporary wrapper. convert the whole class to buffers
|
||||
private byte[] serializeSuffix(T keySuffix) {
|
||||
var output = Unpooled.buffer(keySuffixLength, keySuffixLength);
|
||||
var outputBytes = new byte[keySuffixLength];
|
||||
keySuffixSerializer.serialize(keySuffix, output);
|
||||
output.getBytes(0, outputBytes, 0, keySuffixLength);
|
||||
private byte[] serialize(U bytes) {
|
||||
var output = Unpooled.buffer();
|
||||
valueSerializer.serialize(bytes, output);
|
||||
output.resetReaderIndex();
|
||||
int length = output.readableBytes();
|
||||
var outputBytes = new byte[length];
|
||||
output.getBytes(0, outputBytes, 0, length);
|
||||
return outputBytes;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,229 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import io.netty.buffer.Unpooled;
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.LLRange;
|
||||
import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.GroupedFlux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.function.Tuples;
|
||||
|
||||
// todo: implement optimized methods
|
||||
public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implements DatabaseStageMap<T, U, US> {
|
||||
|
||||
public static final byte[] EMPTY_BYTES = new byte[0];
|
||||
protected final LLDictionary dictionary;
|
||||
protected final SubStageGetter<U, US> subStageGetter;
|
||||
protected final FixedLengthSerializer<T> keySuffixSerializer;
|
||||
protected final byte[] keyPrefix;
|
||||
protected final int keySuffixLength;
|
||||
protected final int keyExtLength;
|
||||
protected final LLRange range;
|
||||
|
||||
protected static byte[] firstKey(byte[] prefixKey, int prefixLength, int suffixLength, int extLength) {
|
||||
return fillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength, (byte) 0x00);
|
||||
}
|
||||
|
||||
protected static byte[] lastKey(byte[] prefixKey, int prefixLength, int suffixLength, int extLength) {
|
||||
return fillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength, (byte) 0xFF);
|
||||
}
|
||||
|
||||
protected static byte[] fillKeySuffixAndExt(byte[] prefixKey, int prefixLength, int suffixLength, int extLength, byte fillValue) {
|
||||
assert prefixKey.length == prefixLength;
|
||||
assert suffixLength > 0;
|
||||
assert extLength > 0;
|
||||
byte[] result = Arrays.copyOf(prefixKey, prefixLength + suffixLength + extLength);
|
||||
Arrays.fill(result, prefixLength, result.length, fillValue);
|
||||
return result;
|
||||
}
|
||||
|
||||
protected static byte[] firstKey(byte[] prefixKey, byte[] suffixKey, int prefixLength, int suffixLength, int extLength) {
|
||||
return fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0x00);
|
||||
}
|
||||
|
||||
protected static byte[] lastKey(byte[] prefixKey, byte[] suffixKey, int prefixLength, int suffixLength, int extLength) {
|
||||
return fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0xFF);
|
||||
}
|
||||
|
||||
protected static byte[] fillKeyExt(byte[] prefixKey,
|
||||
byte[] suffixKey,
|
||||
int prefixLength,
|
||||
int suffixLength,
|
||||
int extLength,
|
||||
byte fillValue) {
|
||||
assert prefixKey.length == prefixLength;
|
||||
assert suffixKey.length == suffixLength;
|
||||
assert suffixLength > 0;
|
||||
assert extLength > 0;
|
||||
byte[] result = Arrays.copyOf(prefixKey, prefixLength + suffixLength + extLength);
|
||||
System.arraycopy(suffixKey, 0, result, prefixLength, suffixLength);
|
||||
Arrays.fill(result, prefixLength + suffixLength, result.length, fillValue);
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Use DatabaseMapDictionaryRange.simple instead
|
||||
*/
|
||||
@Deprecated
|
||||
public static <T, U> DatabaseMapDictionaryDeep<T, U, DatabaseStageEntry<U>> simple(LLDictionary dictionary,
|
||||
SubStageGetterSingle<U> subStageGetter,
|
||||
FixedLengthSerializer<T> keySerializer) {
|
||||
return new DatabaseMapDictionaryDeep<>(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, 0);
|
||||
}
|
||||
|
||||
public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepTail(LLDictionary dictionary,
|
||||
SubStageGetter<U, US> subStageGetter,
|
||||
FixedLengthSerializer<T> keySerializer,
|
||||
int keyExtLength) {
|
||||
return new DatabaseMapDictionaryDeep<>(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, keyExtLength);
|
||||
}
|
||||
|
||||
public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepIntermediate(LLDictionary dictionary,
|
||||
SubStageGetter<U, US> subStageGetter,
|
||||
FixedLengthSerializer<T> keySuffixSerializer,
|
||||
byte[] prefixKey,
|
||||
int keyExtLength) {
|
||||
return new DatabaseMapDictionaryDeep<>(dictionary, subStageGetter, keySuffixSerializer, prefixKey, keyExtLength);
|
||||
}
|
||||
|
||||
protected DatabaseMapDictionaryDeep(LLDictionary dictionary,
|
||||
SubStageGetter<U, US> subStageGetter,
|
||||
FixedLengthSerializer<T> keySuffixSerializer,
|
||||
byte[] prefixKey,
|
||||
int keyExtLength) {
|
||||
this.dictionary = dictionary;
|
||||
this.subStageGetter = subStageGetter;
|
||||
this.keySuffixSerializer = keySuffixSerializer;
|
||||
this.keyPrefix = prefixKey;
|
||||
this.keySuffixLength = keySuffixSerializer.getLength();
|
||||
this.keyExtLength = keyExtLength;
|
||||
byte[] firstKey = firstKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength);
|
||||
byte[] lastKey = lastKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength);
|
||||
this.range = keyPrefix.length == 0 ? LLRange.all() : LLRange.of(firstKey, lastKey);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
protected boolean suffixKeyConsistency(int keySuffixLength) {
|
||||
return this.keySuffixLength == keySuffixLength;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
protected boolean extKeyConsistency(int keyExtLength) {
|
||||
return this.keyExtLength == keyExtLength;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
protected boolean suffixAndExtKeyConsistency(int keySuffixAndExtLength) {
|
||||
return this.keySuffixLength + this.keyExtLength == keySuffixAndExtLength;
|
||||
}
|
||||
|
||||
/**
|
||||
* Keep only suffix and ext
|
||||
*/
|
||||
protected byte[] stripPrefix(byte[] key) {
|
||||
return Arrays.copyOfRange(key, this.keyPrefix.length, key.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove ext from suffix
|
||||
*/
|
||||
protected byte[] trimSuffix(byte[] keySuffix) {
|
||||
if (keySuffix.length == keySuffixLength)
|
||||
return keySuffix;
|
||||
return Arrays.copyOf(keySuffix, keySuffixLength);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove ext from full key
|
||||
*/
|
||||
protected byte[] removeExtFromFullKey(byte[] key) {
|
||||
return Arrays.copyOf(key, keyPrefix.length + keySuffixLength);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add prefix to suffix
|
||||
*/
|
||||
protected byte[] toKeyWithoutExt(byte[] suffixKey) {
|
||||
assert suffixKey.length == keySuffixLength;
|
||||
byte[] result = Arrays.copyOf(keyPrefix, keyPrefix.length + keySuffixLength);
|
||||
System.arraycopy(suffixKey, 0, result, keyPrefix.length, keySuffixLength);
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove suffix from keySuffix, returning probably an empty byte array
|
||||
*/
|
||||
protected byte[] stripSuffix(byte[] keySuffix) {
|
||||
if (keySuffix.length == this.keySuffixLength)
|
||||
return EMPTY_BYTES;
|
||||
return Arrays.copyOfRange(keySuffix, this.keySuffixLength, keySuffix.length);
|
||||
}
|
||||
|
||||
protected LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) {
|
||||
if (snapshot == null) {
|
||||
return null;
|
||||
} else {
|
||||
return snapshot.getSnapshot(dictionary);
|
||||
}
|
||||
}
|
||||
|
||||
protected LLRange toExtRange(byte[] keySuffix) {
|
||||
byte[] first = firstKey(keyPrefix, keySuffix, keyPrefix.length, keySuffixLength, keyExtLength);
|
||||
byte[] end = lastKey(keyPrefix, keySuffix, keyPrefix.length, keySuffixLength, keyExtLength);
|
||||
return LLRange.of(first, end);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
|
||||
byte[] keySuffixData = serializeSuffix(keySuffix);
|
||||
Flux<byte[]> rangeKeys = this
|
||||
.dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData)
|
||||
);
|
||||
return this.subStageGetter
|
||||
.subStage(dictionary, snapshot, toKeyWithoutExt(keySuffixData), rangeKeys);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot) {
|
||||
Flux<GroupedFlux<byte[], byte[]>> groupedFlux = dictionary
|
||||
.getRangeKeys(resolveSnapshot(snapshot), range)
|
||||
.groupBy(this::removeExtFromFullKey);
|
||||
return groupedFlux
|
||||
.flatMap(rangeKeys -> this.subStageGetter
|
||||
.subStage(dictionary, snapshot, rangeKeys.key(), rangeKeys)
|
||||
.map(us -> Map.entry(this.deserializeSuffix(this.stripPrefix(rangeKeys.key())), us))
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
|
||||
var newValues = entries
|
||||
.flatMap(entry -> at(null, entry.getKey()).map(us -> Tuples.of(us, entry.getValue())))
|
||||
.flatMap(tuple -> tuple.getT1().set(tuple.getT2()));
|
||||
|
||||
return getAllStages(null)
|
||||
.flatMap(stage -> stage.getValue().get(null).map(val -> Map.entry(stage.getKey(), val)))
|
||||
.concatWith(newValues.then(Mono.empty()));
|
||||
}
|
||||
|
||||
//todo: temporary wrapper. convert the whole class to buffers
|
||||
protected T deserializeSuffix(byte[] keySuffix) {
|
||||
var serialized = Unpooled.wrappedBuffer(keySuffix);
|
||||
return keySuffixSerializer.deserialize(serialized);
|
||||
}
|
||||
|
||||
//todo: temporary wrapper. convert the whole class to buffers
|
||||
protected byte[] serializeSuffix(T keySuffix) {
|
||||
var output = Unpooled.buffer(keySuffixLength, keySuffixLength);
|
||||
var outputBytes = new byte[keySuffixLength];
|
||||
keySuffixSerializer.serialize(keySuffix, output);
|
||||
output.getBytes(0, outputBytes, 0, keySuffixLength);
|
||||
return outputBytes;
|
||||
}
|
||||
}
|
@ -1,175 +0,0 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import io.netty.buffer.Unpooled;
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.LLRange;
|
||||
import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
/**
|
||||
* Optimized implementation of "DatabaseMapDictionary with SubStageGetterSingle"
|
||||
*/
|
||||
public class DatabaseMapDictionaryRange<T, U> implements DatabaseStageMap<T, U, DatabaseStageEntry<U>> {
|
||||
|
||||
public static final byte[] NO_PREFIX = new byte[0];
|
||||
private final LLDictionary dictionary;
|
||||
private final byte[] keyPrefix;
|
||||
private final FixedLengthSerializer<T> keySuffixSerializer;
|
||||
private final LLRange range;
|
||||
private final Serializer<U> valueSerializer;
|
||||
|
||||
private static byte[] lastKey(byte[] prefixKey, int prefixLength, int suffixLength) {
|
||||
assert prefixKey.length == prefixLength;
|
||||
byte[] lastKey = Arrays.copyOf(prefixKey, prefixLength + suffixLength);
|
||||
Arrays.fill(lastKey, prefixLength, lastKey.length, (byte) 0xFF);
|
||||
return lastKey;
|
||||
}
|
||||
|
||||
private static byte[] firstKey(byte[] prefixKey, int prefixLength, int suffixLength) {
|
||||
assert prefixKey.length == prefixLength;
|
||||
byte[] lastKey = Arrays.copyOf(prefixKey, prefixLength + suffixLength);
|
||||
Arrays.fill(lastKey, prefixLength, lastKey.length, (byte) 0x00);
|
||||
return lastKey;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public DatabaseMapDictionaryRange(LLDictionary dictionary, FixedLengthSerializer<T> keySuffixSerializer, Serializer<U> valueSerializer) {
|
||||
this(dictionary, NO_PREFIX, keySuffixSerializer, valueSerializer);
|
||||
}
|
||||
|
||||
public DatabaseMapDictionaryRange(LLDictionary dictionary, byte[] prefixKey, FixedLengthSerializer<T> keySuffixSerializer, Serializer<U> valueSerializer) {
|
||||
this.dictionary = dictionary;
|
||||
this.keyPrefix = prefixKey;
|
||||
this.keySuffixSerializer = keySuffixSerializer;
|
||||
this.valueSerializer = valueSerializer;
|
||||
byte[] firstKey = firstKey(keyPrefix, keyPrefix.length, keySuffixSerializer.getLength());
|
||||
byte[] lastKey = lastKey(keyPrefix, keyPrefix.length, keySuffixSerializer.getLength());
|
||||
this.range = keyPrefix.length == 0 ? LLRange.all() : LLRange.of(firstKey, lastKey);
|
||||
}
|
||||
|
||||
private boolean suffixKeyConsistency(int keySuffixLength) {
|
||||
return this.keySuffixSerializer.getLength() == keySuffixLength;
|
||||
}
|
||||
|
||||
private byte[] toKey(byte[] suffixKey) {
|
||||
assert suffixKeyConsistency(suffixKey.length);
|
||||
byte[] key = Arrays.copyOf(keyPrefix, keyPrefix.length + suffixKey.length);
|
||||
System.arraycopy(suffixKey, 0, key, keyPrefix.length, suffixKey.length);
|
||||
return key;
|
||||
}
|
||||
|
||||
private byte[] stripPrefix(byte[] key) {
|
||||
return Arrays.copyOfRange(key, this.keyPrefix.length, key.length);
|
||||
}
|
||||
|
||||
private LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) {
|
||||
if (snapshot == null) {
|
||||
return null;
|
||||
} else {
|
||||
return snapshot.getSnapshot(dictionary);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Map<T, U>> get(@Nullable CompositeSnapshot snapshot) {
|
||||
return dictionary
|
||||
.getRange(resolveSnapshot(snapshot), range)
|
||||
.map(this::stripPrefix)
|
||||
.collectMap(entry -> deserializeSuffix(entry.getKey()), entry -> deserialize(entry.getValue()), HashMap::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Map<T, U>> setAndGetPrevious(Map<T, U> value) {
|
||||
return dictionary
|
||||
.setRange(range,
|
||||
Flux
|
||||
.fromIterable(value.entrySet())
|
||||
.map(entry -> Map.entry(serializeSuffix(entry.getKey()), serialize(entry.getValue()))),
|
||||
true
|
||||
)
|
||||
.map(this::stripPrefix)
|
||||
.collectMap(entry -> deserializeSuffix(entry.getKey()), entry -> deserialize(entry.getValue()), HashMap::new);
|
||||
}
|
||||
|
||||
private Entry<byte[], byte[]> stripPrefix(Entry<byte[], byte[]> entry) {
|
||||
byte[] keySuffix = stripPrefix(entry.getKey());
|
||||
return Map.entry(keySuffix, entry.getValue());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Map<T, U>> clearAndGetPrevious() {
|
||||
return dictionary
|
||||
.setRange(range, Flux.empty(), true)
|
||||
.map(this::stripPrefix)
|
||||
.collectMap(entry -> deserializeSuffix(entry.getKey()), entry -> deserialize(entry.getValue()), HashMap::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Long> size(@Nullable CompositeSnapshot snapshot, boolean fast) {
|
||||
return dictionary.sizeRange(resolveSnapshot(snapshot), range, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<DatabaseStageEntry<U>> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
|
||||
return Mono.just(new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), Serializer.noopBytes())).map(entry -> new DatabaseSingleMapped<>(entry, valueSerializer));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot) {
|
||||
return dictionary
|
||||
.getRangeKeys(resolveSnapshot(snapshot), range)
|
||||
.map(this::stripPrefix)
|
||||
.map(keySuffix -> Map.entry(deserializeSuffix(keySuffix),
|
||||
new DatabaseSingleMapped<>(new DatabaseSingle<>(dictionary, toKey(keySuffix), Serializer.noopBytes()),
|
||||
valueSerializer
|
||||
)
|
||||
));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
|
||||
var serializedEntries = entries
|
||||
.map(entry -> Map.entry(toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue())));
|
||||
return dictionary.setRange(range, serializedEntries, true)
|
||||
.map(entry -> Map.entry(deserializeSuffix(entry.getKey()), deserialize(entry.getValue())));
|
||||
}
|
||||
|
||||
//todo: temporary wrapper. convert the whole class to buffers
|
||||
private U deserialize(byte[] bytes) {
|
||||
var serialized = Unpooled.wrappedBuffer(bytes);
|
||||
return valueSerializer.deserialize(serialized);
|
||||
}
|
||||
|
||||
//todo: temporary wrapper. convert the whole class to buffers
|
||||
private byte[] serialize(U bytes) {
|
||||
var output = Unpooled.buffer();
|
||||
valueSerializer.serialize(bytes, output);
|
||||
output.resetReaderIndex();
|
||||
int length = output.readableBytes();
|
||||
var outputBytes = new byte[length];
|
||||
output.getBytes(0, outputBytes, 0, length);
|
||||
return outputBytes;
|
||||
}
|
||||
|
||||
//todo: temporary wrapper. convert the whole class to buffers
|
||||
private T deserializeSuffix(byte[] keySuffix) {
|
||||
var serialized = Unpooled.wrappedBuffer(keySuffix);
|
||||
return keySuffixSerializer.deserialize(serialized);
|
||||
}
|
||||
|
||||
//todo: temporary wrapper. convert the whole class to buffers
|
||||
private byte[] serializeSuffix(T keySuffix) {
|
||||
var output = Unpooled.buffer(keySuffixSerializer.getLength(), keySuffixSerializer.getLength());
|
||||
var outputBytes = new byte[keySuffixSerializer.getLength()];
|
||||
keySuffixSerializer.serialize(keySuffix, output);
|
||||
output.getBytes(0, outputBytes, 0, keySuffixSerializer.getLength());
|
||||
return outputBytes;
|
||||
}
|
||||
}
|
@ -27,7 +27,7 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
|
||||
@Nullable CompositeSnapshot snapshot,
|
||||
byte[] prefixKey,
|
||||
Flux<byte[]> keyFlux) {
|
||||
return Mono.just(DatabaseMapDictionary.deepIntermediate(dictionary,
|
||||
return Mono.just(DatabaseMapDictionaryDeep.deepIntermediate(dictionary,
|
||||
subStageGetter,
|
||||
keySerializer,
|
||||
prefixKey,
|
||||
|
@ -22,6 +22,6 @@ public class SubStageGetterMapRange<T, U> implements SubStageGetter<Map<T, U>, D
|
||||
@Nullable CompositeSnapshot snapshot,
|
||||
byte[] prefixKey,
|
||||
Flux<byte[]> keyFlux) {
|
||||
return Mono.just(new DatabaseMapDictionaryRange<>(dictionary, prefixKey, keySerializer, valueSerializer));
|
||||
return Mono.just(DatabaseMapDictionary.tail(dictionary, keySerializer, valueSerializer, prefixKey));
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user