Rename query rewrite class

This commit is contained in:
Andrea Cavalli 2022-01-28 21:12:10 +01:00
parent 58943b5e08
commit dfe8361e19
18 changed files with 218 additions and 195 deletions

View File

@ -4,7 +4,7 @@ import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterrupti
import static it.cavallium.dbengine.database.LLUtils.MARKER_LUCENE;
import static it.cavallium.dbengine.database.LLUtils.toDocument;
import static it.cavallium.dbengine.database.LLUtils.toFields;
import static it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.NO_TRANSFORMATION;
import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRITE;
import com.google.common.collect.Multimap;
import io.micrometer.core.instrument.Counter;
@ -457,7 +457,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
var searcher = searcherManager.retrieveSearcher(snapshot);
return localSearcher
.collect(searcher, localQueryParams, keyFieldName, NO_TRANSFORMATION)
.collect(searcher, localQueryParams, keyFieldName, NO_REWRITE)
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close));
}

View File

@ -25,7 +25,7 @@ import it.cavallium.dbengine.lucene.mlt.MoreLikeThisTransformer;
import it.cavallium.dbengine.lucene.searcher.AdaptiveMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.BucketParams;
import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer;
import it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite;
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
import it.cavallium.dbengine.lucene.searcher.MultiSearcher;
import java.io.Closeable;
@ -251,7 +251,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
// Collect all the shards results into a single global result
return multiSearcher
.collectMulti(searchers, localQueryParams, keyFieldName, LLSearchTransformer.NO_TRANSFORMATION)
.collectMulti(searchers, localQueryParams, keyFieldName, GlobalQueryRewrite.NO_REWRITE)
// Transform the result type
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close));
}

View File

@ -1,19 +1,15 @@
package it.cavallium.dbengine.lucene.mlt;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import com.google.common.collect.Multimap;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer;
import it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite;
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
import java.io.IOException;
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
import org.apache.lucene.search.similarities.PerFieldSimilarityWrapper;
import org.apache.lucene.search.similarities.Similarity;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class MoreLikeThisTransformer implements LLSearchTransformer {
public class MoreLikeThisTransformer implements GlobalQueryRewrite {
private final Multimap<String, String> mltDocumentFields;
private final PerFieldAnalyzerWrapper luceneAnalyzer;
@ -28,28 +24,21 @@ public class MoreLikeThisTransformer implements LLSearchTransformer {
}
@Override
public Mono<LocalQueryParams> transform(Mono<TransformerInput> inputMono) {
return inputMono.publishOn(uninterruptibleScheduler(Schedulers.boundedElastic())).handle((input, sink) -> {
try {
var rewrittenQuery = LuceneUtils.getMoreLikeThisQuery(input.indexSearchers(),
input.queryParams(),
luceneAnalyzer,
luceneSimilarity,
mltDocumentFields
);
var queryParams = input.queryParams();
sink.next(new LocalQueryParams(rewrittenQuery,
queryParams.offsetLong(),
queryParams.limitLong(),
queryParams.pageLimits(),
queryParams.minCompetitiveScore(),
queryParams.sort(),
queryParams.computePreciseHitsCount(),
queryParams.timeout()
));
} catch (IOException ex) {
sink.error(ex);
}
});
public LocalQueryParams rewrite(LLIndexSearchers indexSearchers, LocalQueryParams queryParams) throws IOException {
var rewrittenQuery = LuceneUtils.getMoreLikeThisQuery(indexSearchers,
queryParams,
luceneAnalyzer,
luceneSimilarity,
mltDocumentFields
);
return new LocalQueryParams(rewrittenQuery,
queryParams.offsetLong(),
queryParams.limitLong(),
queryParams.pageLimits(),
queryParams.minCompetitiveScore(),
queryParams.sort(),
queryParams.computePreciseHitsCount(),
queryParams.timeout()
);
}
}

View File

@ -1,14 +1,18 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRITE;
import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.io.IOException;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class AdaptiveLocalSearcher implements LocalSearcher {
@ -41,19 +45,24 @@ public class AdaptiveLocalSearcher implements LocalSearcher {
public Mono<LuceneSearchResult> collect(Mono<Send<LLIndexSearcher>> indexSearcher,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer) {
GlobalQueryRewrite transformer) {
Mono<Send<LLIndexSearchers>> indexSearchersMono = indexSearcher
.map(LLIndexSearchers::unsharded)
.map(ResourceSupport::send);
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
if (transformer == NO_REWRITE) {
return transformedCollect(indexSearcher, queryParams, keyFieldName, transformer);
} else {
return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> transformer
.transform(Mono.fromCallable(() -> new TransformerInput(indexSearchers, queryParams)))
.flatMap(queryParams2 -> this
.transformedCollect(indexSearcher, queryParams2, keyFieldName, LLSearchTransformer.NO_TRANSFORMATION)),
true);
return indexSearchersMono
.publishOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.<LocalQueryParams>handle((indexSearchers, sink) -> {
try {
sink.next(transformer.rewrite(indexSearchers.receive(), queryParams));
} catch (IOException ex) {
sink.error(ex);
}
})
.flatMap(queryParams2 -> transformedCollect(indexSearcher, queryParams2, keyFieldName, NO_REWRITE));
}
}
@ -66,7 +75,7 @@ public class AdaptiveLocalSearcher implements LocalSearcher {
public Mono<LuceneSearchResult> transformedCollect(Mono<Send<LLIndexSearcher>> indexSearcher,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer) {
GlobalQueryRewrite transformer) {
// offset + limit
long realLimit = queryParams.offsetLong() + queryParams.limitLong();
long maxAllowedInMemoryLimit

View File

@ -1,12 +1,16 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRITE;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.io.IOException;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class AdaptiveMultiSearcher implements MultiSearcher {
@ -39,15 +43,20 @@ public class AdaptiveMultiSearcher implements MultiSearcher {
public Mono<LuceneSearchResult> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer) {
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
GlobalQueryRewrite transformer) {
if (transformer == NO_REWRITE) {
return transformedCollectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
} else {
return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> transformer
.transform(Mono.fromCallable(() -> new TransformerInput(indexSearchers, queryParams)))
.flatMap(queryParams2 -> this
.transformedCollectMulti(indexSearchersMono, queryParams2, keyFieldName, LLSearchTransformer.NO_TRANSFORMATION)),
true);
return indexSearchersMono
.publishOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.<LocalQueryParams>handle((indexSearchers, sink) -> {
try {
sink.next(transformer.rewrite(indexSearchers.receive(), queryParams));
} catch (IOException ex) {
sink.error(ex);
}
})
.flatMap(queryParams2 -> transformedCollectMulti(indexSearchersMono, queryParams2, keyFieldName, NO_REWRITE));
}
}
@ -55,7 +64,7 @@ public class AdaptiveMultiSearcher implements MultiSearcher {
public Mono<LuceneSearchResult> transformedCollectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer) {
GlobalQueryRewrite transformer) {
// offset + limit
long realLimit = queryParams.offsetLong() + queryParams.limitLong();
long maxAllowedInMemoryLimit

View File

@ -2,21 +2,15 @@ package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.collector.TotalHitCountCollectorManager;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.time.Duration;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.search.TimeLimitingCollector;
import org.apache.lucene.search.TotalHitCountCollector;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@ -28,68 +22,63 @@ public class CountMultiSearcher implements MultiSearcher {
public Mono<LuceneSearchResult> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer) {
GlobalQueryRewrite transformer) {
Mono<LocalQueryParams> queryParamsMono;
if (transformer == GlobalQueryRewrite.NO_REWRITE) {
queryParamsMono = Mono.just(queryParams);
} else {
queryParamsMono = indexSearchersMono
.publishOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.handle((indexSearchers, sink) -> {
try {
sink.next(transformer.rewrite(indexSearchers.receive(), queryParams));
} catch (IOException ex) {
sink.error(ex);
}
});
}
return LLUtils.usingSendResource(indexSearchersMono,
indexSearchers -> {
Mono<LocalQueryParams> queryParamsMono;
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
queryParamsMono = Mono.just(queryParams);
} else {
queryParamsMono = transformer.transform(Mono
.fromSupplier(() -> new TransformerInput(indexSearchers, queryParams)));
return queryParamsMono.flatMap(queryParams2 -> LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> {
var localQueryParams = getLocalQueryParams(queryParams2);
return Mono.fromRunnable(() -> {
LLUtils.ensureBlocking();
if (queryParams2.isSorted() && queryParams2.limitLong() > 0) {
throw new UnsupportedOperationException(
"Sorted queries are not supported" + " by SimpleUnsortedUnscoredLuceneMultiSearcher");
}
if (queryParams2.needsScores() && queryParams2.limitLong() > 0) {
throw new UnsupportedOperationException(
"Scored queries are not supported" + " by SimpleUnsortedUnscoredLuceneMultiSearcher");
}
}).thenMany(Flux.fromIterable(indexSearchers.shards())).flatMap(searcher -> {
var llSearcher = Mono.fromCallable(() -> new LLIndexSearcher(searcher, false, null).send());
return this.collect(llSearcher, localQueryParams, keyFieldName, transformer);
}).collectList().map(results -> {
List<LuceneSearchResult> resultsToDrop = new ArrayList<>(results.size());
List<Flux<LLKeyScore>> resultsFluxes = new ArrayList<>(results.size());
boolean exactTotalHitsCount = true;
long totalHitsCountValue = 0;
for (LuceneSearchResult result : results) {
resultsToDrop.add(result);
resultsFluxes.add(result.results());
exactTotalHitsCount &= result.totalHitsCount().exact();
totalHitsCountValue += result.totalHitsCount().value();
}
var totalHitsCount = new TotalHitsCount(totalHitsCountValue, exactTotalHitsCount);
Flux<LLKeyScore> mergedFluxes = Flux
.merge(resultsFluxes)
.skip(queryParams2.offsetLong())
.take(queryParams2.limitLong(), true);
return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> {
for (LuceneSearchResult luceneSearchResult : resultsToDrop) {
luceneSearchResult.close();
}
return queryParamsMono.flatMap(queryParams2 -> {
var localQueryParams = getLocalQueryParams(queryParams2);
return Mono
.fromRunnable(() -> {
LLUtils.ensureBlocking();
if (queryParams2.isSorted() && queryParams2.limitLong() > 0) {
throw new UnsupportedOperationException("Sorted queries are not supported"
+ " by SimpleUnsortedUnscoredLuceneMultiSearcher");
}
if (queryParams2.needsScores() && queryParams2.limitLong() > 0) {
throw new UnsupportedOperationException("Scored queries are not supported"
+ " by SimpleUnsortedUnscoredLuceneMultiSearcher");
}
})
.thenMany(Flux.fromIterable(indexSearchers.shards()))
.flatMap(searcher -> {
var llSearcher = Mono.fromCallable(() -> new LLIndexSearcher(searcher, false, null).send());
return this.collect(llSearcher, localQueryParams, keyFieldName, transformer);
})
.collectList()
.map(results -> {
List<LuceneSearchResult> resultsToDrop = new ArrayList<>(results.size());
List<Flux<LLKeyScore>> resultsFluxes = new ArrayList<>(results.size());
boolean exactTotalHitsCount = true;
long totalHitsCountValue = 0;
for (LuceneSearchResult result : results) {
resultsToDrop.add(result);
resultsFluxes.add(result.results());
exactTotalHitsCount &= result.totalHitsCount().exact();
totalHitsCountValue += result.totalHitsCount().value();
}
var totalHitsCount = new TotalHitsCount(totalHitsCountValue, exactTotalHitsCount);
Flux<LLKeyScore> mergedFluxes = Flux
.merge(resultsFluxes)
.skip(queryParams2.offsetLong())
.take(queryParams2.limitLong(), true);
return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> {
for (LuceneSearchResult luceneSearchResult : resultsToDrop) {
luceneSearchResult.close();
}
indexSearchers.close();
});
});
}
);
},
false
);
indexSearchers.close();
});
});
}, false));
}
private LocalQueryParams getLocalQueryParams(LocalQueryParams queryParams) {
@ -108,17 +97,18 @@ public class CountMultiSearcher implements MultiSearcher {
public Mono<LuceneSearchResult> collect(Mono<Send<LLIndexSearcher>> indexSearcherMono,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer) {
GlobalQueryRewrite transformer) {
return Mono
.usingWhen(
indexSearcherMono,
indexSearcher -> {
Mono<LocalQueryParams> queryParamsMono;
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
if (transformer == GlobalQueryRewrite.NO_REWRITE) {
queryParamsMono = Mono.just(queryParams);
} else {
queryParamsMono = transformer.transform(Mono
.fromSupplier(() -> new TransformerInput(LLIndexSearchers.unsharded(indexSearcher), queryParams)));
queryParamsMono = Mono
.fromCallable(() -> transformer.rewrite(LLIndexSearchers.unsharded(indexSearcher), queryParams))
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()));
}
return queryParamsMono

View File

@ -0,0 +1,12 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import java.io.IOException;
import reactor.core.publisher.Mono;
public interface GlobalQueryRewrite {
GlobalQueryRewrite NO_REWRITE = (indexSearchers, queryParamsMono) -> queryParamsMono;
LocalQueryParams rewrite(LLIndexSearchers indexSearchers, LocalQueryParams localQueryParams) throws IOException;
}

View File

@ -1,18 +0,0 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import java.util.List;
import org.apache.lucene.index.IndexReader;
import reactor.core.publisher.Mono;
public interface LLSearchTransformer {
LLSearchTransformer NO_TRANSFORMATION = queryParamsMono -> queryParamsMono
.map(TransformerInput::queryParams);
record TransformerInput(LLIndexSearchers indexSearchers,
LocalQueryParams queryParams) {}
Mono<LocalQueryParams> transform(Mono<TransformerInput> inputMono);
}

View File

@ -15,7 +15,7 @@ public interface LocalSearcher {
Mono<LuceneSearchResult> collect(Mono<Send<LLIndexSearcher>> indexSearcherMono,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer);
GlobalQueryRewrite transformer);
/**
* Get the name of this searcher type

View File

@ -16,7 +16,7 @@ public interface MultiSearcher extends LocalSearcher {
Mono<LuceneSearchResult> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer);
GlobalQueryRewrite transformer);
/**
* @param indexSearcherMono Lucene index searcher
@ -28,7 +28,7 @@ public interface MultiSearcher extends LocalSearcher {
default Mono<LuceneSearchResult> collect(Mono<Send<LLIndexSearcher>> indexSearcherMono,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer) {
GlobalQueryRewrite transformer) {
var searchers = indexSearcherMono.map(a -> LLIndexSearchers.unsharded(a).send());
return this.collectMulti(searchers, queryParams, keyFieldName, transformer);
}

View File

@ -1,22 +1,23 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.io.IOException;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TimeLimitingCollector;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopScoreDocCollector;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class OfficialSearcher implements MultiSearcher {
@ -29,13 +30,20 @@ public class OfficialSearcher implements MultiSearcher {
public Mono<LuceneSearchResult> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer) {
GlobalQueryRewrite transformer) {
Mono<LocalQueryParams> queryParamsMono;
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
if (transformer == GlobalQueryRewrite.NO_REWRITE) {
queryParamsMono = Mono.just(queryParams);
} else {
queryParamsMono = LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> transformer.transform(Mono
.fromSupplier(() -> new TransformerInput(indexSearchers, queryParams))), true);
queryParamsMono = indexSearchersMono
.publishOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.handle((indexSearchers, sink) -> {
try {
sink.next(transformer.rewrite(indexSearchers.receive(), queryParams));
} catch (IOException ex) {
sink.error(ex);
}
});
}
return queryParamsMono.flatMap(queryParams2 -> LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> this

View File

@ -6,14 +6,12 @@ import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SE
import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.UninterruptibleScheduler;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.collector.TopDocsCollectorMultiManager;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@ -33,18 +31,19 @@ public class PagedLocalSearcher implements LocalSearcher {
public Mono<LuceneSearchResult> collect(Mono<Send<LLIndexSearcher>> indexSearcherMono,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer) {
GlobalQueryRewrite transformer) {
PaginationInfo paginationInfo = getPaginationInfo(queryParams);
var indexSearchersMono = indexSearcherMono.map(LLIndexSearchers::unsharded).map(ResourceSupport::send);
return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> {
Mono<LocalQueryParams> queryParamsMono;
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
if (transformer == GlobalQueryRewrite.NO_REWRITE) {
queryParamsMono = Mono.just(queryParams);
} else {
queryParamsMono = transformer.transform(Mono
.fromSupplier(() -> new TransformerInput(indexSearchers, queryParams)));
queryParamsMono = Mono
.fromCallable(() -> transformer.rewrite(indexSearchers, queryParams))
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()));
}
return queryParamsMono.flatMap(queryParams2 -> this

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRITE;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT;
import io.net5.buffer.api.Send;
@ -10,7 +11,7 @@ import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.PageLimits;
import it.cavallium.dbengine.lucene.collector.ScoringShardsCollectorMultiManager;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
@ -18,7 +19,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
@ -36,13 +36,20 @@ public class ScoredPagedMultiSearcher implements MultiSearcher {
public Mono<LuceneSearchResult> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer) {
GlobalQueryRewrite transformer) {
Mono<LocalQueryParams> queryParamsMono;
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
if (transformer == GlobalQueryRewrite.NO_REWRITE) {
queryParamsMono = Mono.just(queryParams);
} else {
queryParamsMono = LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> transformer.transform(Mono
.fromSupplier(() -> new TransformerInput(indexSearchers, queryParams))), true);
queryParamsMono = indexSearchersMono
.publishOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.handle((indexSearchers, sink) -> {
try {
sink.next(transformer.rewrite(indexSearchers.receive(), queryParams));
} catch (IOException ex) {
sink.error(ex);
}
});
}
return queryParamsMono.flatMap(queryParams2 -> {

View File

@ -1,23 +1,23 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.FullDocs;
import it.cavallium.dbengine.lucene.LLScoreDoc;
import it.cavallium.dbengine.lucene.collector.FullDocsCollector;
import it.cavallium.dbengine.lucene.collector.LMDBFullScoreDocCollector;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.util.List;
import java.io.IOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.IndexSearcher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class SortedByScoreFullMultiSearcher implements MultiSearcher {
@ -33,13 +33,20 @@ public class SortedByScoreFullMultiSearcher implements MultiSearcher {
public Mono<LuceneSearchResult> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer) {
GlobalQueryRewrite transformer) {
Mono<LocalQueryParams> queryParamsMono;
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
if (transformer == GlobalQueryRewrite.NO_REWRITE) {
queryParamsMono = Mono.just(queryParams);
} else {
queryParamsMono = LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> transformer.transform(Mono
.fromSupplier(() -> new TransformerInput(indexSearchers, queryParams))), true);
queryParamsMono = indexSearchersMono
.publishOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.handle((indexSearchers, sink) -> {
try {
sink.next(transformer.rewrite(indexSearchers.receive(), queryParams));
} catch (IOException ex) {
sink.error(ex);
}
});
}
return queryParamsMono.flatMap(queryParams2 -> {

View File

@ -1,24 +1,23 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.FullDocs;
import it.cavallium.dbengine.lucene.LLFieldDoc;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.collector.FullDocsCollector;
import it.cavallium.dbengine.lucene.collector.LMDBFullFieldDocCollector;
import it.cavallium.dbengine.lucene.collector.LMDBFullScoreDocCollector;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.util.List;
import java.io.IOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.IndexSearcher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class SortedScoredFullMultiSearcher implements MultiSearcher {
@ -34,13 +33,20 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher {
public Mono<LuceneSearchResult> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer) {
GlobalQueryRewrite transformer) {
Mono<LocalQueryParams> queryParamsMono;
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
if (transformer == GlobalQueryRewrite.NO_REWRITE) {
queryParamsMono = Mono.just(queryParams);
} else {
queryParamsMono = LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> transformer.transform(Mono
.fromSupplier(() -> new TransformerInput(indexSearchers, queryParams))), true);
queryParamsMono = indexSearchersMono
.publishOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.handle((indexSearchers, sink) -> {
try {
sink.next(transformer.rewrite(indexSearchers.receive(), queryParams));
} catch (IOException ex) {
sink.error(ex);
}
});
}
return queryParamsMono.flatMap(queryParams2 -> LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> this

View File

@ -1,6 +1,6 @@
package it.cavallium.dbengine.lucene.searcher;
import static java.util.Objects.requireNonNull;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
@ -8,12 +8,12 @@ import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.util.List;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class UnsortedStreamingMultiSearcher implements MultiSearcher {
@ -21,15 +21,16 @@ public class UnsortedStreamingMultiSearcher implements MultiSearcher {
public Mono<LuceneSearchResult> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer) {
GlobalQueryRewrite transformer) {
return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> {
Mono<LocalQueryParams> queryParamsMono;
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
if (transformer == GlobalQueryRewrite.NO_REWRITE) {
queryParamsMono = Mono.just(queryParams);
} else {
queryParamsMono = transformer.transform(Mono
.fromCallable(() -> new TransformerInput(indexSearchers, queryParams)));
queryParamsMono = Mono
.fromCallable(() -> transformer.rewrite(indexSearchers, queryParams))
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()));
}
return queryParamsMono.map(queryParams2 -> {

View File

@ -6,7 +6,7 @@ import static java.util.Objects.requireNonNullElseGet;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer;
import it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite;
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
import it.cavallium.dbengine.lucene.searcher.LocalSearcher;
import it.cavallium.dbengine.lucene.searcher.MultiSearcher;
@ -29,7 +29,7 @@ public class SwappableLuceneSearcher implements LocalSearcher, MultiSearcher, Cl
public Mono<LuceneSearchResult> collect(Mono<Send<LLIndexSearcher>> indexSearcherMono,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer) {
GlobalQueryRewrite transformer) {
var single = requireNonNullElseGet(this.single.get(), this.multi::get);
requireNonNull(single, "LuceneLocalSearcher not set");
return single.collect(indexSearcherMono, queryParams, keyFieldName, transformer);
@ -54,7 +54,7 @@ public class SwappableLuceneSearcher implements LocalSearcher, MultiSearcher, Cl
public Mono<LuceneSearchResult> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer) {
GlobalQueryRewrite transformer) {
var multi = requireNonNull(this.multi.get(), "LuceneMultiSearcher not set");
return multi.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
}

View File

@ -1,13 +1,15 @@
package it.cavallium.dbengine;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRITE;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite;
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
import it.cavallium.dbengine.lucene.searcher.LocalSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneSearchResult;
@ -16,6 +18,7 @@ import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher {
@ -29,16 +32,17 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher {
public Mono<LuceneSearchResult> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer) {
GlobalQueryRewrite transformer) {
return LLUtils.usingSendResource(indexSearchersMono,
indexSearchers -> {
Mono<LocalQueryParams> queryParamsMono;
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
if (transformer == NO_REWRITE) {
queryParamsMono = Mono.just(queryParams);
} else {
var transformerInput = Mono.just(new TransformerInput(indexSearchers, queryParams));
queryParamsMono = transformer.transform(transformerInput);
queryParamsMono = Mono
.fromCallable(() -> transformer.rewrite(indexSearchers, queryParams))
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()));
}
return queryParamsMono.flatMap(queryParams2 -> {