diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 651567c..d32b0e2 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -353,16 +353,13 @@ public class LLUtils { public static > Mono usingSend(Mono> resourceSupplier, Function, Mono> resourceClosure, boolean cleanupOnSuccess) { - return Mono.usingWhen(resourceSupplier, resourceClosure, - r -> { - if (cleanupOnSuccess) { - return Mono.fromRunnable(r::close); - } else { - return Mono.empty(); - } - }, - (r, ex) -> Mono.fromRunnable(r::close), - r -> Mono.fromRunnable(r::close)) + return Mono.usingWhen(resourceSupplier, resourceClosure, r -> { + if (cleanupOnSuccess) { + return Mono.fromRunnable(r::close); + } else { + return Mono.empty(); + } + }, (r, ex) -> Mono.fromRunnable(r::close), r -> Mono.fromRunnable(r::close)) .doOnDiscard(Send.class, Send::close); } @@ -373,16 +370,13 @@ public class LLUtils { public static , V extends T> Mono usingResource(Mono resourceSupplier, Function> resourceClosure, boolean cleanupOnSuccess) { - return Mono.usingWhen(resourceSupplier, resourceClosure, - r -> { - if (cleanupOnSuccess) { - return Mono.fromRunnable(r::close); - } else { - return Mono.empty(); - } - }, - (r, ex) -> Mono.fromRunnable(r::close), - r -> Mono.fromRunnable(r::close)) + return Mono.usingWhen(resourceSupplier, resourceClosure, r -> { + if (cleanupOnSuccess) { + return Mono.fromRunnable(r::close); + } else { + return Mono.empty(); + } + }, (r, ex) -> Mono.fromRunnable(r::close), r -> Mono.fromRunnable(r::close)) .doOnDiscard(Resource.class, Resource::close) .doOnDiscard(Send.class, Send::close); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleUnsortedUnscoredLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleUnsortedUnscoredLuceneMultiSearcher.java new file mode 100644 index 0000000..ef99dde --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleUnsortedUnscoredLuceneMultiSearcher.java @@ -0,0 +1,67 @@ +package it.cavallium.dbengine.lucene.searcher; + +import io.net5.buffer.api.Resource; +import io.net5.buffer.api.Send; +import io.net5.buffer.api.internal.ResourceSupport; +import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; +import it.cavallium.dbengine.database.LLKeyScore; +import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.LLIndexSearcher; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class SimpleUnsortedUnscoredLuceneMultiSearcher implements LuceneMultiSearcher { + + private final LuceneLocalSearcher localSearcher; + + public SimpleUnsortedUnscoredLuceneMultiSearcher(LuceneLocalSearcher localSearcher) { + this.localSearcher = localSearcher; + } + + @Override + public Mono> collect(Flux> indexSearchersFlux, + LocalQueryParams queryParams, + String keyFieldName) { + return Mono + .fromRunnable(() -> { + LLUtils.ensureBlocking(); + if (!queryParams.isSorted()) { + throw new UnsupportedOperationException("Sorted queries are not supported" + + " by SimpleUnsortedUnscoredLuceneMultiSearcher"); + } + if (!queryParams.isScored()) { + throw new UnsupportedOperationException("Scored queries are not supported" + + " by SimpleUnsortedUnscoredLuceneMultiSearcher"); + } + }) + .thenMany(indexSearchersFlux) + .flatMap(resSend -> localSearcher.collect(Mono.just(resSend), queryParams, keyFieldName)) + .collectList() + .map(results -> { + List resultsToDrop = new ArrayList<>(results.size()); + List> resultsFluxes = new ArrayList<>(results.size()); + boolean exactTotalHitsCount = true; + long totalHitsCountValue = 0; + for (Send resultToReceive : results) { + LuceneSearchResult result = resultToReceive.receive(); + resultsToDrop.add(result); + resultsFluxes.add(result.results()); + exactTotalHitsCount &= result.totalHitsCount().exact(); + totalHitsCountValue += result.totalHitsCount().value(); + } + + var totalHitsCount = new TotalHitsCount(totalHitsCountValue, exactTotalHitsCount); + Flux mergedFluxes = Flux.merge(resultsFluxes); + + return new LuceneSearchResult(totalHitsCount, mergedFluxes, d -> { + for (LuceneSearchResult luceneSearchResult : resultsToDrop) { + luceneSearchResult.close(); + } + }).send(); + }) + .doOnDiscard(Send.class, Send::close); + } +}