Optimize database
This commit is contained in:
parent
468886d154
commit
a83f1ff1a6
@ -5,6 +5,7 @@ import org.jetbrains.annotations.Nullable;
|
||||
|
||||
public class Delta<T> {
|
||||
|
||||
private static final Delta<?> EMPTY = new Delta<>(null, null);
|
||||
private final @Nullable T previous;
|
||||
private final @Nullable T current;
|
||||
|
||||
@ -25,6 +26,11 @@ public class Delta<T> {
|
||||
return current;
|
||||
}
|
||||
|
||||
public static <X> Delta<X> empty() {
|
||||
//noinspection unchecked
|
||||
return (Delta<X>) EMPTY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == this)
|
||||
|
@ -2,8 +2,8 @@ package it.cavallium.dbengine.database;
|
||||
|
||||
import it.cavallium.buffer.Buf;
|
||||
import it.cavallium.dbengine.client.BadBlock;
|
||||
import it.cavallium.dbengine.database.disk.BinarySerializationFunction;
|
||||
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
@ -23,12 +23,12 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
|
||||
|
||||
UpdateMode getUpdateMode();
|
||||
|
||||
default Buf update(Buf key, BinarySerializationFunction updater, UpdateReturnMode updateReturnMode) {
|
||||
default Buf update(Buf key, SerializationFunction<@Nullable Buf, @Nullable Buf> updater, UpdateReturnMode updateReturnMode) {
|
||||
LLDelta prev = this.updateAndGetDelta(key, updater);
|
||||
return LLUtils.resolveLLDelta(prev, updateReturnMode);
|
||||
}
|
||||
|
||||
LLDelta updateAndGetDelta(Buf key, BinarySerializationFunction updater);
|
||||
LLDelta updateAndGetDelta(Buf key, SerializationFunction<@Nullable Buf, @Nullable Buf> updater);
|
||||
|
||||
void clear();
|
||||
|
||||
|
@ -26,6 +26,7 @@ import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.StringJoiner;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -175,4 +176,16 @@ public class LLMultiDatabaseConnection implements LLDatabaseConnection {
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new StringJoiner(", ", LLMultiDatabaseConnection.class.getSimpleName() + "[", "]")
|
||||
.add("databaseShardConnections=" + databaseShardConnections)
|
||||
.add("luceneShardConnections=" + luceneShardConnections)
|
||||
.add("allConnections=" + allConnections)
|
||||
.add("defaultDatabaseConnection=" + defaultDatabaseConnection)
|
||||
.add("defaultLuceneConnection=" + defaultLuceneConnection)
|
||||
.add("anyConnection=" + anyConnection)
|
||||
.toString();
|
||||
}
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
package it.cavallium.dbengine.database;
|
||||
|
||||
import it.cavallium.buffer.Buf;
|
||||
import it.cavallium.dbengine.database.disk.BinarySerializationFunction;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import java.io.IOException;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
@ -11,12 +11,12 @@ public interface LLSingleton extends LLKeyValueDatabaseStructure {
|
||||
|
||||
void set(Buf value);
|
||||
|
||||
default Buf update(BinarySerializationFunction updater, UpdateReturnMode updateReturnMode) {
|
||||
default Buf update(SerializationFunction<@Nullable Buf, @Nullable Buf> updater, UpdateReturnMode updateReturnMode) {
|
||||
var prev = this.updateAndGetDelta(updater);
|
||||
return LLUtils.resolveLLDelta(prev, updateReturnMode);
|
||||
}
|
||||
|
||||
LLDelta updateAndGetDelta(BinarySerializationFunction updater);
|
||||
LLDelta updateAndGetDelta(SerializationFunction<@Nullable Buf, @Nullable Buf> updater);
|
||||
|
||||
String getColumnName();
|
||||
|
||||
|
@ -5,7 +5,6 @@ import it.cavallium.buffer.BufDataInput;
|
||||
import it.cavallium.buffer.BufDataOutput;
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.Delta;
|
||||
import it.cavallium.dbengine.database.LLDelta;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.LLDictionaryResultType;
|
||||
import it.cavallium.dbengine.database.LLEntry;
|
||||
@ -15,13 +14,12 @@ import it.cavallium.dbengine.database.SerializedKey;
|
||||
import it.cavallium.dbengine.database.SubStageEntry;
|
||||
import it.cavallium.dbengine.database.UpdateMode;
|
||||
import it.cavallium.dbengine.database.UpdateReturnMode;
|
||||
import it.cavallium.dbengine.database.disk.BinarySerializationFunction;
|
||||
import it.cavallium.dbengine.database.disk.CachedSerializationFunction;
|
||||
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationException;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import it.cavallium.dbengine.database.serialization.Serializer;
|
||||
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
|
||||
import it.cavallium.dbengine.utils.DBException;
|
||||
import it.cavallium.dbengine.utils.StreamUtils;
|
||||
import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap;
|
||||
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
|
||||
@ -32,7 +30,6 @@ import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -108,6 +105,10 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
return stagesFlux.map(Entry::getKey);
|
||||
}
|
||||
|
||||
private U deserializeValue(Buf value) {
|
||||
return valueSerializer.deserialize(BufDataInput.create(value));
|
||||
}
|
||||
|
||||
private @Nullable U deserializeValue(T keySuffix, BufDataInput value) {
|
||||
try {
|
||||
return valueSerializer.deserialize(value);
|
||||
@ -261,31 +262,21 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
UpdateReturnMode updateReturnMode,
|
||||
SerializationFunction<@Nullable U, @Nullable U> updater) {
|
||||
var keyMono = serializeKeySuffixToKey(keySuffix);
|
||||
var result = dictionary.update(keyMono, getSerializedUpdater(updater), updateReturnMode);
|
||||
return deserializeValue(keySuffix, BufDataInput.create(result));
|
||||
var serializedUpdater = getSerializedUpdater(updater);
|
||||
dictionary.update(keyMono, serializedUpdater, UpdateReturnMode.NOTHING);
|
||||
return serializedUpdater.getResult(updateReturnMode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Delta<U> updateValueAndGetDelta(T keySuffix, SerializationFunction<@Nullable U, @Nullable U> updater) {
|
||||
var keyMono = serializeKeySuffixToKey(keySuffix);
|
||||
LLDelta delta = dictionary.updateAndGetDelta(keyMono, getSerializedUpdater(updater));
|
||||
return LLUtils.mapLLDelta(delta, in -> valueSerializer.deserialize(BufDataInput.create(in)));
|
||||
var serializedUpdater = getSerializedUpdater(updater);
|
||||
dictionary.update(keyMono, serializedUpdater, UpdateReturnMode.NOTHING);
|
||||
return serializedUpdater.getDelta();
|
||||
}
|
||||
|
||||
public BinarySerializationFunction getSerializedUpdater(SerializationFunction<@Nullable U, @Nullable U> updater) {
|
||||
return oldSerialized -> {
|
||||
U result;
|
||||
if (oldSerialized == null) {
|
||||
result = updater.apply(null);
|
||||
} else {
|
||||
result = updater.apply(valueSerializer.deserialize(BufDataInput.create(oldSerialized)));
|
||||
}
|
||||
if (result == null) {
|
||||
return null;
|
||||
} else {
|
||||
return serializeValue(result);
|
||||
}
|
||||
};
|
||||
public CachedSerializationFunction<U, Buf, Buf> getSerializedUpdater(SerializationFunction<@Nullable U, @Nullable U> updater) {
|
||||
return new CachedSerializationFunction<>(updater, this::serializeValue, this::deserializeValue);
|
||||
}
|
||||
|
||||
public KVSerializationFunction<@NotNull T, @Nullable Buf, @Nullable Buf> getSerializedUpdater(
|
||||
|
@ -12,7 +12,7 @@ import it.cavallium.dbengine.database.LLRange;
|
||||
import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.UpdateReturnMode;
|
||||
import it.cavallium.dbengine.database.disk.BinarySerializationFunction;
|
||||
import it.cavallium.dbengine.database.disk.CachedSerializationFunction;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationException;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import it.cavallium.dbengine.database.serialization.Serializer;
|
||||
@ -89,31 +89,20 @@ public final class DatabaseMapSingle<U> implements DatabaseStageEntry<U> {
|
||||
|
||||
@Override
|
||||
public U update(SerializationFunction<@Nullable U, @Nullable U> updater, UpdateReturnMode updateReturnMode) {
|
||||
Buf resultBytes = dictionary.update(key, this.createUpdater(updater), updateReturnMode);
|
||||
return resultBytes != null ? deserializeValue(resultBytes) : null;
|
||||
var serializedUpdater = createUpdater(updater);
|
||||
dictionary.update(key, serializedUpdater, UpdateReturnMode.NOTHING);
|
||||
return serializedUpdater.getResult(updateReturnMode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Delta<U> updateAndGetDelta(SerializationFunction<@Nullable U, @Nullable U> updater) {
|
||||
var delta = dictionary.updateAndGetDelta(key, this.createUpdater(updater));
|
||||
return LLUtils.mapLLDelta(delta, bytes -> serializer.deserialize(BufDataInput.create(bytes)));
|
||||
var serializedUpdater = createUpdater(updater);
|
||||
dictionary.update(key, serializedUpdater, UpdateReturnMode.NOTHING);
|
||||
return serializedUpdater.getDelta();
|
||||
}
|
||||
|
||||
private BinarySerializationFunction createUpdater(SerializationFunction<U, U> updater) {
|
||||
return oldBytes -> {
|
||||
U result;
|
||||
if (oldBytes == null) {
|
||||
result = updater.apply(null);
|
||||
} else {
|
||||
U deserializedValue = serializer.deserialize(BufDataInput.create(oldBytes));
|
||||
result = updater.apply(deserializedValue);
|
||||
}
|
||||
if (result == null) {
|
||||
return null;
|
||||
} else {
|
||||
return serializeValue(result);
|
||||
}
|
||||
};
|
||||
private CachedSerializationFunction<U, Buf, Buf> createUpdater(SerializationFunction<U, U> updater) {
|
||||
return new CachedSerializationFunction<>(updater, this::serializeValue, this::deserializeValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -4,8 +4,8 @@ import it.cavallium.dbengine.client.BadBlock;
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.client.Mapper;
|
||||
import it.cavallium.dbengine.database.Delta;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.UpdateReturnMode;
|
||||
import it.cavallium.dbengine.database.disk.CachedSerializationFunction;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationException;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import java.util.stream.Stream;
|
||||
@ -63,28 +63,16 @@ public class DatabaseSingleMapped<A, B> implements DatabaseStageEntry<A> {
|
||||
|
||||
@Override
|
||||
public A update(SerializationFunction<@Nullable A, @Nullable A> updater, UpdateReturnMode updateReturnMode) {
|
||||
B prev = serializedSingle.update(oldValue -> {
|
||||
var result = updater.apply(oldValue == null ? null : this.unMap(oldValue));
|
||||
if (result == null) {
|
||||
return null;
|
||||
} else {
|
||||
return this.map(result);
|
||||
}
|
||||
}, updateReturnMode);
|
||||
return prev != null ? unMap(prev) : null;
|
||||
var mappedUpdater = new CachedSerializationFunction<>(updater, this::map, this::unMap);
|
||||
serializedSingle.update(mappedUpdater, UpdateReturnMode.NOTHING);
|
||||
return mappedUpdater.getResult(updateReturnMode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Delta<A> updateAndGetDelta(SerializationFunction<@Nullable A, @Nullable A> updater) {
|
||||
var delta = serializedSingle.updateAndGetDelta(oldValue -> {
|
||||
var result = updater.apply(oldValue == null ? null : this.unMap(oldValue));
|
||||
if (result == null) {
|
||||
return null;
|
||||
} else {
|
||||
return this.map(result);
|
||||
}
|
||||
});
|
||||
return LLUtils.mapDelta(delta, this::unMap);
|
||||
var mappedUpdater = new CachedSerializationFunction<>(updater, this::map, this::unMap);
|
||||
serializedSingle.update(mappedUpdater, UpdateReturnMode.NOTHING);
|
||||
return mappedUpdater.getDelta();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -8,8 +8,8 @@ import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.Delta;
|
||||
import it.cavallium.dbengine.database.LLSingleton;
|
||||
import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.UpdateReturnMode;
|
||||
import it.cavallium.dbengine.database.disk.CachedSerializationFunction;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationException;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import it.cavallium.dbengine.database.serialization.Serializer;
|
||||
@ -83,43 +83,17 @@ public class DatabaseSingleton<U> implements DatabaseStageEntry<U> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public U update(SerializationFunction<@Nullable U, @Nullable U> updater,
|
||||
UpdateReturnMode updateReturnMode) {
|
||||
Buf resultBuf = singleton
|
||||
.update((oldValueSer) -> {
|
||||
U result;
|
||||
if (oldValueSer == null) {
|
||||
result = updater.apply(null);
|
||||
} else {
|
||||
U deserializedValue = serializer.deserialize(BufDataInput.create(oldValueSer));
|
||||
result = updater.apply(deserializedValue);
|
||||
}
|
||||
if (result == null) {
|
||||
return null;
|
||||
} else {
|
||||
return serializeValue(result);
|
||||
}
|
||||
}, updateReturnMode);
|
||||
return this.deserializeValue(resultBuf);
|
||||
public U update(SerializationFunction<@Nullable U, @Nullable U> updater, UpdateReturnMode updateReturnMode) {
|
||||
var serializedUpdater = new CachedSerializationFunction<>(updater, this::serializeValue, this::deserializeValue);
|
||||
singleton.update(serializedUpdater, UpdateReturnMode.NOTHING);
|
||||
return serializedUpdater.getResult(updateReturnMode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Delta<U> updateAndGetDelta(SerializationFunction<@Nullable U, @Nullable U> updater) {
|
||||
var mono = singleton.updateAndGetDelta((oldValueSer) -> {
|
||||
U result;
|
||||
if (oldValueSer == null) {
|
||||
result = updater.apply(null);
|
||||
} else {
|
||||
U deserializedValue = serializer.deserialize(BufDataInput.create(oldValueSer));
|
||||
result = updater.apply(deserializedValue);
|
||||
}
|
||||
if (result == null) {
|
||||
return null;
|
||||
} else {
|
||||
return serializeValue(result);
|
||||
}
|
||||
});
|
||||
return LLUtils.mapLLDelta(mono, serialized -> serializer.deserialize(BufDataInput.create(serialized)));
|
||||
var serializedUpdater = new CachedSerializationFunction<>(updater, this::serializeValue, this::deserializeValue);
|
||||
singleton.update(serializedUpdater, UpdateReturnMode.NOTHING);
|
||||
return serializedUpdater.getDelta();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -11,6 +11,7 @@ import it.cavallium.dbengine.database.LLRange;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.RepeatedElementList;
|
||||
import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
@ -503,7 +504,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
|
||||
public final @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions,
|
||||
@NotNull WriteOptions writeOptions,
|
||||
Buf key,
|
||||
BinarySerializationFunction updater,
|
||||
SerializationFunction<@Nullable Buf, @Nullable Buf> updater,
|
||||
UpdateAtomicResultMode returnMode) {
|
||||
var closeReadLock = closeLock.readLock();
|
||||
try {
|
||||
@ -542,7 +543,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
|
||||
protected abstract @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions,
|
||||
@NotNull WriteOptions writeOptions,
|
||||
Buf key,
|
||||
BinarySerializationFunction updater,
|
||||
SerializationFunction<@Nullable Buf, @Nullable Buf> updater,
|
||||
UpdateAtomicResultMode returnMode);
|
||||
|
||||
@Override
|
||||
|
@ -1,7 +0,0 @@
|
||||
package it.cavallium.dbengine.database.disk;
|
||||
|
||||
import it.cavallium.buffer.Buf;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
public interface BinarySerializationFunction extends SerializationFunction<@Nullable Buf, @Nullable Buf> {}
|
@ -0,0 +1,66 @@
|
||||
package it.cavallium.dbengine.database.disk;
|
||||
|
||||
import it.cavallium.dbengine.database.Delta;
|
||||
import it.cavallium.dbengine.database.UpdateReturnMode;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationException;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import java.util.function.Function;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
public class CachedSerializationFunction<U, A, B> implements SerializationFunction<A, B> {
|
||||
|
||||
private final SerializationFunction<U, U> updater;
|
||||
private final Function<U, B> serializer;
|
||||
private final Function<A, U> deserializer;
|
||||
|
||||
private @Nullable U prevDeserialized;
|
||||
private @Nullable U currDeserialized;
|
||||
|
||||
public CachedSerializationFunction(SerializationFunction<@Nullable U, @Nullable U> updater,
|
||||
Function<@NotNull U, @Nullable B> serializer,
|
||||
Function<@NotNull A, @Nullable U> deserializer) {
|
||||
this.updater = updater;
|
||||
this.serializer = serializer;
|
||||
this.deserializer = deserializer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public @Nullable B apply(@Nullable A oldSerialized) throws SerializationException {
|
||||
U result;
|
||||
U old;
|
||||
if (oldSerialized == null) {
|
||||
old = null;
|
||||
} else {
|
||||
old = deserializer.apply(oldSerialized);
|
||||
}
|
||||
this.prevDeserialized = old;
|
||||
result = updater.apply(old);
|
||||
this.currDeserialized = result;
|
||||
if (result == null) {
|
||||
return null;
|
||||
} else {
|
||||
return serializer.apply(result);
|
||||
}
|
||||
}
|
||||
|
||||
public @Nullable U getPrevDeserialized() {
|
||||
return prevDeserialized;
|
||||
}
|
||||
|
||||
public @Nullable U getCurrDeserialized() {
|
||||
return currDeserialized;
|
||||
}
|
||||
|
||||
public Delta<U> getDelta() {
|
||||
return new Delta<>(prevDeserialized, currDeserialized);
|
||||
}
|
||||
|
||||
public U getResult(UpdateReturnMode updateReturnMode) {
|
||||
return switch (updateReturnMode) {
|
||||
case GET_NEW_VALUE -> currDeserialized;
|
||||
case GET_OLD_VALUE -> prevDeserialized;
|
||||
case NOTHING -> null;
|
||||
};
|
||||
}
|
||||
}
|
@ -16,6 +16,7 @@ import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.StringJoiner;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
@ -103,4 +104,14 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
|
||||
if (connected.compareAndSet(true, false)) {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new StringJoiner(", ", LLLocalDatabaseConnection.class.getSimpleName() + "[", "]")
|
||||
.add("connected=" + connected)
|
||||
.add("meterRegistry=" + meterRegistry)
|
||||
.add("basePath=" + basePath)
|
||||
.add("inMemory=" + inMemory)
|
||||
.toString();
|
||||
}
|
||||
}
|
||||
|
@ -36,6 +36,7 @@ import it.cavallium.dbengine.database.SerializedKey;
|
||||
import it.cavallium.dbengine.database.UpdateMode;
|
||||
import it.cavallium.dbengine.database.UpdateReturnMode;
|
||||
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
|
||||
import it.cavallium.dbengine.utils.DBException;
|
||||
import it.cavallium.dbengine.utils.StreamUtils;
|
||||
@ -371,7 +372,7 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
@SuppressWarnings("DuplicatedCode")
|
||||
@Override
|
||||
public Buf update(Buf key,
|
||||
BinarySerializationFunction updater,
|
||||
SerializationFunction<@Nullable Buf, @Nullable Buf> updater,
|
||||
UpdateReturnMode updateReturnMode) {
|
||||
assert !LLUtils.isInNonBlockingThread() : "Called update in a nonblocking thread";
|
||||
if (updateMode == UpdateMode.DISALLOW) {
|
||||
@ -409,7 +410,7 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
|
||||
@SuppressWarnings("DuplicatedCode")
|
||||
@Override
|
||||
public LLDelta updateAndGetDelta(Buf key, BinarySerializationFunction updater) {
|
||||
public LLDelta updateAndGetDelta(Buf key, SerializationFunction<@Nullable Buf, @Nullable Buf> updater) {
|
||||
assert !LLUtils.isInNonBlockingThread() : "Called update in a nonblocking thread";
|
||||
if (updateMode == UpdateMode.DISALLOW) {
|
||||
throw new UnsupportedOperationException("update() is disallowed");
|
||||
|
@ -9,6 +9,7 @@ import it.cavallium.dbengine.database.LLSingleton;
|
||||
import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.UpdateReturnMode;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import it.cavallium.dbengine.utils.DBException;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
@ -89,7 +90,7 @@ public class LLLocalSingleton implements LLSingleton {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buf update(BinarySerializationFunction updater,
|
||||
public Buf update(SerializationFunction<@Nullable Buf, @Nullable Buf> updater,
|
||||
UpdateReturnMode updateReturnMode) {
|
||||
if (LLUtils.isInNonBlockingThread()) {
|
||||
throw new UnsupportedOperationException("Called update in a nonblocking thread");
|
||||
@ -111,7 +112,7 @@ public class LLLocalSingleton implements LLSingleton {
|
||||
}
|
||||
|
||||
@Override
|
||||
public LLDelta updateAndGetDelta(BinarySerializationFunction updater) {
|
||||
public LLDelta updateAndGetDelta(SerializationFunction<@Nullable Buf, @Nullable Buf> updater) {
|
||||
if (LLUtils.isInNonBlockingThread()) {
|
||||
throw new UnsupportedOperationException("Called update in a nonblocking thread");
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ import io.micrometer.core.instrument.MeterRegistry;
|
||||
import it.cavallium.buffer.Buf;
|
||||
import it.cavallium.dbengine.database.LLDelta;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import it.cavallium.dbengine.lucene.ExponentialPageLimits;
|
||||
import it.cavallium.dbengine.utils.DBException;
|
||||
import java.io.IOException;
|
||||
@ -14,6 +15,7 @@ import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
import java.util.concurrent.locks.StampedLock;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.rocksdb.ColumnFamilyHandle;
|
||||
import org.rocksdb.OptimisticTransactionDB;
|
||||
import org.rocksdb.ReadOptions;
|
||||
@ -75,7 +77,7 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
|
||||
public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions,
|
||||
@NotNull WriteOptions writeOptions,
|
||||
Buf key,
|
||||
BinarySerializationFunction updater,
|
||||
SerializationFunction<@Nullable Buf, @Nullable Buf> updater,
|
||||
UpdateAtomicResultMode returnMode) {
|
||||
long initNanoTime = System.nanoTime();
|
||||
try {
|
||||
|
@ -6,10 +6,12 @@ import io.micrometer.core.instrument.MeterRegistry;
|
||||
import it.cavallium.buffer.Buf;
|
||||
import it.cavallium.dbengine.database.LLDelta;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import it.cavallium.dbengine.utils.DBException;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.locks.StampedLock;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.rocksdb.ColumnFamilyHandle;
|
||||
import org.rocksdb.ReadOptions;
|
||||
import org.rocksdb.RocksDBException;
|
||||
@ -46,7 +48,7 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
|
||||
public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions,
|
||||
@NotNull WriteOptions writeOptions,
|
||||
Buf key,
|
||||
BinarySerializationFunction updater,
|
||||
SerializationFunction<@Nullable Buf, @Nullable Buf> updater,
|
||||
UpdateAtomicResultMode returnMode) {
|
||||
long initNanoTime = System.nanoTime();
|
||||
try {
|
||||
|
@ -5,6 +5,7 @@ import it.cavallium.buffer.Buf;
|
||||
import it.cavallium.dbengine.database.LLRange;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
@ -55,7 +56,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
|
||||
@NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions,
|
||||
@NotNull WriteOptions writeOptions,
|
||||
Buf key,
|
||||
BinarySerializationFunction updater,
|
||||
SerializationFunction<@Nullable Buf, @Nullable Buf> updater,
|
||||
UpdateAtomicResultMode returnMode);
|
||||
|
||||
void delete(WriteOptions writeOptions, Buf key) throws RocksDBException;
|
||||
|
@ -6,6 +6,7 @@ import io.micrometer.core.instrument.MeterRegistry;
|
||||
import it.cavallium.buffer.Buf;
|
||||
import it.cavallium.dbengine.database.LLDelta;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import it.cavallium.dbengine.utils.DBException;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.locks.StampedLock;
|
||||
@ -43,7 +44,7 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB>
|
||||
public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions,
|
||||
@NotNull WriteOptions writeOptions,
|
||||
Buf key,
|
||||
BinarySerializationFunction updater,
|
||||
SerializationFunction<@Nullable Buf, @Nullable Buf> updater,
|
||||
UpdateAtomicResultMode returnMode) {
|
||||
long initNanoTime = System.nanoTime();
|
||||
try {
|
||||
|
@ -16,6 +16,7 @@ import it.cavallium.dbengine.rpc.current.data.LuceneIndexStructure;
|
||||
import it.cavallium.dbengine.rpc.current.data.LuceneOptions;
|
||||
import it.cavallium.dbengine.rpc.current.data.LuceneOptionsBuilder;
|
||||
import java.util.List;
|
||||
import java.util.StringJoiner;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
@ -74,4 +75,12 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection {
|
||||
public void disconnect() {
|
||||
connected.compareAndSet(true, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new StringJoiner(", ", LLMemoryDatabaseConnection.class.getSimpleName() + "[", "]")
|
||||
.add("connected=" + connected)
|
||||
.add("meterRegistry=" + meterRegistry)
|
||||
.toString();
|
||||
}
|
||||
}
|
||||
|
@ -16,9 +16,9 @@ import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.OptionalBuf;
|
||||
import it.cavallium.dbengine.database.SerializedKey;
|
||||
import it.cavallium.dbengine.database.UpdateMode;
|
||||
import it.cavallium.dbengine.database.disk.BinarySerializationFunction;
|
||||
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationException;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import it.cavallium.dbengine.utils.DBException;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
@ -176,7 +176,7 @@ public class LLMemoryDictionary implements LLDictionary {
|
||||
}
|
||||
|
||||
@Override
|
||||
public LLDelta updateAndGetDelta(Buf key, BinarySerializationFunction updater) {
|
||||
public LLDelta updateAndGetDelta(Buf key, SerializationFunction<@Nullable Buf, @Nullable Buf> updater) {
|
||||
if (updateMode == UpdateMode.DISALLOW) {
|
||||
throw new UnsupportedOperationException("update() is disallowed");
|
||||
}
|
||||
|
@ -6,7 +6,7 @@ import it.cavallium.dbengine.database.LLDictionaryResultType;
|
||||
import it.cavallium.dbengine.database.LLSingleton;
|
||||
import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import it.cavallium.dbengine.database.UpdateReturnMode;
|
||||
import it.cavallium.dbengine.database.disk.BinarySerializationFunction;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
@ -38,13 +38,13 @@ public class LLMemorySingleton implements LLSingleton {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buf update(BinarySerializationFunction updater,
|
||||
public Buf update(SerializationFunction<@Nullable Buf, @Nullable Buf> updater,
|
||||
UpdateReturnMode updateReturnMode) {
|
||||
return dict.update(singletonName, updater, updateReturnMode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LLDelta updateAndGetDelta(BinarySerializationFunction updater) {
|
||||
public LLDelta updateAndGetDelta(SerializationFunction<@Nullable Buf, @Nullable Buf> updater) {
|
||||
return dict.updateAndGetDelta(singletonName, updater);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user