diff --git a/src/main/java/it/cavallium/dbengine/database/Delta.java b/src/main/java/it/cavallium/dbengine/database/Delta.java
new file mode 100644
index 0000000..a06f19f
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/Delta.java
@@ -0,0 +1,10 @@
+package it.cavallium.dbengine.database;
+
+import lombok.Value;
+import org.jetbrains.annotations.Nullable;
+
+@Value(staticConstructor = "of")
+public class Delta<T> {
+	@Nullable T previous;
+	@Nullable T current;
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java
index 70fdde5..6096b30 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java
@@ -9,6 +9,7 @@ import org.jetbrains.annotations.Nullable;
 import org.warp.commonutils.concurrency.atomicity.NotAtomic;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
 
 @SuppressWarnings("unused")
 @NotAtomic
@@ -26,10 +27,28 @@
 
 	Mono<UpdateMode> getUpdateMode();
 
-	Mono<Boolean> update(ByteBuf key, Function<@Nullable ByteBuf, @Nullable ByteBuf> updater, boolean existsAlmostCertainly);
+	default Mono<ByteBuf> update(ByteBuf key,
+			Function<@Nullable ByteBuf, @Nullable ByteBuf> updater,
+			UpdateReturnMode updateReturnMode,
+			boolean existsAlmostCertainly) {
+		return this
+				.updateAndGetDelta(key, updater, existsAlmostCertainly)
+				.transform(prev -> LLUtils.resolveDelta(prev, updateReturnMode));
+	}
 
-	default Mono<Boolean> update(ByteBuf key, Function<@Nullable ByteBuf, @Nullable ByteBuf> updater) {
-		return update(key, updater, false);
+	default Mono<ByteBuf> update(ByteBuf key,
+			Function<@Nullable ByteBuf, @Nullable ByteBuf> updater,
+			UpdateReturnMode returnMode) {
+		return update(key, updater, returnMode, false);
+	}
+
+	Mono<Delta<ByteBuf>> updateAndGetDelta(ByteBuf key,
+			Function<@Nullable ByteBuf, @Nullable ByteBuf> updater,
+			boolean existsAlmostCertainly);
+
+	default Mono<Delta<ByteBuf>> updateAndGetDelta(ByteBuf key,
+			Function<@Nullable ByteBuf, @Nullable ByteBuf> updater) {
+		return updateAndGetDelta(key, updater, false);
 	}
 
 	Mono<Void> clear();
diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
index 8916eaf..2b4773a 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
@@ -2,7 +2,6 @@ package it.cavallium.dbengine.database;
 
 import com.google.common.primitives.Ints;
 import com.google.common.primitives.Longs;
-import io.netty.buffer.AbstractByteBufAllocator;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.ByteBufUtil;
@@ -14,6 +13,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
 import java.util.function.ToIntFunction;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
@@ -32,6 +33,7 @@ import org.apache.lucene.search.SortedNumericSortField;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.rocksdb.RocksDB;
+import reactor.core.publisher.Mono;
 
 @SuppressWarnings("unused")
 public class LLUtils {
@@ -412,4 +414,56 @@
 			}
 		}
 	}
+
+	public static <T> Mono<T> resolveDelta(Mono<Delta<T>> prev, UpdateReturnMode updateReturnMode) {
+		return prev.handle((delta, sink) -> {
+			switch (updateReturnMode) {
+				case GET_NEW_VALUE:
+					var current = delta.getCurrent();
+					if (current != null) {
+						sink.next(current);
+					} else {
+						sink.complete();
+					}
+					break;
+				case GET_OLD_VALUE:
+					var previous = delta.getPrevious();
+					if (previous != null) {
+						sink.next(previous);
+					} else {
+						sink.complete();
+					}
+					break;
+				case NOTHING:
+					sink.complete();
+					break;
+				default:
+					sink.error(new IllegalStateException());
+			}
+		});
+	}
+
+	public static <T, U> Mono<Delta<U>> mapDelta(Mono<Delta<T>> mono, Function<@NotNull T, @Nullable U> mapper) {
+		return mono.map(delta -> {
+			T prev = delta.getPrevious();
+			T curr = delta.getCurrent();
+			U newPrev;
+			U newCurr;
+			if (prev != null) {
+				newPrev = mapper.apply(prev);
+			} else {
+				newPrev = null;
+			}
+			if (curr != null) {
+				newCurr = mapper.apply(curr);
+			} else {
+				newCurr = null;
+			}
+			return Delta.of(newPrev, newCurr);
+		});
+	}
+
+	public static <X> boolean isDeltaChanged(Delta<X> delta) {
+		return !Objects.equals(delta.getPrevious(), delta.getCurrent());
+	}
 }
diff --git a/src/main/java/it/cavallium/dbengine/database/UpdateReturnMode.java b/src/main/java/it/cavallium/dbengine/database/UpdateReturnMode.java
new file mode 100644
index 0000000..bdeae0f
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/UpdateReturnMode.java
@@ -0,0 +1,5 @@
+package it.cavallium.dbengine.database;
+
+public enum UpdateReturnMode {
+	GET_OLD_VALUE, GET_NEW_VALUE, NOTHING
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java
index 6f1e049..2dbc7ec 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java
@@ -3,10 +3,12 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.util.ReferenceCounted;
 import it.cavallium.dbengine.client.CompositeSnapshot;
+import it.cavallium.dbengine.database.Delta;
 import it.cavallium.dbengine.database.LLDictionary;
 import it.cavallium.dbengine.database.LLDictionaryResultType;
 import it.cavallium.dbengine.database.LLUtils;
 import it.cavallium.dbengine.database.UpdateMode;
+import it.cavallium.dbengine.database.UpdateReturnMode;
 import it.cavallium.dbengine.database.serialization.Serializer;
 import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
 import java.util.HashMap;
@@ -142,26 +144,56 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U
 	}
 
 	@Override
-	public Mono<Boolean> updateValue(T keySuffix,
+	public Mono<U> updateValue(T keySuffix,
+			UpdateReturnMode updateReturnMode,
 			boolean existsAlmostCertainly,
 			Function<@Nullable U, @Nullable U> updater) {
 		return Mono
 				.using(
 						() -> toKey(serializeSuffix(keySuffix)),
-						keyBuf -> dictionary.update(keyBuf.retain(), oldSerialized -> {
-							try {
-								var result = updater.apply(oldSerialized == null ? null : this.deserialize(oldSerialized.retain()));
-								if (result == null) {
-									return null;
-								} else {
-									return this.serialize(result);
-								}
-							} finally {
-								if (oldSerialized != null) {
-									oldSerialized.release();
-								}
-							}
-						}, existsAlmostCertainly),
+						keyBuf -> dictionary
+								.update(keyBuf.retain(), oldSerialized -> {
+									try {
+										var result = updater.apply(oldSerialized == null ?
null : this.deserialize(oldSerialized.retain())); + if (result == null) { + return null; + } else { + return this.serialize(result); + } + } finally { + if (oldSerialized != null) { + oldSerialized.release(); + } + } + }, updateReturnMode, existsAlmostCertainly) + .map(this::deserialize), + ReferenceCounted::release + ); + } + + @Override + public Mono> updateValueAndGetDelta(T keySuffix, + boolean existsAlmostCertainly, + Function<@Nullable U, @Nullable U> updater) { + return Mono + .using( + () -> toKey(serializeSuffix(keySuffix)), + keyBuf -> dictionary + .updateAndGetDelta(keyBuf.retain(), oldSerialized -> { + try { + var result = updater.apply(oldSerialized == null ? null : this.deserialize(oldSerialized.retain())); + if (result == null) { + return null; + } else { + return this.serialize(result); + } + } finally { + if (oldSerialized != null) { + oldSerialized.release(); + } + } + }, existsAlmostCertainly) + .transform(mono -> LLUtils.mapDelta(mono, this::deserialize)), ReferenceCounted::release ); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java index 3e16fec..f24bd84 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java @@ -3,16 +3,20 @@ package it.cavallium.dbengine.database.collections; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLDictionary; -import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.collections.Joiner.ValueGetter; import it.cavallium.dbengine.database.collections.JoinerBlocking.ValueGetterBlocking; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; +import it.unimi.dsi.fastutil.objects.ObjectArraySet; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; import java.util.function.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -23,9 +27,8 @@ import reactor.core.publisher.Mono; public class DatabaseMapDictionaryHashed implements DatabaseStageMap> { private final ByteBufAllocator alloc; - private final DatabaseMapDictionary> subDictionary; + private final DatabaseMapDictionary>> subDictionary; private final Function keySuffixHashFunction; - private final Function> valueMapper; protected DatabaseMapDictionaryHashed(LLDictionary dictionary, ByteBuf prefixKey, @@ -34,15 +37,18 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap keySuffixHashFunction, SerializerFixedBinaryLength keySuffixHashSerializer) { try { - ValueWithHashSerializer valueWithHashSerializer = new ValueWithHashSerializer<>(keySuffixSerializer, - valueSerializer - ); + if (dictionary.getUpdateMode().block() != UpdateMode.ALLOW) { + throw new IllegalArgumentException("Hashed maps only works when UpdateMode is ALLOW"); + } this.alloc = dictionary.getAllocator(); - this.valueMapper = ValueMapper::new; + ValueWithHashSerializer valueWithHashSerializer + = new ValueWithHashSerializer<>(alloc, keySuffixSerializer, 
valueSerializer); + ValuesSetSerializer> valuesSetSerializer + = new ValuesSetSerializer<>(alloc, valueWithHashSerializer); this.subDictionary = DatabaseMapDictionary.tail(dictionary, prefixKey.retain(), keySuffixHashSerializer, - valueWithHashSerializer + valuesSetSerializer ); this.keySuffixHashFunction = keySuffixHashFunction; } finally { @@ -50,72 +56,6 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap implements Serializer, ByteBuf> { - - private final Serializer keySuffixSerializer; - private final Serializer valueSerializer; - - private ValueWithHashSerializer(Serializer keySuffixSerializer, Serializer valueSerializer) { - this.keySuffixSerializer = keySuffixSerializer; - this.valueSerializer = valueSerializer; - } - - @Override - public @NotNull Entry deserialize(@NotNull ByteBuf serialized) { - try { - int keySuffixLength = serialized.readInt(); - int initialReaderIndex = serialized.readerIndex(); - int initialWriterIndex = serialized.writerIndex(); - T keySuffix = keySuffixSerializer.deserialize(serialized.setIndex(initialReaderIndex, initialReaderIndex + keySuffixLength).retain()); - assert serialized.readerIndex() == initialReaderIndex + keySuffixLength; - U value = valueSerializer.deserialize(serialized.setIndex(initialReaderIndex + keySuffixLength, initialWriterIndex).retain()); - return Map.entry(keySuffix, value); - } finally { - serialized.release(); - } - } - - @Override - public @NotNull ByteBuf serialize(@NotNull Entry deserialized) { - ByteBuf keySuffix = keySuffixSerializer.serialize(deserialized.getKey()); - try { - ByteBuf value = valueSerializer.serialize(deserialized.getValue()); - try { - ByteBuf keySuffixLen = alloc.buffer(Integer.BYTES); - try { - keySuffixLen.writeInt(keySuffix.readableBytes()); - return LLUtils.compositeBuffer(alloc, keySuffixLen.retain(), keySuffix.retain(), value.retain()); - } finally { - keySuffixLen.release(); - } - } finally { - value.release(); - } - } finally { - keySuffix.release(); - } - } - } - - private static class ValueMapper implements Serializer> { - - private final T key; - - public ValueMapper(T key) { - this.key = key; - } - - @Override - public @NotNull U deserialize(@NotNull Entry serialized) { - return serialized.getValue(); - } - - @Override - public @NotNull Entry serialize(@NotNull U deserialized) { - return Map.entry(key, deserialized); - } - } - public static DatabaseMapDictionaryHashed simple(LLDictionary dictionary, Serializer keySerializer, Serializer valueSerializer, @@ -146,15 +86,21 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap> serializeMap(Map map) { - var newMap = new HashMap>(map.size()); - map.forEach((key, value) -> newMap.put(keySuffixHashFunction.apply(key), Map.entry(key, value))); + private Map>> serializeMap(Map map) { + var newMap = new HashMap>>(map.size()); + map.forEach((key, value) -> newMap.compute(keySuffixHashFunction.apply(key), (hash, prev) -> { + if (prev == null) { + prev = new HashSet<>(); + } + prev.add(Map.entry(key, value)); + return prev; + })); return newMap; } - private Map deserializeMap(Map> map) { + private Map deserializeMap(Map>> map) { var newMap = new HashMap(map.size()); - map.forEach((key, value) -> newMap.put(value.getKey(), value.getValue())); + map.forEach((hash, set) -> set.forEach(entry -> newMap.put(entry.getKey(), entry.getValue()))); return newMap; } @@ -178,18 +124,6 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap this.serializeMap(map)).flatMap(subDictionary::setAndGetChanged).single(); } - 
@Override - public Mono update(Function<@Nullable Map, @Nullable Map> updater) { - return subDictionary.update(old -> { - var result = updater.apply(old == null ? null : this.deserializeMap(old)); - if (result == null) { - return null; - } else { - return this.serializeMap(result); - } - }); - } - @Override public Mono clearAndGetStatus() { return subDictionary.clearAndGetStatus(); @@ -217,33 +151,15 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap> at(@Nullable CompositeSnapshot snapshot, T key) { + return this + .atPrivate(snapshot, key, keySuffixHashFunction.apply(key)) + .map(cast -> cast); + } + + private Mono> atPrivate(@Nullable CompositeSnapshot snapshot, T key, TH hash) { return subDictionary - .at(snapshot, keySuffixHashFunction.apply(key)) - .map(entry -> new DatabaseSingleMapped<>(entry, valueMapper.apply(key))); - } - - @Override - public Mono getValue(@Nullable CompositeSnapshot snapshot, T key, boolean existsAlmostCertainly) { - return subDictionary - .getValue(snapshot, keySuffixHashFunction.apply(key), existsAlmostCertainly) - .map(Entry::getValue); - } - - @Override - public Mono getValue(@Nullable CompositeSnapshot snapshot, T key) { - return subDictionary.getValue(snapshot, keySuffixHashFunction.apply(key)).map(Entry::getValue); - } - - @Override - public Mono getValueOrDefault(@Nullable CompositeSnapshot snapshot, T key, Mono defaultValue) { - return subDictionary - .getValueOrDefault(snapshot, keySuffixHashFunction.apply(key), defaultValue.map(v -> Map.entry(key, v))) - .map(Entry::getValue); - } - - @Override - public Mono putValue(T key, U value) { - return subDictionary.putValue(keySuffixHashFunction.apply(key), Map.entry(key, value)); + .at(snapshot, hash) + .map(entry -> new DatabaseSingleBucket<>(entry, key)); } @Override @@ -251,109 +167,35 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap updateValue(T key, boolean existsAlmostCertainly, Function<@Nullable U, @Nullable U> updater) { - return subDictionary.updateValue(keySuffixHashFunction.apply(key), existsAlmostCertainly, old -> { - var result = updater.apply(old == null ? null : old.getValue()); - if (result == null) { - return null; - } else { - return Map.entry(key, result); - } - }); - } - - @Override - public Mono updateValue(T key, Function<@Nullable U, @Nullable U> updater) { - return subDictionary.updateValue(keySuffixHashFunction.apply(key), old -> { - var result = updater.apply(old == null ? 
null : old.getValue()); - if (result == null) { - return null; - } else { - return Map.entry(key, result); - } - }); - } - - @Override - public Mono putValueAndGetPrevious(T key, U value) { - return subDictionary - .putValueAndGetPrevious(keySuffixHashFunction.apply(key), Map.entry(key, value)) - .map(Entry::getValue); - } - - @Override - public Mono putValueAndGetChanged(T key, U value) { - return subDictionary - .putValueAndGetChanged(keySuffixHashFunction.apply(key), Map.entry(key, value)); - } - - @Override - public Mono remove(T key) { - return subDictionary.remove(keySuffixHashFunction.apply(key)); - } - - @Override - public Mono removeAndGetPrevious(T key) { - return subDictionary.removeAndGetPrevious(keySuffixHashFunction.apply(key)).map(Entry::getValue); - } - - @Override - public Mono removeAndGetStatus(T key) { - return subDictionary.removeAndGetStatus(keySuffixHashFunction.apply(key)); - } - - @Override - public Flux> getMulti(@Nullable CompositeSnapshot snapshot, Flux keys, boolean existsAlmostCertainly) { - return subDictionary - .getMulti(snapshot, keys.map(keySuffixHashFunction), existsAlmostCertainly) - .map(Entry::getValue); - } - - @Override - public Flux> getMulti(@Nullable CompositeSnapshot snapshot, Flux keys) { - return subDictionary - .getMulti(snapshot, keys.map(keySuffixHashFunction)) - .map(Entry::getValue); - } - - @Override - public Mono putMulti(Flux> entries) { - return subDictionary - .putMulti(entries.map(entry -> Map.entry(keySuffixHashFunction.apply(entry.getKey()), - Map.entry(entry.getKey(), entry.getValue()) - ))); - } - @Override public Flux>> getAllStages(@Nullable CompositeSnapshot snapshot) { return subDictionary - .getAllStages(snapshot) - .flatMap(hashEntry -> hashEntry - .getValue() - .get(snapshot) - .map(entry -> Map.entry(entry.getKey(), - new DatabaseSingleMapped<>(hashEntry.getValue(), valueMapper.apply(entry.getKey())) - )) + .getAllValues(snapshot) + .flatMap(bucket -> Flux + .fromIterable(bucket.getValue()) + .map(Entry::getKey) + .flatMap(key -> this + .at(snapshot, key) + .flatMap(stage -> Mono.just(Map.entry(key, stage)).doFinally(s -> stage.release())) + ) ); } @Override public Flux> getAllValues(@Nullable CompositeSnapshot snapshot) { - return subDictionary.getAllValues(snapshot).map(Entry::getValue); - } - - @Override - public Mono setAllValues(Flux> entries) { - return subDictionary - .setAllValues(entries.map(entry -> Map.entry(keySuffixHashFunction.apply(entry.getKey()), entry))); + return subDictionary.getAllValues(snapshot).flatMap(s -> Flux.fromIterable(s.getValue())); } @Override public Flux> setAllValuesAndGetPrevious(Flux> entries) { - return subDictionary - .setAllValuesAndGetPrevious(entries.map(entry -> Map.entry(keySuffixHashFunction.apply(entry.getKey()), entry))) - .map(Entry::getValue); + return entries + .flatMap(entry -> this + .at(null, entry.getKey()) + .flatMap(stage -> stage + .setAndGetPrevious(entry.getValue()) + .map(prev -> Map.entry(entry.getKey(), prev)) + .doFinally(s -> stage.release())) + ); } @Override @@ -361,25 +203,6 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap replaceAllValues(boolean canKeysChange, Function, Mono>> entriesReplacer) { - return subDictionary.replaceAllValues(canKeysChange, - entry -> entriesReplacer - .apply(entry.getValue()) - .map(result -> Map.entry(keySuffixHashFunction.apply(result.getKey()), result)) - ); - } - - @Override - public Mono replaceAll(Function>, Mono> entriesReplacer) { - return subDictionary.replaceAll(hashEntry -> hashEntry - 
.getValue() - .get(null) - .flatMap(entry -> entriesReplacer.apply(Map.entry(entry.getKey(), - new DatabaseSingleMapped<>(hashEntry.getValue(), valueMapper.apply(entry.getKey())) - )))); - } - @Override public Mono> setAndGetPrevious(Map value) { return Mono @@ -388,19 +211,6 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap update(Function<@Nullable Map, @Nullable Map> updater, - boolean existsAlmostCertainly) { - return subDictionary.update(item -> { - var result = updater.apply(item == null ? null : this.deserializeMap(item)); - if (result == null) { - return null; - } else { - return this.serializeMap(result); - } - }, existsAlmostCertainly); - } - @Override public Mono> clearAndGetPrevious() { return subDictionary @@ -422,13 +232,61 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap getDbValueGetter(@Nullable CompositeSnapshot snapshot) { - ValueGetterBlocking> getter = subDictionary.getDbValueGetter(snapshot); - return key -> getter.get(keySuffixHashFunction.apply(key)).getValue(); + ValueGetterBlocking>> getter = subDictionary.getDbValueGetter(snapshot); + return key -> extractValue(getter.get(keySuffixHashFunction.apply(key)), key); } @Override public ValueGetter getAsyncDbValueGetter(@Nullable CompositeSnapshot snapshot) { - ValueGetter> getter = subDictionary.getAsyncDbValueGetter(snapshot); - return key -> getter.get(keySuffixHashFunction.apply(key)).map(Entry::getValue); + ValueGetter>> getter = subDictionary.getAsyncDbValueGetter(snapshot); + return key -> getter + .get(keySuffixHashFunction.apply(key)) + .flatMap(set -> this.extractValueTransformation(set, key)); + } + + private Mono extractValueTransformation(Set> entries, T key) { + return Mono.fromCallable(() -> extractValue(entries, key)); + } + + @Nullable + private U extractValue(Set> entries, T key) { + if (entries == null) return null; + for (Entry entry : entries) { + if (Objects.equals(entry.getKey(), key)) { + return entry.getValue(); + } + } + return null; + } + + @NotNull + private Set> insertValueOrCreate(@Nullable Set> entries, T key, U value) { + if (entries != null) { + entries.add(Map.entry(key, value)); + return entries; + } else { + return new ObjectArraySet<>(new Object[] {Map.entry(key, value)}); + } + } + + @Nullable + private Set> removeValueOrDelete(@Nullable Set> entries, T key) { + if (entries != null) { + var it = entries.iterator(); + while (it.hasNext()) { + var entry = it.next(); + if (Objects.equals(entry.getKey(), key)) { + it.remove(); + break; + } + } + if (entries.size() == 0) { + return null; + } else { + return entries; + } + } else { + return null; + } } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java index 338c26c..1d54089 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java @@ -2,10 +2,13 @@ package it.cavallium.dbengine.database.collections; import io.netty.buffer.ByteBuf; import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; +import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.UpdateReturnMode; import 
it.cavallium.dbengine.database.serialization.Serializer; import java.util.Optional; import java.util.function.Function; @@ -52,7 +55,9 @@ public class DatabaseSingle implements DatabaseStageEntry { } @Override - public Mono update(Function<@Nullable U, @Nullable U> updater, boolean existsAlmostCertainly) { + public Mono update(Function<@Nullable U, @Nullable U> updater, + UpdateReturnMode updateReturnMode, + boolean existsAlmostCertainly) { return dictionary.update(key.retain(), (oldValueSer) -> { var result = updater.apply(oldValueSer == null ? null : this.deserialize(oldValueSer)); if (result == null) { @@ -60,7 +65,20 @@ public class DatabaseSingle implements DatabaseStageEntry { } else { return this.serialize(result); } - }, existsAlmostCertainly); + }, updateReturnMode, existsAlmostCertainly).map(this::deserialize); + } + + @Override + public Mono> updateAndGetDelta(Function<@Nullable U, @Nullable U> updater, + boolean existsAlmostCertainly) { + return dictionary.updateAndGetDelta(key.retain(), (oldValueSer) -> { + var result = updater.apply(oldValueSer == null ? null : this.deserialize(oldValueSer)); + if (result == null) { + return null; + } else { + return this.serialize(result); + } + }, existsAlmostCertainly).transform(mono -> LLUtils.mapDelta(mono, this::deserialize)); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java new file mode 100644 index 0000000..80190b2 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java @@ -0,0 +1,181 @@ +package it.cavallium.dbengine.database.collections; + +import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.database.Delta; +import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.UpdateReturnMode; +import it.unimi.dsi.fastutil.objects.ObjectArraySet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Function; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.warp.commonutils.functional.TriFunction; +import reactor.core.publisher.Mono; + +@SuppressWarnings("unused") +public class DatabaseSingleBucket implements DatabaseStageEntry { + + private final DatabaseStageEntry>> bucketStage; + private final K key; + + public DatabaseSingleBucket(DatabaseStageEntry>> bucketStage, K key) { + this.bucketStage = bucketStage; + this.key = key; + } + + @Override + public Mono get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { + return bucketStage.get(snapshot, existsAlmostCertainly).flatMap(this::extractValueTransformation); + } + + @Override + public Mono getOrDefault(@Nullable CompositeSnapshot snapshot, Mono defaultValue) { + return bucketStage.get(snapshot).flatMap(this::extractValueTransformation).switchIfEmpty(defaultValue); + } + + @Override + public Mono set(V value) { + return this.update(prev -> value, UpdateReturnMode.NOTHING).then(); + } + + @Override + public Mono setAndGetPrevious(V value) { + return this.update(prev -> value, UpdateReturnMode.GET_OLD_VALUE); + } + + @Override + public Mono setAndGetChanged(V value) { + return this.updateAndGetDelta(prev -> value).map(LLUtils::isDeltaChanged); + } + + @Override + public Mono update(Function<@Nullable V, @Nullable V> updater, + UpdateReturnMode updateReturnMode, + 
boolean existsAlmostCertainly) { + return bucketStage + .update(oldBucket -> { + V oldValue = extractValue(oldBucket); + V newValue = updater.apply(oldValue); + + if (newValue == null) { + return this.removeValueOrDelete(oldBucket); + } else { + return this.insertValueOrCreate(oldBucket, newValue); + } + }, updateReturnMode, existsAlmostCertainly) + .flatMap(this::extractValueTransformation); + } + + @Override + public Mono> updateAndGetDelta(Function<@Nullable V, @Nullable V> updater, boolean existsAlmostCertainly) { + return bucketStage + .updateAndGetDelta(oldBucket -> { + V oldValue = extractValue(oldBucket); + var result = updater.apply(oldValue); + if (result == null) { + return this.removeValueOrDelete(oldBucket); + } else { + return this.insertValueOrCreate(oldBucket, result); + } + }, existsAlmostCertainly) + .transform(mono -> LLUtils.mapDelta(mono, this::extractValue)); + } + + @Override + public Mono clear() { + return this.update(prev -> null, UpdateReturnMode.NOTHING).then(); + } + + @Override + public Mono clearAndGetPrevious() { + return this.update(prev -> null, UpdateReturnMode.GET_OLD_VALUE); + } + + @Override + public Mono clearAndGetStatus() { + return this.updateAndGetDelta(prev -> null).map(LLUtils::isDeltaChanged); + } + + @Override + public Mono close() { + return bucketStage.close(); + } + + @Override + public Mono leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) { + return this.get(snapshot).map(prev -> 1L).defaultIfEmpty(0L); + } + + @Override + public Mono isEmpty(@Nullable CompositeSnapshot snapshot) { + return this.get(snapshot).map(prev -> true).defaultIfEmpty(true); + } + + @Override + public DatabaseStageEntry entry() { + return this; + } + + @Override + public void release() { + bucketStage.release(); + } + + private Mono extractValueTransformation(Set> entries) { + return Mono.fromCallable(() -> extractValue(entries)); + } + + @Nullable + private V extractValue(Set> entries) { + if (entries == null) return null; + for (Entry entry : entries) { + if (Objects.equals(entry.getKey(), key)) { + return entry.getValue(); + } + } + return null; + } + + @NotNull + private Set> insertValueOrCreate(@Nullable Set> entries, V value) { + if (entries != null) { + var it = entries.iterator(); + while (it.hasNext()) { + var entry = it.next(); + if (Objects.equals(entry.getKey(), key)) { + it.remove(); + break; + } + } + entries.add(Map.entry(key, value)); + return entries; + } else { + return new ObjectArraySet<>(new Object[] {Map.entry(key, value)}); + } + } + + @Nullable + private Set> removeValueOrDelete(@Nullable Set> entries) { + if (entries != null) { + var it = entries.iterator(); + while (it.hasNext()) { + var entry = it.next(); + if (Objects.equals(entry.getKey(), key)) { + it.remove(); + break; + } + } + if (entries.size() == 0) { + return null; + } else { + return entries; + } + } else { + return null; + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java index b58f1fc..88c0e42 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java @@ -1,6 +1,9 @@ package it.cavallium.dbengine.database.collections; import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.database.Delta; +import it.cavallium.dbengine.database.LLUtils; +import 
it.cavallium.dbengine.database.UpdateReturnMode;
 import it.cavallium.dbengine.database.serialization.Serializer;
 import java.util.function.Function;
 import org.jetbrains.annotations.Nullable;
@@ -43,7 +46,9 @@ public class DatabaseSingleMapped<A, B> implements DatabaseStageEntry<A> {
 	}
 
 	@Override
-	public Mono<Boolean> update(Function<@Nullable A, @Nullable A> updater, boolean existsAlmostCertainly) {
+	public Mono<A> update(Function<@Nullable A, @Nullable A> updater,
+			UpdateReturnMode updateReturnMode,
+			boolean existsAlmostCertainly) {
 		return serializedSingle.update(oldValue -> {
 			var result = updater.apply(oldValue == null ? null : this.deserialize(oldValue));
 			if (result == null) {
@@ -51,7 +56,20 @@ public class DatabaseSingleMapped<A, B> implements DatabaseStageEntry<A> {
 			} else {
 				return this.serialize(result);
 			}
-		}, existsAlmostCertainly);
+		}, updateReturnMode, existsAlmostCertainly).map(this::deserialize);
+	}
+
+	@Override
+	public Mono<Delta<A>> updateAndGetDelta(Function<@Nullable A, @Nullable A> updater,
+			boolean existsAlmostCertainly) {
+		return serializedSingle.updateAndGetDelta(oldValue -> {
+			var result = updater.apply(oldValue == null ? null : this.deserialize(oldValue));
+			if (result == null) {
+				return null;
+			} else {
+				return this.serialize(result);
+			}
+		}, existsAlmostCertainly).transform(mono -> LLUtils.mapDelta(mono, this::deserialize));
 	}
 
 	@Override
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java
index d4af134..f00ae47 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java
@@ -1,6 +1,9 @@
 package it.cavallium.dbengine.database.collections;
 
 import it.cavallium.dbengine.client.CompositeSnapshot;
+import it.cavallium.dbengine.database.Delta;
+import it.cavallium.dbengine.database.LLUtils;
+import it.cavallium.dbengine.database.UpdateReturnMode;
 import java.util.Objects;
 import java.util.function.Function;
 import org.jetbrains.annotations.Nullable;
@@ -39,10 +42,23 @@ public interface DatabaseStage<T> extends DatabaseStageWithEntry<T> {
 				.switchIfEmpty(Mono.fromSupplier(() -> value != null));
 	}
 
-	Mono<Boolean> update(Function<@Nullable T, @Nullable T> updater, boolean existsAlmostCertainly);
+	default Mono<T> update(Function<@Nullable T, @Nullable T> updater,
+			UpdateReturnMode updateReturnMode,
+			boolean existsAlmostCertainly) {
+		return this
+				.updateAndGetDelta(updater, existsAlmostCertainly)
+				.transform(prev -> LLUtils.resolveDelta(prev, updateReturnMode));
+	}
 
-	default Mono<Boolean> update(Function<@Nullable T, @Nullable T> updater) {
-		return update(updater, false);
+	default Mono<T> update(Function<@Nullable T, @Nullable T> updater, UpdateReturnMode updateReturnMode) {
+		return update(updater, updateReturnMode, false);
+	}
+
+	Mono<Delta<T>> updateAndGetDelta(Function<@Nullable T, @Nullable T> updater,
+			boolean existsAlmostCertainly);
+
+	default Mono<Delta<T>> updateAndGetDelta(Function<@Nullable T, @Nullable T> updater) {
+		return updateAndGetDelta(updater, false);
 	}
 
 	default Mono<Void> clear() {
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java
index a05d018..abe1659 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java
@@ -1,7 +1,11 @@
 package it.cavallium.dbengine.database.collections;
 
+import io.netty.buffer.ByteBuf;
 import it.cavallium.dbengine.client.CompositeSnapshot;
+import it.cavallium.dbengine.database.Delta;
+import it.cavallium.dbengine.database.LLUtils;
 import it.cavallium.dbengine.database.UpdateMode;
+import it.cavallium.dbengine.database.UpdateReturnMode;
 import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
 import it.cavallium.dbengine.database.collections.JoinerBlocking.ValueGetterBlocking;
 import java.util.HashMap;
@@ -14,6 +18,7 @@ import org.jetbrains.annotations.Nullable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
+import reactor.util.function.Tuple3;
 import reactor.util.function.Tuples;
 
 @SuppressWarnings("unused")
@@ -39,18 +44,40 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
 
 	Mono<UpdateMode> getUpdateMode();
 
-	default Mono<Boolean> updateValue(T key, boolean existsAlmostCertainly, Function<@Nullable U, @Nullable U> updater) {
+	default Mono<U> updateValue(T key, UpdateReturnMode updateReturnMode, boolean existsAlmostCertainly, Function<@Nullable U, @Nullable U> updater) {
 		return this
 				.at(null, key)
 				.single()
 				.flatMap(v -> v
-						.update(updater, existsAlmostCertainly)
+						.update(updater, updateReturnMode, existsAlmostCertainly)
 						.doFinally(s -> v.release())
 				);
 	}
 
+	default Mono<U> updateValue(T key, UpdateReturnMode updateReturnMode, Function<@Nullable U, @Nullable U> updater) {
+		return updateValue(key, updateReturnMode, false, updater);
+	}
+
 	default Mono<Boolean> updateValue(T key, Function<@Nullable U, @Nullable U> updater) {
-		return updateValue(key, false, updater);
+		return updateValueAndGetDelta(key, false, updater).map(LLUtils::isDeltaChanged).single();
+	}
+
+	default Mono<Boolean> updateValue(T key, boolean existsAlmostCertainly, Function<@Nullable U, @Nullable U> updater) {
+		return updateValueAndGetDelta(key, existsAlmostCertainly, updater).map(LLUtils::isDeltaChanged).single();
+	}
+
+	default Mono<Delta<U>> updateValueAndGetDelta(T key, boolean existsAlmostCertainly, Function<@Nullable U, @Nullable U> updater) {
+		return this
+				.at(null, key)
+				.single()
+				.flatMap(v -> v
+						.updateAndGetDelta(updater, existsAlmostCertainly)
+						.doFinally(s -> v.release())
+				);
+	}
+
+	default Mono<Delta<U>> updateValueAndGetDelta(T key, Function<@Nullable U, @Nullable U> updater) {
+		return updateValueAndGetDelta(key, false, updater);
 	}
 
 	default Mono<U> putValueAndGetPrevious(T key, U value) {
@@ -156,7 +183,8 @@
 	}
 
 	@Override
-	default Mono<Boolean> update(Function<@Nullable Map<T, U>, @Nullable Map<T, U>> updater, boolean existsAlmostCertainly) {
+	default Mono<Delta<Map<T, U>>> updateAndGetDelta(Function<@Nullable Map<T, U>, @Nullable Map<T, U>> updater,
+			boolean existsAlmostCertainly) {
 		return this
 				.getUpdateMode()
 				.single()
@@ -166,7 +194,7 @@
 						.getAllValues(null)
 						.collectMap(Entry::getKey, Entry::getValue, HashMap::new)
 						.single()
-						.<Tuple2<Optional<Map<T, U>>, Boolean>>handle((v, sink) -> {
+						.<Tuple2<Optional<Map<T, U>>, Optional<Map<T, U>>>>handle((v, sink) -> {
 							if (v.isEmpty()) {
 								v = null;
 							}
@@ -174,13 +202,12 @@
 							if (result != null && result.isEmpty()) {
 								result = null;
 							}
-							boolean changed = !Objects.equals(v, result);
-							sink.next(Tuples.of(Optional.ofNullable(result), changed));
+							sink.next(Tuples.of(Optional.ofNullable(v), Optional.ofNullable(result)));
 						})
 						.flatMap(result -> Mono
-								.justOrEmpty(result.getT1())
+								.justOrEmpty(result.getT2())
 								.flatMap(values -> this.setAllValues(Flux.fromIterable(values.entrySet())))
-								.thenReturn(result.getT2())
+								.thenReturn(Delta.of(result.getT1().orElse(null), result.getT2().orElse(null)))
 						);
 			} else if (updateMode == UpdateMode.ALLOW) {
 				return Mono.fromCallable(() -> {
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java b/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java
new file mode 100644
index 0000000..bb09c0c
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java
@@ -0,0 +1,50 @@
+package it.cavallium.dbengine.database.collections;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import it.cavallium.dbengine.database.LLUtils;
+import it.cavallium.dbengine.database.serialization.Serializer;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.jetbrains.annotations.NotNull;
+
+class ValueWithHashSerializer<X, Y> implements Serializer<Entry<X, Y>, ByteBuf> {
+
+	private final ByteBufAllocator allocator;
+	private final Serializer<X, ByteBuf> keySuffixSerializer;
+	private final Serializer<Y, ByteBuf> valueSerializer;
+
+	ValueWithHashSerializer(ByteBufAllocator allocator,
+			Serializer<X, ByteBuf> keySuffixSerializer,
+			Serializer<Y, ByteBuf> valueSerializer) {
+		this.allocator = allocator;
+		this.keySuffixSerializer = keySuffixSerializer;
+		this.valueSerializer = valueSerializer;
+	}
+
+	@Override
+	public @NotNull Entry<X, Y> deserialize(@NotNull ByteBuf serialized) {
+		try {
+			X deserializedKey = keySuffixSerializer.deserialize(serialized.retain());
+			Y deserializedValue = valueSerializer.deserialize(serialized.retain());
+			return Map.entry(deserializedKey, deserializedValue);
+		} finally {
+			serialized.release();
+		}
+	}
+
+	@Override
+	public @NotNull ByteBuf serialize(@NotNull Entry<X, Y> deserialized) {
+		ByteBuf keySuffix = keySuffixSerializer.serialize(deserialized.getKey());
+		try {
+			ByteBuf value = valueSerializer.serialize(deserialized.getValue());
+			try {
+				return LLUtils.compositeBuffer(allocator, keySuffix.retain(), value.retain());
+			} finally {
+				value.release();
+			}
+		} finally {
+			keySuffix.release();
+		}
+	}
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java b/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java
new file mode 100644
index 0000000..500aecc
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java
@@ -0,0 +1,53 @@
+package it.cavallium.dbengine.database.collections;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import it.cavallium.dbengine.database.serialization.Serializer;
+import it.unimi.dsi.fastutil.objects.ObjectArraySet;
+import java.util.Set;
+import org.jetbrains.annotations.NotNull;
+
+class ValuesSetSerializer<X> implements Serializer<Set<X>, ByteBuf> {
+
+	private final ByteBufAllocator allocator;
+	private final Serializer<X, ByteBuf> entrySerializer;
+
+	ValuesSetSerializer(ByteBufAllocator allocator, Serializer<X, ByteBuf> entrySerializer) {
+		this.allocator = allocator;
+		this.entrySerializer = entrySerializer;
+	}
+
+	@Override
+	public @NotNull Set<X> deserialize(@NotNull ByteBuf serialized) {
+		try {
+			int entriesLength = serialized.readInt();
+			Object[] values = new Object[entriesLength];
+			for (int i = 0; i < entriesLength; i++) {
+				X entry = entrySerializer.deserialize(serialized.retain());
+				values[i] = entry;
+			}
+			return new ObjectArraySet<>(values);
+		} finally {
+			serialized.release();
+		}
+	}
+
+	@Override
+	public @NotNull ByteBuf serialize(@NotNull Set<X> deserialized) {
+		ByteBuf output = allocator.buffer();
+		try {
+
output.writeInt(deserialized.size()); + deserialized.forEach((entry) -> { + ByteBuf serialized = entrySerializer.serialize(entry); + try { + output.writeBytes(serialized); + } finally { + serialized.release(); + } + }); + return output.retain(); + } finally { + output.release(); + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 7620e47..d774fb2 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -4,12 +4,14 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator; import io.netty.util.ReferenceCounted; +import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; +import it.cavallium.dbengine.database.UpdateReturnMode; import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.io.IOException; @@ -165,7 +167,7 @@ public class LLLocalDictionary implements LLDictionary { } private int getLockIndex(ByteBuf key) { - return Math.abs(key.hashCode() % STRIPES); + return Math.abs(LLUtils.hashCode(key) % STRIPES); } private IntArrayList getLockIndices(List keys) { @@ -248,7 +250,7 @@ public class LLLocalDictionary implements LLDictionary { assert resultNioBuf.isDirect(); valueSize = db.get(cfh, Objects.requireNonNullElse(readOptions, EMPTY_READ_OPTIONS), - keyNioBuffer, + keyNioBuffer.position(0), resultNioBuf ); if (valueSize != RocksDB.NOT_FOUND) { @@ -492,13 +494,18 @@ public class LLLocalDictionary implements LLDictionary { return Mono.fromSupplier(() -> updateMode); } + // Remember to change also updateAndGetDelta() if you are modifying this function + @SuppressWarnings("DuplicatedCode") @Override - public Mono update(ByteBuf key, + public Mono update(ByteBuf key, Function<@Nullable ByteBuf, @Nullable ByteBuf> updater, + UpdateReturnMode updateReturnMode, boolean existsAlmostCertainly) { return Mono .fromCallable(() -> { - if (updateMode == UpdateMode.DISALLOW) throw new UnsupportedOperationException("update() is disallowed"); + if (updateMode == UpdateMode.DISALLOW) { + throw new UnsupportedOperationException("update() is disallowed"); + } StampedLock lock; long stamp; if (updateMode == UpdateMode.ALLOW) { @@ -514,7 +521,6 @@ public class LLLocalDictionary implements LLDictionary { logger.trace("Reading {}", LLUtils.toString(key)); } while (true) { - boolean changed = false; @Nullable ByteBuf prevData; var prevDataHolder = existsAlmostCertainly ? 
null : new Holder(); if (existsAlmostCertainly || db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) { @@ -561,7 +567,6 @@ public class LLLocalDictionary implements LLDictionary { if (logger.isTraceEnabled()) { logger.trace("Deleting {}", LLUtils.toString(key)); } - changed = true; dbDelete(cfh, null, key.retain()); } else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) { @@ -580,10 +585,134 @@ public class LLLocalDictionary implements LLDictionary { if (logger.isTraceEnabled()) { logger.trace("Writing {}: {}", LLUtils.toString(key), LLUtils.toString(newData)); } - changed = true; dbPut(cfh, null, key.retain(), newData.retain()); } - return changed; + switch (updateReturnMode) { + case GET_NEW_VALUE: + return newData != null ? newData.retain() : null; + case GET_OLD_VALUE: + return prevData != null ? prevData.retain() : null; + case NOTHING: + return null; + default: + throw new IllegalArgumentException(); + } + } finally { + if (newData != null) { + newData.release(); + } + } + } finally { + if (prevData != null) { + prevData.release(); + } + } + } + } finally { + if (updateMode == UpdateMode.ALLOW) { + lock.unlock(stamp); + } + } + }) + .onErrorMap(cause -> new IOException("Failed to read or write " + (key.refCnt() > 0 ? LLUtils.toString(key) : "(released)"), cause)) + .subscribeOn(dbScheduler) + .doFinally(s -> key.release()); + } + + // Remember to change also update() if you are modifying this function + @SuppressWarnings("DuplicatedCode") + @Override + public Mono> updateAndGetDelta(ByteBuf key, + Function<@Nullable ByteBuf, @Nullable ByteBuf> updater, + boolean existsAlmostCertainly) { + return Mono + .fromCallable(() -> { + if (updateMode == UpdateMode.DISALLOW) throw new UnsupportedOperationException("update() is disallowed"); + StampedLock lock; + long stamp; + if (updateMode == UpdateMode.ALLOW) { + lock = itemsLock.getAt(getLockIndex(key)); + + stamp = lock.readLock(); + } else { + lock = null; + stamp = 0; + } + try { + if (logger.isTraceEnabled()) { + logger.trace("Reading {}", LLUtils.toString(key)); + } + while (true) { + @Nullable ByteBuf prevData; + var prevDataHolder = existsAlmostCertainly ? null : new Holder(); + if (existsAlmostCertainly || db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) { + if (!existsAlmostCertainly && prevDataHolder.getValue() != null) { + byte @Nullable [] prevDataBytes = prevDataHolder.getValue(); + if (prevDataBytes != null) { + prevData = wrappedBuffer(prevDataBytes); + } else { + prevData = null; + } + } else { + prevData = dbGet(cfh, null, key.retain(), existsAlmostCertainly); + } + } else { + prevData = null; + } + try { + @Nullable ByteBuf newData; + ByteBuf prevDataToSendToUpdater = prevData == null ? null : prevData.retainedSlice(); + try { + newData = updater.apply(prevDataToSendToUpdater == null ? 
null : prevDataToSendToUpdater.retain()); + assert prevDataToSendToUpdater == null + || prevDataToSendToUpdater.readerIndex() == 0 + || !prevDataToSendToUpdater.isReadable(); + } finally { + if (prevDataToSendToUpdater != null) { + prevDataToSendToUpdater.release(); + } + } + try { + if (prevData != null && newData == null) { + //noinspection DuplicatedCode + if (updateMode == UpdateMode.ALLOW) { + var ws = lock.tryConvertToWriteLock(stamp); + if (ws != 0) { + stamp = ws; + } else { + lock.unlockRead(stamp); + + stamp = lock.writeLock(); + continue; + } + } + if (logger.isTraceEnabled()) { + logger.trace("Deleting {}", LLUtils.toString(key)); + } + dbDelete(cfh, null, key.retain()); + } else if (newData != null + && (prevData == null || !LLUtils.equals(prevData, newData))) { + //noinspection DuplicatedCode + if (updateMode == UpdateMode.ALLOW) { + var ws = lock.tryConvertToWriteLock(stamp); + if (ws != 0) { + stamp = ws; + } else { + lock.unlockRead(stamp); + + stamp = lock.writeLock(); + continue; + } + } + if (logger.isTraceEnabled()) { + logger.trace("Writing {}: {}", LLUtils.toString(key), LLUtils.toString(newData)); + } + dbPut(cfh, null, key.retain(), newData.retain()); + } + return Delta.of( + prevData != null ? prevData.retain() : null, + newData != null ? newData.retain() : null + ); } finally { if (newData != null) { newData.release(); diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java b/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java index d766527..ba5b758 100644 --- a/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java +++ b/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java @@ -42,8 +42,9 @@ public interface Serializer { @Override public @NotNull String deserialize(@NotNull ByteBuf serialized) { try { - var result = serialized.toString(StandardCharsets.UTF_8); - serialized.readerIndex(serialized.writerIndex()); + var length = serialized.readInt(); + var result = serialized.toString(serialized.readerIndex(), length, StandardCharsets.UTF_8); + serialized.readerIndex(serialized.readerIndex() + length); return result; } finally { serialized.release(); @@ -53,7 +54,9 @@ public interface Serializer { @Override public @NotNull ByteBuf serialize(@NotNull String deserialized) { // UTF-8 uses max. 3 bytes per char, so calculate the worst case. 
- ByteBuf buf = allocator.buffer(ByteBufUtil.utf8MaxBytes(deserialized)); + int length = ByteBufUtil.utf8Bytes(deserialized); + ByteBuf buf = allocator.buffer(Integer.BYTES + length); + buf.writeInt(length); ByteBufUtil.writeUtf8(buf, deserialized); return buf; } diff --git a/src/test/java/it/cavallium/dbengine/DbTestUtils.java b/src/test/java/it/cavallium/dbengine/DbTestUtils.java index 04fd0bd..154719f 100644 --- a/src/test/java/it/cavallium/dbengine/DbTestUtils.java +++ b/src/test/java/it/cavallium/dbengine/DbTestUtils.java @@ -102,21 +102,21 @@ public class DbTestUtils { ); } else { return DatabaseMapDictionaryHashed.simple(dictionary, - SerializerFixedBinaryLength.utf8(DbTestUtils.ALLOCATOR, keyBytes), Serializer.utf8(DbTestUtils.ALLOCATOR), - String::hashCode, + Serializer.utf8(DbTestUtils.ALLOCATOR), + s -> (short) s.hashCode(), new SerializerFixedBinaryLength<>() { @Override public int getSerializedBinaryLength() { - return keyBytes; + return Short.BYTES; } @Override - public @NotNull Integer deserialize(@NotNull ByteBuf serialized) { + public @NotNull Short deserialize(@NotNull ByteBuf serialized) { try { var prevReaderIdx = serialized.readerIndex(); - var val = serialized.readInt(); - serialized.readerIndex(prevReaderIdx + keyBytes); + var val = serialized.readShort(); + serialized.readerIndex(prevReaderIdx + Short.BYTES); return val; } finally { serialized.release(); @@ -124,11 +124,11 @@ public class DbTestUtils { } @Override - public @NotNull ByteBuf serialize(@NotNull Integer deserialized) { - var out = DbTestUtils.ALLOCATOR.directBuffer(keyBytes); + public @NotNull ByteBuf serialize(@NotNull Short deserialized) { + var out = DbTestUtils.ALLOCATOR.directBuffer(Short.BYTES); try { - out.writeInt(deserialized); - out.writerIndex(keyBytes); + out.writeShort(deserialized); + out.writerIndex(Short.BYTES); return out.retain(); } finally { out.release(); @@ -139,14 +139,32 @@ public class DbTestUtils { } } - public static DatabaseMapDictionaryDeep, DatabaseMapDictionary> tempDatabaseMapDictionaryDeepMap( + public static DatabaseMapDictionaryDeep, + DatabaseMapDictionary> tempDatabaseMapDictionaryDeepMap( LLDictionary dictionary, int key1Bytes, int key2Bytes) { return DatabaseMapDictionaryDeep.deepTail(dictionary, SerializerFixedBinaryLength.utf8(DbTestUtils.ALLOCATOR, key1Bytes), key2Bytes, - new SubStageGetterMap<>(SerializerFixedBinaryLength.utf8(DbTestUtils.ALLOCATOR, key2Bytes), Serializer.utf8(DbTestUtils.ALLOCATOR)) + new SubStageGetterMap<>(SerializerFixedBinaryLength.utf8(DbTestUtils.ALLOCATOR, key2Bytes), + Serializer.utf8(DbTestUtils.ALLOCATOR) + ) + ); + } + + public static DatabaseMapDictionaryDeep, + DatabaseMapDictionaryHashed> tempDatabaseMapDictionaryDeepMapHashMap( + LLDictionary dictionary, + int key1Bytes) { + return DatabaseMapDictionaryDeep.deepTail(dictionary, + SerializerFixedBinaryLength.utf8(DbTestUtils.ALLOCATOR, key1Bytes), + Integer.BYTES, + new SubStageGetterHashMap<>(Serializer.utf8(DbTestUtils.ALLOCATOR), + Serializer.utf8(DbTestUtils.ALLOCATOR), + String::hashCode, + SerializerFixedBinaryLength.intSerializer(DbTestUtils.ALLOCATOR) + ) ); } diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java index e294deb..6851e27 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java @@ -4,6 +4,7 @@ import static it.cavallium.dbengine.DbTestUtils.*; import it.cavallium.dbengine.database.UpdateMode; import 
java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; @@ -36,25 +37,25 @@ public class TestDictionaryMap { + "01234567890123456789012345678901234567890123456789012345678901234567890123456789"; private static Stream provideArgumentsPut() { - var goodKeys = Set.of("12345"); - Set badKeys; + var goodKeys = List.of("12345"); + List badKeys; if (isTestBadKeysEnabled()) { - badKeys = Set.of("", "aaaa", "aaaaaa"); + badKeys = List.of("", "aaaa", "aaaaaa"); } else { - badKeys = Set.of(); + badKeys = List.of(); } - Set> keys = Stream.concat( + List> keys = Stream.concat( goodKeys.stream().map(s -> Tuples.of(s, false)), badKeys.stream().map(s -> Tuples.of(s, true)) - ).collect(Collectors.toSet()); - var values = Set.of("", "\0", BIG_STRING); + ).collect(Collectors.toList()); + var values = List.of("", "\0", BIG_STRING); return keys .stream() .flatMap(keyTuple -> { Stream strm; if (keyTuple.getT2()) { - strm = values.stream().limit(1); + strm = values.stream().findFirst().stream(); } else { strm = values.stream(); } @@ -72,8 +73,9 @@ public class TestDictionaryMap { ), Tuples.of(DbType.HASH_MAP, entryTuple.getT1(), entryTuple.getT2(), entryTuple.getT3(), - entryTuple.getT4() + false ))) + .filter(tuple -> !(tuple.getT1() == DbType.HASH_MAP && tuple.getT2() != UpdateMode.ALLOW)) .map(fullTuple -> Arguments.of(fullTuple.getT1(), fullTuple.getT2(), fullTuple.getT3(), fullTuple.getT4(), fullTuple.getT5())); } @@ -290,18 +292,18 @@ public class TestDictionaryMap { } private static Stream provideArgumentsPutMulti() { - var goodKeys = Set.of(Set.of("12345", "67890"), Set.of()); - Set> badKeys; + var goodKeys = List.of(List.of("12345", "67890"), List.of()); + List> badKeys; if (isTestBadKeysEnabled()) { - badKeys = Set.of(Set.of("", "12345"), Set.of("45678", "aaaa"), Set.of("aaaaaa", "capra")); + badKeys = List.of(List.of("", "12345"), List.of("45678", "aaaa"), List.of("aaaaaa", "capra")); } else { - badKeys = Set.of(); + badKeys = List.of(); } - Set, Boolean>> keys = Stream.concat( + List, Boolean>> keys = Stream.concat( goodKeys.stream().map(s -> Tuples.of(s, false)), badKeys.stream().map(s -> Tuples.of(s, true)) - ).collect(Collectors.toSet()); - var values = Set.of("", "\0", BIG_STRING); + ).collect(Collectors.toList()); + var values = List.of("", "\0", BIG_STRING); return keys .stream() @@ -319,8 +321,9 @@ public class TestDictionaryMap { entryTuple.getT3() ), Tuples.of(DbType.HASH_MAP, entryTuple.getT1(), entryTuple.getT2(), - entryTuple.getT3() + false ))) + .filter(tuple -> !(tuple.getT1() == DbType.HASH_MAP && tuple.getT2() != UpdateMode.ALLOW)) .map(fullTuple -> Arguments.of(fullTuple.getT1(), fullTuple.getT2(), fullTuple.getT3(), fullTuple.getT4())); } @@ -438,7 +441,7 @@ public class TestDictionaryMap { if (entries.isEmpty()) { removalMono = Mono.empty(); } else { - removalMono = map.remove(entries.keySet().stream().findAny().orElseThrow()); + removalMono = map.remove(entries.keySet().stream().findFirst().orElseThrow()); } return Flux .concat( @@ -609,9 +612,10 @@ public class TestDictionaryMap { ) .doFinally(s -> map.release()) ) + .flatMap(val -> shouldFail ? Mono.empty() : Mono.just(val)) )); if (shouldFail) { - stpVer.expectNext(true).verifyError(); + stpVer.verifyError(); } else { stpVer.expectNext(true, entries.isEmpty()).verifyComplete(); } @@ -634,9 +638,10 @@ public class TestDictionaryMap { ) .doFinally(s -> map.release()) ) + .flatMap(val -> shouldFail ? 
Mono.empty() : Mono.just(val)) )); if (shouldFail) { - stpVer.expectNext(true).verifyError(); + stpVer.verifyError(); } else { stpVer.expectNext(true, entries.isEmpty(), true).verifyComplete(); } diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeepHashMap.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeepHashMap.java new file mode 100644 index 0000000..053b10e --- /dev/null +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeepHashMap.java @@ -0,0 +1,121 @@ +package it.cavallium.dbengine; + +import static it.cavallium.dbengine.DbTestUtils.tempDatabaseMapDictionaryDeepMapHashMap; +import static it.cavallium.dbengine.DbTestUtils.tempDb; +import static it.cavallium.dbengine.DbTestUtils.tempDictionary; + +import it.cavallium.dbengine.database.UpdateMode; +import java.util.Arrays; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; +import reactor.test.StepVerifier.Step; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuple3; +import reactor.util.function.Tuple4; +import reactor.util.function.Tuples; + +public class TestDictionaryMapDeepHashMap { + + private static boolean isTestBadKeysEnabled() { + return System.getProperty("badkeys", "true").equalsIgnoreCase("true"); + } + + private static final String BIG_STRING + = "01234567890123456789012345678901234567890123456789012345678901234567890123456789" + + "01234567890123456789012345678901234567890123456789012345678901234567890123456789" + + "01234567890123456789012345678901234567890123456789012345678901234567890123456789" + + "01234567890123456789012345678901234567890123456789012345678901234567890123456789" + + "01234567890123456789012345678901234567890123456789012345678901234567890123456789" + + "01234567890123456789012345678901234567890123456789012345678901234567890123456789"; + + private static Stream provideArgumentsPut() { + var goodKeys1 = Set.of("12345", "zebra"); + Set badKeys1; + if (isTestBadKeysEnabled()) { + badKeys1 = Set.of("", "a", "aaaa", "aaaaaa"); + } else { + badKeys1 = Set.of(); + } + var goodKeys2 = Set.of("123456", "anatra", "", "a", "aaaaa", "aaaaaaa"); + + var values = Set.of("a", "", "\0", "\0\0", "z", "azzszgzczqz", BIG_STRING); + + Flux> failOnKeys1 = Flux + .fromIterable(badKeys1) + .map(badKey1 -> Tuples.of( + badKey1, + goodKeys2.stream().findAny().orElseThrow(), + values.stream().findAny().orElseThrow(), + true + )); + + Flux> goodKeys1And2 = Flux + .fromIterable(values) + .map(value -> Tuples.of( + goodKeys1.stream().findAny().orElseThrow(), + goodKeys2.stream().findAny().orElseThrow(), + value, + false + )); + + Flux> keys1And2 = Flux + .concat( + goodKeys1And2, + failOnKeys1 + ); + + return keys1And2 + .flatMap(entryTuple -> Flux + .fromArray(UpdateMode.values()) + .map(updateMode -> Tuples.of(updateMode, + entryTuple.getT1(), + entryTuple.getT2(), + entryTuple.getT3(), + entryTuple.getT4() + )) + ) + .map(fullTuple -> Arguments.of(fullTuple.getT1(), + fullTuple.getT2(), + fullTuple.getT3(), + fullTuple.getT4(), + fullTuple.getT1() != UpdateMode.ALLOW || fullTuple.getT5() + )) + .toStream(); + } + + @ParameterizedTest + 
@MethodSource("provideArgumentsPut") + public void testAtPutValueGetAllValues(UpdateMode updateMode, String key1, String key2, String value, boolean shouldFail) { + var stpVer = StepVerifier + .create(tempDb(db -> tempDictionary(db, updateMode) + .map(dict -> tempDatabaseMapDictionaryDeepMapHashMap(dict, 5)) + .flatMapMany(map -> map + .at(null, key1).flatMap(v -> v.putValue(key2, value).doFinally(s -> v.release())) + .thenMany(map + .getAllValues(null) + .map(Entry::getValue) + .flatMap(maps -> Flux.fromIterable(maps.entrySet())) + .map(Entry::getValue) + ) + .doFinally(s -> map.release()) + ) + )); + if (shouldFail) { + stpVer.verifyError(); + } else { + stpVer.expectNext(value).verifyComplete(); + } + } + +}
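Usage sketch (illustrative, not part of the patch): with this change, update() no longer returns a boolean "changed" flag; it resolves a Delta according to the requested UpdateReturnMode, while updateAndGetDelta() exposes both the previous and the current value. A minimal example of how the new API composes, assuming a DatabaseStageEntry<String> named "counter" obtained elsewhere (the variable name and the updater logic are hypothetical):

	// Return the value written by the updater (empty if the updater returned null).
	Mono<String> newValue = counter.update(
			prev -> prev == null ? "1" : Integer.toString(Integer.parseInt(prev) + 1),
			UpdateReturnMode.GET_NEW_VALUE);

	// Get both sides of the update and derive a "changed" flag like the old API did.
	Mono<Boolean> changed = counter
			.updateAndGetDelta(prev -> prev == null ? "1" : Integer.toString(Integer.parseInt(prev) + 1))
			.map(LLUtils::isDeltaChanged);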