diff --git a/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java b/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java new file mode 100644 index 0000000..bbbc5ac --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java @@ -0,0 +1,12 @@ +package it.cavallium.dbengine.client; + +import reactor.core.publisher.Mono; + +public interface CompositeDatabase { + + Mono close(); + + Mono takeSnapshot() throws SnapshotException; + + Mono releaseSnapshot(CompositeSnapshot snapshot) throws SnapshotException; +} diff --git a/src/main/java/it/cavallium/dbengine/client/CompositeDatabasePartLocation.java b/src/main/java/it/cavallium/dbengine/client/CompositeDatabasePartLocation.java new file mode 100644 index 0000000..6b41dcb --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/client/CompositeDatabasePartLocation.java @@ -0,0 +1,56 @@ +package it.cavallium.dbengine.client; + +import java.util.Objects; +import java.util.StringJoiner; + +public class CompositeDatabasePartLocation { + private final CompositeDatabasePartType partType; + private final String partName; + + private CompositeDatabasePartLocation(CompositeDatabasePartType partType, String partName) { + this.partType = partType; + this.partName = partName; + } + + public static CompositeDatabasePartLocation of(CompositeDatabasePartType partType, String partName) { + return new CompositeDatabasePartLocation(partType, partName); + } + + public enum CompositeDatabasePartType { + KV_DATABASE, + LUCENE_INDEX + } + + public CompositeDatabasePartType getPartType() { + return partType; + } + + public String getPartName() { + return partName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CompositeDatabasePartLocation that = (CompositeDatabasePartLocation) o; + return partType == that.partType && Objects.equals(partName, that.partName); + } + + @Override + public int hashCode() { + return Objects.hash(partType, partName); + } + + @Override + public String toString() { + return new StringJoiner(", ", CompositeDatabasePartLocation.class.getSimpleName() + "[", "]") + .add("partType=" + partType) + .add("partName='" + partName + "'") + .toString(); + } +} diff --git a/src/main/java/it/cavallium/dbengine/client/CompositeSnapshot.java b/src/main/java/it/cavallium/dbengine/client/CompositeSnapshot.java new file mode 100644 index 0000000..f477989 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/client/CompositeSnapshot.java @@ -0,0 +1,32 @@ +package it.cavallium.dbengine.client; + +import it.cavallium.dbengine.client.CompositeDatabasePartLocation.CompositeDatabasePartType; +import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure; +import it.cavallium.dbengine.database.LLLuceneIndex; +import it.cavallium.dbengine.database.LLSnapshot; +import java.util.Map; +import java.util.Objects; + +public class CompositeSnapshot { + private final Map snapshots; + + public CompositeSnapshot(Map snapshots) { + this.snapshots = snapshots; + } + + public LLSnapshot getSnapshot(LLKeyValueDatabaseStructure database) { + return Objects.requireNonNull(snapshots.get(CompositeDatabasePartLocation.of(CompositeDatabasePartType.KV_DATABASE, + database.getDatabaseName() + )), () -> "No snapshot for database with name \"" + database.getDatabaseName() + "\""); + } + + public LLSnapshot getSnapshot(LLLuceneIndex luceneIndex) { + return Objects.requireNonNull(snapshots.get(CompositeDatabasePartLocation.of(CompositeDatabasePartType.LUCENE_INDEX, + luceneIndex.getLuceneIndexName() + )), () -> "No snapshot for lucene index with name \"" + luceneIndex.getLuceneIndexName() + "\""); + } + + public Map getAllSnapshots() { + return snapshots; + } +} diff --git a/src/main/java/it/cavallium/dbengine/client/SnapshotException.java b/src/main/java/it/cavallium/dbengine/client/SnapshotException.java new file mode 100644 index 0000000..61a7d73 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/client/SnapshotException.java @@ -0,0 +1,12 @@ +package it.cavallium.dbengine.client; + +public class SnapshotException extends RuntimeException { + + public SnapshotException(Exception ex) { + super(ex); + } + + public SnapshotException(String message) { + super(message); + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEntry.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEntry.java new file mode 100644 index 0000000..1d4d4de --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEntry.java @@ -0,0 +1,9 @@ +package it.cavallium.dbengine.database.collections; + +public interface DatabaseEntry extends DatabaseStage { + + @Override + default DatabaseEntry entry() { + return this; + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEntryable.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEntryable.java new file mode 100644 index 0000000..25740fd --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEntryable.java @@ -0,0 +1,6 @@ +package it.cavallium.dbengine.database.collections; + +public interface DatabaseEntryable { + + DatabaseEntry entry(); +} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryParent.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryParent.java new file mode 100644 index 0000000..8e8a6e5 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryParent.java @@ -0,0 +1,165 @@ +package it.cavallium.dbengine.database.collections; + +import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.database.LLDictionary; +import it.cavallium.dbengine.database.LLRange; +import it.cavallium.dbengine.database.LLSnapshot; +import java.util.Arrays; +import java.util.Map; +import java.util.Map.Entry; +import org.jetbrains.annotations.Nullable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.GroupedFlux; +import reactor.core.publisher.Mono; + +// todo: implement optimized methods +public abstract class DatabaseMapDictionaryParent> implements DatabaseStageMap { + + public static final byte[] EMPTY_BYTES = new byte[0]; + private final LLDictionary dictionary; + private final byte[] keyPrefix; + private final int keySuffixLength; + private final int keyExtLength; + private final LLRange range; + + private static byte[] firstKey(byte[] prefixKey, int prefixLength, int suffixLength, int extLength) { + return fillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength, (byte) 0x00); + } + + private static byte[] lastKey(byte[] prefixKey, int prefixLength, int suffixLength, int extLength) { + return fillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength, (byte) 0xFF); + } + + private static byte[] fillKeySuffixAndExt(byte[] prefixKey, int prefixLength, int suffixLength, int extLength, byte fillValue) { + assert prefixKey.length == prefixLength; + assert suffixLength > 0; + assert extLength > 0; + byte[] result = Arrays.copyOf(prefixKey, prefixLength + suffixLength + extLength); + Arrays.fill(result, prefixLength, result.length, fillValue); + return result; + } + + private static byte[] firstKey(byte[] prefixKey, byte[] suffixKey, int prefixLength, int suffixLength, int extLength) { + return fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0x00); + } + + private static byte[] lastKey(byte[] prefixKey, byte[] suffixKey, int prefixLength, int suffixLength, int extLength) { + return fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0xFF); + } + + private static byte[] fillKeyExt(byte[] prefixKey, + byte[] suffixKey, + int prefixLength, + int suffixLength, + int extLength, + byte fillValue) { + assert prefixKey.length == prefixLength; + assert suffixKey.length == suffixLength; + assert suffixLength > 0; + assert extLength > 0; + byte[] result = Arrays.copyOf(prefixKey, prefixLength + suffixLength + extLength); + System.arraycopy(suffixKey, 0, result, prefixLength, suffixLength); + Arrays.fill(result, prefixLength + suffixLength, result.length, fillValue); + return result; + } + + @SuppressWarnings("unused") + public DatabaseMapDictionaryParent(LLDictionary dictionary, int keyLength, int keyExtLength) { + this(dictionary, EMPTY_BYTES, keyLength, keyExtLength); + } + + public DatabaseMapDictionaryParent(LLDictionary dictionary, byte[] prefixKey, int keySuffixLength, int keyExtLength) { + this.dictionary = dictionary; + this.keyPrefix = prefixKey; + this.keySuffixLength = keySuffixLength; + this.keyExtLength = keyExtLength; + byte[] firstKey = firstKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength); + byte[] lastKey = lastKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength); + this.range = keyPrefix.length == 0 ? LLRange.all() : LLRange.of(firstKey, lastKey); + } + + @SuppressWarnings("unused") + private boolean suffixKeyConsistency(int keySuffixLength) { + return this.keySuffixLength == keySuffixLength; + } + + @SuppressWarnings("unused") + private boolean extKeyConsistency(int keyExtLength) { + return this.keyExtLength == keyExtLength; + } + + @SuppressWarnings("unused") + private boolean suffixAndExtKeyConsistency(int keySuffixAndExtLength) { + return this.keySuffixLength + this.keyExtLength == keySuffixAndExtLength; + } + + /** + * Keep only suffix and ext + */ + private byte[] stripPrefix(byte[] key) { + return Arrays.copyOfRange(key, this.keyPrefix.length, key.length); + } + + /** + * Remove ext from suffix + */ + private byte[] trimSuffix(byte[] keySuffix) { + if (keySuffix.length == keySuffixLength) return keySuffix; + return Arrays.copyOf(keySuffix, keySuffixLength); + } + + /** + * Remove suffix from keySuffix, returning probably an empty byte array + */ + private byte[] stripSuffix(byte[] keySuffix) { + if (keySuffix.length == this.keySuffixLength) return EMPTY_BYTES; + return Arrays.copyOfRange(keySuffix, this.keySuffixLength, keySuffix.length); + } + + private LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) { + if (snapshot == null) { + return null; + } else { + return snapshot.getSnapshot(dictionary); + } + } + + private LLRange toExtRange(byte[] keySuffix) { + byte[] first = firstKey(keyPrefix, keySuffix, keyPrefix.length, keySuffixLength, keyExtLength); + byte[] end = lastKey(keyPrefix, keySuffix, keyPrefix.length, keySuffixLength, keyExtLength); + return LLRange.of(first, end); + } + + @Override + public Mono at(@Nullable CompositeSnapshot snapshot, byte[] keySuffix) { + return this.subStage( + this.dictionary + .getRange(resolveSnapshot(snapshot), toExtRange(stripPrefix(keySuffix))) + .map(key -> { + byte[] keyExt = this.stripSuffix(this.stripPrefix(key.getKey())); + return Map.entry(keyExt, key.getValue()); + }) + ); + } + + @Override + public Flux> getAllStages(@Nullable CompositeSnapshot snapshot) { + Flux>> groupedFlux = dictionary + .getRange(resolveSnapshot(snapshot), range) + .groupBy(entry -> this.trimSuffix(this.stripPrefix(entry.getKey()))); + return groupedFlux + .flatMap(keyValueFlux -> this + .subStage(keyValueFlux.map(key -> { + byte[] keyExt = this.stripSuffix(this.stripPrefix(key.getKey())); + return Map.entry(keyExt, key.getValue()); + })) + .map(us -> Map.entry(keyValueFlux.key(), us)) + ); + } + + /** + * + * @param keyValueFlux a flux with keyExt and full values from the same keySuffix + */ + protected abstract Mono subStage(Flux> keyValueFlux); +} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryRange.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryRange.java new file mode 100644 index 0000000..7b19f08 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryRange.java @@ -0,0 +1,159 @@ +package it.cavallium.dbengine.database.collections; + +import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.database.LLDictionary; +import it.cavallium.dbengine.database.LLDictionaryResultType; +import it.cavallium.dbengine.database.LLRange; +import it.cavallium.dbengine.database.LLSnapshot; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import org.jetbrains.annotations.Nullable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +// todo: implement optimized methods +public class DatabaseMapDictionaryRange implements DatabaseStageMap> { + + public static final byte[] NO_PREFIX = new byte[0]; + private final LLDictionary dictionary; + private final byte[] keyPrefix; + private final int keySuffixLength; + private final LLRange range; + + private static byte[] lastKey(byte[] prefixKey, int prefixLength, int suffixLength) { + assert prefixKey.length == prefixLength; + byte[] lastKey = Arrays.copyOf(prefixKey, prefixLength + suffixLength); + Arrays.fill(lastKey, prefixLength, lastKey.length, (byte) 0xFF); + return lastKey; + } + + private static byte[] firstKey(byte[] prefixKey, int prefixLength, int suffixLength) { + assert prefixKey.length == prefixLength; + byte[] lastKey = Arrays.copyOf(prefixKey, prefixLength + suffixLength); + Arrays.fill(lastKey, prefixLength, lastKey.length, (byte) 0x00); + return lastKey; + } + + @SuppressWarnings("unused") + public DatabaseMapDictionaryRange(LLDictionary dictionary, int keyLength) { + this(dictionary, NO_PREFIX, keyLength); + } + + public DatabaseMapDictionaryRange(LLDictionary dictionary, byte[] prefixKey, int keySuffixLength) { + this.dictionary = dictionary; + this.keyPrefix = prefixKey; + this.keySuffixLength = keySuffixLength; + byte[] firstKey = firstKey(keyPrefix, keyPrefix.length, keySuffixLength); + byte[] lastKey = lastKey(keyPrefix, keyPrefix.length, keySuffixLength); + this.range = keyPrefix.length == 0 ? LLRange.all() : LLRange.of(firstKey, lastKey); + } + + private boolean suffixKeyConsistency(int keySuffixLength) { + return this.keySuffixLength == keySuffixLength; + } + + private byte[] toKey(byte[] suffixKey) { + assert suffixKeyConsistency(suffixKey.length); + byte[] key = Arrays.copyOf(keyPrefix, keyPrefix.length + suffixKey.length); + System.arraycopy(suffixKey, 0, key, keyPrefix.length, suffixKey.length); + return key; + } + + private byte[] stripPrefix(byte[] key) { + return Arrays.copyOfRange(key, this.keyPrefix.length, key.length); + } + + private LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) { + if (snapshot == null) { + return null; + } else { + return snapshot.getSnapshot(dictionary); + } + } + + @Override + public Mono> get(@Nullable CompositeSnapshot snapshot) { + return dictionary + .getRange(resolveSnapshot(snapshot), range) + .map(this::stripPrefix) + .collectMap(Entry::getKey, Entry::getValue, HashMap::new); + } + + @Override + public Mono> setAndGetPrevious(Map value) { + return dictionary + .setRange(range, Flux.fromIterable(value.entrySet()), true) + .map(this::stripPrefix) + .collectMap(Entry::getKey, Entry::getValue, HashMap::new); + } + + private Entry stripPrefix(Entry entry) { + byte[] keySuffix = stripPrefix(entry.getKey()); + return Map.entry(keySuffix, entry.getValue()); + } + + @Override + public Mono> clearAndGetPrevious() { + return dictionary + .setRange(range, Flux.empty(), true) + .map(this::stripPrefix) + .collectMap(Entry::getKey, Entry::getValue, HashMap::new); + } + + @Override + public Mono size(@Nullable CompositeSnapshot snapshot, boolean fast) { + return dictionary.sizeRange(resolveSnapshot(snapshot), range, true); + } + + @Override + public Mono> at(@Nullable CompositeSnapshot snapshot, byte[] keySuffix) { + return Mono.just(new SingleDatabaseEntry(keySuffix)); + } + + @Override + public Flux>> getAllStages(@Nullable CompositeSnapshot snapshot) { + return dictionary + .getRangeKeys(resolveSnapshot(snapshot), range) + .map(this::stripPrefix) + .map(keySuffix -> Map.entry(keySuffix, new SingleDatabaseEntry(keySuffix))); + } + + private class SingleDatabaseEntry implements DatabaseEntry { + + private final byte[] keySuffix; + + public SingleDatabaseEntry(byte[] keySuffix) { + this.keySuffix = keySuffix; + } + + @Override + public Mono get(@Nullable CompositeSnapshot snapshot) { + return dictionary.get(resolveSnapshot(snapshot), toKey(keySuffix)); + } + + @Override + public Mono setAndGetPrevious(byte[] value) { + return dictionary.put(toKey(keySuffix), value, LLDictionaryResultType.PREVIOUS_VALUE); + } + + @Override + public Mono clearAndGetPrevious() { + return dictionary.remove(toKey(keySuffix), LLDictionaryResultType.PREVIOUS_VALUE); + } + + @Override + public Mono size(@Nullable CompositeSnapshot snapshot, boolean fast) { + return dictionary + .isRangeEmpty(resolveSnapshot(snapshot), LLRange.single(toKey(keySuffix))) + .map(empty -> empty ? 0L : 1L); + } + + @Override + public Mono isEmpty(@Nullable CompositeSnapshot snapshot) { + return dictionary + .isRangeEmpty(resolveSnapshot(snapshot), LLRange.single(toKey(keySuffix))); + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMappable.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMappable.java new file mode 100644 index 0000000..5b8713c --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMappable.java @@ -0,0 +1,6 @@ +package it.cavallium.dbengine.database.collections; + +public interface DatabaseMappable> { + + DatabaseStageMap map(); +} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java new file mode 100644 index 0000000..2b3b6bb --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java @@ -0,0 +1,45 @@ +package it.cavallium.dbengine.database.collections; + +import it.cavallium.dbengine.client.CompositeSnapshot; +import java.util.Objects; +import org.jetbrains.annotations.Nullable; +import reactor.core.publisher.Mono; + +public interface DatabaseStage extends DatabaseEntryable { + + Mono get(@Nullable CompositeSnapshot snapshot); + + default Mono getOrDefault(@Nullable CompositeSnapshot snapshot, Mono defaultValue) { + return get(snapshot).switchIfEmpty(defaultValue).single(); + } + + default Mono set(T value) { + return setAndGetStatus(value).then(); + } + + Mono setAndGetPrevious(T value); + + default Mono setAndGetStatus(T value) { + return setAndGetPrevious(value).map(oldValue -> !Objects.equals(oldValue, value)).defaultIfEmpty(false); + } + + default Mono clear() { + return clearAndGetStatus().then(); + } + + Mono clearAndGetPrevious(); + + default Mono clearAndGetStatus() { + return clearAndGetPrevious().map(Objects::nonNull).defaultIfEmpty(false); + } + + default Mono close() { + return Mono.empty(); + } + + Mono size(@Nullable CompositeSnapshot snapshot, boolean fast); + + default Mono isEmpty(@Nullable CompositeSnapshot snapshot) { + return size(snapshot, false).map(size -> size <= 0); + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java new file mode 100644 index 0000000..7382f09 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -0,0 +1,144 @@ +package it.cavallium.dbengine.database.collections; + +import it.cavallium.dbengine.client.CompositeSnapshot; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.function.Function; +import org.jetbrains.annotations.Nullable; +import it.cavallium.dbengine.database.collections.Joiner.ValueGetter; +import it.cavallium.dbengine.database.collections.JoinerBlocking.ValueGetterBlocking; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@SuppressWarnings("unused") +public interface DatabaseStageMap> extends DatabaseEntry> { + + Mono at(@Nullable CompositeSnapshot snapshot, T key); + + default Mono getValue(@Nullable CompositeSnapshot snapshot, T key) { + return this.at(snapshot, key).flatMap(v -> v.get(snapshot)); + } + + default Mono getValueOrDefault(@Nullable CompositeSnapshot snapshot, T key, Mono defaultValue) { + return getValue(snapshot, key).switchIfEmpty(defaultValue).single(); + } + + default Mono putValue(T key, U value) { + return putValueAndGetStatus(key, value).then(); + } + + default Mono putValueAndGetPrevious(T key, U value) { + return at(null, key).flatMap(v -> v.setAndGetPrevious(value)); + } + + default Mono putValueAndGetStatus(T key, U value) { + return putValueAndGetPrevious(key, value).map(oldValue -> !Objects.equals(oldValue, value)).defaultIfEmpty(false); + } + + default Mono remove(T key) { + return removeAndGetStatus(key).then(); + } + + default Mono removeAndGetPrevious(T key) { + return at(null, key).flatMap(DatabaseStage::clearAndGetPrevious); + } + + default Mono removeAndGetStatus(T key) { + return removeAndGetPrevious(key).map(o -> true).defaultIfEmpty(false); + } + + default Flux> getMulti(@Nullable CompositeSnapshot snapshot, Flux keys) { + return keys.flatMap(key -> this.getValue(snapshot, key).map(value -> Map.entry(key, value))); + } + + default Mono putMulti(Flux> entries) { + return entries.flatMap(entry -> this.putValue(entry.getKey(), entry.getValue())).then(); + } + + Flux> getAllStages(@Nullable CompositeSnapshot snapshot); + + default Flux> getAllValues(@Nullable CompositeSnapshot snapshot) { + return this + .getAllStages(null) + .flatMap(entry -> entry.getValue().get(null).map(value -> Map.entry(entry.getKey(), value))); + } + + default Mono setAllValues(Flux> entries) { + return setAllValuesAndGetPrevious(entries).then(); + } + + default Flux> setAllValuesAndGetPrevious(Flux> entries) { + return this + .clear() + .thenMany(entries) + .flatMap(entry -> this.putValue(entry.getKey(), entry.getValue()).thenReturn(entry)); + } + + default Mono clear() { + return setAllValues(Flux.empty()); + } + + default Mono replaceAllValues(boolean canKeysChange, Function, Mono>> entriesReplacer) { + Flux> replacedFlux = this + .getAllValues(null) + .flatMap(entriesReplacer); + if (canKeysChange) { + return this + .setAllValues(replacedFlux) + .then(); + } else { + return replacedFlux + .flatMap(replacedEntry -> this + .at(null, replacedEntry.getKey()) + .map(entry -> entry.set(replacedEntry.getValue())) + ) + .then(); + } + } + + default Mono replaceAll(Function, Mono> entriesReplacer) { + return this + .getAllStages(null) + .flatMap(entriesReplacer) + .then(); + } + + @Override + default Mono> setAndGetPrevious(Map value) { + return this + .setAllValuesAndGetPrevious(Flux.fromIterable(value.entrySet())) + .collectMap(Entry::getKey, Entry::getValue, HashMap::new); + } + + @Override + default Mono> clearAndGetPrevious() { + return this.setAndGetPrevious(Map.of()); + } + + @Override + default Mono> get(@Nullable CompositeSnapshot snapshot) { + return getAllValues(snapshot) + .collectMap(Entry::getKey, Entry::getValue, HashMap::new); + } + + @Override + default Mono size(@Nullable CompositeSnapshot snapshot, boolean fast) { + return getAllStages(snapshot).count(); + } + + /** + * Value getter doesn't lock data. Please make sure to lock before getting data. + */ + default ValueGetterBlocking getDbValueGetter() { + return k -> getValue(null, k).block(); + } + + /** + * Value getter doesn't lock data. Please make sure to lock before getting data. + */ + default ValueGetter getAsyncDbValueGetter() { + return k -> getValue(null, k); + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageQueryable.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageQueryable.java new file mode 100644 index 0000000..08faa47 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageQueryable.java @@ -0,0 +1,3 @@ +package it.cavallium.dbengine.database.collections; + +public interface DatabaseStageQueryable {} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/Joiner.java b/src/main/java/it/cavallium/dbengine/database/collections/Joiner.java new file mode 100644 index 0000000..2902e6e --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/collections/Joiner.java @@ -0,0 +1,27 @@ +package it.cavallium.dbengine.database.collections; + +import reactor.core.publisher.Mono; + +public interface Joiner { + + interface ValueGetter { + + /** + * Can return Mono error IOException + */ + Mono get(KEY key); + } + + /** + * Warning! You must only join with immutable data to ensure data correctness. + * Good examples: message id, send date, ... + * Bad examples: message content, views, edited, ... + * + * Can return Mono error IOException + */ + Mono join(ValueGetter dbValueGetter, DBVALUE value); + + static Joiner direct() { + return (dbValueGetter, value) -> Mono.just(value); + }; +} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/JoinerBlocking.java b/src/main/java/it/cavallium/dbengine/database/collections/JoinerBlocking.java new file mode 100644 index 0000000..da72042 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/collections/JoinerBlocking.java @@ -0,0 +1,21 @@ +package it.cavallium.dbengine.database.collections; + +import java.io.IOException; + +public interface JoinerBlocking { + + interface ValueGetterBlocking { + VALUE get(KEY key) throws IOException; + } + + /** + * Warning! You must only join with immutable data to ensure data correctness. + * Good examples: message id, send date, ... + * Bad examples: message content, views, edited, ... + */ + JOINEDVALUE join(ValueGetterBlocking dbValueGetter, DBVALUE value) throws IOException; + + static JoinerBlocking direct() { + return (dbValueGetter, value) -> value; + }; +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index ce8ba45..d1934d7 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -269,6 +269,13 @@ public class LLLocalDictionary implements LLDictionary { } } + private Flux> getRangeSingle(LLSnapshot snapshot, byte[] key) { + return this + .get(snapshot, key) + .map(value -> Map.entry(key, value)) + .flux(); + } + private Flux> getRangeMulti(LLSnapshot snapshot, LLRange range) { return Mono .fromCallable(() -> { @@ -324,13 +331,70 @@ public class LLLocalDictionary implements LLDictionary { ); } - private Flux> getRangeSingle(LLSnapshot snapshot, byte[] key) { + @Override + public Flux getRangeKeys(@Nullable LLSnapshot snapshot, LLRange range) { + if (range.isSingle()) { + return getRangeKeysSingle(snapshot, range.getMin()); + } else { + return getRangeKeysMulti(snapshot, range); + } + } + + private Flux getRangeKeysSingle(LLSnapshot snapshot, byte[] key) { return this - .get(snapshot, key) - .map(value -> Map.entry(key, value)) + .containsKey(snapshot, key) + .filter(contains -> contains) + .map(contains -> key) .flux(); } + private Flux getRangeKeysMulti(LLSnapshot snapshot, LLRange range) { + return Mono + .fromCallable(() -> { + var iter = db.newIterator(cfh, resolveSnapshot(snapshot)); + if (range.hasMin()) { + iter.seek(range.getMin()); + } else { + iter.seekToFirst(); + } + return iter; + }) + .subscribeOn(Schedulers.boundedElastic()) + .flatMapMany(rocksIterator -> Flux + .fromIterable(() -> { + VariableWrapper nextKey = new VariableWrapper<>(null); + return new Iterator<>() { + @Override + public boolean hasNext() { + assert nextKey.var == null; + if (!rocksIterator.isValid()) { + nextKey.var = null; + return false; + } + var key = rocksIterator.key(); + var value = rocksIterator.value(); + if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) { + nextKey.var = null; + return false; + } + nextKey.var = key; + return true; + } + + @Override + public byte[] next() { + var key = nextKey.var; + assert key != null; + nextKey.var = null; + return key; + } + }; + }) + .doFinally(signalType -> rocksIterator.close()) + .subscribeOn(Schedulers.boundedElastic()) + ); + } + @Override public Flux> setRange(LLRange range, Flux> entries,