Fast implementation of query transformers
This commit is contained in:
parent
30a14a4aae
commit
83e98ebce8
@ -1,8 +1,12 @@
|
||||
package it.cavallium.dbengine.lucene.searcher;
|
||||
|
||||
import io.net5.buffer.api.Send;
|
||||
import io.net5.buffer.api.internal.ResourceSupport;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
|
||||
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
|
||||
import it.cavallium.dbengine.database.disk.LLIndexSearchers.UnshardedIndexSearchers;
|
||||
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@ -12,12 +16,29 @@ public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher {
|
||||
|
||||
private static final LuceneLocalSearcher countSearcher = new CountLuceneLocalSearcher();
|
||||
|
||||
//todo: detect transformed query params, not input query params!
|
||||
@Override
|
||||
public Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexSearcher>> indexSearcher,
|
||||
LocalQueryParams queryParams,
|
||||
String keyFieldName,
|
||||
LLSearchTransformer transformer) {
|
||||
Mono<Send<LLIndexSearchers>> indexSearchersMono = indexSearcher
|
||||
.map(LLIndexSearchers::unsharded)
|
||||
.map(ResourceSupport::send);
|
||||
|
||||
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
|
||||
return transformedCollect(indexSearcher, queryParams, keyFieldName, transformer);
|
||||
} else {
|
||||
return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> transformer
|
||||
.transform(Mono.fromCallable(() -> new TransformerInput(indexSearchers, queryParams)))
|
||||
.flatMap(queryParams2 -> this
|
||||
.transformedCollect(indexSearcher, queryParams2, keyFieldName, LLSearchTransformer.NO_TRANSFORMATION)),
|
||||
true);
|
||||
}
|
||||
}
|
||||
public Mono<Send<LuceneSearchResult>> transformedCollect(Mono<Send<LLIndexSearcher>> indexSearcher,
|
||||
LocalQueryParams queryParams,
|
||||
String keyFieldName,
|
||||
LLSearchTransformer transformer) {
|
||||
if (queryParams.limit() == 0) {
|
||||
return countSearcher.collect(indexSearcher, queryParams, keyFieldName, transformer);
|
||||
} else {
|
||||
|
@ -1,42 +1,60 @@
|
||||
package it.cavallium.dbengine.lucene.searcher;
|
||||
|
||||
import io.net5.buffer.api.Send;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
|
||||
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher {
|
||||
|
||||
private static final LuceneMultiSearcher countLuceneMultiSearcher
|
||||
private static final LuceneMultiSearcher count
|
||||
= new SimpleUnsortedUnscoredLuceneMultiSearcher(new CountLuceneLocalSearcher());
|
||||
|
||||
private static final LuceneMultiSearcher scoredSimpleLuceneShardSearcher
|
||||
private static final LuceneMultiSearcher scoredSimple
|
||||
= new ScoredSimpleLuceneShardSearcher();
|
||||
|
||||
private static final LuceneMultiSearcher unsortedUnscoredPagedLuceneMultiSearcher
|
||||
private static final LuceneMultiSearcher unsortedUnscoredPaged
|
||||
= new SimpleUnsortedUnscoredLuceneMultiSearcher(new SimpleLuceneLocalSearcher());
|
||||
|
||||
private static final LuceneMultiSearcher unsortedUnscoredContinuousLuceneMultiSearcher
|
||||
private static final LuceneMultiSearcher unsortedUnscoredContinuous
|
||||
= new UnsortedUnscoredContinuousLuceneMultiSearcher();
|
||||
|
||||
//todo: detect transformed query params, not input query params!
|
||||
@Override
|
||||
public Mono<Send<LuceneSearchResult>> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
|
||||
LocalQueryParams queryParams,
|
||||
String keyFieldName,
|
||||
LLSearchTransformer transformer) {
|
||||
if (queryParams.limit() == 0) {
|
||||
return countLuceneMultiSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
|
||||
} else if (queryParams.isSorted() || queryParams.isScored()) {
|
||||
return scoredSimpleLuceneShardSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
|
||||
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
|
||||
return transformedCollectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
|
||||
} else {
|
||||
if (((long) queryParams.offset() + (long) queryParams.limit()) <= (long) queryParams.pageLimits().getPageLimit(0)
|
||||
|| transformer != null) {
|
||||
// Run single-page searches using the paged multi searcher
|
||||
return unsortedUnscoredPagedLuceneMultiSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
|
||||
} else {
|
||||
// Run large/unbounded searches using the continuous multi searcher
|
||||
return unsortedUnscoredContinuousLuceneMultiSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
|
||||
}
|
||||
return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> transformer
|
||||
.transform(Mono.fromCallable(() -> new TransformerInput(indexSearchers, queryParams)))
|
||||
.flatMap(queryParams2 -> this
|
||||
.transformedCollectMulti(indexSearchersMono, queryParams2, keyFieldName, LLSearchTransformer.NO_TRANSFORMATION)),
|
||||
true);
|
||||
}
|
||||
}
|
||||
|
||||
public Mono<Send<LuceneSearchResult>> transformedCollectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
|
||||
LocalQueryParams queryParams,
|
||||
String keyFieldName,
|
||||
LLSearchTransformer transformer) {
|
||||
// offset + limit
|
||||
long realLimit = ((long) queryParams.offset() + (long) queryParams.limit());
|
||||
|
||||
return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> {
|
||||
if (queryParams.limit() == 0) {
|
||||
return count.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
|
||||
} else if (queryParams.isSorted() || queryParams.isScored()) {
|
||||
return scoredSimple.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
|
||||
} else if (realLimit <= (long) queryParams.pageLimits().getPageLimit(0)) {
|
||||
// Run single-page searches using the paged multi searcher
|
||||
return unsortedUnscoredPaged.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
|
||||
} else {
|
||||
// Run large/unbounded searches using the continuous multi searcher
|
||||
return unsortedUnscoredContinuous.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
|
||||
}
|
||||
}, true);
|
||||
}
|
||||
}
|
||||
|
@ -22,9 +22,13 @@ public class CountLuceneLocalSearcher implements LuceneLocalSearcher {
|
||||
.usingWhen(
|
||||
indexSearcherMono,
|
||||
indexSearcher -> {
|
||||
var queryParamsMono = transformer
|
||||
.transform(Mono.fromSupplier(() -> new TransformerInput(LLIndexSearchers.unsharded(indexSearcher),
|
||||
queryParams)));
|
||||
Mono<LocalQueryParams> queryParamsMono;
|
||||
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
|
||||
queryParamsMono = Mono.just(queryParams);
|
||||
} else {
|
||||
queryParamsMono = transformer.transform(Mono
|
||||
.fromSupplier(() -> new TransformerInput(LLIndexSearchers.unsharded(indexSearcher), queryParams)));
|
||||
}
|
||||
|
||||
return queryParamsMono.flatMap(queryParams2 -> Mono.fromCallable(() -> {
|
||||
try (var is = indexSearcher.receive()) {
|
||||
|
@ -10,6 +10,7 @@ import it.cavallium.dbengine.database.disk.LLIndexSearcher;
|
||||
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
|
||||
import it.cavallium.dbengine.database.disk.LLLocalGroupedReactiveRocksIterator;
|
||||
import it.cavallium.dbengine.lucene.LuceneUtils;
|
||||
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
@ -35,20 +36,31 @@ public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher {
|
||||
LocalQueryParams queryParams,
|
||||
String keyFieldName,
|
||||
LLSearchTransformer transformer) {
|
||||
Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null");
|
||||
PaginationInfo paginationInfo = getPaginationInfo(queryParams);
|
||||
Mono<LocalQueryParams> queryParamsMono;
|
||||
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
|
||||
queryParamsMono = Mono.just(queryParams);
|
||||
} else {
|
||||
queryParamsMono = LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> transformer.transform(Mono
|
||||
.fromSupplier(() -> new TransformerInput(indexSearchers, queryParams))), true);
|
||||
}
|
||||
|
||||
return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> this
|
||||
// Search first page results
|
||||
.searchFirstPage(indexSearchers.shards(), queryParams, paginationInfo)
|
||||
// Compute the results of the first page
|
||||
.transform(firstPageTopDocsMono -> this.computeFirstPageResults(firstPageTopDocsMono, indexSearchers,
|
||||
keyFieldName, queryParams))
|
||||
// Compute other results
|
||||
.map(firstResult -> this.computeOtherResults(firstResult, indexSearchers.shards(), queryParams, keyFieldName, indexSearchers::close))
|
||||
// Ensure that one LuceneSearchResult is always returned
|
||||
.single(),
|
||||
false);
|
||||
return queryParamsMono.flatMap(queryParams2 -> {
|
||||
Objects.requireNonNull(queryParams2.scoreMode(), "ScoreMode must not be null");
|
||||
PaginationInfo paginationInfo = getPaginationInfo(queryParams2);
|
||||
|
||||
return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> this
|
||||
// Search first page results
|
||||
.searchFirstPage(indexSearchers.shards(), queryParams2, paginationInfo)
|
||||
// Compute the results of the first page
|
||||
.transform(firstPageTopDocsMono -> this.computeFirstPageResults(firstPageTopDocsMono, indexSearchers,
|
||||
keyFieldName, queryParams2))
|
||||
// Compute other results
|
||||
.map(firstResult -> this.computeOtherResults(firstResult, indexSearchers.shards(),
|
||||
queryParams2, keyFieldName, indexSearchers::close))
|
||||
// Ensure that one LuceneSearchResult is always returned
|
||||
.single(),
|
||||
false);
|
||||
});
|
||||
}
|
||||
|
||||
private Sort getSort(LocalQueryParams queryParams) {
|
||||
|
@ -5,6 +5,7 @@ import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.FIRST_PAGE_LI
|
||||
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT;
|
||||
|
||||
import io.net5.buffer.api.Send;
|
||||
import io.net5.buffer.api.internal.ResourceSupport;
|
||||
import it.cavallium.dbengine.database.LLKeyScore;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
|
||||
@ -36,13 +37,18 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
|
||||
Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null");
|
||||
PaginationInfo paginationInfo = getPaginationInfo(queryParams);
|
||||
|
||||
var indexSearchersMono = indexSearcherMono.map(LLIndexSearchers::unsharded);
|
||||
var indexSearchersMono = indexSearcherMono.map(LLIndexSearchers::unsharded).map(ResourceSupport::send);
|
||||
|
||||
return LLUtils.usingResource(indexSearchersMono, indexSearchers -> {
|
||||
var queryParamsMono = transformer
|
||||
.transform(Mono.fromSupplier(() -> new TransformerInput(indexSearchers, queryParams)));
|
||||
return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> {
|
||||
Mono<LocalQueryParams> queryParamsMono;
|
||||
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
|
||||
queryParamsMono = Mono.just(queryParams);
|
||||
} else {
|
||||
queryParamsMono = transformer.transform(Mono
|
||||
.fromSupplier(() -> new TransformerInput(indexSearchers, queryParams)));
|
||||
}
|
||||
|
||||
return queryParamsMono.flatMap(queryParams2 -> this
|
||||
return queryParamsMono.flatMap(queryParams2 -> this
|
||||
// Search first page results
|
||||
.searchFirstPage(indexSearchers.shards(), queryParams2, paginationInfo)
|
||||
// Compute the results of the first page
|
||||
|
@ -29,9 +29,13 @@ public class SimpleUnsortedUnscoredLuceneMultiSearcher implements LuceneMultiSea
|
||||
|
||||
return LLUtils.usingSendResource(indexSearchersMono,
|
||||
indexSearchers -> {
|
||||
var queryParamsMono = transformer
|
||||
.transform(Mono.fromSupplier(() -> new TransformerInput(indexSearchers,
|
||||
queryParams)));
|
||||
Mono<LocalQueryParams> queryParamsMono;
|
||||
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
|
||||
queryParamsMono = Mono.just(queryParams);
|
||||
} else {
|
||||
queryParamsMono = transformer.transform(Mono
|
||||
.fromSupplier(() -> new TransformerInput(indexSearchers, queryParams)));
|
||||
}
|
||||
|
||||
return queryParamsMono.flatMap(queryParams2 -> {
|
||||
var localQueryParams = getLocalQueryParams(queryParams2);
|
||||
|
@ -7,6 +7,7 @@ import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
|
||||
import it.cavallium.dbengine.lucene.LuceneUtils;
|
||||
import it.cavallium.dbengine.lucene.collector.ReactiveCollectorManager;
|
||||
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
@ -31,74 +32,76 @@ public class UnsortedUnscoredContinuousLuceneMultiSearcher implements LuceneMult
|
||||
);
|
||||
private static final Supplier<Queue<ScoreDoc>> QUEUE_SUPPLIER = Queues.get(1024);
|
||||
|
||||
//todo: Support transformers
|
||||
@Override
|
||||
public Mono<Send<LuceneSearchResult>> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
|
||||
LocalQueryParams queryParams,
|
||||
String keyFieldName,
|
||||
LLSearchTransformer transformer) {
|
||||
var indexSearchersSendResource = Mono
|
||||
.fromRunnable(() -> {
|
||||
LLUtils.ensureBlocking();
|
||||
if (transformer != null) {
|
||||
throw new UnsupportedOperationException("Transformers are not supported"
|
||||
+ " by UnsortedUnscoredContinuousLuceneMultiSearcher");
|
||||
}
|
||||
if (queryParams.isSorted() && queryParams.limit() > 0) {
|
||||
throw new UnsupportedOperationException("Sorted queries are not supported"
|
||||
+ " by UnsortedUnscoredContinuousLuceneMultiSearcher");
|
||||
}
|
||||
if (queryParams.isScored() && queryParams.limit() > 0) {
|
||||
throw new UnsupportedOperationException("Scored queries are not supported"
|
||||
+ " by UnsortedUnscoredContinuousLuceneMultiSearcher");
|
||||
}
|
||||
})
|
||||
.then(indexSearchersMono);
|
||||
var localQueryParams = getLocalQueryParams(queryParams);
|
||||
|
||||
return LLUtils.usingSendResource(indexSearchersSendResource,
|
||||
indexSearchers -> Mono.fromCallable(() -> {
|
||||
LLUtils.ensureBlocking();
|
||||
return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> {
|
||||
Mono<LocalQueryParams> queryParamsMono;
|
||||
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
|
||||
queryParamsMono = Mono.just(queryParams);
|
||||
} else {
|
||||
queryParamsMono = transformer.transform(Mono
|
||||
.fromCallable(() -> new TransformerInput(indexSearchers, queryParams)));
|
||||
}
|
||||
|
||||
Many<ScoreDoc> scoreDocsSink = Sinks.many().unicast().onBackpressureBuffer(QUEUE_SUPPLIER.get());
|
||||
return queryParamsMono
|
||||
.flatMap(queryParams2 -> {
|
||||
var localQueryParams = getLocalQueryParams(queryParams2);
|
||||
if (queryParams2.isSorted() && queryParams2.limit() > 0) {
|
||||
return Mono.error(new UnsupportedOperationException("Sorted queries are not supported"
|
||||
+ " by UnsortedUnscoredContinuousLuceneMultiSearcher"));
|
||||
}
|
||||
if (queryParams2.isScored() && queryParams2.limit() > 0) {
|
||||
return Mono.error(new UnsupportedOperationException("Scored queries are not supported"
|
||||
+ " by UnsortedUnscoredContinuousLuceneMultiSearcher"));
|
||||
}
|
||||
return Mono.fromCallable(() -> {
|
||||
LLUtils.ensureBlocking();
|
||||
|
||||
var cm = new ReactiveCollectorManager(scoreDocsSink);
|
||||
Many<ScoreDoc> scoreDocsSink = Sinks.many().unicast().onBackpressureBuffer(QUEUE_SUPPLIER.get());
|
||||
|
||||
AtomicInteger runningTasks = new AtomicInteger(0);
|
||||
var shards = indexSearchers.shards();
|
||||
var cm = new ReactiveCollectorManager(scoreDocsSink);
|
||||
|
||||
runningTasks.addAndGet(shards.size());
|
||||
int mutableShardIndex = 0;
|
||||
for (IndexSearcher shard : shards) {
|
||||
int shardIndex = mutableShardIndex++;
|
||||
UNSCORED_UNSORTED_EXECUTOR.schedule(() -> {
|
||||
try {
|
||||
var collector = cm.newCollector();
|
||||
collector.setShardIndex(shardIndex);
|
||||
shard.search(localQueryParams.query(), collector);
|
||||
} catch (Throwable e) {
|
||||
while (scoreDocsSink.tryEmitError(e) == EmitResult.FAIL_NON_SERIALIZED) {
|
||||
LockSupport.parkNanos(10);
|
||||
}
|
||||
} finally {
|
||||
if (runningTasks.decrementAndGet() <= 0) {
|
||||
while (scoreDocsSink.tryEmitComplete() == EmitResult.FAIL_NON_SERIALIZED) {
|
||||
LockSupport.parkNanos(10);
|
||||
AtomicInteger runningTasks = new AtomicInteger(0);
|
||||
var shards = indexSearchers.shards();
|
||||
|
||||
runningTasks.addAndGet(shards.size());
|
||||
int mutableShardIndex = 0;
|
||||
for (IndexSearcher shard : shards) {
|
||||
int shardIndex = mutableShardIndex++;
|
||||
UNSCORED_UNSORTED_EXECUTOR.schedule(() -> {
|
||||
try {
|
||||
var collector = cm.newCollector();
|
||||
collector.setShardIndex(shardIndex);
|
||||
shard.search(localQueryParams.query(), collector);
|
||||
} catch (Throwable e) {
|
||||
while (scoreDocsSink.tryEmitError(e) == EmitResult.FAIL_NON_SERIALIZED) {
|
||||
LockSupport.parkNanos(10);
|
||||
}
|
||||
} finally {
|
||||
if (runningTasks.decrementAndGet() <= 0) {
|
||||
while (scoreDocsSink.tryEmitComplete() == EmitResult.FAIL_NON_SERIALIZED) {
|
||||
LockSupport.parkNanos(10);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Flux<LLKeyScore> resultsFlux = LuceneUtils.convertHits(scoreDocsSink.asFlux(), shards, keyFieldName, false);
|
||||
|
||||
var totalHitsCount = new TotalHitsCount(0, false);
|
||||
Flux<LLKeyScore> mergedFluxes = resultsFlux
|
||||
.skip(queryParams2.offset())
|
||||
.take(queryParams2.limit(), true);
|
||||
|
||||
return new LuceneSearchResult(totalHitsCount, mergedFluxes, indexSearchers::close).send();
|
||||
});
|
||||
}
|
||||
|
||||
Flux<LLKeyScore> resultsFlux = LuceneUtils.convertHits(scoreDocsSink.asFlux(), shards, keyFieldName, false);
|
||||
|
||||
var totalHitsCount = new TotalHitsCount(0, false);
|
||||
Flux<LLKeyScore> mergedFluxes = resultsFlux
|
||||
.skip(queryParams.offset())
|
||||
.take(queryParams.limit(), true);
|
||||
|
||||
return new LuceneSearchResult(totalHitsCount, mergedFluxes, indexSearchers::close).send();
|
||||
}), false);
|
||||
});
|
||||
}, false);
|
||||
}
|
||||
|
||||
private LocalQueryParams getLocalQueryParams(LocalQueryParams queryParams) {
|
||||
|
Loading…
Reference in New Issue
Block a user