Implement singletons

This commit is contained in:
Andrea Cavalli 2022-03-20 14:33:27 +01:00
parent e866241ff1
commit de5be6564e
25 changed files with 697 additions and 165 deletions

View File

@ -113,6 +113,9 @@ versions:
javaClass: it.cavallium.dbengine.database.LLSnapshot
serializer: it.cavallium.dbengine.database.remote.LLSnapshotSerializer
Bytes:
javaClass: it.unimi.dsi.fastutil.bytes.ByteList
serializer: it.cavallium.dbengine.database.remote.ByteListSerializer
StringMap:
javaClass: java.util.Map<java.lang.String, java.lang.String>
serializer: it.cavallium.dbengine.database.remote.StringMapSerializer
@ -150,7 +153,7 @@ versions:
databaseId: long
singletonListColumnName: byte[]
name: byte[]
defaultValue: byte[]
defaultValue: -Bytes
SingletonGet:
data:
singletonId: long
@ -158,7 +161,7 @@ versions:
SingletonSet:
data:
singletonId: long
value: byte[]
value: -Bytes
SingletonUpdateInit:
data:
singletonId: long

View File

@ -7,13 +7,14 @@ import io.netty5.buffer.api.BufferAllocator;
import it.cavallium.dbengine.client.MemoryStats;
import it.cavallium.dbengine.database.collections.DatabaseInt;
import it.cavallium.dbengine.database.collections.DatabaseLong;
import it.cavallium.dbengine.rpc.current.data.Column;
import it.cavallium.dbengine.database.collections.DatabaseSingleton;
import java.nio.charset.StandardCharsets;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseStructure {
Mono<? extends LLSingleton> getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue);
Mono<? extends LLSingleton> getSingleton(byte[] singletonListColumnName, byte[] name, byte @Nullable[] defaultValue);
Mono<? extends LLDictionary> getDictionary(byte[] columnName, UpdateMode updateMode);
@ -26,6 +27,13 @@ public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseS
return getDictionary(ColumnUtils.dictionary(name).name().getBytes(StandardCharsets.US_ASCII), updateMode);
}
default Mono<? extends LLSingleton> getSingleton(String singletonListName, String name) {
return getSingleton(ColumnUtils.special(singletonListName).name().getBytes(StandardCharsets.US_ASCII),
name.getBytes(StandardCharsets.US_ASCII),
null
);
}
default Mono<DatabaseInt> getInteger(String singletonListName, String name, int defaultValue) {
return this
.getSingleton(ColumnUtils.special(singletonListName).name().getBytes(StandardCharsets.US_ASCII),

View File

@ -14,10 +14,20 @@ public interface LLSingleton extends LLKeyValueDatabaseStructure {
BufferAllocator getAllocator();
Mono<byte[]> get(@Nullable LLSnapshot snapshot);
Mono<Send<Buffer>> get(@Nullable LLSnapshot snapshot);
Mono<Void> set(byte[] value);
Mono<Void> set(Mono<Send<Buffer>> value);
Mono<Send<Buffer>> update(SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
UpdateReturnMode updateReturnMode);
default Mono<Send<Buffer>> update(SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
UpdateReturnMode updateReturnMode) {
return this
.updateAndGetDelta(updater)
.transform(prev -> LLUtils.resolveLLDelta(prev, updateReturnMode));
}
Mono<Send<LLDelta>> updateAndGetDelta(SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater);
String getColumnName();
String getName();
}

View File

@ -84,6 +84,7 @@ public class LLUtils {
private static final byte[] RESPONSE_FALSE_BUF = new byte[]{0};
public static final byte[][] LEXICONOGRAPHIC_ITERATION_SEEKS = new byte[256][1];
public static final AtomicBoolean hookRegistered = new AtomicBoolean();
public static final boolean MANUAL_READAHEAD = false;
static {
for (int i1 = 0; i1 < 256; i1++) {
@ -730,7 +731,9 @@ public class LLUtils {
readOptions = new ReadOptions();
}
if (!closedRange) {
readOptions.setReadaheadSize(32 * 1024); // 32KiB
if (LLUtils.MANUAL_READAHEAD) {
readOptions.setReadaheadSize(32 * 1024); // 32KiB
}
readOptions.setFillCache(false);
readOptions.setVerifyChecksums(false);
} else {

View File

@ -2,14 +2,9 @@ package it.cavallium.dbengine.database.collections;
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.Serializer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public class DatabaseEmpty {
@ -40,7 +35,7 @@ public class DatabaseEmpty {
}
public static DatabaseStageEntry<Nothing> create(LLDictionary dictionary, Buffer key, Runnable onClose) {
return new DatabaseSingle<>(dictionary, key, nothingSerializer(dictionary.getAllocator()), onClose);
return new DatabaseMapSingle<>(dictionary, key, nothingSerializer(dictionary.getAllocator()), onClose);
}
public static final class Nothing {

View File

@ -4,23 +4,38 @@ import com.google.common.primitives.Ints;
import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure;
import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
public class DatabaseInt implements LLKeyValueDatabaseStructure {
private final LLSingleton singleton;
private final SerializerFixedBinaryLength<Integer> serializer;
public DatabaseInt(LLSingleton singleton) {
this.singleton = singleton;
this.serializer = SerializerFixedBinaryLength.intSerializer(singleton.getAllocator());
}
public Mono<Integer> get(@Nullable LLSnapshot snapshot) {
return singleton.get(snapshot).map(Ints::fromByteArray);
return singleton.get(snapshot).handle((dataSend, sink) -> {
try (var data = dataSend.receive()) {
sink.next(serializer.deserialize(data));
} catch (SerializationException e) {
sink.error(e);
}
});
}
public Mono<Void> set(int value) {
return singleton.set(Ints.toByteArray(value));
return singleton.set(Mono.fromCallable(() -> {
try (var buf = singleton.getAllocator().allocate(Integer.BYTES)) {
serializer.serialize(value, buf);
return buf.send();
}
}));
}
@Override

View File

@ -6,23 +6,33 @@ import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure;
import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
public class DatabaseLong implements LLKeyValueDatabaseStructure {
private final LLSingleton singleton;
private final SerializerFixedBinaryLength<Long> serializer;
private final SerializerFixedBinaryLength<Integer> bugSerializer;
public DatabaseLong(LLSingleton singleton) {
this.singleton = singleton;
this.serializer = SerializerFixedBinaryLength.longSerializer(singleton.getAllocator());
this.bugSerializer = SerializerFixedBinaryLength.intSerializer(singleton.getAllocator());
}
public Mono<Long> get(@Nullable LLSnapshot snapshot) {
return singleton.get(snapshot).map(array -> {
if (array.length == 4) {
return (long) Ints.fromByteArray(array);
} else {
return Longs.fromByteArray(array);
return singleton.get(snapshot).handle((dataSend, sink) -> {
try (var data = dataSend.receive()) {
if (data.readableBytes() == 4) {
sink.next((long) (int) bugSerializer.deserialize(data));
} else {
sink.next(serializer.deserialize(data));
}
} catch (SerializationException e) {
sink.error(e);
}
});
}
@ -75,7 +85,12 @@ public class DatabaseLong implements LLKeyValueDatabaseStructure {
}
public Mono<Void> set(long value) {
return singleton.set(Longs.toByteArray(value));
return singleton.set(Mono.fromCallable(() -> {
try (var buf = singleton.getAllocator().allocate(Long.BYTES)) {
serializer.serialize(value, buf);
return buf.send();
}
}));
}
@Override

View File

@ -70,6 +70,20 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
return new DatabaseMapDictionary<>(dictionary, prefixKey, keySuffixSerializer, valueSerializer, onClose);
}
public static <K, V> Flux<Entry<K, V>> getLeavesFrom(DatabaseMapDictionary<K, V> databaseMapDictionary,
CompositeSnapshot snapshot,
Mono<K> prevMono) {
Mono<Optional<K>> prevOptMono = prevMono.map(Optional::of).defaultIfEmpty(Optional.empty());
return prevOptMono.flatMapMany(prevOpt -> {
if (prevOpt.isPresent()) {
return databaseMapDictionary.getAllValues(snapshot, prevOpt.get());
} else {
return databaseMapDictionary.getAllValues(snapshot);
}
});
}
private void deserializeValue(T keySuffix, Send<Buffer> valueToReceive, SynchronousSink<U> sink) {
try (var value = valueToReceive.receive()) {
try {
@ -213,7 +227,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<DatabaseStageEntry<U>> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
return Mono.fromCallable(() ->
new DatabaseSingle<>(dictionary, serializeKeySuffixToKey(keySuffix), valueSerializer, null));
new DatabaseMapSingle<>(dictionary, serializeKeySuffixToKey(keySuffix), valueSerializer, null));
}
@Override
@ -456,7 +470,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
try (var keyBufCopy = keyBuf.copy()) {
keySuffix = deserializeSuffix(keyBufCopy);
}
var subStage = new DatabaseSingle<>(dictionary, toKey(keyBuf), valueSerializer, null);
var subStage = new DatabaseMapSingle<>(dictionary, toKey(keyBuf), valueSerializer, null);
sink.next(Map.entry(keySuffix, subStage));
} catch (Throwable ex) {
keyBuf.close();
@ -467,8 +481,30 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot) {
return getAllValues(snapshot, rangeMono);
}
public Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot, @Nullable T from) {
if (from == null) {
return getAllValues(snapshot);
} else {
Mono<Send<LLRange>> boundedRangeMono = rangeMono.flatMap(fullRangeSend -> Mono.fromCallable(() -> {
try (var fullRange = fullRangeSend.receive()) {
try (var keyWithoutExtBuf = keyPrefix == null ? alloc.allocate(keySuffixLength + keyExtLength)
: keyPrefix.copy()) {
keyWithoutExtBuf.ensureWritable(keySuffixLength + keyExtLength);
serializeSuffix(from, keyWithoutExtBuf);
return LLRange.of(keyWithoutExtBuf.send(), fullRange.getMax()).send();
}
}
}));
return getAllValues(snapshot, boundedRangeMono);
}
}
private Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot, Mono<Send<LLRange>> sliceRangeMono) {
return dictionary
.getRange(resolveSnapshot(snapshot), rangeMono)
.getRange(resolveSnapshot(snapshot), sliceRangeMono)
.<Entry<T, U>>handle((serializedEntryToReceive, sink) -> {
try {
Entry<T, U> entry;

View File

@ -16,11 +16,17 @@ import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.function.TriFunction;
import org.apache.logging.log4j.LogManager;
@ -82,7 +88,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
};
protected final LLDictionary dictionary;
private final BufferAllocator alloc;
protected final BufferAllocator alloc;
private final AtomicLong totalZeroBytesErrors = new AtomicLong();
protected final SubStageGetter<U, US> subStageGetter;
protected final SerializerFixedBinaryLength<T> keySuffixSerializer;
@ -491,54 +497,110 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
this.onClose = null;
}
public static <K1, K2, V, R> Flux<R> getAllLeaves2(DatabaseMapDictionaryDeep<K1, Object2ObjectSortedMap<K2, V>, DatabaseMapDictionary<K2, V>> deepMap,
public static <K1, K2, V, R> Flux<R> getAllLeaves2(DatabaseMapDictionaryDeep<K1, Object2ObjectSortedMap<K2, V>, ? extends DatabaseStageMap<K2, V, DatabaseStageEntry<V>>> deepMap,
CompositeSnapshot snapshot,
TriFunction<K1, K2, V, R> merger) {
if (deepMap.subStageGetter instanceof SubStageGetterMap<K2, V> subStageGetterMap) {
var keySuffix1Serializer = deepMap.keySuffixSerializer;
var keySuffix2Serializer = subStageGetterMap.keySerializer;
var valueSerializer = subStageGetterMap.valueSerializer;
return deepMap
.dictionary
.getRange(deepMap.resolveSnapshot(snapshot), deepMap.rangeMono)
.handle((entrySend, sink) -> {
K1 key1 = null;
K2 key2 = null;
try (var entry = entrySend.receive()) {
var keyBuf = entry.getKeyUnsafe();
var valueBuf = entry.getValueUnsafe();
try {
assert keyBuf != null;
keyBuf.skipReadable(deepMap.keyPrefixLength);
try (var key1Buf = keyBuf.split(deepMap.keySuffixLength)) {
key1 = keySuffix1Serializer.deserialize(key1Buf);
}
key2 = keySuffix2Serializer.deserialize(keyBuf);
assert valueBuf != null;
var value = valueSerializer.deserialize(valueBuf);
sink.next(merger.apply(key1, key2, value));
} catch (IndexOutOfBoundsException ex) {
var exMessage = ex.getMessage();
if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) {
var totalZeroBytesErrors = deepMap.totalZeroBytesErrors.incrementAndGet();
if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) {
LOG.error("Unexpected zero-bytes value at " + deepMap.dictionary.getDatabaseName()
+ ":" + deepMap.dictionary.getColumnName()
+ ":[" + key1
+ ":" + key2
+ "](" + LLUtils.toStringSafe(keyBuf) + ") total=" + totalZeroBytesErrors);
}
sink.complete();
} else {
sink.error(ex);
}
}
} catch (SerializationException ex) {
sink.error(ex);
}
});
TriFunction<K1, K2, V, R> merger,
@NotNull Mono<K1> savedProgressKey1) {
var keySuffix1Serializer = deepMap.keySuffixSerializer;
SerializerFixedBinaryLength<?> keySuffix2Serializer;
Serializer<?> valueSerializer;
boolean isHashed;
boolean isHashedSet;
if (deepMap.subStageGetter instanceof SubStageGetterMap subStageGetterMap) {
isHashed = false;
isHashedSet = false;
keySuffix2Serializer = subStageGetterMap.keySerializer;
valueSerializer = subStageGetterMap.valueSerializer;
} else if (deepMap.subStageGetter instanceof SubStageGetterHashMap subStageGetterHashMap) {
isHashed = true;
isHashedSet = false;
keySuffix2Serializer = subStageGetterHashMap.keyHashSerializer;
//noinspection unchecked
ValueWithHashSerializer<K2, V> valueWithHashSerializer = new ValueWithHashSerializer<>(
(Serializer<K2>) subStageGetterHashMap.keySerializer,
(Serializer<V>) subStageGetterHashMap.valueSerializer
);
valueSerializer = new ValuesSetSerializer<>(valueWithHashSerializer);
} else if (deepMap.subStageGetter instanceof SubStageGetterHashSet subStageGetterHashSet) {
isHashed = true;
isHashedSet = true;
keySuffix2Serializer = subStageGetterHashSet.keyHashSerializer;
//noinspection unchecked
valueSerializer = new ValuesSetSerializer<K2>(subStageGetterHashSet.keySerializer);
} else {
throw new IllegalArgumentException();
}
var savedProgressKey1Opt = savedProgressKey1.map(Optional::of).defaultIfEmpty(Optional.empty());
return deepMap
.dictionary
.getRange(deepMap.resolveSnapshot(snapshot), Mono.zip(savedProgressKey1Opt, deepMap.rangeMono).handle((tuple, sink) -> {
var firstKey = tuple.getT1();
try (var fullRange = tuple.getT2().receive()) {
if (firstKey.isPresent()) {
try (var key1Buf = deepMap.alloc.allocate(keySuffix1Serializer.getSerializedBinaryLength())) {
keySuffix1Serializer.serialize(firstKey.get(), key1Buf);
sink.next(LLRange.of(key1Buf.send(), fullRange.getMax()).send());
} catch (SerializationException e) {
sink.error(e);
}
} else {
sink.next(fullRange.send());
}
}
}))
.concatMapIterable(entrySend -> {
K1 key1 = null;
Object key2 = null;
try (var entry = entrySend.receive()) {
var keyBuf = entry.getKeyUnsafe();
var valueBuf = entry.getValueUnsafe();
try {
assert keyBuf != null;
keyBuf.skipReadable(deepMap.keyPrefixLength);
try (var key1Buf = keyBuf.split(deepMap.keySuffixLength)) {
key1 = keySuffix1Serializer.deserialize(key1Buf);
}
key2 = keySuffix2Serializer.deserialize(keyBuf);
assert valueBuf != null;
Object value = valueSerializer.deserialize(valueBuf);
if (isHashedSet) {
//noinspection unchecked
Set<K2> set = (Set<K2>) value;
K1 finalKey1 = key1;
//noinspection unchecked
return set.stream().map(e -> merger.apply(finalKey1, e, (V) Nothing.INSTANCE)).toList();
} else if (isHashed) {
//noinspection unchecked
Set<Entry<K2, V>> set = (Set<Entry<K2, V>>) value;
K1 finalKey1 = key1;
return set.stream().map(e -> merger.apply(finalKey1, e.getKey(), e.getValue())).toList();
} else {
//noinspection unchecked
return List.of(merger.apply(key1, (K2) key2, (V) value));
}
} catch (IndexOutOfBoundsException ex) {
var exMessage = ex.getMessage();
if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) {
var totalZeroBytesErrors = deepMap.totalZeroBytesErrors.incrementAndGet();
if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) {
LOG.error("Unexpected zero-bytes value at " + deepMap.dictionary.getDatabaseName()
+ ":" + deepMap.dictionary.getColumnName()
+ ":[" + key1
+ ":" + key2
+ "](" + LLUtils.toStringSafe(keyBuf) + ") total=" + totalZeroBytesErrors);
}
return List.of();
} else {
throw ex;
}
}
} catch (SerializationException ex) {
throw new CompletionException(ex);
}
});
}
}

View File

@ -10,7 +10,6 @@ import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLEntry;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
@ -26,15 +25,15 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
public class DatabaseSingle<U> extends ResourceSupport<DatabaseStage<U>, DatabaseSingle<U>> implements
public class DatabaseMapSingle<U> extends ResourceSupport<DatabaseStage<U>, DatabaseMapSingle<U>> implements
DatabaseStageEntry<U> {
private static final Logger LOG = LogManager.getLogger(DatabaseSingle.class);
private static final Logger LOG = LogManager.getLogger(DatabaseMapSingle.class);
private final AtomicLong totalZeroBytesErrors = new AtomicLong();
private static final Drop<DatabaseSingle<?>> DROP = new Drop<>() {
private static final Drop<DatabaseMapSingle<?>> DROP = new Drop<>() {
@Override
public void drop(DatabaseSingle<?> obj) {
public void drop(DatabaseMapSingle<?> obj) {
try {
obj.key.close();
} catch (Throwable ex) {
@ -46,12 +45,12 @@ public class DatabaseSingle<U> extends ResourceSupport<DatabaseStage<U>, Databas
}
@Override
public Drop<DatabaseSingle<?>> fork() {
public Drop<DatabaseMapSingle<?>> fork() {
return this;
}
@Override
public void attach(DatabaseSingle<?> obj) {
public void attach(DatabaseMapSingle<?> obj) {
}
};
@ -64,9 +63,9 @@ public class DatabaseSingle<U> extends ResourceSupport<DatabaseStage<U>, Databas
private Runnable onClose;
@SuppressWarnings({"unchecked", "rawtypes"})
public DatabaseSingle(LLDictionary dictionary, Buffer key, Serializer<U> serializer,
public DatabaseMapSingle(LLDictionary dictionary, Buffer key, Serializer<U> serializer,
Runnable onClose) {
super((Drop<DatabaseSingle<U>>) (Drop) DROP);
super((Drop<DatabaseMapSingle<U>>) (Drop) DROP);
this.dictionary = dictionary;
this.key = key;
this.keyMono = LLUtils.lazyRetain(this.key);
@ -211,12 +210,12 @@ public class DatabaseSingle<U> extends ResourceSupport<DatabaseStage<U>, Databas
}
@Override
protected Owned<DatabaseSingle<U>> prepareSend() {
protected Owned<DatabaseMapSingle<U>> prepareSend() {
var keySend = this.key.send();
var onClose = this.onClose;
return drop -> {
var key = keySend.receive();
var instance = new DatabaseSingle<>(dictionary, key, serializer, onClose);
var instance = new DatabaseMapSingle<>(dictionary, key, serializer, onClose);
drop.attach(instance);
return instance;
};

View File

@ -0,0 +1,213 @@
package it.cavallium.dbengine.database.collections;
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.database.serialization.Serializer;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
public class DatabaseSingleton<U> extends ResourceSupport<DatabaseStage<U>, DatabaseSingleton<U>> implements
DatabaseStageEntry<U> {
private static final Logger LOG = LogManager.getLogger(DatabaseSingleton.class);
private static final Drop<DatabaseSingleton<?>> DROP = new Drop<>() {
@Override
public void drop(DatabaseSingleton<?> obj) {
if (obj.onClose != null) {
obj.onClose.run();
}
}
@Override
public Drop<DatabaseSingleton<?>> fork() {
return this;
}
@Override
public void attach(DatabaseSingleton<?> obj) {
}
};
private final LLSingleton singleton;
private final Serializer<U> serializer;
private Runnable onClose;
@SuppressWarnings({"unchecked", "rawtypes"})
public DatabaseSingleton(LLSingleton singleton, Serializer<U> serializer,
Runnable onClose) {
super((Drop<DatabaseSingleton<U>>) (Drop) DROP);
this.singleton = singleton;
this.serializer = serializer;
this.onClose = onClose;
}
private LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) {
if (snapshot == null) {
return null;
} else {
return snapshot.getSnapshot(singleton);
}
}
private void deserializeValue(Send<Buffer> value, SynchronousSink<U> sink) {
try {
U deserializedValue;
try (var valueBuf = value.receive()) {
deserializedValue = serializer.deserialize(valueBuf);
}
sink.next(deserializedValue);
} catch (IndexOutOfBoundsException ex) {
var exMessage = ex.getMessage();
if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) {
LOG.error("Unexpected zero-bytes value at "
+ singleton.getDatabaseName() + ":" + singleton.getColumnName() + ":" + singleton.getName());
sink.complete();
} else {
sink.error(ex);
}
} catch (SerializationException ex) {
sink.error(ex);
}
}
private Send<Buffer> serializeValue(U value) throws SerializationException {
var valSizeHint = serializer.getSerializedSizeHint();
if (valSizeHint == -1) valSizeHint = 128;
try (var valBuf = singleton.getAllocator().allocate(valSizeHint)) {
serializer.serialize(value, valBuf);
return valBuf.send();
}
}
@Override
public Mono<U> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) {
return singleton.get(resolveSnapshot(snapshot))
.handle(this::deserializeValue);
}
@Override
public Mono<U> setAndGetPrevious(U value) {
return Flux
.concat(singleton.get(null), singleton.set(Mono.fromCallable(() -> serializeValue(value))).then(Mono.empty()))
.singleOrEmpty()
.handle(this::deserializeValue);
}
@Override
public Mono<U> update(SerializationFunction<@Nullable U, @Nullable U> updater,
UpdateReturnMode updateReturnMode) {
return singleton
.update((oldValueSer) -> {
try (oldValueSer) {
U result;
if (oldValueSer == null) {
result = updater.apply(null);
} else {
U deserializedValue;
try (var valueBuf = oldValueSer.receive()) {
deserializedValue = serializer.deserialize(valueBuf);
}
result = updater.apply(deserializedValue);
}
if (result == null) {
return null;
} else {
return serializeValue(result).receive();
}
}
}, updateReturnMode)
.handle(this::deserializeValue);
}
@Override
public Mono<Delta<U>> updateAndGetDelta(SerializationFunction<@Nullable U, @Nullable U> updater) {
return singleton
.updateAndGetDelta((oldValueSer) -> {
try (oldValueSer) {
U result;
if (oldValueSer == null) {
result = updater.apply(null);
} else {
U deserializedValue;
try (var valueBuf = oldValueSer.receive()) {
deserializedValue = serializer.deserialize(valueBuf);
}
result = updater.apply(deserializedValue);
}
if (result == null) {
return null;
} else {
return serializeValue(result).receive();
}
}
}).transform(mono -> LLUtils.mapLLDelta(mono, serialized -> {
try (var valueBuf = serialized.receive()) {
return serializer.deserialize(valueBuf);
}
}));
}
@Override
public Mono<U> clearAndGetPrevious() {
return Flux
.concat(singleton.get(null), singleton.set(Mono.empty()).then(Mono.empty()))
.singleOrEmpty()
.handle(this::deserializeValue);
}
@Override
public Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) {
return singleton.get(null).map(unused -> 1L).defaultIfEmpty(0L);
}
@Override
public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
return singleton.get(null).map(t -> false).defaultIfEmpty(true);
}
@Override
public Flux<BadBlock> badBlocks() {
return Flux.empty();
}
@Override
protected RuntimeException createResourceClosedException() {
throw new IllegalStateException("Closed");
}
@Override
protected Owned<DatabaseSingleton<U>> prepareSend() {
var onClose = this.onClose;
return drop -> {
var instance = new DatabaseSingleton<>(singleton, serializer, onClose);
drop.attach(instance);
return instance;
};
}
@Override
protected void makeInaccessible() {
this.onClose = null;
}
}

View File

@ -18,10 +18,10 @@ import reactor.core.publisher.Mono;
public class SubStageGetterHashMap<T, U, TH> implements
SubStageGetter<Object2ObjectSortedMap<T, U>, DatabaseMapDictionaryHashed<T, U, TH>> {
private final Serializer<T> keySerializer;
private final Serializer<U> valueSerializer;
private final Function<T, TH> keyHashFunction;
private final SerializerFixedBinaryLength<TH> keyHashSerializer;
final Serializer<T> keySerializer;
final Serializer<U> valueSerializer;
final Function<T, TH> keyHashFunction;
final SerializerFixedBinaryLength<TH> keyHashSerializer;
public SubStageGetterHashMap(Serializer<T> keySerializer,
Serializer<U> valueSerializer,

View File

@ -19,9 +19,9 @@ import reactor.core.publisher.Mono;
public class SubStageGetterHashSet<T, TH> implements
SubStageGetter<Object2ObjectSortedMap<T, Nothing>, DatabaseSetDictionaryHashed<T, TH>> {
private final Serializer<T> keySerializer;
private final Function<T, TH> keyHashFunction;
private final SerializerFixedBinaryLength<TH> keyHashSerializer;
final Serializer<T> keySerializer;
final Function<T, TH> keyHashFunction;
final SerializerFixedBinaryLength<TH> keyHashSerializer;
public SubStageGetterHashSet(Serializer<T> keySerializer,
Function<T, TH> keyHashFunction,

View File

@ -20,7 +20,7 @@ public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageE
public Mono<DatabaseStageEntry<T>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot,
Mono<Send<Buffer>> keyPrefixMono) {
return keyPrefixMono.map(keyPrefix -> new DatabaseSingle<>(dictionary, keyPrefix.receive(), serializer, null));
return keyPrefixMono.map(keyPrefix -> new DatabaseMapSingle<>(dictionary, keyPrefix.receive(), serializer, null));
}
}

View File

@ -84,7 +84,7 @@ public class LLLocalDictionary implements LLDictionary {
* It used to be false,
* now it's true to avoid crashes during iterations on completely corrupted files
*/
static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = true;
static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = false;
/**
* Default: true. Use false to debug problems with windowing.
*/
@ -874,7 +874,9 @@ public class LLLocalDictionary implements LLDictionary {
try (var ro = new ReadOptions(getReadOptions(null))) {
ro.setFillCache(false);
if (!range.isSingle()) {
ro.setReadaheadSize(32 * 1024);
if (LLUtils.MANUAL_READAHEAD) {
ro.setReadaheadSize(32 * 1024);
}
}
ro.setVerifyChecksums(true);
try (var rocksIteratorTuple = getRocksIterator(nettyDirect, ro, range, db)) {
@ -1294,7 +1296,9 @@ public class LLLocalDictionary implements LLDictionary {
// readOpts.setIgnoreRangeDeletions(true);
readOpts.setFillCache(false);
readOpts.setReadaheadSize(32 * 1024); // 32KiB
if (LLUtils.MANUAL_READAHEAD) {
readOpts.setReadaheadSize(32 * 1024); // 32KiB
}
try (CappedWriteBatch writeBatch = new CappedWriteBatch(db,
alloc,
CAPPED_WRITE_BATCH_CAP,
@ -1565,7 +1569,9 @@ public class LLLocalDictionary implements LLDictionary {
}
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
readOpts.setFillCache(false);
readOpts.setReadaheadSize(128 * 1024); // 128KiB
if (LLUtils.MANUAL_READAHEAD) {
readOpts.setReadaheadSize(128 * 1024); // 128KiB
}
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
if (PARALLEL_EXACT_SIZE) {

View File

@ -195,7 +195,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
tableOptions.setOptimizeFiltersForMemory(true);
}
tableOptions
.setChecksumType(ChecksumType.kxxHash64)
.setChecksumType(ChecksumType.kCRC32c)
.setBlockCacheCompressed(optionsWithCache.compressedCache())
.setBlockCache(optionsWithCache.standardCache())
.setBlockSize(16 * 1024); // 16KiB
@ -561,13 +561,16 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
}
@Override
public Mono<LLLocalSingleton> getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue) {
public Mono<LLLocalSingleton> getSingleton(byte[] singletonListColumnName,
byte[] name,
byte @Nullable[] defaultValue) {
return Mono
.fromCallable(() -> new LLLocalSingleton(
getRocksDBColumn(db, getCfh(singletonListColumnName)),
(snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()),
LLLocalKeyValueDatabase.this.name,
name,
ColumnUtils.toString(singletonListColumnName),
dbScheduler,
defaultValue
))

View File

@ -3,22 +3,18 @@ package it.cavallium.dbengine.database.disk;
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.unimi.dsi.fastutil.bytes.ByteList;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;
import org.rocksdb.WriteOptions;
@ -33,6 +29,7 @@ public class LLLocalSingleton implements LLSingleton {
private final RocksDBColumn db;
private final Function<LLSnapshot, Snapshot> snapshotResolver;
private final byte[] name;
private final String columnName;
private final Mono<Send<Buffer>> nameMono;
private final String databaseName;
private final Scheduler dbScheduler;
@ -41,12 +38,14 @@ public class LLLocalSingleton implements LLSingleton {
Function<LLSnapshot, Snapshot> snapshotResolver,
String databaseName,
byte[] name,
String columnName,
Scheduler dbScheduler,
byte[] defaultValue) throws RocksDBException {
byte @Nullable [] defaultValue) throws RocksDBException {
this.db = db;
this.databaseName = databaseName;
this.snapshotResolver = snapshotResolver;
this.name = name;
this.columnName = columnName;
this.nameMono = Mono.fromCallable(() -> {
var alloc = db.getAllocator();
try (var nameBuf = alloc.allocate(this.name.length)) {
@ -58,7 +57,7 @@ public class LLLocalSingleton implements LLSingleton {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Initialized in a nonblocking thread");
}
if (db.get(EMPTY_READ_OPTIONS, this.name, true) == null) {
if (defaultValue != null && db.get(EMPTY_READ_OPTIONS, this.name, true) == null) {
db.put(EMPTY_WRITE_OPTIONS, this.name, defaultValue);
}
}
@ -81,30 +80,45 @@ public class LLLocalSingleton implements LLSingleton {
}
@Override
public Mono<byte[]> get(@Nullable LLSnapshot snapshot) {
return Mono
.fromCallable(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called get in a nonblocking thread");
}
return db.get(resolveSnapshot(snapshot), name, true);
})
.onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause))
.subscribeOn(dbScheduler);
public Mono<Send<Buffer>> get(@Nullable LLSnapshot snapshot) {
return nameMono.publishOn(Schedulers.boundedElastic()).handle((nameSend, sink) -> {
try (Buffer name = nameSend.receive()) {
Buffer result = db.get(resolveSnapshot(snapshot), name);
if (result != null) {
sink.next(result.send());
} else {
sink.complete();
}
} catch (RocksDBException ex) {
sink.error(new IOException("Failed to read " + Arrays.toString(name), ex));
}
});
}
@Override
public Mono<Void> set(byte[] value) {
return Mono
.<Void>fromCallable(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called set in a nonblocking thread");
}
public Mono<Void> set(Mono<Send<Buffer>> valueMono) {
return Mono.zip(nameMono, valueMono).publishOn(Schedulers.boundedElastic()).handle((tuple, sink) -> {
var nameSend = tuple.getT1();
var valueSend = tuple.getT2();
try (Buffer name = nameSend.receive()) {
try (Buffer value = valueSend.receive()) {
db.put(EMPTY_WRITE_OPTIONS, name, value);
return null;
})
.onErrorMap(cause -> new IOException("Failed to write " + Arrays.toString(name), cause))
.subscribeOn(dbScheduler);
sink.next(true);
}
} catch (RocksDBException ex) {
sink.error(new IOException("Failed to write " + Arrays.toString(name), ex));
}
}).switchIfEmpty(unset().thenReturn(true)).then();
}
private Mono<Void> unset() {
return nameMono.publishOn(Schedulers.boundedElastic()).handle((nameSend, sink) -> {
try (Buffer name = nameSend.receive()) {
db.delete(EMPTY_WRITE_OPTIONS, name);
} catch (RocksDBException ex) {
sink.error(new IOException("Failed to read " + Arrays.toString(name), ex));
}
});
}
@Override
@ -130,8 +144,31 @@ public class LLLocalSingleton implements LLSingleton {
keySend -> Mono.fromRunnable(keySend::close));
}
@Override
public Mono<Send<LLDelta>> updateAndGetDelta(SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater) {
return Mono.usingWhen(nameMono, keySend -> runOnDb(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
UpdateAtomicResult result
= db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, keySend, updater, UpdateAtomicResultMode.DELTA);
return ((UpdateAtomicResultDelta) result).delta();
}).onErrorMap(cause -> new IOException("Failed to read or write", cause)),
keySend -> Mono.fromRunnable(keySend::close));
}
@Override
public String getDatabaseName() {
return databaseName;
}
@Override
public String getColumnName() {
return columnName;
}
@Override
public String getName() {
return new String(name);
}
}

View File

@ -16,6 +16,7 @@ import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase {
@ -48,7 +49,9 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase {
}
@Override
public Mono<? extends LLSingleton> getSingleton(byte[] singletonListColumnName, byte[] singletonName, byte[] defaultValue) {
public Mono<? extends LLSingleton> getSingleton(byte[] singletonListColumnName,
byte[] singletonName,
byte @Nullable[] defaultValue) {
var columnNameString = new String(singletonListColumnName, StandardCharsets.UTF_8);
var dict = singletons.computeIfAbsent(columnNameString, _unused -> new LLMemoryDictionary(allocator,
name,
@ -58,9 +61,17 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase {
mainDb
));
return Mono
.fromCallable(() -> new LLMemorySingleton(dict, singletonName)).flatMap(singleton -> singleton
.fromCallable(() -> new LLMemorySingleton(dict, columnNameString, singletonName)).flatMap(singleton -> singleton
.get(null)
.switchIfEmpty(singleton.set(defaultValue).then(Mono.empty()))
.transform(mono -> {
if (defaultValue != null) {
return mono.switchIfEmpty(singleton
.set(Mono.fromSupplier(() -> allocator.copyOf(defaultValue).send()))
.then(Mono.empty()));
} else {
return mono;
}
})
.thenReturn(singleton)
);
}

View File

@ -8,30 +8,21 @@ import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.disk.UpdateAtomicResult;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultCurrent;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultMode;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultPrevious;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class LLMemorySingleton implements LLSingleton {
private final LLMemoryDictionary dict;
private final String columnNameString;
private final byte[] singletonName;
private final Mono<Send<Buffer>> singletonNameBufMono;
public LLMemorySingleton(LLMemoryDictionary dict, byte[] singletonName) {
public LLMemorySingleton(LLMemoryDictionary dict, String columnNameString, byte[] singletonName) {
this.dict = dict;
this.columnNameString = columnNameString;
this.singletonName = singletonName;
this.singletonNameBufMono = Mono.fromCallable(() -> dict
.getAllocator()
@ -51,22 +42,15 @@ public class LLMemorySingleton implements LLSingleton {
}
@Override
public Mono<byte[]> get(@Nullable LLSnapshot snapshot) {
return dict
.get(snapshot, singletonNameBufMono, false)
.map(b -> {
try (var buf = b.receive()) {
return LLUtils.toArray(buf);
}
});
public Mono<Send<Buffer>> get(@Nullable LLSnapshot snapshot) {
return dict.get(snapshot, singletonNameBufMono, false);
}
@Override
public Mono<Void> set(byte[] value) {
public Mono<Void> set(Mono<Send<Buffer>> value) {
var bbKey = singletonNameBufMono;
var bbVal = Mono.fromCallable(() -> dict.getAllocator().allocate(value.length).writeBytes(value).send());
return dict
.put(bbKey, bbVal, LLDictionaryResultType.VOID)
.put(bbKey, value, LLDictionaryResultType.VOID)
.then();
}
@ -75,4 +59,19 @@ public class LLMemorySingleton implements LLSingleton {
UpdateReturnMode updateReturnMode) {
return dict.update(singletonNameBufMono, updater, updateReturnMode);
}
@Override
public Mono<Send<LLDelta>> updateAndGetDelta(SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater) {
return dict.updateAndGetDelta(singletonNameBufMono, updater);
}
@Override
public String getColumnName() {
return columnNameString;
}
@Override
public String getName() {
return new String(singletonName);
}
}

View File

@ -0,0 +1,30 @@
package it.cavallium.dbengine.database.remote;
import it.cavallium.data.generator.DataSerializer;
import it.unimi.dsi.fastutil.bytes.ByteArrayList;
import it.unimi.dsi.fastutil.bytes.ByteList;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.jetbrains.annotations.NotNull;
public class ByteListSerializer implements DataSerializer<ByteList> {
@Override
public void serialize(DataOutput dataOutput, @NotNull ByteList bytes) throws IOException {
dataOutput.writeInt(bytes.size());
for (Byte aByte : bytes) {
dataOutput.writeByte(aByte);
}
}
@Override
public @NotNull ByteList deserialize(DataInput dataInput) throws IOException {
var size = dataInput.readInt();
var bal = new ByteArrayList(size);
for (int i = 0; i < size; i++) {
bal.add(dataInput.readByte());
}
return bal;
}
}

View File

@ -2,15 +2,16 @@ package it.cavallium.dbengine.database.remote;
import com.google.common.collect.Multimap;
import io.micrometer.core.instrument.MeterRegistry;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.incubator.codec.quic.QuicSslContextBuilder;
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.Send;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.incubator.codec.quic.QuicSslContextBuilder;
import it.cavallium.dbengine.client.MemoryStats;
import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLIndexRequest;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
@ -26,7 +27,6 @@ import it.cavallium.dbengine.database.remote.RPCCodecs.RPCEventCodec;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.lucene.LuceneRocksDBManager;
import it.cavallium.dbengine.lucene.collector.Buckets;
import it.cavallium.dbengine.lucene.searcher.BucketParams;
import it.cavallium.dbengine.rpc.current.data.BinaryOptional;
@ -52,6 +52,7 @@ import it.cavallium.dbengine.rpc.current.data.SingletonSet;
import it.cavallium.dbengine.rpc.current.data.SingletonUpdateEnd;
import it.cavallium.dbengine.rpc.current.data.SingletonUpdateInit;
import it.cavallium.dbengine.rpc.current.data.SingletonUpdateOldData;
import it.cavallium.dbengine.rpc.current.data.nullables.NullableBytes;
import it.cavallium.dbengine.rpc.current.data.nullables.NullableLLSnapshot;
import it.unimi.dsi.fastutil.bytes.ByteList;
import java.io.File;
@ -226,11 +227,11 @@ public class LLQuicConnection implements LLDatabaseConnection {
@Override
public Mono<? extends LLSingleton> getSingleton(byte[] singletonListColumnName,
byte[] name,
byte[] defaultValue) {
byte @Nullable[] defaultValue) {
return sendRequest(new GetSingleton(id,
ByteList.of(singletonListColumnName),
ByteList.of(name),
ByteList.of(defaultValue)
defaultValue == null ? NullableBytes.empty() : NullableBytes.of(ByteList.of(defaultValue))
)).cast(GeneratedEntityId.class).map(GeneratedEntityId::id).map(singletonId -> new LLSingleton() {
@Override
@ -239,17 +240,22 @@ public class LLQuicConnection implements LLDatabaseConnection {
}
@Override
public Mono<byte[]> get(@Nullable LLSnapshot snapshot) {
public Mono<Send<Buffer>> get(@Nullable LLSnapshot snapshot) {
return sendRequest(new SingletonGet(singletonId, NullableLLSnapshot.ofNullable(snapshot)))
.cast(BinaryOptional.class)
.mapNotNull(b -> b.val().getNullable())
.map(binary -> QuicUtils.toArrayNoCopy(binary.val()));
.mapNotNull(result -> {
if (result.val().isPresent()) {
return allocator.copyOf(QuicUtils.toArrayNoCopy(result.val().get().val())).send();
} else {
return null;
}
});
}
@Override
public Mono<Void> set(byte[] value) {
return sendRequest(new SingletonSet(singletonId, ByteList.of(value)))
.then();
public Mono<Void> set(Mono<Send<Buffer>> valueMono) {
return QuicUtils.toBytes(valueMono)
.flatMap(valueSendOpt -> sendRequest(new SingletonSet(singletonId, valueSendOpt)).then());
}
@Override
@ -285,10 +291,25 @@ public class LLQuicConnection implements LLDatabaseConnection {
});
}
@Override
public Mono<Send<LLDelta>> updateAndGetDelta(SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater) {
return Mono.error(new UnsupportedOperationException());
}
@Override
public String getDatabaseName() {
return databaseName;
}
@Override
public String getColumnName() {
return new String(singletonListColumnName);
}
@Override
public String getName() {
return new String(name);
}
});
}

View File

@ -1,12 +1,16 @@
package it.cavallium.dbengine.database.remote;
import io.netty.handler.codec.ByteToMessageCodec;
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.Send;
import it.cavallium.data.generator.nativedata.NullableString;
import it.cavallium.dbengine.rpc.current.data.RPCCrash;
import it.cavallium.dbengine.rpc.current.data.RPCEvent;
import it.cavallium.dbengine.rpc.current.data.nullables.NullableBytes;
import it.unimi.dsi.fastutil.bytes.ByteArrayList;
import it.unimi.dsi.fastutil.bytes.ByteList;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.logging.Level;
@ -38,6 +42,29 @@ public class QuicUtils {
return new String(QuicUtils.toArrayNoCopy(b), StandardCharsets.UTF_8);
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public static NullableBytes toBytes(Optional<Send<Buffer>> valueSendOpt) {
if (valueSendOpt.isPresent()) {
try (var value = valueSendOpt.get().receive()) {
var bytes = new byte[value.readableBytes()];
value.copyInto(value.readerOffset(), bytes, 0, bytes.length);
return NullableBytes.ofNullable(ByteList.of(bytes));
}
} else {
return NullableBytes.empty();
}
}
public static Mono<NullableBytes> toBytes(Mono<Send<Buffer>> valueSendOptMono) {
return valueSendOptMono.map(valueSendOpt -> {
try (var value = valueSendOpt.receive()) {
var bytes = new byte[value.readableBytes()];
value.copyInto(value.readerOffset(), bytes, 0, bytes.length);
return NullableBytes.ofNullable(ByteList.of(bytes));
}
}).defaultIfEmpty(NullableBytes.empty());
}
public record QuicStream(NettyInbound in, NettyOutbound out) {}
public static Mono<RPCEvent> catchRPCErrors(@NotNull Throwable error) {

View File

@ -13,6 +13,8 @@ import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
import it.cavallium.dbengine.database.collections.DatabaseStageEntry;
import it.cavallium.dbengine.database.collections.DatabaseStageMap;
import it.cavallium.dbengine.database.collections.ValueGetter;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.analyzer.LegacyWordAnalyzer;
@ -230,7 +232,7 @@ public class LuceneUtils {
public static <T, U, V> ValueGetter<Entry<T, U>, V> getAsyncDbValueGetterDeep(
CompositeSnapshot snapshot,
DatabaseMapDictionaryDeep<T, Object2ObjectSortedMap<U, V>, DatabaseMapDictionary<U, V>> dictionaryDeep) {
DatabaseMapDictionaryDeep<T, Object2ObjectSortedMap<U, V>, ? extends DatabaseStageMap<U, V, ? extends DatabaseStageEntry<V>>> dictionaryDeep) {
return entry -> LLUtils.usingResource(dictionaryDeep
.at(snapshot, entry.getKey()), sub -> sub.getValue(snapshot, entry.getValue()), true);
}

View File

@ -6,6 +6,7 @@ import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.ReadableComponent;
import io.netty5.buffer.api.WritableComponent;
import it.cavallium.dbengine.database.LLUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@ -105,9 +106,11 @@ public class RocksdbFileStore {
db.put(headers, NEXT_ID_KEY, Longs.toByteArray(100));
incFlush();
}
this.itReadOpts = new ReadOptions()
.setReadaheadSize(blockSize * 4L)
.setVerifyChecksums(false)
this.itReadOpts = new ReadOptions();
if (LLUtils.MANUAL_READAHEAD) {
itReadOpts.setReadaheadSize(blockSize * 4L);
}
itReadOpts.setVerifyChecksums(false)
.setIgnoreRangeDeletions(true);
} catch (RocksDBException e) {
throw new IOException("Failed to open RocksDB meta file store", e);

View File

@ -5,10 +5,14 @@ import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks;
import static it.cavallium.dbengine.DbTestUtils.newAllocator;
import static it.cavallium.dbengine.DbTestUtils.tempDb;
import it.cavallium.data.generator.nativedata.StringSerializer;
import it.cavallium.dbengine.DbTestUtils.TestAllocator;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.collections.DatabaseInt;
import it.cavallium.dbengine.database.collections.DatabaseLong;
import it.cavallium.dbengine.database.collections.DatabaseSingleton;
import it.cavallium.dbengine.database.serialization.Serializer;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@ -85,6 +89,15 @@ public abstract class TestSingletons {
.verifyComplete();
}
@Test
public void testCreateSingleton() {
StepVerifier
.create(tempDb(getTempDbGenerator(), allocator, db -> tempSingleton(db, "testsingleton")
.flatMap(dbSingleton -> dbSingleton.get(null))
))
.verifyComplete();
}
@ParameterizedTest
@ValueSource(ints = {Integer.MIN_VALUE, -192, -2, -1, 0, 1, 2, 1292, Integer.MAX_VALUE})
public void testDefaultValueInteger(int i) {
@ -137,6 +150,21 @@ public abstract class TestSingletons {
.verifyComplete();
}
@ParameterizedTest
@MethodSource("provideLongNumberWithRepeats")
public void testSetSingleton(Long i, Integer repeats) {
StepVerifier
.create(tempDb(getTempDbGenerator(), allocator, db -> tempSingleton(db, "test")
.flatMap(dbSingleton -> Mono
.defer(() -> dbSingleton.set(Long.toString(System.currentTimeMillis())))
.repeat(repeats)
.then(dbSingleton.set(Long.toString(i)))
.then(dbSingleton.get(null)))
))
.expectNext(Long.toString(i))
.verifyComplete();
}
public static Mono<DatabaseInt> tempInt(LLKeyValueDatabase database, String name, int defaultValue) {
return database
.getInteger("ints", name, defaultValue);
@ -146,4 +174,10 @@ public abstract class TestSingletons {
return database
.getLong("longs", name, defaultValue);
}
public static Mono<DatabaseSingleton<String>> tempSingleton(LLKeyValueDatabase database, String name) {
return database
.getSingleton("longs", name)
.map(singleton -> new DatabaseSingleton<>(singleton, Serializer.UTF8_SERIALIZER, null));
}
}