Lazy retain generated suffixes

This commit is contained in:
Andrea Cavalli 2021-08-22 19:52:19 +02:00
parent bc759c344d
commit 906379923f
2 changed files with 48 additions and 31 deletions

View File

@ -16,6 +16,8 @@ import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.ToIntFunction; import java.util.function.ToIntFunction;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
@ -493,7 +495,23 @@ public class LLUtils {
return Mono.just(buf).map(ByteBuf::retain); return Mono.just(buf).map(ByteBuf::retain);
} }
public static Mono<LLRange> lazyRetain(LLRange range) { public static Mono<LLRange> lazyRetainRange(LLRange range) {
return Mono.just(range).map(LLRange::retain); return Mono.just(range).map(LLRange::retain);
} }
public static Mono<ByteBuf> lazyRetain(Callable<ByteBuf> bufCallable) {
return Mono.fromCallable(bufCallable).cacheInvalidateIf(byteBuf -> {
// Retain if the value has been cached previously
byteBuf.retain();
return false;
});
}
public static Mono<LLRange> lazyRetainRange(Callable<LLRange> rangeCallable) {
return Mono.fromCallable(rangeCallable).cacheInvalidateIf(range -> {
// Retain if the value has been cached previously
range.retain();
return false;
});
}
} }

View File

@ -269,7 +269,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
assert keyPrefix.refCnt() > 0; assert keyPrefix.refCnt() > 0;
assert keyPrefixLength == 0 || !LLUtils.equals(firstKey, nextRangeKey); assert keyPrefixLength == 0 || !LLUtils.equals(firstKey, nextRangeKey);
this.range = keyPrefixLength == 0 ? LLRange.all() : LLRange.of(firstKey.retain(), nextRangeKey.retain()); this.range = keyPrefixLength == 0 ? LLRange.all() : LLRange.of(firstKey.retain(), nextRangeKey.retain());
this.rangeMono = LLUtils.lazyRetain(this.range); this.rangeMono = LLUtils.lazyRetainRange(this.range);
assert subStageKeysConsistency(keyPrefixLength + keySuffixLength + keyExtLength); assert subStageKeysConsistency(keyPrefixLength + keySuffixLength + keyExtLength);
} finally { } finally {
nextRangeKey.release(); nextRangeKey.release();
@ -389,34 +389,33 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
@Override @Override
public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) { public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
return Mono return Mono.using(
.using(
() -> serializeSuffix(keySuffix), () -> serializeSuffix(keySuffix),
keySuffixData -> { keySuffixData -> {
Flux<ByteBuf> debuggingKeysFlux = Mono Flux<ByteBuf> debuggingKeysFlux = Mono.<List<ByteBuf>>defer(() -> {
.<List<ByteBuf>>defer(() -> {
if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED
&& this.subStageGetter.needsDebuggingKeyFlux()) { && this.subStageGetter.needsDebuggingKeyFlux()) {
return Flux return Flux
.using( .using(
() -> toExtRange(keySuffixData.retain()), () -> toExtRange(keySuffixData.retain()),
extRangeBuf -> this.dictionary extRangeBuf -> this.dictionary
.getRangeKeys(resolveSnapshot(snapshot), LLUtils.lazyRetain(extRangeBuf)), .getRangeKeys(resolveSnapshot(snapshot), LLUtils.lazyRetainRange(extRangeBuf)),
LLRange::release LLRange::release
) )
.collectList(); .collectList();
} else { } else {
return Mono.just(List.of()); return Mono.just(List.of());
} }
}) }).flatMapIterable(it -> it);
.flatMapIterable(it -> it); return Mono.using(
Mono<ByteBuf> keyBufMono = LLUtils.lazyRetain(toKeyWithoutExt(keySuffixData.retain())); () -> toKeyWithoutExt(keySuffixData.retain()),
return this.subStageGetter keyWithoutExt -> this.subStageGetter
.subStage(dictionary, snapshot, keyBufMono, debuggingKeysFlux); .subStage(dictionary, snapshot, LLUtils.lazyRetain(keyWithoutExt), debuggingKeysFlux),
ReferenceCounted::release
);
}, },
ReferenceCounted::release ReferenceCounted::release
) ).doOnDiscard(DatabaseStage.class, DatabaseStage::release);
.doOnDiscard(DatabaseStage.class, DatabaseStage::release);
} }
@Override @Override
@ -538,7 +537,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
.doOnNext(ReferenceCounted::release) .doOnNext(ReferenceCounted::release)
.then(); .then();
} else { } else {
return dictionary.setRange(LLUtils.lazyRetain(range), Flux.empty()); return dictionary.setRange(LLUtils.lazyRetainRange(range), Flux.empty());
} }
}); });
} }