Optimize lucene schedulers
This commit is contained in:
parent
07ea61050f
commit
7f15a6e099
@ -27,11 +27,11 @@ import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
@ -77,14 +77,22 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
* There is only a single thread globally to not overwhelm the disk with
|
||||
* concurrent commits or concurrent refreshes.
|
||||
*/
|
||||
private static final Scheduler luceneBlockingScheduler = Schedulers.newBoundedElastic(1,
|
||||
private static final Scheduler luceneHeavyTasksScheduler = Schedulers.newBoundedElastic(1,
|
||||
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
|
||||
"lucene",
|
||||
120,
|
||||
Integer.MAX_VALUE,
|
||||
true
|
||||
);
|
||||
private static final Supplier<Scheduler> lowMemorySupplier = Suppliers.memoize(() ->
|
||||
private final Scheduler luceneBlockingScheduler;
|
||||
private static final Function<String, Scheduler> boundedSchedulerSupplier = name -> Schedulers.newBoundedElastic(Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
|
||||
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
|
||||
"lucene-" + name,
|
||||
60
|
||||
);
|
||||
private static final Supplier<Scheduler> lowMemorySchedulerSupplier = Suppliers.memoize(() ->
|
||||
Schedulers.newBoundedElastic(1, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "lucene-low-memory", Integer.MAX_VALUE))::get;
|
||||
private static final Supplier<Scheduler> querySchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("query"))::get;
|
||||
private static final Supplier<Scheduler> blockingSchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("blocking"))::get;
|
||||
/**
|
||||
* Lucene query scheduler.
|
||||
*/
|
||||
@ -142,14 +150,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
this.indexWriter = new IndexWriter(directory, indexWriterConfig);
|
||||
this.searcherManager = new SearcherManager(indexWriter, false, false, null);
|
||||
if (lowMemory) {
|
||||
this.luceneQueryScheduler = lowMemorySupplier.get();
|
||||
this.luceneQueryScheduler = this.luceneBlockingScheduler = lowMemorySchedulerSupplier.get();
|
||||
} else {
|
||||
this.luceneQueryScheduler = Schedulers.newBoundedElastic(Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
|
||||
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
|
||||
"lucene-query",
|
||||
60,
|
||||
true
|
||||
);
|
||||
this.luceneBlockingScheduler = blockingSchedulerSupplier.get();
|
||||
this.luceneQueryScheduler = querySchedulerSupplier.get();
|
||||
}
|
||||
|
||||
// Create scheduled tasks lifecycle manager
|
||||
@ -165,7 +169,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
}
|
||||
|
||||
private void registerScheduledFixedTask(Runnable task, Duration duration) {
|
||||
scheduledTasksLifecycle.registerScheduledTask(luceneBlockingScheduler.schedulePeriodically(() -> {
|
||||
scheduledTasksLifecycle.registerScheduledTask(luceneHeavyTasksScheduler.schedulePeriodically(() -> {
|
||||
scheduledTasksLifecycle.startScheduledTask();
|
||||
try {
|
||||
task.run();
|
||||
@ -182,17 +186,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
|
||||
@Override
|
||||
public Mono<LLSnapshot> takeSnapshot() {
|
||||
return Mono
|
||||
.fromCallable(lastSnapshotSeqNo::incrementAndGet)
|
||||
.subscribeOn(luceneBlockingScheduler)
|
||||
.flatMap(snapshotSeqNo -> takeLuceneSnapshot()
|
||||
return takeLuceneSnapshot()
|
||||
.flatMap(snapshot -> Mono
|
||||
.fromCallable(() -> {
|
||||
var snapshotSeqNo = lastSnapshotSeqNo.incrementAndGet();
|
||||
this.snapshots.put(snapshotSeqNo, new LuceneIndexSnapshot(snapshot));
|
||||
return new LLSnapshot(snapshotSeqNo);
|
||||
})
|
||||
.subscribeOn(luceneBlockingScheduler)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@ -202,22 +203,22 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
*/
|
||||
private Mono<IndexCommit> takeLuceneSnapshot() {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
try {
|
||||
//noinspection BlockingMethodInNonBlockingContext
|
||||
return snapshotter.snapshot();
|
||||
} catch (IllegalStateException ex) {
|
||||
if ("No index commit to snapshot".equals(ex.getMessage())) {
|
||||
.fromCallable(snapshotter::snapshot)
|
||||
.subscribeOn(luceneBlockingScheduler)
|
||||
.onErrorResume(ex -> Mono
|
||||
.defer(() -> {
|
||||
if (ex instanceof IllegalStateException && "No index commit to snapshot".equals(ex.getMessage())) {
|
||||
return Mono.fromCallable(() -> {
|
||||
//noinspection BlockingMethodInNonBlockingContext
|
||||
indexWriter.commit();
|
||||
//noinspection BlockingMethodInNonBlockingContext
|
||||
return snapshotter.snapshot();
|
||||
}).subscribeOn(luceneHeavyTasksScheduler);
|
||||
} else {
|
||||
throw ex;
|
||||
}
|
||||
return Mono.error(ex);
|
||||
}
|
||||
})
|
||||
.subscribeOn(luceneBlockingScheduler);
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -314,7 +315,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
//noinspection BlockingMethodInNonBlockingContext
|
||||
indexWriter.commit();
|
||||
return null;
|
||||
}).subscribeOn(luceneBlockingScheduler);
|
||||
}).subscribeOn(luceneHeavyTasksScheduler);
|
||||
}
|
||||
|
||||
private Mono<IndexSearcher> acquireSearcherWrapper(LLSnapshot snapshot, boolean distributedPre, long actionId) {
|
||||
@ -554,13 +555,11 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
|
||||
var searchFlux = Mono.<Void>create(sink -> {
|
||||
try {
|
||||
var opId = new Random().nextInt();
|
||||
if (doDistributedPre) {
|
||||
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery);
|
||||
totalHitsCountSink.tryEmitValue(0L);
|
||||
} else {
|
||||
int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit);
|
||||
logger.warn(opId + " start");
|
||||
streamSearcher.search(indexSearcher,
|
||||
luceneQuery,
|
||||
boundedLimit,
|
||||
@ -569,7 +568,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
minCompetitiveScore,
|
||||
keyFieldName,
|
||||
keyScore -> {
|
||||
logger.warn(opId + " item");
|
||||
EmitResult result = topKeysSink.tryEmitNext(fixKeyScore(keyScore, scoreDivisor));
|
||||
if (result.isSuccess()) {
|
||||
return HandleResult.CONTINUE;
|
||||
@ -590,7 +588,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
}
|
||||
},
|
||||
totalHitsCount -> {
|
||||
logger.warn(opId + " total-hits-count");
|
||||
EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
|
||||
if (result.isFailure()) {
|
||||
if (result == EmitResult.FAIL_CANCELLED) {
|
||||
@ -607,7 +604,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
}
|
||||
);
|
||||
}
|
||||
logger.warn(opId + " complete");
|
||||
topKeysSink.tryEmitComplete();
|
||||
sink.success();
|
||||
} catch (IOException e) {
|
||||
@ -634,7 +630,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
directory.close();
|
||||
return null;
|
||||
})
|
||||
.subscribeOn(luceneBlockingScheduler);
|
||||
.subscribeOn(luceneHeavyTasksScheduler);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -650,7 +646,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
}
|
||||
return null;
|
||||
})
|
||||
.subscribeOn(luceneBlockingScheduler);
|
||||
.subscribeOn(luceneHeavyTasksScheduler);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -666,7 +662,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
}
|
||||
return null;
|
||||
})
|
||||
.subscribeOn(luceneBlockingScheduler);
|
||||
.subscribeOn(luceneHeavyTasksScheduler);
|
||||
}
|
||||
|
||||
private void scheduledCommit() {
|
||||
|
@ -6,7 +6,6 @@ import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.ScoreMode;
|
||||
import org.apache.lucene.search.Sort;
|
||||
import org.apache.lucene.search.SortField;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
/**
|
||||
@ -76,14 +75,4 @@ public class AdaptiveStreamSearcher implements LuceneStreamSearcher {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean isScoreSort(Sort luceneSort) {
|
||||
if (luceneSort == null) return false;
|
||||
for (SortField sortField : luceneSort.getSort()) {
|
||||
if (sortField != SortField.FIELD_SCORE) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user