2021-04-12 17:09:55 +02:00
|
|
|
package it.cavallium.dbengine.database.collections;
|
|
|
|
|
2021-08-29 23:18:03 +02:00
|
|
|
import io.netty.buffer.api.Buffer;
|
|
|
|
import io.netty.buffer.api.BufferAllocator;
|
|
|
|
import io.netty.buffer.api.Send;
|
2021-06-26 02:35:33 +02:00
|
|
|
import it.cavallium.dbengine.client.BadBlock;
|
2021-04-12 17:09:55 +02:00
|
|
|
import it.cavallium.dbengine.client.CompositeSnapshot;
|
2021-05-08 03:09:00 +02:00
|
|
|
import it.cavallium.dbengine.database.Delta;
|
2021-04-12 17:09:55 +02:00
|
|
|
import it.cavallium.dbengine.database.LLDictionary;
|
2021-05-02 19:18:15 +02:00
|
|
|
import it.cavallium.dbengine.database.UpdateMode;
|
2021-07-17 11:52:08 +02:00
|
|
|
import it.cavallium.dbengine.database.collections.ValueGetter;
|
2021-04-12 17:09:55 +02:00
|
|
|
import it.cavallium.dbengine.database.serialization.Serializer;
|
|
|
|
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
|
2021-05-08 03:09:00 +02:00
|
|
|
import it.unimi.dsi.fastutil.objects.ObjectArraySet;
|
2021-07-13 22:58:08 +02:00
|
|
|
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
|
2021-05-19 22:51:55 +02:00
|
|
|
import it.unimi.dsi.fastutil.objects.ObjectSets;
|
2021-06-06 02:23:51 +02:00
|
|
|
import java.util.Collections;
|
2021-04-12 17:09:55 +02:00
|
|
|
import java.util.HashMap;
|
2021-05-08 03:09:00 +02:00
|
|
|
import java.util.HashSet;
|
2021-04-12 17:09:55 +02:00
|
|
|
import java.util.Map;
|
|
|
|
import java.util.Map.Entry;
|
2021-05-08 03:09:00 +02:00
|
|
|
import java.util.Objects;
|
|
|
|
import java.util.Set;
|
2021-07-17 11:52:08 +02:00
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
2021-04-12 17:09:55 +02:00
|
|
|
import java.util.function.Function;
|
|
|
|
import org.jetbrains.annotations.NotNull;
|
|
|
|
import org.jetbrains.annotations.Nullable;
|
|
|
|
import reactor.core.publisher.Flux;
|
|
|
|
import reactor.core.publisher.Mono;
|
2021-07-17 11:52:08 +02:00
|
|
|
import reactor.util.function.Tuple2;
|
|
|
|
import reactor.util.function.Tuple3;
|
|
|
|
import reactor.util.function.Tuples;
|
2021-04-12 17:09:55 +02:00
|
|
|
|
|
|
|
@SuppressWarnings("unused")
|
|
|
|
public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T, U, DatabaseStageEntry<U>> {
|
|
|
|
|
2021-08-29 23:18:03 +02:00
|
|
|
private final BufferAllocator alloc;
|
2021-07-13 22:58:08 +02:00
|
|
|
private final DatabaseMapDictionary<TH, ObjectArraySet<Entry<T, U>>> subDictionary;
|
2021-04-12 17:09:55 +02:00
|
|
|
private final Function<T, TH> keySuffixHashFunction;
|
|
|
|
|
|
|
|
protected DatabaseMapDictionaryHashed(LLDictionary dictionary,
|
2021-08-29 23:18:03 +02:00
|
|
|
Send<Buffer> prefixKey,
|
2021-09-01 00:01:56 +02:00
|
|
|
Serializer<T> keySuffixSerializer,
|
|
|
|
Serializer<U> valueSerializer,
|
2021-04-12 17:09:55 +02:00
|
|
|
Function<T, TH> keySuffixHashFunction,
|
2021-09-01 00:01:56 +02:00
|
|
|
SerializerFixedBinaryLength<TH> keySuffixHashSerializer) {
|
2021-08-29 23:18:03 +02:00
|
|
|
if (dictionary.getUpdateMode().block() != UpdateMode.ALLOW) {
|
|
|
|
throw new IllegalArgumentException("Hashed maps only works when UpdateMode is ALLOW");
|
2021-05-02 19:18:15 +02:00
|
|
|
}
|
2021-08-29 23:18:03 +02:00
|
|
|
this.alloc = dictionary.getAllocator();
|
|
|
|
ValueWithHashSerializer<T, U> valueWithHashSerializer
|
|
|
|
= new ValueWithHashSerializer<>(alloc, keySuffixSerializer, valueSerializer);
|
|
|
|
ValuesSetSerializer<Entry<T, U>> valuesSetSerializer
|
|
|
|
= new ValuesSetSerializer<>(alloc, valueWithHashSerializer);
|
|
|
|
this.subDictionary = DatabaseMapDictionary.tail(dictionary,
|
|
|
|
prefixKey,
|
|
|
|
keySuffixHashSerializer,
|
|
|
|
valuesSetSerializer
|
|
|
|
);
|
|
|
|
this.keySuffixHashFunction = keySuffixHashFunction;
|
2021-04-12 17:09:55 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
public static <T, U, UH> DatabaseMapDictionaryHashed<T, U, UH> simple(LLDictionary dictionary,
|
2021-09-01 00:01:56 +02:00
|
|
|
Serializer<T> keySerializer,
|
|
|
|
Serializer<U> valueSerializer,
|
2021-04-12 17:09:55 +02:00
|
|
|
Function<T, UH> keyHashFunction,
|
2021-09-01 00:01:56 +02:00
|
|
|
SerializerFixedBinaryLength<UH> keyHashSerializer) {
|
2021-05-03 21:41:51 +02:00
|
|
|
return new DatabaseMapDictionaryHashed<>(
|
|
|
|
dictionary,
|
2021-08-31 09:14:46 +02:00
|
|
|
dictionary.getAllocator().allocate(0).send(),
|
2021-04-12 17:09:55 +02:00
|
|
|
keySerializer,
|
|
|
|
valueSerializer,
|
|
|
|
keyHashFunction,
|
|
|
|
keyHashSerializer
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
|
|
|
public static <T, U, UH> DatabaseMapDictionaryHashed<T, U, UH> tail(LLDictionary dictionary,
|
2021-08-31 09:14:46 +02:00
|
|
|
Send<Buffer> prefixKey,
|
2021-09-01 00:01:56 +02:00
|
|
|
Serializer<T> keySuffixSerializer,
|
|
|
|
Serializer<U> valueSerializer,
|
2021-04-12 17:09:55 +02:00
|
|
|
Function<T, UH> keySuffixHashFunction,
|
2021-09-01 00:01:56 +02:00
|
|
|
SerializerFixedBinaryLength<UH> keySuffixHashSerializer) {
|
2021-04-12 17:09:55 +02:00
|
|
|
return new DatabaseMapDictionaryHashed<>(dictionary,
|
|
|
|
prefixKey,
|
|
|
|
keySuffixSerializer,
|
|
|
|
valueSerializer,
|
|
|
|
keySuffixHashFunction,
|
|
|
|
keySuffixHashSerializer
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
2021-07-13 22:58:08 +02:00
|
|
|
private Map<TH, ObjectArraySet<Entry<T, U>>> serializeMap(Map<T, U> map) {
|
|
|
|
var newMap = new HashMap<TH, ObjectArraySet<Entry<T, U>>>(map.size());
|
2021-05-08 03:09:00 +02:00
|
|
|
map.forEach((key, value) -> newMap.compute(keySuffixHashFunction.apply(key), (hash, prev) -> {
|
|
|
|
if (prev == null) {
|
2021-07-13 22:58:08 +02:00
|
|
|
prev = new ObjectArraySet<>();
|
2021-05-08 03:09:00 +02:00
|
|
|
}
|
|
|
|
prev.add(Map.entry(key, value));
|
|
|
|
return prev;
|
|
|
|
}));
|
2021-04-12 17:09:55 +02:00
|
|
|
return newMap;
|
|
|
|
}
|
|
|
|
|
2021-07-13 22:58:08 +02:00
|
|
|
private Map<T, U> deserializeMap(Map<TH, ObjectArraySet<Entry<T, U>>> map) {
|
2021-04-12 17:09:55 +02:00
|
|
|
var newMap = new HashMap<T, U>(map.size());
|
2021-05-08 03:09:00 +02:00
|
|
|
map.forEach((hash, set) -> set.forEach(entry -> newMap.put(entry.getKey(), entry.getValue())));
|
2021-04-12 17:09:55 +02:00
|
|
|
return newMap;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Mono<Map<T, U>> get(@Nullable CompositeSnapshot snapshot) {
|
|
|
|
return subDictionary.get(snapshot).map(this::deserializeMap);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Mono<Map<T, U>> getOrDefault(@Nullable CompositeSnapshot snapshot, Mono<Map<T, U>> defaultValue) {
|
|
|
|
return this.get(snapshot).switchIfEmpty(defaultValue);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Mono<Void> set(Map<T, U> map) {
|
|
|
|
return Mono.fromSupplier(() -> this.serializeMap(map)).flatMap(subDictionary::set);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-04-30 19:15:04 +02:00
|
|
|
public Mono<Boolean> setAndGetChanged(Map<T, U> map) {
|
|
|
|
return Mono.fromSupplier(() -> this.serializeMap(map)).flatMap(subDictionary::setAndGetChanged).single();
|
2021-04-12 17:09:55 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Mono<Boolean> clearAndGetStatus() {
|
|
|
|
return subDictionary.clearAndGetStatus();
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Mono<Void> close() {
|
|
|
|
return subDictionary.close();
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
|
|
|
|
return subDictionary.isEmpty(snapshot);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public DatabaseStageEntry<Map<T, U>> entry() {
|
|
|
|
return this;
|
|
|
|
}
|
|
|
|
|
2021-06-26 02:35:33 +02:00
|
|
|
@Override
|
|
|
|
public Flux<BadBlock> badBlocks() {
|
|
|
|
return this.subDictionary.badBlocks();
|
|
|
|
}
|
|
|
|
|
2021-04-30 19:15:04 +02:00
|
|
|
@Override
|
|
|
|
public void release() {
|
|
|
|
this.subDictionary.release();
|
|
|
|
}
|
|
|
|
|
2021-04-12 17:09:55 +02:00
|
|
|
@Override
|
|
|
|
public Mono<DatabaseStageEntry<U>> at(@Nullable CompositeSnapshot snapshot, T key) {
|
2021-05-08 03:09:00 +02:00
|
|
|
return this
|
|
|
|
.atPrivate(snapshot, key, keySuffixHashFunction.apply(key))
|
|
|
|
.map(cast -> cast);
|
2021-04-12 17:09:55 +02:00
|
|
|
}
|
|
|
|
|
2021-05-08 03:09:00 +02:00
|
|
|
private Mono<DatabaseSingleBucket<T, U, TH>> atPrivate(@Nullable CompositeSnapshot snapshot, T key, TH hash) {
|
2021-04-12 17:09:55 +02:00
|
|
|
return subDictionary
|
2021-05-08 03:09:00 +02:00
|
|
|
.at(snapshot, hash)
|
|
|
|
.map(entry -> new DatabaseSingleBucket<>(entry, key));
|
2021-04-12 17:09:55 +02:00
|
|
|
}
|
|
|
|
|
2021-05-02 19:18:15 +02:00
|
|
|
@Override
|
|
|
|
public Mono<UpdateMode> getUpdateMode() {
|
|
|
|
return subDictionary.getUpdateMode();
|
|
|
|
}
|
|
|
|
|
2021-04-12 17:09:55 +02:00
|
|
|
@Override
|
|
|
|
public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot) {
|
|
|
|
return subDictionary
|
2021-05-08 03:09:00 +02:00
|
|
|
.getAllValues(snapshot)
|
2021-06-06 02:23:51 +02:00
|
|
|
.map(Entry::getValue)
|
|
|
|
.map(Collections::unmodifiableSet)
|
2021-05-08 03:09:00 +02:00
|
|
|
.flatMap(bucket -> Flux
|
2021-06-06 02:23:51 +02:00
|
|
|
.fromIterable(bucket)
|
2021-05-08 03:09:00 +02:00
|
|
|
.map(Entry::getKey)
|
2021-08-22 18:20:05 +02:00
|
|
|
.flatMap(key -> this.at(snapshot, key).map(stage -> Map.entry(key, stage)))
|
2021-04-12 17:09:55 +02:00
|
|
|
);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot) {
|
2021-06-06 02:23:51 +02:00
|
|
|
return subDictionary
|
|
|
|
.getAllValues(snapshot)
|
|
|
|
.map(Entry::getValue)
|
|
|
|
.map(Collections::unmodifiableSet)
|
2021-07-31 18:00:53 +02:00
|
|
|
.concatMapIterable(list -> list);
|
2021-04-12 17:09:55 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
|
2021-05-08 03:09:00 +02:00
|
|
|
return entries
|
2021-08-22 18:20:05 +02:00
|
|
|
.flatMap(entry -> Flux.usingWhen(
|
|
|
|
this.at(null, entry.getKey()),
|
|
|
|
stage -> stage
|
|
|
|
.setAndGetPrevious(entry.getValue())
|
|
|
|
.map(prev -> Map.entry(entry.getKey(), prev)),
|
|
|
|
stage -> Mono.fromRunnable(stage::release)
|
|
|
|
));
|
2021-04-12 17:09:55 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Mono<Void> clear() {
|
|
|
|
return subDictionary.clear();
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Mono<Map<T, U>> setAndGetPrevious(Map<T, U> value) {
|
|
|
|
return Mono
|
|
|
|
.fromSupplier(() -> this.serializeMap(value))
|
|
|
|
.flatMap(subDictionary::setAndGetPrevious)
|
|
|
|
.map(this::deserializeMap);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Mono<Map<T, U>> clearAndGetPrevious() {
|
|
|
|
return subDictionary
|
|
|
|
.clearAndGetPrevious()
|
|
|
|
.map(this::deserializeMap);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Mono<Map<T, U>> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) {
|
|
|
|
return subDictionary
|
|
|
|
.get(snapshot, existsAlmostCertainly)
|
|
|
|
.map(this::deserializeMap);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) {
|
|
|
|
return subDictionary.leavesCount(snapshot, fast);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ValueGetterBlocking<T, U> getDbValueGetter(@Nullable CompositeSnapshot snapshot) {
|
2021-07-13 22:58:08 +02:00
|
|
|
ValueGetterBlocking<TH, ObjectArraySet<Entry<T, U>>> getter = subDictionary.getDbValueGetter(snapshot);
|
2021-05-08 03:09:00 +02:00
|
|
|
return key -> extractValue(getter.get(keySuffixHashFunction.apply(key)), key);
|
2021-04-12 17:09:55 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ValueGetter<T, U> getAsyncDbValueGetter(@Nullable CompositeSnapshot snapshot) {
|
2021-07-13 22:58:08 +02:00
|
|
|
ValueGetter<TH, ObjectArraySet<Entry<T, U>>> getter = subDictionary.getAsyncDbValueGetter(snapshot);
|
2021-05-08 03:09:00 +02:00
|
|
|
return key -> getter
|
|
|
|
.get(keySuffixHashFunction.apply(key))
|
|
|
|
.flatMap(set -> this.extractValueTransformation(set, key));
|
|
|
|
}
|
|
|
|
|
2021-07-13 22:58:08 +02:00
|
|
|
private Mono<U> extractValueTransformation(ObjectArraySet<Entry<T, U>> entries, T key) {
|
2021-05-08 03:09:00 +02:00
|
|
|
return Mono.fromCallable(() -> extractValue(entries, key));
|
|
|
|
}
|
|
|
|
|
|
|
|
@Nullable
|
2021-07-13 22:58:08 +02:00
|
|
|
private U extractValue(ObjectArraySet<Entry<T, U>> entries, T key) {
|
2021-05-08 03:09:00 +02:00
|
|
|
if (entries == null) return null;
|
|
|
|
for (Entry<T, U> entry : entries) {
|
|
|
|
if (Objects.equals(entry.getKey(), key)) {
|
|
|
|
return entry.getValue();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
|
|
|
|
@NotNull
|
2021-07-13 22:58:08 +02:00
|
|
|
private ObjectArraySet<Entry<T, U>> insertValueOrCreate(@Nullable ObjectArraySet<Entry<T, U>> entries, T key, U value) {
|
2021-05-08 03:09:00 +02:00
|
|
|
if (entries != null) {
|
2021-07-13 22:58:08 +02:00
|
|
|
var clonedEntries = entries.clone();
|
|
|
|
clonedEntries.add(Map.entry(key, value));
|
|
|
|
return clonedEntries;
|
2021-05-08 03:09:00 +02:00
|
|
|
} else {
|
2021-07-13 22:58:08 +02:00
|
|
|
var oas = new ObjectArraySet<Entry<T, U>>(1);
|
2021-05-17 04:10:41 +02:00
|
|
|
oas.add(Map.entry(key, value));
|
|
|
|
return oas;
|
2021-05-08 03:09:00 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Nullable
|
2021-07-13 22:58:08 +02:00
|
|
|
private Set<Entry<T, U>> removeValueOrDelete(@Nullable ObjectArraySet<Entry<T, U>> entries, T key) {
|
2021-05-08 03:09:00 +02:00
|
|
|
if (entries != null) {
|
2021-07-13 22:58:08 +02:00
|
|
|
var clonedEntries = entries.clone();
|
|
|
|
var it = clonedEntries.iterator();
|
2021-05-08 03:09:00 +02:00
|
|
|
while (it.hasNext()) {
|
|
|
|
var entry = it.next();
|
|
|
|
if (Objects.equals(entry.getKey(), key)) {
|
|
|
|
it.remove();
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
2021-07-13 22:58:08 +02:00
|
|
|
if (clonedEntries.size() == 0) {
|
2021-05-08 03:09:00 +02:00
|
|
|
return null;
|
|
|
|
} else {
|
2021-07-13 22:58:08 +02:00
|
|
|
return clonedEntries;
|
2021-05-08 03:09:00 +02:00
|
|
|
}
|
|
|
|
} else {
|
|
|
|
return null;
|
|
|
|
}
|
2021-04-12 17:09:55 +02:00
|
|
|
}
|
|
|
|
}
|