CavalliumDBEngine/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java

360 lines
11 KiB
Java
Raw Normal View History

2021-04-12 17:09:55 +02:00
package it.cavallium.dbengine.database.collections;
2021-09-17 16:56:28 +02:00
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
2021-09-23 20:57:28 +02:00
import io.net5.buffer.api.Drop;
import io.net5.buffer.api.Owned;
2021-09-24 04:01:28 +02:00
import io.net5.buffer.api.Resource;
2021-09-17 16:56:28 +02:00
import io.net5.buffer.api.Send;
2021-06-26 02:35:33 +02:00
import it.cavallium.dbengine.client.BadBlock;
2021-04-12 17:09:55 +02:00
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLUtils;
2021-10-17 17:15:57 +02:00
import io.net5.buffer.api.internal.ResourceSupport;
2021-05-02 19:18:15 +02:00
import it.cavallium.dbengine.database.UpdateMode;
2021-04-12 17:09:55 +02:00
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
2021-12-18 18:16:56 +01:00
import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
2021-05-08 03:09:00 +02:00
import it.unimi.dsi.fastutil.objects.ObjectArraySet;
2021-06-06 02:23:51 +02:00
import java.util.Collections;
2021-04-12 17:09:55 +02:00
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
2021-05-08 03:09:00 +02:00
import java.util.Objects;
import java.util.Set;
2021-04-12 17:09:55 +02:00
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
2021-04-12 17:09:55 +02:00
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@SuppressWarnings("unused")
2021-12-18 18:16:56 +01:00
public class DatabaseMapDictionaryHashed<T, U, TH> extends
ResourceSupport<DatabaseStage<Object2ObjectSortedMap<T, U>>, DatabaseMapDictionaryHashed<T, U, TH>> implements
DatabaseStageMap<T, U, DatabaseStageEntry<U>> {
2021-04-12 17:09:55 +02:00
private static final Logger logger = LogManager.getLogger(DatabaseMapDictionaryHashed.class);
2021-10-01 19:17:33 +02:00
private static final Drop<DatabaseMapDictionaryHashed<?, ?, ?>> DROP = new Drop<>() {
@Override
public void drop(DatabaseMapDictionaryHashed<?, ?, ?> obj) {
try {
if (obj.subDictionary != null) {
obj.subDictionary.close();
}
} catch (Throwable ex) {
logger.error("Failed to close subDictionary", ex);
}
}
@Override
public Drop<DatabaseMapDictionaryHashed<?, ?, ?>> fork() {
return this;
}
@Override
public void attach(DatabaseMapDictionaryHashed<?, ?, ?> obj) {
}
};
2021-08-29 23:18:03 +02:00
private final BufferAllocator alloc;
2021-04-12 17:09:55 +02:00
private final Function<T, TH> keySuffixHashFunction;
2021-09-23 20:57:28 +02:00
private DatabaseMapDictionary<TH, ObjectArraySet<Entry<T, U>>> subDictionary;
2021-10-01 19:17:33 +02:00
@SuppressWarnings({"unchecked", "rawtypes"})
2021-04-12 17:09:55 +02:00
protected DatabaseMapDictionaryHashed(LLDictionary dictionary,
2021-11-08 16:33:41 +01:00
@Nullable Buffer prefixKey,
2021-09-01 00:01:56 +02:00
Serializer<T> keySuffixSerializer,
Serializer<U> valueSerializer,
2021-04-12 17:09:55 +02:00
Function<T, TH> keySuffixHashFunction,
2021-09-23 20:57:28 +02:00
SerializerFixedBinaryLength<TH> keySuffixHashSerializer,
2021-10-01 19:17:33 +02:00
Runnable onClose) {
super((Drop<DatabaseMapDictionaryHashed<T, U, TH>>) (Drop) DROP);
2021-08-29 23:18:03 +02:00
if (dictionary.getUpdateMode().block() != UpdateMode.ALLOW) {
throw new IllegalArgumentException("Hashed maps only works when UpdateMode is ALLOW");
2021-05-02 19:18:15 +02:00
}
2021-08-29 23:18:03 +02:00
this.alloc = dictionary.getAllocator();
ValueWithHashSerializer<T, U> valueWithHashSerializer
2021-10-19 00:22:05 +02:00
= new ValueWithHashSerializer<>(keySuffixSerializer, valueSerializer);
2021-08-29 23:18:03 +02:00
ValuesSetSerializer<Entry<T, U>> valuesSetSerializer
2021-10-19 00:22:05 +02:00
= new ValuesSetSerializer<>(valueWithHashSerializer);
2021-09-23 20:57:28 +02:00
this.subDictionary = DatabaseMapDictionary.tail(dictionary, prefixKey, keySuffixHashSerializer,
2021-10-01 19:17:33 +02:00
valuesSetSerializer, onClose);
2021-08-29 23:18:03 +02:00
this.keySuffixHashFunction = keySuffixHashFunction;
2021-04-12 17:09:55 +02:00
}
2021-10-01 19:17:33 +02:00
@SuppressWarnings({"unchecked", "rawtypes"})
2021-09-23 20:57:28 +02:00
private DatabaseMapDictionaryHashed(BufferAllocator alloc,
Function<T, TH> keySuffixHashFunction,
2021-12-18 18:16:56 +01:00
Send<DatabaseStage<Object2ObjectSortedMap<TH, ObjectArraySet<Entry<T, U>>>>> subDictionary,
2021-09-23 20:57:28 +02:00
Drop<DatabaseMapDictionaryHashed<T, U, TH>> drop) {
2021-10-01 19:17:33 +02:00
super((Drop<DatabaseMapDictionaryHashed<T, U, TH>>) (Drop) DROP);
2021-09-23 20:57:28 +02:00
this.alloc = alloc;
this.keySuffixHashFunction = keySuffixHashFunction;
this.subDictionary = (DatabaseMapDictionary<TH, ObjectArraySet<Entry<T, U>>>) subDictionary.receive();
}
2021-04-12 17:09:55 +02:00
public static <T, U, UH> DatabaseMapDictionaryHashed<T, U, UH> simple(LLDictionary dictionary,
2021-09-01 00:01:56 +02:00
Serializer<T> keySerializer,
Serializer<U> valueSerializer,
2021-04-12 17:09:55 +02:00
Function<T, UH> keyHashFunction,
2021-09-23 20:57:28 +02:00
SerializerFixedBinaryLength<UH> keyHashSerializer,
2021-10-01 19:17:33 +02:00
Runnable onClose) {
2021-05-03 21:41:51 +02:00
return new DatabaseMapDictionaryHashed<>(
dictionary,
2021-11-08 16:33:41 +01:00
null,
2021-04-12 17:09:55 +02:00
keySerializer,
valueSerializer,
keyHashFunction,
2021-09-23 20:57:28 +02:00
keyHashSerializer,
2021-10-01 19:17:33 +02:00
onClose
2021-04-12 17:09:55 +02:00
);
}
public static <T, U, UH> DatabaseMapDictionaryHashed<T, U, UH> tail(LLDictionary dictionary,
2021-11-08 16:33:41 +01:00
@Nullable Buffer prefixKey,
2021-09-01 00:01:56 +02:00
Serializer<T> keySuffixSerializer,
Serializer<U> valueSerializer,
2021-04-12 17:09:55 +02:00
Function<T, UH> keySuffixHashFunction,
2021-09-23 20:57:28 +02:00
SerializerFixedBinaryLength<UH> keySuffixHashSerializer,
2021-10-01 19:17:33 +02:00
Runnable onClose) {
2021-04-12 17:09:55 +02:00
return new DatabaseMapDictionaryHashed<>(dictionary,
prefixKey,
keySuffixSerializer,
valueSerializer,
keySuffixHashFunction,
2021-09-23 20:57:28 +02:00
keySuffixHashSerializer,
2021-10-01 19:17:33 +02:00
onClose
2021-04-12 17:09:55 +02:00
);
}
2021-12-18 18:16:56 +01:00
private Object2ObjectSortedMap<TH, ObjectArraySet<Entry<T, U>>> serializeMap(Object2ObjectSortedMap<T, U> map) {
var newMap = new Object2ObjectLinkedOpenHashMap<TH, ObjectArraySet<Entry<T, U>>>(map.size());
2021-05-08 03:09:00 +02:00
map.forEach((key, value) -> newMap.compute(keySuffixHashFunction.apply(key), (hash, prev) -> {
if (prev == null) {
2021-07-13 22:58:08 +02:00
prev = new ObjectArraySet<>();
2021-05-08 03:09:00 +02:00
}
prev.add(Map.entry(key, value));
return prev;
}));
2021-04-12 17:09:55 +02:00
return newMap;
}
2021-12-18 18:16:56 +01:00
private Object2ObjectSortedMap<T, U> deserializeMap(Object2ObjectSortedMap<TH, ObjectArraySet<Entry<T, U>>> map) {
var newMap = new Object2ObjectLinkedOpenHashMap<T, U>(map.size());
2021-05-08 03:09:00 +02:00
map.forEach((hash, set) -> set.forEach(entry -> newMap.put(entry.getKey(), entry.getValue())));
2021-04-12 17:09:55 +02:00
return newMap;
}
@Override
2021-12-18 18:16:56 +01:00
public Mono<Object2ObjectSortedMap<T, U>> get(@Nullable CompositeSnapshot snapshot) {
2021-04-12 17:09:55 +02:00
return subDictionary.get(snapshot).map(this::deserializeMap);
}
@Override
2021-12-18 18:16:56 +01:00
public Mono<Object2ObjectSortedMap<T, U>> getOrDefault(@Nullable CompositeSnapshot snapshot,
Mono<Object2ObjectSortedMap<T, U>> defaultValue) {
2021-04-12 17:09:55 +02:00
return this.get(snapshot).switchIfEmpty(defaultValue);
}
@Override
2021-12-18 18:16:56 +01:00
public Mono<Void> set(Object2ObjectSortedMap<T, U> map) {
2021-04-12 17:09:55 +02:00
return Mono.fromSupplier(() -> this.serializeMap(map)).flatMap(subDictionary::set);
}
@Override
2021-12-18 18:16:56 +01:00
public Mono<Boolean> setAndGetChanged(Object2ObjectSortedMap<T, U> map) {
return Mono.fromSupplier(() -> this.serializeMap(map)).flatMap(subDictionary::setAndGetChanged).single();
2021-04-12 17:09:55 +02:00
}
@Override
public Mono<Boolean> clearAndGetStatus() {
return subDictionary.clearAndGetStatus();
}
@Override
public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
return subDictionary.isEmpty(snapshot);
}
@Override
2021-12-18 18:16:56 +01:00
public DatabaseStageEntry<Object2ObjectSortedMap<T, U>> entry() {
2021-04-12 17:09:55 +02:00
return this;
}
2021-06-26 02:35:33 +02:00
@Override
public Flux<BadBlock> badBlocks() {
return this.subDictionary.badBlocks();
}
2021-04-12 17:09:55 +02:00
@Override
public Mono<DatabaseStageEntry<U>> at(@Nullable CompositeSnapshot snapshot, T key) {
2021-05-08 03:09:00 +02:00
return this
.atPrivate(snapshot, key, keySuffixHashFunction.apply(key))
2021-09-24 04:01:28 +02:00
.map(cast -> (DatabaseStageEntry<U>) cast)
.doOnDiscard(Resource.class, Resource::close);
2021-04-12 17:09:55 +02:00
}
2021-05-08 03:09:00 +02:00
private Mono<DatabaseSingleBucket<T, U, TH>> atPrivate(@Nullable CompositeSnapshot snapshot, T key, TH hash) {
2021-04-12 17:09:55 +02:00
return subDictionary
2021-05-08 03:09:00 +02:00
.at(snapshot, hash)
2021-10-01 19:17:33 +02:00
.map(entry -> new DatabaseSingleBucket<T, U, TH>(entry, key, null))
2021-09-24 04:01:28 +02:00
.doOnDiscard(Resource.class, Resource::close);
2021-04-12 17:09:55 +02:00
}
2021-05-02 19:18:15 +02:00
@Override
public Mono<UpdateMode> getUpdateMode() {
return subDictionary.getUpdateMode();
}
2021-04-12 17:09:55 +02:00
@Override
public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot) {
return subDictionary
2021-05-08 03:09:00 +02:00
.getAllValues(snapshot)
2021-06-06 02:23:51 +02:00
.map(Entry::getValue)
.map(Collections::unmodifiableSet)
2021-05-08 03:09:00 +02:00
.flatMap(bucket -> Flux
2021-06-06 02:23:51 +02:00
.fromIterable(bucket)
2021-05-08 03:09:00 +02:00
.map(Entry::getKey)
.flatMap(key -> this.at(snapshot, key).map(stage -> Map.entry(key, stage)))
2021-04-12 17:09:55 +02:00
);
}
@Override
public Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot) {
2021-06-06 02:23:51 +02:00
return subDictionary
.getAllValues(snapshot)
.map(Entry::getValue)
.map(Collections::unmodifiableSet)
2021-07-31 18:00:53 +02:00
.concatMapIterable(list -> list);
2021-04-12 17:09:55 +02:00
}
@Override
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
2021-05-08 03:09:00 +02:00
return entries
2021-09-23 20:57:28 +02:00
.flatMap(entry -> LLUtils.usingResource(this.at(null, entry.getKey()),
stage -> stage
.setAndGetPrevious(entry.getValue())
2021-09-23 20:57:28 +02:00
.map(prev -> Map.entry(entry.getKey(), prev)), true)
);
2021-04-12 17:09:55 +02:00
}
@Override
public Mono<Void> clear() {
return subDictionary.clear();
}
@Override
2021-12-18 18:16:56 +01:00
public Mono<Object2ObjectSortedMap<T, U>> setAndGetPrevious(Object2ObjectSortedMap<T, U> value) {
2021-04-12 17:09:55 +02:00
return Mono
.fromSupplier(() -> this.serializeMap(value))
.flatMap(subDictionary::setAndGetPrevious)
.map(this::deserializeMap);
}
@Override
2021-12-18 18:16:56 +01:00
public Mono<Object2ObjectSortedMap<T, U>> clearAndGetPrevious() {
2021-04-12 17:09:55 +02:00
return subDictionary
.clearAndGetPrevious()
.map(this::deserializeMap);
}
@Override
2021-12-18 18:16:56 +01:00
public Mono<Object2ObjectSortedMap<T, U>> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) {
2021-04-12 17:09:55 +02:00
return subDictionary
.get(snapshot, existsAlmostCertainly)
.map(this::deserializeMap);
}
@Override
public Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) {
return subDictionary.leavesCount(snapshot, fast);
}
@Override
public ValueGetterBlocking<T, U> getDbValueGetter(@Nullable CompositeSnapshot snapshot) {
2021-07-13 22:58:08 +02:00
ValueGetterBlocking<TH, ObjectArraySet<Entry<T, U>>> getter = subDictionary.getDbValueGetter(snapshot);
2021-05-08 03:09:00 +02:00
return key -> extractValue(getter.get(keySuffixHashFunction.apply(key)), key);
2021-04-12 17:09:55 +02:00
}
@Override
public ValueGetter<T, U> getAsyncDbValueGetter(@Nullable CompositeSnapshot snapshot) {
2021-07-13 22:58:08 +02:00
ValueGetter<TH, ObjectArraySet<Entry<T, U>>> getter = subDictionary.getAsyncDbValueGetter(snapshot);
2021-05-08 03:09:00 +02:00
return key -> getter
.get(keySuffixHashFunction.apply(key))
.flatMap(set -> this.extractValueTransformation(set, key));
}
2021-07-13 22:58:08 +02:00
private Mono<U> extractValueTransformation(ObjectArraySet<Entry<T, U>> entries, T key) {
2021-05-08 03:09:00 +02:00
return Mono.fromCallable(() -> extractValue(entries, key));
}
@Nullable
2021-07-13 22:58:08 +02:00
private U extractValue(ObjectArraySet<Entry<T, U>> entries, T key) {
2021-05-08 03:09:00 +02:00
if (entries == null) return null;
for (Entry<T, U> entry : entries) {
if (Objects.equals(entry.getKey(), key)) {
return entry.getValue();
}
}
return null;
}
@NotNull
2021-07-13 22:58:08 +02:00
private ObjectArraySet<Entry<T, U>> insertValueOrCreate(@Nullable ObjectArraySet<Entry<T, U>> entries, T key, U value) {
2021-05-08 03:09:00 +02:00
if (entries != null) {
2021-07-13 22:58:08 +02:00
var clonedEntries = entries.clone();
clonedEntries.add(Map.entry(key, value));
return clonedEntries;
2021-05-08 03:09:00 +02:00
} else {
2021-07-13 22:58:08 +02:00
var oas = new ObjectArraySet<Entry<T, U>>(1);
2021-05-17 04:10:41 +02:00
oas.add(Map.entry(key, value));
return oas;
2021-05-08 03:09:00 +02:00
}
}
@Nullable
2021-07-13 22:58:08 +02:00
private Set<Entry<T, U>> removeValueOrDelete(@Nullable ObjectArraySet<Entry<T, U>> entries, T key) {
2021-05-08 03:09:00 +02:00
if (entries != null) {
2021-07-13 22:58:08 +02:00
var clonedEntries = entries.clone();
var it = clonedEntries.iterator();
2021-05-08 03:09:00 +02:00
while (it.hasNext()) {
var entry = it.next();
if (Objects.equals(entry.getKey(), key)) {
it.remove();
break;
}
}
2021-07-13 22:58:08 +02:00
if (clonedEntries.size() == 0) {
2021-05-08 03:09:00 +02:00
return null;
} else {
2021-07-13 22:58:08 +02:00
return clonedEntries;
2021-05-08 03:09:00 +02:00
}
} else {
return null;
}
2021-04-12 17:09:55 +02:00
}
2021-09-23 20:57:28 +02:00
@Override
protected RuntimeException createResourceClosedException() {
throw new IllegalStateException("Closed");
}
@Override
protected Owned<DatabaseMapDictionaryHashed<T, U, TH>> prepareSend() {
var subDictionary = this.subDictionary.send();
return drop -> new DatabaseMapDictionaryHashed<>(alloc, keySuffixHashFunction, subDictionary, drop);
}
@Override
protected void makeInaccessible() {
this.subDictionary = null;
}
2021-04-12 17:09:55 +02:00
}