Update CompositeDatabase.java, CompositeDatabasePartLocation.java, and 13 more files...

This commit is contained in:
Andrea Cavalli 2021-01-30 20:16:14 +01:00
parent abe1f35544
commit a1d4731ae3
15 changed files with 764 additions and 3 deletions

View File

@ -0,0 +1,12 @@
package it.cavallium.dbengine.client;
import reactor.core.publisher.Mono;
public interface CompositeDatabase {
Mono<Void> close();
Mono<CompositeSnapshot> takeSnapshot() throws SnapshotException;
Mono<Void> releaseSnapshot(CompositeSnapshot snapshot) throws SnapshotException;
}

View File

@ -0,0 +1,56 @@
package it.cavallium.dbengine.client;
import java.util.Objects;
import java.util.StringJoiner;
public class CompositeDatabasePartLocation {
private final CompositeDatabasePartType partType;
private final String partName;
private CompositeDatabasePartLocation(CompositeDatabasePartType partType, String partName) {
this.partType = partType;
this.partName = partName;
}
public static CompositeDatabasePartLocation of(CompositeDatabasePartType partType, String partName) {
return new CompositeDatabasePartLocation(partType, partName);
}
public enum CompositeDatabasePartType {
KV_DATABASE,
LUCENE_INDEX
}
public CompositeDatabasePartType getPartType() {
return partType;
}
public String getPartName() {
return partName;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CompositeDatabasePartLocation that = (CompositeDatabasePartLocation) o;
return partType == that.partType && Objects.equals(partName, that.partName);
}
@Override
public int hashCode() {
return Objects.hash(partType, partName);
}
@Override
public String toString() {
return new StringJoiner(", ", CompositeDatabasePartLocation.class.getSimpleName() + "[", "]")
.add("partType=" + partType)
.add("partName='" + partName + "'")
.toString();
}
}

View File

@ -0,0 +1,32 @@
package it.cavallium.dbengine.client;
import it.cavallium.dbengine.client.CompositeDatabasePartLocation.CompositeDatabasePartType;
import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSnapshot;
import java.util.Map;
import java.util.Objects;
public class CompositeSnapshot {
private final Map<CompositeDatabasePartLocation, LLSnapshot> snapshots;
public CompositeSnapshot(Map<CompositeDatabasePartLocation, LLSnapshot> snapshots) {
this.snapshots = snapshots;
}
public LLSnapshot getSnapshot(LLKeyValueDatabaseStructure database) {
return Objects.requireNonNull(snapshots.get(CompositeDatabasePartLocation.of(CompositeDatabasePartType.KV_DATABASE,
database.getDatabaseName()
)), () -> "No snapshot for database with name \"" + database.getDatabaseName() + "\"");
}
public LLSnapshot getSnapshot(LLLuceneIndex luceneIndex) {
return Objects.requireNonNull(snapshots.get(CompositeDatabasePartLocation.of(CompositeDatabasePartType.LUCENE_INDEX,
luceneIndex.getLuceneIndexName()
)), () -> "No snapshot for lucene index with name \"" + luceneIndex.getLuceneIndexName() + "\"");
}
public Map<CompositeDatabasePartLocation, LLSnapshot> getAllSnapshots() {
return snapshots;
}
}

View File

@ -0,0 +1,12 @@
package it.cavallium.dbengine.client;
public class SnapshotException extends RuntimeException {
public SnapshotException(Exception ex) {
super(ex);
}
public SnapshotException(String message) {
super(message);
}
}

View File

@ -0,0 +1,9 @@
package it.cavallium.dbengine.database.collections;
public interface DatabaseEntry<U> extends DatabaseStage<U> {
@Override
default DatabaseEntry<U> entry() {
return this;
}
}

View File

@ -0,0 +1,6 @@
package it.cavallium.dbengine.database.collections;
public interface DatabaseEntryable<T> {
DatabaseEntry<T> entry();
}

View File

@ -0,0 +1,165 @@
package it.cavallium.dbengine.database.collections;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot;
import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
// todo: implement optimized methods
public abstract class DatabaseMapDictionaryParent<U, US extends DatabaseStage<U>> implements DatabaseStageMap<byte[], U, US> {
public static final byte[] EMPTY_BYTES = new byte[0];
private final LLDictionary dictionary;
private final byte[] keyPrefix;
private final int keySuffixLength;
private final int keyExtLength;
private final LLRange range;
private static byte[] firstKey(byte[] prefixKey, int prefixLength, int suffixLength, int extLength) {
return fillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength, (byte) 0x00);
}
private static byte[] lastKey(byte[] prefixKey, int prefixLength, int suffixLength, int extLength) {
return fillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength, (byte) 0xFF);
}
private static byte[] fillKeySuffixAndExt(byte[] prefixKey, int prefixLength, int suffixLength, int extLength, byte fillValue) {
assert prefixKey.length == prefixLength;
assert suffixLength > 0;
assert extLength > 0;
byte[] result = Arrays.copyOf(prefixKey, prefixLength + suffixLength + extLength);
Arrays.fill(result, prefixLength, result.length, fillValue);
return result;
}
private static byte[] firstKey(byte[] prefixKey, byte[] suffixKey, int prefixLength, int suffixLength, int extLength) {
return fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0x00);
}
private static byte[] lastKey(byte[] prefixKey, byte[] suffixKey, int prefixLength, int suffixLength, int extLength) {
return fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0xFF);
}
private static byte[] fillKeyExt(byte[] prefixKey,
byte[] suffixKey,
int prefixLength,
int suffixLength,
int extLength,
byte fillValue) {
assert prefixKey.length == prefixLength;
assert suffixKey.length == suffixLength;
assert suffixLength > 0;
assert extLength > 0;
byte[] result = Arrays.copyOf(prefixKey, prefixLength + suffixLength + extLength);
System.arraycopy(suffixKey, 0, result, prefixLength, suffixLength);
Arrays.fill(result, prefixLength + suffixLength, result.length, fillValue);
return result;
}
@SuppressWarnings("unused")
public DatabaseMapDictionaryParent(LLDictionary dictionary, int keyLength, int keyExtLength) {
this(dictionary, EMPTY_BYTES, keyLength, keyExtLength);
}
public DatabaseMapDictionaryParent(LLDictionary dictionary, byte[] prefixKey, int keySuffixLength, int keyExtLength) {
this.dictionary = dictionary;
this.keyPrefix = prefixKey;
this.keySuffixLength = keySuffixLength;
this.keyExtLength = keyExtLength;
byte[] firstKey = firstKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength);
byte[] lastKey = lastKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength);
this.range = keyPrefix.length == 0 ? LLRange.all() : LLRange.of(firstKey, lastKey);
}
@SuppressWarnings("unused")
private boolean suffixKeyConsistency(int keySuffixLength) {
return this.keySuffixLength == keySuffixLength;
}
@SuppressWarnings("unused")
private boolean extKeyConsistency(int keyExtLength) {
return this.keyExtLength == keyExtLength;
}
@SuppressWarnings("unused")
private boolean suffixAndExtKeyConsistency(int keySuffixAndExtLength) {
return this.keySuffixLength + this.keyExtLength == keySuffixAndExtLength;
}
/**
* Keep only suffix and ext
*/
private byte[] stripPrefix(byte[] key) {
return Arrays.copyOfRange(key, this.keyPrefix.length, key.length);
}
/**
* Remove ext from suffix
*/
private byte[] trimSuffix(byte[] keySuffix) {
if (keySuffix.length == keySuffixLength) return keySuffix;
return Arrays.copyOf(keySuffix, keySuffixLength);
}
/**
* Remove suffix from keySuffix, returning probably an empty byte array
*/
private byte[] stripSuffix(byte[] keySuffix) {
if (keySuffix.length == this.keySuffixLength) return EMPTY_BYTES;
return Arrays.copyOfRange(keySuffix, this.keySuffixLength, keySuffix.length);
}
private LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) {
if (snapshot == null) {
return null;
} else {
return snapshot.getSnapshot(dictionary);
}
}
private LLRange toExtRange(byte[] keySuffix) {
byte[] first = firstKey(keyPrefix, keySuffix, keyPrefix.length, keySuffixLength, keyExtLength);
byte[] end = lastKey(keyPrefix, keySuffix, keyPrefix.length, keySuffixLength, keyExtLength);
return LLRange.of(first, end);
}
@Override
public Mono<US> at(@Nullable CompositeSnapshot snapshot, byte[] keySuffix) {
return this.subStage(
this.dictionary
.getRange(resolveSnapshot(snapshot), toExtRange(stripPrefix(keySuffix)))
.map(key -> {
byte[] keyExt = this.stripSuffix(this.stripPrefix(key.getKey()));
return Map.entry(keyExt, key.getValue());
})
);
}
@Override
public Flux<Entry<byte[], US>> getAllStages(@Nullable CompositeSnapshot snapshot) {
Flux<GroupedFlux<byte[], Entry<byte[], byte[]>>> groupedFlux = dictionary
.getRange(resolveSnapshot(snapshot), range)
.groupBy(entry -> this.trimSuffix(this.stripPrefix(entry.getKey())));
return groupedFlux
.flatMap(keyValueFlux -> this
.subStage(keyValueFlux.map(key -> {
byte[] keyExt = this.stripSuffix(this.stripPrefix(key.getKey()));
return Map.entry(keyExt, key.getValue());
}))
.map(us -> Map.entry(keyValueFlux.key(), us))
);
}
/**
*
* @param keyValueFlux a flux with keyExt and full values from the same keySuffix
*/
protected abstract Mono<US> subStage(Flux<Entry<byte[], byte[]>> keyValueFlux);
}

View File

@ -0,0 +1,159 @@
package it.cavallium.dbengine.database.collections;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
// todo: implement optimized methods
public class DatabaseMapDictionaryRange implements DatabaseStageMap<byte[], byte[], DatabaseEntry<byte[]>> {
public static final byte[] NO_PREFIX = new byte[0];
private final LLDictionary dictionary;
private final byte[] keyPrefix;
private final int keySuffixLength;
private final LLRange range;
private static byte[] lastKey(byte[] prefixKey, int prefixLength, int suffixLength) {
assert prefixKey.length == prefixLength;
byte[] lastKey = Arrays.copyOf(prefixKey, prefixLength + suffixLength);
Arrays.fill(lastKey, prefixLength, lastKey.length, (byte) 0xFF);
return lastKey;
}
private static byte[] firstKey(byte[] prefixKey, int prefixLength, int suffixLength) {
assert prefixKey.length == prefixLength;
byte[] lastKey = Arrays.copyOf(prefixKey, prefixLength + suffixLength);
Arrays.fill(lastKey, prefixLength, lastKey.length, (byte) 0x00);
return lastKey;
}
@SuppressWarnings("unused")
public DatabaseMapDictionaryRange(LLDictionary dictionary, int keyLength) {
this(dictionary, NO_PREFIX, keyLength);
}
public DatabaseMapDictionaryRange(LLDictionary dictionary, byte[] prefixKey, int keySuffixLength) {
this.dictionary = dictionary;
this.keyPrefix = prefixKey;
this.keySuffixLength = keySuffixLength;
byte[] firstKey = firstKey(keyPrefix, keyPrefix.length, keySuffixLength);
byte[] lastKey = lastKey(keyPrefix, keyPrefix.length, keySuffixLength);
this.range = keyPrefix.length == 0 ? LLRange.all() : LLRange.of(firstKey, lastKey);
}
private boolean suffixKeyConsistency(int keySuffixLength) {
return this.keySuffixLength == keySuffixLength;
}
private byte[] toKey(byte[] suffixKey) {
assert suffixKeyConsistency(suffixKey.length);
byte[] key = Arrays.copyOf(keyPrefix, keyPrefix.length + suffixKey.length);
System.arraycopy(suffixKey, 0, key, keyPrefix.length, suffixKey.length);
return key;
}
private byte[] stripPrefix(byte[] key) {
return Arrays.copyOfRange(key, this.keyPrefix.length, key.length);
}
private LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) {
if (snapshot == null) {
return null;
} else {
return snapshot.getSnapshot(dictionary);
}
}
@Override
public Mono<Map<byte[], byte[]>> get(@Nullable CompositeSnapshot snapshot) {
return dictionary
.getRange(resolveSnapshot(snapshot), range)
.map(this::stripPrefix)
.collectMap(Entry::getKey, Entry::getValue, HashMap::new);
}
@Override
public Mono<Map<byte[], byte[]>> setAndGetPrevious(Map<byte[], byte[]> value) {
return dictionary
.setRange(range, Flux.fromIterable(value.entrySet()), true)
.map(this::stripPrefix)
.collectMap(Entry::getKey, Entry::getValue, HashMap::new);
}
private Entry<byte[], byte[]> stripPrefix(Entry<byte[], byte[]> entry) {
byte[] keySuffix = stripPrefix(entry.getKey());
return Map.entry(keySuffix, entry.getValue());
}
@Override
public Mono<Map<byte[], byte[]>> clearAndGetPrevious() {
return dictionary
.setRange(range, Flux.empty(), true)
.map(this::stripPrefix)
.collectMap(Entry::getKey, Entry::getValue, HashMap::new);
}
@Override
public Mono<Long> size(@Nullable CompositeSnapshot snapshot, boolean fast) {
return dictionary.sizeRange(resolveSnapshot(snapshot), range, true);
}
@Override
public Mono<DatabaseEntry<byte[]>> at(@Nullable CompositeSnapshot snapshot, byte[] keySuffix) {
return Mono.just(new SingleDatabaseEntry(keySuffix));
}
@Override
public Flux<Entry<byte[], DatabaseEntry<byte[]>>> getAllStages(@Nullable CompositeSnapshot snapshot) {
return dictionary
.getRangeKeys(resolveSnapshot(snapshot), range)
.map(this::stripPrefix)
.map(keySuffix -> Map.entry(keySuffix, new SingleDatabaseEntry(keySuffix)));
}
private class SingleDatabaseEntry implements DatabaseEntry<byte[]> {
private final byte[] keySuffix;
public SingleDatabaseEntry(byte[] keySuffix) {
this.keySuffix = keySuffix;
}
@Override
public Mono<byte[]> get(@Nullable CompositeSnapshot snapshot) {
return dictionary.get(resolveSnapshot(snapshot), toKey(keySuffix));
}
@Override
public Mono<byte[]> setAndGetPrevious(byte[] value) {
return dictionary.put(toKey(keySuffix), value, LLDictionaryResultType.PREVIOUS_VALUE);
}
@Override
public Mono<byte[]> clearAndGetPrevious() {
return dictionary.remove(toKey(keySuffix), LLDictionaryResultType.PREVIOUS_VALUE);
}
@Override
public Mono<Long> size(@Nullable CompositeSnapshot snapshot, boolean fast) {
return dictionary
.isRangeEmpty(resolveSnapshot(snapshot), LLRange.single(toKey(keySuffix)))
.map(empty -> empty ? 0L : 1L);
}
@Override
public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
return dictionary
.isRangeEmpty(resolveSnapshot(snapshot), LLRange.single(toKey(keySuffix)));
}
}
}

View File

@ -0,0 +1,6 @@
package it.cavallium.dbengine.database.collections;
public interface DatabaseMappable<T, U, US extends DatabaseStage<U>> {
DatabaseStageMap<T, U, US> map();
}

View File

@ -0,0 +1,45 @@
package it.cavallium.dbengine.database.collections;
import it.cavallium.dbengine.client.CompositeSnapshot;
import java.util.Objects;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
public interface DatabaseStage<T> extends DatabaseEntryable<T> {
Mono<T> get(@Nullable CompositeSnapshot snapshot);
default Mono<T> getOrDefault(@Nullable CompositeSnapshot snapshot, Mono<T> defaultValue) {
return get(snapshot).switchIfEmpty(defaultValue).single();
}
default Mono<Void> set(T value) {
return setAndGetStatus(value).then();
}
Mono<T> setAndGetPrevious(T value);
default Mono<Boolean> setAndGetStatus(T value) {
return setAndGetPrevious(value).map(oldValue -> !Objects.equals(oldValue, value)).defaultIfEmpty(false);
}
default Mono<Void> clear() {
return clearAndGetStatus().then();
}
Mono<T> clearAndGetPrevious();
default Mono<Boolean> clearAndGetStatus() {
return clearAndGetPrevious().map(Objects::nonNull).defaultIfEmpty(false);
}
default Mono<Void> close() {
return Mono.empty();
}
Mono<Long> size(@Nullable CompositeSnapshot snapshot, boolean fast);
default Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
return size(snapshot, false).map(size -> size <= 0);
}
}

View File

@ -0,0 +1,144 @@
package it.cavallium.dbengine.database.collections;
import it.cavallium.dbengine.client.CompositeSnapshot;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
import it.cavallium.dbengine.database.collections.JoinerBlocking.ValueGetterBlocking;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@SuppressWarnings("unused")
public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends DatabaseEntry<Map<T, U>> {
Mono<US> at(@Nullable CompositeSnapshot snapshot, T key);
default Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T key) {
return this.at(snapshot, key).flatMap(v -> v.get(snapshot));
}
default Mono<U> getValueOrDefault(@Nullable CompositeSnapshot snapshot, T key, Mono<U> defaultValue) {
return getValue(snapshot, key).switchIfEmpty(defaultValue).single();
}
default Mono<Void> putValue(T key, U value) {
return putValueAndGetStatus(key, value).then();
}
default Mono<U> putValueAndGetPrevious(T key, U value) {
return at(null, key).flatMap(v -> v.setAndGetPrevious(value));
}
default Mono<Boolean> putValueAndGetStatus(T key, U value) {
return putValueAndGetPrevious(key, value).map(oldValue -> !Objects.equals(oldValue, value)).defaultIfEmpty(false);
}
default Mono<Void> remove(T key) {
return removeAndGetStatus(key).then();
}
default Mono<U> removeAndGetPrevious(T key) {
return at(null, key).flatMap(DatabaseStage::clearAndGetPrevious);
}
default Mono<Boolean> removeAndGetStatus(T key) {
return removeAndGetPrevious(key).map(o -> true).defaultIfEmpty(false);
}
default Flux<Entry<T, U>> getMulti(@Nullable CompositeSnapshot snapshot, Flux<T> keys) {
return keys.flatMap(key -> this.getValue(snapshot, key).map(value -> Map.entry(key, value)));
}
default Mono<Void> putMulti(Flux<Entry<T, U>> entries) {
return entries.flatMap(entry -> this.putValue(entry.getKey(), entry.getValue())).then();
}
Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot);
default Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot) {
return this
.getAllStages(null)
.flatMap(entry -> entry.getValue().get(null).map(value -> Map.entry(entry.getKey(), value)));
}
default Mono<Void> setAllValues(Flux<Entry<T, U>> entries) {
return setAllValuesAndGetPrevious(entries).then();
}
default Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
return this
.clear()
.thenMany(entries)
.flatMap(entry -> this.putValue(entry.getKey(), entry.getValue()).thenReturn(entry));
}
default Mono<Void> clear() {
return setAllValues(Flux.empty());
}
default Mono<Void> replaceAllValues(boolean canKeysChange, Function<Entry<T, U>, Mono<Entry<T, U>>> entriesReplacer) {
Flux<Entry<T, U>> replacedFlux = this
.getAllValues(null)
.flatMap(entriesReplacer);
if (canKeysChange) {
return this
.setAllValues(replacedFlux)
.then();
} else {
return replacedFlux
.flatMap(replacedEntry -> this
.at(null, replacedEntry.getKey())
.map(entry -> entry.set(replacedEntry.getValue()))
)
.then();
}
}
default Mono<Void> replaceAll(Function<Entry<T, US>, Mono<Void>> entriesReplacer) {
return this
.getAllStages(null)
.flatMap(entriesReplacer)
.then();
}
@Override
default Mono<Map<T, U>> setAndGetPrevious(Map<T, U> value) {
return this
.setAllValuesAndGetPrevious(Flux.fromIterable(value.entrySet()))
.collectMap(Entry::getKey, Entry::getValue, HashMap::new);
}
@Override
default Mono<Map<T, U>> clearAndGetPrevious() {
return this.setAndGetPrevious(Map.of());
}
@Override
default Mono<Map<T, U>> get(@Nullable CompositeSnapshot snapshot) {
return getAllValues(snapshot)
.collectMap(Entry::getKey, Entry::getValue, HashMap::new);
}
@Override
default Mono<Long> size(@Nullable CompositeSnapshot snapshot, boolean fast) {
return getAllStages(snapshot).count();
}
/**
* Value getter doesn't lock data. Please make sure to lock before getting data.
*/
default ValueGetterBlocking<T, U> getDbValueGetter() {
return k -> getValue(null, k).block();
}
/**
* Value getter doesn't lock data. Please make sure to lock before getting data.
*/
default ValueGetter<T, U> getAsyncDbValueGetter() {
return k -> getValue(null, k);
}
}

View File

@ -0,0 +1,3 @@
package it.cavallium.dbengine.database.collections;
public interface DatabaseStageQueryable<T, U> {}

View File

@ -0,0 +1,27 @@
package it.cavallium.dbengine.database.collections;
import reactor.core.publisher.Mono;
public interface Joiner<KEY, DBVALUE, JOINEDVALUE> {
interface ValueGetter<KEY, VALUE> {
/**
* Can return Mono error IOException
*/
Mono<VALUE> get(KEY key);
}
/**
* Warning! You must only join with immutable data to ensure data correctness.
* Good examples: message id, send date, ...
* Bad examples: message content, views, edited, ...
*
* Can return Mono error IOException
*/
Mono<JOINEDVALUE> join(ValueGetter<KEY, DBVALUE> dbValueGetter, DBVALUE value);
static <KEY, DBVALUE> Joiner<KEY, DBVALUE, DBVALUE> direct() {
return (dbValueGetter, value) -> Mono.just(value);
};
}

View File

@ -0,0 +1,21 @@
package it.cavallium.dbengine.database.collections;
import java.io.IOException;
public interface JoinerBlocking<KEY, DBVALUE, JOINEDVALUE> {
interface ValueGetterBlocking<KEY, VALUE> {
VALUE get(KEY key) throws IOException;
}
/**
* Warning! You must only join with immutable data to ensure data correctness.
* Good examples: message id, send date, ...
* Bad examples: message content, views, edited, ...
*/
JOINEDVALUE join(ValueGetterBlocking<KEY, DBVALUE> dbValueGetter, DBVALUE value) throws IOException;
static <KEY, DBVALUE> JoinerBlocking<KEY, DBVALUE, DBVALUE> direct() {
return (dbValueGetter, value) -> value;
};
}

View File

@ -269,6 +269,13 @@ public class LLLocalDictionary implements LLDictionary {
}
}
private Flux<Entry<byte[],byte[]>> getRangeSingle(LLSnapshot snapshot, byte[] key) {
return this
.get(snapshot, key)
.map(value -> Map.entry(key, value))
.flux();
}
private Flux<Entry<byte[],byte[]>> getRangeMulti(LLSnapshot snapshot, LLRange range) {
return Mono
.fromCallable(() -> {
@ -324,13 +331,70 @@ public class LLLocalDictionary implements LLDictionary {
);
}
private Flux<Entry<byte[],byte[]>> getRangeSingle(LLSnapshot snapshot, byte[] key) {
@Override
public Flux<byte[]> getRangeKeys(@Nullable LLSnapshot snapshot, LLRange range) {
if (range.isSingle()) {
return getRangeKeysSingle(snapshot, range.getMin());
} else {
return getRangeKeysMulti(snapshot, range);
}
}
private Flux<byte[]> getRangeKeysSingle(LLSnapshot snapshot, byte[] key) {
return this
.get(snapshot, key)
.map(value -> Map.entry(key, value))
.containsKey(snapshot, key)
.filter(contains -> contains)
.map(contains -> key)
.flux();
}
private Flux<byte[]> getRangeKeysMulti(LLSnapshot snapshot, LLRange range) {
return Mono
.fromCallable(() -> {
var iter = db.newIterator(cfh, resolveSnapshot(snapshot));
if (range.hasMin()) {
iter.seek(range.getMin());
} else {
iter.seekToFirst();
}
return iter;
})
.subscribeOn(Schedulers.boundedElastic())
.flatMapMany(rocksIterator -> Flux
.<byte[]>fromIterable(() -> {
VariableWrapper<byte[]> nextKey = new VariableWrapper<>(null);
return new Iterator<>() {
@Override
public boolean hasNext() {
assert nextKey.var == null;
if (!rocksIterator.isValid()) {
nextKey.var = null;
return false;
}
var key = rocksIterator.key();
var value = rocksIterator.value();
if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
nextKey.var = null;
return false;
}
nextKey.var = key;
return true;
}
@Override
public byte[] next() {
var key = nextKey.var;
assert key != null;
nextKey.var = null;
return key;
}
};
})
.doFinally(signalType -> rocksIterator.close())
.subscribeOn(Schedulers.boundedElastic())
);
}
@Override
public Flux<Entry<byte[], byte[]>> setRange(LLRange range,
Flux<Entry<byte[], byte[]>> entries,