diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java index 20824ea..e8ce26b 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java @@ -12,7 +12,7 @@ public class AdaptiveLocalSearcher implements LocalSearcher { private static final LocalSearcher localSearcher = new PagedLocalSearcher(); - private static final LocalSearcher countSearcher = new CountLocalSearcher(); + private static final LocalSearcher countSearcher = new CountMultiSearcher(); @Override public Mono collect(Mono> indexSearcher, diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java index 759195b..bf9fab8 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java @@ -5,22 +5,15 @@ import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; -import java.io.Closeable; -import java.io.IOException; import reactor.core.publisher.Mono; public class AdaptiveMultiSearcher implements MultiSearcher { - private static final MultiSearcher count - = new UnsortedUnscoredSimpleMultiSearcher(new CountLocalSearcher()); + private static final MultiSearcher count = new CountMultiSearcher(); private static final MultiSearcher scoredPaged = new ScoredPagedMultiSearcher(); - private static final MultiSearcher unsortedUnscoredPaged - = new UnsortedUnscoredSimpleMultiSearcher(new PagedLocalSearcher()); - - private static final MultiSearcher unsortedUnscoredContinuous - = new UnsortedUnscoredStreamingMultiSearcher(); + private static final MultiSearcher unsortedUnscoredContinuous = new UnsortedUnscoredStreamingMultiSearcher(); private final UnsortedScoredFullMultiSearcher unsortedScoredFull; @@ -75,9 +68,6 @@ public class AdaptiveMultiSearcher implements MultiSearcher { return unsortedScoredFull.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); } } - } else if (realLimit <= maxAllowedInMemoryLimit) { - // Run single-page searches using the paged multi searcher - return unsortedUnscoredPaged.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); } else { // Run large/unbounded searches using the continuous multi searcher return unsortedUnscoredContinuous.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLocalSearcher.java deleted file mode 100644 index 68a3d75..0000000 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLocalSearcher.java +++ /dev/null @@ -1,51 +0,0 @@ -package it.cavallium.dbengine.lucene.searcher; - -import io.net5.buffer.api.Resource; -import io.net5.buffer.api.Send; -import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; -import it.cavallium.dbengine.database.LLUtils; -import it.cavallium.dbengine.database.disk.LLIndexSearcher; -import it.cavallium.dbengine.database.disk.LLIndexSearchers; -import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; - -public class CountLocalSearcher implements LocalSearcher { - - @Override - public Mono collect(Mono> indexSearcherMono, - LocalQueryParams queryParams, - String keyFieldName, - LLSearchTransformer transformer) { - return Mono - .usingWhen( - indexSearcherMono, - indexSearcher -> { - Mono queryParamsMono; - if (transformer == LLSearchTransformer.NO_TRANSFORMATION) { - queryParamsMono = Mono.just(queryParams); - } else { - queryParamsMono = transformer.transform(Mono - .fromSupplier(() -> new TransformerInput(LLIndexSearchers.unsharded(indexSearcher), queryParams))); - } - - return queryParamsMono.flatMap(queryParams2 -> Mono.fromCallable(() -> { - try (var is = indexSearcher.receive()) { - LLUtils.ensureBlocking(); - return is.getIndexSearcher().count(queryParams2.query()); - } - }).subscribeOn(Schedulers.boundedElastic())); - }, - is -> Mono.empty() - ) - .map(count -> new LuceneSearchResult(TotalHitsCount.of(count, true), Flux.empty(), null)) - .doOnDiscard(Send.class, Send::close) - .doOnDiscard(Resource.class, Resource::close); - } - - @Override - public String getName() { - return "count local"; - } -} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java new file mode 100644 index 0000000..0d038c9 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java @@ -0,0 +1,134 @@ +package it.cavallium.dbengine.lucene.searcher; + +import io.net5.buffer.api.Resource; +import io.net5.buffer.api.Send; +import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; +import it.cavallium.dbengine.database.LLKeyScore; +import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.LLIndexSearcher; +import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; +import java.util.ArrayList; +import java.util.List; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +public class CountMultiSearcher implements MultiSearcher { + + + @Override + public Mono collectMulti(Mono> indexSearchersMono, + LocalQueryParams queryParams, + String keyFieldName, + LLSearchTransformer transformer) { + + return LLUtils.usingSendResource(indexSearchersMono, + indexSearchers -> { + Mono queryParamsMono; + if (transformer == LLSearchTransformer.NO_TRANSFORMATION) { + queryParamsMono = Mono.just(queryParams); + } else { + queryParamsMono = transformer.transform(Mono + .fromSupplier(() -> new TransformerInput(indexSearchers, queryParams))); + } + + return queryParamsMono.flatMap(queryParams2 -> { + var localQueryParams = getLocalQueryParams(queryParams2); + return Mono + .fromRunnable(() -> { + LLUtils.ensureBlocking(); + if (queryParams2.isSorted() && queryParams2.limitLong() > 0) { + throw new UnsupportedOperationException("Sorted queries are not supported" + + " by SimpleUnsortedUnscoredLuceneMultiSearcher"); + } + if (queryParams2.needsScores() && queryParams2.limitLong() > 0) { + throw new UnsupportedOperationException("Scored queries are not supported" + + " by SimpleUnsortedUnscoredLuceneMultiSearcher"); + } + }) + .thenMany(Flux.fromIterable(indexSearchers.shards())) + .flatMap(searcher -> { + var llSearcher = Mono.fromCallable(() -> new LLIndexSearcher(searcher, false, null).send()); + return this.collect(llSearcher, localQueryParams, keyFieldName, transformer); + }) + .collectList() + .map(results -> { + List resultsToDrop = new ArrayList<>(results.size()); + List> resultsFluxes = new ArrayList<>(results.size()); + boolean exactTotalHitsCount = true; + long totalHitsCountValue = 0; + for (LuceneSearchResult result : results) { + resultsToDrop.add(result); + resultsFluxes.add(result.results()); + exactTotalHitsCount &= result.totalHitsCount().exact(); + totalHitsCountValue += result.totalHitsCount().value(); + } + + var totalHitsCount = new TotalHitsCount(totalHitsCountValue, exactTotalHitsCount); + Flux mergedFluxes = Flux + .merge(resultsFluxes) + .skip(queryParams2.offsetLong()) + .take(queryParams2.limitLong(), true); + + return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> { + for (LuceneSearchResult luceneSearchResult : resultsToDrop) { + luceneSearchResult.close(); + } + indexSearchers.close(); + }); + }); + } + ); + }, + false + ); + } + + private LocalQueryParams getLocalQueryParams(LocalQueryParams queryParams) { + return new LocalQueryParams(queryParams.query(), + 0L, + queryParams.offsetLong() + queryParams.limitLong(), + queryParams.pageLimits(), + queryParams.minCompetitiveScore(), + queryParams.sort(), + queryParams.complete() + ); + } + + @Override + public Mono collect(Mono> indexSearcherMono, + LocalQueryParams queryParams, + String keyFieldName, + LLSearchTransformer transformer) { + return Mono + .usingWhen( + indexSearcherMono, + indexSearcher -> { + Mono queryParamsMono; + if (transformer == LLSearchTransformer.NO_TRANSFORMATION) { + queryParamsMono = Mono.just(queryParams); + } else { + queryParamsMono = transformer.transform(Mono + .fromSupplier(() -> new TransformerInput(LLIndexSearchers.unsharded(indexSearcher), queryParams))); + } + + return queryParamsMono.flatMap(queryParams2 -> Mono.fromCallable(() -> { + try (var is = indexSearcher.receive()) { + LLUtils.ensureBlocking(); + return is.getIndexSearcher().count(queryParams2.query()); + } + }).subscribeOn(Schedulers.boundedElastic())); + }, + is -> Mono.empty() + ) + .map(count -> new LuceneSearchResult(TotalHitsCount.of(count, true), Flux.empty(), null)) + .doOnDiscard(Send.class, Send::close) + .doOnDiscard(Resource.class, Resource::close); + } + + @Override + public String getName() { + return "count"; + } +} diff --git a/src/test/java/it/cavallium/dbengine/TestLuceneIndex.java b/src/test/java/it/cavallium/dbengine/TestLuceneIndex.java index 81caf74..30175bb 100644 --- a/src/test/java/it/cavallium/dbengine/TestLuceneIndex.java +++ b/src/test/java/it/cavallium/dbengine/TestLuceneIndex.java @@ -17,10 +17,9 @@ import it.cavallium.dbengine.database.LLScoreMode; import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher; import it.cavallium.dbengine.lucene.searcher.AdaptiveMultiSearcher; -import it.cavallium.dbengine.lucene.searcher.CountLocalSearcher; +import it.cavallium.dbengine.lucene.searcher.CountMultiSearcher; import it.cavallium.dbengine.lucene.searcher.LocalSearcher; import it.cavallium.dbengine.lucene.searcher.MultiSearcher; -import it.cavallium.dbengine.lucene.searcher.UnsortedUnscoredSimpleMultiSearcher; import java.io.IOException; import java.util.Objects; import java.util.stream.Stream; @@ -137,8 +136,8 @@ public class TestLuceneIndex { index.updateDocument("test-key-14", "2999").block(); index.updateDocument("test-key-15", "3902").block(); Flux.range(1, 1000).concatMap(i -> index.updateDocument("test-key-" + (15 + i), "" + i)).blockLast(); - tempDb.swappableLuceneSearcher().setSingle(new CountLocalSearcher()); - tempDb.swappableLuceneSearcher().setMulti(new UnsortedUnscoredSimpleMultiSearcher(new CountLocalSearcher())); + tempDb.swappableLuceneSearcher().setSingle(new CountMultiSearcher()); + tempDb.swappableLuceneSearcher().setMulti(new CountMultiSearcher()); assertCount(index, 1000 + 15); if (customSearcher != null) { tempDb.swappableLuceneSearcher().setSingle(customSearcher); diff --git a/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java b/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java index 3f00d26..2e89fc1 100644 --- a/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java +++ b/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java @@ -31,14 +31,13 @@ import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher; import it.cavallium.dbengine.lucene.searcher.AdaptiveMultiSearcher; -import it.cavallium.dbengine.lucene.searcher.CountLocalSearcher; +import it.cavallium.dbengine.lucene.searcher.CountMultiSearcher; import it.cavallium.dbengine.lucene.searcher.LocalSearcher; import it.cavallium.dbengine.lucene.searcher.MultiSearcher; import it.cavallium.dbengine.lucene.searcher.OfficialSearcher; import it.cavallium.dbengine.lucene.searcher.ScoredPagedMultiSearcher; import it.cavallium.dbengine.lucene.searcher.PagedLocalSearcher; import it.cavallium.dbengine.lucene.searcher.SortedScoredFullMultiSearcher; -import it.cavallium.dbengine.lucene.searcher.UnsortedUnscoredSimpleMultiSearcher; import it.cavallium.dbengine.lucene.searcher.UnsortedScoredFullMultiSearcher; import it.cavallium.dbengine.lucene.searcher.UnsortedUnscoredStreamingMultiSearcher; import java.io.IOException; @@ -124,8 +123,8 @@ public class TestLuceneSearches { .flatMap(entry -> index.updateDocument(entry.getKey(), entry.getValue())) .subscribeOn(Schedulers.boundedElastic()) .blockLast(); - tempDb.swappableLuceneSearcher().setSingle(new CountLocalSearcher()); - tempDb.swappableLuceneSearcher().setMulti(new UnsortedUnscoredSimpleMultiSearcher(new CountLocalSearcher())); + tempDb.swappableLuceneSearcher().setSingle(new CountMultiSearcher()); + tempDb.swappableLuceneSearcher().setMulti(new CountMultiSearcher()); assertCount(index, 1000 + 15); if (shards) { multiIndex = index; @@ -155,7 +154,7 @@ public class TestLuceneSearches { return Flux.push(sink -> { if (info.shard()) { if (info.onlyCount()) { - sink.next(new UnsortedUnscoredSimpleMultiSearcher(new CountLocalSearcher())); + sink.next(new CountMultiSearcher()); } else { sink.next(new ScoredPagedMultiSearcher()); if (info.sorted() && !info.sortedByScore()) { @@ -171,7 +170,7 @@ public class TestLuceneSearches { sink.next(new AdaptiveMultiSearcher(ENV)); } else { if (info.onlyCount()) { - sink.next(new CountLocalSearcher()); + sink.next(new CountMultiSearcher()); } else { sink.next(new PagedLocalSearcher()); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredSimpleMultiSearcher.java b/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java similarity index 91% rename from src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredSimpleMultiSearcher.java rename to src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java index 7ad0641..bda736b 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredSimpleMultiSearcher.java +++ b/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java @@ -1,4 +1,4 @@ -package it.cavallium.dbengine.lucene.searcher; +package it.cavallium.dbengine; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; @@ -7,7 +7,12 @@ import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.lucene.LuceneUtils; +import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; +import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; +import it.cavallium.dbengine.lucene.searcher.LocalSearcher; +import it.cavallium.dbengine.lucene.searcher.LuceneSearchResult; +import it.cavallium.dbengine.lucene.searcher.MultiSearcher; import java.util.ArrayList; import java.util.List; import reactor.core.publisher.Flux;