diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java index e9becba..ea3a6ac 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java @@ -1,8 +1,12 @@ package it.cavallium.dbengine.lucene.searcher; import io.net5.buffer.api.Send; +import io.net5.buffer.api.internal.ResourceSupport; +import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.database.disk.LLIndexSearchers.UnshardedIndexSearchers; +import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; import org.apache.lucene.search.IndexSearcher; import reactor.core.publisher.Mono; @@ -12,12 +16,29 @@ public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher { private static final LuceneLocalSearcher countSearcher = new CountLuceneLocalSearcher(); - //todo: detect transformed query params, not input query params! @Override public Mono> collect(Mono> indexSearcher, LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer) { + Mono> indexSearchersMono = indexSearcher + .map(LLIndexSearchers::unsharded) + .map(ResourceSupport::send); + + if (transformer == LLSearchTransformer.NO_TRANSFORMATION) { + return transformedCollect(indexSearcher, queryParams, keyFieldName, transformer); + } else { + return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> transformer + .transform(Mono.fromCallable(() -> new TransformerInput(indexSearchers, queryParams))) + .flatMap(queryParams2 -> this + .transformedCollect(indexSearcher, queryParams2, keyFieldName, LLSearchTransformer.NO_TRANSFORMATION)), + true); + } + } + public Mono> transformedCollect(Mono> indexSearcher, + LocalQueryParams queryParams, + String keyFieldName, + LLSearchTransformer transformer) { if (queryParams.limit() == 0) { return countSearcher.collect(indexSearcher, queryParams, keyFieldName, transformer); } else { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java index 132218f..d6b65df 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java @@ -1,42 +1,60 @@ package it.cavallium.dbengine.lucene.searcher; import io.net5.buffer.api.Send; +import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; import reactor.core.publisher.Mono; public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher { - private static final LuceneMultiSearcher countLuceneMultiSearcher + private static final LuceneMultiSearcher count = new SimpleUnsortedUnscoredLuceneMultiSearcher(new CountLuceneLocalSearcher()); - private static final LuceneMultiSearcher scoredSimpleLuceneShardSearcher + private static final LuceneMultiSearcher scoredSimple = new ScoredSimpleLuceneShardSearcher(); - private static final LuceneMultiSearcher unsortedUnscoredPagedLuceneMultiSearcher + private static final LuceneMultiSearcher unsortedUnscoredPaged = new SimpleUnsortedUnscoredLuceneMultiSearcher(new SimpleLuceneLocalSearcher()); - private static final LuceneMultiSearcher unsortedUnscoredContinuousLuceneMultiSearcher + private static final LuceneMultiSearcher unsortedUnscoredContinuous = new UnsortedUnscoredContinuousLuceneMultiSearcher(); - //todo: detect transformed query params, not input query params! @Override public Mono> collectMulti(Mono> indexSearchersMono, LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer) { - if (queryParams.limit() == 0) { - return countLuceneMultiSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); - } else if (queryParams.isSorted() || queryParams.isScored()) { - return scoredSimpleLuceneShardSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); + if (transformer == LLSearchTransformer.NO_TRANSFORMATION) { + return transformedCollectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); } else { - if (((long) queryParams.offset() + (long) queryParams.limit()) <= (long) queryParams.pageLimits().getPageLimit(0) - || transformer != null) { - // Run single-page searches using the paged multi searcher - return unsortedUnscoredPagedLuceneMultiSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); - } else { - // Run large/unbounded searches using the continuous multi searcher - return unsortedUnscoredContinuousLuceneMultiSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); - } + return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> transformer + .transform(Mono.fromCallable(() -> new TransformerInput(indexSearchers, queryParams))) + .flatMap(queryParams2 -> this + .transformedCollectMulti(indexSearchersMono, queryParams2, keyFieldName, LLSearchTransformer.NO_TRANSFORMATION)), + true); } } + + public Mono> transformedCollectMulti(Mono> indexSearchersMono, + LocalQueryParams queryParams, + String keyFieldName, + LLSearchTransformer transformer) { + // offset + limit + long realLimit = ((long) queryParams.offset() + (long) queryParams.limit()); + + return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> { + if (queryParams.limit() == 0) { + return count.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); + } else if (queryParams.isSorted() || queryParams.isScored()) { + return scoredSimple.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); + } else if (realLimit <= (long) queryParams.pageLimits().getPageLimit(0)) { + // Run single-page searches using the paged multi searcher + return unsortedUnscoredPaged.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); + } else { + // Run large/unbounded searches using the continuous multi searcher + return unsortedUnscoredContinuous.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); + } + }, true); + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java index 7229db3..6342eef 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java @@ -22,9 +22,13 @@ public class CountLuceneLocalSearcher implements LuceneLocalSearcher { .usingWhen( indexSearcherMono, indexSearcher -> { - var queryParamsMono = transformer - .transform(Mono.fromSupplier(() -> new TransformerInput(LLIndexSearchers.unsharded(indexSearcher), - queryParams))); + Mono queryParamsMono; + if (transformer == LLSearchTransformer.NO_TRANSFORMATION) { + queryParamsMono = Mono.just(queryParams); + } else { + queryParamsMono = transformer.transform(Mono + .fromSupplier(() -> new TransformerInput(LLIndexSearchers.unsharded(indexSearcher), queryParams))); + } return queryParamsMono.flatMap(queryParams2 -> Mono.fromCallable(() -> { try (var is = indexSearcher.receive()) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java index 73cb506..5001683 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java @@ -10,6 +10,7 @@ import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.database.disk.LLLocalGroupedReactiveRocksIterator; import it.cavallium.dbengine.lucene.LuceneUtils; +import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; import java.util.Arrays; import java.util.List; import java.util.Objects; @@ -35,20 +36,31 @@ public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher { LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer) { - Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null"); - PaginationInfo paginationInfo = getPaginationInfo(queryParams); + Mono queryParamsMono; + if (transformer == LLSearchTransformer.NO_TRANSFORMATION) { + queryParamsMono = Mono.just(queryParams); + } else { + queryParamsMono = LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> transformer.transform(Mono + .fromSupplier(() -> new TransformerInput(indexSearchers, queryParams))), true); + } - return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> this - // Search first page results - .searchFirstPage(indexSearchers.shards(), queryParams, paginationInfo) - // Compute the results of the first page - .transform(firstPageTopDocsMono -> this.computeFirstPageResults(firstPageTopDocsMono, indexSearchers, - keyFieldName, queryParams)) - // Compute other results - .map(firstResult -> this.computeOtherResults(firstResult, indexSearchers.shards(), queryParams, keyFieldName, indexSearchers::close)) - // Ensure that one LuceneSearchResult is always returned - .single(), - false); + return queryParamsMono.flatMap(queryParams2 -> { + Objects.requireNonNull(queryParams2.scoreMode(), "ScoreMode must not be null"); + PaginationInfo paginationInfo = getPaginationInfo(queryParams2); + + return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> this + // Search first page results + .searchFirstPage(indexSearchers.shards(), queryParams2, paginationInfo) + // Compute the results of the first page + .transform(firstPageTopDocsMono -> this.computeFirstPageResults(firstPageTopDocsMono, indexSearchers, + keyFieldName, queryParams2)) + // Compute other results + .map(firstResult -> this.computeOtherResults(firstResult, indexSearchers.shards(), + queryParams2, keyFieldName, indexSearchers::close)) + // Ensure that one LuceneSearchResult is always returned + .single(), + false); + }); } private Sort getSort(LocalQueryParams queryParams) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java index 2aacdf7..6f8167f 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java @@ -5,6 +5,7 @@ import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.FIRST_PAGE_LI import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT; import io.net5.buffer.api.Send; +import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearcher; @@ -36,13 +37,18 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null"); PaginationInfo paginationInfo = getPaginationInfo(queryParams); - var indexSearchersMono = indexSearcherMono.map(LLIndexSearchers::unsharded); + var indexSearchersMono = indexSearcherMono.map(LLIndexSearchers::unsharded).map(ResourceSupport::send); - return LLUtils.usingResource(indexSearchersMono, indexSearchers -> { - var queryParamsMono = transformer - .transform(Mono.fromSupplier(() -> new TransformerInput(indexSearchers, queryParams))); + return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> { + Mono queryParamsMono; + if (transformer == LLSearchTransformer.NO_TRANSFORMATION) { + queryParamsMono = Mono.just(queryParams); + } else { + queryParamsMono = transformer.transform(Mono + .fromSupplier(() -> new TransformerInput(indexSearchers, queryParams))); + } - return queryParamsMono.flatMap(queryParams2 -> this + return queryParamsMono.flatMap(queryParams2 -> this // Search first page results .searchFirstPage(indexSearchers.shards(), queryParams2, paginationInfo) // Compute the results of the first page diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleUnsortedUnscoredLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleUnsortedUnscoredLuceneMultiSearcher.java index f1be4e4..6b8bea3 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleUnsortedUnscoredLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleUnsortedUnscoredLuceneMultiSearcher.java @@ -29,9 +29,13 @@ public class SimpleUnsortedUnscoredLuceneMultiSearcher implements LuceneMultiSea return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> { - var queryParamsMono = transformer - .transform(Mono.fromSupplier(() -> new TransformerInput(indexSearchers, - queryParams))); + Mono queryParamsMono; + if (transformer == LLSearchTransformer.NO_TRANSFORMATION) { + queryParamsMono = Mono.just(queryParams); + } else { + queryParamsMono = transformer.transform(Mono + .fromSupplier(() -> new TransformerInput(indexSearchers, queryParams))); + } return queryParamsMono.flatMap(queryParams2 -> { var localQueryParams = getLocalQueryParams(queryParams2); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredContinuousLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredContinuousLuceneMultiSearcher.java index 2e78955..378b7ea 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredContinuousLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredContinuousLuceneMultiSearcher.java @@ -7,6 +7,7 @@ import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.collector.ReactiveCollectorManager; +import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; @@ -31,74 +32,76 @@ public class UnsortedUnscoredContinuousLuceneMultiSearcher implements LuceneMult ); private static final Supplier> QUEUE_SUPPLIER = Queues.get(1024); - //todo: Support transformers @Override public Mono> collectMulti(Mono> indexSearchersMono, LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer) { - var indexSearchersSendResource = Mono - .fromRunnable(() -> { - LLUtils.ensureBlocking(); - if (transformer != null) { - throw new UnsupportedOperationException("Transformers are not supported" - + " by UnsortedUnscoredContinuousLuceneMultiSearcher"); - } - if (queryParams.isSorted() && queryParams.limit() > 0) { - throw new UnsupportedOperationException("Sorted queries are not supported" - + " by UnsortedUnscoredContinuousLuceneMultiSearcher"); - } - if (queryParams.isScored() && queryParams.limit() > 0) { - throw new UnsupportedOperationException("Scored queries are not supported" - + " by UnsortedUnscoredContinuousLuceneMultiSearcher"); - } - }) - .then(indexSearchersMono); - var localQueryParams = getLocalQueryParams(queryParams); - return LLUtils.usingSendResource(indexSearchersSendResource, - indexSearchers -> Mono.fromCallable(() -> { - LLUtils.ensureBlocking(); + return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> { + Mono queryParamsMono; + if (transformer == LLSearchTransformer.NO_TRANSFORMATION) { + queryParamsMono = Mono.just(queryParams); + } else { + queryParamsMono = transformer.transform(Mono + .fromCallable(() -> new TransformerInput(indexSearchers, queryParams))); + } - Many scoreDocsSink = Sinks.many().unicast().onBackpressureBuffer(QUEUE_SUPPLIER.get()); + return queryParamsMono + .flatMap(queryParams2 -> { + var localQueryParams = getLocalQueryParams(queryParams2); + if (queryParams2.isSorted() && queryParams2.limit() > 0) { + return Mono.error(new UnsupportedOperationException("Sorted queries are not supported" + + " by UnsortedUnscoredContinuousLuceneMultiSearcher")); + } + if (queryParams2.isScored() && queryParams2.limit() > 0) { + return Mono.error(new UnsupportedOperationException("Scored queries are not supported" + + " by UnsortedUnscoredContinuousLuceneMultiSearcher")); + } + return Mono.fromCallable(() -> { + LLUtils.ensureBlocking(); - var cm = new ReactiveCollectorManager(scoreDocsSink); + Many scoreDocsSink = Sinks.many().unicast().onBackpressureBuffer(QUEUE_SUPPLIER.get()); - AtomicInteger runningTasks = new AtomicInteger(0); - var shards = indexSearchers.shards(); + var cm = new ReactiveCollectorManager(scoreDocsSink); - runningTasks.addAndGet(shards.size()); - int mutableShardIndex = 0; - for (IndexSearcher shard : shards) { - int shardIndex = mutableShardIndex++; - UNSCORED_UNSORTED_EXECUTOR.schedule(() -> { - try { - var collector = cm.newCollector(); - collector.setShardIndex(shardIndex); - shard.search(localQueryParams.query(), collector); - } catch (Throwable e) { - while (scoreDocsSink.tryEmitError(e) == EmitResult.FAIL_NON_SERIALIZED) { - LockSupport.parkNanos(10); - } - } finally { - if (runningTasks.decrementAndGet() <= 0) { - while (scoreDocsSink.tryEmitComplete() == EmitResult.FAIL_NON_SERIALIZED) { - LockSupport.parkNanos(10); + AtomicInteger runningTasks = new AtomicInteger(0); + var shards = indexSearchers.shards(); + + runningTasks.addAndGet(shards.size()); + int mutableShardIndex = 0; + for (IndexSearcher shard : shards) { + int shardIndex = mutableShardIndex++; + UNSCORED_UNSORTED_EXECUTOR.schedule(() -> { + try { + var collector = cm.newCollector(); + collector.setShardIndex(shardIndex); + shard.search(localQueryParams.query(), collector); + } catch (Throwable e) { + while (scoreDocsSink.tryEmitError(e) == EmitResult.FAIL_NON_SERIALIZED) { + LockSupport.parkNanos(10); + } + } finally { + if (runningTasks.decrementAndGet() <= 0) { + while (scoreDocsSink.tryEmitComplete() == EmitResult.FAIL_NON_SERIALIZED) { + LockSupport.parkNanos(10); + } + } } - } + }); } + + Flux resultsFlux = LuceneUtils.convertHits(scoreDocsSink.asFlux(), shards, keyFieldName, false); + + var totalHitsCount = new TotalHitsCount(0, false); + Flux mergedFluxes = resultsFlux + .skip(queryParams2.offset()) + .take(queryParams2.limit(), true); + + return new LuceneSearchResult(totalHitsCount, mergedFluxes, indexSearchers::close).send(); }); - } - - Flux resultsFlux = LuceneUtils.convertHits(scoreDocsSink.asFlux(), shards, keyFieldName, false); - - var totalHitsCount = new TotalHitsCount(0, false); - Flux mergedFluxes = resultsFlux - .skip(queryParams.offset()) - .take(queryParams.limit(), true); - - return new LuceneSearchResult(totalHitsCount, mergedFluxes, indexSearchers::close).send(); - }), false); + }); + }, false); } private LocalQueryParams getLocalQueryParams(LocalQueryParams queryParams) {