Missing: moreLikeThis

This commit is contained in:
Andrea Cavalli 2021-09-19 19:59:37 +02:00
parent 31422847c2
commit 65db1711b5
29 changed files with 847 additions and 1182 deletions

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine.client;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.query.ClientQueryParams;
import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
@ -52,9 +53,9 @@ public interface LuceneIndex<T, U> extends LLSnapshottable {
Mono<Void> deleteAll();
Mono<SearchResultKeys<T>> moreLikeThis(ClientQueryParams<SearchResultKey<T>> queryParams, T key, U mltDocumentValue);
Mono<Send<SearchResultKeys<T>>> moreLikeThis(ClientQueryParams<SearchResultKey<T>> queryParams, T key, U mltDocumentValue);
default Mono<SearchResult<T, U>> moreLikeThisWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
default Mono<Send<SearchResult<T, U>>> moreLikeThisWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
T key,
U mltDocumentValue,
ValueGetter<T, U> valueGetter) {
@ -64,21 +65,19 @@ public interface LuceneIndex<T, U> extends LLSnapshottable {
getValueGetterTransformer(valueGetter));
}
Mono<SearchResult<T, U>> moreLikeThisWithTransformer(ClientQueryParams<SearchResultItem<T, U>> queryParams,
Mono<Send<SearchResult<T, U>>> moreLikeThisWithTransformer(ClientQueryParams<SearchResultItem<T, U>> queryParams,
T key,
U mltDocumentValue,
ValueTransformer<T, U> valueTransformer);
Mono<SearchResultKeys<T>> search(ClientQueryParams<SearchResultKey<T>> queryParams);
Mono<Send<SearchResultKeys<T>>> search(ClientQueryParams<SearchResultKey<T>> queryParams);
default Mono<SearchResult<T, U>> searchWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
default Mono<Send<SearchResult<T, U>>> searchWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
ValueGetter<T, U> valueGetter) {
return this.searchWithTransformer(queryParams,
getValueGetterTransformer(valueGetter)
);
return this.searchWithTransformer(queryParams, getValueGetterTransformer(valueGetter));
}
Mono<SearchResult<T, U>> searchWithTransformer(ClientQueryParams<SearchResultItem<T, U>> queryParams,
Mono<Send<SearchResult<T, U>>> searchWithTransformer(ClientQueryParams<SearchResultItem<T, U>> queryParams,
ValueTransformer<T, U> valueTransformer);
Mono<TotalHitsCount> count(@Nullable CompositeSnapshot snapshot, Query query);

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine.client;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.query.ClientQueryParams;
import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
@ -84,58 +85,73 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
return luceneIndex.deleteAll();
}
private Mono<SearchResultKeys<T>> transformLuceneResultWithTransformer(LLSearchResultShard llSearchResult) {
return Mono.just(new SearchResultKeys<>(llSearchResult.results()
.map(signal -> new SearchResultKey<>(Mono.fromCallable(signal::key).map(indicizer::getKey), signal.score())),
llSearchResult.totalHitsCount(),
llSearchResult.release()
));
private Mono<Send<SearchResultKeys<T>>> transformLuceneResultWithTransformer(
Mono<Send<LLSearchResultShard>> llSearchResultMono) {
return llSearchResultMono.map(llSearchResultToReceive -> {
var llSearchResult = llSearchResultToReceive.receive();
return new SearchResultKeys<>(llSearchResult.results()
.map(signal -> new SearchResultKey<>(Mono
.fromCallable(signal::key)
.map(indicizer::getKey), signal.score())),
llSearchResult.totalHitsCount(),
d -> llSearchResult.close()
).send();
});
}
private Mono<SearchResult<T, U>> transformLuceneResultWithValues(LLSearchResultShard llSearchResult,
private Mono<Send<SearchResult<T, U>>> transformLuceneResultWithValues(
Mono<Send<LLSearchResultShard>> llSearchResultMono,
ValueGetter<T, U> valueGetter) {
return Mono.fromCallable(() -> new SearchResult<>(llSearchResult.results().map(signal -> {
var key = Mono.fromCallable(signal::key).map(indicizer::getKey);
return new SearchResultItem<>(key, key.flatMap(valueGetter::get), signal.score());
}), llSearchResult.totalHitsCount(), llSearchResult.release()));
return llSearchResultMono.map(llSearchResultToReceive -> {
var llSearchResult = llSearchResultToReceive.receive();
return new SearchResult<>(llSearchResult.results().map(signal -> {
var key = Mono.fromCallable(signal::key).map(indicizer::getKey);
return new SearchResultItem<>(key, key.flatMap(valueGetter::get), signal.score());
}), llSearchResult.totalHitsCount(), d -> llSearchResult.close()).send();
});
}
private Mono<SearchResult<T, U>> transformLuceneResultWithTransformer(LLSearchResultShard llSearchResult,
private Mono<Send<SearchResult<T, U>>> transformLuceneResultWithTransformer(
Mono<Send<LLSearchResultShard>> llSearchResultMono,
ValueTransformer<T, U> valueTransformer) {
var scoresWithKeysFlux = llSearchResult
.results()
.flatMapSequential(signal -> Mono
.fromCallable(signal::key)
.map(indicizer::getKey)
.map(key -> Tuples.of(signal.score(), key))
);
var resultItemsFlux = valueTransformer
.transform(scoresWithKeysFlux)
.filter(tuple3 -> tuple3.getT3().isPresent())
.map(tuple3 -> new SearchResultItem<>(Mono.just(tuple3.getT2()),
Mono.just(tuple3.getT3().orElseThrow()),
tuple3.getT1()
));
return Mono.fromCallable(() -> new SearchResult<>(resultItemsFlux,
llSearchResult.totalHitsCount(),
llSearchResult.release()
));
return llSearchResultMono
.map(llSearchResultToReceive -> {
var llSearchResult = llSearchResultToReceive.receive();
var scoresWithKeysFlux = llSearchResult
.results()
.flatMapSequential(signal -> Mono
.fromCallable(signal::key)
.map(indicizer::getKey)
.map(key -> Tuples.of(signal.score(), key))
);
var resultItemsFlux = valueTransformer
.transform(scoresWithKeysFlux)
.filter(tuple3 -> tuple3.getT3().isPresent())
.map(tuple3 -> new SearchResultItem<>(Mono.just(tuple3.getT2()),
Mono.just(tuple3.getT3().orElseThrow()),
tuple3.getT1()
));
return new SearchResult<>(resultItemsFlux,
llSearchResult.totalHitsCount(),
d -> llSearchResult.close()
).send();
});
}
@Override
public Mono<SearchResultKeys<T>> moreLikeThis(ClientQueryParams<SearchResultKey<T>> queryParams,
public Mono<Send<SearchResultKeys<T>>> moreLikeThis(ClientQueryParams<SearchResultKey<T>> queryParams,
T key,
U mltDocumentValue) {
Flux<Tuple2<String, Set<String>>> mltDocumentFields
= indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue);
return luceneIndex
.moreLikeThis(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName(), mltDocumentFields)
.flatMap(this::transformLuceneResultWithTransformer);
.transform(this::transformLuceneResultWithTransformer);
}
@Override
public Mono<SearchResult<T, U>> moreLikeThisWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
public Mono<Send<SearchResult<T, U>>> moreLikeThisWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
T key,
U mltDocumentValue,
ValueGetter<T, U> valueGetter) {
@ -147,13 +163,12 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
indicizer.getKeyFieldName(),
mltDocumentFields
)
.flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult,
valueGetter
));
.transform(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult,
valueGetter));
}
@Override
public Mono<SearchResult<T, U>> moreLikeThisWithTransformer(ClientQueryParams<SearchResultItem<T, U>> queryParams,
public Mono<Send<SearchResult<T, U>>> moreLikeThisWithTransformer(ClientQueryParams<SearchResultItem<T, U>> queryParams,
T key,
U mltDocumentValue,
ValueTransformer<T, U> valueTransformer) {
@ -165,40 +180,51 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
indicizer.getKeyFieldName(),
mltDocumentFields
)
.flatMap(llSearchResult -> this.transformLuceneResultWithTransformer(llSearchResult, valueTransformer));
.transform(llSearchResult -> this.transformLuceneResultWithTransformer(llSearchResult,
valueTransformer));
}
@Override
public Mono<SearchResultKeys<T>> search(ClientQueryParams<SearchResultKey<T>> queryParams) {
public Mono<Send<SearchResultKeys<T>>> search(ClientQueryParams<SearchResultKey<T>> queryParams) {
return luceneIndex
.search(resolveSnapshot(queryParams.snapshot()),
queryParams.toQueryParams(),
indicizer.getKeyFieldName()
)
.flatMap(this::transformLuceneResultWithTransformer);
.transform(this::transformLuceneResultWithTransformer);
}
@Override
public Mono<SearchResult<T, U>> searchWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
public Mono<Send<SearchResult<T, U>>> searchWithValues(
ClientQueryParams<SearchResultItem<T, U>> queryParams,
ValueGetter<T, U> valueGetter) {
return luceneIndex
.search(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName())
.flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult, valueGetter));
.search(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(),
indicizer.getKeyFieldName())
.transform(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult,
valueGetter));
}
@Override
public Mono<SearchResult<T, U>> searchWithTransformer(ClientQueryParams<SearchResultItem<T, U>> queryParams,
public Mono<Send<SearchResult<T, U>>> searchWithTransformer(
ClientQueryParams<SearchResultItem<T, U>> queryParams,
ValueTransformer<T, U> valueTransformer) {
return luceneIndex
.search(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName())
.flatMap(llSearchResult -> this.transformLuceneResultWithTransformer(llSearchResult, valueTransformer));
.search(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(),
indicizer.getKeyFieldName())
.transform(llSearchResult -> this.transformLuceneResultWithTransformer(llSearchResult,
valueTransformer));
}
@Override
public Mono<TotalHitsCount> count(@Nullable CompositeSnapshot snapshot, Query query) {
return this
.search(ClientQueryParams.<SearchResultKey<T>>builder().snapshot(snapshot).query(query).limit(0).build())
.flatMap(tSearchResultKeys -> tSearchResultKeys.release().thenReturn(tSearchResultKeys.totalHitsCount()));
.map(searchResultKeysSend -> {
try (var searchResultKeys = searchResultKeysSend.receive()) {
return searchResultKeys.totalHitsCount();
}
});
}
@Override

View File

@ -1,5 +1,8 @@
package it.cavallium.dbengine.client;
import io.net5.buffer.api.Drop;
import io.net5.buffer.api.Owned;
import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLSearchResultShard;
import java.util.Objects;
@ -8,35 +11,20 @@ import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public final class SearchResult<T, U> {
public final class SearchResult<T, U> extends ResourceSupport<SearchResult<T, U>, SearchResult<T, U>> {
private static final Logger logger = LoggerFactory.getLogger(SearchResult.class);
private Flux<SearchResultItem<T, U>> results;
private TotalHitsCount totalHitsCount;
private volatile boolean releaseCalled;
private final Flux<SearchResultItem<T, U>> results;
private final TotalHitsCount totalHitsCount;
private final Mono<Void> release;
public SearchResult(Flux<SearchResultItem<T, U>> results, TotalHitsCount totalHitsCount, Mono<Void> release) {
public SearchResult(Flux<SearchResultItem<T, U>> results, TotalHitsCount totalHitsCount,
Drop<SearchResult<T, U>> drop) {
super(new SearchResult.CloseOnDrop<>(drop));
this.results = results;
this.totalHitsCount = totalHitsCount;
this.release = Mono.fromRunnable(() -> {
if (releaseCalled) {
logger.warn(this.getClass().getName() + "::release has been called twice!");
}
releaseCalled = true;
}).then(release);
}
public static <T, U> SearchResult<T, U> empty() {
var sr = new SearchResult<T, U>(Flux.empty(), TotalHitsCount.of(0, true), Mono.empty());
sr.releaseCalled = true;
return sr;
}
public Flux<SearchResultItem<T, U>> resultsThenRelease() {
return Flux.usingWhen(Mono.just(true), _unused -> results, _unused -> release);
return new SearchResult<T, U>(Flux.empty(), TotalHitsCount.of(0, true), d -> {});
}
public Flux<SearchResultItem<T, U>> results() {
@ -47,39 +35,40 @@ public final class SearchResult<T, U> {
return totalHitsCount;
}
public Mono<Void> release() {
return release;
}
@Override
public boolean equals(Object obj) {
if (obj == this)
return true;
if (obj == null || obj.getClass() != this.getClass())
return false;
var that = (SearchResult) obj;
return Objects.equals(this.results, that.results) && Objects.equals(this.totalHitsCount, that.totalHitsCount)
&& Objects.equals(this.release, that.release);
}
@Override
public int hashCode() {
return Objects.hash(results, totalHitsCount, release);
}
@Override
public String toString() {
return "SearchResult[" + "results=" + results + ", " + "totalHitsCount=" + totalHitsCount + ", " + "release="
+ release + ']';
return "SearchResult[" + "results=" + results + ", " + "totalHitsCount=" + totalHitsCount + ']';
}
@SuppressWarnings("deprecation")
@Override
protected void finalize() throws Throwable {
if (!releaseCalled) {
logger.warn(this.getClass().getName() + "::release has not been called before class finalization!");
}
super.finalize();
protected RuntimeException createResourceClosedException() {
return new IllegalStateException("Closed");
}
@Override
protected Owned<SearchResult<T, U>> prepareSend() {
var results = this.results;
var totalHitsCount = this.totalHitsCount;
makeInaccessible();
return drop -> new SearchResult<>(results, totalHitsCount, drop);
}
private void makeInaccessible() {
this.results = null;
this.totalHitsCount = null;
}
private static class CloseOnDrop<V, W> implements Drop<SearchResult<V, W>> {
private final Drop<SearchResult<V, W>> delegate;
public CloseOnDrop(Drop<SearchResult<V, W>> drop) {
this.delegate = drop;
}
@Override
public void drop(SearchResult<V, W> obj) {
delegate.drop(obj);
}
}
}

View File

@ -1,5 +1,8 @@
package it.cavallium.dbengine.client;
import io.net5.buffer.api.Drop;
import io.net5.buffer.api.Owned;
import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLSearchResultShard;
import it.cavallium.dbengine.database.collections.ValueGetter;
@ -11,42 +14,29 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@SuppressWarnings("unused")
public final class SearchResultKeys<T> {
public final class SearchResultKeys<T> extends ResourceSupport<SearchResultKeys<T>, SearchResultKeys<T>> {
private static final Logger logger = LoggerFactory.getLogger(SearchResultKeys.class);
private volatile boolean releaseCalled;
private Flux<SearchResultKey<T>> results;
private TotalHitsCount totalHitsCount;
private final Flux<SearchResultKey<T>> results;
private final TotalHitsCount totalHitsCount;
private final Mono<Void> release;
public SearchResultKeys(Flux<SearchResultKey<T>> results, TotalHitsCount totalHitsCount, Mono<Void> release) {
public SearchResultKeys(Flux<SearchResultKey<T>> results, TotalHitsCount totalHitsCount,
Drop<SearchResultKeys<T>> drop) {
super(new SearchResultKeys.CloseOnDrop<>(drop));
this.results = results;
this.totalHitsCount = totalHitsCount;
this.release = Mono.fromRunnable(() -> {
if (releaseCalled) {
logger.warn(this.getClass().getName() + "::release has been called twice!");
}
releaseCalled = true;
}).then(release);
}
public static <T> SearchResultKeys<T> empty() {
var sr = new SearchResultKeys<T>(Flux.empty(), TotalHitsCount.of(0, true), Mono.empty());
sr.releaseCalled = true;
return sr;
return new SearchResultKeys<T>(Flux.empty(), TotalHitsCount.of(0, true), d -> {});
}
public <U> SearchResult<T, U> withValues(ValueGetter<T, U> valuesGetter) {
return new SearchResult<>(results.map(item -> new SearchResultItem<>(item.key(),
item.key().flatMap(valuesGetter::get),
item.score()
)), totalHitsCount, release);
}
public Flux<SearchResultKey<T>> resultsThenRelease() {
return Flux.usingWhen(Mono.just(true), _unused -> results, _unused -> release);
)), totalHitsCount, d -> this.close());
}
public Flux<SearchResultKey<T>> results() {
@ -57,39 +47,41 @@ public final class SearchResultKeys<T> {
return totalHitsCount;
}
public Mono<Void> release() {
return release;
}
@Override
public boolean equals(Object obj) {
if (obj == this)
return true;
if (obj == null || obj.getClass() != this.getClass())
return false;
var that = (SearchResultKeys) obj;
return Objects.equals(this.results, that.results) && Objects.equals(this.totalHitsCount, that.totalHitsCount)
&& Objects.equals(this.release, that.release);
}
@Override
public int hashCode() {
return Objects.hash(results, totalHitsCount, release);
}
@Override
public String toString() {
return "SearchResultKeys[" + "results=" + results + ", " + "totalHitsCount=" + totalHitsCount + ", " + "release="
+ release + ']';
return "SearchResultKeys[" + "results=" + results + ", " + "totalHitsCount=" + totalHitsCount + ']';
}
@SuppressWarnings("deprecation")
@Override
protected void finalize() throws Throwable {
if (!releaseCalled) {
logger.warn(this.getClass().getName() + "::release has not been called before class finalization!");
protected RuntimeException createResourceClosedException() {
return new IllegalStateException("Closed");
}
@Override
protected Owned<SearchResultKeys<T>> prepareSend() {
var results = this.results;
var totalHitsCount = this.totalHitsCount;
makeInaccessible();
return drop -> new SearchResultKeys<>(results, totalHitsCount, drop);
}
private void makeInaccessible() {
this.results = null;
this.totalHitsCount = null;
}
private static class CloseOnDrop<U> implements Drop<SearchResultKeys<U>> {
private final Drop<SearchResultKeys<U>> delegate;
public CloseOnDrop(Drop<SearchResultKeys<U>> drop) {
this.delegate = drop;
}
@Override
public void drop(SearchResultKeys<U> obj) {
delegate.drop(obj);
}
super.finalize();
}
}

View File

@ -10,19 +10,25 @@ import io.net5.buffer.api.Send;
import io.net5.util.IllegalReferenceCountException;
import io.net5.util.internal.PlatformDependent;
import it.cavallium.dbengine.database.collections.DatabaseStage;
import it.cavallium.dbengine.database.disk.LLIndexContext;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLLocalLuceneIndex;
import it.cavallium.dbengine.database.disk.MemorySegmentUtils;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.lucene.RandomSortField;
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
@ -37,10 +43,19 @@ import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.queries.mlt.MoreLikeThis;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.search.similarities.TFIDFSimilarity;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.RocksDB;
@ -381,6 +396,74 @@ public class LLUtils {
.doOnDiscard(Send.class, Send::close);
}
public static Mono<LocalQueryParams> getMoreLikeThisQuery(
LLIndexSearcher indexSearcher,
@Nullable LLSnapshot snapshot,
LocalQueryParams localQueryParams,
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux) {
Query luceneAdditionalQuery;
try {
luceneAdditionalQuery = localQueryParams.query();
} catch (Exception e) {
return Mono.error(e);
}
return mltDocumentFieldsFlux
.collectMap(Tuple2::getT1, Tuple2::getT2, HashMap::new)
.flatMap(mltDocumentFields -> {
mltDocumentFields.entrySet().removeIf(entry -> entry.getValue().isEmpty());
if (mltDocumentFields.isEmpty()) {
return Mono.just(new LocalQueryParams(new MatchNoDocsQuery(),
localQueryParams.offset(),
localQueryParams.limit(),
localQueryParams.minCompetitiveScore(),
localQueryParams.sort(),
localQueryParams.scoreMode()
));
}
new IndexSearcher
return indexSearcher.getIndexSearcher().search(snapshot, indexSearcher -> Mono.fromCallable(() -> {
var mlt = new MoreLikeThis(indexSearcher.getIndexReader());
mlt.setAnalyzer(llLocalLuceneIndex.indexWriter.getAnalyzer());
mlt.setFieldNames(mltDocumentFields.keySet().toArray(String[]::new));
mlt.setMinTermFreq(1);
mlt.setMinDocFreq(3);
mlt.setMaxDocFreqPct(20);
mlt.setBoost(localQueryParams.scoreMode().needsScores());
mlt.setStopWords(EnglishItalianStopFilter.getStopWordsString());
var similarity = llLocalLuceneIndex.getSimilarity();
if (similarity instanceof TFIDFSimilarity) {
mlt.setSimilarity((TFIDFSimilarity) similarity);
} else {
LLLocalLuceneIndex.logger.trace(MARKER_ROCKSDB, "Using an unsupported similarity algorithm for MoreLikeThis:"
+ " {}. You must use a similarity instance based on TFIDFSimilarity!", similarity);
}
// Get the reference docId and apply it to MoreLikeThis, to generate the query
@SuppressWarnings({"unchecked", "rawtypes"})
var mltQuery = mlt.like((Map) mltDocumentFields);
Query luceneQuery;
if (!(luceneAdditionalQuery instanceof MatchAllDocsQuery)) {
luceneQuery = new BooleanQuery.Builder()
.add(mltQuery, Occur.MUST)
.add(new ConstantScoreQuery(luceneAdditionalQuery), Occur.MUST)
.build();
} else {
luceneQuery = mltQuery;
}
return luceneQuery;
})
.subscribeOn(Schedulers.boundedElastic())
.map(luceneQuery -> new LocalQueryParams(luceneQuery,
localQueryParams.offset(),
localQueryParams.limit(),
localQueryParams.minCompetitiveScore(),
localQueryParams.sort(),
localQueryParams.scoreMode()
)));
});
}
public static record DirectBuffer(@NotNull Send<Buffer> buffer, @NotNull ByteBuffer byteBuffer) {}
@NotNull

View File

@ -0,0 +1,81 @@
package it.cavallium.dbengine.database.disk;
import io.net5.buffer.api.Drop;
import io.net5.buffer.api.Owned;
import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
public class LLIndexContext extends ResourceSupport<LLIndexContext, LLIndexContext> {

	// Searcher owned by this context; nulled out once the context is sent or dropped.
	private LLIndexSearcher indexSearcher;
	// Transformer applied to queries for this index; nulled out together with the searcher.
	private LLSearchTransformer indexQueryTransformer;

	/**
	 * Takes ownership of the given searcher (via {@code receive()}) and pairs it with a query
	 * transformer, producing one resource whose ownership can be transferred as a unit.
	 *
	 * @param indexSearcher         ownership-transfer handle for the underlying searcher
	 * @param indexQueryTransformer transformer associated with this index context
	 * @param drop                  callback invoked when this resource is dropped
	 */
	protected LLIndexContext(Send<LLIndexSearcher> indexSearcher,
			LLSearchTransformer indexQueryTransformer,
			Drop<LLIndexContext> drop) {
		// CloseOnDrop closes the wrapped searcher before delegating to the caller's drop
		super(new CloseOnDrop(drop));
		this.indexSearcher = indexSearcher.receive();
		this.indexQueryTransformer = indexQueryTransformer;
	}

	@Override
	protected RuntimeException createResourceClosedException() {
		return new IllegalStateException("Closed");
	}

	@Override
	protected Owned<LLIndexContext> prepareSend() {
		// Detach the owned state, make this instance unusable, and return a factory that
		// rebuilds an equivalent context on the receiving side.
		var indexSearcher = this.indexSearcher.send();
		var indexQueryTransformer = this.indexQueryTransformer;
		makeInaccessible();
		return drop -> new LLIndexContext(indexSearcher, indexQueryTransformer, drop);
	}

	// Clears both fields so any later access fails fast instead of touching moved state.
	private void makeInaccessible() {
		this.indexSearcher = null;
		this.indexQueryTransformer = null;
	}

	/** Returns the Lucene searcher. Only valid while this resource is still owned. */
	public IndexSearcher getIndexSearcher() {
		// NOTE(review): throws UnsupportedOperationException here while the resource-closed
		// exception elsewhere is IllegalStateException — confirm the inconsistency is intended.
		if (!isOwned()) {
			throw new UnsupportedOperationException("Closed");
		}
		return indexSearcher.getIndexSearcher();
	}

	/** Returns the Lucene reader of the wrapped searcher. Only valid while owned. */
	public IndexReader getIndexReader() {
		if (!isOwned()) {
			throw new UnsupportedOperationException("Closed");
		}
		return indexSearcher.getIndexReader();
	}

	/** Returns the query transformer for this index. Only valid while owned. */
	public LLSearchTransformer getIndexQueryTransformer() {
		if (!isOwned()) {
			throw new UnsupportedOperationException("Closed");
		}
		return indexQueryTransformer;
	}

	// Decorates the user-supplied drop: closes the owned searcher first, then delegates,
	// and always clears the fields so a dropped context cannot be reused.
	private static class CloseOnDrop implements Drop<LLIndexContext> {

		private final Drop<LLIndexContext> delegate;

		public CloseOnDrop(Drop<LLIndexContext> drop) {
			this.delegate = drop;
		}

		@Override
		public void drop(LLIndexContext obj) {
			try {
				if (obj.indexSearcher != null) obj.indexSearcher.close();
				delegate.drop(obj);
			} finally {
				obj.makeInaccessible();
			}
		}
	}
}

View File

@ -0,0 +1,199 @@
package it.cavallium.dbengine.database.disk;
import io.net5.buffer.api.Drop;
import io.net5.buffer.api.Owned;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexReaderContext;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.index.Term;
/**
 * Aggregates one or more {@link LLIndexContext} shards behind a single resource that can be
 * iterated, addressed per shard, or exposed as one combined {@link IndexReader}.
 */
public interface LLIndexContexts extends Resource<LLIndexContexts> {

	/** Wraps the given shard contexts, taking ownership of each one. */
	static LLIndexContexts of(List<Send<LLIndexContext>> indexSearchers) {
		return new ShardedIndexSearchers(indexSearchers, d -> {});
	}

	/** Wraps a single shard context, taking ownership of it. */
	static UnshardedIndexSearchers unsharded(Send<LLIndexContext> indexSearcher) {
		return new UnshardedIndexSearchers(indexSearcher, d -> {});
	}

	/** All shard contexts held by this resource. */
	Iterable<LLIndexContext> shards();

	/**
	 * Returns one shard by index. For the unsharded implementation the only accepted
	 * index is {@code -1}.
	 */
	LLIndexContext shard(int shardIndex);

	/** A reader spanning every shard. */
	IndexReader allShards();

	/** Single-shard implementation: owns exactly one {@link LLIndexContext}. */
	class UnshardedIndexSearchers extends ResourceSupport<LLIndexContexts, UnshardedIndexSearchers>
			implements LLIndexContexts {

		// Owned context; nulled out when this resource is sent or dropped.
		private LLIndexContext indexSearcher;

		public UnshardedIndexSearchers(Send<LLIndexContext> indexSearcher, Drop<UnshardedIndexSearchers> drop) {
			// CloseOnDrop closes the owned context before the caller's drop runs
			super(new CloseOnDrop(drop));
			this.indexSearcher = indexSearcher.receive();
		}

		@Override
		public Iterable<LLIndexContext> shards() {
			return Collections.singleton(indexSearcher);
		}

		@Override
		public LLIndexContext shard(int shardIndex) {
			if (!isOwned()) {
				throw attachTrace(new IllegalStateException("UnshardedIndexSearchers must be owned to be used"));
			}
			// -1 is the conventional "no shard" index for an unsharded searcher
			if (shardIndex != -1) {
				throw new IndexOutOfBoundsException("Shard index " + shardIndex + " is invalid, this is a unsharded index");
			}
			return indexSearcher;
		}

		@Override
		public IndexReader allShards() {
			return indexSearcher.getIndexReader();
		}

		/** Convenience accessor for the single shard. */
		public LLIndexContext shard() {
			// Bug fix: shard(int) only accepts -1 on an unsharded searcher; the previous
			// implementation called shard(0), which always threw IndexOutOfBoundsException.
			return this.shard(-1);
		}

		@Override
		protected RuntimeException createResourceClosedException() {
			return new IllegalStateException("Closed");
		}

		@Override
		protected Owned<UnshardedIndexSearchers> prepareSend() {
			// Detach the owned context and return a factory that rebuilds on the receiving side
			Send<LLIndexContext> indexSearcher = this.indexSearcher.send();
			this.makeInaccessible();
			return drop -> new UnshardedIndexSearchers(indexSearcher, drop);
		}

		private void makeInaccessible() {
			this.indexSearcher = null;
		}

		// Closes the owned context, then delegates; always clears state afterwards.
		private static class CloseOnDrop implements Drop<UnshardedIndexSearchers> {

			private final Drop<UnshardedIndexSearchers> delegate;

			public CloseOnDrop(Drop<UnshardedIndexSearchers> drop) {
				this.delegate = drop;
			}

			@Override
			public void drop(UnshardedIndexSearchers obj) {
				try {
					if (obj.indexSearcher != null) obj.indexSearcher.close();
					delegate.drop(obj);
				} finally {
					obj.makeInaccessible();
				}
			}
		}
	}

	/** Multi-shard implementation: owns a list of {@link LLIndexContext} shards. */
	class ShardedIndexSearchers extends ResourceSupport<LLIndexContexts, ShardedIndexSearchers>
			implements LLIndexContexts {

		// Owned shard contexts, in shard-index order; nulled out when sent or dropped.
		private List<LLIndexContext> indexSearchers;

		public ShardedIndexSearchers(List<Send<LLIndexContext>> indexSearchers, Drop<ShardedIndexSearchers> drop) {
			super(new CloseOnDrop(drop));
			this.indexSearchers = new ArrayList<>(indexSearchers.size());
			for (Send<LLIndexContext> indexSearcher : indexSearchers) {
				this.indexSearchers.add(indexSearcher.receive());
			}
		}

		@Override
		public Iterable<LLIndexContext> shards() {
			return Collections.unmodifiableList(indexSearchers);
		}

		@Override
		public LLIndexContext shard(int shardIndex) {
			if (!isOwned()) {
				throw attachTrace(new IllegalStateException("ShardedIndexSearchers must be owned to be used"));
			}
			if (shardIndex < 0) {
				throw new IndexOutOfBoundsException("Shard index " + shardIndex + " is invalid");
			}
			return indexSearchers.get(shardIndex);
		}

		@Override
		public IndexReader allShards() {
			var irs = new IndexReader[indexSearchers.size()];
			for (int i = 0, s = indexSearchers.size(); i < s; i++) {
				irs[i] = indexSearchers.get(i).getIndexReader();
			}
			// Keep the shard order stable inside the MultiReader by sorting on original index
			Object2IntOpenHashMap<IndexReader> indexes = new Object2IntOpenHashMap<>();
			for (int i = 0; i < irs.length; i++) {
				indexes.put(irs[i], i);
			}
			try {
				// closeSubReaders = true: closing the MultiReader closes the per-shard readers.
				// NOTE(review): the shard readers are owned by their LLIndexContexts — confirm
				// closing them through the MultiReader cannot double-close them.
				return new MultiReader(irs, Comparator.comparingInt(indexes::getInt), true);
			} catch (IOException ex) {
				// This shouldn't happen
				throw new UncheckedIOException(ex);
			}
		}

		@Override
		protected RuntimeException createResourceClosedException() {
			return new IllegalStateException("Closed");
		}

		@Override
		protected Owned<ShardedIndexSearchers> prepareSend() {
			// Detach every owned shard and return a factory that rebuilds them on receive
			List<Send<LLIndexContext>> indexSearchers = new ArrayList<>(this.indexSearchers.size());
			for (LLIndexContext indexSearcher : this.indexSearchers) {
				indexSearchers.add(indexSearcher.send());
			}
			this.makeInaccessible();
			return drop -> new ShardedIndexSearchers(indexSearchers, drop);
		}

		private void makeInaccessible() {
			this.indexSearchers = null;
		}

		// Closes every owned shard context, then delegates; always clears state afterwards.
		private static class CloseOnDrop implements Drop<ShardedIndexSearchers> {

			private final Drop<ShardedIndexSearchers> delegate;

			public CloseOnDrop(Drop<ShardedIndexSearchers> drop) {
				this.delegate = drop;
			}

			@Override
			public void drop(ShardedIndexSearchers obj) {
				try {
					if (obj.indexSearchers != null) {
						for (LLIndexContext indexSearcher : obj.indexSearchers) {
							indexSearcher.close();
						}
					}
					delegate.drop(obj);
				} finally {
					obj.makeInaccessible();
				}
			}
		}
	}
}

View File

@ -85,10 +85,12 @@ public class LLIndexSearcher extends ResourceSupport<LLIndexSearcher, LLIndexSea
obj.associatedSearcherManager.release(obj.indexSearcher);
}
}
delegate.drop(obj);
} catch (IOException e) {
logger.error("Failed to drop CachedIndexSearcher", e);
} finally {
obj.makeInaccessible();
}
delegate.drop(obj);
}
}
}

View File

@ -1,7 +1,6 @@
package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.database.LLUtils.MARKER_LUCENE;
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.DirectIOOptions;
@ -10,7 +9,6 @@ import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.LuceneOptions;
import it.cavallium.dbengine.client.NRTCachingOptions;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.database.EnglishItalianStopFilter;
import it.cavallium.dbengine.database.LLDocument;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSearchResultShard;
@ -23,11 +21,12 @@ import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
import it.cavallium.dbengine.lucene.searcher.LuceneLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer;
import java.io.IOException;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
@ -39,15 +38,7 @@ import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.SerialMergeScheduler;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.misc.store.DirectIODirectory;
import org.apache.lucene.queries.mlt.MoreLikeThis;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.search.similarities.TFIDFSimilarity;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
@ -331,10 +322,11 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
QueryParams queryParams,
String keyFieldName,
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux) {
return getMoreLikeThisQuery(snapshot, LuceneUtils.toLocalQueryParams(queryParams), mltDocumentFieldsFlux)
return LLUtils
.getMoreLikeThisQuery(this, snapshot, LuceneUtils.toLocalQueryParams(queryParams), mltDocumentFieldsFlux)
.flatMap(modifiedLocalQuery -> searcherManager
.retrieveSearcher(snapshot)
.flatMap(indexSearcher -> localSearcher.collect(indexSearcher, modifiedLocalQuery, keyFieldName))
.transform(indexSearcher -> localSearcher.collect(indexSearcher, modifiedLocalQuery, keyFieldName))
)
.map(resultToReceive -> {
var result = resultToReceive.receive();
@ -343,11 +335,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.doOnDiscard(Send.class, Send::close);
}
public Mono<Void> distributedMoreLikeThis(@Nullable LLSnapshot snapshot,
public Mono<LLSearchTransformer> getMoreLikeThisTransformer(@Nullable LLSnapshot snapshot,
QueryParams queryParams,
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux,
LuceneMultiSearcher shardSearcher) {
return getMoreLikeThisQuery(snapshot, LuceneUtils.toLocalQueryParams(queryParams), mltDocumentFieldsFlux)
return LLUtils
.getMoreLikeThisQuery(this, snapshot, LuceneUtils.toLocalQueryParams(queryParams), mltDocumentFieldsFlux)
.flatMap(modifiedLocalQuery -> searcherManager
.retrieveSearcher(snapshot)
.flatMap(indexSearcher -> shardSearcher.searchOn(indexSearcher, modifiedLocalQuery))
@ -355,78 +348,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.doOnDiscard(Send.class, Send::close);
}
public Mono<LocalQueryParams> getMoreLikeThisQuery(@Nullable LLSnapshot snapshot,
LocalQueryParams localQueryParams,
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux) {
Query luceneAdditionalQuery;
try {
luceneAdditionalQuery = localQueryParams.query();
} catch (Exception e) {
return Mono.error(e);
}
return mltDocumentFieldsFlux
.collectMap(Tuple2::getT1, Tuple2::getT2, HashMap::new)
.flatMap(mltDocumentFields -> {
mltDocumentFields.entrySet().removeIf(entry -> entry.getValue().isEmpty());
if (mltDocumentFields.isEmpty()) {
return Mono.just(new LocalQueryParams(new MatchNoDocsQuery(),
localQueryParams.offset(),
localQueryParams.limit(),
localQueryParams.minCompetitiveScore(),
localQueryParams.sort(),
localQueryParams.scoreMode()
));
}
return this.searcherManager.search(snapshot, indexSearcher -> Mono.fromCallable(() -> {
var mlt = new MoreLikeThis(indexSearcher.getIndexReader());
mlt.setAnalyzer(indexWriter.getAnalyzer());
mlt.setFieldNames(mltDocumentFields.keySet().toArray(String[]::new));
mlt.setMinTermFreq(1);
mlt.setMinDocFreq(3);
mlt.setMaxDocFreqPct(20);
mlt.setBoost(localQueryParams.scoreMode().needsScores());
mlt.setStopWords(EnglishItalianStopFilter.getStopWordsString());
var similarity = getSimilarity();
if (similarity instanceof TFIDFSimilarity) {
mlt.setSimilarity((TFIDFSimilarity) similarity);
} else {
logger.trace(MARKER_ROCKSDB, "Using an unsupported similarity algorithm for MoreLikeThis:"
+ " {}. You must use a similarity instance based on TFIDFSimilarity!", similarity);
}
// Get the reference docId and apply it to MoreLikeThis, to generate the query
@SuppressWarnings({"unchecked", "rawtypes"})
var mltQuery = mlt.like((Map) mltDocumentFields);
Query luceneQuery;
if (!(luceneAdditionalQuery instanceof MatchAllDocsQuery)) {
luceneQuery = new BooleanQuery.Builder()
.add(mltQuery, Occur.MUST)
.add(new ConstantScoreQuery(luceneAdditionalQuery), Occur.MUST)
.build();
} else {
luceneQuery = mltQuery;
}
return luceneQuery;
})
.subscribeOn(Schedulers.boundedElastic())
.map(luceneQuery -> new LocalQueryParams(luceneQuery,
localQueryParams.offset(),
localQueryParams.limit(),
localQueryParams.minCompetitiveScore(),
localQueryParams.sort(),
localQueryParams.scoreMode()
)));
});
}
@Override
public Mono<Send<LLSearchResultShard>> search(@Nullable LLSnapshot snapshot, QueryParams queryParams,
String keyFieldName) {
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams);
return searcherManager
.retrieveSearcher(snapshot)
.flatMap(indexSearcher -> localSearcher.collect(indexSearcher, localQueryParams, keyFieldName))
.transform(indexSearcher -> localSearcher.collect(indexSearcher, localQueryParams,
LLSearchTransformer.NO_TRANSFORMATION, keyFieldName))
.map(resultToReceive -> {
var result = resultToReceive.receive();
return new LLSearchResultShard(result.results(), result.totalHitsCount(), d -> result.close()).send();
@ -434,13 +363,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.doOnDiscard(Send.class, Send::close);
}
public Mono<Void> distributedSearch(@Nullable LLSnapshot snapshot,
QueryParams queryParams,
LuceneMultiSearcher shardSearcher) {
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams);
public Mono<Send<LLIndexContext>> retrieveContext(@Nullable LLSnapshot snapshot,
@Nullable LLSearchTransformer indexQueryTransformer) {
return searcherManager
.retrieveSearcher(snapshot)
.flatMap(indexSearcher -> shardSearcher.searchOn(indexSearcher, localQueryParams))
.map(indexSearcherToReceive -> new LLIndexContext(indexSearcherToReceive,
Objects.requireNonNullElse(indexQueryTransformer, LLSearchTransformer.NO_TRANSFORMATION),
d -> {}).send())
.doOnDiscard(Send.class, Send::close);
}

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine.database.disk;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.LuceneOptions;
@ -11,6 +12,7 @@ import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer;
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
import it.cavallium.dbengine.lucene.searcher.LuceneMultiSearcher;
import java.io.IOException;
@ -26,6 +28,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -86,6 +89,20 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
return luceneIndices[0].getLuceneIndexName();
}
private Flux<Send<LLIndexContext>> getIndexContexts(LLSnapshot snapshot,
Function<LLLocalLuceneIndex, LLSearchTransformer> indexQueryTransformers) {
return Flux
.fromArray(luceneIndices)
.index()
// Resolve the snapshot of each shard
.flatMap(tuple -> Mono
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
.flatMap(luceneSnapshot -> tuple.getT2().retrieveContext(
luceneSnapshot.orElse(null), indexQueryTransformers.apply(tuple.getT2()))
)
);
}
@Override
public Mono<Void> addDocument(LLTerm id, LLDocument doc) {
return getLuceneIndex(id).addDocument(id, doc);
@ -176,12 +193,23 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
}
@Override
public Mono<LLSearchResultShard> moreLikeThis(@Nullable LLSnapshot snapshot,
public Mono<Send<LLSearchResultShard>> moreLikeThis(@Nullable LLSnapshot snapshot,
QueryParams queryParams,
String keyFieldName,
Flux<Tuple2<String, Set<String>>> mltDocumentFields) {
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams);
record LuceneIndexWithSnapshot(LLLocalLuceneIndex luceneIndex, Optional<LLSnapshot> snapshot) {}
Flux<Send<LLIndexContext>> serchers = this
.getIndexContexts(snapshot, luceneIndex -> LLSearchTransformer.NO_TRANSFORMATION);
// Collect all the shards results into a single global result
return multiSearcher
.collect(serchers, localQueryParams, keyFieldName)
// Transform the result type
.map(resultToReceive -> {
var result = resultToReceive.receive();
return new LLSearchResultShard(result.results(), result.totalHitsCount(),
d -> result.close()).send();
});
return multiSearcher
// Create shard searcher
@ -205,31 +233,21 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
}
@Override
public Mono<LLSearchResultShard> search(@Nullable LLSnapshot snapshot,
public Mono<Send<LLSearchResultShard>> search(@Nullable LLSnapshot snapshot,
QueryParams queryParams,
String keyFieldName) {
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams);
record LuceneIndexWithSnapshot(LLLocalLuceneIndex luceneIndex, Optional<LLSnapshot> snapshot) {}
Flux<Send<LLIndexSearcher>> serchers = getIndexContexts(snapshot);
// Collect all the shards results into a single global result
return multiSearcher
// Create shard searcher
.createShardSearcher(localQueryParams)
.flatMap(shardSearcher -> Flux
// Iterate the indexed shards
.fromArray(luceneIndices).index()
// Resolve the snapshot of each shard
.flatMap(tuple -> Mono
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
.map(luceneSnapshot -> new LuceneIndexWithSnapshot(tuple.getT2(), luceneSnapshot))
)
// Execute the query and collect it using the shard searcher
.flatMap(luceneIndexWithSnapshot -> luceneIndexWithSnapshot.luceneIndex()
.distributedSearch(luceneIndexWithSnapshot.snapshot.orElse(null), queryParams, shardSearcher))
// Collect all the shards results into a single global result
.then(shardSearcher.collect(localQueryParams, keyFieldName, luceneSearcherScheduler))
)
// Fix the result type
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result.release()));
.collect(serchers, localQueryParams, keyFieldName)
// Transform the result type
.map(resultToReceive -> {
var result = resultToReceive.receive();
return new LLSearchResultShard(result.results(), result.totalHitsCount(),
d -> result.close()).send();
});
}
@Override
@ -289,4 +307,32 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
public boolean isLowMemoryMode() {
return luceneIndices[0].isLowMemoryMode();
}
private class MoreLikeThisTransformer implements LLSearchTransformer {
private final LLLocalLuceneIndex luceneIndex;
private final LLSnapshot snapshot;
private final String keyFieldName;
private final Flux<Tuple2<String, Set<String>>> mltDocumentFields;
public MoreLikeThisTransformer(LLLocalLuceneIndex luceneIndex,
@Nullable LLSnapshot snapshot,
String keyFieldName,
Flux<Tuple2<String, Set<String>>> mltDocumentFields) {
this.luceneIndex = luceneIndex;
this.snapshot = snapshot;
this.keyFieldName = keyFieldName;
this.mltDocumentFields = mltDocumentFields;
}
@Override
public Mono<LocalQueryParams> transform(Mono<LocalQueryParams> queryParamsMono) {
return queryParamsMono
.flatMap(queryParams -> {
luceneIndex.getMoreLikeThisTransformer(snapshot, queryParams, mltDocumentFields, );
});
LLLocalMultiLuceneIndex.this.
return null;
}
}
}

View File

@ -1,41 +1,32 @@
package it.cavallium.dbengine.lucene;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.query.BasicType;
import it.cavallium.dbengine.client.query.QueryParser;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLScoreMode;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
import it.cavallium.dbengine.database.collections.ValueGetter;
import it.cavallium.dbengine.database.disk.LLIndexContexts;
import it.cavallium.dbengine.lucene.analyzer.NCharGramAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.NCharGramEdgeAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import it.cavallium.dbengine.lucene.analyzer.WordAnalyzer;
import it.cavallium.dbengine.lucene.searcher.IndexSearchers;
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
import it.cavallium.dbengine.lucene.searcher.LuceneMultiSearcher;
import it.cavallium.dbengine.lucene.similarity.NGramSimilarity;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.LowerCaseFilter;
@ -65,11 +56,9 @@ import org.novasearch.lucene.search.similarities.BM25Similarity.BM25Model;
import org.novasearch.lucene.search.similarities.LdpSimilarity;
import org.novasearch.lucene.search.similarities.LtcSimilarity;
import org.novasearch.lucene.search.similarities.RobertsonSimilarity;
import org.reactivestreams.Publisher;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;
@ -367,7 +356,7 @@ public class LuceneUtils {
}
public static Flux<LLKeyScore> convertHits(Flux<ScoreDoc> hitsFlux,
IndexSearchers indexSearchers,
LLIndexContexts indexSearchers,
String keyFieldName,
boolean preserveOrder) {
if (preserveOrder) {
@ -392,7 +381,7 @@ public class LuceneUtils {
@Nullable
private static LLKeyScore mapHitBlocking(ScoreDoc hit,
IndexSearchers indexSearchers,
LLIndexContexts indexSearchers,
String keyFieldName) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called mapHitBlocking in a nonblocking thread");

View File

@ -1,34 +1,24 @@
package it.cavallium.dbengine.lucene.searcher;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexContext;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import org.apache.lucene.search.IndexSearcher;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher {
private static final LuceneLocalSearcher localSearcher = new SimpleLuceneLocalSearcher();
private static final LuceneLocalSearcher unscoredPagedLuceneLocalSearcher = new LocalLuceneWrapper(new UnscoredUnsortedContinuousLuceneMultiSearcher(), d -> {});
private static final LuceneLocalSearcher countSearcher = new CountLuceneLocalSearcher();
@Override
public Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexSearcher>> indexSearcher,
public Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexContext>> indexSearcher,
LocalQueryParams queryParams,
String keyFieldName) {
Mono<Send<LuceneSearchResult>> collectionMono;
if (queryParams.limit() == 0) {
collectionMono = countSearcher.collect(indexSearcher, queryParams, keyFieldName);
} else if (!queryParams.isScored() && queryParams.offset() == 0 && queryParams.limit() >= 2147483630
&& !queryParams.isSorted()) {
collectionMono = unscoredPagedLuceneLocalSearcher.collect(indexSearcher, queryParams, keyFieldName);
return countSearcher.collect(indexSearcher, queryParams, keyFieldName);
} else {
collectionMono = localSearcher.collect(indexSearcher, queryParams, keyFieldName);
return localSearcher.collect(indexSearcher, queryParams, keyFieldName);
}
return Mono.fromRunnable(LLUtils::ensureBlocking).then(collectionMono);
}
}

View File

@ -1,31 +1,32 @@
package it.cavallium.dbengine.lucene.searcher;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexContext;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher {
private static final LuceneMultiSearcher scoredLuceneMultiSearcher = new ScoredLuceneMultiSearcher();
private static final LuceneMultiSearcher countLuceneMultiSearcher
= new SimpleUnsortedUnscoredLuceneMultiSearcher(new CountLuceneLocalSearcher());
private static final LuceneMultiSearcher unscoredPagedLuceneMultiSearcher = new UnscoredPagedLuceneMultiSearcher();
private static final LuceneMultiSearcher scoredSimpleLuceneShardSearcher
= new ScoredSimpleLuceneShardSearcher();
private static final LuceneMultiSearcher unscoredIterableLuceneMultiSearcher = new UnscoredUnsortedContinuousLuceneMultiSearcher();
private static final LuceneMultiSearcher countLuceneMultiSearcher = new CountLuceneMultiSearcher();
private static final LuceneMultiSearcher unscoredPagedLuceneMultiSearcher
= new SimpleUnsortedUnscoredLuceneMultiSearcher(new SimpleLuceneLocalSearcher());
@Override
public Mono<Send<LuceneMultiSearcher>> createShardSearcher(LocalQueryParams queryParams) {
Mono<Send<LuceneMultiSearcher>> shardSearcherCreationMono;
if (queryParams.limit() <= 0) {
shardSearcherCreationMono = countLuceneMultiSearcher.createShardSearcher(queryParams);
} else if (queryParams.isScored()) {
shardSearcherCreationMono = scoredLuceneMultiSearcher.createShardSearcher(queryParams);
} else if (queryParams.offset() == 0 && queryParams.limit() >= 2147483630 && !queryParams.isSorted()) {
shardSearcherCreationMono = unscoredIterableLuceneMultiSearcher.createShardSearcher(queryParams);
public Mono<Send<LuceneSearchResult>> collect(Flux<Send<LLIndexContext>> indexSearchersFlux,
LocalQueryParams queryParams,
String keyFieldName) {
if (queryParams.limit() == 0) {
return countLuceneMultiSearcher.collect(indexSearchersFlux, queryParams, keyFieldName);
} else if (queryParams.isSorted() || queryParams.isScored()) {
return scoredSimpleLuceneShardSearcher.collect(indexSearchersFlux, queryParams, keyFieldName);
} else {
shardSearcherCreationMono = unscoredPagedLuceneMultiSearcher.createShardSearcher(queryParams);
return unscoredPagedLuceneMultiSearcher.collect(indexSearchersFlux, queryParams, keyFieldName);
}
return Mono.fromRunnable(LLUtils::ensureBlocking).then(shardSearcherCreationMono);
}
}

View File

@ -1,10 +1,9 @@
package it.cavallium.dbengine.lucene.searcher;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexContext;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -13,7 +12,7 @@ import reactor.core.scheduler.Schedulers;
public class CountLuceneLocalSearcher implements LuceneLocalSearcher {
@Override
public Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexSearcher>> indexSearcherMono,
public Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexContext>> indexSearcherMono,
LocalQueryParams queryParams,
String keyFieldName) {
return Mono

View File

@ -1,88 +0,0 @@
package it.cavallium.dbengine.lucene.searcher;
import io.net5.buffer.api.Drop;
import io.net5.buffer.api.Owned;
import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
 * A {@link LuceneMultiSearcher} that only counts the total number of hits across all shards,
 * without collecting any documents. The result carries an empty hit flux and an exact
 * {@code TotalHitsCount}.
 */
public class CountLuceneMultiSearcher implements LuceneMultiSearcher {
	@Override
	public Mono<Send<LuceneMultiSearcher>> createShardSearcher(LocalQueryParams queryParams) {
		// Each search session gets its own shared counter, accumulated across every shard
		return Mono.fromCallable(() -> new CountLuceneShardSearcher(new AtomicLong(0), d -> {}).send());
	}
	/**
	 * Per-session shard searcher: each {@code searchOn} adds the shard's hit count to the
	 * shared counter; {@code collect} materializes the final count. Follows the io.net5
	 * Resource ownership model: must be owned to be used, becomes inaccessible once sent.
	 */
	private static class CountLuceneShardSearcher extends
			ResourceSupport<LuceneMultiSearcher, CountLuceneShardSearcher> implements LuceneMultiSearcher {
		// Shared hit accumulator; set to null by makeInaccessible() after send/drop
		private AtomicLong totalHitsCount;
		public CountLuceneShardSearcher(AtomicLong totalHitsCount, Drop<CountLuceneShardSearcher> drop) {
			super(new CloseOnDrop(drop));
			this.totalHitsCount = totalHitsCount;
		}
		@Override
		public Mono<Void> searchOn(Send<LLIndexSearcher> indexSearcher, LocalQueryParams queryParams) {
			return Mono
					.fromCallable(() -> {
						// try-with-resources: the received searcher is closed after counting
						try (var is = indexSearcher.receive()) {
							if (!isOwned()) {
								throw attachTrace(new IllegalStateException("CountLuceneMultiSearcher must be owned to be used"));
							}
							LLUtils.ensureBlocking();
							// count() is blocking, hence the ensureBlocking() guard above
							totalHitsCount.addAndGet(is.getIndexSearcher().count(queryParams.query()));
							return null;
						}
					});
		}
		@Override
		public Mono<Send<LuceneSearchResult>> collect(LocalQueryParams queryParams, String keyFieldName) {
			return Mono.fromCallable(() -> {
				if (!isOwned()) {
					throw attachTrace(new IllegalStateException("CountLuceneMultiSearcher must be owned to be used"));
				}
				LLUtils.ensureBlocking();
				// The count is exact (second argument true); no documents are returned
				return new LuceneSearchResult(TotalHitsCount.of(totalHitsCount.get(), true), Flux.empty(), d -> {})
						.send();
			});
		}
		@Override
		protected RuntimeException createResourceClosedException() {
			return new IllegalStateException("Closed");
		}
		@Override
		protected Owned<CountLuceneShardSearcher> prepareSend() {
			// Transfer the counter to the new owner and make this instance unusable
			var totalHitsCount = this.totalHitsCount;
			makeInaccessible();
			return drop -> new CountLuceneShardSearcher(totalHitsCount, drop);
		}
		private void makeInaccessible() {
			this.totalHitsCount = null;
		}
		// Delegating drop: no owned native resources to release beyond the delegate's work
		private static class CloseOnDrop implements Drop<CountLuceneShardSearcher> {
			private final Drop<CountLuceneShardSearcher> delegate;
			public CloseOnDrop(Drop<CountLuceneShardSearcher> drop) {
				this.delegate = drop;
			}
			@Override
			public void drop(CountLuceneShardSearcher obj) {
				delegate.drop(obj);
			}
		}
	}
}

View File

@ -0,0 +1,8 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore;
import reactor.core.publisher.Flux;
/**
 * Outcome of executing the first page of a paged search: the total hit count,
 * the hits of the first page, and the cursor needed to fetch the next page.
 */
record FirstPageResults(TotalHitsCount totalHitsCount, Flux<LLKeyScore> firstPageHitsFlux,
		CurrentPageInfo nextPageInfo) {}

View File

@ -1,144 +0,0 @@
package it.cavallium.dbengine.lucene.searcher;
import io.net5.buffer.UnpooledDirectByteBuf;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.Drop;
import io.net5.buffer.api.Owned;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.search.IndexSearcher;
/**
 * A group of index searchers, one per Lucene shard, or a single searcher for unsharded
 * indices. Follows the io.net5 {@link Resource} ownership model: instances must be owned
 * to be used and become inaccessible once sent or dropped.
 */
public interface IndexSearchers extends Resource<IndexSearchers> {

	static IndexSearchers of(List<LLIndexSearcher> indexSearchers) {
		return new ShardedIndexSearchers(indexSearchers, d -> {});
	}

	static UnshardedIndexSearchers unsharded(Send<LLIndexSearcher> indexSearcher) {
		return new UnshardedIndexSearchers(indexSearcher, d -> {});
	}

	/**
	 * Returns the searcher of the given shard.
	 *
	 * @param shardIndex the shard number (>= 0), or -1 for unsharded instances
	 * @throws IndexOutOfBoundsException if the index is invalid for this instance
	 */
	LLIndexSearcher shard(int shardIndex);

	/** Single-searcher implementation; owns its searcher and closes it on drop. */
	class UnshardedIndexSearchers extends ResourceSupport<IndexSearchers, UnshardedIndexSearchers>
			implements IndexSearchers {

		private LLIndexSearcher indexSearcher;

		public UnshardedIndexSearchers(Send<LLIndexSearcher> indexSearcher, Drop<UnshardedIndexSearchers> drop) {
			super(new CloseOnDrop(drop));
			this.indexSearcher = indexSearcher.receive();
		}

		@Override
		public LLIndexSearcher shard(int shardIndex) {
			if (!isOwned()) {
				throw attachTrace(new IllegalStateException("UnshardedIndexSearchers must be owned to be used"));
			}
			// Only -1 is valid here: an unsharded instance has no numbered shards
			if (shardIndex != -1) {
				throw new IndexOutOfBoundsException("Shard index " + shardIndex + " is invalid, this is a unsharded index");
			}
			return indexSearcher;
		}

		public LLIndexSearcher shard() {
			// Fix: shard(int) only accepts -1 for unsharded searchers, so the previous
			// delegation to shard(0) always threw IndexOutOfBoundsException.
			return this.shard(-1);
		}

		@Override
		protected RuntimeException createResourceClosedException() {
			return new IllegalStateException("Closed");
		}

		@Override
		protected Owned<UnshardedIndexSearchers> prepareSend() {
			// Transfer ownership of the searcher and make this instance unusable
			Send<LLIndexSearcher> indexSearcher = this.indexSearcher.send();
			this.makeInaccessible();
			return drop -> new UnshardedIndexSearchers(indexSearcher, drop);
		}

		private void makeInaccessible() {
			this.indexSearcher = null;
		}

		private static class CloseOnDrop implements Drop<UnshardedIndexSearchers> {

			private final Drop<UnshardedIndexSearchers> delegate;

			public CloseOnDrop(Drop<UnshardedIndexSearchers> drop) {
				this.delegate = drop;
			}

			@Override
			public void drop(UnshardedIndexSearchers obj) {
				try {
					// Release the owned searcher before delegating
					if (obj.indexSearcher != null) obj.indexSearcher.close();
					delegate.drop(obj);
				} finally {
					obj.makeInaccessible();
				}
			}
		}
	}

	/** Multi-searcher implementation backed by a list indexed by shard number. */
	class ShardedIndexSearchers extends ResourceSupport<IndexSearchers, ShardedIndexSearchers>
			implements IndexSearchers {

		private List<LLIndexSearcher> indexSearchers;

		public ShardedIndexSearchers(List<LLIndexSearcher> indexSearchers, Drop<ShardedIndexSearchers> drop) {
			super(new CloseOnDrop(drop));
			this.indexSearchers = indexSearchers;
		}

		@Override
		public LLIndexSearcher shard(int shardIndex) {
			if (!isOwned()) {
				throw attachTrace(new IllegalStateException("ShardedIndexSearchers must be owned to be used"));
			}
			if (shardIndex < 0) {
				throw new IndexOutOfBoundsException("Shard index " + shardIndex + " is invalid");
			}
			return indexSearchers.get(shardIndex);
		}

		@Override
		protected RuntimeException createResourceClosedException() {
			return new IllegalStateException("Closed");
		}

		@Override
		protected Owned<ShardedIndexSearchers> prepareSend() {
			List<LLIndexSearcher> indexSearchers = this.indexSearchers;
			this.makeInaccessible();
			return drop -> new ShardedIndexSearchers(indexSearchers, drop);
		}

		private void makeInaccessible() {
			this.indexSearchers = null;
		}

		private static class CloseOnDrop implements Drop<ShardedIndexSearchers> {

			private final Drop<ShardedIndexSearchers> delegate;

			public CloseOnDrop(Drop<ShardedIndexSearchers> drop) {
				this.delegate = drop;
			}

			@Override
			public void drop(ShardedIndexSearchers obj) {
				// NOTE(review): unlike UnshardedIndexSearchers, the individual searchers are
				// NOT closed here — presumably the caller of of() retains ownership. Confirm.
				try {
					delegate.drop(obj);
				} finally {
					obj.makeInaccessible();
				}
			}
		}
	}
}

View File

@ -0,0 +1,10 @@
package it.cavallium.dbengine.lucene.searcher;
import reactor.core.publisher.Mono;
/**
 * Transforms the parameters of a search before it is executed, e.g. to expand a
 * "more like this" request into a concrete Lucene query.
 */
public interface LLSearchTransformer {
	// Identity transformer: passes the query parameters through unchanged
	LLSearchTransformer NO_TRANSFORMATION = queryParamsMono -> queryParamsMono;
	/**
	 * @param queryParamsMono the original query parameters
	 * @return the (possibly asynchronously computed) transformed query parameters
	 */
	Mono<LocalQueryParams> transform(Mono<LocalQueryParams> queryParamsMono);
}

View File

@ -1,7 +1,7 @@
package it.cavallium.dbengine.lucene.searcher;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.disk.LLIndexContext;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import reactor.core.publisher.Mono;
@ -12,7 +12,7 @@ public interface LuceneLocalSearcher {
* @param queryParams the query parameters
* @param keyFieldName the name of the key field
*/
Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexSearcher>> indexSearcherMono,
Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexContext>> indexSearcherMono,
LocalQueryParams queryParams,
String keyFieldName);
}

View File

@ -1,12 +1,10 @@
package it.cavallium.dbengine.lucene.searcher;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.disk.LLIndexContext;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
public interface LuceneMultiSearcher extends LuceneLocalSearcher {
@ -14,7 +12,7 @@ public interface LuceneMultiSearcher extends LuceneLocalSearcher {
* @param queryParams the query parameters
* @param keyFieldName the name of the key field
*/
Mono<Send<LuceneSearchResult>> collect(Flux<Send<LLIndexSearcher>> indexSearchersFlux,
Mono<Send<LuceneSearchResult>> collect(Flux<Send<LLIndexContext>> indexSearchersFlux,
LocalQueryParams queryParams,
String keyFieldName);
@ -24,7 +22,7 @@ public interface LuceneMultiSearcher extends LuceneLocalSearcher {
* @param keyFieldName the name of the key field
*/
@Override
default Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexSearcher>> indexSearcherMono,
default Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexContext>> indexSearcherMono,
LocalQueryParams queryParams,
String keyFieldName) {
return this.collect(indexSearcherMono.flux(), queryParams, keyFieldName);

View File

@ -0,0 +1,5 @@
package it.cavallium.dbengine.lucene.searcher;
import org.apache.lucene.search.TopDocs;
/** A page of raw Lucene results plus the cursor needed to fetch the following page. */
record PageData(TopDocs topDocs, CurrentPageInfo nextPageInfo) {}

View File

@ -1,37 +0,0 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.FIRST_PAGE_LIMIT;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT;
import it.cavallium.dbengine.lucene.LuceneUtils;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollector;
import reactor.core.publisher.Mono;
/**
 * Creates shard searchers for scored (relevance- or custom-sorted) multi-shard queries,
 * choosing between a single-search and a paged strategy based on the requested limit.
 */
public class ScoredLuceneMultiSearcher implements LuceneMultiSearcher {
	@Override
	public Mono<LuceneMultiSearcher> createShardSearcher(LocalQueryParams queryParams) {
		return Mono
				.fromCallable(() -> {
					// Default to relevance ordering when no explicit sort is requested
					Sort luceneSort = queryParams.sort();
					if (luceneSort == null) {
						luceneSort = Sort.RELEVANCE;
					}
					PaginationInfo paginationInfo;
					if (queryParams.limit() <= MAX_SINGLE_SEARCH_LIMIT) {
						// Small enough to satisfy in one search: first page covers the whole request
						paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), queryParams.limit(), true);
					} else {
						// Large request: fetch a bounded first page, remaining pages fetched later
						paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, false);
					}
					// Shared manager merges per-shard TopFieldCollectors into one sorted TopDocs
					CollectorManager<TopFieldCollector, TopDocs> sharedManager = new ScoringShardsCollectorManager(luceneSort,
							LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()),
							null, LuceneUtils.totalHitsThreshold(), LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset()),
							LuceneUtils.safeLongToInt(paginationInfo.firstPageLimit()));
					return new ScoredSimpleLuceneShardSearcher(sharedManager, queryParams.query(), paginationInfo);
				});
	}
}

View File

@ -1,159 +1,185 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.EMPTY_STATUS;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.FIRST_PAGE_LIMIT;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexContext;
import it.cavallium.dbengine.database.disk.LLIndexContexts;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollector;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher {
public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher {
private final Object lock = new Object();
private final List<IndexSearcher> indexSearchersArray = new ArrayList<>();
private final List<Mono<Void>> indexSearcherReleasersArray = new ArrayList<>();
private final List<TopFieldCollector> collectors = new ArrayList<>();
private final CollectorManager<TopFieldCollector, TopDocs> firstPageSharedManager;
private final Query luceneQuery;
private final PaginationInfo paginationInfo;
public ScoredSimpleLuceneShardSearcher(CollectorManager<TopFieldCollector, TopDocs> firstPageSharedManager,
Query luceneQuery, PaginationInfo paginationInfo) {
this.firstPageSharedManager = firstPageSharedManager;
this.luceneQuery = luceneQuery;
this.paginationInfo = paginationInfo;
public ScoredSimpleLuceneShardSearcher() {
}
@Override
public Mono<Void> searchOn(IndexSearcher indexSearcher,
Mono<Void> releaseIndexSearcher,
public Mono<Send<LuceneSearchResult>> collect(Flux<Send<LLIndexContext>> indexSearchersFlux,
LocalQueryParams queryParams,
Scheduler scheduler) {
return Mono.<Void>fromCallable(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called searchOn in a nonblocking thread");
}
TopFieldCollector collector;
synchronized (lock) {
//noinspection BlockingMethodInNonBlockingContext
collector = firstPageSharedManager.newCollector();
indexSearchersArray.add(indexSearcher);
indexSearcherReleasersArray.add(releaseIndexSearcher);
collectors.add(collector);
}
//noinspection BlockingMethodInNonBlockingContext
indexSearcher.search(luceneQuery, collector);
return null;
}).subscribeOn(scheduler);
String keyFieldName) {
Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null");
PaginationInfo paginationInfo = getPaginationInfo(queryParams);
var indexSearchersMono = indexSearchersFlux.collectList().map(LLIndexContexts::of);
return LLUtils.usingResource(indexSearchersMono, indexSearchers -> this
// Search first page results
.searchFirstPage(indexSearchers, queryParams, paginationInfo)
// Compute the results of the first page
.transform(firstPageTopDocsMono -> this.computeFirstPageResults(firstPageTopDocsMono, indexSearchers,
keyFieldName, queryParams))
// Compute other results
.transform(firstResult -> this.computeOtherResults(firstResult, indexSearchers, queryParams, keyFieldName))
// Ensure that one LuceneSearchResult is always returned
.single(),
false);
}
@Override
public Mono<LuceneSearchResult> collect(LocalQueryParams queryParams, String keyFieldName, Scheduler collectorScheduler) {
if (Schedulers.isInNonBlockingThread()) {
return Mono.error(() -> new UnsupportedOperationException("Called collect in a nonblocking thread"));
private Sort getSort(LocalQueryParams queryParams) {
Sort luceneSort = queryParams.sort();
if (luceneSort == null) {
luceneSort = Sort.RELEVANCE;
}
if (!queryParams.isScored()) {
return Mono.error(() -> new UnsupportedOperationException("Can't execute an unscored query"
+ " with a scored lucene shard searcher"));
return luceneSort;
}
/**
* Get the pagination info
*/
private PaginationInfo getPaginationInfo(LocalQueryParams queryParams) {
if (queryParams.limit() <= MAX_SINGLE_SEARCH_LIMIT) {
return new PaginationInfo(queryParams.limit(), queryParams.offset(), queryParams.limit(), true);
} else {
return new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, false);
}
}
/**
* Search effectively the raw results of the first page
*/
private Mono<PageData> searchFirstPage(LLIndexContexts indexSearchers,
LocalQueryParams queryParams,
PaginationInfo paginationInfo) {
var limit = LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit());
var pagination = !paginationInfo.forceSinglePage();
var resultsOffset = LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset());
return Mono
.fromSupplier(() -> new CurrentPageInfo(null, limit, 0))
.flatMap(s -> this.searchPage(queryParams, indexSearchers, pagination, resultsOffset, s));
}
/**
* Compute the results of the first page, extracting useful data
*/
private Mono<FirstPageResults> computeFirstPageResults(Mono<PageData> firstPageDataMono,
LLIndexContexts indexSearchers,
String keyFieldName,
LocalQueryParams queryParams) {
return firstPageDataMono.map(firstPageData -> {
var totalHitsCount = LuceneUtils.convertTotalHitsCount(firstPageData.topDocs().totalHits);
Flux<LLKeyScore> firstPageHitsFlux = LuceneUtils.convertHits(Flux.fromArray(firstPageData.topDocs().scoreDocs),
indexSearchers, keyFieldName, true)
.take(queryParams.limit(), true);
CurrentPageInfo nextPageInfo = firstPageData.nextPageInfo();
return new FirstPageResults(totalHitsCount, firstPageHitsFlux, nextPageInfo);
});
}
private Mono<Send<LuceneSearchResult>> computeOtherResults(Mono<FirstPageResults> firstResultMono,
LLIndexContexts indexSearchers,
LocalQueryParams queryParams,
String keyFieldName) {
return firstResultMono.map(firstResult -> {
var totalHitsCount = firstResult.totalHitsCount();
var firstPageHitsFlux = firstResult.firstPageHitsFlux();
var secondPageInfo = firstResult.nextPageInfo();
Flux<LLKeyScore> nextHitsFlux = searchOtherPages(indexSearchers, queryParams, keyFieldName, secondPageInfo);
Flux<LLKeyScore> combinedFlux = firstPageHitsFlux.concatWith(nextHitsFlux);
return new LuceneSearchResult(totalHitsCount, combinedFlux, d -> indexSearchers.close()).send();
});
}
/**
* Search effectively the merged raw results of the next pages
*/
private Flux<LLKeyScore> searchOtherPages(LLIndexContexts indexSearchers,
LocalQueryParams queryParams, String keyFieldName, CurrentPageInfo secondPageInfo) {
return Flux
.defer(() -> {
AtomicReference<CurrentPageInfo> currentPageInfoRef = new AtomicReference<>(secondPageInfo);
return Flux
.defer(() -> searchPage(queryParams, indexSearchers, true, 0, currentPageInfoRef.get()))
.doOnNext(s -> currentPageInfoRef.set(s.nextPageInfo()))
.repeatWhen(s -> s.takeWhile(n -> n > 0));
})
.subscribeOn(Schedulers.boundedElastic())
.map(PageData::topDocs)
.flatMapIterable(topDocs -> Arrays.asList(topDocs.scoreDocs))
.transform(topFieldDocFlux -> LuceneUtils.convertHits(topFieldDocFlux, indexSearchers,
keyFieldName, true));
}
/**
*
* @param resultsOffset offset of the resulting topDocs. Useful if you want to
* skip the first n results in the first page
*/
private Mono<PageData> searchPage(LocalQueryParams queryParams,
LLIndexContexts indexSearchers,
boolean allowPagination,
int resultsOffset,
CurrentPageInfo s) {
return Mono
.fromCallable(() -> {
TopDocs result;
Mono<Void> release;
synchronized (lock) {
//noinspection BlockingMethodInNonBlockingContext
result = firstPageSharedManager.reduce(collectors);
release = Mono.when(indexSearcherReleasersArray);
LLUtils.ensureBlocking();
if (resultsOffset < 0) {
throw new IndexOutOfBoundsException(resultsOffset);
}
IndexSearchers indexSearchers;
synchronized (lock) {
indexSearchers = IndexSearchers.of(indexSearchersArray);
if ((s.pageIndex() == 0 || s.last() != null) && s.remainingLimit() > 0) {
var sort = getSort(queryParams);
var limit = s.currentPageLimit();
var totalHitsThreshold = LuceneUtils.totalHitsThreshold();
return new ScoringShardsCollectorManager(sort, limit, null,
totalHitsThreshold, resultsOffset, s.currentPageLimit());
} else {
return null;
}
Flux<LLKeyScore> firstPageHits = LuceneUtils
.convertHits(Flux.fromArray(result.scoreDocs), indexSearchers, keyFieldName, collectorScheduler, true);
Flux<LLKeyScore> nextHits;
nextHits = Flux
.<TopDocs, CurrentPageInfo>generate(
() -> new CurrentPageInfo(LuceneUtils.getLastFieldDoc(result.scoreDocs),
paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1),
(s, emitter) -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called collect in a nonblocking thread");
}
if (s.last() != null && s.remainingLimit() > 0) {
Sort luceneSort = queryParams.sort();
if (luceneSort == null) {
luceneSort = Sort.RELEVANCE;
}
CollectorManager<TopFieldCollector, TopDocs> sharedManager
= new ScoringShardsCollectorManager(luceneSort, s.currentPageLimit(),
(FieldDoc) s.last(), LuceneUtils.totalHitsThreshold(), 0, s.currentPageLimit());
try {
var collectors = new ObjectArrayList<TopFieldCollector>(indexSearchersArray.size());
for (IndexSearcher indexSearcher : indexSearchersArray) {
//noinspection BlockingMethodInNonBlockingContext
TopFieldCollector collector = sharedManager.newCollector();
//noinspection BlockingMethodInNonBlockingContext
indexSearcher.search(luceneQuery, collector);
collectors.add(collector);
}
//noinspection BlockingMethodInNonBlockingContext
var pageTopDocs = sharedManager.reduce(collectors);
var pageLastDoc = LuceneUtils.getLastFieldDoc(pageTopDocs.scoreDocs);
emitter.next(pageTopDocs);
s = new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(),
s.pageIndex() + 1);
} catch (IOException ex) {
emitter.error(ex);
s = EMPTY_STATUS;
}
} else {
emitter.complete();
s = EMPTY_STATUS;
}
return s;
})
.subscribeOn(collectorScheduler)
.transform(flux -> {
if (paginationInfo.forceSinglePage()
|| paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) {
return Flux.empty();
} else {
return flux;
}
})
.flatMapIterable(topFieldDoc -> Arrays.asList(topFieldDoc.scoreDocs))
.transform(scoreDocs -> LuceneUtils.convertHits(scoreDocs,
indexSearchers, keyFieldName, collectorScheduler, true));
return new LuceneSearchResult(LuceneUtils.convertTotalHitsCount(result.totalHits),
firstPageHits
.concatWith(nextHits),
//.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)),
release
);
})
.subscribeOn(collectorScheduler);
.flatMap(sharedManager -> Flux
.fromIterable(indexSearchers.shards())
.flatMap(shard -> Mono.fromCallable(() -> {
var collector = sharedManager.newCollector();
shard.getIndexSearcher().search(queryParams.query(), collector);
return collector;
}))
.collectList()
.flatMap(collectors -> Mono.fromCallable(() -> {
var pageTopDocs = sharedManager.reduce(collectors);
var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs);
long nextRemainingLimit;
if (allowPagination) {
nextRemainingLimit = s.remainingLimit() - s.currentPageLimit();
} else {
nextRemainingLimit = 0L;
}
var nextPageIndex = s.pageIndex() + 1;
var nextPageInfo = new CurrentPageInfo(pageLastDoc, nextRemainingLimit, nextPageIndex);
return new PageData(pageTopDocs, nextPageInfo);
}))
);
}
}

View File

@ -5,12 +5,12 @@ import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.FIRST_PAGE_LI
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexContext;
import it.cavallium.dbengine.database.disk.LLIndexContexts;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.searcher.IndexSearchers.UnshardedIndexSearchers;
import it.cavallium.dbengine.database.disk.LLIndexContexts.UnshardedIndexSearchers;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
@ -25,14 +25,14 @@ import reactor.core.scheduler.Schedulers;
public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
@Override
public Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexSearcher>> indexSearcherMono,
public Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexContext>> indexSearcherMono,
LocalQueryParams queryParams,
String keyFieldName) {
Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null");
PaginationInfo paginationInfo = getPaginationInfo(queryParams);
var indexSearchersMono = indexSearcherMono.map(IndexSearchers::unsharded);
var indexSearchersMono = indexSearcherMono.map(LLIndexContexts::unsharded);
return LLUtils.usingResource(indexSearchersMono, indexSearchers -> this
// Search first page results
@ -72,32 +72,11 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
.handle((s, sink) -> this.searchPageSync(queryParams, indexSearchers, pagination, resultsOffset, s, sink));
}
/**
* Search effectively the merged raw results of the next pages
*/
private Flux<LLKeyScore> searchOtherPages(UnshardedIndexSearchers indexSearchers,
LocalQueryParams queryParams, String keyFieldName, CurrentPageInfo secondPageInfo) {
return Flux
.<PageData, CurrentPageInfo>generate(
() -> secondPageInfo,
(s, sink) -> searchPageSync(queryParams, indexSearchers, true, 0, s, sink),
s -> {}
)
.subscribeOn(Schedulers.boundedElastic())
.map(PageData::topDocs)
.flatMapIterable(topDocs -> Arrays.asList(topDocs.scoreDocs))
.transform(topFieldDocFlux -> LuceneUtils.convertHits(topFieldDocFlux, indexSearchers,
keyFieldName, true));
}
private static record FirstPageResults(TotalHitsCount totalHitsCount, Flux<LLKeyScore> firstPageHitsFlux,
CurrentPageInfo nextPageInfo) {}
/**
* Compute the results of the first page, extracting useful data
*/
private Mono<FirstPageResults> computeFirstPageResults(Mono<PageData> firstPageDataMono,
IndexSearchers indexSearchers,
LLIndexContexts indexSearchers,
String keyFieldName,
LocalQueryParams queryParams) {
return firstPageDataMono.map(firstPageData -> {
@ -129,7 +108,23 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
});
}
private static record PageData(TopDocs topDocs, CurrentPageInfo nextPageInfo) {}
/**
* Search effectively the merged raw results of the next pages
*/
private Flux<LLKeyScore> searchOtherPages(UnshardedIndexSearchers indexSearchers,
LocalQueryParams queryParams, String keyFieldName, CurrentPageInfo secondPageInfo) {
return Flux
.<PageData, CurrentPageInfo>generate(
() -> secondPageInfo,
(s, sink) -> searchPageSync(queryParams, indexSearchers, true, 0, s, sink),
s -> {}
)
.subscribeOn(Schedulers.boundedElastic())
.map(PageData::topDocs)
.flatMapIterable(topDocs -> Arrays.asList(topDocs.scoreDocs))
.transform(topFieldDocFlux -> LuceneUtils.convertHits(topFieldDocFlux, indexSearchers,
keyFieldName, true));
}
/**
*

View File

@ -6,6 +6,7 @@ import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexContext;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import java.util.ArrayList;
import java.util.Comparator;
@ -22,7 +23,7 @@ public class SimpleUnsortedUnscoredLuceneMultiSearcher implements LuceneMultiSea
}
@Override
public Mono<Send<LuceneSearchResult>> collect(Flux<Send<LLIndexSearcher>> indexSearchersFlux,
public Mono<Send<LuceneSearchResult>> collect(Flux<Send<LLIndexContext>> indexSearchersFlux,
LocalQueryParams queryParams,
String keyFieldName) {
return Mono
@ -38,7 +39,7 @@ public class SimpleUnsortedUnscoredLuceneMultiSearcher implements LuceneMultiSea
}
})
.thenMany(indexSearchersFlux)
.flatMap(resSend -> localSearcher.collect(Mono.just(resSend), queryParams, keyFieldName))
.flatMap(resSend -> localSearcher.collect(Mono.just(resSend).share(), queryParams, keyFieldName))
.collectList()
.map(results -> {
List<LuceneSearchResult> resultsToDrop = new ArrayList<>(results.size());

View File

@ -1,34 +0,0 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.FIRST_PAGE_LIMIT;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT;
import it.cavallium.dbengine.lucene.LuceneUtils;
import reactor.core.publisher.Mono;
/**
 * Factory for shard searchers that run unscored, paged queries across multiple shards.
 * Rejects scored/sorted queries up front and wires an {@code UnscoredTopDocsCollectorManager}
 * sized for the first page.
 */
public class UnscoredPagedLuceneMultiSearcher implements LuceneMultiSearcher {

/**
 * Creates the per-query shard searcher.
 *
 * @param queryParams query, sort, offset and limit for this search; must not be scored
 * @return a Mono emitting an {@code UnscoredPagedLuceneShardSearcher} configured for the first page
 * @throws UnsupportedOperationException (inside the Mono) if the query is scored
 */
@Override
public Mono<LuceneMultiSearcher> createShardSearcher(LocalQueryParams queryParams) {
return Mono
.fromCallable(() -> {
// This searcher only supports unscored queries; scored/sorted ones belong to the scored searcher
if (queryParams.isScored()) {
throw new UnsupportedOperationException("Can't use the unscored searcher to do a scored or sorted query");
}
// Small requests are served in a single page; larger ones paginate after FIRST_PAGE_LIMIT results
PaginationInfo paginationInfo;
if (queryParams.limit() <= MAX_SINGLE_SEARCH_LIMIT) {
paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), queryParams.limit(), true);
} else {
paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, false);
}
// Each shard collector gathers offset+limit hits; the reduce step applies the
// caller's offset/limit window over the merged shard results
UnscoredTopDocsCollectorManager unsortedCollectorManager = new UnscoredTopDocsCollectorManager(() -> TopDocsSearcher.getTopDocsCollector(queryParams.sort(),
LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()),
null,
LuceneUtils.totalHitsThreshold(),
!paginationInfo.forceSinglePage(),
queryParams.isScored()
), queryParams.offset(), queryParams.limit(), queryParams.sort());
return new UnscoredPagedLuceneShardSearcher(unsortedCollectorManager, queryParams.query(), paginationInfo);
});
}
}

View File

@ -1,151 +0,0 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.EMPTY_STATUS;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopDocsCollector;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
 * Runs an unscored, paged query against several shards.
 * {@link #searchOn} registers each shard's IndexSearcher and collects its first-page hits;
 * {@link #collect} merges the first page and then lazily generates continuation pages,
 * re-querying every shard with a search-after cursor ({@code s.last()}).
 * Shared mutable state (searcher/collector lists) is guarded by {@code lock}.
 */
class UnscoredPagedLuceneShardSearcher implements LuceneMultiSearcher {

// Guards the three lists below, which are filled concurrently by searchOn() calls
private final Object lock = new Object();
private final List<IndexSearcher> indexSearchersArray = new ArrayList<>();
private final List<Mono<Void>> indexSearcherReleasersArray = new ArrayList<>();
private final List<TopDocsCollector<ScoreDoc>> collectors = new ArrayList<>();
// Shared manager that created (and later reduces) the per-shard first-page collectors
private final CollectorManager<TopDocsCollector<ScoreDoc>, TopDocs> firstPageUnsortedCollectorManager;
private final Query luceneQuery;
private final PaginationInfo paginationInfo;

/**
 * @param firstPagensortedCollectorManager shared collector manager for the first page
 *        (note: parameter name has a pre-existing typo, kept for byte-compatibility)
 * @param luceneQuery the query to execute on every shard
 * @param paginationInfo total/first-page limits and offsets for this search
 */
public UnscoredPagedLuceneShardSearcher(
CollectorManager<TopDocsCollector<ScoreDoc>, TopDocs> firstPagensortedCollectorManager,
Query luceneQuery,
PaginationInfo paginationInfo) {
this.firstPageUnsortedCollectorManager = firstPagensortedCollectorManager;
this.luceneQuery = luceneQuery;
this.paginationInfo = paginationInfo;
}

/**
 * Registers one shard and synchronously (on {@code scheduler}) collects its first-page hits.
 *
 * @param indexSearcher the shard's searcher
 * @param releaseIndexSearcher Mono that releases the searcher; run later by collect()
 * @param queryParams unused here — the query was fixed at construction time
 * @param scheduler blocking-capable scheduler the search runs on
 */
@Override
public Mono<Void> searchOn(IndexSearcher indexSearcher,
Mono<Void> releaseIndexSearcher,
LocalQueryParams queryParams,
Scheduler scheduler) {
return Mono.<Void>fromCallable(() -> {
// This callable does blocking Lucene I/O: refuse to run on a reactive event-loop thread
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called searchOn in a nonblocking thread");
}
TopDocsCollector<ScoreDoc> collector;
synchronized (lock) {
//noinspection BlockingMethodInNonBlockingContext
collector = firstPageUnsortedCollectorManager.newCollector();
indexSearchersArray.add(indexSearcher);
indexSearcherReleasersArray.add(releaseIndexSearcher);
collectors.add(collector);
}
// Actual search happens outside the lock; the collector is only touched by this thread
//noinspection BlockingMethodInNonBlockingContext
indexSearcher.search(luceneQuery, collector);
return null;
}).subscribeOn(scheduler);
}

/**
 * Reduces the first-page collectors into one TopDocs, converts the hits, and chains a
 * lazily-generated flux of continuation pages (search-after on every shard) until the
 * remaining limit is exhausted.
 *
 * @param queryParams query parameters (sort/scoreMode used for continuation pages)
 * @param keyFieldName stored field used to map each hit back to a key
 * @param scheduler blocking-capable scheduler for the reduce and the page generation
 */
@Override
public Mono<LuceneSearchResult> collect(LocalQueryParams queryParams, String keyFieldName, Scheduler scheduler) {
return Mono
.fromCallable(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called collect in a nonblocking thread");
}
TopDocs result;
Mono<Void> release;
synchronized (lock) {
// Merge all per-shard first-page collectors into a single merged TopDocs
//noinspection BlockingMethodInNonBlockingContext
result = firstPageUnsortedCollectorManager.reduce(collectors);
release = Mono.when(indexSearcherReleasersArray);
}
IndexSearchers indexSearchers;
synchronized (lock) {
indexSearchers = IndexSearchers.of(indexSearchersArray);
}
Flux<LLKeyScore> firstPageHits = LuceneUtils
.convertHits(Flux.fromArray(result.scoreDocs), indexSearchers, keyFieldName, scheduler, false);
// Continuation pages: Flux.generate state is a CurrentPageInfo carrying the
// search-after cursor, the remaining limit and the page index
Flux<LLKeyScore> nextHits = Flux
.<TopDocs, CurrentPageInfo>generate(
() -> new CurrentPageInfo(LuceneUtils.getLastScoreDoc(result.scoreDocs),
paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1),
(s, sink) -> {
// Stop when there is no cursor (previous page was empty) or nothing left to fetch
if (s.last() != null && s.remainingLimit() > 0 && s.currentPageLimit() > 0) {
Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null");
Query luceneQuery = queryParams.query();
// Split the page budget evenly across shards
int perShardCollectorLimit = s.currentPageLimit() / indexSearchersArray.size();
UnscoredTopDocsCollectorManager currentPageUnsortedCollectorManager
= new UnscoredTopDocsCollectorManager(
() -> TopDocsSearcher.getTopDocsCollector(queryParams.sort(), perShardCollectorLimit,
s.last(), LuceneUtils.totalHitsThreshold(), true, queryParams.isScored()),
0, s.currentPageLimit(), queryParams.sort());
try {
// Search every shard sequentially with a fresh collector, then merge
var collectors = new ObjectArrayList<TopDocsCollector<ScoreDoc>>(indexSearchersArray.size());
for (IndexSearcher indexSearcher : indexSearchersArray) {
//noinspection BlockingMethodInNonBlockingContext
var collector = currentPageUnsortedCollectorManager.newCollector();
//noinspection BlockingMethodInNonBlockingContext
indexSearcher.search(luceneQuery, collector);
collectors.add(collector);
}
//noinspection BlockingMethodInNonBlockingContext
TopDocs pageTopDocs = currentPageUnsortedCollectorManager.reduce(collectors);
var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs);
sink.next(pageTopDocs);
// Advance the cursor and shrink the remaining budget for the next round
return new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(),
s.pageIndex() + 1);
} catch (IOException ex) {
sink.error(ex);
return EMPTY_STATUS;
}
} else {
sink.complete();
return EMPTY_STATUS;
}
}
)
.subscribeOn(scheduler)
.flatMapIterable(topDocs -> Arrays.asList(topDocs.scoreDocs))
.transform(scoreDocsFlux -> LuceneUtils.convertHits(scoreDocsFlux,
indexSearchers, keyFieldName, scheduler, false))
.transform(flux -> {
// Single-page searches never emit continuation pages
if (paginationInfo.forceSinglePage()
|| paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) {
return Flux.empty();
} else {
return flux;
}
});
return new LuceneSearchResult(LuceneUtils.convertTotalHitsCount(result.totalHits), firstPageHits
.concatWith(nextHits),
//.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)),
release
);
})
.subscribeOn(scheduler);
}
}

View File

@ -1,70 +0,0 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.TIE_BREAKER;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.ALLOW_UNSCORED_PAGINATION_MODE;
import it.cavallium.dbengine.lucene.LuceneUtils;
import java.io.IOException;
import java.util.Collection;
import java.util.function.Supplier;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopDocsCollector;
import org.apache.lucene.search.TopFieldDocs;
import org.jetbrains.annotations.Nullable;
import reactor.core.scheduler.Schedulers;
/**
 * Merges the unscored {@link TopDocs} produced by several shard collectors into a single
 * result window. Each shard's collector is created by {@code collectorSupplier}; the merged
 * output is trimmed with {@code offset}/{@code limit} and, when {@code sort} is non-null,
 * merge-sorted by that sort.
 */
public class UnscoredTopDocsCollectorManager implements
		CollectorManager<TopDocsCollector<ScoreDoc>, TopDocs> {

	private final Supplier<TopDocsCollector<ScoreDoc>> collectorSupplier;
	private final long offset;
	private final long limit;
	private final Sort sort;

	/**
	 * @param collectorSupplier factory invoked once per shard to obtain its collector
	 * @param offset number of leading merged hits to skip
	 * @param limit maximum number of merged hits to keep
	 * @param sort merge order, or {@code null} when the hits are not field-sorted
	 */
	public UnscoredTopDocsCollectorManager(Supplier<TopDocsCollector<ScoreDoc>> collectorSupplier,
			long offset,
			long limit,
			@Nullable Sort sort) {
		this.collectorSupplier = collectorSupplier;
		this.offset = offset;
		this.limit = limit;
		this.sort = sort;
	}

	@Override
	public TopDocsCollector<ScoreDoc> newCollector() throws IOException {
		return collectorSupplier.get();
	}

	@Override
	public TopDocs reduce(Collection<TopDocsCollector<ScoreDoc>> collection) throws IOException {
		// reduce() does blocking work: refuse to run on a reactive event-loop thread
		if (Schedulers.isInNonBlockingThread()) {
			throw new UnsupportedOperationException("Called reduce in a nonblocking thread");
		}
		// Use the TopFieldDocs subtype when sorted, so the merge can honour the sort fields
		TopDocs[] shardResults = (sort != null)
				? new TopFieldDocs[collection.size()]
				: new TopDocs[collection.size()];
		int shardIndex = 0;
		for (TopDocsCollector<? extends ScoreDoc> shardCollector : collection) {
			TopDocs shardTopDocs = shardCollector.topDocs();
			// Tag each hit with its shard of origin before merging
			for (ScoreDoc hit : shardTopDocs.scoreDocs) {
				hit.shardIndex = shardIndex;
			}
			shardResults[shardIndex] = shardTopDocs;
			shardIndex++;
		}
		return LuceneUtils.mergeTopDocs(sort,
				LuceneUtils.safeLongToInt(offset),
				LuceneUtils.safeLongToInt(limit),
				shardResults,
				TIE_BREAKER
		);
	}
}

View File

@ -1,179 +0,0 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.lucene.LuceneUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.SimpleCollector;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.publisher.Sinks.Many;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
 * Streaming multi-shard searcher for unscored, unsorted queries.
 * Instead of collecting TopDocs pages, every shard pushes each matching doc id straight into a
 * shared unicast {@link Sinks.Many} as it is collected, and {@code collect()} exposes that sink
 * as the result flux. {@code remainingCollectors} counts the in-flight shard searches plus the
 * collect phase itself; the sink is completed when it reaches zero.
 * NOTE(review): the emitted {@code TotalHitsCount.of(0, false)} appears to be a placeholder —
 * total hits are not counted in this continuous mode; confirm against callers.
 */
public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMultiSearcher {

// Dedicated blocking scheduler the per-shard searches are dispatched on
private static final Scheduler UNSCORED_UNSORTED_EXECUTOR = Schedulers.newBoundedElastic(Runtime
.getRuntime()
.availableProcessors(), Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "UnscoredUnsortedExecutor");

/**
 * Builds the per-query shard searcher holding the shared sink and collector manager.
 *
 * @param queryParams must be unscored; a scored query fails inside the Mono
 */
@Override
public Mono<LuceneMultiSearcher> createShardSearcher(LocalQueryParams queryParams) {
return Mono
.fromCallable(() -> {
AtomicBoolean alreadySubscribed = new AtomicBoolean(false);
// Unicast: exactly one downstream subscriber may consume the streamed hits
Many<ScoreDoc> scoreDocsSink = Sinks.many().unicast().onBackpressureBuffer();
// 1 is the collect phase
AtomicInteger remainingCollectors = new AtomicInteger(1);
if (queryParams.isScored()) {
throw new UnsupportedOperationException("Can't use the unscored searcher to do a scored or sorted query");
}
var cm = new CollectorManager<Collector, Void>() {
// Collector that forwards every matching doc id into the shared sink
class IterableCollector extends SimpleCollector {
private int shardIndex;
private LeafReaderContext context;
@Override
public void collect(int i) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called collect in a nonblocking thread");
}
// Score is fixed at 0: this searcher is unscored by contract
var scoreDoc = new ScoreDoc(context.docBase + i, 0, shardIndex);
synchronized (scoreDocsSink) {
// Busy-wait (with short parks) until the bounded buffer accepts the element
while (scoreDocsSink.tryEmitNext(scoreDoc) == EmitResult.FAIL_OVERFLOW) {
LockSupport.parkNanos(10);
}
}
}
@Override
protected void doSetNextReader(LeafReaderContext context) {
this.context = context;
}
@Override
public ScoreMode scoreMode() {
return ScoreMode.COMPLETE_NO_SCORES;
}
public void setShardIndex(int shardIndex) {
this.shardIndex = shardIndex;
}
}
@Override
public IterableCollector newCollector() {
return new IterableCollector();
}
@Override
public Void reduce(Collection<Collector> collection) {
// Results are streamed through the sink, never reduced
throw new UnsupportedOperationException();
}
};
return new LuceneMultiSearcher() {
// Guards the two registration lists below
private final Object lock = new Object();
private final List<IndexSearcher> indexSearchersArray = new ArrayList<>();
private final List<Mono<Void>> indexSearcherReleasersArray = new ArrayList<>();

/**
 * Registers a shard and schedules its search on the dedicated executor;
 * completion (or IO failure) is reported through the shared sink.
 */
@Override
public Mono<Void> searchOn(IndexSearcher indexSearcher,
Mono<Void> releaseIndexSearcher,
LocalQueryParams queryParams,
Scheduler scheduler) {
return Mono
.<Void>fromCallable(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called searchOn in a nonblocking thread");
}
//noinspection BlockingMethodInNonBlockingContext
var collector = cm.newCollector();
int collectorShardIndex;
synchronized (lock) {
// Shard index = registration order
collectorShardIndex = indexSearchersArray.size();
indexSearchersArray.add(indexSearcher);
indexSearcherReleasersArray.add(releaseIndexSearcher);
}
collector.setShardIndex(collectorShardIndex);
// Count this shard search as in-flight until it finishes
remainingCollectors.incrementAndGet();
UNSCORED_UNSORTED_EXECUTOR.schedule(() -> {
try {
indexSearcher.search(queryParams.query(), collector);
synchronized (scoreDocsSink) {
decrementRemainingCollectors(scoreDocsSink, remainingCollectors);
}
} catch (IOException e) {
scoreDocsSink.tryEmitError(e);
}
});
return null;
})
.subscribeOn(scheduler);
}

/**
 * Ends the registration phase and returns the streamed hits as a single-use
 * LuceneSearchResult. May be called only once.
 */
@Override
public Mono<LuceneSearchResult> collect(LocalQueryParams queryParams,
String keyFieldName,
Scheduler scheduler) {
return Mono
.fromCallable(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called collect in a nonblocking thread");
}
synchronized (scoreDocsSink) {
// Release the "collect phase" slot reserved at construction time
decrementRemainingCollectors(scoreDocsSink, remainingCollectors);
}
if (!alreadySubscribed.compareAndSet(false, true)) {
throw new UnsupportedOperationException("Already subscribed!");
}
IndexSearchers indexSearchers;
Mono<Void> release;
synchronized (lock) {
indexSearchers = IndexSearchers.of(indexSearchersArray);
release = Mono.when(indexSearcherReleasersArray);
}
// Second guard: the returned flux itself is also single-subscription
AtomicBoolean resultsAlreadySubscribed = new AtomicBoolean(false);
var scoreDocsFlux = Mono.<Void>fromCallable(() -> {
if (!resultsAlreadySubscribed.compareAndSet(false, true)) {
throw new UnsupportedOperationException("Already subscribed!");
}
return null;
}).thenMany(scoreDocsSink.asFlux());
var resultsFlux = LuceneUtils
.convertHits(scoreDocsFlux, indexSearchers, keyFieldName, scheduler, false);
return new LuceneSearchResult(TotalHitsCount.of(0, false), resultsFlux, release);
})
.subscribeOn(scheduler);
}
};
});
}

// Completes the sink once the collect phase and every shard search have finished.
// Callers hold the scoreDocsSink monitor, serializing emissions with the collectors.
private static void decrementRemainingCollectors(Many<ScoreDoc> scoreDocsSink, AtomicInteger remainingCollectors) {
if (remainingCollectors.decrementAndGet() <= 0) {
scoreDocsSink.tryEmitComplete();
}
}
}