diff --git a/src/example/java/it.cavallium.dbengine.client/Example.java b/src/example/java/it.cavallium.dbengine.client/Example.java index 470672d..bfb33ca 100644 --- a/src/example/java/it.cavallium.dbengine.client/Example.java +++ b/src/example/java/it.cavallium.dbengine.client/Example.java @@ -3,8 +3,10 @@ package it.cavallium.dbengine.client; import io.netty.buffer.Unpooled; import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.LLKeyValueDatabase; +import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep; import it.cavallium.dbengine.database.collections.DatabaseMapDictionary; import it.cavallium.dbengine.database.collections.FixedLengthSerializer; +import it.cavallium.dbengine.database.collections.Serializer; import it.cavallium.dbengine.database.collections.SubStageGetterSingleBytes; import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; import java.nio.file.Path; @@ -24,11 +26,12 @@ import reactor.util.function.Tuples; public class Example { private static final boolean printPreviousValue = false; + private static final int numRepeats = 100000; public static void main(String[] args) { - System.out.println("Test"); testAtPut(); testPutValueAndGetPrevious(); + testPutValue(); testPutValue() .subscribeOn(Schedulers.parallel()) .blockOptional(); @@ -43,7 +46,7 @@ public class Example { return test("MapDictionary::at::put (same key, same value)", tempDb() .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) - .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ssg, ser))), + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))), tuple -> Mono .defer(() -> Mono .fromRunnable(() -> { @@ -57,7 +60,7 @@ public class Example { System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue))); }) ), - 100000, + numRepeats, tuple -> tuple.getT1().close()); } @@ -70,7 +73,7 @@ public class Example { return test("MapDictionary::putValueAndGetPrevious (same key, same value)", tempDb() .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) - .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ssg, ser))), + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))), tuple -> Mono .defer(() -> Mono .fromRunnable(() -> { @@ -83,7 +86,7 @@ public class Example { System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue))); }) ), - 10000, + numRepeats, tuple -> tuple.getT1().close()); } @@ -96,7 +99,7 @@ public class Example { return test("MapDictionary::putValue (same key, same value)", tempDb() .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) - .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ssg, ser))), + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ssg, ser))), tuple -> Mono .defer(() -> Mono .fromRunnable(() -> { @@ -105,7 +108,82 @@ public class Example { }) .then(tuple.getT2().putValue(itemKeyBuffer, newValue)) ), - 10000, + numRepeats, + tuple -> tuple.getT1().close()); + } + + private static Mono rangeTestAtPut() { + var ser = FixedLengthSerializer.noop(4); + var vser = Serializer.noopBytes(); + var itemKey = new byte[]{0, 1, 2, 3}; + var newValue = new byte[]{4, 5, 6, 7}; + var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey); + return test("MapDictionaryRange::at::put (same key, same value)", + tempDb() + .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, vser))), + tuple -> Mono + .defer(() -> Mono + .fromRunnable(() -> { + if (printPreviousValue) + System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue)); + }) + .then(tuple.getT2().at(null, itemKeyBuffer)) + .flatMap(handle -> handle.setAndGetPrevious(newValue)) + .doOnSuccess(oldValue -> { + if (printPreviousValue) + System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue))); + }) + ), + numRepeats, + tuple -> tuple.getT1().close()); + } + + private static Mono rangeTestPutValueAndGetPrevious() { + var ser = FixedLengthSerializer.noop(4); + var vser = Serializer.noopBytes(); + var itemKey = new byte[]{0, 1, 2, 3}; + var newValue = new byte[]{4, 5, 6, 7}; + var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey); + return test("MapDictionaryRange::putValueAndGetPrevious (same key, same value)", + tempDb() + .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, vser))), + tuple -> Mono + .defer(() -> Mono + .fromRunnable(() -> { + if (printPreviousValue) + System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue)); + }) + .then(tuple.getT2().putValueAndGetPrevious(itemKeyBuffer, newValue)) + .doOnSuccess(oldValue -> { + if (printPreviousValue) + System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue))); + }) + ), + numRepeats, + tuple -> tuple.getT1().close()); + } + + private static Mono rangeTestPutValue() { + var ser = FixedLengthSerializer.noop(4); + var vser = Serializer.noopBytes(); + var itemKey = new byte[]{0, 1, 2, 3}; + var newValue = new byte[]{4, 5, 6, 7}; + var itemKeyBuffer = Unpooled.wrappedBuffer(itemKey); + return test("MapDictionaryRange::putValue (same key, same value)", + tempDb() + .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, vser))), + tuple -> Mono + .defer(() -> Mono + .fromRunnable(() -> { + if (printPreviousValue) + System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue)); + }) + .then(tuple.getT2().putValue(itemKeyBuffer, newValue)) + ), + numRepeats, tuple -> tuple.getT1().close()); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 9ecf3fe..23a3240 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -3,223 +3,172 @@ package it.cavallium.dbengine.database.collections; import io.netty.buffer.Unpooled; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; -import it.cavallium.dbengine.database.LLRange; -import it.cavallium.dbengine.database.LLSnapshot; +import it.cavallium.dbengine.database.LLDictionaryResultType; +import it.cavallium.dbengine.database.LLUtils; import java.util.Arrays; +import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; -import reactor.core.publisher.GroupedFlux; import reactor.core.publisher.Mono; -import reactor.util.function.Tuples; -// todo: implement optimized methods -public class DatabaseMapDictionary> implements DatabaseStageMap { +/** + * Optimized implementation of "DatabaseMapDictionary with SubStageGetterSingle" + */ +public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> { - public static final byte[] EMPTY_BYTES = new byte[0]; - private final LLDictionary dictionary; - private final SubStageGetter subStageGetter; - private final FixedLengthSerializer keySuffixSerializer; - private final byte[] keyPrefix; - private final int keySuffixLength; - private final int keyExtLength; - private final LLRange range; + private final Serializer valueSerializer; - private static byte[] firstKey(byte[] prefixKey, int prefixLength, int suffixLength, int extLength) { - return fillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength, (byte) 0x00); + protected DatabaseMapDictionary(LLDictionary dictionary, byte[] prefixKey, FixedLengthSerializer keySuffixSerializer, Serializer valueSerializer) { + super(dictionary, new SubStageGetterSingle<>(valueSerializer), keySuffixSerializer, prefixKey, 0); + this.valueSerializer = valueSerializer; } - private static byte[] lastKey(byte[] prefixKey, int prefixLength, int suffixLength, int extLength) { - return fillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength, (byte) 0xFF); - } - - private static byte[] fillKeySuffixAndExt(byte[] prefixKey, int prefixLength, int suffixLength, int extLength, byte fillValue) { - assert prefixKey.length == prefixLength; - assert suffixLength > 0; - assert extLength > 0; - byte[] result = Arrays.copyOf(prefixKey, prefixLength + suffixLength + extLength); - Arrays.fill(result, prefixLength, result.length, fillValue); - return result; - } - - private static byte[] firstKey(byte[] prefixKey, byte[] suffixKey, int prefixLength, int suffixLength, int extLength) { - return fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0x00); - } - - private static byte[] lastKey(byte[] prefixKey, byte[] suffixKey, int prefixLength, int suffixLength, int extLength) { - return fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0xFF); - } - - private static byte[] fillKeyExt(byte[] prefixKey, - byte[] suffixKey, - int prefixLength, - int suffixLength, - int extLength, - byte fillValue) { - assert prefixKey.length == prefixLength; - assert suffixKey.length == suffixLength; - assert suffixLength > 0; - assert extLength > 0; - byte[] result = Arrays.copyOf(prefixKey, prefixLength + suffixLength + extLength); - System.arraycopy(suffixKey, 0, result, prefixLength, suffixLength); - Arrays.fill(result, prefixLength + suffixLength, result.length, fillValue); - return result; - } - - public static DatabaseMapDictionary> simple(LLDictionary dictionary, - SubStageGetterSingle subStageGetter, - FixedLengthSerializer keySerializer) { - return new DatabaseMapDictionary<>(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, 0); - } - - public static > DatabaseMapDictionary deep(LLDictionary dictionary, - SubStageGetter subStageGetter, + public static DatabaseMapDictionary simple(LLDictionary dictionary, FixedLengthSerializer keySerializer, - int keyExtLength) { - return new DatabaseMapDictionary<>(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, keyExtLength); + Serializer valueSerializer) { + return new DatabaseMapDictionary<>(dictionary, EMPTY_BYTES, keySerializer, valueSerializer); } - public static > DatabaseMapDictionary deepIntermediate(LLDictionary dictionary, - SubStageGetter subStageGetter, + public static DatabaseMapDictionary tail(LLDictionary dictionary, FixedLengthSerializer keySuffixSerializer, - byte[] prefixKey, - int keyExtLength) { - return new DatabaseMapDictionary<>(dictionary, subStageGetter, keySuffixSerializer, prefixKey, keyExtLength); + Serializer valueSerializer, + byte[] prefixKey) { + return new DatabaseMapDictionary<>(dictionary, prefixKey, keySuffixSerializer, valueSerializer); } - private DatabaseMapDictionary(LLDictionary dictionary, - SubStageGetter subStageGetter, - FixedLengthSerializer keySuffixSerializer, - byte[] prefixKey, - int keyExtLength) { - this.dictionary = dictionary; - this.subStageGetter = subStageGetter; - this.keySuffixSerializer = keySuffixSerializer; - this.keyPrefix = prefixKey; - this.keySuffixLength = keySuffixSerializer.getLength(); - this.keyExtLength = keyExtLength; - byte[] firstKey = firstKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength); - byte[] lastKey = lastKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength); - this.range = keyPrefix.length == 0 ? LLRange.all() : LLRange.of(firstKey, lastKey); - } - - @SuppressWarnings("unused") - private boolean suffixKeyConsistency(int keySuffixLength) { - return this.keySuffixLength == keySuffixLength; - } - - @SuppressWarnings("unused") - private boolean extKeyConsistency(int keyExtLength) { - return this.keyExtLength == keyExtLength; - } - - @SuppressWarnings("unused") - private boolean suffixAndExtKeyConsistency(int keySuffixAndExtLength) { - return this.keySuffixLength + this.keyExtLength == keySuffixAndExtLength; - } - - /** - * Keep only suffix and ext - */ - private byte[] stripPrefix(byte[] key) { - return Arrays.copyOfRange(key, this.keyPrefix.length, key.length); - } - - /** - * Remove ext from suffix - */ - private byte[] trimSuffix(byte[] keySuffix) { - if (keySuffix.length == keySuffixLength) - return keySuffix; - return Arrays.copyOf(keySuffix, keySuffixLength); - } - - /** - * Remove ext from full key - */ - private byte[] removeExtFromFullKey(byte[] key) { - return Arrays.copyOf(key, keyPrefix.length + keySuffixLength); - } - - /** - * Add prefix to suffix - */ - private byte[] toKeyWithoutExt(byte[] suffixKey) { - assert suffixKey.length == keySuffixLength; - byte[] result = Arrays.copyOf(keyPrefix, keyPrefix.length + keySuffixLength); - System.arraycopy(suffixKey, 0, result, keyPrefix.length, keySuffixLength); - return result; - } - - /** - * Remove suffix from keySuffix, returning probably an empty byte array - */ - private byte[] stripSuffix(byte[] keySuffix) { - if (keySuffix.length == this.keySuffixLength) - return EMPTY_BYTES; - return Arrays.copyOfRange(keySuffix, this.keySuffixLength, keySuffix.length); - } - - private LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) { - if (snapshot == null) { - return null; - } else { - return snapshot.getSnapshot(dictionary); - } - } - - private LLRange toExtRange(byte[] keySuffix) { - byte[] first = firstKey(keyPrefix, keySuffix, keyPrefix.length, keySuffixLength, keyExtLength); - byte[] end = lastKey(keyPrefix, keySuffix, keyPrefix.length, keySuffixLength, keyExtLength); - return LLRange.of(first, end); + private byte[] toKey(byte[] suffixKey) { + assert suffixKeyConsistency(suffixKey.length); + byte[] key = Arrays.copyOf(keyPrefix, keyPrefix.length + suffixKey.length); + System.arraycopy(suffixKey, 0, key, keyPrefix.length, suffixKey.length); + return key; } @Override - public Mono at(@Nullable CompositeSnapshot snapshot, T keySuffix) { - byte[] keySuffixData = serializeSuffix(keySuffix); - Flux rangeKeys = this - .dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData) - ); - return this.subStageGetter - .subStage(dictionary, snapshot, toKeyWithoutExt(keySuffixData), rangeKeys); + public Mono> get(@Nullable CompositeSnapshot snapshot) { + return dictionary + .getRange(resolveSnapshot(snapshot), range) + .collectMap(entry -> deserializeSuffix(stripPrefix(entry.getKey())), entry -> deserialize(entry.getValue()), HashMap::new); } @Override - public Flux> getAllStages(@Nullable CompositeSnapshot snapshot) { - Flux> groupedFlux = dictionary + public Mono> setAndGetPrevious(Map value) { + return dictionary + .setRange(range, + Flux + .fromIterable(value.entrySet()) + .map(entry -> Map.entry(serializeSuffix(entry.getKey()), serialize(entry.getValue()))), + true + ) + .collectMap(entry -> deserializeSuffix(stripPrefix(entry.getKey())), entry -> deserialize(entry.getValue()), HashMap::new); + } + + private Entry stripPrefix(Entry entry) { + byte[] keySuffix = stripPrefix(entry.getKey()); + return Map.entry(keySuffix, entry.getValue()); + } + + @Override + public Mono> clearAndGetPrevious() { + return dictionary + .setRange(range, Flux.empty(), true) + .collectMap(entry -> deserializeSuffix(stripPrefix(entry.getKey())), entry -> deserialize(entry.getValue()), HashMap::new); + } + + @Override + public Mono size(@Nullable CompositeSnapshot snapshot, boolean fast) { + return dictionary.sizeRange(resolveSnapshot(snapshot), range, true); + } + + @Override + public Mono> at(@Nullable CompositeSnapshot snapshot, T keySuffix) { + return Mono.just(new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), Serializer.noopBytes())).map(entry -> new DatabaseSingleMapped<>(entry, valueSerializer)); + } + + @Override + public Mono getValue(@Nullable CompositeSnapshot snapshot, T keySuffix) { + return dictionary.get(resolveSnapshot(snapshot), toKey(serializeSuffix(keySuffix))).map(this::deserialize); + } + + @Override + public Mono putValue(T keySuffix, U value) { + return dictionary.put(toKey(serializeSuffix(keySuffix)), serialize(value), LLDictionaryResultType.VOID).then(); + } + + @Override + public Mono putValueAndGetPrevious(T keySuffix, U value) { + return dictionary + .put(toKey(serializeSuffix(keySuffix)), serialize(value), LLDictionaryResultType.PREVIOUS_VALUE) + .map(this::deserialize); + } + + @Override + public Mono putValueAndGetStatus(T keySuffix, U value) { + return dictionary + .put(toKey(serializeSuffix(keySuffix)), serialize(value), LLDictionaryResultType.VALUE_CHANGED) + .map(LLUtils::responseToBoolean); + } + + @Override + public Mono remove(T keySuffix) { + return dictionary.remove(toKey(serializeSuffix(keySuffix)), LLDictionaryResultType.VOID).then(); + } + + @Override + public Mono removeAndGetPrevious(T keySuffix) { + return dictionary + .remove(toKey(serializeSuffix(keySuffix)), LLDictionaryResultType.PREVIOUS_VALUE) + .map(this::deserialize); + } + + @Override + public Mono removeAndGetStatus(T keySuffix) { + return dictionary + .remove(toKey(serializeSuffix(keySuffix)), LLDictionaryResultType.VALUE_CHANGED) + .map(LLUtils::responseToBoolean); + } + + @Override + public Flux> getMulti(@Nullable CompositeSnapshot snapshot, Flux keys) { + return dictionary + .getMulti(resolveSnapshot(snapshot), keys.map(keySuffix -> toKey(serializeSuffix(keySuffix)))) + .map(entry -> Map.entry(deserializeSuffix(stripPrefix(entry.getKey())), deserialize(entry.getValue()))); + } + + @Override + public Flux>> getAllStages(@Nullable CompositeSnapshot snapshot) { + return dictionary .getRangeKeys(resolveSnapshot(snapshot), range) - .groupBy(this::removeExtFromFullKey); - return groupedFlux - .flatMap(rangeKeys -> this.subStageGetter - .subStage(dictionary, snapshot, rangeKeys.key(), rangeKeys) - .map(us -> Map.entry(this.deserializeSuffix(this.stripPrefix(rangeKeys.key())), us)) - ); + .map(keySuffix -> Map.entry(deserializeSuffix(stripPrefix(keySuffix)), + new DatabaseSingleMapped<>(new DatabaseSingle<>(dictionary, toKey(stripPrefix(keySuffix)), Serializer.noopBytes()), + valueSerializer + ) + )); } @Override public Flux> setAllValuesAndGetPrevious(Flux> entries) { - var newValues = entries - .flatMap(entry -> at(null, entry.getKey()).map(us -> Tuples.of(us, entry.getValue()))) - .flatMap(tuple -> tuple.getT1().set(tuple.getT2())); - - return getAllStages(null) - .flatMap(stage -> stage.getValue().get(null).map(val -> Map.entry(stage.getKey(), val))) - .concatWith(newValues.then(Mono.empty())); + var serializedEntries = entries + .map(entry -> Map.entry(toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue()))); + return dictionary.setRange(range, serializedEntries, true) + .map(entry -> Map.entry(deserializeSuffix(stripPrefix(entry.getKey())), deserialize(entry.getValue()))); } //todo: temporary wrapper. convert the whole class to buffers - private T deserializeSuffix(byte[] keySuffix) { - var serialized = Unpooled.wrappedBuffer(keySuffix); - return keySuffixSerializer.deserialize(serialized); + private U deserialize(byte[] bytes) { + var serialized = Unpooled.wrappedBuffer(bytes); + return valueSerializer.deserialize(serialized); } //todo: temporary wrapper. convert the whole class to buffers - private byte[] serializeSuffix(T keySuffix) { - var output = Unpooled.buffer(keySuffixLength, keySuffixLength); - var outputBytes = new byte[keySuffixLength]; - keySuffixSerializer.serialize(keySuffix, output); - output.getBytes(0, outputBytes, 0, keySuffixLength); + private byte[] serialize(U bytes) { + var output = Unpooled.buffer(); + valueSerializer.serialize(bytes, output); + output.resetReaderIndex(); + int length = output.readableBytes(); + var outputBytes = new byte[length]; + output.getBytes(0, outputBytes, 0, length); return outputBytes; } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java new file mode 100644 index 0000000..84d229c --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -0,0 +1,229 @@ +package it.cavallium.dbengine.database.collections; + +import io.netty.buffer.Unpooled; +import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.database.LLDictionary; +import it.cavallium.dbengine.database.LLRange; +import it.cavallium.dbengine.database.LLSnapshot; +import java.util.Arrays; +import java.util.Map; +import java.util.Map.Entry; +import org.jetbrains.annotations.Nullable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.GroupedFlux; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuples; + +// todo: implement optimized methods +public class DatabaseMapDictionaryDeep> implements DatabaseStageMap { + + public static final byte[] EMPTY_BYTES = new byte[0]; + protected final LLDictionary dictionary; + protected final SubStageGetter subStageGetter; + protected final FixedLengthSerializer keySuffixSerializer; + protected final byte[] keyPrefix; + protected final int keySuffixLength; + protected final int keyExtLength; + protected final LLRange range; + + protected static byte[] firstKey(byte[] prefixKey, int prefixLength, int suffixLength, int extLength) { + return fillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength, (byte) 0x00); + } + + protected static byte[] lastKey(byte[] prefixKey, int prefixLength, int suffixLength, int extLength) { + return fillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength, (byte) 0xFF); + } + + protected static byte[] fillKeySuffixAndExt(byte[] prefixKey, int prefixLength, int suffixLength, int extLength, byte fillValue) { + assert prefixKey.length == prefixLength; + assert suffixLength > 0; + assert extLength > 0; + byte[] result = Arrays.copyOf(prefixKey, prefixLength + suffixLength + extLength); + Arrays.fill(result, prefixLength, result.length, fillValue); + return result; + } + + protected static byte[] firstKey(byte[] prefixKey, byte[] suffixKey, int prefixLength, int suffixLength, int extLength) { + return fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0x00); + } + + protected static byte[] lastKey(byte[] prefixKey, byte[] suffixKey, int prefixLength, int suffixLength, int extLength) { + return fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0xFF); + } + + protected static byte[] fillKeyExt(byte[] prefixKey, + byte[] suffixKey, + int prefixLength, + int suffixLength, + int extLength, + byte fillValue) { + assert prefixKey.length == prefixLength; + assert suffixKey.length == suffixLength; + assert suffixLength > 0; + assert extLength > 0; + byte[] result = Arrays.copyOf(prefixKey, prefixLength + suffixLength + extLength); + System.arraycopy(suffixKey, 0, result, prefixLength, suffixLength); + Arrays.fill(result, prefixLength + suffixLength, result.length, fillValue); + return result; + } + + /** + * Use DatabaseMapDictionaryRange.simple instead + */ + @Deprecated + public static DatabaseMapDictionaryDeep> simple(LLDictionary dictionary, + SubStageGetterSingle subStageGetter, + FixedLengthSerializer keySerializer) { + return new DatabaseMapDictionaryDeep<>(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, 0); + } + + public static > DatabaseMapDictionaryDeep deepTail(LLDictionary dictionary, + SubStageGetter subStageGetter, + FixedLengthSerializer keySerializer, + int keyExtLength) { + return new DatabaseMapDictionaryDeep<>(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, keyExtLength); + } + + public static > DatabaseMapDictionaryDeep deepIntermediate(LLDictionary dictionary, + SubStageGetter subStageGetter, + FixedLengthSerializer keySuffixSerializer, + byte[] prefixKey, + int keyExtLength) { + return new DatabaseMapDictionaryDeep<>(dictionary, subStageGetter, keySuffixSerializer, prefixKey, keyExtLength); + } + + protected DatabaseMapDictionaryDeep(LLDictionary dictionary, + SubStageGetter subStageGetter, + FixedLengthSerializer keySuffixSerializer, + byte[] prefixKey, + int keyExtLength) { + this.dictionary = dictionary; + this.subStageGetter = subStageGetter; + this.keySuffixSerializer = keySuffixSerializer; + this.keyPrefix = prefixKey; + this.keySuffixLength = keySuffixSerializer.getLength(); + this.keyExtLength = keyExtLength; + byte[] firstKey = firstKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength); + byte[] lastKey = lastKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength); + this.range = keyPrefix.length == 0 ? LLRange.all() : LLRange.of(firstKey, lastKey); + } + + @SuppressWarnings("unused") + protected boolean suffixKeyConsistency(int keySuffixLength) { + return this.keySuffixLength == keySuffixLength; + } + + @SuppressWarnings("unused") + protected boolean extKeyConsistency(int keyExtLength) { + return this.keyExtLength == keyExtLength; + } + + @SuppressWarnings("unused") + protected boolean suffixAndExtKeyConsistency(int keySuffixAndExtLength) { + return this.keySuffixLength + this.keyExtLength == keySuffixAndExtLength; + } + + /** + * Keep only suffix and ext + */ + protected byte[] stripPrefix(byte[] key) { + return Arrays.copyOfRange(key, this.keyPrefix.length, key.length); + } + + /** + * Remove ext from suffix + */ + protected byte[] trimSuffix(byte[] keySuffix) { + if (keySuffix.length == keySuffixLength) + return keySuffix; + return Arrays.copyOf(keySuffix, keySuffixLength); + } + + /** + * Remove ext from full key + */ + protected byte[] removeExtFromFullKey(byte[] key) { + return Arrays.copyOf(key, keyPrefix.length + keySuffixLength); + } + + /** + * Add prefix to suffix + */ + protected byte[] toKeyWithoutExt(byte[] suffixKey) { + assert suffixKey.length == keySuffixLength; + byte[] result = Arrays.copyOf(keyPrefix, keyPrefix.length + keySuffixLength); + System.arraycopy(suffixKey, 0, result, keyPrefix.length, keySuffixLength); + return result; + } + + /** + * Remove suffix from keySuffix, returning probably an empty byte array + */ + protected byte[] stripSuffix(byte[] keySuffix) { + if (keySuffix.length == this.keySuffixLength) + return EMPTY_BYTES; + return Arrays.copyOfRange(keySuffix, this.keySuffixLength, keySuffix.length); + } + + protected LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) { + if (snapshot == null) { + return null; + } else { + return snapshot.getSnapshot(dictionary); + } + } + + protected LLRange toExtRange(byte[] keySuffix) { + byte[] first = firstKey(keyPrefix, keySuffix, keyPrefix.length, keySuffixLength, keyExtLength); + byte[] end = lastKey(keyPrefix, keySuffix, keyPrefix.length, keySuffixLength, keyExtLength); + return LLRange.of(first, end); + } + + @Override + public Mono at(@Nullable CompositeSnapshot snapshot, T keySuffix) { + byte[] keySuffixData = serializeSuffix(keySuffix); + Flux rangeKeys = this + .dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData) + ); + return this.subStageGetter + .subStage(dictionary, snapshot, toKeyWithoutExt(keySuffixData), rangeKeys); + } + + @Override + public Flux> getAllStages(@Nullable CompositeSnapshot snapshot) { + Flux> groupedFlux = dictionary + .getRangeKeys(resolveSnapshot(snapshot), range) + .groupBy(this::removeExtFromFullKey); + return groupedFlux + .flatMap(rangeKeys -> this.subStageGetter + .subStage(dictionary, snapshot, rangeKeys.key(), rangeKeys) + .map(us -> Map.entry(this.deserializeSuffix(this.stripPrefix(rangeKeys.key())), us)) + ); + } + + @Override + public Flux> setAllValuesAndGetPrevious(Flux> entries) { + var newValues = entries + .flatMap(entry -> at(null, entry.getKey()).map(us -> Tuples.of(us, entry.getValue()))) + .flatMap(tuple -> tuple.getT1().set(tuple.getT2())); + + return getAllStages(null) + .flatMap(stage -> stage.getValue().get(null).map(val -> Map.entry(stage.getKey(), val))) + .concatWith(newValues.then(Mono.empty())); + } + + //todo: temporary wrapper. convert the whole class to buffers + protected T deserializeSuffix(byte[] keySuffix) { + var serialized = Unpooled.wrappedBuffer(keySuffix); + return keySuffixSerializer.deserialize(serialized); + } + + //todo: temporary wrapper. convert the whole class to buffers + protected byte[] serializeSuffix(T keySuffix) { + var output = Unpooled.buffer(keySuffixLength, keySuffixLength); + var outputBytes = new byte[keySuffixLength]; + keySuffixSerializer.serialize(keySuffix, output); + output.getBytes(0, outputBytes, 0, keySuffixLength); + return outputBytes; + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryRange.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryRange.java deleted file mode 100644 index be707d4..0000000 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryRange.java +++ /dev/null @@ -1,175 +0,0 @@ -package it.cavallium.dbengine.database.collections; - -import io.netty.buffer.Unpooled; -import it.cavallium.dbengine.client.CompositeSnapshot; -import it.cavallium.dbengine.database.LLDictionary; -import it.cavallium.dbengine.database.LLRange; -import it.cavallium.dbengine.database.LLSnapshot; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import org.jetbrains.annotations.Nullable; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -/** - * Optimized implementation of "DatabaseMapDictionary with SubStageGetterSingle" - */ -public class DatabaseMapDictionaryRange implements DatabaseStageMap> { - - public static final byte[] NO_PREFIX = new byte[0]; - private final LLDictionary dictionary; - private final byte[] keyPrefix; - private final FixedLengthSerializer keySuffixSerializer; - private final LLRange range; - private final Serializer valueSerializer; - - private static byte[] lastKey(byte[] prefixKey, int prefixLength, int suffixLength) { - assert prefixKey.length == prefixLength; - byte[] lastKey = Arrays.copyOf(prefixKey, prefixLength + suffixLength); - Arrays.fill(lastKey, prefixLength, lastKey.length, (byte) 0xFF); - return lastKey; - } - - private static byte[] firstKey(byte[] prefixKey, int prefixLength, int suffixLength) { - assert prefixKey.length == prefixLength; - byte[] lastKey = Arrays.copyOf(prefixKey, prefixLength + suffixLength); - Arrays.fill(lastKey, prefixLength, lastKey.length, (byte) 0x00); - return lastKey; - } - - @SuppressWarnings("unused") - public DatabaseMapDictionaryRange(LLDictionary dictionary, FixedLengthSerializer keySuffixSerializer, Serializer valueSerializer) { - this(dictionary, NO_PREFIX, keySuffixSerializer, valueSerializer); - } - - public DatabaseMapDictionaryRange(LLDictionary dictionary, byte[] prefixKey, FixedLengthSerializer keySuffixSerializer, Serializer valueSerializer) { - this.dictionary = dictionary; - this.keyPrefix = prefixKey; - this.keySuffixSerializer = keySuffixSerializer; - this.valueSerializer = valueSerializer; - byte[] firstKey = firstKey(keyPrefix, keyPrefix.length, keySuffixSerializer.getLength()); - byte[] lastKey = lastKey(keyPrefix, keyPrefix.length, keySuffixSerializer.getLength()); - this.range = keyPrefix.length == 0 ? LLRange.all() : LLRange.of(firstKey, lastKey); - } - - private boolean suffixKeyConsistency(int keySuffixLength) { - return this.keySuffixSerializer.getLength() == keySuffixLength; - } - - private byte[] toKey(byte[] suffixKey) { - assert suffixKeyConsistency(suffixKey.length); - byte[] key = Arrays.copyOf(keyPrefix, keyPrefix.length + suffixKey.length); - System.arraycopy(suffixKey, 0, key, keyPrefix.length, suffixKey.length); - return key; - } - - private byte[] stripPrefix(byte[] key) { - return Arrays.copyOfRange(key, this.keyPrefix.length, key.length); - } - - private LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) { - if (snapshot == null) { - return null; - } else { - return snapshot.getSnapshot(dictionary); - } - } - - @Override - public Mono> get(@Nullable CompositeSnapshot snapshot) { - return dictionary - .getRange(resolveSnapshot(snapshot), range) - .map(this::stripPrefix) - .collectMap(entry -> deserializeSuffix(entry.getKey()), entry -> deserialize(entry.getValue()), HashMap::new); - } - - @Override - public Mono> setAndGetPrevious(Map value) { - return dictionary - .setRange(range, - Flux - .fromIterable(value.entrySet()) - .map(entry -> Map.entry(serializeSuffix(entry.getKey()), serialize(entry.getValue()))), - true - ) - .map(this::stripPrefix) - .collectMap(entry -> deserializeSuffix(entry.getKey()), entry -> deserialize(entry.getValue()), HashMap::new); - } - - private Entry stripPrefix(Entry entry) { - byte[] keySuffix = stripPrefix(entry.getKey()); - return Map.entry(keySuffix, entry.getValue()); - } - - @Override - public Mono> clearAndGetPrevious() { - return dictionary - .setRange(range, Flux.empty(), true) - .map(this::stripPrefix) - .collectMap(entry -> deserializeSuffix(entry.getKey()), entry -> deserialize(entry.getValue()), HashMap::new); - } - - @Override - public Mono size(@Nullable CompositeSnapshot snapshot, boolean fast) { - return dictionary.sizeRange(resolveSnapshot(snapshot), range, true); - } - - @Override - public Mono> at(@Nullable CompositeSnapshot snapshot, T keySuffix) { - return Mono.just(new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), Serializer.noopBytes())).map(entry -> new DatabaseSingleMapped<>(entry, valueSerializer)); - } - - @Override - public Flux>> getAllStages(@Nullable CompositeSnapshot snapshot) { - return dictionary - .getRangeKeys(resolveSnapshot(snapshot), range) - .map(this::stripPrefix) - .map(keySuffix -> Map.entry(deserializeSuffix(keySuffix), - new DatabaseSingleMapped<>(new DatabaseSingle<>(dictionary, toKey(keySuffix), Serializer.noopBytes()), - valueSerializer - ) - )); - } - - @Override - public Flux> setAllValuesAndGetPrevious(Flux> entries) { - var serializedEntries = entries - .map(entry -> Map.entry(toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue()))); - return dictionary.setRange(range, serializedEntries, true) - .map(entry -> Map.entry(deserializeSuffix(entry.getKey()), deserialize(entry.getValue()))); - } - - //todo: temporary wrapper. convert the whole class to buffers - private U deserialize(byte[] bytes) { - var serialized = Unpooled.wrappedBuffer(bytes); - return valueSerializer.deserialize(serialized); - } - - //todo: temporary wrapper. convert the whole class to buffers - private byte[] serialize(U bytes) { - var output = Unpooled.buffer(); - valueSerializer.serialize(bytes, output); - output.resetReaderIndex(); - int length = output.readableBytes(); - var outputBytes = new byte[length]; - output.getBytes(0, outputBytes, 0, length); - return outputBytes; - } - - //todo: temporary wrapper. convert the whole class to buffers - private T deserializeSuffix(byte[] keySuffix) { - var serialized = Unpooled.wrappedBuffer(keySuffix); - return keySuffixSerializer.deserialize(serialized); - } - - //todo: temporary wrapper. convert the whole class to buffers - private byte[] serializeSuffix(T keySuffix) { - var output = Unpooled.buffer(keySuffixSerializer.getLength(), keySuffixSerializer.getLength()); - var outputBytes = new byte[keySuffixSerializer.getLength()]; - keySuffixSerializer.serialize(keySuffix, output); - output.getBytes(0, outputBytes, 0, keySuffixSerializer.getLength()); - return outputBytes; - } -} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java index 49edfb0..6a76c5f 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java @@ -27,7 +27,7 @@ public class SubStageGetterMapDeep> implements @Nullable CompositeSnapshot snapshot, byte[] prefixKey, Flux keyFlux) { - return Mono.just(DatabaseMapDictionary.deepIntermediate(dictionary, + return Mono.just(DatabaseMapDictionaryDeep.deepIntermediate(dictionary, subStageGetter, keySerializer, prefixKey, diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapRange.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapRange.java index 8d94c17..cd0c74c 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapRange.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapRange.java @@ -22,6 +22,6 @@ public class SubStageGetterMapRange implements SubStageGetter, D @Nullable CompositeSnapshot snapshot, byte[] prefixKey, Flux keyFlux) { - return Mono.just(new DatabaseMapDictionaryRange<>(dictionary, prefixKey, keySerializer, valueSerializer)); + return Mono.just(DatabaseMapDictionary.tail(dictionary, keySerializer, valueSerializer, prefixKey)); } }