diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java index d32c10d..70b986b 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java @@ -1,5 +1,7 @@ package it.cavallium.dbengine.client; +import static it.cavallium.dbengine.utils.StreamUtils.toListClose; + import it.cavallium.dbengine.client.Hits.CloseableHits; import it.cavallium.dbengine.client.Hits.LuceneHits; import it.cavallium.dbengine.client.query.ClientQueryParams; @@ -90,13 +92,12 @@ public class LuceneIndexImpl implements LuceneIndex { var mltDocumentFields = indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue); - var results = luceneIndex + var results = toListClose(luceneIndex .moreLikeThis(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName(), mltDocumentFields - ) - .toList(); + )); LLSearchResultShard mergedResults = mergeResults(queryParams, results); if (mergedResults != null) { return mapResults(mergedResults); @@ -107,12 +108,11 @@ public class LuceneIndexImpl implements LuceneIndex { @Override public Hits> search(ClientQueryParams queryParams) { - var results = luceneIndex + var results = toListClose(luceneIndex .search(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName() - ) - .toList(); + )); var mergedResults = mergeResults(queryParams, results); if (mergedResults != null) { diff --git a/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java b/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java index e9177b4..c1e9f35 100644 --- a/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java +++ b/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java @@ -10,6 +10,9 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Spliterator; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collector; @@ -96,6 +99,14 @@ public class StreamUtils { } } + public static X scheduleOnPool(ForkJoinPool pool, Callable supplier) { + try { + return pool.submit(supplier).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + private record BatchSpliterator(Spliterator base, int batchSize) implements Spliterator> { @Override