Prefer standard schedulers

This commit is contained in:
Andrea Cavalli 2021-04-03 02:20:37 +02:00
parent a2bcc07825
commit 3f508352fc

View File

@ -69,9 +69,11 @@ import reactor.util.function.Tuples;
public class LLLocalLuceneIndex implements LLLuceneIndex {
private static final boolean USE_STANDARD_SCHEDULERS = true;
protected static final Logger logger = LoggerFactory.getLogger(LLLocalLuceneIndex.class);
private static final LuceneStreamSearcher streamSearcher = new AdaptiveStreamSearcher();
private static final AllowOnlyQueryParsingCollectorStreamSearcher allowOnlyQueryParsingCollectorStreamSearcher = new AllowOnlyQueryParsingCollectorStreamSearcher();
private static final AllowOnlyQueryParsingCollectorStreamSearcher allowOnlyQueryParsingCollectorStreamSearcher
= new AllowOnlyQueryParsingCollectorStreamSearcher();
/**
* Global lucene index scheduler.
* There is only a single thread globally to not overwhelm the disk with
@ -84,16 +86,21 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
true
);
private final Scheduler luceneBlockingScheduler;
private static final Function<String, Scheduler> boundedSchedulerSupplier = name -> Schedulers.newBoundedElastic(Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
private static final Function<String, Scheduler> boundedSchedulerSupplier = name ->
Schedulers.newBoundedElastic(Runtime.getRuntime().availableProcessors(),
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
"lucene-" + name,
60
);
private final Supplier<Scheduler> lowMemorySchedulerSupplier = Suppliers.memoize(() ->
Schedulers.newBoundedElastic(1, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "lucene-low-memory", Integer.MAX_VALUE))::get;
private final Supplier<Scheduler> querySchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("query"))::get;
private final Supplier<Scheduler> blockingSchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("blocking"))::get;
private final Supplier<Scheduler> blockingLuceneSearchSchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("search-blocking"))::get;
Schedulers.newBoundedElastic(1, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
"lucene-low-memory", Integer.MAX_VALUE))::get;
private final Supplier<Scheduler> querySchedulerSupplier = USE_STANDARD_SCHEDULERS ?
Schedulers::boundedElastic : Suppliers.memoize(() -> boundedSchedulerSupplier.apply("query"))::get;
private final Supplier<Scheduler> blockingSchedulerSupplier = USE_STANDARD_SCHEDULERS ?
Schedulers::boundedElastic : Suppliers.memoize(() -> boundedSchedulerSupplier.apply("blocking"))::get;
private final Supplier<Scheduler> blockingLuceneSearchSchedulerSupplier = USE_STANDARD_SCHEDULERS ?
Schedulers::boundedElastic : Suppliers.memoize(() -> boundedSchedulerSupplier.apply("search-blocking"))::get;
/**
* Lucene query scheduler.
*/
@ -150,7 +157,8 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}
indexWriterConfig.setSimilarity(getSimilarity());
this.indexWriter = new IndexWriter(directory, indexWriterConfig);
this.searcherManager = new SearcherManager(indexWriter, false, false, null);
this.searcherManager
= new SearcherManager(indexWriter, false, false, null);
if (lowMemory) {
this.luceneQueryScheduler = this.luceneBlockingScheduler = lowMemorySchedulerSupplier.get();
} else {
@ -431,7 +439,8 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
if (similarity instanceof TFIDFSimilarity) {
mlt.setSimilarity((TFIDFSimilarity) similarity);
} else {
logger.trace("Using an unsupported similarity algorithm for MoreLikeThis: {}. You must use a similarity instance based on TFIDFSimilarity!", similarity);
logger.trace("Using an unsupported similarity algorithm for MoreLikeThis:"
+ " {}. You must use a similarity instance based on TFIDFSimilarity!", similarity);
}
// Get the reference doc and apply it to MoreLikeThis, to generate the query
@ -481,11 +490,18 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return search(snapshot, queryParams, keyFieldName, false, 0, 1);
}
public Mono<LLSearchResult> distributedSearch(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName, long actionId, int scoreDivisor) {
public Mono<LLSearchResult> distributedSearch(@Nullable LLSnapshot snapshot,
QueryParams queryParams,
String keyFieldName,
long actionId,
int scoreDivisor) {
return search(snapshot, queryParams, keyFieldName, false, actionId, scoreDivisor);
}
public Mono<Void> distributedPreSearch(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName, long actionId) {
public Mono<Void> distributedPreSearch(@Nullable LLSnapshot snapshot,
QueryParams queryParams,
String keyFieldName,
long actionId) {
return this
.search(snapshot, queryParams, keyFieldName, true, actionId, 1)
.then();