diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 2c5a1b3..c87e86f 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -57,6 +57,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.reactivestreams.Publisher; import org.rocksdb.AbstractImmutableNativeReference; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; @@ -87,6 +88,11 @@ public class LLUtils { public static final boolean FORCE_DISABLE_CHECKSUM_VERIFICATION = Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checksum.disable.force", "false")); + public static final boolean DEBUG_ALL_DROPS + = Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.drops.log", "false")); + public static final boolean DEBUG_ALL_DISCARDS + = Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.discards.log", "false")); + static { for (int i1 = 0; i1 < 256; i1++) { var b = LEXICONOGRAPHIC_ITERATION_SEEKS[i1]; @@ -624,8 +630,16 @@ public class LLUtils { return readOptions; } - public static Mono closeResource(Resource resource) { - return Mono.fromRunnable(resource::close); + public static Mono finalizeResource(Resource resource) { + return Mono.fromRunnable(() -> LLUtils.closeResource(resource)); + } + + public static Flux handleDiscard(Flux flux) { + return flux.doOnDiscard(Object.class, LLUtils::onDiscard); + } + + public static Mono handleDiscard(Mono flux) { + return flux.doOnDiscard(Object.class, LLUtils::onDiscard); } @Deprecated @@ -847,6 +861,20 @@ public class LLUtils { } private static void onNextDropped(Object next) { + if (DEBUG_ALL_DROPS) { + logger.trace("Dropped: {}", () -> next.getClass().getName()); + } + closeResource(next); + } + + public static void onDiscard(Object next) { + if (DEBUG_ALL_DISCARDS) { + logger.trace("Discarded: {}", () -> next.getClass().getName()); + } + closeResource(next); + } + + public static void closeResource(Object next) { if (next instanceof Send send) { send.close(); } else if (next instanceof Resource resource && resource.isAccessible()) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index 3de0f98..c4c9482 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -440,7 +440,8 @@ public class DatabaseMapDictionaryDeep> extend var beforeWriterOffset = output.writerOffset(); keySuffixSerializer.serialize(keySuffix, output); var afterWriterOffset = output.writerOffset(); - assert suffixKeyLengthConsistency(afterWriterOffset - beforeWriterOffset); + assert suffixKeyLengthConsistency(afterWriterOffset - beforeWriterOffset) + : "Invalid key suffix length: " + (afterWriterOffset - beforeWriterOffset) + ". Expected: " + keySuffixLength; } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java index 304c8cf..8701ea7 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java @@ -1,6 +1,5 @@ package it.cavallium.dbengine.database.collections; -import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.Drop; import io.netty5.buffer.api.Owned; @@ -237,7 +236,7 @@ public class DatabaseMapDictionaryHashed extends public Flux> setAllValuesAndGetPrevious(Flux> entries) { return entries.flatMap(entry -> Mono.usingWhen(this.at(null, entry.getKey()), stage -> stage.setAndGetPrevious(entry.getValue()).map(prev -> Map.entry(entry.getKey(), prev)), - LLUtils::closeResource + LLUtils::finalizeResource )); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index 8ad917b..c691dee 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -1,7 +1,5 @@ package it.cavallium.dbengine.database.collections; -import static reactor.core.publisher.Mono.fromRunnable; - import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLUtils; @@ -34,14 +32,14 @@ public interface DatabaseStageMap> extends default Mono containsKey(@Nullable CompositeSnapshot snapshot, T key) { return Mono.usingWhen(this.at(snapshot, key), stage -> stage.isEmpty(snapshot).map(empty -> !empty), - LLUtils::closeResource + LLUtils::finalizeResource ); } default Mono getValue(@Nullable CompositeSnapshot snapshot, T key, boolean existsAlmostCertainly) { return Mono.usingWhen(this.at(snapshot, key), stage -> stage.get(snapshot, existsAlmostCertainly), - LLUtils::closeResource + LLUtils::finalizeResource ); } @@ -54,7 +52,7 @@ public interface DatabaseStageMap> extends } default Mono putValue(T key, U value) { - return Mono.usingWhen(at(null, key).single(), stage -> stage.set(value), LLUtils::closeResource); + return Mono.usingWhen(at(null, key).single(), stage -> stage.set(value), LLUtils::finalizeResource); } Mono getUpdateMode(); @@ -64,7 +62,7 @@ public interface DatabaseStageMap> extends SerializationFunction<@Nullable U, @Nullable U> updater) { return Mono.usingWhen(at(null, key).single(), stage -> stage.update(updater, updateReturnMode), - LLUtils::closeResource + LLUtils::finalizeResource ); } @@ -87,7 +85,7 @@ public interface DatabaseStageMap> extends default Mono putValueAndGetPrevious(T key, U value) { return Mono.usingWhen(at(null, key).single(), stage -> stage.setAndGetPrevious(value), - LLUtils::closeResource + LLUtils::finalizeResource ); } @@ -96,7 +94,7 @@ public interface DatabaseStageMap> extends */ default Mono putValueAndGetChanged(T key, U value) { return Mono - .usingWhen(at(null, key).single(), stage -> stage.setAndGetChanged(value), LLUtils::closeResource) + .usingWhen(at(null, key).single(), stage -> stage.setAndGetChanged(value), LLUtils::finalizeResource) .single(); } @@ -105,7 +103,7 @@ public interface DatabaseStageMap> extends } default Mono removeAndGetPrevious(T key) { - return Mono.usingWhen(at(null, key), DatabaseStage::clearAndGetPrevious, LLUtils::closeResource); + return Mono.usingWhen(at(null, key), DatabaseStage::clearAndGetPrevious, LLUtils::finalizeResource); } default Mono removeAndGetStatus(T key) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java index 5afc7d2..c4e4b2e 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java @@ -7,6 +7,7 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import io.netty5.buffer.api.Send; import it.cavallium.dbengine.database.LLSnapshot; +import it.cavallium.dbengine.database.LLUtils; import java.io.IOException; import java.time.Duration; import java.util.concurrent.ExecutorService; @@ -86,6 +87,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager { .repeatWhen(s -> s.delayElements(queryRefreshDebounceTime)) .takeUntilOther(closeRequestedMono.asMono()) .doAfterTerminate(refresherClosed::tryEmitEmpty) + .transform(LLUtils::handleDiscard) .subscribe(); this.cachedSnapshotSearchers = CacheBuilder.newBuilder() diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index bb34c6d..55c72c6 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -11,7 +11,6 @@ import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.database.LLUtils; -import it.cavallium.dbengine.database.collections.DatabaseMapDictionary; import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep; import it.cavallium.dbengine.database.collections.DatabaseStageEntry; import it.cavallium.dbengine.database.collections.DatabaseStageMap; @@ -90,7 +89,6 @@ import org.apache.lucene.search.similarities.Similarity; import org.apache.lucene.search.similarities.TFIDFSimilarity; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; -import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Constants; import org.apache.lucene.util.StringHelper; import org.jetbrains.annotations.NotNull; @@ -235,7 +233,7 @@ public class LuceneUtils { DatabaseMapDictionaryDeep, ? extends DatabaseStageMap>> dictionaryDeep) { return entry -> Mono.usingWhen(dictionaryDeep.at(snapshot, entry.getKey()), sub -> sub.getValue(snapshot, entry.getValue()), - LLUtils::closeResource + LLUtils::finalizeResource ); } diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java index 2f60044..decfb69 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java @@ -6,8 +6,6 @@ import static it.cavallium.dbengine.SyncUtils.*; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.DbTestUtils.TestAllocator; import it.cavallium.dbengine.database.UpdateMode; -import it.cavallium.dbengine.database.collections.DatabaseStageEntry; -import it.cavallium.dbengine.database.collections.DatabaseStageMap; import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps; @@ -24,6 +22,7 @@ import org.apache.logging.log4j.Logger; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -132,8 +131,8 @@ public abstract class TestDictionaryMap { .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) .flatMap(map -> Mono - .usingWhen(map.at(null, key), v -> v.set(value), LLUtils::closeResource) - .then(Mono.usingWhen(map.at(null, key), v -> v.get(null), LLUtils::closeResource)) + .usingWhen(map.at(null, key), v -> v.set(value), LLUtils::finalizeResource) + .then(Mono.usingWhen(map.at(null, key), v -> v.get(null), LLUtils::finalizeResource)) .doFinally(s -> map.close()) ) )); @@ -322,6 +321,21 @@ public abstract class TestDictionaryMap { } } + @Test + public void testUpdateGetWithCancel() { + tempDb(getTempDbGenerator(), allocator, db -> { + var mapMono = tempDictionary(db, UpdateMode.ALLOW) + .map(dict -> tempDatabaseMapDictionaryMap(dict, MapType.MAP, 5)); + + var keys = Flux.range(10, 89).map(n -> "key" + n).repeat(100); + + return Mono.usingWhen(mapMono, + map -> keys.flatMap(key -> map.updateValue(key, prevValue -> key + "-val")).then(), + LLUtils::finalizeResource + ); + }).take(50).blockLast(); + } + @ParameterizedTest @MethodSource("provideArgumentsPut") public void testPutAndGetChanged(MapType mapType, UpdateMode updateMode, String key, String value, boolean shouldFail) { @@ -427,7 +441,7 @@ public abstract class TestDictionaryMap { Flux keysFlux = entriesFlux.map(Entry::getKey); Flux> resultsFlux = map.setAllValues(entriesFlux).thenMany(map.getMulti(null, keysFlux)); return Flux.zip(keysFlux, resultsFlux, Map::entry); - }, LLUtils::closeResource) + }, LLUtils::finalizeResource) .filter(k -> k.getValue().isPresent()).map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())); } ));