diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java index e313868..19832a1 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.client; +import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.query.ClientQueryParams; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; @@ -52,9 +53,9 @@ public interface LuceneIndex extends LLSnapshottable { Mono deleteAll(); - Mono> moreLikeThis(ClientQueryParams> queryParams, T key, U mltDocumentValue); + Mono>> moreLikeThis(ClientQueryParams> queryParams, T key, U mltDocumentValue); - default Mono> moreLikeThisWithValues(ClientQueryParams> queryParams, + default Mono>> moreLikeThisWithValues(ClientQueryParams> queryParams, T key, U mltDocumentValue, ValueGetter valueGetter) { @@ -64,21 +65,19 @@ public interface LuceneIndex extends LLSnapshottable { getValueGetterTransformer(valueGetter)); } - Mono> moreLikeThisWithTransformer(ClientQueryParams> queryParams, + Mono>> moreLikeThisWithTransformer(ClientQueryParams> queryParams, T key, U mltDocumentValue, ValueTransformer valueTransformer); - Mono> search(ClientQueryParams> queryParams); + Mono>> search(ClientQueryParams> queryParams); - default Mono> searchWithValues(ClientQueryParams> queryParams, + default Mono>> searchWithValues(ClientQueryParams> queryParams, ValueGetter valueGetter) { - return this.searchWithTransformer(queryParams, - getValueGetterTransformer(valueGetter) - ); + return this.searchWithTransformer(queryParams, getValueGetterTransformer(valueGetter)); } - Mono> searchWithTransformer(ClientQueryParams> queryParams, + Mono>> searchWithTransformer(ClientQueryParams> queryParams, ValueTransformer valueTransformer); Mono count(@Nullable CompositeSnapshot snapshot, Query query); diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java index b242a9b..e3d6ef7 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.client; +import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.query.ClientQueryParams; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; @@ -84,58 +85,73 @@ public class LuceneIndexImpl implements LuceneIndex { return luceneIndex.deleteAll(); } - private Mono> transformLuceneResultWithTransformer(LLSearchResultShard llSearchResult) { - return Mono.just(new SearchResultKeys<>(llSearchResult.results() - .map(signal -> new SearchResultKey<>(Mono.fromCallable(signal::key).map(indicizer::getKey), signal.score())), - llSearchResult.totalHitsCount(), - llSearchResult.release() - )); + private Mono>> transformLuceneResultWithTransformer( + Mono> llSearchResultMono) { + return llSearchResultMono.map(llSearchResultToReceive -> { + var llSearchResult = llSearchResultToReceive.receive(); + return new SearchResultKeys<>(llSearchResult.results() + .map(signal -> new SearchResultKey<>(Mono + .fromCallable(signal::key) + .map(indicizer::getKey), signal.score())), + llSearchResult.totalHitsCount(), + d -> llSearchResult.close() + ).send(); + }); } - private Mono> transformLuceneResultWithValues(LLSearchResultShard llSearchResult, + private Mono>> transformLuceneResultWithValues( + Mono> llSearchResultMono, ValueGetter valueGetter) { - return Mono.fromCallable(() -> new SearchResult<>(llSearchResult.results().map(signal -> { - var key = Mono.fromCallable(signal::key).map(indicizer::getKey); - return new SearchResultItem<>(key, key.flatMap(valueGetter::get), signal.score()); - }), llSearchResult.totalHitsCount(), llSearchResult.release())); + return llSearchResultMono.map(llSearchResultToReceive -> { + var llSearchResult = llSearchResultToReceive.receive(); + return new SearchResult<>(llSearchResult.results().map(signal -> { + var key = Mono.fromCallable(signal::key).map(indicizer::getKey); + return new SearchResultItem<>(key, key.flatMap(valueGetter::get), signal.score()); + }), llSearchResult.totalHitsCount(), d -> llSearchResult.close()).send(); + }); } - private Mono> transformLuceneResultWithTransformer(LLSearchResultShard llSearchResult, + private Mono>> transformLuceneResultWithTransformer( + Mono> llSearchResultMono, ValueTransformer valueTransformer) { - var scoresWithKeysFlux = llSearchResult - .results() - .flatMapSequential(signal -> Mono - .fromCallable(signal::key) - .map(indicizer::getKey) - .map(key -> Tuples.of(signal.score(), key)) - ); - var resultItemsFlux = valueTransformer - .transform(scoresWithKeysFlux) - .filter(tuple3 -> tuple3.getT3().isPresent()) - .map(tuple3 -> new SearchResultItem<>(Mono.just(tuple3.getT2()), - Mono.just(tuple3.getT3().orElseThrow()), - tuple3.getT1() - )); - return Mono.fromCallable(() -> new SearchResult<>(resultItemsFlux, - llSearchResult.totalHitsCount(), - llSearchResult.release() - )); + return llSearchResultMono + .map(llSearchResultToReceive -> { + var llSearchResult = llSearchResultToReceive.receive(); + var scoresWithKeysFlux = llSearchResult + .results() + .flatMapSequential(signal -> Mono + .fromCallable(signal::key) + .map(indicizer::getKey) + .map(key -> Tuples.of(signal.score(), key)) + ); + var resultItemsFlux = valueTransformer + .transform(scoresWithKeysFlux) + .filter(tuple3 -> tuple3.getT3().isPresent()) + .map(tuple3 -> new SearchResultItem<>(Mono.just(tuple3.getT2()), + Mono.just(tuple3.getT3().orElseThrow()), + tuple3.getT1() + )); + return new SearchResult<>(resultItemsFlux, + llSearchResult.totalHitsCount(), + d -> llSearchResult.close() + ).send(); + }); } @Override - public Mono> moreLikeThis(ClientQueryParams> queryParams, + public Mono>> moreLikeThis(ClientQueryParams> queryParams, T key, U mltDocumentValue) { Flux>> mltDocumentFields = indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue); return luceneIndex .moreLikeThis(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName(), mltDocumentFields) - .flatMap(this::transformLuceneResultWithTransformer); + .transform(this::transformLuceneResultWithTransformer); } @Override - public Mono> moreLikeThisWithValues(ClientQueryParams> queryParams, + public Mono>> moreLikeThisWithValues(ClientQueryParams> queryParams, T key, U mltDocumentValue, ValueGetter valueGetter) { @@ -147,13 +163,12 @@ public class LuceneIndexImpl implements LuceneIndex { indicizer.getKeyFieldName(), mltDocumentFields ) - .flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult, - valueGetter - )); + .transform(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult, + valueGetter)); } @Override - public Mono> moreLikeThisWithTransformer(ClientQueryParams> queryParams, + public Mono>> moreLikeThisWithTransformer(ClientQueryParams> queryParams, T key, U mltDocumentValue, ValueTransformer valueTransformer) { @@ -165,40 +180,51 @@ public class LuceneIndexImpl implements LuceneIndex { indicizer.getKeyFieldName(), mltDocumentFields ) - .flatMap(llSearchResult -> this.transformLuceneResultWithTransformer(llSearchResult, valueTransformer)); + .transform(llSearchResult -> this.transformLuceneResultWithTransformer(llSearchResult, + valueTransformer)); } @Override - public Mono> search(ClientQueryParams> queryParams) { + public Mono>> search(ClientQueryParams> queryParams) { return luceneIndex .search(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName() ) - .flatMap(this::transformLuceneResultWithTransformer); + .transform(this::transformLuceneResultWithTransformer); } @Override - public Mono> searchWithValues(ClientQueryParams> queryParams, + public Mono>> searchWithValues( + ClientQueryParams> queryParams, ValueGetter valueGetter) { return luceneIndex - .search(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName()) - .flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult, valueGetter)); + .search(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), + indicizer.getKeyFieldName()) + .transform(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult, + valueGetter)); } @Override - public Mono> searchWithTransformer(ClientQueryParams> queryParams, + public Mono>> searchWithTransformer( + ClientQueryParams> queryParams, ValueTransformer valueTransformer) { return luceneIndex - .search(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName()) - .flatMap(llSearchResult -> this.transformLuceneResultWithTransformer(llSearchResult, valueTransformer)); + .search(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), + indicizer.getKeyFieldName()) + .transform(llSearchResult -> this.transformLuceneResultWithTransformer(llSearchResult, + valueTransformer)); } @Override public Mono count(@Nullable CompositeSnapshot snapshot, Query query) { return this .search(ClientQueryParams.>builder().snapshot(snapshot).query(query).limit(0).build()) - .flatMap(tSearchResultKeys -> tSearchResultKeys.release().thenReturn(tSearchResultKeys.totalHitsCount())); + .map(searchResultKeysSend -> { + try (var searchResultKeys = searchResultKeysSend.receive()) { + return searchResultKeys.totalHitsCount(); + } + }); } @Override diff --git a/src/main/java/it/cavallium/dbengine/client/SearchResult.java b/src/main/java/it/cavallium/dbengine/client/SearchResult.java index 342e0a7..40e6ba7 100644 --- a/src/main/java/it/cavallium/dbengine/client/SearchResult.java +++ b/src/main/java/it/cavallium/dbengine/client/SearchResult.java @@ -1,5 +1,8 @@ package it.cavallium.dbengine.client; +import io.net5.buffer.api.Drop; +import io.net5.buffer.api.Owned; +import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLSearchResultShard; import java.util.Objects; @@ -8,35 +11,20 @@ import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public final class SearchResult { +public final class SearchResult extends ResourceSupport, SearchResult> { - private static final Logger logger = LoggerFactory.getLogger(SearchResult.class); + private Flux> results; + private TotalHitsCount totalHitsCount; - private volatile boolean releaseCalled; - - private final Flux> results; - private final TotalHitsCount totalHitsCount; - private final Mono release; - - public SearchResult(Flux> results, TotalHitsCount totalHitsCount, Mono release) { + public SearchResult(Flux> results, TotalHitsCount totalHitsCount, + Drop> drop) { + super(new SearchResult.CloseOnDrop<>(drop)); this.results = results; this.totalHitsCount = totalHitsCount; - this.release = Mono.fromRunnable(() -> { - if (releaseCalled) { - logger.warn(this.getClass().getName() + "::release has been called twice!"); - } - releaseCalled = true; - }).then(release); } public static SearchResult empty() { - var sr = new SearchResult(Flux.empty(), TotalHitsCount.of(0, true), Mono.empty()); - sr.releaseCalled = true; - return sr; - } - - public Flux> resultsThenRelease() { - return Flux.usingWhen(Mono.just(true), _unused -> results, _unused -> release); + return new SearchResult(Flux.empty(), TotalHitsCount.of(0, true), d -> {}); } public Flux> results() { @@ -47,39 +35,40 @@ public final class SearchResult { return totalHitsCount; } - public Mono release() { - return release; - } - - @Override - public boolean equals(Object obj) { - if (obj == this) - return true; - if (obj == null || obj.getClass() != this.getClass()) - return false; - var that = (SearchResult) obj; - return Objects.equals(this.results, that.results) && Objects.equals(this.totalHitsCount, that.totalHitsCount) - && Objects.equals(this.release, that.release); - } - - @Override - public int hashCode() { - return Objects.hash(results, totalHitsCount, release); - } - @Override public String toString() { - return "SearchResult[" + "results=" + results + ", " + "totalHitsCount=" + totalHitsCount + ", " + "release=" - + release + ']'; + return "SearchResult[" + "results=" + results + ", " + "totalHitsCount=" + totalHitsCount + ']'; } - @SuppressWarnings("deprecation") @Override - protected void finalize() throws Throwable { - if (!releaseCalled) { - logger.warn(this.getClass().getName() + "::release has not been called before class finalization!"); - } - super.finalize(); + protected RuntimeException createResourceClosedException() { + return new IllegalStateException("Closed"); } + @Override + protected Owned> prepareSend() { + var results = this.results; + var totalHitsCount = this.totalHitsCount; + makeInaccessible(); + return drop -> new SearchResult<>(results, totalHitsCount, drop); + } + + private void makeInaccessible() { + this.results = null; + this.totalHitsCount = null; + } + + private static class CloseOnDrop implements Drop> { + + private final Drop> delegate; + + public CloseOnDrop(Drop> drop) { + this.delegate = drop; + } + + @Override + public void drop(SearchResult obj) { + delegate.drop(obj); + } + } } diff --git a/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java b/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java index adabd99..d784a14 100644 --- a/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java +++ b/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java @@ -1,5 +1,8 @@ package it.cavallium.dbengine.client; +import io.net5.buffer.api.Drop; +import io.net5.buffer.api.Owned; +import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLSearchResultShard; import it.cavallium.dbengine.database.collections.ValueGetter; @@ -11,42 +14,29 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @SuppressWarnings("unused") -public final class SearchResultKeys { +public final class SearchResultKeys extends ResourceSupport, SearchResultKeys> { private static final Logger logger = LoggerFactory.getLogger(SearchResultKeys.class); - private volatile boolean releaseCalled; + private Flux> results; + private TotalHitsCount totalHitsCount; - private final Flux> results; - private final TotalHitsCount totalHitsCount; - private final Mono release; - - public SearchResultKeys(Flux> results, TotalHitsCount totalHitsCount, Mono release) { + public SearchResultKeys(Flux> results, TotalHitsCount totalHitsCount, + Drop> drop) { + super(new SearchResultKeys.CloseOnDrop<>(drop)); this.results = results; this.totalHitsCount = totalHitsCount; - this.release = Mono.fromRunnable(() -> { - if (releaseCalled) { - logger.warn(this.getClass().getName() + "::release has been called twice!"); - } - releaseCalled = true; - }).then(release); } public static SearchResultKeys empty() { - var sr = new SearchResultKeys(Flux.empty(), TotalHitsCount.of(0, true), Mono.empty()); - sr.releaseCalled = true; - return sr; + return new SearchResultKeys(Flux.empty(), TotalHitsCount.of(0, true), d -> {}); } public SearchResult withValues(ValueGetter valuesGetter) { return new SearchResult<>(results.map(item -> new SearchResultItem<>(item.key(), item.key().flatMap(valuesGetter::get), item.score() - )), totalHitsCount, release); - } - - public Flux> resultsThenRelease() { - return Flux.usingWhen(Mono.just(true), _unused -> results, _unused -> release); + )), totalHitsCount, d -> this.close()); } public Flux> results() { @@ -57,39 +47,41 @@ public final class SearchResultKeys { return totalHitsCount; } - public Mono release() { - return release; - } - - @Override - public boolean equals(Object obj) { - if (obj == this) - return true; - if (obj == null || obj.getClass() != this.getClass()) - return false; - var that = (SearchResultKeys) obj; - return Objects.equals(this.results, that.results) && Objects.equals(this.totalHitsCount, that.totalHitsCount) - && Objects.equals(this.release, that.release); - } - - @Override - public int hashCode() { - return Objects.hash(results, totalHitsCount, release); - } - @Override public String toString() { - return "SearchResultKeys[" + "results=" + results + ", " + "totalHitsCount=" + totalHitsCount + ", " + "release=" - + release + ']'; + return "SearchResultKeys[" + "results=" + results + ", " + "totalHitsCount=" + totalHitsCount + ']'; } - @SuppressWarnings("deprecation") @Override - protected void finalize() throws Throwable { - if (!releaseCalled) { - logger.warn(this.getClass().getName() + "::release has not been called before class finalization!"); + protected RuntimeException createResourceClosedException() { + return new IllegalStateException("Closed"); + } + + @Override + protected Owned> prepareSend() { + var results = this.results; + var totalHitsCount = this.totalHitsCount; + makeInaccessible(); + return drop -> new SearchResultKeys<>(results, totalHitsCount, drop); + } + + private void makeInaccessible() { + this.results = null; + this.totalHitsCount = null; + } + + private static class CloseOnDrop implements Drop> { + + private final Drop> delegate; + + public CloseOnDrop(Drop> drop) { + this.delegate = drop; + } + + @Override + public void drop(SearchResultKeys obj) { + delegate.drop(obj); } - super.finalize(); } } diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index d32b0e2..200f4e4 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -10,19 +10,25 @@ import io.net5.buffer.api.Send; import io.net5.util.IllegalReferenceCountException; import io.net5.util.internal.PlatformDependent; import it.cavallium.dbengine.database.collections.DatabaseStage; +import it.cavallium.dbengine.database.disk.LLIndexContext; +import it.cavallium.dbengine.database.disk.LLIndexSearcher; +import it.cavallium.dbengine.database.disk.LLLocalLuceneIndex; import it.cavallium.dbengine.database.disk.MemorySegmentUtils; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.lucene.RandomSortField; +import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; @@ -37,10 +43,19 @@ import org.apache.lucene.document.StringField; import org.apache.lucene.document.TextField; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.Term; +import org.apache.lucene.queries.mlt.MoreLikeThis; +import org.apache.lucene.search.BooleanClause.Occur; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.ConstantScoreQuery; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.MatchNoDocsQuery; +import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.SearcherManager; import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; import org.apache.lucene.search.SortedNumericSortField; +import org.apache.lucene.search.similarities.TFIDFSimilarity; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.rocksdb.RocksDB; @@ -381,6 +396,74 @@ public class LLUtils { .doOnDiscard(Send.class, Send::close); } + public static Mono getMoreLikeThisQuery( + LLIndexSearcher indexSearcher, + @Nullable LLSnapshot snapshot, + LocalQueryParams localQueryParams, + Flux>> mltDocumentFieldsFlux) { + Query luceneAdditionalQuery; + try { + luceneAdditionalQuery = localQueryParams.query(); + } catch (Exception e) { + return Mono.error(e); + } + return mltDocumentFieldsFlux + .collectMap(Tuple2::getT1, Tuple2::getT2, HashMap::new) + .flatMap(mltDocumentFields -> { + mltDocumentFields.entrySet().removeIf(entry -> entry.getValue().isEmpty()); + if (mltDocumentFields.isEmpty()) { + return Mono.just(new LocalQueryParams(new MatchNoDocsQuery(), + localQueryParams.offset(), + localQueryParams.limit(), + localQueryParams.minCompetitiveScore(), + localQueryParams.sort(), + localQueryParams.scoreMode() + )); + } + new IndexSearcher + return indexSearcher.getIndexSearcher().search(snapshot, indexSearcher -> Mono.fromCallable(() -> { + var mlt = new MoreLikeThis(indexSearcher.getIndexReader()); + mlt.setAnalyzer(llLocalLuceneIndex.indexWriter.getAnalyzer()); + mlt.setFieldNames(mltDocumentFields.keySet().toArray(String[]::new)); + mlt.setMinTermFreq(1); + mlt.setMinDocFreq(3); + mlt.setMaxDocFreqPct(20); + mlt.setBoost(localQueryParams.scoreMode().needsScores()); + mlt.setStopWords(EnglishItalianStopFilter.getStopWordsString()); + var similarity = llLocalLuceneIndex.getSimilarity(); + if (similarity instanceof TFIDFSimilarity) { + mlt.setSimilarity((TFIDFSimilarity) similarity); + } else { + LLLocalLuceneIndex.logger.trace(MARKER_ROCKSDB, "Using an unsupported similarity algorithm for MoreLikeThis:" + + " {}. You must use a similarity instance based on TFIDFSimilarity!", similarity); + } + + // Get the reference docId and apply it to MoreLikeThis, to generate the query + @SuppressWarnings({"unchecked", "rawtypes"}) + var mltQuery = mlt.like((Map) mltDocumentFields); + Query luceneQuery; + if (!(luceneAdditionalQuery instanceof MatchAllDocsQuery)) { + luceneQuery = new BooleanQuery.Builder() + .add(mltQuery, Occur.MUST) + .add(new ConstantScoreQuery(luceneAdditionalQuery), Occur.MUST) + .build(); + } else { + luceneQuery = mltQuery; + } + + return luceneQuery; + }) + .subscribeOn(Schedulers.boundedElastic()) + .map(luceneQuery -> new LocalQueryParams(luceneQuery, + localQueryParams.offset(), + localQueryParams.limit(), + localQueryParams.minCompetitiveScore(), + localQueryParams.sort(), + localQueryParams.scoreMode() + ))); + }); + } + public static record DirectBuffer(@NotNull Send buffer, @NotNull ByteBuffer byteBuffer) {} @NotNull diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexContext.java b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexContext.java new file mode 100644 index 0000000..032c5d3 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexContext.java @@ -0,0 +1,81 @@ +package it.cavallium.dbengine.database.disk; + +import io.net5.buffer.api.Drop; +import io.net5.buffer.api.Owned; +import io.net5.buffer.api.Send; +import io.net5.buffer.api.internal.ResourceSupport; +import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.search.IndexSearcher; + +public class LLIndexContext extends ResourceSupport { + + private LLIndexSearcher indexSearcher; + private LLSearchTransformer indexQueryTransformer; + + protected LLIndexContext(Send indexSearcher, + LLSearchTransformer indexQueryTransformer, + Drop drop) { + super(new CloseOnDrop(drop)); + this.indexSearcher = indexSearcher.receive(); + this.indexQueryTransformer = indexQueryTransformer; + } + + @Override + protected RuntimeException createResourceClosedException() { + return new IllegalStateException("Closed"); + } + + @Override + protected Owned prepareSend() { + var indexSearcher = this.indexSearcher.send(); + var indexQueryTransformer = this.indexQueryTransformer; + makeInaccessible(); + return drop -> new LLIndexContext(indexSearcher, indexQueryTransformer, drop); + } + + private void makeInaccessible() { + this.indexSearcher = null; + this.indexQueryTransformer = null; + } + + public IndexSearcher getIndexSearcher() { + if (!isOwned()) { + throw new UnsupportedOperationException("Closed"); + } + return indexSearcher.getIndexSearcher(); + } + + public IndexReader getIndexReader() { + if (!isOwned()) { + throw new UnsupportedOperationException("Closed"); + } + return indexSearcher.getIndexReader(); + } + + public LLSearchTransformer getIndexQueryTransformer() { + if (!isOwned()) { + throw new UnsupportedOperationException("Closed"); + } + return indexQueryTransformer; + } + + private static class CloseOnDrop implements Drop { + + private final Drop delegate; + + public CloseOnDrop(Drop drop) { + this.delegate = drop; + } + + @Override + public void drop(LLIndexContext obj) { + try { + if (obj.indexSearcher != null) obj.indexSearcher.close(); + delegate.drop(obj); + } finally { + obj.makeInaccessible(); + } + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexContexts.java b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexContexts.java new file mode 100644 index 0000000..97a795b --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexContexts.java @@ -0,0 +1,199 @@ +package it.cavallium.dbengine.database.disk; + +import io.net5.buffer.api.Drop; +import io.net5.buffer.api.Owned; +import io.net5.buffer.api.Resource; +import io.net5.buffer.api.Send; +import io.net5.buffer.api.internal.ResourceSupport; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import org.apache.lucene.index.Fields; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexReaderContext; +import org.apache.lucene.index.MultiReader; +import org.apache.lucene.index.StoredFieldVisitor; +import org.apache.lucene.index.Term; + +public interface LLIndexContexts extends Resource { + + static LLIndexContexts of(List> indexSearchers) { + return new ShardedIndexSearchers(indexSearchers, d -> {}); + } + + static UnshardedIndexSearchers unsharded(Send indexSearcher) { + return new UnshardedIndexSearchers(indexSearcher, d -> {}); + } + + Iterable shards(); + + LLIndexContext shard(int shardIndex); + + IndexReader allShards(); + + class UnshardedIndexSearchers extends ResourceSupport + implements LLIndexContexts { + + private LLIndexContext indexSearcher; + + public UnshardedIndexSearchers(Send indexSearcher, Drop drop) { + super(new CloseOnDrop(drop)); + this.indexSearcher = indexSearcher.receive(); + } + + @Override + public Iterable shards() { + return Collections.singleton(indexSearcher); + } + + @Override + public LLIndexContext shard(int shardIndex) { + if (!isOwned()) { + throw attachTrace(new IllegalStateException("UnshardedIndexSearchers must be owned to be used")); + } + if (shardIndex != -1) { + throw new IndexOutOfBoundsException("Shard index " + shardIndex + " is invalid, this is a unsharded index"); + } + return indexSearcher; + } + + @Override + public IndexReader allShards() { + return indexSearcher.getIndexReader(); + } + + public LLIndexContext shard() { + return this.shard(0); + } + + @Override + protected RuntimeException createResourceClosedException() { + return new IllegalStateException("Closed"); + } + + @Override + protected Owned prepareSend() { + Send indexSearcher = this.indexSearcher.send(); + this.makeInaccessible(); + return drop -> new UnshardedIndexSearchers(indexSearcher, drop); + } + + private void makeInaccessible() { + this.indexSearcher = null; + } + + private static class CloseOnDrop implements Drop { + + private final Drop delegate; + + public CloseOnDrop(Drop drop) { + this.delegate = drop; + } + + @Override + public void drop(UnshardedIndexSearchers obj) { + try { + if (obj.indexSearcher != null) obj.indexSearcher.close(); + delegate.drop(obj); + } finally { + obj.makeInaccessible(); + } + } + } + } + + class ShardedIndexSearchers extends ResourceSupport + implements LLIndexContexts { + + private List indexSearchers; + + public ShardedIndexSearchers(List> indexSearchers, Drop drop) { + super(new CloseOnDrop(drop)); + this.indexSearchers = new ArrayList<>(indexSearchers.size()); + for (Send indexSearcher : indexSearchers) { + this.indexSearchers.add(indexSearcher.receive()); + } + } + + @Override + public Iterable shards() { + return Collections.unmodifiableList(indexSearchers); + } + + @Override + public LLIndexContext shard(int shardIndex) { + if (!isOwned()) { + throw attachTrace(new IllegalStateException("ShardedIndexSearchers must be owned to be used")); + } + if (shardIndex < 0) { + throw new IndexOutOfBoundsException("Shard index " + shardIndex + " is invalid"); + } + return indexSearchers.get(shardIndex); + } + + @Override + public IndexReader allShards() { + var irs = new IndexReader[indexSearchers.size()]; + for (int i = 0, s = indexSearchers.size(); i < s; i++) { + irs[i] = indexSearchers.get(i).getIndexReader(); + } + Object2IntOpenHashMap indexes = new Object2IntOpenHashMap<>(); + for (int i = 0; i < irs.length; i++) { + indexes.put(irs[i], i); + } + try { + return new MultiReader(irs, Comparator.comparingInt(indexes::getInt), true); + } catch (IOException ex) { + // This shouldn't happen + throw new UncheckedIOException(ex); + } + } + + @Override + protected RuntimeException createResourceClosedException() { + return new IllegalStateException("Closed"); + } + + @Override + protected Owned prepareSend() { + List> indexSearchers = new ArrayList<>(this.indexSearchers.size()); + for (LLIndexContext indexSearcher : this.indexSearchers) { + indexSearchers.add(indexSearcher.send()); + } + this.makeInaccessible(); + return drop -> new ShardedIndexSearchers(indexSearchers, drop); + } + + private void makeInaccessible() { + this.indexSearchers = null; + } + + private static class CloseOnDrop implements Drop { + + private final Drop delegate; + + public CloseOnDrop(Drop drop) { + this.delegate = drop; + } + + @Override + public void drop(ShardedIndexSearchers obj) { + try { + if (obj.indexSearchers != null) { + for (LLIndexContext indexSearcher : obj.indexSearchers) { + indexSearcher.close(); + } + } + delegate.drop(obj); + } finally { + obj.makeInaccessible(); + } + } + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java index 7bf4f68..e534357 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java @@ -85,10 +85,12 @@ public class LLIndexSearcher extends ResourceSupport>> mltDocumentFieldsFlux) { - return getMoreLikeThisQuery(snapshot, LuceneUtils.toLocalQueryParams(queryParams), mltDocumentFieldsFlux) + return LLUtils + .getMoreLikeThisQuery(this, snapshot, LuceneUtils.toLocalQueryParams(queryParams), mltDocumentFieldsFlux) .flatMap(modifiedLocalQuery -> searcherManager .retrieveSearcher(snapshot) - .flatMap(indexSearcher -> localSearcher.collect(indexSearcher, modifiedLocalQuery, keyFieldName)) + .transform(indexSearcher -> localSearcher.collect(indexSearcher, modifiedLocalQuery, keyFieldName)) ) .map(resultToReceive -> { var result = resultToReceive.receive(); @@ -343,11 +335,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .doOnDiscard(Send.class, Send::close); } - public Mono distributedMoreLikeThis(@Nullable LLSnapshot snapshot, + public Mono getMoreLikeThisTransformer(@Nullable LLSnapshot snapshot, QueryParams queryParams, Flux>> mltDocumentFieldsFlux, LuceneMultiSearcher shardSearcher) { - return getMoreLikeThisQuery(snapshot, LuceneUtils.toLocalQueryParams(queryParams), mltDocumentFieldsFlux) + return LLUtils + .getMoreLikeThisQuery(this, snapshot, LuceneUtils.toLocalQueryParams(queryParams), mltDocumentFieldsFlux) .flatMap(modifiedLocalQuery -> searcherManager .retrieveSearcher(snapshot) .flatMap(indexSearcher -> shardSearcher.searchOn(indexSearcher, modifiedLocalQuery)) @@ -355,78 +348,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .doOnDiscard(Send.class, Send::close); } - public Mono getMoreLikeThisQuery(@Nullable LLSnapshot snapshot, - LocalQueryParams localQueryParams, - Flux>> mltDocumentFieldsFlux) { - Query luceneAdditionalQuery; - try { - luceneAdditionalQuery = localQueryParams.query(); - } catch (Exception e) { - return Mono.error(e); - } - return mltDocumentFieldsFlux - .collectMap(Tuple2::getT1, Tuple2::getT2, HashMap::new) - .flatMap(mltDocumentFields -> { - mltDocumentFields.entrySet().removeIf(entry -> entry.getValue().isEmpty()); - if (mltDocumentFields.isEmpty()) { - return Mono.just(new LocalQueryParams(new MatchNoDocsQuery(), - localQueryParams.offset(), - localQueryParams.limit(), - localQueryParams.minCompetitiveScore(), - localQueryParams.sort(), - localQueryParams.scoreMode() - )); - } - return this.searcherManager.search(snapshot, indexSearcher -> Mono.fromCallable(() -> { - var mlt = new MoreLikeThis(indexSearcher.getIndexReader()); - mlt.setAnalyzer(indexWriter.getAnalyzer()); - mlt.setFieldNames(mltDocumentFields.keySet().toArray(String[]::new)); - mlt.setMinTermFreq(1); - mlt.setMinDocFreq(3); - mlt.setMaxDocFreqPct(20); - mlt.setBoost(localQueryParams.scoreMode().needsScores()); - mlt.setStopWords(EnglishItalianStopFilter.getStopWordsString()); - var similarity = getSimilarity(); - if (similarity instanceof TFIDFSimilarity) { - mlt.setSimilarity((TFIDFSimilarity) similarity); - } else { - logger.trace(MARKER_ROCKSDB, "Using an unsupported similarity algorithm for MoreLikeThis:" - + " {}. You must use a similarity instance based on TFIDFSimilarity!", similarity); - } - - // Get the reference docId and apply it to MoreLikeThis, to generate the query - @SuppressWarnings({"unchecked", "rawtypes"}) - var mltQuery = mlt.like((Map) mltDocumentFields); - Query luceneQuery; - if (!(luceneAdditionalQuery instanceof MatchAllDocsQuery)) { - luceneQuery = new BooleanQuery.Builder() - .add(mltQuery, Occur.MUST) - .add(new ConstantScoreQuery(luceneAdditionalQuery), Occur.MUST) - .build(); - } else { - luceneQuery = mltQuery; - } - - return luceneQuery; - }) - .subscribeOn(Schedulers.boundedElastic()) - .map(luceneQuery -> new LocalQueryParams(luceneQuery, - localQueryParams.offset(), - localQueryParams.limit(), - localQueryParams.minCompetitiveScore(), - localQueryParams.sort(), - localQueryParams.scoreMode() - ))); - }); - } - @Override public Mono> search(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName) { LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams); return searcherManager .retrieveSearcher(snapshot) - .flatMap(indexSearcher -> localSearcher.collect(indexSearcher, localQueryParams, keyFieldName)) + .transform(indexSearcher -> localSearcher.collect(indexSearcher, localQueryParams, + LLSearchTransformer.NO_TRANSFORMATION, keyFieldName)) .map(resultToReceive -> { var result = resultToReceive.receive(); return new LLSearchResultShard(result.results(), result.totalHitsCount(), d -> result.close()).send(); @@ -434,13 +363,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .doOnDiscard(Send.class, Send::close); } - public Mono distributedSearch(@Nullable LLSnapshot snapshot, - QueryParams queryParams, - LuceneMultiSearcher shardSearcher) { - LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams); + public Mono> retrieveContext(@Nullable LLSnapshot snapshot, + @Nullable LLSearchTransformer indexQueryTransformer) { return searcherManager .retrieveSearcher(snapshot) - .flatMap(indexSearcher -> shardSearcher.searchOn(indexSearcher, localQueryParams)) + .map(indexSearcherToReceive -> new LLIndexContext(indexSearcherToReceive, + Objects.requireNonNullElse(indexQueryTransformer, LLSearchTransformer.NO_TRANSFORMATION), + d -> {}).send()) .doOnDiscard(Send.class, Send::close); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index e1f22de..da10690 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.database.disk; +import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; import it.cavallium.dbengine.client.LuceneOptions; @@ -11,6 +12,7 @@ import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneMultiSearcher; +import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer; import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; import it.cavallium.dbengine.lucene.searcher.LuceneMultiSearcher; import java.io.IOException; @@ -26,6 +28,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -86,6 +89,20 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { return luceneIndices[0].getLuceneIndexName(); } + private Flux> getIndexContexts(LLSnapshot snapshot, + Function indexQueryTransformers) { + return Flux + .fromArray(luceneIndices) + .index() + // Resolve the snapshot of each shard + .flatMap(tuple -> Mono + .fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1())) + .flatMap(luceneSnapshot -> tuple.getT2().retrieveContext( + luceneSnapshot.orElse(null), indexQueryTransformers.apply(tuple.getT2())) + ) + ); + } + @Override public Mono addDocument(LLTerm id, LLDocument doc) { return getLuceneIndex(id).addDocument(id, doc); @@ -176,12 +193,23 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { } @Override - public Mono moreLikeThis(@Nullable LLSnapshot snapshot, + public Mono> moreLikeThis(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName, Flux>> mltDocumentFields) { LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams); - record LuceneIndexWithSnapshot(LLLocalLuceneIndex luceneIndex, Optional snapshot) {} + Flux> serchers = this + .getIndexContexts(snapshot, luceneIndex -> LLSearchTransformer.NO_TRANSFORMATION); + + // Collect all the shards results into a single global result + return multiSearcher + .collect(serchers, localQueryParams, keyFieldName) + // Transform the result type + .map(resultToReceive -> { + var result = resultToReceive.receive(); + return new LLSearchResultShard(result.results(), result.totalHitsCount(), + d -> result.close()).send(); + }); return multiSearcher // Create shard searcher @@ -205,31 +233,21 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { } @Override - public Mono search(@Nullable LLSnapshot snapshot, + public Mono> search(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName) { LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams); - record LuceneIndexWithSnapshot(LLLocalLuceneIndex luceneIndex, Optional snapshot) {} + Flux> serchers = getIndexContexts(snapshot); + // Collect all the shards results into a single global result return multiSearcher - // Create shard searcher - .createShardSearcher(localQueryParams) - .flatMap(shardSearcher -> Flux - // Iterate the indexed shards - .fromArray(luceneIndices).index() - // Resolve the snapshot of each shard - .flatMap(tuple -> Mono - .fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1())) - .map(luceneSnapshot -> new LuceneIndexWithSnapshot(tuple.getT2(), luceneSnapshot)) - ) - // Execute the query and collect it using the shard searcher - .flatMap(luceneIndexWithSnapshot -> luceneIndexWithSnapshot.luceneIndex() - .distributedSearch(luceneIndexWithSnapshot.snapshot.orElse(null), queryParams, shardSearcher)) - // Collect all the shards results into a single global result - .then(shardSearcher.collect(localQueryParams, keyFieldName, luceneSearcherScheduler)) - ) - // Fix the result type - .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result.release())); + .collect(serchers, localQueryParams, keyFieldName) + // Transform the result type + .map(resultToReceive -> { + var result = resultToReceive.receive(); + return new LLSearchResultShard(result.results(), result.totalHitsCount(), + d -> result.close()).send(); + }); } @Override @@ -289,4 +307,32 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { public boolean isLowMemoryMode() { return luceneIndices[0].isLowMemoryMode(); } + + private class MoreLikeThisTransformer implements LLSearchTransformer { + + private final LLLocalLuceneIndex luceneIndex; + private final LLSnapshot snapshot; + private final String keyFieldName; + private final Flux>> mltDocumentFields; + + public MoreLikeThisTransformer(LLLocalLuceneIndex luceneIndex, + @Nullable LLSnapshot snapshot, + String keyFieldName, + Flux>> mltDocumentFields) { + this.luceneIndex = luceneIndex; + this.snapshot = snapshot; + this.keyFieldName = keyFieldName; + this.mltDocumentFields = mltDocumentFields; + } + + @Override + public Mono transform(Mono queryParamsMono) { + return queryParamsMono + .flatMap(queryParams -> { + luceneIndex.getMoreLikeThisTransformer(snapshot, queryParams, mltDocumentFields, ); + }); + LLLocalMultiLuceneIndex.this. + return null; + } + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index 906ceac..773a166 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -1,41 +1,32 @@ package it.cavallium.dbengine.lucene; -import io.net5.buffer.api.Resource; -import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; -import it.cavallium.dbengine.client.query.BasicType; import it.cavallium.dbengine.client.query.QueryParser; import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLKeyScore; -import it.cavallium.dbengine.database.LLScoreMode; import it.cavallium.dbengine.database.collections.DatabaseMapDictionary; import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep; import it.cavallium.dbengine.database.collections.ValueGetter; +import it.cavallium.dbengine.database.disk.LLIndexContexts; import it.cavallium.dbengine.lucene.analyzer.NCharGramAnalyzer; import it.cavallium.dbengine.lucene.analyzer.NCharGramEdgeAnalyzer; import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer; import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity; import it.cavallium.dbengine.lucene.analyzer.WordAnalyzer; -import it.cavallium.dbengine.lucene.searcher.IndexSearchers; import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; -import it.cavallium.dbengine.lucene.searcher.LuceneMultiSearcher; import it.cavallium.dbengine.lucene.similarity.NGramSimilarity; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; -import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; -import java.util.Objects; -import java.util.Set; -import java.util.function.Function; import java.util.stream.Collectors; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.LowerCaseFilter; @@ -65,11 +56,9 @@ import org.novasearch.lucene.search.similarities.BM25Similarity.BM25Model; import org.novasearch.lucene.search.similarities.LdpSimilarity; import org.novasearch.lucene.search.similarities.LtcSimilarity; import org.novasearch.lucene.search.similarities.RobertsonSimilarity; -import org.reactivestreams.Publisher; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.util.concurrent.Queues; @@ -367,7 +356,7 @@ public class LuceneUtils { } public static Flux convertHits(Flux hitsFlux, - IndexSearchers indexSearchers, + LLIndexContexts indexSearchers, String keyFieldName, boolean preserveOrder) { if (preserveOrder) { @@ -392,7 +381,7 @@ public class LuceneUtils { @Nullable private static LLKeyScore mapHitBlocking(ScoreDoc hit, - IndexSearchers indexSearchers, + LLIndexContexts indexSearchers, String keyFieldName) { if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called mapHitBlocking in a nonblocking thread"); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java index 41e63c4..50a92d5 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java @@ -1,34 +1,24 @@ package it.cavallium.dbengine.lucene.searcher; import io.net5.buffer.api.Send; -import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.LLIndexContext; import it.cavallium.dbengine.database.disk.LLIndexSearcher; -import org.apache.lucene.search.IndexSearcher; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher { private static final LuceneLocalSearcher localSearcher = new SimpleLuceneLocalSearcher(); - private static final LuceneLocalSearcher unscoredPagedLuceneLocalSearcher = new LocalLuceneWrapper(new UnscoredUnsortedContinuousLuceneMultiSearcher(), d -> {}); - private static final LuceneLocalSearcher countSearcher = new CountLuceneLocalSearcher(); @Override - public Mono> collect(Mono> indexSearcher, + public Mono> collect(Mono> indexSearcher, LocalQueryParams queryParams, String keyFieldName) { - Mono> collectionMono; if (queryParams.limit() == 0) { - collectionMono = countSearcher.collect(indexSearcher, queryParams, keyFieldName); - } else if (!queryParams.isScored() && queryParams.offset() == 0 && queryParams.limit() >= 2147483630 - && !queryParams.isSorted()) { - collectionMono = unscoredPagedLuceneLocalSearcher.collect(indexSearcher, queryParams, keyFieldName); + return countSearcher.collect(indexSearcher, queryParams, keyFieldName); } else { - collectionMono = localSearcher.collect(indexSearcher, queryParams, keyFieldName); + return localSearcher.collect(indexSearcher, queryParams, keyFieldName); } - return Mono.fromRunnable(LLUtils::ensureBlocking).then(collectionMono); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java index eca9414..f26170f 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java @@ -1,31 +1,32 @@ package it.cavallium.dbengine.lucene.searcher; import io.net5.buffer.api.Send; -import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.LLIndexContext; +import it.cavallium.dbengine.database.disk.LLIndexSearcher; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher { - private static final LuceneMultiSearcher scoredLuceneMultiSearcher = new ScoredLuceneMultiSearcher(); + private static final LuceneMultiSearcher countLuceneMultiSearcher + = new SimpleUnsortedUnscoredLuceneMultiSearcher(new CountLuceneLocalSearcher()); - private static final LuceneMultiSearcher unscoredPagedLuceneMultiSearcher = new UnscoredPagedLuceneMultiSearcher(); + private static final LuceneMultiSearcher scoredSimpleLuceneShardSearcher + = new ScoredSimpleLuceneShardSearcher(); - private static final LuceneMultiSearcher unscoredIterableLuceneMultiSearcher = new UnscoredUnsortedContinuousLuceneMultiSearcher(); - - private static final LuceneMultiSearcher countLuceneMultiSearcher = new CountLuceneMultiSearcher(); + private static final LuceneMultiSearcher unscoredPagedLuceneMultiSearcher + = new SimpleUnsortedUnscoredLuceneMultiSearcher(new SimpleLuceneLocalSearcher()); @Override - public Mono> createShardSearcher(LocalQueryParams queryParams) { - Mono> shardSearcherCreationMono; - if (queryParams.limit() <= 0) { - shardSearcherCreationMono = countLuceneMultiSearcher.createShardSearcher(queryParams); - } else if (queryParams.isScored()) { - shardSearcherCreationMono = scoredLuceneMultiSearcher.createShardSearcher(queryParams); - } else if (queryParams.offset() == 0 && queryParams.limit() >= 2147483630 && !queryParams.isSorted()) { - shardSearcherCreationMono = unscoredIterableLuceneMultiSearcher.createShardSearcher(queryParams); + public Mono> collect(Flux> indexSearchersFlux, + LocalQueryParams queryParams, + String keyFieldName) { + if (queryParams.limit() == 0) { + return countLuceneMultiSearcher.collect(indexSearchersFlux, queryParams, keyFieldName); + } else if (queryParams.isSorted() || queryParams.isScored()) { + return scoredSimpleLuceneShardSearcher.collect(indexSearchersFlux, queryParams, keyFieldName); } else { - shardSearcherCreationMono = unscoredPagedLuceneMultiSearcher.createShardSearcher(queryParams); + return unscoredPagedLuceneMultiSearcher.collect(indexSearchersFlux, queryParams, keyFieldName); } - return Mono.fromRunnable(LLUtils::ensureBlocking).then(shardSearcherCreationMono); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java index 6343f0f..2d4ba85 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java @@ -1,10 +1,9 @@ package it.cavallium.dbengine.lucene.searcher; -import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; -import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.LLIndexContext; import it.cavallium.dbengine.database.disk.LLIndexSearcher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -13,7 +12,7 @@ import reactor.core.scheduler.Schedulers; public class CountLuceneLocalSearcher implements LuceneLocalSearcher { @Override - public Mono> collect(Mono> indexSearcherMono, + public Mono> collect(Mono> indexSearcherMono, LocalQueryParams queryParams, String keyFieldName) { return Mono diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneMultiSearcher.java deleted file mode 100644 index 97603ca..0000000 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneMultiSearcher.java +++ /dev/null @@ -1,88 +0,0 @@ -package it.cavallium.dbengine.lucene.searcher; - -import io.net5.buffer.api.Drop; -import io.net5.buffer.api.Owned; -import io.net5.buffer.api.Send; -import io.net5.buffer.api.internal.ResourceSupport; -import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; -import it.cavallium.dbengine.database.LLUtils; -import it.cavallium.dbengine.database.disk.LLIndexSearcher; -import java.util.concurrent.atomic.AtomicLong; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -public class CountLuceneMultiSearcher implements LuceneMultiSearcher { - - @Override - public Mono> createShardSearcher(LocalQueryParams queryParams) { - return Mono.fromCallable(() -> new CountLuceneShardSearcher(new AtomicLong(0), d -> {}).send()); - } - - private static class CountLuceneShardSearcher extends - ResourceSupport implements LuceneMultiSearcher { - - private AtomicLong totalHitsCount; - - public CountLuceneShardSearcher(AtomicLong totalHitsCount, Drop drop) { - super(new CloseOnDrop(drop)); - this.totalHitsCount = totalHitsCount; - } - - @Override - public Mono searchOn(Send indexSearcher, LocalQueryParams queryParams) { - return Mono - .fromCallable(() -> { - try (var is = indexSearcher.receive()) { - if (!isOwned()) { - throw attachTrace(new IllegalStateException("CountLuceneMultiSearcher must be owned to be used")); - } - LLUtils.ensureBlocking(); - totalHitsCount.addAndGet(is.getIndexSearcher().count(queryParams.query())); - return null; - } - }); - } - - @Override - public Mono> collect(LocalQueryParams queryParams, String keyFieldName) { - return Mono.fromCallable(() -> { - if (!isOwned()) { - throw attachTrace(new IllegalStateException("CountLuceneMultiSearcher must be owned to be used")); - } - LLUtils.ensureBlocking(); - return new LuceneSearchResult(TotalHitsCount.of(totalHitsCount.get(), true), Flux.empty(), d -> {}) - .send(); - }); - } - - @Override - protected RuntimeException createResourceClosedException() { - return new IllegalStateException("Closed"); - } - - @Override - protected Owned prepareSend() { - var totalHitsCount = this.totalHitsCount; - makeInaccessible(); - return drop -> new CountLuceneShardSearcher(totalHitsCount, drop); - } - - private void makeInaccessible() { - this.totalHitsCount = null; - } - - private static class CloseOnDrop implements Drop { - - private final Drop delegate; - - public CloseOnDrop(Drop drop) { - this.delegate = drop; - } - - @Override - public void drop(CountLuceneShardSearcher obj) { - delegate.drop(obj); - } - } - } -} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/FirstPageResults.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/FirstPageResults.java new file mode 100644 index 0000000..365233e --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/FirstPageResults.java @@ -0,0 +1,8 @@ +package it.cavallium.dbengine.lucene.searcher; + +import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; +import it.cavallium.dbengine.database.LLKeyScore; +import reactor.core.publisher.Flux; + +record FirstPageResults(TotalHitsCount totalHitsCount, Flux firstPageHitsFlux, + CurrentPageInfo nextPageInfo) {} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/IndexSearchers.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/IndexSearchers.java deleted file mode 100644 index 05925e8..0000000 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/IndexSearchers.java +++ /dev/null @@ -1,144 +0,0 @@ -package it.cavallium.dbengine.lucene.searcher; - -import io.net5.buffer.UnpooledDirectByteBuf; -import io.net5.buffer.api.Buffer; -import io.net5.buffer.api.Drop; -import io.net5.buffer.api.Owned; -import io.net5.buffer.api.Resource; -import io.net5.buffer.api.Send; -import io.net5.buffer.api.internal.ResourceSupport; -import it.cavallium.dbengine.database.LLRange; -import it.cavallium.dbengine.database.disk.LLIndexSearcher; -import java.util.ArrayList; -import java.util.List; -import org.apache.lucene.search.IndexSearcher; - -public interface IndexSearchers extends Resource { - - static IndexSearchers of(List indexSearchers) { - return new ShardedIndexSearchers(indexSearchers, d -> {}); - } - - static UnshardedIndexSearchers unsharded(Send indexSearcher) { - return new UnshardedIndexSearchers(indexSearcher, d -> {}); - } - - LLIndexSearcher shard(int shardIndex); - - class UnshardedIndexSearchers extends ResourceSupport - implements IndexSearchers { - - private LLIndexSearcher indexSearcher; - - public UnshardedIndexSearchers(Send indexSearcher, Drop drop) { - super(new CloseOnDrop(drop)); - this.indexSearcher = indexSearcher.receive(); - } - - @Override - public LLIndexSearcher shard(int shardIndex) { - if (!isOwned()) { - throw attachTrace(new IllegalStateException("UnshardedIndexSearchers must be owned to be used")); - } - if (shardIndex != -1) { - throw new IndexOutOfBoundsException("Shard index " + shardIndex + " is invalid, this is a unsharded index"); - } - return indexSearcher; - } - - public LLIndexSearcher shard() { - return this.shard(0); - } - - @Override - protected RuntimeException createResourceClosedException() { - return new IllegalStateException("Closed"); - } - - @Override - protected Owned prepareSend() { - Send indexSearcher = this.indexSearcher.send(); - this.makeInaccessible(); - return drop -> new UnshardedIndexSearchers(indexSearcher, drop); - } - - private void makeInaccessible() { - this.indexSearcher = null; - } - - private static class CloseOnDrop implements Drop { - - private final Drop delegate; - - public CloseOnDrop(Drop drop) { - this.delegate = drop; - } - - @Override - public void drop(UnshardedIndexSearchers obj) { - try { - if (obj.indexSearcher != null) obj.indexSearcher.close(); - delegate.drop(obj); - } finally { - obj.makeInaccessible(); - } - } - } - } - - class ShardedIndexSearchers extends ResourceSupport - implements IndexSearchers { - - private List indexSearchers; - - public ShardedIndexSearchers(List indexSearchers, Drop drop) { - super(new CloseOnDrop(drop)); - this.indexSearchers = indexSearchers; - } - - @Override - public LLIndexSearcher shard(int shardIndex) { - if (!isOwned()) { - throw attachTrace(new IllegalStateException("ShardedIndexSearchers must be owned to be used")); - } - if (shardIndex < 0) { - throw new IndexOutOfBoundsException("Shard index " + shardIndex + " is invalid"); - } - return indexSearchers.get(shardIndex); - } - - @Override - protected RuntimeException createResourceClosedException() { - return new IllegalStateException("Closed"); - } - - @Override - protected Owned prepareSend() { - List indexSearchers = this.indexSearchers; - this.makeInaccessible(); - return drop -> new ShardedIndexSearchers(indexSearchers, drop); - } - - private void makeInaccessible() { - this.indexSearchers = null; - } - - private static class CloseOnDrop implements Drop { - - private final Drop delegate; - - public CloseOnDrop(Drop drop) { - this.delegate = drop; - } - - @Override - public void drop(ShardedIndexSearchers obj) { - try { - delegate.drop(obj); - } finally { - obj.makeInaccessible(); - } - } - } - } -} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LLSearchTransformer.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LLSearchTransformer.java new file mode 100644 index 0000000..0383297 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LLSearchTransformer.java @@ -0,0 +1,10 @@ +package it.cavallium.dbengine.lucene.searcher; + +import reactor.core.publisher.Mono; + +public interface LLSearchTransformer { + + LLSearchTransformer NO_TRANSFORMATION = queryParamsMono -> queryParamsMono; + + Mono transform(Mono queryParamsMono); +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneLocalSearcher.java index 3346666..767b241 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneLocalSearcher.java @@ -1,7 +1,7 @@ package it.cavallium.dbengine.lucene.searcher; -import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; +import it.cavallium.dbengine.database.disk.LLIndexContext; import it.cavallium.dbengine.database.disk.LLIndexSearcher; import reactor.core.publisher.Mono; @@ -12,7 +12,7 @@ public interface LuceneLocalSearcher { * @param queryParams the query parameters * @param keyFieldName the name of the key field */ - Mono> collect(Mono> indexSearcherMono, + Mono> collect(Mono> indexSearcherMono, LocalQueryParams queryParams, String keyFieldName); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneMultiSearcher.java index 16fa9cb..cf75cac 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneMultiSearcher.java @@ -1,12 +1,10 @@ package it.cavallium.dbengine.lucene.searcher; -import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; +import it.cavallium.dbengine.database.disk.LLIndexContext; import it.cavallium.dbengine.database.disk.LLIndexSearcher; -import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; public interface LuceneMultiSearcher extends LuceneLocalSearcher { @@ -14,7 +12,7 @@ public interface LuceneMultiSearcher extends LuceneLocalSearcher { * @param queryParams the query parameters * @param keyFieldName the name of the key field */ - Mono> collect(Flux> indexSearchersFlux, + Mono> collect(Flux> indexSearchersFlux, LocalQueryParams queryParams, String keyFieldName); @@ -24,7 +22,7 @@ public interface LuceneMultiSearcher extends LuceneLocalSearcher { * @param keyFieldName the name of the key field */ @Override - default Mono> collect(Mono> indexSearcherMono, + default Mono> collect(Mono> indexSearcherMono, LocalQueryParams queryParams, String keyFieldName) { return this.collect(indexSearcherMono.flux(), queryParams, keyFieldName); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/PageData.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/PageData.java new file mode 100644 index 0000000..f5be5ee --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/PageData.java @@ -0,0 +1,5 @@ +package it.cavallium.dbengine.lucene.searcher; + +import org.apache.lucene.search.TopDocs; + +record PageData(TopDocs topDocs, CurrentPageInfo nextPageInfo) {} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredLuceneMultiSearcher.java deleted file mode 100644 index 97064be..0000000 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredLuceneMultiSearcher.java +++ /dev/null @@ -1,37 +0,0 @@ -package it.cavallium.dbengine.lucene.searcher; - -import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.FIRST_PAGE_LIMIT; -import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT; - -import it.cavallium.dbengine.lucene.LuceneUtils; -import org.apache.lucene.search.CollectorManager; -import org.apache.lucene.search.Sort; -import org.apache.lucene.search.TopDocs; -import org.apache.lucene.search.TopFieldCollector; -import reactor.core.publisher.Mono; - -public class ScoredLuceneMultiSearcher implements LuceneMultiSearcher { - - @Override - public Mono createShardSearcher(LocalQueryParams queryParams) { - return Mono - .fromCallable(() -> { - Sort luceneSort = queryParams.sort(); - if (luceneSort == null) { - luceneSort = Sort.RELEVANCE; - } - PaginationInfo paginationInfo; - if (queryParams.limit() <= MAX_SINGLE_SEARCH_LIMIT) { - paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), queryParams.limit(), true); - } else { - paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, false); - } - CollectorManager sharedManager = new ScoringShardsCollectorManager(luceneSort, - LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()), - null, LuceneUtils.totalHitsThreshold(), LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset()), - LuceneUtils.safeLongToInt(paginationInfo.firstPageLimit())); - return new ScoredSimpleLuceneShardSearcher(sharedManager, queryParams.query(), paginationInfo); - }); - } - -} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java index 2d3fab5..97fe3f9 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java @@ -1,159 +1,185 @@ package it.cavallium.dbengine.lucene.searcher; -import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.EMPTY_STATUS; +import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.FIRST_PAGE_LIMIT; +import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT; +import io.net5.buffer.api.Send; import it.cavallium.dbengine.database.LLKeyScore; +import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.LLIndexContext; +import it.cavallium.dbengine.database.disk.LLIndexContexts; import it.cavallium.dbengine.lucene.LuceneUtils; -import it.unimi.dsi.fastutil.objects.ObjectArrayList; -import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; -import org.apache.lucene.search.CollectorManager; -import org.apache.lucene.search.FieldDoc; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.Query; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; import org.apache.lucene.search.Sort; -import org.apache.lucene.search.TopDocs; -import org.apache.lucene.search.TopFieldCollector; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; -class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher { +public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher { - private final Object lock = new Object(); - private final List indexSearchersArray = new ArrayList<>(); - private final List> indexSearcherReleasersArray = new ArrayList<>(); - private final List collectors = new ArrayList<>(); - private final CollectorManager firstPageSharedManager; - private final Query luceneQuery; - private final PaginationInfo paginationInfo; - - public ScoredSimpleLuceneShardSearcher(CollectorManager firstPageSharedManager, - Query luceneQuery, PaginationInfo paginationInfo) { - this.firstPageSharedManager = firstPageSharedManager; - this.luceneQuery = luceneQuery; - this.paginationInfo = paginationInfo; + public ScoredSimpleLuceneShardSearcher() { } @Override - public Mono searchOn(IndexSearcher indexSearcher, - Mono releaseIndexSearcher, + public Mono> collect(Flux> indexSearchersFlux, LocalQueryParams queryParams, - Scheduler scheduler) { - return Mono.fromCallable(() -> { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called searchOn in a nonblocking thread"); - } - TopFieldCollector collector; - synchronized (lock) { - //noinspection BlockingMethodInNonBlockingContext - collector = firstPageSharedManager.newCollector(); - indexSearchersArray.add(indexSearcher); - indexSearcherReleasersArray.add(releaseIndexSearcher); - collectors.add(collector); - } - //noinspection BlockingMethodInNonBlockingContext - indexSearcher.search(luceneQuery, collector); - return null; - }).subscribeOn(scheduler); + String keyFieldName) { + Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null"); + PaginationInfo paginationInfo = getPaginationInfo(queryParams); + + var indexSearchersMono = indexSearchersFlux.collectList().map(LLIndexContexts::of); + + return LLUtils.usingResource(indexSearchersMono, indexSearchers -> this + // Search first page results + .searchFirstPage(indexSearchers, queryParams, paginationInfo) + // Compute the results of the first page + .transform(firstPageTopDocsMono -> this.computeFirstPageResults(firstPageTopDocsMono, indexSearchers, + keyFieldName, queryParams)) + // Compute other results + .transform(firstResult -> this.computeOtherResults(firstResult, indexSearchers, queryParams, keyFieldName)) + // Ensure that one LuceneSearchResult is always returned + .single(), + false); } - @Override - public Mono collect(LocalQueryParams queryParams, String keyFieldName, Scheduler collectorScheduler) { - if (Schedulers.isInNonBlockingThread()) { - return Mono.error(() -> new UnsupportedOperationException("Called collect in a nonblocking thread")); + private Sort getSort(LocalQueryParams queryParams) { + Sort luceneSort = queryParams.sort(); + if (luceneSort == null) { + luceneSort = Sort.RELEVANCE; } - if (!queryParams.isScored()) { - return Mono.error(() -> new UnsupportedOperationException("Can't execute an unscored query" - + " with a scored lucene shard searcher")); + return luceneSort; + } + + /** + * Get the pagination info + */ + private PaginationInfo getPaginationInfo(LocalQueryParams queryParams) { + if (queryParams.limit() <= MAX_SINGLE_SEARCH_LIMIT) { + return new PaginationInfo(queryParams.limit(), queryParams.offset(), queryParams.limit(), true); + } else { + return new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, false); } + } + + /** + * Search effectively the raw results of the first page + */ + private Mono searchFirstPage(LLIndexContexts indexSearchers, + LocalQueryParams queryParams, + PaginationInfo paginationInfo) { + var limit = LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()); + var pagination = !paginationInfo.forceSinglePage(); + var resultsOffset = LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset()); + return Mono + .fromSupplier(() -> new CurrentPageInfo(null, limit, 0)) + .flatMap(s -> this.searchPage(queryParams, indexSearchers, pagination, resultsOffset, s)); + } + + /** + * Compute the results of the first page, extracting useful data + */ + private Mono computeFirstPageResults(Mono firstPageDataMono, + LLIndexContexts indexSearchers, + String keyFieldName, + LocalQueryParams queryParams) { + return firstPageDataMono.map(firstPageData -> { + var totalHitsCount = LuceneUtils.convertTotalHitsCount(firstPageData.topDocs().totalHits); + + Flux firstPageHitsFlux = LuceneUtils.convertHits(Flux.fromArray(firstPageData.topDocs().scoreDocs), + indexSearchers, keyFieldName, true) + .take(queryParams.limit(), true); + + CurrentPageInfo nextPageInfo = firstPageData.nextPageInfo(); + + return new FirstPageResults(totalHitsCount, firstPageHitsFlux, nextPageInfo); + }); + } + + private Mono> computeOtherResults(Mono firstResultMono, + LLIndexContexts indexSearchers, + LocalQueryParams queryParams, + String keyFieldName) { + return firstResultMono.map(firstResult -> { + var totalHitsCount = firstResult.totalHitsCount(); + var firstPageHitsFlux = firstResult.firstPageHitsFlux(); + var secondPageInfo = firstResult.nextPageInfo(); + + Flux nextHitsFlux = searchOtherPages(indexSearchers, queryParams, keyFieldName, secondPageInfo); + + Flux combinedFlux = firstPageHitsFlux.concatWith(nextHitsFlux); + return new LuceneSearchResult(totalHitsCount, combinedFlux, d -> indexSearchers.close()).send(); + }); + } + + /** + * Search effectively the merged raw results of the next pages + */ + private Flux searchOtherPages(LLIndexContexts indexSearchers, + LocalQueryParams queryParams, String keyFieldName, CurrentPageInfo secondPageInfo) { + return Flux + .defer(() -> { + AtomicReference currentPageInfoRef = new AtomicReference<>(secondPageInfo); + return Flux + .defer(() -> searchPage(queryParams, indexSearchers, true, 0, currentPageInfoRef.get())) + .doOnNext(s -> currentPageInfoRef.set(s.nextPageInfo())) + .repeatWhen(s -> s.takeWhile(n -> n > 0)); + }) + .subscribeOn(Schedulers.boundedElastic()) + .map(PageData::topDocs) + .flatMapIterable(topDocs -> Arrays.asList(topDocs.scoreDocs)) + .transform(topFieldDocFlux -> LuceneUtils.convertHits(topFieldDocFlux, indexSearchers, + keyFieldName, true)); + } + + /** + * + * @param resultsOffset offset of the resulting topDocs. Useful if you want to + * skip the first n results in the first page + */ + private Mono searchPage(LocalQueryParams queryParams, + LLIndexContexts indexSearchers, + boolean allowPagination, + int resultsOffset, + CurrentPageInfo s) { return Mono .fromCallable(() -> { - TopDocs result; - Mono release; - synchronized (lock) { - //noinspection BlockingMethodInNonBlockingContext - result = firstPageSharedManager.reduce(collectors); - release = Mono.when(indexSearcherReleasersArray); + LLUtils.ensureBlocking(); + if (resultsOffset < 0) { + throw new IndexOutOfBoundsException(resultsOffset); } - IndexSearchers indexSearchers; - synchronized (lock) { - indexSearchers = IndexSearchers.of(indexSearchersArray); + if ((s.pageIndex() == 0 || s.last() != null) && s.remainingLimit() > 0) { + var sort = getSort(queryParams); + var limit = s.currentPageLimit(); + var totalHitsThreshold = LuceneUtils.totalHitsThreshold(); + return new ScoringShardsCollectorManager(sort, limit, null, + totalHitsThreshold, resultsOffset, s.currentPageLimit()); + } else { + return null; } - Flux firstPageHits = LuceneUtils - .convertHits(Flux.fromArray(result.scoreDocs), indexSearchers, keyFieldName, collectorScheduler, true); - - Flux nextHits; - nextHits = Flux - .generate( - () -> new CurrentPageInfo(LuceneUtils.getLastFieldDoc(result.scoreDocs), - paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1), - (s, emitter) -> { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called collect in a nonblocking thread"); - } - - if (s.last() != null && s.remainingLimit() > 0) { - Sort luceneSort = queryParams.sort(); - if (luceneSort == null) { - luceneSort = Sort.RELEVANCE; - } - CollectorManager sharedManager - = new ScoringShardsCollectorManager(luceneSort, s.currentPageLimit(), - (FieldDoc) s.last(), LuceneUtils.totalHitsThreshold(), 0, s.currentPageLimit()); - - try { - var collectors = new ObjectArrayList(indexSearchersArray.size()); - for (IndexSearcher indexSearcher : indexSearchersArray) { - //noinspection BlockingMethodInNonBlockingContext - TopFieldCollector collector = sharedManager.newCollector(); - //noinspection BlockingMethodInNonBlockingContext - indexSearcher.search(luceneQuery, collector); - - collectors.add(collector); - } - - //noinspection BlockingMethodInNonBlockingContext - var pageTopDocs = sharedManager.reduce(collectors); - var pageLastDoc = LuceneUtils.getLastFieldDoc(pageTopDocs.scoreDocs); - emitter.next(pageTopDocs); - - s = new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(), - s.pageIndex() + 1); - } catch (IOException ex) { - emitter.error(ex); - s = EMPTY_STATUS; - } - } else { - emitter.complete(); - s = EMPTY_STATUS; - } - return s; - }) - .subscribeOn(collectorScheduler) - .transform(flux -> { - if (paginationInfo.forceSinglePage() - || paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) { - return Flux.empty(); - } else { - return flux; - } - }) - .flatMapIterable(topFieldDoc -> Arrays.asList(topFieldDoc.scoreDocs)) - .transform(scoreDocs -> LuceneUtils.convertHits(scoreDocs, - indexSearchers, keyFieldName, collectorScheduler, true)); - - return new LuceneSearchResult(LuceneUtils.convertTotalHitsCount(result.totalHits), - firstPageHits - .concatWith(nextHits), - //.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)), - release - ); }) - .subscribeOn(collectorScheduler); + .flatMap(sharedManager -> Flux + .fromIterable(indexSearchers.shards()) + .flatMap(shard -> Mono.fromCallable(() -> { + var collector = sharedManager.newCollector(); + shard.getIndexSearcher().search(queryParams.query(), collector); + return collector; + })) + .collectList() + .flatMap(collectors -> Mono.fromCallable(() -> { + var pageTopDocs = sharedManager.reduce(collectors); + var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs); + long nextRemainingLimit; + if (allowPagination) { + nextRemainingLimit = s.remainingLimit() - s.currentPageLimit(); + } else { + nextRemainingLimit = 0L; + } + var nextPageIndex = s.pageIndex() + 1; + var nextPageInfo = new CurrentPageInfo(pageLastDoc, nextRemainingLimit, nextPageIndex); + return new PageData(pageTopDocs, nextPageInfo); + })) + ); } - } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java index 284b006..7c1526a 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java @@ -5,12 +5,12 @@ import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.FIRST_PAGE_LI import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT; import io.net5.buffer.api.Send; -import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLUtils; -import it.cavallium.dbengine.database.disk.LLIndexSearcher; +import it.cavallium.dbengine.database.disk.LLIndexContext; +import it.cavallium.dbengine.database.disk.LLIndexContexts; import it.cavallium.dbengine.lucene.LuceneUtils; -import it.cavallium.dbengine.lucene.searcher.IndexSearchers.UnshardedIndexSearchers; +import it.cavallium.dbengine.database.disk.LLIndexContexts.UnshardedIndexSearchers; import java.io.IOException; import java.util.Arrays; import java.util.Objects; @@ -25,14 +25,14 @@ import reactor.core.scheduler.Schedulers; public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { @Override - public Mono> collect(Mono> indexSearcherMono, + public Mono> collect(Mono> indexSearcherMono, LocalQueryParams queryParams, String keyFieldName) { Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null"); PaginationInfo paginationInfo = getPaginationInfo(queryParams); - var indexSearchersMono = indexSearcherMono.map(IndexSearchers::unsharded); + var indexSearchersMono = indexSearcherMono.map(LLIndexContexts::unsharded); return LLUtils.usingResource(indexSearchersMono, indexSearchers -> this // Search first page results @@ -72,32 +72,11 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { .handle((s, sink) -> this.searchPageSync(queryParams, indexSearchers, pagination, resultsOffset, s, sink)); } - /** - * Search effectively the merged raw results of the next pages - */ - private Flux searchOtherPages(UnshardedIndexSearchers indexSearchers, - LocalQueryParams queryParams, String keyFieldName, CurrentPageInfo secondPageInfo) { - return Flux - .generate( - () -> secondPageInfo, - (s, sink) -> searchPageSync(queryParams, indexSearchers, true, 0, s, sink), - s -> {} - ) - .subscribeOn(Schedulers.boundedElastic()) - .map(PageData::topDocs) - .flatMapIterable(topDocs -> Arrays.asList(topDocs.scoreDocs)) - .transform(topFieldDocFlux -> LuceneUtils.convertHits(topFieldDocFlux, indexSearchers, - keyFieldName, true)); - } - - private static record FirstPageResults(TotalHitsCount totalHitsCount, Flux firstPageHitsFlux, - CurrentPageInfo nextPageInfo) {} - /** * Compute the results of the first page, extracting useful data */ private Mono computeFirstPageResults(Mono firstPageDataMono, - IndexSearchers indexSearchers, + LLIndexContexts indexSearchers, String keyFieldName, LocalQueryParams queryParams) { return firstPageDataMono.map(firstPageData -> { @@ -129,7 +108,23 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { }); } - private static record PageData(TopDocs topDocs, CurrentPageInfo nextPageInfo) {} + /** + * Search effectively the merged raw results of the next pages + */ + private Flux searchOtherPages(UnshardedIndexSearchers indexSearchers, + LocalQueryParams queryParams, String keyFieldName, CurrentPageInfo secondPageInfo) { + return Flux + .generate( + () -> secondPageInfo, + (s, sink) -> searchPageSync(queryParams, indexSearchers, true, 0, s, sink), + s -> {} + ) + .subscribeOn(Schedulers.boundedElastic()) + .map(PageData::topDocs) + .flatMapIterable(topDocs -> Arrays.asList(topDocs.scoreDocs)) + .transform(topFieldDocFlux -> LuceneUtils.convertHits(topFieldDocFlux, indexSearchers, + keyFieldName, true)); + } /** * diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleUnsortedUnscoredLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleUnsortedUnscoredLuceneMultiSearcher.java index ef99dde..045c1d2 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleUnsortedUnscoredLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleUnsortedUnscoredLuceneMultiSearcher.java @@ -6,6 +6,7 @@ import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.LLIndexContext; import it.cavallium.dbengine.database.disk.LLIndexSearcher; import java.util.ArrayList; import java.util.Comparator; @@ -22,7 +23,7 @@ public class SimpleUnsortedUnscoredLuceneMultiSearcher implements LuceneMultiSea } @Override - public Mono> collect(Flux> indexSearchersFlux, + public Mono> collect(Flux> indexSearchersFlux, LocalQueryParams queryParams, String keyFieldName) { return Mono @@ -38,7 +39,7 @@ public class SimpleUnsortedUnscoredLuceneMultiSearcher implements LuceneMultiSea } }) .thenMany(indexSearchersFlux) - .flatMap(resSend -> localSearcher.collect(Mono.just(resSend), queryParams, keyFieldName)) + .flatMap(resSend -> localSearcher.collect(Mono.just(resSend).share(), queryParams, keyFieldName)) .collectList() .map(results -> { List resultsToDrop = new ArrayList<>(results.size()); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneMultiSearcher.java deleted file mode 100644 index 4162b75..0000000 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneMultiSearcher.java +++ /dev/null @@ -1,34 +0,0 @@ -package it.cavallium.dbengine.lucene.searcher; - -import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.FIRST_PAGE_LIMIT; -import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT; - -import it.cavallium.dbengine.lucene.LuceneUtils; -import reactor.core.publisher.Mono; - -public class UnscoredPagedLuceneMultiSearcher implements LuceneMultiSearcher { - - @Override - public Mono createShardSearcher(LocalQueryParams queryParams) { - return Mono - .fromCallable(() -> { - if (queryParams.isScored()) { - throw new UnsupportedOperationException("Can't use the unscored searcher to do a scored or sorted query"); - } - PaginationInfo paginationInfo; - if (queryParams.limit() <= MAX_SINGLE_SEARCH_LIMIT) { - paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), queryParams.limit(), true); - } else { - paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, false); - } - UnscoredTopDocsCollectorManager unsortedCollectorManager = new UnscoredTopDocsCollectorManager(() -> TopDocsSearcher.getTopDocsCollector(queryParams.sort(), - LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()), - null, - LuceneUtils.totalHitsThreshold(), - !paginationInfo.forceSinglePage(), - queryParams.isScored() - ), queryParams.offset(), queryParams.limit(), queryParams.sort()); - return new UnscoredPagedLuceneShardSearcher(unsortedCollectorManager, queryParams.query(), paginationInfo); - }); - } -} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneShardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneShardSearcher.java deleted file mode 100644 index 917befd..0000000 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneShardSearcher.java +++ /dev/null @@ -1,151 +0,0 @@ -package it.cavallium.dbengine.lucene.searcher; - -import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.EMPTY_STATUS; - -import it.cavallium.dbengine.database.LLKeyScore; -import it.cavallium.dbengine.lucene.LuceneUtils; -import it.unimi.dsi.fastutil.objects.ObjectArrayList; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import org.apache.lucene.search.CollectorManager; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.TopDocs; -import org.apache.lucene.search.TopDocsCollector; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; - -class UnscoredPagedLuceneShardSearcher implements LuceneMultiSearcher { - - private final Object lock = new Object(); - private final List indexSearchersArray = new ArrayList<>(); - private final List> indexSearcherReleasersArray = new ArrayList<>(); - private final List> collectors = new ArrayList<>(); - private final CollectorManager, TopDocs> firstPageUnsortedCollectorManager; - private final Query luceneQuery; - private final PaginationInfo paginationInfo; - - public UnscoredPagedLuceneShardSearcher( - CollectorManager, TopDocs> firstPagensortedCollectorManager, - Query luceneQuery, - PaginationInfo paginationInfo) { - this.firstPageUnsortedCollectorManager = firstPagensortedCollectorManager; - this.luceneQuery = luceneQuery; - this.paginationInfo = paginationInfo; - } - - @Override - public Mono searchOn(IndexSearcher indexSearcher, - Mono releaseIndexSearcher, - LocalQueryParams queryParams, - Scheduler scheduler) { - return Mono.fromCallable(() -> { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called searchOn in a nonblocking thread"); - } - TopDocsCollector collector; - synchronized (lock) { - //noinspection BlockingMethodInNonBlockingContext - collector = firstPageUnsortedCollectorManager.newCollector(); - indexSearchersArray.add(indexSearcher); - indexSearcherReleasersArray.add(releaseIndexSearcher); - collectors.add(collector); - } - //noinspection BlockingMethodInNonBlockingContext - indexSearcher.search(luceneQuery, collector); - return null; - }).subscribeOn(scheduler); - } - - @Override - public Mono collect(LocalQueryParams queryParams, String keyFieldName, Scheduler scheduler) { - return Mono - .fromCallable(() -> { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called collect in a nonblocking thread"); - } - TopDocs result; - Mono release; - synchronized (lock) { - //noinspection BlockingMethodInNonBlockingContext - result = firstPageUnsortedCollectorManager.reduce(collectors); - release = Mono.when(indexSearcherReleasersArray); - } - IndexSearchers indexSearchers; - synchronized (lock) { - indexSearchers = IndexSearchers.of(indexSearchersArray); - } - Flux firstPageHits = LuceneUtils - .convertHits(Flux.fromArray(result.scoreDocs), indexSearchers, keyFieldName, scheduler, false); - - Flux nextHits = Flux - .generate( - () -> new CurrentPageInfo(LuceneUtils.getLastScoreDoc(result.scoreDocs), - paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1), - (s, sink) -> { - if (s.last() != null && s.remainingLimit() > 0 && s.currentPageLimit() > 0) { - Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null"); - Query luceneQuery = queryParams.query(); - int perShardCollectorLimit = s.currentPageLimit() / indexSearchersArray.size(); - UnscoredTopDocsCollectorManager currentPageUnsortedCollectorManager - = new UnscoredTopDocsCollectorManager( - () -> TopDocsSearcher.getTopDocsCollector(queryParams.sort(), perShardCollectorLimit, - s.last(), LuceneUtils.totalHitsThreshold(), true, queryParams.isScored()), - 0, s.currentPageLimit(), queryParams.sort()); - - try { - var collectors = new ObjectArrayList>(indexSearchersArray.size()); - for (IndexSearcher indexSearcher : indexSearchersArray) { - //noinspection BlockingMethodInNonBlockingContext - var collector = currentPageUnsortedCollectorManager.newCollector(); - //noinspection BlockingMethodInNonBlockingContext - indexSearcher.search(luceneQuery, collector); - - collectors.add(collector); - } - //noinspection BlockingMethodInNonBlockingContext - TopDocs pageTopDocs = currentPageUnsortedCollectorManager.reduce(collectors); - var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs); - - sink.next(pageTopDocs); - return new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(), - s.pageIndex() + 1); - } catch (IOException ex) { - sink.error(ex); - return EMPTY_STATUS; - } - } else { - sink.complete(); - return EMPTY_STATUS; - } - } - ) - .subscribeOn(scheduler) - .flatMapIterable(topDocs -> Arrays.asList(topDocs.scoreDocs)) - .transform(scoreDocsFlux -> LuceneUtils.convertHits(scoreDocsFlux, - indexSearchers, keyFieldName, scheduler, false)) - .transform(flux -> { - if (paginationInfo.forceSinglePage() - || paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) { - return Flux.empty(); - } else { - return flux; - } - }); - - return new LuceneSearchResult(LuceneUtils.convertTotalHitsCount(result.totalHits), firstPageHits - .concatWith(nextHits), - //.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)), - release - ); - }) - .subscribeOn(scheduler); - } - -} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredTopDocsCollectorManager.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredTopDocsCollectorManager.java deleted file mode 100644 index f158c73..0000000 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredTopDocsCollectorManager.java +++ /dev/null @@ -1,70 +0,0 @@ -package it.cavallium.dbengine.lucene.searcher; - -import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.TIE_BREAKER; -import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.ALLOW_UNSCORED_PAGINATION_MODE; - -import it.cavallium.dbengine.lucene.LuceneUtils; -import java.io.IOException; -import java.util.Collection; -import java.util.function.Supplier; -import org.apache.lucene.search.Collector; -import org.apache.lucene.search.CollectorManager; -import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.Sort; -import org.apache.lucene.search.TopDocs; -import org.apache.lucene.search.TopDocsCollector; -import org.apache.lucene.search.TopFieldDocs; -import org.jetbrains.annotations.Nullable; -import reactor.core.scheduler.Schedulers; - -public class UnscoredTopDocsCollectorManager implements - CollectorManager, TopDocs> { - - private final Supplier> collectorSupplier; - private final long offset; - private final long limit; - private final Sort sort; - - public UnscoredTopDocsCollectorManager(Supplier> collectorSupplier, - long offset, - long limit, - @Nullable Sort sort) { - this.collectorSupplier = collectorSupplier; - this.offset = offset; - this.limit = limit; - this.sort = sort; - } - - @Override - public TopDocsCollector newCollector() throws IOException { - return collectorSupplier.get(); - } - - @Override - public TopDocs reduce(Collection> collection) throws IOException { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called reduce in a nonblocking thread"); - } - int i = 0; - TopDocs[] topDocsArray; - if (sort != null) { - topDocsArray = new TopFieldDocs[collection.size()]; - } else { - topDocsArray = new TopDocs[collection.size()]; - } - for (TopDocsCollector topDocsCollector : collection) { - var topDocs = topDocsCollector.topDocs(); - for (ScoreDoc scoreDoc : topDocs.scoreDocs) { - scoreDoc.shardIndex = i; - } - topDocsArray[i] = topDocs; - i++; - } - return LuceneUtils.mergeTopDocs(sort, - LuceneUtils.safeLongToInt(offset), - LuceneUtils.safeLongToInt(limit), - topDocsArray, - TIE_BREAKER - ); - } -} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredUnsortedContinuousLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredUnsortedContinuousLuceneMultiSearcher.java deleted file mode 100644 index eacd58a..0000000 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredUnsortedContinuousLuceneMultiSearcher.java +++ /dev/null @@ -1,179 +0,0 @@ -package it.cavallium.dbengine.lucene.searcher; - -import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; -import it.cavallium.dbengine.lucene.LuceneUtils; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.LockSupport; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.search.Collector; -import org.apache.lucene.search.CollectorManager; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.ScoreMode; -import org.apache.lucene.search.SimpleCollector; -import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; -import reactor.core.publisher.Sinks.EmitResult; -import reactor.core.publisher.Sinks.Many; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; - -public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMultiSearcher { - - private static final Scheduler UNSCORED_UNSORTED_EXECUTOR = Schedulers.newBoundedElastic(Runtime - .getRuntime() - .availableProcessors(), Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "UnscoredUnsortedExecutor"); - - @Override - public Mono createShardSearcher(LocalQueryParams queryParams) { - return Mono - .fromCallable(() -> { - AtomicBoolean alreadySubscribed = new AtomicBoolean(false); - Many scoreDocsSink = Sinks.many().unicast().onBackpressureBuffer(); - // 1 is the collect phase - AtomicInteger remainingCollectors = new AtomicInteger(1); - - if (queryParams.isScored()) { - throw new UnsupportedOperationException("Can't use the unscored searcher to do a scored or sorted query"); - } - - var cm = new CollectorManager() { - - class IterableCollector extends SimpleCollector { - - private int shardIndex; - private LeafReaderContext context; - - @Override - public void collect(int i) { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called collect in a nonblocking thread"); - } - var scoreDoc = new ScoreDoc(context.docBase + i, 0, shardIndex); - synchronized (scoreDocsSink) { - while (scoreDocsSink.tryEmitNext(scoreDoc) == EmitResult.FAIL_OVERFLOW) { - LockSupport.parkNanos(10); - } - } - } - - @Override - protected void doSetNextReader(LeafReaderContext context) { - this.context = context; - } - - @Override - public ScoreMode scoreMode() { - return ScoreMode.COMPLETE_NO_SCORES; - } - - public void setShardIndex(int shardIndex) { - this.shardIndex = shardIndex; - } - } - - @Override - public IterableCollector newCollector() { - return new IterableCollector(); - } - - @Override - public Void reduce(Collection collection) { - throw new UnsupportedOperationException(); - } - }; - - return new LuceneMultiSearcher() { - private final Object lock = new Object(); - private final List indexSearchersArray = new ArrayList<>(); - private final List> indexSearcherReleasersArray = new ArrayList<>(); - @Override - public Mono searchOn(IndexSearcher indexSearcher, - Mono releaseIndexSearcher, - LocalQueryParams queryParams, - Scheduler scheduler) { - return Mono - .fromCallable(() -> { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called searchOn in a nonblocking thread"); - } - //noinspection BlockingMethodInNonBlockingContext - var collector = cm.newCollector(); - int collectorShardIndex; - synchronized (lock) { - collectorShardIndex = indexSearchersArray.size(); - indexSearchersArray.add(indexSearcher); - indexSearcherReleasersArray.add(releaseIndexSearcher); - } - collector.setShardIndex(collectorShardIndex); - remainingCollectors.incrementAndGet(); - UNSCORED_UNSORTED_EXECUTOR.schedule(() -> { - try { - indexSearcher.search(queryParams.query(), collector); - - synchronized (scoreDocsSink) { - decrementRemainingCollectors(scoreDocsSink, remainingCollectors); - } - } catch (IOException e) { - scoreDocsSink.tryEmitError(e); - } - }); - return null; - }) - .subscribeOn(scheduler); - } - - @Override - public Mono collect(LocalQueryParams queryParams, - String keyFieldName, - Scheduler scheduler) { - return Mono - .fromCallable(() -> { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called collect in a nonblocking thread"); - } - synchronized (scoreDocsSink) { - decrementRemainingCollectors(scoreDocsSink, remainingCollectors); - } - - if (!alreadySubscribed.compareAndSet(false, true)) { - throw new UnsupportedOperationException("Already subscribed!"); - } - - IndexSearchers indexSearchers; - Mono release; - synchronized (lock) { - indexSearchers = IndexSearchers.of(indexSearchersArray); - release = Mono.when(indexSearcherReleasersArray); - } - - AtomicBoolean resultsAlreadySubscribed = new AtomicBoolean(false); - - var scoreDocsFlux = Mono.fromCallable(() -> { - if (!resultsAlreadySubscribed.compareAndSet(false, true)) { - throw new UnsupportedOperationException("Already subscribed!"); - } - return null; - }).thenMany(scoreDocsSink.asFlux()); - var resultsFlux = LuceneUtils - .convertHits(scoreDocsFlux, indexSearchers, keyFieldName, scheduler, false); - - return new LuceneSearchResult(TotalHitsCount.of(0, false), resultsFlux, release); - }) - .subscribeOn(scheduler); - } - }; - }); - } - - private static void decrementRemainingCollectors(Many scoreDocsSink, AtomicInteger remainingCollectors) { - if (remainingCollectors.decrementAndGet() <= 0) { - scoreDocsSink.tryEmitComplete(); - } - } -}