Add count multi searcher

This commit is contained in:
Andrea Cavalli 2021-11-09 00:05:26 +01:00
parent 1ba4a1866f
commit 47aac33b22
7 changed files with 151 additions and 75 deletions

View File

@ -12,7 +12,7 @@ public class AdaptiveLocalSearcher implements LocalSearcher {
private static final LocalSearcher localSearcher = new PagedLocalSearcher(); private static final LocalSearcher localSearcher = new PagedLocalSearcher();
private static final LocalSearcher countSearcher = new CountLocalSearcher(); private static final LocalSearcher countSearcher = new CountMultiSearcher();
@Override @Override
public Mono<LuceneSearchResult> collect(Mono<Send<LLIndexSearcher>> indexSearcher, public Mono<LuceneSearchResult> collect(Mono<Send<LLIndexSearcher>> indexSearcher,

View File

@ -5,22 +5,15 @@ import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.io.Closeable;
import java.io.IOException;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
public class AdaptiveMultiSearcher implements MultiSearcher { public class AdaptiveMultiSearcher implements MultiSearcher {
private static final MultiSearcher count private static final MultiSearcher count = new CountMultiSearcher();
= new UnsortedUnscoredSimpleMultiSearcher(new CountLocalSearcher());
private static final MultiSearcher scoredPaged = new ScoredPagedMultiSearcher(); private static final MultiSearcher scoredPaged = new ScoredPagedMultiSearcher();
private static final MultiSearcher unsortedUnscoredPaged private static final MultiSearcher unsortedUnscoredContinuous = new UnsortedUnscoredStreamingMultiSearcher();
= new UnsortedUnscoredSimpleMultiSearcher(new PagedLocalSearcher());
private static final MultiSearcher unsortedUnscoredContinuous
= new UnsortedUnscoredStreamingMultiSearcher();
private final UnsortedScoredFullMultiSearcher unsortedScoredFull; private final UnsortedScoredFullMultiSearcher unsortedScoredFull;
@ -75,9 +68,6 @@ public class AdaptiveMultiSearcher implements MultiSearcher {
return unsortedScoredFull.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); return unsortedScoredFull.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
} }
} }
} else if (realLimit <= maxAllowedInMemoryLimit) {
// Run single-page searches using the paged multi searcher
return unsortedUnscoredPaged.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
} else { } else {
// Run large/unbounded searches using the continuous multi searcher // Run large/unbounded searches using the continuous multi searcher
return unsortedUnscoredContinuous.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); return unsortedUnscoredContinuous.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);

View File

@ -1,51 +0,0 @@
package it.cavallium.dbengine.lucene.searcher;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class CountLocalSearcher implements LocalSearcher {
@Override
public Mono<LuceneSearchResult> collect(Mono<Send<LLIndexSearcher>> indexSearcherMono,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer) {
return Mono
.usingWhen(
indexSearcherMono,
indexSearcher -> {
Mono<LocalQueryParams> queryParamsMono;
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
queryParamsMono = Mono.just(queryParams);
} else {
queryParamsMono = transformer.transform(Mono
.fromSupplier(() -> new TransformerInput(LLIndexSearchers.unsharded(indexSearcher), queryParams)));
}
return queryParamsMono.flatMap(queryParams2 -> Mono.fromCallable(() -> {
try (var is = indexSearcher.receive()) {
LLUtils.ensureBlocking();
return is.getIndexSearcher().count(queryParams2.query());
}
}).subscribeOn(Schedulers.boundedElastic()));
},
is -> Mono.empty()
)
.map(count -> new LuceneSearchResult(TotalHitsCount.of(count, true), Flux.empty(), null))
.doOnDiscard(Send.class, Send::close)
.doOnDiscard(Resource.class, Resource::close);
}
@Override
public String getName() {
return "count local";
}
}

View File

@ -0,0 +1,134 @@
package it.cavallium.dbengine.lucene.searcher;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class CountMultiSearcher implements MultiSearcher {
@Override
public Mono<LuceneSearchResult> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer) {
return LLUtils.usingSendResource(indexSearchersMono,
indexSearchers -> {
Mono<LocalQueryParams> queryParamsMono;
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
queryParamsMono = Mono.just(queryParams);
} else {
queryParamsMono = transformer.transform(Mono
.fromSupplier(() -> new TransformerInput(indexSearchers, queryParams)));
}
return queryParamsMono.flatMap(queryParams2 -> {
var localQueryParams = getLocalQueryParams(queryParams2);
return Mono
.fromRunnable(() -> {
LLUtils.ensureBlocking();
if (queryParams2.isSorted() && queryParams2.limitLong() > 0) {
throw new UnsupportedOperationException("Sorted queries are not supported"
+ " by SimpleUnsortedUnscoredLuceneMultiSearcher");
}
if (queryParams2.needsScores() && queryParams2.limitLong() > 0) {
throw new UnsupportedOperationException("Scored queries are not supported"
+ " by SimpleUnsortedUnscoredLuceneMultiSearcher");
}
})
.thenMany(Flux.fromIterable(indexSearchers.shards()))
.flatMap(searcher -> {
var llSearcher = Mono.fromCallable(() -> new LLIndexSearcher(searcher, false, null).send());
return this.collect(llSearcher, localQueryParams, keyFieldName, transformer);
})
.collectList()
.map(results -> {
List<LuceneSearchResult> resultsToDrop = new ArrayList<>(results.size());
List<Flux<LLKeyScore>> resultsFluxes = new ArrayList<>(results.size());
boolean exactTotalHitsCount = true;
long totalHitsCountValue = 0;
for (LuceneSearchResult result : results) {
resultsToDrop.add(result);
resultsFluxes.add(result.results());
exactTotalHitsCount &= result.totalHitsCount().exact();
totalHitsCountValue += result.totalHitsCount().value();
}
var totalHitsCount = new TotalHitsCount(totalHitsCountValue, exactTotalHitsCount);
Flux<LLKeyScore> mergedFluxes = Flux
.merge(resultsFluxes)
.skip(queryParams2.offsetLong())
.take(queryParams2.limitLong(), true);
return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> {
for (LuceneSearchResult luceneSearchResult : resultsToDrop) {
luceneSearchResult.close();
}
indexSearchers.close();
});
});
}
);
},
false
);
}
private LocalQueryParams getLocalQueryParams(LocalQueryParams queryParams) {
return new LocalQueryParams(queryParams.query(),
0L,
queryParams.offsetLong() + queryParams.limitLong(),
queryParams.pageLimits(),
queryParams.minCompetitiveScore(),
queryParams.sort(),
queryParams.complete()
);
}
@Override
public Mono<LuceneSearchResult> collect(Mono<Send<LLIndexSearcher>> indexSearcherMono,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer) {
return Mono
.usingWhen(
indexSearcherMono,
indexSearcher -> {
Mono<LocalQueryParams> queryParamsMono;
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
queryParamsMono = Mono.just(queryParams);
} else {
queryParamsMono = transformer.transform(Mono
.fromSupplier(() -> new TransformerInput(LLIndexSearchers.unsharded(indexSearcher), queryParams)));
}
return queryParamsMono.flatMap(queryParams2 -> Mono.fromCallable(() -> {
try (var is = indexSearcher.receive()) {
LLUtils.ensureBlocking();
return is.getIndexSearcher().count(queryParams2.query());
}
}).subscribeOn(Schedulers.boundedElastic()));
},
is -> Mono.empty()
)
.map(count -> new LuceneSearchResult(TotalHitsCount.of(count, true), Flux.empty(), null))
.doOnDiscard(Send.class, Send::close)
.doOnDiscard(Resource.class, Resource::close);
}
@Override
public String getName() {
return "count";
}
}

View File

@ -17,10 +17,9 @@ import it.cavallium.dbengine.database.LLScoreMode;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher; import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.AdaptiveMultiSearcher; import it.cavallium.dbengine.lucene.searcher.AdaptiveMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.CountLocalSearcher; import it.cavallium.dbengine.lucene.searcher.CountMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.LocalSearcher; import it.cavallium.dbengine.lucene.searcher.LocalSearcher;
import it.cavallium.dbengine.lucene.searcher.MultiSearcher; import it.cavallium.dbengine.lucene.searcher.MultiSearcher;
import it.cavallium.dbengine.lucene.searcher.UnsortedUnscoredSimpleMultiSearcher;
import java.io.IOException; import java.io.IOException;
import java.util.Objects; import java.util.Objects;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -137,8 +136,8 @@ public class TestLuceneIndex {
index.updateDocument("test-key-14", "2999").block(); index.updateDocument("test-key-14", "2999").block();
index.updateDocument("test-key-15", "3902").block(); index.updateDocument("test-key-15", "3902").block();
Flux.range(1, 1000).concatMap(i -> index.updateDocument("test-key-" + (15 + i), "" + i)).blockLast(); Flux.range(1, 1000).concatMap(i -> index.updateDocument("test-key-" + (15 + i), "" + i)).blockLast();
tempDb.swappableLuceneSearcher().setSingle(new CountLocalSearcher()); tempDb.swappableLuceneSearcher().setSingle(new CountMultiSearcher());
tempDb.swappableLuceneSearcher().setMulti(new UnsortedUnscoredSimpleMultiSearcher(new CountLocalSearcher())); tempDb.swappableLuceneSearcher().setMulti(new CountMultiSearcher());
assertCount(index, 1000 + 15); assertCount(index, 1000 + 15);
if (customSearcher != null) { if (customSearcher != null) {
tempDb.swappableLuceneSearcher().setSingle(customSearcher); tempDb.swappableLuceneSearcher().setSingle(customSearcher);

View File

@ -31,14 +31,13 @@ import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher; import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.AdaptiveMultiSearcher; import it.cavallium.dbengine.lucene.searcher.AdaptiveMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.CountLocalSearcher; import it.cavallium.dbengine.lucene.searcher.CountMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.LocalSearcher; import it.cavallium.dbengine.lucene.searcher.LocalSearcher;
import it.cavallium.dbengine.lucene.searcher.MultiSearcher; import it.cavallium.dbengine.lucene.searcher.MultiSearcher;
import it.cavallium.dbengine.lucene.searcher.OfficialSearcher; import it.cavallium.dbengine.lucene.searcher.OfficialSearcher;
import it.cavallium.dbengine.lucene.searcher.ScoredPagedMultiSearcher; import it.cavallium.dbengine.lucene.searcher.ScoredPagedMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.PagedLocalSearcher; import it.cavallium.dbengine.lucene.searcher.PagedLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.SortedScoredFullMultiSearcher; import it.cavallium.dbengine.lucene.searcher.SortedScoredFullMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.UnsortedUnscoredSimpleMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.UnsortedScoredFullMultiSearcher; import it.cavallium.dbengine.lucene.searcher.UnsortedScoredFullMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.UnsortedUnscoredStreamingMultiSearcher; import it.cavallium.dbengine.lucene.searcher.UnsortedUnscoredStreamingMultiSearcher;
import java.io.IOException; import java.io.IOException;
@ -124,8 +123,8 @@ public class TestLuceneSearches {
.flatMap(entry -> index.updateDocument(entry.getKey(), entry.getValue())) .flatMap(entry -> index.updateDocument(entry.getKey(), entry.getValue()))
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(Schedulers.boundedElastic())
.blockLast(); .blockLast();
tempDb.swappableLuceneSearcher().setSingle(new CountLocalSearcher()); tempDb.swappableLuceneSearcher().setSingle(new CountMultiSearcher());
tempDb.swappableLuceneSearcher().setMulti(new UnsortedUnscoredSimpleMultiSearcher(new CountLocalSearcher())); tempDb.swappableLuceneSearcher().setMulti(new CountMultiSearcher());
assertCount(index, 1000 + 15); assertCount(index, 1000 + 15);
if (shards) { if (shards) {
multiIndex = index; multiIndex = index;
@ -155,7 +154,7 @@ public class TestLuceneSearches {
return Flux.push(sink -> { return Flux.push(sink -> {
if (info.shard()) { if (info.shard()) {
if (info.onlyCount()) { if (info.onlyCount()) {
sink.next(new UnsortedUnscoredSimpleMultiSearcher(new CountLocalSearcher())); sink.next(new CountMultiSearcher());
} else { } else {
sink.next(new ScoredPagedMultiSearcher()); sink.next(new ScoredPagedMultiSearcher());
if (info.sorted() && !info.sortedByScore()) { if (info.sorted() && !info.sortedByScore()) {
@ -171,7 +170,7 @@ public class TestLuceneSearches {
sink.next(new AdaptiveMultiSearcher(ENV)); sink.next(new AdaptiveMultiSearcher(ENV));
} else { } else {
if (info.onlyCount()) { if (info.onlyCount()) {
sink.next(new CountLocalSearcher()); sink.next(new CountMultiSearcher());
} else { } else {
sink.next(new PagedLocalSearcher()); sink.next(new PagedLocalSearcher());
} }

View File

@ -1,4 +1,4 @@
package it.cavallium.dbengine.lucene.searcher; package it.cavallium.dbengine;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
@ -7,7 +7,12 @@ import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
import it.cavallium.dbengine.lucene.searcher.LocalSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneSearchResult;
import it.cavallium.dbengine.lucene.searcher.MultiSearcher;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;