Update LLKeyValueDatabase.java, DatabaseEntry.java, and 15 more files...
This commit is contained in:
parent
12aa63d615
commit
94234d518d
@ -2,8 +2,8 @@ package it.cavallium.dbengine.database;
|
||||
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.google.common.primitives.Longs;
|
||||
import it.cavallium.dbengine.database.collections.LLInt;
|
||||
import it.cavallium.dbengine.database.collections.LLLong;
|
||||
import it.cavallium.dbengine.database.collections.DatabaseInt;
|
||||
import it.cavallium.dbengine.database.collections.DatabaseLong;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
@ -23,20 +23,20 @@ public interface LLKeyValueDatabase extends Closeable, LLSnapshottable, LLKeyVal
|
||||
return getDictionary(Column.hashMap(name).getName().getBytes(StandardCharsets.US_ASCII));
|
||||
}
|
||||
|
||||
default LLInt getInteger(String singletonListName, String name, int defaultValue)
|
||||
default DatabaseInt getInteger(String singletonListName, String name, int defaultValue)
|
||||
throws IOException {
|
||||
LLSingleton singleton = getSingleton(
|
||||
Column.special(singletonListName).getName().getBytes(StandardCharsets.US_ASCII),
|
||||
name.getBytes(StandardCharsets.US_ASCII), Ints.toByteArray(defaultValue));
|
||||
return new LLInt(singleton);
|
||||
return new DatabaseInt(singleton);
|
||||
}
|
||||
|
||||
default LLLong getLong(String singletonListName, String name, long defaultValue)
|
||||
default DatabaseLong getLong(String singletonListName, String name, long defaultValue)
|
||||
throws IOException {
|
||||
LLSingleton singleton = getSingleton(
|
||||
Column.special(singletonListName).getName().getBytes(StandardCharsets.US_ASCII),
|
||||
name.getBytes(StandardCharsets.US_ASCII), Longs.toByteArray(defaultValue));
|
||||
return new LLLong(singleton);
|
||||
return new DatabaseLong(singleton);
|
||||
}
|
||||
|
||||
long getProperty(String propertyName) throws IOException;
|
||||
|
@ -1,9 +0,0 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
public interface DatabaseEntry<U> extends DatabaseStage<U> {
|
||||
|
||||
@Override
|
||||
default DatabaseEntry<U> entry() {
|
||||
return this;
|
||||
}
|
||||
}
|
@ -2,5 +2,5 @@ package it.cavallium.dbengine.database.collections;
|
||||
|
||||
public interface DatabaseEntryable<T> {
|
||||
|
||||
DatabaseEntry<T> entry();
|
||||
DatabaseStageEntry<T> entry();
|
||||
}
|
||||
|
@ -7,11 +7,11 @@ import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class LLInt implements LLKeyValueDatabaseStructure {
|
||||
public class DatabaseInt implements LLKeyValueDatabaseStructure {
|
||||
|
||||
private final LLSingleton singleton;
|
||||
|
||||
public LLInt(LLSingleton singleton) {
|
||||
public DatabaseInt(LLSingleton singleton) {
|
||||
this.singleton = singleton;
|
||||
}
|
||||
|
@ -8,11 +8,11 @@ import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class LLLong implements LLKeyValueDatabaseStructure {
|
||||
public class DatabaseLong implements LLKeyValueDatabaseStructure {
|
||||
|
||||
private final LLSingleton singleton;
|
||||
|
||||
public LLLong(LLSingleton singleton) {
|
||||
public DatabaseLong(LLSingleton singleton) {
|
||||
this.singleton = singleton;
|
||||
}
|
||||
|
@ -13,10 +13,11 @@ import reactor.core.publisher.GroupedFlux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
// todo: implement optimized methods
|
||||
public abstract class DatabaseMapDictionaryParent<U, US extends DatabaseStage<U>> implements DatabaseStageMap<byte[], U, US> {
|
||||
public class DatabaseMapDictionaryParent<U, US extends DatabaseStage<U>> implements DatabaseStageMap<byte[], U, US> {
|
||||
|
||||
public static final byte[] EMPTY_BYTES = new byte[0];
|
||||
private final LLDictionary dictionary;
|
||||
private final SubStageGetter<U, US> subStageGetter;
|
||||
private final byte[] keyPrefix;
|
||||
private final int keySuffixLength;
|
||||
private final int keyExtLength;
|
||||
@ -64,12 +65,13 @@ public abstract class DatabaseMapDictionaryParent<U, US extends DatabaseStage<U>
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public DatabaseMapDictionaryParent(LLDictionary dictionary, int keyLength, int keyExtLength) {
|
||||
this(dictionary, EMPTY_BYTES, keyLength, keyExtLength);
|
||||
public DatabaseMapDictionaryParent(LLDictionary dictionary, SubStageGetter<U, US> subStageGetter, int keyLength, int keyExtLength) {
|
||||
this(dictionary, subStageGetter, EMPTY_BYTES, keyLength, keyExtLength);
|
||||
}
|
||||
|
||||
public DatabaseMapDictionaryParent(LLDictionary dictionary, byte[] prefixKey, int keySuffixLength, int keyExtLength) {
|
||||
public DatabaseMapDictionaryParent(LLDictionary dictionary, SubStageGetter<U, US> subStageGetter, byte[] prefixKey, int keySuffixLength, int keyExtLength) {
|
||||
this.dictionary = dictionary;
|
||||
this.subStageGetter = subStageGetter;
|
||||
this.keyPrefix = prefixKey;
|
||||
this.keySuffixLength = keySuffixLength;
|
||||
this.keyExtLength = keyExtLength;
|
||||
@ -104,15 +106,34 @@ public abstract class DatabaseMapDictionaryParent<U, US extends DatabaseStage<U>
|
||||
* Remove ext from suffix
|
||||
*/
|
||||
private byte[] trimSuffix(byte[] keySuffix) {
|
||||
if (keySuffix.length == keySuffixLength) return keySuffix;
|
||||
if (keySuffix.length == keySuffixLength)
|
||||
return keySuffix;
|
||||
return Arrays.copyOf(keySuffix, keySuffixLength);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove ext from full key
|
||||
*/
|
||||
private byte[] removeExtFromFullKey(byte[] key) {
|
||||
return Arrays.copyOf(key, keyPrefix.length + keySuffixLength);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add prefix to suffix
|
||||
*/
|
||||
private byte[] toKeyWithoutExt(byte[] suffixKey) {
|
||||
assert suffixKey.length == keySuffixLength;
|
||||
byte[] result = Arrays.copyOf(keyPrefix, keyPrefix.length + keySuffixLength);
|
||||
System.arraycopy(suffixKey, 0, result, keyPrefix.length, keySuffixLength);
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove suffix from keySuffix, returning probably an empty byte array
|
||||
*/
|
||||
private byte[] stripSuffix(byte[] keySuffix) {
|
||||
if (keySuffix.length == this.keySuffixLength) return EMPTY_BYTES;
|
||||
if (keySuffix.length == this.keySuffixLength)
|
||||
return EMPTY_BYTES;
|
||||
return Arrays.copyOfRange(keySuffix, this.keySuffixLength, keySuffix.length);
|
||||
}
|
||||
|
||||
@ -132,34 +153,22 @@ public abstract class DatabaseMapDictionaryParent<U, US extends DatabaseStage<U>
|
||||
|
||||
@Override
|
||||
public Mono<US> at(@Nullable CompositeSnapshot snapshot, byte[] keySuffix) {
|
||||
return this.subStage(
|
||||
this.dictionary
|
||||
.getRange(resolveSnapshot(snapshot), toExtRange(stripPrefix(keySuffix)))
|
||||
.map(key -> {
|
||||
byte[] keyExt = this.stripSuffix(this.stripPrefix(key.getKey()));
|
||||
return Map.entry(keyExt, key.getValue());
|
||||
})
|
||||
Flux<byte[]> rangeKeys = this
|
||||
.dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffix)
|
||||
);
|
||||
return this.subStageGetter
|
||||
.subStage(dictionary, snapshot, toKeyWithoutExt(keySuffix), rangeKeys);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Entry<byte[], US>> getAllStages(@Nullable CompositeSnapshot snapshot) {
|
||||
Flux<GroupedFlux<byte[], Entry<byte[], byte[]>>> groupedFlux = dictionary
|
||||
.getRange(resolveSnapshot(snapshot), range)
|
||||
.groupBy(entry -> this.trimSuffix(this.stripPrefix(entry.getKey())));
|
||||
Flux<GroupedFlux<byte[], byte[]>> groupedFlux = dictionary
|
||||
.getRangeKeys(resolveSnapshot(snapshot), range)
|
||||
.groupBy(this::removeExtFromFullKey);
|
||||
return groupedFlux
|
||||
.flatMap(keyValueFlux -> this
|
||||
.subStage(keyValueFlux.map(key -> {
|
||||
byte[] keyExt = this.stripSuffix(this.stripPrefix(key.getKey()));
|
||||
return Map.entry(keyExt, key.getValue());
|
||||
}))
|
||||
.map(us -> Map.entry(keyValueFlux.key(), us))
|
||||
.flatMap(rangeKeys -> this.subStageGetter
|
||||
.subStage(dictionary, snapshot, rangeKeys.key(), rangeKeys)
|
||||
.map(us -> Map.entry(rangeKeys.key(), us))
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param keyValueFlux a flux with keyExt and full values from the same keySuffix
|
||||
*/
|
||||
protected abstract Mono<US> subStage(Flux<Entry<byte[], byte[]>> keyValueFlux);
|
||||
}
|
||||
|
@ -2,7 +2,6 @@ package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.LLDictionaryResultType;
|
||||
import it.cavallium.dbengine.database.LLRange;
|
||||
import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import java.util.Arrays;
|
||||
@ -14,7 +13,7 @@ import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
// todo: implement optimized methods
|
||||
public class DatabaseMapDictionaryRange implements DatabaseStageMap<byte[], byte[], DatabaseEntry<byte[]>> {
|
||||
public class DatabaseMapDictionaryRange implements DatabaseStageMap<byte[], byte[], DatabaseStageEntry<byte[]>> {
|
||||
|
||||
public static final byte[] NO_PREFIX = new byte[0];
|
||||
private final LLDictionary dictionary;
|
||||
@ -108,52 +107,15 @@ public class DatabaseMapDictionaryRange implements DatabaseStageMap<byte[], byte
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<DatabaseEntry<byte[]>> at(@Nullable CompositeSnapshot snapshot, byte[] keySuffix) {
|
||||
return Mono.just(new SingleDatabaseEntry(keySuffix));
|
||||
public Mono<DatabaseStageEntry<byte[]>> at(@Nullable CompositeSnapshot snapshot, byte[] keySuffix) {
|
||||
return Mono.just(new DatabaseSingle(dictionary, toKey(keySuffix)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Entry<byte[], DatabaseEntry<byte[]>>> getAllStages(@Nullable CompositeSnapshot snapshot) {
|
||||
public Flux<Entry<byte[], DatabaseStageEntry<byte[]>>> getAllStages(@Nullable CompositeSnapshot snapshot) {
|
||||
return dictionary
|
||||
.getRangeKeys(resolveSnapshot(snapshot), range)
|
||||
.map(this::stripPrefix)
|
||||
.map(keySuffix -> Map.entry(keySuffix, new SingleDatabaseEntry(keySuffix)));
|
||||
}
|
||||
|
||||
private class SingleDatabaseEntry implements DatabaseEntry<byte[]> {
|
||||
|
||||
private final byte[] keySuffix;
|
||||
|
||||
public SingleDatabaseEntry(byte[] keySuffix) {
|
||||
this.keySuffix = keySuffix;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<byte[]> get(@Nullable CompositeSnapshot snapshot) {
|
||||
return dictionary.get(resolveSnapshot(snapshot), toKey(keySuffix));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<byte[]> setAndGetPrevious(byte[] value) {
|
||||
return dictionary.put(toKey(keySuffix), value, LLDictionaryResultType.PREVIOUS_VALUE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<byte[]> clearAndGetPrevious() {
|
||||
return dictionary.remove(toKey(keySuffix), LLDictionaryResultType.PREVIOUS_VALUE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Long> size(@Nullable CompositeSnapshot snapshot, boolean fast) {
|
||||
return dictionary
|
||||
.isRangeEmpty(resolveSnapshot(snapshot), LLRange.single(toKey(keySuffix)))
|
||||
.map(empty -> empty ? 0L : 1L);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
|
||||
return dictionary
|
||||
.isRangeEmpty(resolveSnapshot(snapshot), LLRange.single(toKey(keySuffix)));
|
||||
}
|
||||
.map(keySuffix -> Map.entry(keySuffix, new DatabaseSingle(dictionary, toKey(keySuffix))));
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,56 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.LLDictionaryResultType;
|
||||
import it.cavallium.dbengine.database.LLRange;
|
||||
import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class DatabaseSingle implements DatabaseStageEntry<byte[]> {
|
||||
|
||||
private final LLDictionary dictionary;
|
||||
private final byte[] key;
|
||||
|
||||
public DatabaseSingle(LLDictionary dictionary, byte[] key) {
|
||||
this.dictionary = dictionary;
|
||||
this.key = key;
|
||||
}
|
||||
|
||||
private LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) {
|
||||
if (snapshot == null) {
|
||||
return null;
|
||||
} else {
|
||||
return snapshot.getSnapshot(dictionary);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<byte[]> get(@Nullable CompositeSnapshot snapshot) {
|
||||
return dictionary.get(resolveSnapshot(snapshot), key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<byte[]> setAndGetPrevious(byte[] value) {
|
||||
return dictionary.put(key, value, LLDictionaryResultType.PREVIOUS_VALUE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<byte[]> clearAndGetPrevious() {
|
||||
return dictionary.remove(key, LLDictionaryResultType.PREVIOUS_VALUE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Long> size(@Nullable CompositeSnapshot snapshot, boolean fast) {
|
||||
return dictionary
|
||||
.isRangeEmpty(resolveSnapshot(snapshot), LLRange.single(key))
|
||||
.map(empty -> empty ? 0L : 1L);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
|
||||
return dictionary
|
||||
.isRangeEmpty(resolveSnapshot(snapshot), LLRange.single(key));
|
||||
}
|
||||
}
|
@ -0,0 +1,9 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
public interface DatabaseStageEntry<U> extends DatabaseStage<U> {
|
||||
|
||||
@Override
|
||||
default DatabaseStageEntry<U> entry() {
|
||||
return this;
|
||||
}
|
||||
}
|
@ -13,7 +13,7 @@ import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends DatabaseEntry<Map<T, U>> {
|
||||
public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends DatabaseStageEntry<Map<T, U>> {
|
||||
|
||||
Mono<US> at(@Nullable CompositeSnapshot snapshot, T key);
|
||||
|
||||
|
@ -0,0 +1,54 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
|
||||
public class MapBuilder {
|
||||
|
||||
private final LLDictionary dictionary;
|
||||
|
||||
public MapBuilder(LLDictionary dictionary) {
|
||||
this.dictionary = dictionary;
|
||||
}
|
||||
|
||||
public static MapBuilder of(LLDictionary dictionary) {
|
||||
return new MapBuilder(dictionary);
|
||||
}
|
||||
|
||||
public MapBuilder2<byte[], byte[]> map() {
|
||||
return MapBuilder2.fromDictionary(dictionary);
|
||||
}
|
||||
|
||||
public static class MapBuilder2<K, V> {
|
||||
|
||||
private final LLDictionary dictionary;
|
||||
private final MapBuilder2<?, ?> parent;
|
||||
private final SerializationOptions<?, K, ?, V> serializationOptions;
|
||||
|
||||
public MapBuilder2(LLDictionary dictionary, SerializationOptions<byte[], K, byte[], V> serializationOptions) {
|
||||
this.dictionary = dictionary;
|
||||
this.parent = null;
|
||||
this.serializationOptions = serializationOptions;
|
||||
}
|
||||
|
||||
private <K1, V1> MapBuilder2(MapBuilder2<K1, V1> parent, SerializationOptions<K1, K, V1, V> serializationOptions) {
|
||||
this.dictionary = null;
|
||||
this.parent = parent;
|
||||
this.serializationOptions = serializationOptions;
|
||||
}
|
||||
|
||||
public static MapBuilder2<byte[], byte[]> fromDictionary(LLDictionary dictionary) {
|
||||
return new MapBuilder2<>(dictionary, SerializationOptions.noop());
|
||||
}
|
||||
|
||||
public <K2, V2> MapBuilder2<K2, V2> serialize(SerializationOptions<K, K2, V, V2> serializationOptions) {
|
||||
return new MapBuilder2<>(this, serializationOptions);
|
||||
}
|
||||
|
||||
public static class SerializationOptions<K1, K2, V1, V2> {
|
||||
|
||||
public static SerializationOptions<byte[], byte[], byte[], byte[]> noop() {
|
||||
return new SerializationOptions<>();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,12 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public interface SubStageGetter<U, US extends DatabaseStage<U>> {
|
||||
|
||||
Mono<US> subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, byte[] prefixKey, Flux<byte[]> keyFlux);
|
||||
}
|
@ -0,0 +1,30 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import java.util.Map;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class SubStageGetterMapDeep<U, US extends DatabaseStage<U>> implements
|
||||
SubStageGetter<Map<byte[], U>, DatabaseStageEntry<Map<byte[], U>>> {
|
||||
|
||||
private final SubStageGetter<U, US> subStageGetter;
|
||||
private final int keyLength;
|
||||
private final int keyExtLength;
|
||||
|
||||
public SubStageGetterMapDeep(SubStageGetter<U, US> subStageGetter, int keyLength, int keyExtLength) {
|
||||
this.subStageGetter = subStageGetter;
|
||||
this.keyLength = keyLength;
|
||||
this.keyExtLength = keyExtLength;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<DatabaseStageEntry<Map<byte[], U>>> subStage(LLDictionary dictionary,
|
||||
@Nullable CompositeSnapshot snapshot,
|
||||
byte[] prefixKey,
|
||||
Flux<byte[]> keyFlux) {
|
||||
return Mono.just(new DatabaseMapDictionaryParent<>(dictionary, subStageGetter, prefixKey, keyLength, keyExtLength));
|
||||
}
|
||||
}
|
@ -0,0 +1,25 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import java.util.Map;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class SubStageGetterMapRange implements SubStageGetter<Map<byte[], byte[]>, DatabaseStageEntry<Map<byte[], byte[]>>> {
|
||||
|
||||
private final int keyLength;
|
||||
|
||||
public SubStageGetterMapRange(int keyLength) {
|
||||
this.keyLength = keyLength;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<DatabaseStageEntry<Map<byte[], byte[]>>> subStage(LLDictionary dictionary,
|
||||
@Nullable CompositeSnapshot snapshot,
|
||||
byte[] prefixKey,
|
||||
Flux<byte[]> keyFlux) {
|
||||
return Mono.just(new DatabaseMapDictionaryRange(dictionary, prefixKey, keyLength));
|
||||
}
|
||||
}
|
@ -0,0 +1,18 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class SubStageGetterSingle implements SubStageGetter<byte[], DatabaseStageEntry<byte[]>> {
|
||||
|
||||
@Override
|
||||
public Mono<DatabaseStageEntry<byte[]>> subStage(LLDictionary dictionary,
|
||||
@Nullable CompositeSnapshot snapshot,
|
||||
byte[] keyPrefix,
|
||||
Flux<byte[]> keyFlux) {
|
||||
return keyFlux.single().map(key -> new DatabaseSingle(dictionary, key));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user