diff --git a/src/main/java/it/cavallium/dbengine/SwappableLuceneSearcher.java b/src/main/java/it/cavallium/dbengine/SwappableLuceneSearcher.java index 203a187..f519736 100644 --- a/src/main/java/it/cavallium/dbengine/SwappableLuceneSearcher.java +++ b/src/main/java/it/cavallium/dbengine/SwappableLuceneSearcher.java @@ -26,7 +26,7 @@ public class SwappableLuceneSearcher implements LocalSearcher, MultiSearcher, Cl } @Override - public Mono> collect(Mono> indexSearcherMono, + public Mono collect(Mono> indexSearcherMono, LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer) { @@ -51,7 +51,7 @@ public class SwappableLuceneSearcher implements LocalSearcher, MultiSearcher, Cl } @Override - public Mono> collectMulti(Mono> indexSearchersMono, + public Mono collectMulti(Mono> indexSearchersMono, LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer) { diff --git a/src/main/java/it/cavallium/dbengine/client/Hits.java b/src/main/java/it/cavallium/dbengine/client/Hits.java index 10ab45a..120be5a 100644 --- a/src/main/java/it/cavallium/dbengine/client/Hits.java +++ b/src/main/java/it/cavallium/dbengine/client/Hits.java @@ -62,37 +62,39 @@ public final class Hits extends ResourceSupport, Hits> { return new Hits<>(hitsEntry, hits.totalHitsCount, hits::close); } - public static Function>>, Send>>> generateMapper( + public static Function>, Hits>> generateMapper( ValueGetter valueGetter) { - return resultToReceive -> { - var result = resultToReceive.receive(); + return result -> { var hitsToTransform = result.results() .map(hit -> new LazyHitEntry<>(Mono.just(hit.key()), valueGetter.get(hit.key()), hit.score())); - return new Hits<>(hitsToTransform, result.totalHitsCount(), result::close).send(); + return new Hits<>(hitsToTransform, result.totalHitsCount(), result::close); }; } - public static Function>>, Send>>> generateMapper( + public static Function>, Hits>> generateMapper( ValueTransformer valueTransformer) { - return resultToReceive -> { - var result = resultToReceive.receive(); + return result -> { + try { + var sharedHitsFlux = result.results().publish().refCount(3); + var scoresFlux = sharedHitsFlux.map(HitKey::score); + var keysFlux = sharedHitsFlux.map(HitKey::key); - var sharedHitsFlux = result.results().publish().refCount(3); - var scoresFlux = sharedHitsFlux.map(HitKey::score); - var keysFlux = sharedHitsFlux.map(HitKey::key); + var valuesFlux = valueTransformer.transform(keysFlux); - var valuesFlux = valueTransformer.transform(keysFlux); + var transformedFlux = Flux.zip((Object[] data) -> { + //noinspection unchecked + var keyMono = Mono.just((T) data[0]); + //noinspection unchecked + var valMono = Mono.just((U) data[1]); + var score = (Float) data[2]; + return new LazyHitEntry<>(keyMono, valMono, score); + }, keysFlux, valuesFlux, scoresFlux); - var transformedFlux = Flux.zip((Object[] data) -> { - //noinspection unchecked - var keyMono = Mono.just((T) data[0]); - //noinspection unchecked - var valMono = Mono.just((U) data[1]); - var score = (Float) data[2]; - return new LazyHitEntry<>(keyMono, valMono, score); - }, keysFlux, valuesFlux, scoresFlux); - - return new Hits<>(transformedFlux, result.totalHitsCount(), result::close).send(); + return new Hits<>(transformedFlux, result.totalHitsCount(), result::close); + } catch (Throwable t) { + result.close(); + throw t; + } }; } diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java index 9609d19..3f6c43e 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java @@ -48,10 +48,10 @@ public interface LuceneIndex extends LLSnapshottable { Mono deleteAll(); - Mono>>> moreLikeThis(ClientQueryParams queryParams, T key, + Mono>> moreLikeThis(ClientQueryParams queryParams, T key, U mltDocumentValue); - Mono>>> search(ClientQueryParams queryParams); + Mono>> search(ClientQueryParams queryParams); Mono count(@Nullable CompositeSnapshot snapshot, Query query); diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java index 911fa49..70e723e 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java @@ -82,7 +82,7 @@ public class LuceneIndexImpl implements LuceneIndex { } @Override - public Mono>>> moreLikeThis(ClientQueryParams queryParams, + public Mono>> moreLikeThis(ClientQueryParams queryParams, T key, U mltDocumentValue) { Flux>> mltDocumentFields @@ -99,7 +99,7 @@ public class LuceneIndexImpl implements LuceneIndex { } @Override - public Mono>>> search(ClientQueryParams queryParams) { + public Mono>> search(ClientQueryParams queryParams) { return luceneIndex .search(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), @@ -109,13 +109,12 @@ public class LuceneIndexImpl implements LuceneIndex { .single(); } - private Send>> mapResults(Send llSearchResultToReceive) { - var llSearchResult = llSearchResultToReceive.receive(); + private Hits> mapResults(LLSearchResultShard llSearchResult) { var scoresWithKeysFlux = llSearchResult .results() .map(hit -> new HitKey<>(indicizer.getKey(hit.key()), hit.score())); - return new Hits<>(scoresWithKeysFlux, llSearchResult.totalHitsCount(), llSearchResult::close).send(); + return new Hits<>(scoresWithKeysFlux, llSearchResult.totalHitsCount(), llSearchResult::close); } @Override @@ -123,8 +122,8 @@ public class LuceneIndexImpl implements LuceneIndex { return this .search(ClientQueryParams.builder().snapshot(snapshot).query(query).limit(0).build()) .single() - .map(searchResultKeysSend -> { - try (var searchResultKeys = searchResultKeysSend.receive()) { + .map(searchResultKeys -> { + try (searchResultKeys) { return searchResultKeys.totalHitsCount(); } }); diff --git a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java index 1ec559d..b1103d3 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.database; +import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; import it.cavallium.data.generator.nativedata.Nullablefloat; import it.cavallium.dbengine.client.query.current.data.NoSort; @@ -42,7 +43,7 @@ public interface LLLuceneIndex extends LLSnapshottable { * The additional query will be used with the moreLikeThis query: "mltQuery AND additionalQuery" * @return the collection has one or more flux */ - Mono> moreLikeThis(@Nullable LLSnapshot snapshot, + Mono moreLikeThis(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName, Flux>> mltDocumentFields); @@ -52,18 +53,18 @@ public interface LLLuceneIndex extends LLSnapshottable { * returned can be at most limit * 15 * @return the collection has one or more flux */ - Mono> search(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName); + Mono search(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName); default Mono count(@Nullable LLSnapshot snapshot, Query query) { QueryParams params = QueryParams.of(query, 0, 0, Nullablefloat.empty(), NoSort.of(), false); return Mono.from(this.search(snapshot, params, null) - .map(llSearchResultShardToReceive -> { - try (var llSearchResultShard = llSearchResultShardToReceive.receive()) { + .map(llSearchResultShard -> { + try (llSearchResultShard) { return llSearchResultShard.totalHitsCount(); } }) .defaultIfEmpty(TotalHitsCount.of(0, true)) - ).doOnDiscard(Send.class, Send::close); + ).doOnDiscard(Send.class, Send::close).doOnDiscard(Resource.class, Resource::close); } boolean isLowMemoryMode(); diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index be368a3..eee47c0 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -502,6 +502,25 @@ public class LLUtils { .doOnDiscard(Send.class, send -> send.close()); } + // todo: remove this ugly method + /** + * cleanup resource + * @param cleanupOnSuccess if true the resource will be cleaned up if the function is successful + */ + public static , V extends T> Flux usingResources(Mono resourceSupplier, + Function> resourceClosure, + boolean cleanupOnSuccess) { + return Flux.usingWhen(resourceSupplier, resourceClosure, r -> { + if (cleanupOnSuccess) { + return Mono.fromRunnable(() -> r.close()); + } else { + return Mono.empty(); + } + }, (r, ex) -> Mono.fromRunnable(() -> r.close()), r -> Mono.fromRunnable(() -> r.close())) + .doOnDiscard(Resource.class, resource -> resource.close()) + .doOnDiscard(Send.class, send -> send.close()); + } + // todo: remove this ugly method /** * cleanup resource diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 839ea5b..9e6fde3 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -4,6 +4,7 @@ import static it.cavallium.dbengine.database.LLUtils.MARKER_LUCENE; import static it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.NO_TRANSFORMATION; import io.micrometer.core.instrument.MeterRegistry; +import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.DirectIOOptions; import it.cavallium.dbengine.client.IndicizerAnalyzers; @@ -359,7 +360,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } @Override - public Mono> moreLikeThis(@Nullable LLSnapshot snapshot, + public Mono moreLikeThis(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName, Flux>> mltDocumentFieldsFlux) { @@ -367,22 +368,23 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { var searcher = this.searcherManager.retrieveSearcher(snapshot); var transformer = new MoreLikeThisTransformer(mltDocumentFieldsFlux); - return localSearcher.collect(searcher, localQueryParams, keyFieldName, transformer).map(resultToReceive -> { - var result = resultToReceive.receive(); - return new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close).send(); - }).doOnDiscard(Send.class, Send::close); + return localSearcher + .collect(searcher, localQueryParams, keyFieldName, transformer) + .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)) + .doOnDiscard(Send.class, Send::close) + .doOnDiscard(Resource.class, Resource::close); } @Override - public Mono> search(@Nullable LLSnapshot snapshot, QueryParams queryParams, + public Mono search(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName) { LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams); var searcher = searcherManager.retrieveSearcher(snapshot); - return localSearcher.collect(searcher, localQueryParams, keyFieldName, NO_TRANSFORMATION).map(resultToReceive -> { - var result = resultToReceive.receive(); - return new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close).send(); - }).doOnDiscard(Send.class, Send::close); + return localSearcher + .collect(searcher, localQueryParams, keyFieldName, NO_TRANSFORMATION) + .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)) + .doOnDiscard(Send.class, Send::close); } public Mono> retrieveSearcher(@Nullable LLSnapshot snapshot) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 0587b9f..118fb69 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -1,6 +1,7 @@ package it.cavallium.dbengine.database.disk; import io.micrometer.core.instrument.MeterRegistry; +import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; @@ -210,7 +211,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { } @Override - public Mono> moreLikeThis(@Nullable LLSnapshot snapshot, + public Mono moreLikeThis(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName, Flux>> mltDocumentFields) { @@ -222,15 +223,13 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { return multiSearcher .collectMulti(searchers, localQueryParams, keyFieldName, transformer) // Transform the result type - .map(resultToReceive -> { - var result = resultToReceive.receive(); - return new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close).send(); - }) - .doOnDiscard(Send.class, Send::close); + .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)) + .doOnDiscard(Send.class, Send::close) + .doOnDiscard(Resource.class, Resource::close); } @Override - public Mono> search(@Nullable LLSnapshot snapshot, + public Mono search(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName) { LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams); @@ -240,11 +239,8 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { return multiSearcher .collectMulti(searchers, localQueryParams, keyFieldName, LLSearchTransformer.NO_TRANSFORMATION) // Transform the result type - .map(resultToReceive -> { - var result = resultToReceive.receive(); - return new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close).send(); - }) - .doOnDiscard(Send.class, Send::close); + .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)) + .doOnDiscard(Send.class, Send::close).doOnDiscard(Resource.class, Resource::close); } @Override diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java index 7be152b..20824ea 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java @@ -15,7 +15,7 @@ public class AdaptiveLocalSearcher implements LocalSearcher { private static final LocalSearcher countSearcher = new CountLocalSearcher(); @Override - public Mono> collect(Mono> indexSearcher, + public Mono collect(Mono> indexSearcher, LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer) { @@ -39,7 +39,7 @@ public class AdaptiveLocalSearcher implements LocalSearcher { return "adaptivelocal"; } - public Mono> transformedCollect(Mono> indexSearcher, + public Mono transformedCollect(Mono> indexSearcher, LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java index 176f616..759195b 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java @@ -32,7 +32,7 @@ public class AdaptiveMultiSearcher implements MultiSearcher { } @Override - public Mono> collectMulti(Mono> indexSearchersMono, + public Mono collectMulti(Mono> indexSearchersMono, LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer) { @@ -47,7 +47,7 @@ public class AdaptiveMultiSearcher implements MultiSearcher { } } - public Mono> transformedCollectMulti(Mono> indexSearchersMono, + public Mono transformedCollectMulti(Mono> indexSearchersMono, LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLocalSearcher.java index 08ab49c..4f95952 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLocalSearcher.java @@ -13,7 +13,7 @@ import reactor.core.scheduler.Schedulers; public class CountLocalSearcher implements LocalSearcher { @Override - public Mono> collect(Mono> indexSearcherMono, + public Mono collect(Mono> indexSearcherMono, LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer) { @@ -38,7 +38,7 @@ public class CountLocalSearcher implements LocalSearcher { }, is -> Mono.empty() ) - .map(count -> new LuceneSearchResult(TotalHitsCount.of(count, true), Flux.empty(), null).send()) + .map(count -> new LuceneSearchResult(TotalHitsCount.of(count, true), Flux.empty(), null)) .doOnDiscard(Send.class, Send::close); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalSearcher.java index 119fbd2..62f719d 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalSearcher.java @@ -12,7 +12,7 @@ public interface LocalSearcher { * @param keyFieldName the name of the key field * @param transformer the search query transformer */ - Mono> collect(Mono> indexSearcherMono, + Mono collect(Mono> indexSearcherMono, LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/MultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/MultiSearcher.java index 280d01c..0ca66d9 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/MultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/MultiSearcher.java @@ -18,7 +18,7 @@ public interface MultiSearcher extends LocalSearcher { * @param keyFieldName the name of the key field * @param transformer the search query transformer */ - Mono> collectMulti(Mono> indexSearchersMono, + Mono collectMulti(Mono> indexSearchersMono, LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer); @@ -30,7 +30,7 @@ public interface MultiSearcher extends LocalSearcher { * @param transformer the search query transformer */ @Override - default Mono> collect(Mono> indexSearcherMono, + default Mono collect(Mono> indexSearcherMono, LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/OfficialSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/OfficialSearcher.java index a4aa143..ce7592c 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/OfficialSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/OfficialSearcher.java @@ -36,7 +36,7 @@ public class OfficialSearcher implements MultiSearcher { } @Override - public Mono> collectMulti(Mono> indexSearchersMono, + public Mono collectMulti(Mono> indexSearchersMono, LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer) { @@ -102,7 +102,7 @@ public class OfficialSearcher implements MultiSearcher { /** * Compute the results, extracting useful data */ - private Mono> computeResults(Mono dataMono, + private Mono computeResults(Mono dataMono, LLIndexSearchers indexSearchers, String keyFieldName, LocalQueryParams queryParams) { @@ -115,7 +115,7 @@ public class OfficialSearcher implements MultiSearcher { .skip(queryParams.offsetLong()) .take(queryParams.limitLong(), true); - return new LuceneSearchResult(totalHitsCount, hitsFlux, indexSearchers::close).send(); + return new LuceneSearchResult(totalHitsCount, hitsFlux, indexSearchers::close); }); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java index 6e2ccdf..1628b9d 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java @@ -32,7 +32,7 @@ import reactor.core.scheduler.Schedulers; public class PagedLocalSearcher implements LocalSearcher { @Override - public Mono> collect(Mono> indexSearcherMono, + public Mono collect(Mono> indexSearcherMono, LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer) { @@ -120,7 +120,7 @@ public class PagedLocalSearcher implements LocalSearcher { }).single(); } - private Mono> computeOtherResults(Mono firstResultMono, + private Mono computeOtherResults(Mono firstResultMono, List indexSearchers, LocalQueryParams queryParams, String keyFieldName, @@ -133,7 +133,7 @@ public class PagedLocalSearcher implements LocalSearcher { Flux nextHitsFlux = searchOtherPages(indexSearchers, queryParams, keyFieldName, secondPageInfo); Flux combinedFlux = firstPageHitsFlux.concatWith(nextHitsFlux); - return new LuceneSearchResult(totalHitsCount, combinedFlux, onClose).send(); + return new LuceneSearchResult(totalHitsCount, combinedFlux, onClose); }).single(); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java index 99efd08..4307668 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java @@ -32,7 +32,7 @@ public class ScoredPagedMultiSearcher implements MultiSearcher { } @Override - public Mono> collectMulti(Mono> indexSearchersMono, + public Mono collectMulti(Mono> indexSearchersMono, LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer) { @@ -114,7 +114,7 @@ public class ScoredPagedMultiSearcher implements MultiSearcher { }); } - private Send computeOtherResults(FirstPageResults firstResult, + private LuceneSearchResult computeOtherResults(FirstPageResults firstResult, List indexSearchers, LocalQueryParams queryParams, String keyFieldName, @@ -126,7 +126,7 @@ public class ScoredPagedMultiSearcher implements MultiSearcher { Flux nextHitsFlux = searchOtherPages(indexSearchers, queryParams, keyFieldName, secondPageInfo); Flux combinedFlux = firstPageHitsFlux.concatWith(nextHitsFlux); - return new LuceneSearchResult(totalHitsCount, combinedFlux, onClose).send(); + return new LuceneSearchResult(totalHitsCount, combinedFlux, onClose); } /** diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java index 0625898..6c490d5 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java @@ -33,7 +33,7 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher { } @Override - public Mono> collectMulti(Mono> indexSearchersMono, + public Mono collectMulti(Mono> indexSearchersMono, LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer) { @@ -94,7 +94,7 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher { /** * Compute the results, extracting useful data */ - private Mono> computeResults(Mono> dataMono, + private Mono computeResults(Mono> dataMono, LLIndexSearchers indexSearchers, String keyFieldName, LocalQueryParams queryParams) { @@ -106,7 +106,7 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher { indexSearchers.shards(), keyFieldName, true) .take(queryParams.limitLong(), true); - return new LuceneSearchResult(totalHitsCount, hitsFlux, indexSearchers::close).send(); + return new LuceneSearchResult(totalHitsCount, hitsFlux, indexSearchers::close); }); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedScoredFullMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedScoredFullMultiSearcher.java index 88958df..17e7b4d 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedScoredFullMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedScoredFullMultiSearcher.java @@ -31,7 +31,7 @@ public class UnsortedScoredFullMultiSearcher implements MultiSearcher { } @Override - public Mono> collectMulti(Mono> indexSearchersMono, + public Mono collectMulti(Mono> indexSearchersMono, LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer) { @@ -98,7 +98,7 @@ public class UnsortedScoredFullMultiSearcher implements MultiSearcher { /** * Compute the results, extracting useful data */ - private Mono> computeResults(Mono> dataMono, + private Mono computeResults(Mono> dataMono, LLIndexSearchers indexSearchers, String keyFieldName, LocalQueryParams queryParams) { @@ -110,7 +110,7 @@ public class UnsortedScoredFullMultiSearcher implements MultiSearcher { indexSearchers.shards(), keyFieldName, true) .take(queryParams.limitLong(), true); - return new LuceneSearchResult(totalHitsCount, hitsFlux, indexSearchers::close).send(); + return new LuceneSearchResult(totalHitsCount, hitsFlux, indexSearchers::close); }); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredSimpleMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredSimpleMultiSearcher.java index 63293d4..7ad0641 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredSimpleMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredSimpleMultiSearcher.java @@ -22,7 +22,7 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher { } @Override - public Mono> collectMulti(Mono> indexSearchersMono, + public Mono collectMulti(Mono> indexSearchersMono, LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer) { @@ -62,8 +62,7 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher { List> resultsFluxes = new ArrayList<>(results.size()); boolean exactTotalHitsCount = true; long totalHitsCountValue = 0; - for (Send resultToReceive : results) { - LuceneSearchResult result = resultToReceive.receive(); + for (LuceneSearchResult result : results) { resultsToDrop.add(result); resultsFluxes.add(result.results()); exactTotalHitsCount &= result.totalHitsCount().exact(); @@ -81,7 +80,7 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher { luceneSearchResult.close(); } indexSearchers.close(); - }).send(); + }); }); } ); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java index cdd78c3..37cb459 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java @@ -33,7 +33,7 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher { private static final Supplier> QUEUE_SUPPLIER = Queues.get(1024); @Override - public Mono> collectMulti(Mono> indexSearchersMono, + public Mono collectMulti(Mono> indexSearchersMono, LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer) { @@ -105,7 +105,7 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher { .skip(queryParams2.offsetLong()) .take(queryParams2.limitLong(), true); - return new LuceneSearchResult(totalHitsCount, mergedFluxes, indexSearchers::close).send(); + return new LuceneSearchResult(totalHitsCount, mergedFluxes, indexSearchers::close); }); }); }, false); diff --git a/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java b/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java index a88de11..3f00d26 100644 --- a/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java +++ b/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java @@ -255,7 +255,7 @@ public class TestLuceneSearches { runSearchers(expectedQueryType, searcher -> { var luceneIndex = getLuceneIndex(expectedQueryType.shard(), searcher); var query = queryParamsBuilder.build(); - try (var results = run(luceneIndex.search(query)).receive()) { + try (var results = run(luceneIndex.search(query))) { var hits = results.totalHitsCount(); var keys = getResults(results); if (hits.exact()) { @@ -267,7 +267,7 @@ public class TestLuceneSearches { var officialSearcher = new OfficialSearcher(ENV); luceneIndex = getLuceneIndex(expectedQueryType.shard(), officialSearcher); var officialQuery = queryParamsBuilder.limit(ELEMENTS.size() * 2L).build(); - try (var officialResults = run(luceneIndex.search(officialQuery)).receive()) { + try (var officialResults = run(luceneIndex.search(officialQuery))) { var officialHits = officialResults.totalHitsCount(); var officialKeys = getResults(officialResults).stream().toList(); if (officialHits.exact()) {