From d1963a1d6589de38fa7686924b67056bfd4bf1e6 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sun, 19 Sep 2021 12:01:11 +0200 Subject: [PATCH] Finish refactoring SimpleLuceneLocalSearcher --- .../cavallium/dbengine/database/LLUtils.java | 43 +++ .../lucene/searcher/IndexSearchers.java | 12 +- .../searcher/SimpleLuceneLocalSearcher.java | 249 ++++++++++-------- 3 files changed, 194 insertions(+), 110 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 2a729f4..651567c 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -5,6 +5,7 @@ import com.google.common.primitives.Longs; import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.CompositeBuffer; +import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; import io.net5.util.IllegalReferenceCountException; import io.net5.util.internal.PlatformDependent; @@ -24,6 +25,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.function.ToIntFunction; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; @@ -344,6 +346,47 @@ public class LLUtils { } } + /** + * cleanup resource + * @param cleanupOnSuccess if true the resource will be cleaned up if the function is successful + */ + public static > Mono usingSend(Mono> resourceSupplier, + Function, Mono> resourceClosure, + boolean cleanupOnSuccess) { + return Mono.usingWhen(resourceSupplier, resourceClosure, + r -> { + if (cleanupOnSuccess) { + return Mono.fromRunnable(r::close); + } else { + return Mono.empty(); + } + }, + (r, ex) -> Mono.fromRunnable(r::close), + r -> Mono.fromRunnable(r::close)) + .doOnDiscard(Send.class, Send::close); + } + + /** + * cleanup resource + * @param cleanupOnSuccess if true the resource will be cleaned up if the function is successful + */ + public static , V extends T> Mono usingResource(Mono resourceSupplier, + Function> resourceClosure, + boolean cleanupOnSuccess) { + return Mono.usingWhen(resourceSupplier, resourceClosure, + r -> { + if (cleanupOnSuccess) { + return Mono.fromRunnable(r::close); + } else { + return Mono.empty(); + } + }, + (r, ex) -> Mono.fromRunnable(r::close), + r -> Mono.fromRunnable(r::close)) + .doOnDiscard(Resource.class, Resource::close) + .doOnDiscard(Send.class, Send::close); + } + public static record DirectBuffer(@NotNull Send buffer, @NotNull ByteBuffer byteBuffer) {} @NotNull diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/IndexSearchers.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/IndexSearchers.java index ea0cb68..05925e8 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/IndexSearchers.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/IndexSearchers.java @@ -19,7 +19,7 @@ public interface IndexSearchers extends Resource { return new ShardedIndexSearchers(indexSearchers, d -> {}); } - static IndexSearchers unsharded(LLIndexSearcher indexSearcher) { + static UnshardedIndexSearchers unsharded(Send indexSearcher) { return new UnshardedIndexSearchers(indexSearcher, d -> {}); } @@ -30,9 +30,9 @@ public interface IndexSearchers extends Resource { private LLIndexSearcher indexSearcher; - public UnshardedIndexSearchers(LLIndexSearcher indexSearcher, Drop drop) { + public UnshardedIndexSearchers(Send indexSearcher, Drop drop) { super(new CloseOnDrop(drop)); - this.indexSearcher = indexSearcher; + this.indexSearcher = indexSearcher.receive(); } @Override @@ -46,6 +46,10 @@ public interface IndexSearchers extends Resource { return indexSearcher; } + public LLIndexSearcher shard() { + return this.shard(0); + } + @Override protected RuntimeException createResourceClosedException() { return new IllegalStateException("Closed"); @@ -53,7 +57,7 @@ public interface IndexSearchers extends Resource { @Override protected Owned prepareSend() { - LLIndexSearcher indexSearcher = this.indexSearcher; + Send indexSearcher = this.indexSearcher.send(); this.makeInaccessible(); return drop -> new UnshardedIndexSearchers(indexSearcher, drop); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java index 2030acd..284b006 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java @@ -4,31 +4,23 @@ import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.EMPTY_STATUS import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.FIRST_PAGE_LIMIT; import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT; -import io.net5.buffer.api.Buffer; -import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.lucene.LuceneUtils; +import it.cavallium.dbengine.lucene.searcher.IndexSearchers.UnshardedIndexSearchers; import java.io.IOException; import java.util.Arrays; import java.util.Objects; -import java.util.Optional; -import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopDocsCollector; -import org.apache.lucene.search.TotalHits; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.SignalType; -import reactor.core.scheduler.Scheduler; +import reactor.core.publisher.SynchronousSink; import reactor.core.scheduler.Schedulers; -import reactor.util.function.Tuple2; -import reactor.util.function.Tuple3; -import reactor.util.function.Tuples; public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { @@ -38,107 +30,152 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { String keyFieldName) { Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null"); - PaginationInfo paginationInfo; + PaginationInfo paginationInfo = getPaginationInfo(queryParams); + + var indexSearchersMono = indexSearcherMono.map(IndexSearchers::unsharded); + + return LLUtils.usingResource(indexSearchersMono, indexSearchers -> this + // Search first page results + .searchFirstPage(indexSearchers, queryParams, paginationInfo) + // Compute the results of the first page + .transform(firstPageTopDocsMono -> this.computeFirstPageResults(firstPageTopDocsMono, indexSearchers, + keyFieldName, queryParams)) + // Compute other results + .transform(firstResult -> this.computeOtherResults(firstResult, indexSearchers, queryParams, keyFieldName)) + // Ensure that one LuceneSearchResult is always returned + .single(), + false); + } + + /** + * Get the pagination info + */ + private PaginationInfo getPaginationInfo(LocalQueryParams queryParams) { if (queryParams.limit() <= MAX_SINGLE_SEARCH_LIMIT) { - paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), queryParams.limit(), true); + return new PaginationInfo(queryParams.limit(), queryParams.offset(), queryParams.limit(), true); } else { - paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, false); + return new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, false); } + } - return indexSearcherMono - .flatMap(indexSearcherToReceive -> { - var indexSearcher = indexSearcherToReceive.receive(); - var indexSearchers = IndexSearchers.unsharded(indexSearcher); - return Mono - .fromCallable(() -> { - LLUtils.ensureBlocking(); - TopDocsCollector firstPageCollector = TopDocsSearcher.getTopDocsCollector( - queryParams.sort(), - LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()), - null, - LuceneUtils.totalHitsThreshold(), - !paginationInfo.forceSinglePage(), - queryParams.isScored()); + /** + * Search effectively the raw results of the first page + */ + private Mono searchFirstPage(UnshardedIndexSearchers indexSearchers, + LocalQueryParams queryParams, + PaginationInfo paginationInfo) { + var limit = LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()); + var pagination = !paginationInfo.forceSinglePage(); + var resultsOffset = LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset()); + return Mono + .fromSupplier(() -> new CurrentPageInfo(null, limit, 0)) + .handle((s, sink) -> this.searchPageSync(queryParams, indexSearchers, pagination, resultsOffset, s, sink)); + } - indexSearcher.getIndexSearcher().search(queryParams.query(), firstPageCollector); - return firstPageCollector.topDocs(LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset()), - LuceneUtils.safeLongToInt(paginationInfo.firstPageLimit()) - ); - }) - .map(firstPageTopDocs -> { - Flux firstPageHitsFlux = LuceneUtils.convertHits(Flux.fromArray(firstPageTopDocs.scoreDocs), - indexSearchers, keyFieldName, true) - .take(queryParams.limit(), true); - - return Tuples.of(Optional.ofNullable(LuceneUtils.getLastScoreDoc(firstPageTopDocs.scoreDocs)), - LuceneUtils.convertTotalHitsCount(firstPageTopDocs.totalHits), firstPageHitsFlux); - }) - .map(firstResult -> { - var firstPageLastScoreDoc = firstResult.getT1(); - var totalHitsCount = firstResult.getT2(); - var firstPageFlux = firstResult.getT3(); - - - Flux nextHits; - if (paginationInfo.forceSinglePage() || paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) { - nextHits = null; - } else { - nextHits = Flux.defer(() -> Flux - .generate( - () -> new CurrentPageInfo(firstPageLastScoreDoc.orElse(null), paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1), - (s, sink) -> { - LLUtils.ensureBlocking(); - if (s.last() != null && s.remainingLimit() > 0) { - TopDocs pageTopDocs; - try { - TopDocsCollector collector = TopDocsSearcher.getTopDocsCollector(queryParams.sort(), - s.currentPageLimit(), s.last(), LuceneUtils.totalHitsThreshold(), true, - queryParams.isScored()); - indexSearcher.getIndexSearcher().search(queryParams.query(), collector); - pageTopDocs = collector.topDocs(); - } catch (IOException e) { - sink.error(e); - return EMPTY_STATUS; - } - var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs); - sink.next(pageTopDocs); - return new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(), s.pageIndex() + 1); - } else { - sink.complete(); - return EMPTY_STATUS; - } - }, - s -> {} - ) - .subscribeOn(Schedulers.boundedElastic()) - .flatMapIterable(topDocs -> Arrays.asList(topDocs.scoreDocs)) - .transform(topFieldDocFlux -> LuceneUtils.convertHits(topFieldDocFlux, indexSearchers, - keyFieldName, true)) - ); - } - - Flux combinedFlux; - - if (nextHits != null) { - combinedFlux = firstPageFlux - .concatWith(nextHits); - } else { - combinedFlux = firstPageFlux; - } - return new LuceneSearchResult(totalHitsCount, combinedFlux, d -> { - indexSearcher.close(); - indexSearchers.close(); - }).send(); - }) - .doFinally(s -> { - // Close searchers if the search result has not been returned - if (s != SignalType.ON_COMPLETE) { - indexSearcher.close(); - indexSearchers.close(); - } - }); - } + /** + * Search effectively the merged raw results of the next pages + */ + private Flux searchOtherPages(UnshardedIndexSearchers indexSearchers, + LocalQueryParams queryParams, String keyFieldName, CurrentPageInfo secondPageInfo) { + return Flux + .generate( + () -> secondPageInfo, + (s, sink) -> searchPageSync(queryParams, indexSearchers, true, 0, s, sink), + s -> {} ) - .doOnDiscard(Send.class, Send::close); + .subscribeOn(Schedulers.boundedElastic()) + .map(PageData::topDocs) + .flatMapIterable(topDocs -> Arrays.asList(topDocs.scoreDocs)) + .transform(topFieldDocFlux -> LuceneUtils.convertHits(topFieldDocFlux, indexSearchers, + keyFieldName, true)); + } + + private static record FirstPageResults(TotalHitsCount totalHitsCount, Flux firstPageHitsFlux, + CurrentPageInfo nextPageInfo) {} + + /** + * Compute the results of the first page, extracting useful data + */ + private Mono computeFirstPageResults(Mono firstPageDataMono, + IndexSearchers indexSearchers, + String keyFieldName, + LocalQueryParams queryParams) { + return firstPageDataMono.map(firstPageData -> { + var totalHitsCount = LuceneUtils.convertTotalHitsCount(firstPageData.topDocs().totalHits); + + Flux firstPageHitsFlux = LuceneUtils.convertHits(Flux.fromArray(firstPageData.topDocs().scoreDocs), + indexSearchers, keyFieldName, true) + .take(queryParams.limit(), true); + + CurrentPageInfo nextPageInfo = firstPageData.nextPageInfo(); + + return new FirstPageResults(totalHitsCount, firstPageHitsFlux, nextPageInfo); + }); + } + + private Mono> computeOtherResults(Mono firstResultMono, + UnshardedIndexSearchers indexSearchers, + LocalQueryParams queryParams, + String keyFieldName) { + return firstResultMono.map(firstResult -> { + var totalHitsCount = firstResult.totalHitsCount(); + var firstPageHitsFlux = firstResult.firstPageHitsFlux(); + var secondPageInfo = firstResult.nextPageInfo(); + + Flux nextHitsFlux = searchOtherPages(indexSearchers, queryParams, keyFieldName, secondPageInfo); + + Flux combinedFlux = firstPageHitsFlux.concatWith(nextHitsFlux); + return new LuceneSearchResult(totalHitsCount, combinedFlux, d -> indexSearchers.close()).send(); + }); + } + + private static record PageData(TopDocs topDocs, CurrentPageInfo nextPageInfo) {} + + /** + * + * @param resultsOffset offset of the resulting topDocs. Useful if you want to + * skip the first n results in the first page + */ + private CurrentPageInfo searchPageSync(LocalQueryParams queryParams, + UnshardedIndexSearchers indexSearchers, + boolean allowPagination, + int resultsOffset, + CurrentPageInfo s, + SynchronousSink sink) { + LLUtils.ensureBlocking(); + if (resultsOffset < 0) { + throw new IndexOutOfBoundsException(resultsOffset); + } + if ((s.pageIndex() == 0 || s.last() != null) && s.remainingLimit() > 0) { + TopDocs pageTopDocs; + try { + TopDocsCollector collector = TopDocsSearcher.getTopDocsCollector(queryParams.sort(), + s.currentPageLimit(), s.last(), LuceneUtils.totalHitsThreshold(), allowPagination, + queryParams.isScored()); + indexSearchers.shard().getIndexSearcher().search(queryParams.query(), collector); + if (resultsOffset > 0) { + pageTopDocs = collector.topDocs(resultsOffset, s.currentPageLimit()); + } else { + pageTopDocs = collector.topDocs(); + } + } catch (IOException e) { + sink.error(e); + return EMPTY_STATUS; + } + var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs); + long nextRemainingLimit; + if (allowPagination) { + nextRemainingLimit = s.remainingLimit() - s.currentPageLimit(); + } else { + nextRemainingLimit = 0L; + } + var nextPageIndex = s.pageIndex() + 1; + var nextPageInfo = new CurrentPageInfo(pageLastDoc, nextRemainingLimit, nextPageIndex); + sink.next(new PageData(pageTopDocs, nextPageInfo)); + return nextPageInfo; + } else { + sink.complete(); + return EMPTY_STATUS; + } } }