diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 4d09595..266bcd0 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -16,6 +16,8 @@ import java.util.Collection; import java.util.List; import java.util.Map.Entry; import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.ToIntFunction; import org.apache.lucene.document.Document; @@ -493,7 +495,23 @@ public class LLUtils { return Mono.just(buf).map(ByteBuf::retain); } - public static Mono lazyRetain(LLRange range) { + public static Mono lazyRetainRange(LLRange range) { return Mono.just(range).map(LLRange::retain); } + + public static Mono lazyRetain(Callable bufCallable) { + return Mono.fromCallable(bufCallable).cacheInvalidateIf(byteBuf -> { + // Retain if the value has been cached previously + byteBuf.retain(); + return false; + }); + } + + public static Mono lazyRetainRange(Callable rangeCallable) { + return Mono.fromCallable(rangeCallable).cacheInvalidateIf(range -> { + // Retain if the value has been cached previously + range.retain(); + return false; + }); + } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index 74acaa7..0691eae 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -269,7 +269,7 @@ public class DatabaseMapDictionaryDeep> implem assert keyPrefix.refCnt() > 0; assert keyPrefixLength == 0 || !LLUtils.equals(firstKey, nextRangeKey); this.range = keyPrefixLength == 0 ? LLRange.all() : LLRange.of(firstKey.retain(), nextRangeKey.retain()); - this.rangeMono = LLUtils.lazyRetain(this.range); + this.rangeMono = LLUtils.lazyRetainRange(this.range); assert subStageKeysConsistency(keyPrefixLength + keySuffixLength + keyExtLength); } finally { nextRangeKey.release(); @@ -389,34 +389,33 @@ public class DatabaseMapDictionaryDeep> implem @Override public Mono at(@Nullable CompositeSnapshot snapshot, T keySuffix) { - return Mono - .using( - () -> serializeSuffix(keySuffix), - keySuffixData -> { - Flux debuggingKeysFlux = Mono - .>defer(() -> { - if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED - && this.subStageGetter.needsDebuggingKeyFlux()) { - return Flux - .using( - () -> toExtRange(keySuffixData.retain()), - extRangeBuf -> this.dictionary - .getRangeKeys(resolveSnapshot(snapshot), LLUtils.lazyRetain(extRangeBuf)), - LLRange::release - ) - .collectList(); - } else { - return Mono.just(List.of()); - } - }) - .flatMapIterable(it -> it); - Mono keyBufMono = LLUtils.lazyRetain(toKeyWithoutExt(keySuffixData.retain())); - return this.subStageGetter - .subStage(dictionary, snapshot, keyBufMono, debuggingKeysFlux); - }, - ReferenceCounted::release - ) - .doOnDiscard(DatabaseStage.class, DatabaseStage::release); + return Mono.using( + () -> serializeSuffix(keySuffix), + keySuffixData -> { + Flux debuggingKeysFlux = Mono.>defer(() -> { + if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED + && this.subStageGetter.needsDebuggingKeyFlux()) { + return Flux + .using( + () -> toExtRange(keySuffixData.retain()), + extRangeBuf -> this.dictionary + .getRangeKeys(resolveSnapshot(snapshot), LLUtils.lazyRetainRange(extRangeBuf)), + LLRange::release + ) + .collectList(); + } else { + return Mono.just(List.of()); + } + }).flatMapIterable(it -> it); + return Mono.using( + () -> toKeyWithoutExt(keySuffixData.retain()), + keyWithoutExt -> this.subStageGetter + .subStage(dictionary, snapshot, LLUtils.lazyRetain(keyWithoutExt), debuggingKeysFlux), + ReferenceCounted::release + ); + }, + ReferenceCounted::release + ).doOnDiscard(DatabaseStage.class, DatabaseStage::release); } @Override @@ -538,7 +537,7 @@ public class DatabaseMapDictionaryDeep> implem .doOnNext(ReferenceCounted::release) .then(); } else { - return dictionary.setRange(LLUtils.lazyRetain(range), Flux.empty()); + return dictionary.setRange(LLUtils.lazyRetainRange(range), Flux.empty()); } }); }