diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index c251e19..7c3a1d1 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -193,6 +193,34 @@ public class LLUtils { } } + public static String toStringSafe(@Nullable LLRange range) { + try { + if (range == null || range.isAccessible()) { + return toString(range); + } else { + return "(released)"; + } + } catch (IllegalReferenceCountException ex) { + return "(released)"; + } + } + + public static String toString(@Nullable LLRange range) { + if (range == null) { + return "null"; + } else if (range.isAll()) { + return "ξ"; + } else if (range.hasMin() && range.hasMax()) { + return "[" + toStringSafe(range.getMinUnsafe()) + "," + toStringSafe(range.getMaxUnsafe()) + ")"; + } else if (range.hasMin()) { + return "[" + toStringSafe(range.getMinUnsafe()) + ",*)"; + } else if (range.hasMax()) { + return "[*," + toStringSafe(range.getMaxUnsafe()) + ")"; + } else { + return "∅"; + } + } + public static String toString(@Nullable Buffer key) { if (key == null) { return "null"; @@ -215,6 +243,8 @@ public class LLUtils { if (isAscii) { if (byteVal >= 32 && byteVal < 127) { asciiSB.append((char) byteVal); + } else if (byteVal == 0) { + asciiSB.append('␀'); } else { isAscii = false; asciiSB = null; @@ -477,16 +507,20 @@ public class LLUtils { @NotNull public static DirectBuffer convertToReadableDirect(BufferAllocator allocator, Send content) { try (var buf = content.receive()) { + DirectBuffer result; if (buf.countComponents() == 1) { var direct = obtainDirect(buf, false); - return new DirectBuffer(buf.send(), direct); + result = new DirectBuffer(buf.send(), direct); } else { var direct = newDirect(allocator, buf.readableBytes()); try (var buf2 = direct.buffer().receive()) { buf.copyInto(buf.readerOffset(), buf2, buf2.writerOffset(), buf.readableBytes()); - return new DirectBuffer(buf2.send(), direct.byteBuffer()); + buf2.writerOffset(buf2.writerOffset() + buf.readableBytes()); + assert buf2.readableBytes() == buf.readableBytes(); + result = new DirectBuffer(buf2.send(), direct.byteBuffer()); } } + return result; } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index b02277c..baf3492 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -252,7 +252,7 @@ public class DatabaseMapDictionaryDeep> implem protected void removePrefix(Buffer key) { assert key.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength || key.readableBytes() == keyPrefixLength + keySuffixLength; - key.readerOffset(key.readerOffset() + this.keyPrefixLength).compact(); + key.readerOffset(key.readerOffset() + this.keyPrefixLength); assert key.readableBytes() == keySuffixLength + keyExtLength || key.readableBytes() == keySuffixLength; } @@ -262,7 +262,7 @@ public class DatabaseMapDictionaryDeep> implem */ protected void removeExt(Buffer key) { assert key.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength; - key.writerOffset(keyPrefixLength + keySuffixLength).compact(); + key.writerOffset(keyPrefixLength + keySuffixLength); assert key.readableBytes() == keyPrefixLength + keySuffixLength; } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java index 653cc35..66815c3 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java @@ -31,7 +31,7 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap keySuffixHashFunction; protected DatabaseMapDictionaryHashed(LLDictionary dictionary, - Send prefixKey, + @NotNull Send prefixKey, Serializer keySuffixSerializer, Serializer valueSerializer, Function keySuffixHashFunction, diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java index f1903f8..260ff51 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java @@ -4,6 +4,7 @@ import io.net5.buffer.api.Buffer; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; +import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import java.util.HashMap; @@ -23,7 +24,7 @@ public class DatabaseSetDictionary extends DatabaseMapDictionary public static DatabaseSetDictionary simple(LLDictionary dictionary, SerializerFixedBinaryLength keySerializer) { - return new DatabaseSetDictionary<>(dictionary, null, keySerializer); + return new DatabaseSetDictionary<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer); } public static DatabaseSetDictionary tail(LLDictionary dictionary, diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionaryHashed.java index c9c6a38..16a9f3d 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionaryHashed.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionaryHashed.java @@ -4,6 +4,7 @@ import io.net5.buffer.api.Buffer; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; +import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; @@ -11,6 +12,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.function.Function; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; @@ -18,7 +20,7 @@ import reactor.core.publisher.Mono; public class DatabaseSetDictionaryHashed extends DatabaseMapDictionaryHashed { protected DatabaseSetDictionaryHashed(LLDictionary dictionary, - Send prefixKey, + @NotNull Send prefixKey, Serializer keySuffixSerializer, Function keySuffixHashFunction, SerializerFixedBinaryLength keySuffixHashSerializer) { @@ -36,7 +38,7 @@ public class DatabaseSetDictionaryHashed extends DatabaseMapDictionaryHas Function keyHashFunction, SerializerFixedBinaryLength keyHashSerializer) { return new DatabaseSetDictionaryHashed<>(dictionary, - null, + LLUtils.empty(dictionary.getAllocator()), keySerializer, keyHashFunction, keyHashSerializer diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index 42cf458..fdc4c2e 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -287,7 +287,8 @@ public interface DatabaseStageMap> extends Dat @Override default Mono> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { return getAllValues(snapshot) - .collectMap(Entry::getKey, Entry::getValue, HashMap::new); + .collectMap(Entry::getKey, Entry::getValue, HashMap::new) + .filter(map -> !map.isEmpty()); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 8c53e41..075fa91 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -834,9 +834,6 @@ public class LLLocalDictionary implements LLDictionary { stamp = 0; } try { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Reading {} (before update)", LLUtils.toStringSafe(key)); - } while (true) { @Nullable Buffer prevData; var prevDataHolder = existsAlmostCertainly ? null : new Holder(); @@ -862,7 +859,7 @@ public class LLLocalDictionary implements LLDictionary { } if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, - "Read {}: {} (before update)", + "Reading {}: {} (before update)", LLUtils.toStringSafe(key), LLUtils.toStringSafe(prevData) ); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java index e2c82f5..e4db431 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java @@ -1,5 +1,7 @@ package it.cavallium.dbengine.database.disk; +import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; + import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.Send; @@ -12,10 +14,13 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; +import org.warp.commonutils.log.Logger; +import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; public abstract class LLLocalGroupedReactiveRocksIterator { + protected static final Logger logger = LoggerFactory.getLogger(LLLocalGroupedReactiveRocksIterator.class); private final RocksDB db; private final BufferAllocator alloc; private final ColumnFamilyHandle cfh; @@ -52,6 +57,9 @@ public abstract class LLLocalGroupedReactiveRocksIterator { .generate(() -> { var readOptions = new ReadOptions(this.readOptions); readOptions.setFillCache(canFillCache && range.hasMin() && range.hasMax()); + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); + } return LLLocalDictionary.getRocksIterator(alloc, allowNettyDirect, readOptions, range.copy().send(), db, cfh); }, (tuple, sink) -> { try { @@ -74,6 +82,16 @@ public abstract class LLLocalGroupedReactiveRocksIterator { } else { value = null; } + + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Range {} is reading {}: {}", + LLUtils.toStringSafe(range), + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(value) + ); + } + try { rocksIterator.next(); rocksIterator.status(); @@ -94,9 +112,15 @@ public abstract class LLLocalGroupedReactiveRocksIterator { if (!values.isEmpty()) { sink.next(values); } else { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(range)); + } sink.complete(); } } catch (RocksDBException ex) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(range)); + } sink.error(ex); } return tuple; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java index 818882d..d82e570 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java @@ -1,5 +1,7 @@ package it.cavallium.dbengine.database.disk; +import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; + import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.Send; @@ -9,10 +11,13 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; +import org.warp.commonutils.log.Logger; +import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; public class LLLocalKeyPrefixReactiveRocksIterator { + protected static final Logger logger = LoggerFactory.getLogger(LLLocalKeyPrefixReactiveRocksIterator.class); private final RocksDB db; private final BufferAllocator alloc; private final ColumnFamilyHandle cfh; @@ -54,6 +59,9 @@ public class LLLocalKeyPrefixReactiveRocksIterator { readOptions.setReadaheadSize(32 * 1024); // 32KiB readOptions.setFillCache(canFillCache); } + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); + } return LLLocalDictionary.getRocksIterator(alloc, allowNettyDirect, readOptions, rangeSend, db, cfh); }, (tuple, sink) -> { try { @@ -79,11 +87,24 @@ public class LLLocalKeyPrefixReactiveRocksIterator { rocksIterator.status(); } } + if (firstGroupKey != null) { var groupKeyPrefix = firstGroupKey.copy(firstGroupKey.readerOffset(), prefixLength); assert groupKeyPrefix.isAccessible(); + + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Range {} is reading prefix {}", + LLUtils.toStringSafe(range), + LLUtils.toStringSafe(groupKeyPrefix) + ); + } + sink.next(groupKeyPrefix.send()); } else { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(range)); + } sink.complete(); } } finally { @@ -92,6 +113,9 @@ public class LLLocalKeyPrefixReactiveRocksIterator { } } } catch (RocksDBException ex) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(range)); + } sink.error(ex); } return tuple; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java index 70428da..a1d5d32 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.database.disk; +import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; import static it.cavallium.dbengine.database.disk.LLLocalDictionary.getRocksIterator; import io.net5.buffer.api.Buffer; @@ -14,10 +15,13 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; +import org.warp.commonutils.log.Logger; +import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; public abstract class LLLocalReactiveRocksIterator { + protected static final Logger logger = LoggerFactory.getLogger(LLLocalReactiveRocksIterator.class); private final AtomicBoolean released = new AtomicBoolean(false); private final RocksDB db; private final BufferAllocator alloc; @@ -51,6 +55,9 @@ public abstract class LLLocalReactiveRocksIterator { readOptions.setReadaheadSize(32 * 1024); // 32KiB readOptions.setFillCache(false); } + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); + } return getRocksIterator(alloc, allowNettyDirect, readOptions, range.copy().send(), db, cfh); }, (tuple, sink) -> { try { @@ -74,6 +81,16 @@ public abstract class LLLocalReactiveRocksIterator { } else { value = null; } + + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Range {} is reading {}: {}", + LLUtils.toStringSafe(range), + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(value) + ); + } + try { rocksIterator.next(); rocksIterator.status(); @@ -85,9 +102,15 @@ public abstract class LLLocalReactiveRocksIterator { } } } else { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(range)); + } sink.complete(); } } catch (RocksDBException ex) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(range)); + } sink.error(ex); } return tuple; diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java index 2238a62..d7073c7 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java @@ -5,7 +5,10 @@ import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks; import static it.cavallium.dbengine.DbTestUtils.isCIMode; import static it.cavallium.dbengine.DbTestUtils.newAllocator; import static it.cavallium.dbengine.DbTestUtils.destroyAllocator; +import static it.cavallium.dbengine.DbTestUtils.run; +import static it.cavallium.dbengine.DbTestUtils.runVoid; import static it.cavallium.dbengine.DbTestUtils.tempDatabaseMapDictionaryDeepMap; +import static it.cavallium.dbengine.DbTestUtils.tempDatabaseMapDictionaryMap; import static it.cavallium.dbengine.DbTestUtils.tempDb; import static it.cavallium.dbengine.DbTestUtils.tempDictionary; @@ -23,12 +26,15 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.TestMethodOrder; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.warp.commonutils.log.Logger; +import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -42,6 +48,7 @@ import reactor.util.function.Tuples; @TestMethodOrder(MethodOrderer.MethodName.class) public abstract class TestDictionaryMapDeep { + private final Logger log = LoggerFactory.getLogger(this.getClass()); private TestAllocator allocator; private boolean checkLeaks = true; @@ -174,22 +181,51 @@ public abstract class TestDictionaryMapDeep { @ParameterizedTest @MethodSource("provideArgumentsSet") - public void testSetValueGetValue(UpdateMode updateMode, String key, Map value, boolean shouldFail) { - var stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMap(map -> map - .putValue(key, value) - .then(map.getValue(null, key)) - .doAfterTerminate(map::release) - ) - )); - if (shouldFail) { - this.checkLeaks = false; - stpVer.verifyError(); - } else { - stpVer.expectNext(value).verifyComplete(); - } + public void testPutValue(UpdateMode updateMode, String key, Map value, boolean shouldFail) { + var gen = getTempDbGenerator(); + var db = run(gen.openTempDb(allocator)); + var dict = run(tempDictionary(db.db(), updateMode)); + var map = tempDatabaseMapDictionaryDeepMap(dict, 5, 6); + + log.debug("Put \"{}\" = \"{}\"", key, value); + runVoid(shouldFail, map.putValue(key, value)); + + var resultingMapSize = run(map.leavesCount(null, false)); + Assertions.assertEquals(shouldFail ? 0 : value.size(), resultingMapSize); + + var resultingMap = run(map.get(null)); + Assertions.assertEquals(shouldFail ? null : Map.of(key, value), resultingMap); + + runVoid(map.close()); + map.release(); + + //if (shouldFail) this.checkLeaks = false; + + gen.closeTempDb(db); + } + + @ParameterizedTest + @MethodSource("provideArgumentsSet") + public void testGetValue(UpdateMode updateMode, String key, Map value, boolean shouldFail) { + var gen = getTempDbGenerator(); + var db = run(gen.openTempDb(allocator)); + var dict = run(tempDictionary(db.db(), updateMode)); + var map = tempDatabaseMapDictionaryDeepMap(dict, 5, 6); + + log.debug("Put \"{}\" = \"{}\"", key, value); + runVoid(shouldFail, map.putValue(key, value)); + + log.debug("Get \"{}\"", key); + var returnedValue = run(shouldFail, map.getValue(null, key)); + + Assertions.assertEquals(shouldFail ? null : value, returnedValue); + + runVoid(map.close()); + map.release(); + + //if (shouldFail) this.checkLeaks = false; + + gen.closeTempDb(db); } @ParameterizedTest