2021-01-30 20:16:14 +01:00
|
|
|
package it.cavallium.dbengine.database.collections;
|
|
|
|
|
|
|
|
import it.cavallium.dbengine.client.CompositeSnapshot;
|
|
|
|
import it.cavallium.dbengine.database.LLDictionary;
|
|
|
|
import it.cavallium.dbengine.database.LLRange;
|
|
|
|
import it.cavallium.dbengine.database.LLSnapshot;
|
|
|
|
import java.util.Arrays;
|
|
|
|
import java.util.Map;
|
|
|
|
import java.util.Map.Entry;
|
|
|
|
import org.jetbrains.annotations.Nullable;
|
|
|
|
import reactor.core.publisher.Flux;
|
|
|
|
import reactor.core.publisher.GroupedFlux;
|
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
|
|
// todo: implement optimized methods
|
2021-01-31 12:02:02 +01:00
|
|
|
public class DatabaseMapDictionary<T, U, US extends DatabaseStage<U>> implements DatabaseStageMap<T, U, US> {
|
2021-01-30 20:16:14 +01:00
|
|
|
|
|
|
|
public static final byte[] EMPTY_BYTES = new byte[0];
|
|
|
|
private final LLDictionary dictionary;
|
2021-01-31 00:36:21 +01:00
|
|
|
private final SubStageGetter<U, US> subStageGetter;
|
2021-01-31 12:02:02 +01:00
|
|
|
private final FixedLengthSerializer<T> suffixKeySerializer;
|
2021-01-30 20:16:14 +01:00
|
|
|
private final byte[] keyPrefix;
|
|
|
|
private final int keySuffixLength;
|
|
|
|
private final int keyExtLength;
|
|
|
|
private final LLRange range;
|
|
|
|
|
|
|
|
private static byte[] firstKey(byte[] prefixKey, int prefixLength, int suffixLength, int extLength) {
|
|
|
|
return fillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength, (byte) 0x00);
|
|
|
|
}
|
|
|
|
|
|
|
|
private static byte[] lastKey(byte[] prefixKey, int prefixLength, int suffixLength, int extLength) {
|
|
|
|
return fillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength, (byte) 0xFF);
|
|
|
|
}
|
|
|
|
|
|
|
|
private static byte[] fillKeySuffixAndExt(byte[] prefixKey, int prefixLength, int suffixLength, int extLength, byte fillValue) {
|
|
|
|
assert prefixKey.length == prefixLength;
|
|
|
|
assert suffixLength > 0;
|
|
|
|
assert extLength > 0;
|
|
|
|
byte[] result = Arrays.copyOf(prefixKey, prefixLength + suffixLength + extLength);
|
|
|
|
Arrays.fill(result, prefixLength, result.length, fillValue);
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
|
|
|
private static byte[] firstKey(byte[] prefixKey, byte[] suffixKey, int prefixLength, int suffixLength, int extLength) {
|
|
|
|
return fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0x00);
|
|
|
|
}
|
|
|
|
|
|
|
|
private static byte[] lastKey(byte[] prefixKey, byte[] suffixKey, int prefixLength, int suffixLength, int extLength) {
|
|
|
|
return fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0xFF);
|
|
|
|
}
|
|
|
|
|
|
|
|
private static byte[] fillKeyExt(byte[] prefixKey,
|
|
|
|
byte[] suffixKey,
|
|
|
|
int prefixLength,
|
|
|
|
int suffixLength,
|
|
|
|
int extLength,
|
|
|
|
byte fillValue) {
|
|
|
|
assert prefixKey.length == prefixLength;
|
|
|
|
assert suffixKey.length == suffixLength;
|
|
|
|
assert suffixLength > 0;
|
|
|
|
assert extLength > 0;
|
|
|
|
byte[] result = Arrays.copyOf(prefixKey, prefixLength + suffixLength + extLength);
|
|
|
|
System.arraycopy(suffixKey, 0, result, prefixLength, suffixLength);
|
|
|
|
Arrays.fill(result, prefixLength + suffixLength, result.length, fillValue);
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
|
|
|
@SuppressWarnings("unused")
|
2021-01-31 12:02:02 +01:00
|
|
|
public DatabaseMapDictionary(LLDictionary dictionary, SubStageGetter<U, US> subStageGetter, FixedLengthSerializer<T> keySerializer, int keyLength, int keyExtLength) {
|
|
|
|
this(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, keyLength, keyExtLength);
|
2021-01-30 20:16:14 +01:00
|
|
|
}
|
|
|
|
|
2021-01-31 12:02:02 +01:00
|
|
|
public DatabaseMapDictionary(LLDictionary dictionary, SubStageGetter<U, US> subStageGetter, FixedLengthSerializer<T> suffixKeySerializer, byte[] prefixKey, int keySuffixLength, int keyExtLength) {
|
2021-01-30 20:16:14 +01:00
|
|
|
this.dictionary = dictionary;
|
2021-01-31 00:36:21 +01:00
|
|
|
this.subStageGetter = subStageGetter;
|
2021-01-31 12:02:02 +01:00
|
|
|
this.suffixKeySerializer = suffixKeySerializer;
|
2021-01-30 20:16:14 +01:00
|
|
|
this.keyPrefix = prefixKey;
|
|
|
|
this.keySuffixLength = keySuffixLength;
|
|
|
|
this.keyExtLength = keyExtLength;
|
|
|
|
byte[] firstKey = firstKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength);
|
|
|
|
byte[] lastKey = lastKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength);
|
|
|
|
this.range = keyPrefix.length == 0 ? LLRange.all() : LLRange.of(firstKey, lastKey);
|
|
|
|
}
|
|
|
|
|
|
|
|
@SuppressWarnings("unused")
|
|
|
|
private boolean suffixKeyConsistency(int keySuffixLength) {
|
|
|
|
return this.keySuffixLength == keySuffixLength;
|
|
|
|
}
|
|
|
|
|
|
|
|
@SuppressWarnings("unused")
|
|
|
|
private boolean extKeyConsistency(int keyExtLength) {
|
|
|
|
return this.keyExtLength == keyExtLength;
|
|
|
|
}
|
|
|
|
|
|
|
|
@SuppressWarnings("unused")
|
|
|
|
private boolean suffixAndExtKeyConsistency(int keySuffixAndExtLength) {
|
|
|
|
return this.keySuffixLength + this.keyExtLength == keySuffixAndExtLength;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Keep only suffix and ext
|
|
|
|
*/
|
|
|
|
private byte[] stripPrefix(byte[] key) {
|
|
|
|
return Arrays.copyOfRange(key, this.keyPrefix.length, key.length);
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Remove ext from suffix
|
|
|
|
*/
|
|
|
|
private byte[] trimSuffix(byte[] keySuffix) {
|
2021-01-31 00:36:21 +01:00
|
|
|
if (keySuffix.length == keySuffixLength)
|
|
|
|
return keySuffix;
|
2021-01-30 20:16:14 +01:00
|
|
|
return Arrays.copyOf(keySuffix, keySuffixLength);
|
|
|
|
}
|
|
|
|
|
2021-01-31 00:36:21 +01:00
|
|
|
/**
|
|
|
|
* Remove ext from full key
|
|
|
|
*/
|
|
|
|
private byte[] removeExtFromFullKey(byte[] key) {
|
|
|
|
return Arrays.copyOf(key, keyPrefix.length + keySuffixLength);
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Add prefix to suffix
|
|
|
|
*/
|
|
|
|
private byte[] toKeyWithoutExt(byte[] suffixKey) {
|
|
|
|
assert suffixKey.length == keySuffixLength;
|
|
|
|
byte[] result = Arrays.copyOf(keyPrefix, keyPrefix.length + keySuffixLength);
|
|
|
|
System.arraycopy(suffixKey, 0, result, keyPrefix.length, keySuffixLength);
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
2021-01-30 20:16:14 +01:00
|
|
|
/**
|
|
|
|
* Remove suffix from keySuffix, returning probably an empty byte array
|
|
|
|
*/
|
|
|
|
private byte[] stripSuffix(byte[] keySuffix) {
|
2021-01-31 00:36:21 +01:00
|
|
|
if (keySuffix.length == this.keySuffixLength)
|
|
|
|
return EMPTY_BYTES;
|
2021-01-30 20:16:14 +01:00
|
|
|
return Arrays.copyOfRange(keySuffix, this.keySuffixLength, keySuffix.length);
|
|
|
|
}
|
|
|
|
|
|
|
|
private LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) {
|
|
|
|
if (snapshot == null) {
|
|
|
|
return null;
|
|
|
|
} else {
|
|
|
|
return snapshot.getSnapshot(dictionary);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private LLRange toExtRange(byte[] keySuffix) {
|
|
|
|
byte[] first = firstKey(keyPrefix, keySuffix, keyPrefix.length, keySuffixLength, keyExtLength);
|
|
|
|
byte[] end = lastKey(keyPrefix, keySuffix, keyPrefix.length, keySuffixLength, keyExtLength);
|
|
|
|
return LLRange.of(first, end);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-31 12:02:02 +01:00
|
|
|
public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
|
|
|
|
byte[] keySuffixData = serializeSuffix(keySuffix);
|
2021-01-31 00:36:21 +01:00
|
|
|
Flux<byte[]> rangeKeys = this
|
2021-01-31 12:02:02 +01:00
|
|
|
.dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData)
|
2021-01-30 20:16:14 +01:00
|
|
|
);
|
2021-01-31 00:36:21 +01:00
|
|
|
return this.subStageGetter
|
2021-01-31 12:02:02 +01:00
|
|
|
.subStage(dictionary, snapshot, toKeyWithoutExt(keySuffixData), rangeKeys);
|
2021-01-30 20:16:14 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-31 12:02:02 +01:00
|
|
|
public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot) {
|
2021-01-31 00:36:21 +01:00
|
|
|
Flux<GroupedFlux<byte[], byte[]>> groupedFlux = dictionary
|
|
|
|
.getRangeKeys(resolveSnapshot(snapshot), range)
|
|
|
|
.groupBy(this::removeExtFromFullKey);
|
2021-01-30 20:16:14 +01:00
|
|
|
return groupedFlux
|
2021-01-31 00:36:21 +01:00
|
|
|
.flatMap(rangeKeys -> this.subStageGetter
|
|
|
|
.subStage(dictionary, snapshot, rangeKeys.key(), rangeKeys)
|
2021-01-31 12:02:02 +01:00
|
|
|
.map(us -> Map.entry(this.deserializeSuffix(this.stripPrefix(rangeKeys.key())), us))
|
2021-01-30 20:16:14 +01:00
|
|
|
);
|
|
|
|
}
|
2021-01-31 12:02:02 +01:00
|
|
|
|
|
|
|
private T deserializeSuffix(byte[] suffix) {
|
|
|
|
return (T) new Object();
|
|
|
|
}
|
|
|
|
|
|
|
|
private byte[] serializeSuffix(T keySuffix) {
|
|
|
|
return new byte[0];
|
|
|
|
}
|
2021-01-30 20:16:14 +01:00
|
|
|
}
|