diff --git a/src/main/data-generator/quic-rpc.yaml b/src/main/data-generator/quic-rpc.yaml index e4b8765..4ce590f 100644 --- a/src/main/data-generator/quic-rpc.yaml +++ b/src/main/data-generator/quic-rpc.yaml @@ -113,6 +113,9 @@ versions: javaClass: it.cavallium.dbengine.database.LLSnapshot serializer: it.cavallium.dbengine.database.remote.LLSnapshotSerializer + Bytes: + javaClass: it.unimi.dsi.fastutil.bytes.ByteList + serializer: it.cavallium.dbengine.database.remote.ByteListSerializer StringMap: javaClass: java.util.Map serializer: it.cavallium.dbengine.database.remote.StringMapSerializer @@ -150,7 +153,7 @@ versions: databaseId: long singletonListColumnName: byte[] name: byte[] - defaultValue: byte[] + defaultValue: -Bytes SingletonGet: data: singletonId: long @@ -158,7 +161,7 @@ versions: SingletonSet: data: singletonId: long - value: byte[] + value: -Bytes SingletonUpdateInit: data: singletonId: long diff --git a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java index b9bd47c..c085ad7 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java @@ -7,13 +7,14 @@ import io.netty5.buffer.api.BufferAllocator; import it.cavallium.dbengine.client.MemoryStats; import it.cavallium.dbengine.database.collections.DatabaseInt; import it.cavallium.dbengine.database.collections.DatabaseLong; -import it.cavallium.dbengine.rpc.current.data.Column; +import it.cavallium.dbengine.database.collections.DatabaseSingleton; import java.nio.charset.StandardCharsets; +import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseStructure { - Mono getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue); + Mono getSingleton(byte[] singletonListColumnName, byte[] name, byte @Nullable[] defaultValue); Mono getDictionary(byte[] columnName, UpdateMode updateMode); @@ -26,6 +27,13 @@ public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseS return getDictionary(ColumnUtils.dictionary(name).name().getBytes(StandardCharsets.US_ASCII), updateMode); } + default Mono getSingleton(String singletonListName, String name) { + return getSingleton(ColumnUtils.special(singletonListName).name().getBytes(StandardCharsets.US_ASCII), + name.getBytes(StandardCharsets.US_ASCII), + null + ); + } + default Mono getInteger(String singletonListName, String name, int defaultValue) { return this .getSingleton(ColumnUtils.special(singletonListName).name().getBytes(StandardCharsets.US_ASCII), diff --git a/src/main/java/it/cavallium/dbengine/database/LLSingleton.java b/src/main/java/it/cavallium/dbengine/database/LLSingleton.java index d93b3d8..2c4822a 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/LLSingleton.java @@ -14,10 +14,20 @@ public interface LLSingleton extends LLKeyValueDatabaseStructure { BufferAllocator getAllocator(); - Mono get(@Nullable LLSnapshot snapshot); + Mono> get(@Nullable LLSnapshot snapshot); - Mono set(byte[] value); + Mono set(Mono> value); - Mono> update(SerializationFunction<@Nullable Send, @Nullable Buffer> updater, - UpdateReturnMode updateReturnMode); + default Mono> update(SerializationFunction<@Nullable Send, @Nullable Buffer> updater, + UpdateReturnMode updateReturnMode) { + return this + .updateAndGetDelta(updater) + .transform(prev -> LLUtils.resolveLLDelta(prev, updateReturnMode)); + } + + Mono> updateAndGetDelta(SerializationFunction<@Nullable Send, @Nullable Buffer> updater); + + String getColumnName(); + + String getName(); } diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 34544db..b3e5ec8 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -84,6 +84,7 @@ public class LLUtils { private static final byte[] RESPONSE_FALSE_BUF = new byte[]{0}; public static final byte[][] LEXICONOGRAPHIC_ITERATION_SEEKS = new byte[256][1]; public static final AtomicBoolean hookRegistered = new AtomicBoolean(); + public static final boolean MANUAL_READAHEAD = false; static { for (int i1 = 0; i1 < 256; i1++) { @@ -730,7 +731,9 @@ public class LLUtils { readOptions = new ReadOptions(); } if (!closedRange) { - readOptions.setReadaheadSize(32 * 1024); // 32KiB + if (LLUtils.MANUAL_READAHEAD) { + readOptions.setReadaheadSize(32 * 1024); // 32KiB + } readOptions.setFillCache(false); readOptions.setVerifyChecksums(false); } else { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java index 8cec46c..ab81744 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java @@ -2,14 +2,9 @@ package it.cavallium.dbengine.database.collections; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.BufferAllocator; -import io.netty5.buffer.api.Drop; -import io.netty5.buffer.api.Send; import it.cavallium.dbengine.database.LLDictionary; -import it.cavallium.dbengine.database.LLUtils; -import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.Serializer; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; public class DatabaseEmpty { @@ -40,7 +35,7 @@ public class DatabaseEmpty { } public static DatabaseStageEntry create(LLDictionary dictionary, Buffer key, Runnable onClose) { - return new DatabaseSingle<>(dictionary, key, nothingSerializer(dictionary.getAllocator()), onClose); + return new DatabaseMapSingle<>(dictionary, key, nothingSerializer(dictionary.getAllocator()), onClose); } public static final class Nothing { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseInt.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseInt.java index adeb428..310d29a 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseInt.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseInt.java @@ -4,23 +4,38 @@ import com.google.common.primitives.Ints; import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure; import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; +import it.cavallium.dbengine.database.serialization.SerializationException; +import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; public class DatabaseInt implements LLKeyValueDatabaseStructure { private final LLSingleton singleton; + private final SerializerFixedBinaryLength serializer; public DatabaseInt(LLSingleton singleton) { this.singleton = singleton; + this.serializer = SerializerFixedBinaryLength.intSerializer(singleton.getAllocator()); } public Mono get(@Nullable LLSnapshot snapshot) { - return singleton.get(snapshot).map(Ints::fromByteArray); + return singleton.get(snapshot).handle((dataSend, sink) -> { + try (var data = dataSend.receive()) { + sink.next(serializer.deserialize(data)); + } catch (SerializationException e) { + sink.error(e); + } + }); } public Mono set(int value) { - return singleton.set(Ints.toByteArray(value)); + return singleton.set(Mono.fromCallable(() -> { + try (var buf = singleton.getAllocator().allocate(Integer.BYTES)) { + serializer.serialize(value, buf); + return buf.send(); + } + })); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseLong.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseLong.java index 7db4049..248b45d 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseLong.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseLong.java @@ -6,23 +6,33 @@ import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure; import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.UpdateReturnMode; +import it.cavallium.dbengine.database.serialization.SerializationException; +import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; public class DatabaseLong implements LLKeyValueDatabaseStructure { private final LLSingleton singleton; + private final SerializerFixedBinaryLength serializer; + private final SerializerFixedBinaryLength bugSerializer; public DatabaseLong(LLSingleton singleton) { this.singleton = singleton; + this.serializer = SerializerFixedBinaryLength.longSerializer(singleton.getAllocator()); + this.bugSerializer = SerializerFixedBinaryLength.intSerializer(singleton.getAllocator()); } public Mono get(@Nullable LLSnapshot snapshot) { - return singleton.get(snapshot).map(array -> { - if (array.length == 4) { - return (long) Ints.fromByteArray(array); - } else { - return Longs.fromByteArray(array); + return singleton.get(snapshot).handle((dataSend, sink) -> { + try (var data = dataSend.receive()) { + if (data.readableBytes() == 4) { + sink.next((long) (int) bugSerializer.deserialize(data)); + } else { + sink.next(serializer.deserialize(data)); + } + } catch (SerializationException e) { + sink.error(e); } }); } @@ -75,7 +85,12 @@ public class DatabaseLong implements LLKeyValueDatabaseStructure { } public Mono set(long value) { - return singleton.set(Longs.toByteArray(value)); + return singleton.set(Mono.fromCallable(() -> { + try (var buf = singleton.getAllocator().allocate(Long.BYTES)) { + serializer.serialize(value, buf); + return buf.send(); + } + })); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 76baec9..661eaca 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -70,6 +70,20 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep(dictionary, prefixKey, keySuffixSerializer, valueSerializer, onClose); } + public static Flux> getLeavesFrom(DatabaseMapDictionary databaseMapDictionary, + CompositeSnapshot snapshot, + Mono prevMono) { + Mono> prevOptMono = prevMono.map(Optional::of).defaultIfEmpty(Optional.empty()); + + return prevOptMono.flatMapMany(prevOpt -> { + if (prevOpt.isPresent()) { + return databaseMapDictionary.getAllValues(snapshot, prevOpt.get()); + } else { + return databaseMapDictionary.getAllValues(snapshot); + } + }); + } + private void deserializeValue(T keySuffix, Send valueToReceive, SynchronousSink sink) { try (var value = valueToReceive.receive()) { try { @@ -213,7 +227,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> at(@Nullable CompositeSnapshot snapshot, T keySuffix) { return Mono.fromCallable(() -> - new DatabaseSingle<>(dictionary, serializeKeySuffixToKey(keySuffix), valueSerializer, null)); + new DatabaseMapSingle<>(dictionary, serializeKeySuffixToKey(keySuffix), valueSerializer, null)); } @Override @@ -456,7 +470,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep(dictionary, toKey(keyBuf), valueSerializer, null); + var subStage = new DatabaseMapSingle<>(dictionary, toKey(keyBuf), valueSerializer, null); sink.next(Map.entry(keySuffix, subStage)); } catch (Throwable ex) { keyBuf.close(); @@ -467,8 +481,30 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> getAllValues(@Nullable CompositeSnapshot snapshot) { + return getAllValues(snapshot, rangeMono); + } + + public Flux> getAllValues(@Nullable CompositeSnapshot snapshot, @Nullable T from) { + if (from == null) { + return getAllValues(snapshot); + } else { + Mono> boundedRangeMono = rangeMono.flatMap(fullRangeSend -> Mono.fromCallable(() -> { + try (var fullRange = fullRangeSend.receive()) { + try (var keyWithoutExtBuf = keyPrefix == null ? alloc.allocate(keySuffixLength + keyExtLength) + : keyPrefix.copy()) { + keyWithoutExtBuf.ensureWritable(keySuffixLength + keyExtLength); + serializeSuffix(from, keyWithoutExtBuf); + return LLRange.of(keyWithoutExtBuf.send(), fullRange.getMax()).send(); + } + } + })); + return getAllValues(snapshot, boundedRangeMono); + } + } + + private Flux> getAllValues(@Nullable CompositeSnapshot snapshot, Mono> sliceRangeMono) { return dictionary - .getRange(resolveSnapshot(snapshot), rangeMono) + .getRange(resolveSnapshot(snapshot), sliceRangeMono) .>handle((serializedEntryToReceive, sink) -> { try { Entry entry; diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index 20e1022..20a5139 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -16,11 +16,17 @@ import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; +import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing; import it.cavallium.dbengine.database.serialization.SerializationException; +import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.function.TriFunction; import org.apache.logging.log4j.LogManager; @@ -82,7 +88,7 @@ public class DatabaseMapDictionaryDeep> extend }; protected final LLDictionary dictionary; - private final BufferAllocator alloc; + protected final BufferAllocator alloc; private final AtomicLong totalZeroBytesErrors = new AtomicLong(); protected final SubStageGetter subStageGetter; protected final SerializerFixedBinaryLength keySuffixSerializer; @@ -491,54 +497,110 @@ public class DatabaseMapDictionaryDeep> extend this.onClose = null; } - public static Flux getAllLeaves2(DatabaseMapDictionaryDeep, DatabaseMapDictionary> deepMap, + public static Flux getAllLeaves2(DatabaseMapDictionaryDeep, ? extends DatabaseStageMap>> deepMap, CompositeSnapshot snapshot, - TriFunction merger) { - if (deepMap.subStageGetter instanceof SubStageGetterMap subStageGetterMap) { - var keySuffix1Serializer = deepMap.keySuffixSerializer; - var keySuffix2Serializer = subStageGetterMap.keySerializer; - var valueSerializer = subStageGetterMap.valueSerializer; - return deepMap - .dictionary - .getRange(deepMap.resolveSnapshot(snapshot), deepMap.rangeMono) - .handle((entrySend, sink) -> { - K1 key1 = null; - K2 key2 = null; - try (var entry = entrySend.receive()) { - var keyBuf = entry.getKeyUnsafe(); - var valueBuf = entry.getValueUnsafe(); - try { - assert keyBuf != null; - keyBuf.skipReadable(deepMap.keyPrefixLength); - try (var key1Buf = keyBuf.split(deepMap.keySuffixLength)) { - key1 = keySuffix1Serializer.deserialize(key1Buf); - } - key2 = keySuffix2Serializer.deserialize(keyBuf); - assert valueBuf != null; - var value = valueSerializer.deserialize(valueBuf); - sink.next(merger.apply(key1, key2, value)); - } catch (IndexOutOfBoundsException ex) { - var exMessage = ex.getMessage(); - if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) { - var totalZeroBytesErrors = deepMap.totalZeroBytesErrors.incrementAndGet(); - if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) { - LOG.error("Unexpected zero-bytes value at " + deepMap.dictionary.getDatabaseName() - + ":" + deepMap.dictionary.getColumnName() - + ":[" + key1 - + ":" + key2 - + "](" + LLUtils.toStringSafe(keyBuf) + ") total=" + totalZeroBytesErrors); - } - sink.complete(); - } else { - sink.error(ex); - } - } - } catch (SerializationException ex) { - sink.error(ex); - } - }); + TriFunction merger, + @NotNull Mono savedProgressKey1) { + var keySuffix1Serializer = deepMap.keySuffixSerializer; + SerializerFixedBinaryLength keySuffix2Serializer; + Serializer valueSerializer; + boolean isHashed; + boolean isHashedSet; + if (deepMap.subStageGetter instanceof SubStageGetterMap subStageGetterMap) { + isHashed = false; + isHashedSet = false; + keySuffix2Serializer = subStageGetterMap.keySerializer; + valueSerializer = subStageGetterMap.valueSerializer; + } else if (deepMap.subStageGetter instanceof SubStageGetterHashMap subStageGetterHashMap) { + isHashed = true; + isHashedSet = false; + keySuffix2Serializer = subStageGetterHashMap.keyHashSerializer; + + //noinspection unchecked + ValueWithHashSerializer valueWithHashSerializer = new ValueWithHashSerializer<>( + (Serializer) subStageGetterHashMap.keySerializer, + (Serializer) subStageGetterHashMap.valueSerializer + ); + valueSerializer = new ValuesSetSerializer<>(valueWithHashSerializer); + } else if (deepMap.subStageGetter instanceof SubStageGetterHashSet subStageGetterHashSet) { + isHashed = true; + isHashedSet = true; + keySuffix2Serializer = subStageGetterHashSet.keyHashSerializer; + + //noinspection unchecked + valueSerializer = new ValuesSetSerializer(subStageGetterHashSet.keySerializer); } else { throw new IllegalArgumentException(); } + + var savedProgressKey1Opt = savedProgressKey1.map(Optional::of).defaultIfEmpty(Optional.empty()); + + return deepMap + .dictionary + .getRange(deepMap.resolveSnapshot(snapshot), Mono.zip(savedProgressKey1Opt, deepMap.rangeMono).handle((tuple, sink) -> { + var firstKey = tuple.getT1(); + try (var fullRange = tuple.getT2().receive()) { + if (firstKey.isPresent()) { + try (var key1Buf = deepMap.alloc.allocate(keySuffix1Serializer.getSerializedBinaryLength())) { + keySuffix1Serializer.serialize(firstKey.get(), key1Buf); + sink.next(LLRange.of(key1Buf.send(), fullRange.getMax()).send()); + } catch (SerializationException e) { + sink.error(e); + } + } else { + sink.next(fullRange.send()); + } + } + })) + .concatMapIterable(entrySend -> { + K1 key1 = null; + Object key2 = null; + try (var entry = entrySend.receive()) { + var keyBuf = entry.getKeyUnsafe(); + var valueBuf = entry.getValueUnsafe(); + try { + assert keyBuf != null; + keyBuf.skipReadable(deepMap.keyPrefixLength); + try (var key1Buf = keyBuf.split(deepMap.keySuffixLength)) { + key1 = keySuffix1Serializer.deserialize(key1Buf); + } + key2 = keySuffix2Serializer.deserialize(keyBuf); + assert valueBuf != null; + Object value = valueSerializer.deserialize(valueBuf); + if (isHashedSet) { + //noinspection unchecked + Set set = (Set) value; + K1 finalKey1 = key1; + //noinspection unchecked + return set.stream().map(e -> merger.apply(finalKey1, e, (V) Nothing.INSTANCE)).toList(); + } else if (isHashed) { + //noinspection unchecked + Set> set = (Set>) value; + K1 finalKey1 = key1; + return set.stream().map(e -> merger.apply(finalKey1, e.getKey(), e.getValue())).toList(); + } else { + //noinspection unchecked + return List.of(merger.apply(key1, (K2) key2, (V) value)); + } + } catch (IndexOutOfBoundsException ex) { + var exMessage = ex.getMessage(); + if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) { + var totalZeroBytesErrors = deepMap.totalZeroBytesErrors.incrementAndGet(); + if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) { + LOG.error("Unexpected zero-bytes value at " + deepMap.dictionary.getDatabaseName() + + ":" + deepMap.dictionary.getColumnName() + + ":[" + key1 + + ":" + key2 + + "](" + LLUtils.toStringSafe(keyBuf) + ") total=" + totalZeroBytesErrors); + } + return List.of(); + } else { + throw ex; + } + } + } catch (SerializationException ex) { + throw new CompletionException(ex); + } + }); } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java similarity index 91% rename from src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java rename to src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java index 1898c3e..3834301 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java @@ -10,7 +10,6 @@ import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; -import it.cavallium.dbengine.database.LLEntry; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; @@ -26,15 +25,15 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.SynchronousSink; -public class DatabaseSingle extends ResourceSupport, DatabaseSingle> implements +public class DatabaseMapSingle extends ResourceSupport, DatabaseMapSingle> implements DatabaseStageEntry { - private static final Logger LOG = LogManager.getLogger(DatabaseSingle.class); + private static final Logger LOG = LogManager.getLogger(DatabaseMapSingle.class); private final AtomicLong totalZeroBytesErrors = new AtomicLong(); - private static final Drop> DROP = new Drop<>() { + private static final Drop> DROP = new Drop<>() { @Override - public void drop(DatabaseSingle obj) { + public void drop(DatabaseMapSingle obj) { try { obj.key.close(); } catch (Throwable ex) { @@ -46,12 +45,12 @@ public class DatabaseSingle extends ResourceSupport, Databas } @Override - public Drop> fork() { + public Drop> fork() { return this; } @Override - public void attach(DatabaseSingle obj) { + public void attach(DatabaseMapSingle obj) { } }; @@ -64,9 +63,9 @@ public class DatabaseSingle extends ResourceSupport, Databas private Runnable onClose; @SuppressWarnings({"unchecked", "rawtypes"}) - public DatabaseSingle(LLDictionary dictionary, Buffer key, Serializer serializer, + public DatabaseMapSingle(LLDictionary dictionary, Buffer key, Serializer serializer, Runnable onClose) { - super((Drop>) (Drop) DROP); + super((Drop>) (Drop) DROP); this.dictionary = dictionary; this.key = key; this.keyMono = LLUtils.lazyRetain(this.key); @@ -211,12 +210,12 @@ public class DatabaseSingle extends ResourceSupport, Databas } @Override - protected Owned> prepareSend() { + protected Owned> prepareSend() { var keySend = this.key.send(); var onClose = this.onClose; return drop -> { var key = keySend.receive(); - var instance = new DatabaseSingle<>(dictionary, key, serializer, onClose); + var instance = new DatabaseMapSingle<>(dictionary, key, serializer, onClose); drop.attach(instance); return instance; }; diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java new file mode 100644 index 0000000..d8b70dc --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java @@ -0,0 +1,213 @@ +package it.cavallium.dbengine.database.collections; + +import io.netty5.buffer.api.Buffer; +import io.netty5.buffer.api.Drop; +import io.netty5.buffer.api.Owned; +import io.netty5.buffer.api.Send; +import io.netty5.buffer.api.internal.ResourceSupport; +import it.cavallium.dbengine.client.BadBlock; +import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.database.Delta; +import it.cavallium.dbengine.database.LLRange; +import it.cavallium.dbengine.database.LLSingleton; +import it.cavallium.dbengine.database.LLSnapshot; +import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.UpdateReturnMode; +import it.cavallium.dbengine.database.serialization.SerializationException; +import it.cavallium.dbengine.database.serialization.SerializationFunction; +import it.cavallium.dbengine.database.serialization.Serializer; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.Nullable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.SynchronousSink; + +public class DatabaseSingleton extends ResourceSupport, DatabaseSingleton> implements + DatabaseStageEntry { + + private static final Logger LOG = LogManager.getLogger(DatabaseSingleton.class); + + private static final Drop> DROP = new Drop<>() { + @Override + public void drop(DatabaseSingleton obj) { + if (obj.onClose != null) { + obj.onClose.run(); + } + } + + @Override + public Drop> fork() { + return this; + } + + @Override + public void attach(DatabaseSingleton obj) { + + } + }; + + private final LLSingleton singleton; + private final Serializer serializer; + + private Runnable onClose; + + @SuppressWarnings({"unchecked", "rawtypes"}) + public DatabaseSingleton(LLSingleton singleton, Serializer serializer, + Runnable onClose) { + super((Drop>) (Drop) DROP); + this.singleton = singleton; + this.serializer = serializer; + this.onClose = onClose; + } + + private LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) { + if (snapshot == null) { + return null; + } else { + return snapshot.getSnapshot(singleton); + } + } + + private void deserializeValue(Send value, SynchronousSink sink) { + try { + U deserializedValue; + try (var valueBuf = value.receive()) { + deserializedValue = serializer.deserialize(valueBuf); + } + sink.next(deserializedValue); + } catch (IndexOutOfBoundsException ex) { + var exMessage = ex.getMessage(); + if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) { + LOG.error("Unexpected zero-bytes value at " + + singleton.getDatabaseName() + ":" + singleton.getColumnName() + ":" + singleton.getName()); + sink.complete(); + } else { + sink.error(ex); + } + } catch (SerializationException ex) { + sink.error(ex); + } + } + + private Send serializeValue(U value) throws SerializationException { + var valSizeHint = serializer.getSerializedSizeHint(); + if (valSizeHint == -1) valSizeHint = 128; + try (var valBuf = singleton.getAllocator().allocate(valSizeHint)) { + serializer.serialize(value, valBuf); + return valBuf.send(); + } + } + + @Override + public Mono get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { + return singleton.get(resolveSnapshot(snapshot)) + .handle(this::deserializeValue); + } + + @Override + public Mono setAndGetPrevious(U value) { + return Flux + .concat(singleton.get(null), singleton.set(Mono.fromCallable(() -> serializeValue(value))).then(Mono.empty())) + .singleOrEmpty() + .handle(this::deserializeValue); + } + + @Override + public Mono update(SerializationFunction<@Nullable U, @Nullable U> updater, + UpdateReturnMode updateReturnMode) { + return singleton + .update((oldValueSer) -> { + try (oldValueSer) { + U result; + if (oldValueSer == null) { + result = updater.apply(null); + } else { + U deserializedValue; + try (var valueBuf = oldValueSer.receive()) { + deserializedValue = serializer.deserialize(valueBuf); + } + result = updater.apply(deserializedValue); + } + if (result == null) { + return null; + } else { + return serializeValue(result).receive(); + } + } + }, updateReturnMode) + .handle(this::deserializeValue); + } + + @Override + public Mono> updateAndGetDelta(SerializationFunction<@Nullable U, @Nullable U> updater) { + return singleton + .updateAndGetDelta((oldValueSer) -> { + try (oldValueSer) { + U result; + if (oldValueSer == null) { + result = updater.apply(null); + } else { + U deserializedValue; + try (var valueBuf = oldValueSer.receive()) { + deserializedValue = serializer.deserialize(valueBuf); + } + result = updater.apply(deserializedValue); + } + if (result == null) { + return null; + } else { + return serializeValue(result).receive(); + } + } + }).transform(mono -> LLUtils.mapLLDelta(mono, serialized -> { + try (var valueBuf = serialized.receive()) { + return serializer.deserialize(valueBuf); + } + })); + } + + @Override + public Mono clearAndGetPrevious() { + return Flux + .concat(singleton.get(null), singleton.set(Mono.empty()).then(Mono.empty())) + .singleOrEmpty() + .handle(this::deserializeValue); + } + + @Override + public Mono leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) { + return singleton.get(null).map(unused -> 1L).defaultIfEmpty(0L); + } + + @Override + public Mono isEmpty(@Nullable CompositeSnapshot snapshot) { + return singleton.get(null).map(t -> false).defaultIfEmpty(true); + } + + @Override + public Flux badBlocks() { + return Flux.empty(); + } + + @Override + protected RuntimeException createResourceClosedException() { + throw new IllegalStateException("Closed"); + } + + @Override + protected Owned> prepareSend() { + var onClose = this.onClose; + return drop -> { + var instance = new DatabaseSingleton<>(singleton, serializer, onClose); + drop.attach(instance); + return instance; + }; + } + + @Override + protected void makeInaccessible() { + this.onClose = null; + } +} \ No newline at end of file diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java index f899f7f..ab6512b 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java @@ -18,10 +18,10 @@ import reactor.core.publisher.Mono; public class SubStageGetterHashMap implements SubStageGetter, DatabaseMapDictionaryHashed> { - private final Serializer keySerializer; - private final Serializer valueSerializer; - private final Function keyHashFunction; - private final SerializerFixedBinaryLength keyHashSerializer; + final Serializer keySerializer; + final Serializer valueSerializer; + final Function keyHashFunction; + final SerializerFixedBinaryLength keyHashSerializer; public SubStageGetterHashMap(Serializer keySerializer, Serializer valueSerializer, diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java index 0753c42..d1fc9ad 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java @@ -19,9 +19,9 @@ import reactor.core.publisher.Mono; public class SubStageGetterHashSet implements SubStageGetter, DatabaseSetDictionaryHashed> { - private final Serializer keySerializer; - private final Function keyHashFunction; - private final SerializerFixedBinaryLength keyHashSerializer; + final Serializer keySerializer; + final Function keyHashFunction; + final SerializerFixedBinaryLength keyHashSerializer; public SubStageGetterHashSet(Serializer keySerializer, Function keyHashFunction, diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java index 3644e0c..d1c6cc1 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java @@ -20,7 +20,7 @@ public class SubStageGetterSingle implements SubStageGetter> subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, Mono> keyPrefixMono) { - return keyPrefixMono.map(keyPrefix -> new DatabaseSingle<>(dictionary, keyPrefix.receive(), serializer, null)); + return keyPrefixMono.map(keyPrefix -> new DatabaseMapSingle<>(dictionary, keyPrefix.receive(), serializer, null)); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 556500a..da784eb 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -84,7 +84,7 @@ public class LLLocalDictionary implements LLDictionary { * It used to be false, * now it's true to avoid crashes during iterations on completely corrupted files */ - static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = true; + static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = false; /** * Default: true. Use false to debug problems with windowing. */ @@ -874,7 +874,9 @@ public class LLLocalDictionary implements LLDictionary { try (var ro = new ReadOptions(getReadOptions(null))) { ro.setFillCache(false); if (!range.isSingle()) { - ro.setReadaheadSize(32 * 1024); + if (LLUtils.MANUAL_READAHEAD) { + ro.setReadaheadSize(32 * 1024); + } } ro.setVerifyChecksums(true); try (var rocksIteratorTuple = getRocksIterator(nettyDirect, ro, range, db)) { @@ -1294,7 +1296,9 @@ public class LLLocalDictionary implements LLDictionary { // readOpts.setIgnoreRangeDeletions(true); readOpts.setFillCache(false); - readOpts.setReadaheadSize(32 * 1024); // 32KiB + if (LLUtils.MANUAL_READAHEAD) { + readOpts.setReadaheadSize(32 * 1024); // 32KiB + } try (CappedWriteBatch writeBatch = new CappedWriteBatch(db, alloc, CAPPED_WRITE_BATCH_CAP, @@ -1565,7 +1569,9 @@ public class LLLocalDictionary implements LLDictionary { } try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { readOpts.setFillCache(false); - readOpts.setReadaheadSize(128 * 1024); // 128KiB + if (LLUtils.MANUAL_READAHEAD) { + readOpts.setReadaheadSize(128 * 1024); // 128KiB + } readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); if (PARALLEL_EXACT_SIZE) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 56a7f00..2e98dce 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -195,7 +195,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { tableOptions.setOptimizeFiltersForMemory(true); } tableOptions - .setChecksumType(ChecksumType.kxxHash64) + .setChecksumType(ChecksumType.kCRC32c) .setBlockCacheCompressed(optionsWithCache.compressedCache()) .setBlockCache(optionsWithCache.standardCache()) .setBlockSize(16 * 1024); // 16KiB @@ -561,13 +561,16 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } @Override - public Mono getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue) { + public Mono getSingleton(byte[] singletonListColumnName, + byte[] name, + byte @Nullable[] defaultValue) { return Mono .fromCallable(() -> new LLLocalSingleton( getRocksDBColumn(db, getCfh(singletonListColumnName)), (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()), LLLocalKeyValueDatabase.this.name, name, + ColumnUtils.toString(singletonListColumnName), dbScheduler, defaultValue )) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java index 24226d1..06593bb 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java @@ -3,22 +3,18 @@ package it.cavallium.dbengine.database.disk; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.Send; -import it.cavallium.dbengine.database.Delta; +import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; -import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.serialization.SerializationFunction; -import it.unimi.dsi.fastutil.bytes.ByteList; import java.io.IOException; import java.util.Arrays; import java.util.concurrent.Callable; import java.util.function.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; -import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.Snapshot; import org.rocksdb.WriteOptions; @@ -33,6 +29,7 @@ public class LLLocalSingleton implements LLSingleton { private final RocksDBColumn db; private final Function snapshotResolver; private final byte[] name; + private final String columnName; private final Mono> nameMono; private final String databaseName; private final Scheduler dbScheduler; @@ -41,12 +38,14 @@ public class LLLocalSingleton implements LLSingleton { Function snapshotResolver, String databaseName, byte[] name, + String columnName, Scheduler dbScheduler, - byte[] defaultValue) throws RocksDBException { + byte @Nullable [] defaultValue) throws RocksDBException { this.db = db; this.databaseName = databaseName; this.snapshotResolver = snapshotResolver; this.name = name; + this.columnName = columnName; this.nameMono = Mono.fromCallable(() -> { var alloc = db.getAllocator(); try (var nameBuf = alloc.allocate(this.name.length)) { @@ -58,7 +57,7 @@ public class LLLocalSingleton implements LLSingleton { if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Initialized in a nonblocking thread"); } - if (db.get(EMPTY_READ_OPTIONS, this.name, true) == null) { + if (defaultValue != null && db.get(EMPTY_READ_OPTIONS, this.name, true) == null) { db.put(EMPTY_WRITE_OPTIONS, this.name, defaultValue); } } @@ -81,30 +80,45 @@ public class LLLocalSingleton implements LLSingleton { } @Override - public Mono get(@Nullable LLSnapshot snapshot) { - return Mono - .fromCallable(() -> { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called get in a nonblocking thread"); - } - return db.get(resolveSnapshot(snapshot), name, true); - }) - .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause)) - .subscribeOn(dbScheduler); + public Mono> get(@Nullable LLSnapshot snapshot) { + return nameMono.publishOn(Schedulers.boundedElastic()).handle((nameSend, sink) -> { + try (Buffer name = nameSend.receive()) { + Buffer result = db.get(resolveSnapshot(snapshot), name); + if (result != null) { + sink.next(result.send()); + } else { + sink.complete(); + } + } catch (RocksDBException ex) { + sink.error(new IOException("Failed to read " + Arrays.toString(name), ex)); + } + }); } @Override - public Mono set(byte[] value) { - return Mono - .fromCallable(() -> { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called set in a nonblocking thread"); - } + public Mono set(Mono> valueMono) { + return Mono.zip(nameMono, valueMono).publishOn(Schedulers.boundedElastic()).handle((tuple, sink) -> { + var nameSend = tuple.getT1(); + var valueSend = tuple.getT2(); + try (Buffer name = nameSend.receive()) { + try (Buffer value = valueSend.receive()) { db.put(EMPTY_WRITE_OPTIONS, name, value); - return null; - }) - .onErrorMap(cause -> new IOException("Failed to write " + Arrays.toString(name), cause)) - .subscribeOn(dbScheduler); + sink.next(true); + } + } catch (RocksDBException ex) { + sink.error(new IOException("Failed to write " + Arrays.toString(name), ex)); + } + }).switchIfEmpty(unset().thenReturn(true)).then(); + } + + private Mono unset() { + return nameMono.publishOn(Schedulers.boundedElastic()).handle((nameSend, sink) -> { + try (Buffer name = nameSend.receive()) { + db.delete(EMPTY_WRITE_OPTIONS, name); + } catch (RocksDBException ex) { + sink.error(new IOException("Failed to read " + Arrays.toString(name), ex)); + } + }); } @Override @@ -130,8 +144,31 @@ public class LLLocalSingleton implements LLSingleton { keySend -> Mono.fromRunnable(keySend::close)); } + @Override + public Mono> updateAndGetDelta(SerializationFunction<@Nullable Send, @Nullable Buffer> updater) { + return Mono.usingWhen(nameMono, keySend -> runOnDb(() -> { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called update in a nonblocking thread"); + } + UpdateAtomicResult result + = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, keySend, updater, UpdateAtomicResultMode.DELTA); + return ((UpdateAtomicResultDelta) result).delta(); + }).onErrorMap(cause -> new IOException("Failed to read or write", cause)), + keySend -> Mono.fromRunnable(keySend::close)); + } + @Override public String getDatabaseName() { return databaseName; } + + @Override + public String getColumnName() { + return columnName; + } + + @Override + public String getName() { + return new String(name); + } } diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java index 9fd1e34..3b29473 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java @@ -16,6 +16,7 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicLong; +import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase { @@ -48,7 +49,9 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase { } @Override - public Mono getSingleton(byte[] singletonListColumnName, byte[] singletonName, byte[] defaultValue) { + public Mono getSingleton(byte[] singletonListColumnName, + byte[] singletonName, + byte @Nullable[] defaultValue) { var columnNameString = new String(singletonListColumnName, StandardCharsets.UTF_8); var dict = singletons.computeIfAbsent(columnNameString, _unused -> new LLMemoryDictionary(allocator, name, @@ -58,9 +61,17 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase { mainDb )); return Mono - .fromCallable(() -> new LLMemorySingleton(dict, singletonName)).flatMap(singleton -> singleton + .fromCallable(() -> new LLMemorySingleton(dict, columnNameString, singletonName)).flatMap(singleton -> singleton .get(null) - .switchIfEmpty(singleton.set(defaultValue).then(Mono.empty())) + .transform(mono -> { + if (defaultValue != null) { + return mono.switchIfEmpty(singleton + .set(Mono.fromSupplier(() -> allocator.copyOf(defaultValue).send())) + .then(Mono.empty())); + } else { + return mono; + } + }) .thenReturn(singleton) ); } diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java index df594c7..4978127 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java @@ -8,30 +8,21 @@ import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; -import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; -import it.cavallium.dbengine.database.disk.UpdateAtomicResult; -import it.cavallium.dbengine.database.disk.UpdateAtomicResultCurrent; -import it.cavallium.dbengine.database.disk.UpdateAtomicResultMode; -import it.cavallium.dbengine.database.disk.UpdateAtomicResultPrevious; -import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; -import java.io.IOException; -import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicReference; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; public class LLMemorySingleton implements LLSingleton { private final LLMemoryDictionary dict; + private final String columnNameString; private final byte[] singletonName; private final Mono> singletonNameBufMono; - public LLMemorySingleton(LLMemoryDictionary dict, byte[] singletonName) { + public LLMemorySingleton(LLMemoryDictionary dict, String columnNameString, byte[] singletonName) { this.dict = dict; + this.columnNameString = columnNameString; this.singletonName = singletonName; this.singletonNameBufMono = Mono.fromCallable(() -> dict .getAllocator() @@ -51,22 +42,15 @@ public class LLMemorySingleton implements LLSingleton { } @Override - public Mono get(@Nullable LLSnapshot snapshot) { - return dict - .get(snapshot, singletonNameBufMono, false) - .map(b -> { - try (var buf = b.receive()) { - return LLUtils.toArray(buf); - } - }); + public Mono> get(@Nullable LLSnapshot snapshot) { + return dict.get(snapshot, singletonNameBufMono, false); } @Override - public Mono set(byte[] value) { + public Mono set(Mono> value) { var bbKey = singletonNameBufMono; - var bbVal = Mono.fromCallable(() -> dict.getAllocator().allocate(value.length).writeBytes(value).send()); return dict - .put(bbKey, bbVal, LLDictionaryResultType.VOID) + .put(bbKey, value, LLDictionaryResultType.VOID) .then(); } @@ -75,4 +59,19 @@ public class LLMemorySingleton implements LLSingleton { UpdateReturnMode updateReturnMode) { return dict.update(singletonNameBufMono, updater, updateReturnMode); } + + @Override + public Mono> updateAndGetDelta(SerializationFunction<@Nullable Send, @Nullable Buffer> updater) { + return dict.updateAndGetDelta(singletonNameBufMono, updater); + } + + @Override + public String getColumnName() { + return columnNameString; + } + + @Override + public String getName() { + return new String(singletonName); + } } diff --git a/src/main/java/it/cavallium/dbengine/database/remote/ByteListSerializer.java b/src/main/java/it/cavallium/dbengine/database/remote/ByteListSerializer.java new file mode 100644 index 0000000..a1de4d4 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/remote/ByteListSerializer.java @@ -0,0 +1,30 @@ +package it.cavallium.dbengine.database.remote; + +import it.cavallium.data.generator.DataSerializer; +import it.unimi.dsi.fastutil.bytes.ByteArrayList; +import it.unimi.dsi.fastutil.bytes.ByteList; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import org.jetbrains.annotations.NotNull; + +public class ByteListSerializer implements DataSerializer { + + @Override + public void serialize(DataOutput dataOutput, @NotNull ByteList bytes) throws IOException { + dataOutput.writeInt(bytes.size()); + for (Byte aByte : bytes) { + dataOutput.writeByte(aByte); + } + } + + @Override + public @NotNull ByteList deserialize(DataInput dataInput) throws IOException { + var size = dataInput.readInt(); + var bal = new ByteArrayList(size); + for (int i = 0; i < size; i++) { + bal.add(dataInput.readByte()); + } + return bal; + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java index d55c95a..d9d6f6d 100644 --- a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java @@ -2,15 +2,16 @@ package it.cavallium.dbengine.database.remote; import com.google.common.collect.Multimap; import io.micrometer.core.instrument.MeterRegistry; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.incubator.codec.quic.QuicSslContextBuilder; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.Send; -import io.netty.handler.ssl.util.InsecureTrustManagerFactory; -import io.netty.incubator.codec.quic.QuicSslContextBuilder; import it.cavallium.dbengine.client.MemoryStats; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.database.LLDatabaseConnection; +import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLIndexRequest; import it.cavallium.dbengine.database.LLKeyValueDatabase; @@ -26,7 +27,6 @@ import it.cavallium.dbengine.database.remote.RPCCodecs.RPCEventCodec; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.lucene.LuceneHacks; -import it.cavallium.dbengine.lucene.LuceneRocksDBManager; import it.cavallium.dbengine.lucene.collector.Buckets; import it.cavallium.dbengine.lucene.searcher.BucketParams; import it.cavallium.dbengine.rpc.current.data.BinaryOptional; @@ -52,6 +52,7 @@ import it.cavallium.dbengine.rpc.current.data.SingletonSet; import it.cavallium.dbengine.rpc.current.data.SingletonUpdateEnd; import it.cavallium.dbengine.rpc.current.data.SingletonUpdateInit; import it.cavallium.dbengine.rpc.current.data.SingletonUpdateOldData; +import it.cavallium.dbengine.rpc.current.data.nullables.NullableBytes; import it.cavallium.dbengine.rpc.current.data.nullables.NullableLLSnapshot; import it.unimi.dsi.fastutil.bytes.ByteList; import java.io.File; @@ -226,11 +227,11 @@ public class LLQuicConnection implements LLDatabaseConnection { @Override public Mono getSingleton(byte[] singletonListColumnName, byte[] name, - byte[] defaultValue) { + byte @Nullable[] defaultValue) { return sendRequest(new GetSingleton(id, ByteList.of(singletonListColumnName), ByteList.of(name), - ByteList.of(defaultValue) + defaultValue == null ? NullableBytes.empty() : NullableBytes.of(ByteList.of(defaultValue)) )).cast(GeneratedEntityId.class).map(GeneratedEntityId::id).map(singletonId -> new LLSingleton() { @Override @@ -239,17 +240,22 @@ public class LLQuicConnection implements LLDatabaseConnection { } @Override - public Mono get(@Nullable LLSnapshot snapshot) { + public Mono> get(@Nullable LLSnapshot snapshot) { return sendRequest(new SingletonGet(singletonId, NullableLLSnapshot.ofNullable(snapshot))) .cast(BinaryOptional.class) - .mapNotNull(b -> b.val().getNullable()) - .map(binary -> QuicUtils.toArrayNoCopy(binary.val())); + .mapNotNull(result -> { + if (result.val().isPresent()) { + return allocator.copyOf(QuicUtils.toArrayNoCopy(result.val().get().val())).send(); + } else { + return null; + } + }); } @Override - public Mono set(byte[] value) { - return sendRequest(new SingletonSet(singletonId, ByteList.of(value))) - .then(); + public Mono set(Mono> valueMono) { + return QuicUtils.toBytes(valueMono) + .flatMap(valueSendOpt -> sendRequest(new SingletonSet(singletonId, valueSendOpt)).then()); } @Override @@ -285,10 +291,25 @@ public class LLQuicConnection implements LLDatabaseConnection { }); } + @Override + public Mono> updateAndGetDelta(SerializationFunction<@Nullable Send, @Nullable Buffer> updater) { + return Mono.error(new UnsupportedOperationException()); + } + @Override public String getDatabaseName() { return databaseName; } + + @Override + public String getColumnName() { + return new String(singletonListColumnName); + } + + @Override + public String getName() { + return new String(name); + } }); } diff --git a/src/main/java/it/cavallium/dbengine/database/remote/QuicUtils.java b/src/main/java/it/cavallium/dbengine/database/remote/QuicUtils.java index 7c351f1..8656d66 100644 --- a/src/main/java/it/cavallium/dbengine/database/remote/QuicUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/remote/QuicUtils.java @@ -1,12 +1,16 @@ package it.cavallium.dbengine.database.remote; import io.netty.handler.codec.ByteToMessageCodec; +import io.netty5.buffer.api.Buffer; +import io.netty5.buffer.api.Send; import it.cavallium.data.generator.nativedata.NullableString; import it.cavallium.dbengine.rpc.current.data.RPCCrash; import it.cavallium.dbengine.rpc.current.data.RPCEvent; +import it.cavallium.dbengine.rpc.current.data.nullables.NullableBytes; import it.unimi.dsi.fastutil.bytes.ByteArrayList; import it.unimi.dsi.fastutil.bytes.ByteList; import java.nio.charset.StandardCharsets; +import java.util.Optional; import java.util.function.Function; import java.util.function.Supplier; import java.util.logging.Level; @@ -38,6 +42,29 @@ public class QuicUtils { return new String(QuicUtils.toArrayNoCopy(b), StandardCharsets.UTF_8); } + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + public static NullableBytes toBytes(Optional> valueSendOpt) { + if (valueSendOpt.isPresent()) { + try (var value = valueSendOpt.get().receive()) { + var bytes = new byte[value.readableBytes()]; + value.copyInto(value.readerOffset(), bytes, 0, bytes.length); + return NullableBytes.ofNullable(ByteList.of(bytes)); + } + } else { + return NullableBytes.empty(); + } + } + + public static Mono toBytes(Mono> valueSendOptMono) { + return valueSendOptMono.map(valueSendOpt -> { + try (var value = valueSendOpt.receive()) { + var bytes = new byte[value.readableBytes()]; + value.copyInto(value.readerOffset(), bytes, 0, bytes.length); + return NullableBytes.ofNullable(ByteList.of(bytes)); + } + }).defaultIfEmpty(NullableBytes.empty()); + } + public record QuicStream(NettyInbound in, NettyOutbound out) {} public static Mono catchRPCErrors(@NotNull Throwable error) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index 9bdb3eb..5a6004e 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -13,6 +13,8 @@ import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.collections.DatabaseMapDictionary; import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep; +import it.cavallium.dbengine.database.collections.DatabaseStageEntry; +import it.cavallium.dbengine.database.collections.DatabaseStageMap; import it.cavallium.dbengine.database.collections.ValueGetter; import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.lucene.analyzer.LegacyWordAnalyzer; @@ -230,7 +232,7 @@ public class LuceneUtils { public static ValueGetter, V> getAsyncDbValueGetterDeep( CompositeSnapshot snapshot, - DatabaseMapDictionaryDeep, DatabaseMapDictionary> dictionaryDeep) { + DatabaseMapDictionaryDeep, ? extends DatabaseStageMap>> dictionaryDeep) { return entry -> LLUtils.usingResource(dictionaryDeep .at(snapshot, entry.getKey()), sub -> sub.getValue(snapshot, entry.getValue()), true); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbFileStore.java b/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbFileStore.java index fa488b6..033a2dd 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbFileStore.java +++ b/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbFileStore.java @@ -6,6 +6,7 @@ import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.ReadableComponent; import io.netty5.buffer.api.WritableComponent; +import it.cavallium.dbengine.database.LLUtils; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -105,9 +106,11 @@ public class RocksdbFileStore { db.put(headers, NEXT_ID_KEY, Longs.toByteArray(100)); incFlush(); } - this.itReadOpts = new ReadOptions() - .setReadaheadSize(blockSize * 4L) - .setVerifyChecksums(false) + this.itReadOpts = new ReadOptions(); + if (LLUtils.MANUAL_READAHEAD) { + itReadOpts.setReadaheadSize(blockSize * 4L); + } + itReadOpts.setVerifyChecksums(false) .setIgnoreRangeDeletions(true); } catch (RocksDBException e) { throw new IOException("Failed to open RocksDB meta file store", e); diff --git a/src/test/java/it/cavallium/dbengine/TestSingletons.java b/src/test/java/it/cavallium/dbengine/TestSingletons.java index 2f91e1f..2863e9a 100644 --- a/src/test/java/it/cavallium/dbengine/TestSingletons.java +++ b/src/test/java/it/cavallium/dbengine/TestSingletons.java @@ -5,10 +5,14 @@ import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks; import static it.cavallium.dbengine.DbTestUtils.newAllocator; import static it.cavallium.dbengine.DbTestUtils.tempDb; +import it.cavallium.data.generator.nativedata.StringSerializer; import it.cavallium.dbengine.DbTestUtils.TestAllocator; import it.cavallium.dbengine.database.LLKeyValueDatabase; +import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.collections.DatabaseInt; import it.cavallium.dbengine.database.collections.DatabaseLong; +import it.cavallium.dbengine.database.collections.DatabaseSingleton; +import it.cavallium.dbengine.database.serialization.Serializer; import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -85,6 +89,15 @@ public abstract class TestSingletons { .verifyComplete(); } + @Test + public void testCreateSingleton() { + StepVerifier + .create(tempDb(getTempDbGenerator(), allocator, db -> tempSingleton(db, "testsingleton") + .flatMap(dbSingleton -> dbSingleton.get(null)) + )) + .verifyComplete(); + } + @ParameterizedTest @ValueSource(ints = {Integer.MIN_VALUE, -192, -2, -1, 0, 1, 2, 1292, Integer.MAX_VALUE}) public void testDefaultValueInteger(int i) { @@ -137,6 +150,21 @@ public abstract class TestSingletons { .verifyComplete(); } + @ParameterizedTest + @MethodSource("provideLongNumberWithRepeats") + public void testSetSingleton(Long i, Integer repeats) { + StepVerifier + .create(tempDb(getTempDbGenerator(), allocator, db -> tempSingleton(db, "test") + .flatMap(dbSingleton -> Mono + .defer(() -> dbSingleton.set(Long.toString(System.currentTimeMillis()))) + .repeat(repeats) + .then(dbSingleton.set(Long.toString(i))) + .then(dbSingleton.get(null))) + )) + .expectNext(Long.toString(i)) + .verifyComplete(); + } + public static Mono tempInt(LLKeyValueDatabase database, String name, int defaultValue) { return database .getInteger("ints", name, defaultValue); @@ -146,4 +174,10 @@ public abstract class TestSingletons { return database .getLong("longs", name, defaultValue); } + + public static Mono> tempSingleton(LLKeyValueDatabase database, String name) { + return database + .getSingleton("longs", name) + .map(singleton -> new DatabaseSingleton<>(singleton, Serializer.UTF8_SERIALIZER, null)); + } }