From a83f1ff1a605cd5b58481697869368c6c8133f32 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 27 Mar 2023 22:00:32 +0200 Subject: [PATCH] Optimize database --- .../it/cavallium/dbengine/database/Delta.java | 6 ++ .../dbengine/database/LLDictionary.java | 6 +- .../database/LLMultiDatabaseConnection.java | 13 ++++ .../dbengine/database/LLSingleton.java | 6 +- .../collections/DatabaseMapDictionary.java | 35 ++++------ .../collections/DatabaseMapSingle.java | 29 +++----- .../collections/DatabaseSingleMapped.java | 26 ++------ .../collections/DatabaseSingleton.java | 42 +++--------- .../database/disk/AbstractRocksDBColumn.java | 5 +- .../disk/BinarySerializationFunction.java | 7 -- .../disk/CachedSerializationFunction.java | 66 +++++++++++++++++++ .../disk/LLLocalDatabaseConnection.java | 11 ++++ .../database/disk/LLLocalDictionary.java | 5 +- .../database/disk/LLLocalSingleton.java | 5 +- .../disk/OptimisticRocksDBColumn.java | 4 +- .../disk/PessimisticRocksDBColumn.java | 4 +- .../dbengine/database/disk/RocksDBColumn.java | 3 +- .../database/disk/StandardRocksDBColumn.java | 3 +- .../memory/LLMemoryDatabaseConnection.java | 9 +++ .../database/memory/LLMemoryDictionary.java | 4 +- .../database/memory/LLMemorySingleton.java | 6 +- 21 files changed, 172 insertions(+), 123 deletions(-) delete mode 100644 src/main/java/it/cavallium/dbengine/database/disk/BinarySerializationFunction.java create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/CachedSerializationFunction.java diff --git a/src/main/java/it/cavallium/dbengine/database/Delta.java b/src/main/java/it/cavallium/dbengine/database/Delta.java index 425b7f6..156d2cc 100644 --- a/src/main/java/it/cavallium/dbengine/database/Delta.java +++ b/src/main/java/it/cavallium/dbengine/database/Delta.java @@ -5,6 +5,7 @@ import org.jetbrains.annotations.Nullable; public class Delta { + private static final Delta EMPTY = new Delta<>(null, null); private final @Nullable T previous; private final @Nullable T current; @@ -25,6 +26,11 @@ public class Delta { return current; } + public static Delta empty() { + //noinspection unchecked + return (Delta) EMPTY; + } + @Override public boolean equals(Object obj) { if (obj == this) diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java index a196572..f9909dc 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java @@ -2,8 +2,8 @@ package it.cavallium.dbengine.database; import it.cavallium.buffer.Buf; import it.cavallium.dbengine.client.BadBlock; -import it.cavallium.dbengine.database.disk.BinarySerializationFunction; import it.cavallium.dbengine.database.serialization.KVSerializationFunction; +import it.cavallium.dbengine.database.serialization.SerializationFunction; import java.io.IOException; import java.util.List; import java.util.function.Function; @@ -23,12 +23,12 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { UpdateMode getUpdateMode(); - default Buf update(Buf key, BinarySerializationFunction updater, UpdateReturnMode updateReturnMode) { + default Buf update(Buf key, SerializationFunction<@Nullable Buf, @Nullable Buf> updater, UpdateReturnMode updateReturnMode) { LLDelta prev = this.updateAndGetDelta(key, updater); return LLUtils.resolveLLDelta(prev, updateReturnMode); } - LLDelta updateAndGetDelta(Buf key, BinarySerializationFunction updater); + LLDelta updateAndGetDelta(Buf key, SerializationFunction<@Nullable Buf, @Nullable Buf> updater); void clear(); diff --git a/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java index 9b44614..e51f85c 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Set; +import java.util.StringJoiner; import java.util.concurrent.CompletionException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -175,4 +176,16 @@ public class LLMultiDatabaseConnection implements LLDatabaseConnection { } })); } + + @Override + public String toString() { + return new StringJoiner(", ", LLMultiDatabaseConnection.class.getSimpleName() + "[", "]") + .add("databaseShardConnections=" + databaseShardConnections) + .add("luceneShardConnections=" + luceneShardConnections) + .add("allConnections=" + allConnections) + .add("defaultDatabaseConnection=" + defaultDatabaseConnection) + .add("defaultLuceneConnection=" + defaultLuceneConnection) + .add("anyConnection=" + anyConnection) + .toString(); + } } diff --git a/src/main/java/it/cavallium/dbengine/database/LLSingleton.java b/src/main/java/it/cavallium/dbengine/database/LLSingleton.java index c642cb5..00498f6 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/LLSingleton.java @@ -1,7 +1,7 @@ package it.cavallium.dbengine.database; import it.cavallium.buffer.Buf; -import it.cavallium.dbengine.database.disk.BinarySerializationFunction; +import it.cavallium.dbengine.database.serialization.SerializationFunction; import java.io.IOException; import org.jetbrains.annotations.Nullable; @@ -11,12 +11,12 @@ public interface LLSingleton extends LLKeyValueDatabaseStructure { void set(Buf value); - default Buf update(BinarySerializationFunction updater, UpdateReturnMode updateReturnMode) { + default Buf update(SerializationFunction<@Nullable Buf, @Nullable Buf> updater, UpdateReturnMode updateReturnMode) { var prev = this.updateAndGetDelta(updater); return LLUtils.resolveLLDelta(prev, updateReturnMode); } - LLDelta updateAndGetDelta(BinarySerializationFunction updater); + LLDelta updateAndGetDelta(SerializationFunction<@Nullable Buf, @Nullable Buf> updater); String getColumnName(); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 53f9978..b83e841 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -5,7 +5,6 @@ import it.cavallium.buffer.BufDataInput; import it.cavallium.buffer.BufDataOutput; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; -import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLEntry; @@ -15,13 +14,12 @@ import it.cavallium.dbengine.database.SerializedKey; import it.cavallium.dbengine.database.SubStageEntry; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; -import it.cavallium.dbengine.database.disk.BinarySerializationFunction; +import it.cavallium.dbengine.database.disk.CachedSerializationFunction; import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; -import it.cavallium.dbengine.utils.DBException; import it.cavallium.dbengine.utils.StreamUtils; import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap; @@ -32,7 +30,6 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; @@ -108,6 +105,10 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep extends DatabaseMapDictionaryDeep updater) { var keyMono = serializeKeySuffixToKey(keySuffix); - var result = dictionary.update(keyMono, getSerializedUpdater(updater), updateReturnMode); - return deserializeValue(keySuffix, BufDataInput.create(result)); + var serializedUpdater = getSerializedUpdater(updater); + dictionary.update(keyMono, serializedUpdater, UpdateReturnMode.NOTHING); + return serializedUpdater.getResult(updateReturnMode); } @Override public Delta updateValueAndGetDelta(T keySuffix, SerializationFunction<@Nullable U, @Nullable U> updater) { var keyMono = serializeKeySuffixToKey(keySuffix); - LLDelta delta = dictionary.updateAndGetDelta(keyMono, getSerializedUpdater(updater)); - return LLUtils.mapLLDelta(delta, in -> valueSerializer.deserialize(BufDataInput.create(in))); + var serializedUpdater = getSerializedUpdater(updater); + dictionary.update(keyMono, serializedUpdater, UpdateReturnMode.NOTHING); + return serializedUpdater.getDelta(); } - public BinarySerializationFunction getSerializedUpdater(SerializationFunction<@Nullable U, @Nullable U> updater) { - return oldSerialized -> { - U result; - if (oldSerialized == null) { - result = updater.apply(null); - } else { - result = updater.apply(valueSerializer.deserialize(BufDataInput.create(oldSerialized))); - } - if (result == null) { - return null; - } else { - return serializeValue(result); - } - }; + public CachedSerializationFunction getSerializedUpdater(SerializationFunction<@Nullable U, @Nullable U> updater) { + return new CachedSerializationFunction<>(updater, this::serializeValue, this::deserializeValue); } public KVSerializationFunction<@NotNull T, @Nullable Buf, @Nullable Buf> getSerializedUpdater( diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java index 7ba2b61..22cf09f 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java @@ -12,7 +12,7 @@ import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateReturnMode; -import it.cavallium.dbengine.database.disk.BinarySerializationFunction; +import it.cavallium.dbengine.database.disk.CachedSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.database.serialization.Serializer; @@ -89,31 +89,20 @@ public final class DatabaseMapSingle implements DatabaseStageEntry { @Override public U update(SerializationFunction<@Nullable U, @Nullable U> updater, UpdateReturnMode updateReturnMode) { - Buf resultBytes = dictionary.update(key, this.createUpdater(updater), updateReturnMode); - return resultBytes != null ? deserializeValue(resultBytes) : null; + var serializedUpdater = createUpdater(updater); + dictionary.update(key, serializedUpdater, UpdateReturnMode.NOTHING); + return serializedUpdater.getResult(updateReturnMode); } @Override public Delta updateAndGetDelta(SerializationFunction<@Nullable U, @Nullable U> updater) { - var delta = dictionary.updateAndGetDelta(key, this.createUpdater(updater)); - return LLUtils.mapLLDelta(delta, bytes -> serializer.deserialize(BufDataInput.create(bytes))); + var serializedUpdater = createUpdater(updater); + dictionary.update(key, serializedUpdater, UpdateReturnMode.NOTHING); + return serializedUpdater.getDelta(); } - private BinarySerializationFunction createUpdater(SerializationFunction updater) { - return oldBytes -> { - U result; - if (oldBytes == null) { - result = updater.apply(null); - } else { - U deserializedValue = serializer.deserialize(BufDataInput.create(oldBytes)); - result = updater.apply(deserializedValue); - } - if (result == null) { - return null; - } else { - return serializeValue(result); - } - }; + private CachedSerializationFunction createUpdater(SerializationFunction updater) { + return new CachedSerializationFunction<>(updater, this::serializeValue, this::deserializeValue); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java index fd5adaa..467fdf6 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java @@ -4,8 +4,8 @@ import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.client.Mapper; import it.cavallium.dbengine.database.Delta; -import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateReturnMode; +import it.cavallium.dbengine.database.disk.CachedSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import java.util.stream.Stream; @@ -63,28 +63,16 @@ public class DatabaseSingleMapped implements DatabaseStageEntry { @Override public A update(SerializationFunction<@Nullable A, @Nullable A> updater, UpdateReturnMode updateReturnMode) { - B prev = serializedSingle.update(oldValue -> { - var result = updater.apply(oldValue == null ? null : this.unMap(oldValue)); - if (result == null) { - return null; - } else { - return this.map(result); - } - }, updateReturnMode); - return prev != null ? unMap(prev) : null; + var mappedUpdater = new CachedSerializationFunction<>(updater, this::map, this::unMap); + serializedSingle.update(mappedUpdater, UpdateReturnMode.NOTHING); + return mappedUpdater.getResult(updateReturnMode); } @Override public Delta updateAndGetDelta(SerializationFunction<@Nullable A, @Nullable A> updater) { - var delta = serializedSingle.updateAndGetDelta(oldValue -> { - var result = updater.apply(oldValue == null ? null : this.unMap(oldValue)); - if (result == null) { - return null; - } else { - return this.map(result); - } - }); - return LLUtils.mapDelta(delta, this::unMap); + var mappedUpdater = new CachedSerializationFunction<>(updater, this::map, this::unMap); + serializedSingle.update(mappedUpdater, UpdateReturnMode.NOTHING); + return mappedUpdater.getDelta(); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java index 2ce42a5..8d2b4c5 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java @@ -8,8 +8,8 @@ import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; -import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateReturnMode; +import it.cavallium.dbengine.database.disk.CachedSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.database.serialization.Serializer; @@ -83,43 +83,17 @@ public class DatabaseSingleton implements DatabaseStageEntry { } @Override - public U update(SerializationFunction<@Nullable U, @Nullable U> updater, - UpdateReturnMode updateReturnMode) { - Buf resultBuf = singleton - .update((oldValueSer) -> { - U result; - if (oldValueSer == null) { - result = updater.apply(null); - } else { - U deserializedValue = serializer.deserialize(BufDataInput.create(oldValueSer)); - result = updater.apply(deserializedValue); - } - if (result == null) { - return null; - } else { - return serializeValue(result); - } - }, updateReturnMode); - return this.deserializeValue(resultBuf); + public U update(SerializationFunction<@Nullable U, @Nullable U> updater, UpdateReturnMode updateReturnMode) { + var serializedUpdater = new CachedSerializationFunction<>(updater, this::serializeValue, this::deserializeValue); + singleton.update(serializedUpdater, UpdateReturnMode.NOTHING); + return serializedUpdater.getResult(updateReturnMode); } @Override public Delta updateAndGetDelta(SerializationFunction<@Nullable U, @Nullable U> updater) { - var mono = singleton.updateAndGetDelta((oldValueSer) -> { - U result; - if (oldValueSer == null) { - result = updater.apply(null); - } else { - U deserializedValue = serializer.deserialize(BufDataInput.create(oldValueSer)); - result = updater.apply(deserializedValue); - } - if (result == null) { - return null; - } else { - return serializeValue(result); - } - }); - return LLUtils.mapLLDelta(mono, serialized -> serializer.deserialize(BufDataInput.create(serialized))); + var serializedUpdater = new CachedSerializationFunction<>(updater, this::serializeValue, this::deserializeValue); + singleton.update(serializedUpdater, UpdateReturnMode.NOTHING); + return serializedUpdater.getDelta(); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index 800dfbd..afb1f12 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -11,6 +11,7 @@ import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.RepeatedElementList; import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj; +import it.cavallium.dbengine.database.serialization.SerializationFunction; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -503,7 +504,7 @@ public sealed abstract class AbstractRocksDBColumn implements public final @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions, @NotNull WriteOptions writeOptions, Buf key, - BinarySerializationFunction updater, + SerializationFunction<@Nullable Buf, @Nullable Buf> updater, UpdateAtomicResultMode returnMode) { var closeReadLock = closeLock.readLock(); try { @@ -542,7 +543,7 @@ public sealed abstract class AbstractRocksDBColumn implements protected abstract @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions, @NotNull WriteOptions writeOptions, Buf key, - BinarySerializationFunction updater, + SerializationFunction<@Nullable Buf, @Nullable Buf> updater, UpdateAtomicResultMode returnMode); @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/BinarySerializationFunction.java b/src/main/java/it/cavallium/dbengine/database/disk/BinarySerializationFunction.java deleted file mode 100644 index 18adba5..0000000 --- a/src/main/java/it/cavallium/dbengine/database/disk/BinarySerializationFunction.java +++ /dev/null @@ -1,7 +0,0 @@ -package it.cavallium.dbengine.database.disk; - -import it.cavallium.buffer.Buf; -import it.cavallium.dbengine.database.serialization.SerializationFunction; -import org.jetbrains.annotations.Nullable; - -public interface BinarySerializationFunction extends SerializationFunction<@Nullable Buf, @Nullable Buf> {} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/CachedSerializationFunction.java b/src/main/java/it/cavallium/dbengine/database/disk/CachedSerializationFunction.java new file mode 100644 index 0000000..c4fa119 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/CachedSerializationFunction.java @@ -0,0 +1,66 @@ +package it.cavallium.dbengine.database.disk; + +import it.cavallium.dbengine.database.Delta; +import it.cavallium.dbengine.database.UpdateReturnMode; +import it.cavallium.dbengine.database.serialization.SerializationException; +import it.cavallium.dbengine.database.serialization.SerializationFunction; +import java.util.function.Function; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +public class CachedSerializationFunction implements SerializationFunction { + + private final SerializationFunction updater; + private final Function serializer; + private final Function deserializer; + + private @Nullable U prevDeserialized; + private @Nullable U currDeserialized; + + public CachedSerializationFunction(SerializationFunction<@Nullable U, @Nullable U> updater, + Function<@NotNull U, @Nullable B> serializer, + Function<@NotNull A, @Nullable U> deserializer) { + this.updater = updater; + this.serializer = serializer; + this.deserializer = deserializer; + } + + @Override + public @Nullable B apply(@Nullable A oldSerialized) throws SerializationException { + U result; + U old; + if (oldSerialized == null) { + old = null; + } else { + old = deserializer.apply(oldSerialized); + } + this.prevDeserialized = old; + result = updater.apply(old); + this.currDeserialized = result; + if (result == null) { + return null; + } else { + return serializer.apply(result); + } + } + + public @Nullable U getPrevDeserialized() { + return prevDeserialized; + } + + public @Nullable U getCurrDeserialized() { + return currDeserialized; + } + + public Delta getDelta() { + return new Delta<>(prevDeserialized, currDeserialized); + } + + public U getResult(UpdateReturnMode updateReturnMode) { + return switch (updateReturnMode) { + case GET_NEW_VALUE -> currDeserialized; + case GET_OLD_VALUE -> prevDeserialized; + case NOTHING -> null; + }; + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java index 96a9080..a8b2205 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java @@ -16,6 +16,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.LinkedList; import java.util.List; +import java.util.StringJoiner; import java.util.concurrent.atomic.AtomicBoolean; import org.jetbrains.annotations.Nullable; @@ -103,4 +104,14 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection { if (connected.compareAndSet(true, false)) { } } + + @Override + public String toString() { + return new StringJoiner(", ", LLLocalDatabaseConnection.class.getSimpleName() + "[", "]") + .add("connected=" + connected) + .add("meterRegistry=" + meterRegistry) + .add("basePath=" + basePath) + .add("inMemory=" + inMemory) + .toString(); + } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index f483b31..02b79ce 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -36,6 +36,7 @@ import it.cavallium.dbengine.database.SerializedKey; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.serialization.KVSerializationFunction; +import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; import it.cavallium.dbengine.utils.DBException; import it.cavallium.dbengine.utils.StreamUtils; @@ -371,7 +372,7 @@ public class LLLocalDictionary implements LLDictionary { @SuppressWarnings("DuplicatedCode") @Override public Buf update(Buf key, - BinarySerializationFunction updater, + SerializationFunction<@Nullable Buf, @Nullable Buf> updater, UpdateReturnMode updateReturnMode) { assert !LLUtils.isInNonBlockingThread() : "Called update in a nonblocking thread"; if (updateMode == UpdateMode.DISALLOW) { @@ -409,7 +410,7 @@ public class LLLocalDictionary implements LLDictionary { @SuppressWarnings("DuplicatedCode") @Override - public LLDelta updateAndGetDelta(Buf key, BinarySerializationFunction updater) { + public LLDelta updateAndGetDelta(Buf key, SerializationFunction<@Nullable Buf, @Nullable Buf> updater) { assert !LLUtils.isInNonBlockingThread() : "Called update in a nonblocking thread"; if (updateMode == UpdateMode.DISALLOW) { throw new UnsupportedOperationException("update() is disallowed"); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java index 2e592fb..0f6cb04 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java @@ -9,6 +9,7 @@ import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateReturnMode; +import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.utils.DBException; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -89,7 +90,7 @@ public class LLLocalSingleton implements LLSingleton { } @Override - public Buf update(BinarySerializationFunction updater, + public Buf update(SerializationFunction<@Nullable Buf, @Nullable Buf> updater, UpdateReturnMode updateReturnMode) { if (LLUtils.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called update in a nonblocking thread"); @@ -111,7 +112,7 @@ public class LLLocalSingleton implements LLSingleton { } @Override - public LLDelta updateAndGetDelta(BinarySerializationFunction updater) { + public LLDelta updateAndGetDelta(SerializationFunction<@Nullable Buf, @Nullable Buf> updater) { if (LLUtils.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called update in a nonblocking thread"); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java index 8f535ba..ad22f95 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java @@ -7,6 +7,7 @@ import io.micrometer.core.instrument.MeterRegistry; import it.cavallium.buffer.Buf; import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.lucene.ExponentialPageLimits; import it.cavallium.dbengine.utils.DBException; import java.io.IOException; @@ -14,6 +15,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.StampedLock; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.OptimisticTransactionDB; import org.rocksdb.ReadOptions; @@ -75,7 +77,7 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn updater, UpdateAtomicResultMode returnMode) { long initNanoTime = System.nanoTime(); try { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java index 54b4993..778fd28 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java @@ -6,10 +6,12 @@ import io.micrometer.core.instrument.MeterRegistry; import it.cavallium.buffer.Buf; import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.utils.DBException; import java.io.IOException; import java.util.concurrent.locks.StampedLock; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDBException; @@ -46,7 +48,7 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn updater, UpdateAtomicResultMode returnMode) { long initNanoTime = System.nanoTime(); try { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java index 164d1eb..285ccde 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java @@ -5,6 +5,7 @@ import it.cavallium.buffer.Buf; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj; +import it.cavallium.dbengine.database.serialization.SerializationFunction; import java.io.IOException; import java.util.List; import org.jetbrains.annotations.NotNull; @@ -55,7 +56,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn { @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions, @NotNull WriteOptions writeOptions, Buf key, - BinarySerializationFunction updater, + SerializationFunction<@Nullable Buf, @Nullable Buf> updater, UpdateAtomicResultMode returnMode); void delete(WriteOptions writeOptions, Buf key) throws RocksDBException; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java index 8055369..1837339 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java @@ -6,6 +6,7 @@ import io.micrometer.core.instrument.MeterRegistry; import it.cavallium.buffer.Buf; import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.utils.DBException; import java.io.IOException; import java.util.concurrent.locks.StampedLock; @@ -43,7 +44,7 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions, @NotNull WriteOptions writeOptions, Buf key, - BinarySerializationFunction updater, + SerializationFunction<@Nullable Buf, @Nullable Buf> updater, UpdateAtomicResultMode returnMode) { long initNanoTime = System.nanoTime(); try { diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java index 1cfbefb..d3e8c1b 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java @@ -16,6 +16,7 @@ import it.cavallium.dbengine.rpc.current.data.LuceneIndexStructure; import it.cavallium.dbengine.rpc.current.data.LuceneOptions; import it.cavallium.dbengine.rpc.current.data.LuceneOptionsBuilder; import java.util.List; +import java.util.StringJoiner; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.jetbrains.annotations.Nullable; @@ -74,4 +75,12 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection { public void disconnect() { connected.compareAndSet(true, false); } + + @Override + public String toString() { + return new StringJoiner(", ", LLMemoryDatabaseConnection.class.getSimpleName() + "[", "]") + .add("connected=" + connected) + .add("meterRegistry=" + meterRegistry) + .toString(); + } } diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java index 43b6b95..d8d8670 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java @@ -16,9 +16,9 @@ import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.OptionalBuf; import it.cavallium.dbengine.database.SerializedKey; import it.cavallium.dbengine.database.UpdateMode; -import it.cavallium.dbengine.database.disk.BinarySerializationFunction; import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationException; +import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.utils.DBException; import java.io.IOException; import java.util.List; @@ -176,7 +176,7 @@ public class LLMemoryDictionary implements LLDictionary { } @Override - public LLDelta updateAndGetDelta(Buf key, BinarySerializationFunction updater) { + public LLDelta updateAndGetDelta(Buf key, SerializationFunction<@Nullable Buf, @Nullable Buf> updater) { if (updateMode == UpdateMode.DISALLOW) { throw new UnsupportedOperationException("update() is disallowed"); } diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java index 5dd81a2..86a2eee 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java @@ -6,7 +6,7 @@ import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.UpdateReturnMode; -import it.cavallium.dbengine.database.disk.BinarySerializationFunction; +import it.cavallium.dbengine.database.serialization.SerializationFunction; import java.nio.charset.StandardCharsets; import org.jetbrains.annotations.Nullable; @@ -38,13 +38,13 @@ public class LLMemorySingleton implements LLSingleton { } @Override - public Buf update(BinarySerializationFunction updater, + public Buf update(SerializationFunction<@Nullable Buf, @Nullable Buf> updater, UpdateReturnMode updateReturnMode) { return dict.update(singletonName, updater, updateReturnMode); } @Override - public LLDelta updateAndGetDelta(BinarySerializationFunction updater) { + public LLDelta updateAndGetDelta(SerializationFunction<@Nullable Buf, @Nullable Buf> updater) { return dict.updateAndGetDelta(singletonName, updater); }