Use uninterruptible scheduler for lucene searches

This commit is contained in:
Andrea Cavalli 2021-12-16 16:14:44 +01:00
parent 2e1678373c
commit 6e312fe102
11 changed files with 218 additions and 120 deletions

View File

@ -0,0 +1,91 @@
package it.cavallium.dbengine.client;
import java.util.concurrent.TimeUnit;
import org.jetbrains.annotations.NotNull;
import reactor.core.Disposable;
import reactor.core.scheduler.Scheduler;
public class UninterruptibleScheduler {
public static Scheduler uninterruptibleScheduler(Scheduler scheduler) {
return new Scheduler() {
@Override
public @NotNull Disposable schedule(@NotNull Runnable task) {
scheduler.schedule(task);
return () -> {};
}
@Override
public @NotNull Disposable schedule(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
scheduler.schedule(task, delay, unit);
return () -> {};
}
@Override
public @NotNull Disposable schedulePeriodically(@NotNull Runnable task,
long initialDelay,
long period,
@NotNull TimeUnit unit) {
scheduler.schedulePeriodically(task, initialDelay, period, unit);
return () -> {};
}
@Override
public boolean isDisposed() {
return scheduler.isDisposed();
}
@Override
public void dispose() {
scheduler.dispose();
}
@Override
public void start() {
scheduler.start();
}
@Override
public long now(@NotNull TimeUnit unit) {
return Scheduler.super.now(unit);
}
@Override
public @NotNull Worker createWorker() {
var worker = scheduler.createWorker();
return new Worker() {
@Override
public @NotNull Disposable schedule(@NotNull Runnable task) {
worker.schedule(task);
return () -> {};
}
@Override
public void dispose() {
}
@Override
public boolean isDisposed() {
return worker.isDisposed();
}
@Override
public @NotNull Disposable schedule(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
worker.schedule(task, delay, unit);
return () -> {};
}
@Override
public @NotNull Disposable schedulePeriodically(@NotNull Runnable task,
long initialDelay,
long period,
@NotNull TimeUnit unit) {
worker.schedulePeriodically(task, initialDelay, period, unit);
return () -> {};
}
};
}
};
}
}

View File

@ -30,7 +30,8 @@ public final record ClientQueryParams(@Nullable CompositeSnapshot snapshot,
.limit(Long.MAX_VALUE)
.minCompetitiveScore(null)
.sort(null)
.timeout(Duration.ofSeconds(10))
// Default timeout: 2 minutes
.timeout(Duration.ofMinutes(2))
.computePreciseHitsCount(true);
}

View File

@ -1,5 +1,7 @@
package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
@ -34,7 +36,8 @@ import reactor.core.scheduler.Schedulers;
public class CachedIndexSearcherManager implements IndexSearcherManager {
private static final Logger logger = LoggerFactory.getLogger(CachedIndexSearcherManager.class);
private final Executor SEARCH_EXECUTOR = command -> Schedulers.boundedElastic().schedule(command);
private final Executor SEARCH_EXECUTOR = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
new ShortNamedThreadFactory("lucene-search").withGroup(new ThreadGroup("lucene-search")));
private final SearcherFactory SEARCHER_FACTORY = new ExecutorSearcherFactory(SEARCH_EXECUTOR);
private final SnapshotsManager snapshotsManager;
@ -72,7 +75,8 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
logger.error("Failed to refresh the searcher manager", ex);
}
})
.subscribeOn(Schedulers.boundedElastic())
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.publishOn(Schedulers.parallel())
.repeatWhen(s -> s.delayElements(queryRefreshDebounceTime))
.takeUntilOther(closeRequestedMono.asMono())
.doAfterTerminate(refresherClosed::tryEmitEmpty)
@ -102,6 +106,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
logger.info("Closing refreshes...");
if (!activeRefreshes.isTerminated()) {
try {
//noinspection BlockingMethodInNonBlockingContext
activeRefreshes.awaitAdvanceInterruptibly(activeRefreshes.arrive(), 15, TimeUnit.SECONDS);
} catch (Exception ex) {
if (ex instanceof TimeoutException) {
@ -115,6 +120,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
logger.info("Closing active searchers...");
if (!activeSearchers.isTerminated()) {
try {
//noinspection BlockingMethodInNonBlockingContext
activeSearchers.awaitAdvanceInterruptibly(activeSearchers.arrive(), 15, TimeUnit.SECONDS);
} catch (Exception ex) {
if (ex instanceof TimeoutException) {
@ -127,7 +133,9 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
logger.info("Closed active searchers");
cachedSnapshotSearchers.invalidateAll();
cachedSnapshotSearchers.cleanUp();
}).subscribeOn(Schedulers.boundedElastic())).cache();
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())))
.publishOn(Schedulers.parallel())
.cache();
}
private Mono<Send<LLIndexSearcher>> generateCachedSearcher(@Nullable LLSnapshot snapshot) {

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import static it.cavallium.dbengine.database.LLUtils.MARKER_LUCENE;
import static it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.NO_TRANSFORMATION;
@ -90,7 +91,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
* There is only a single thread globally to not overwhelm the disk with
* concurrent commits or concurrent refreshes.
*/
private static final Scheduler luceneHeavyTasksScheduler = Schedulers.single(Schedulers.boundedElastic());
private static final Scheduler luceneHeavyTasksScheduler = uninterruptibleScheduler(Schedulers.single(Schedulers.boundedElastic()));
private static final ExecutorService SAFE_EXECUTOR = Executors.newCachedThreadPool(new ShortNamedThreadFactory("lucene-index-impl"));
private final MeterRegistry meterRegistry;
@ -280,29 +281,17 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}
private <V> Mono<V> runSafe(Callable<V> callable) {
return Mono.create(sink -> Schedulers.boundedElastic().schedule(() -> {
try {
var result = callable.call();
if (result != null) {
sink.success(result);
} else {
sink.success();
}
} catch (Throwable e) {
sink.error(e);
}
}));
return Mono
.fromCallable(callable)
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.publishOn(Schedulers.parallel());
}
private <V> Mono<V> runSafe(IORunnable runnable) {
return Mono.create(sink -> Schedulers.boundedElastic().schedule(() -> {
try {
runnable.run();
sink.success();
} catch (Throwable e) {
sink.error(e);
}
}));
return Mono.<V>fromCallable(() -> {
runnable.run();
return null;
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())).publishOn(Schedulers.parallel());
}
@Override

View File

@ -1,5 +1,7 @@
package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import io.micrometer.core.instrument.MeterRegistry;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
@ -280,10 +282,12 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
.flatMap(LLLocalLuceneIndex::close)
.then(Mono.fromCallable(() -> {
if (multiSearcher instanceof Closeable closeable) {
//noinspection BlockingMethodInNonBlockingContext
closeable.close();
}
return null;
}).subscribeOn(Schedulers.boundedElastic()))
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())))
.publishOn(Schedulers.parallel())
.then();
}

View File

@ -1,5 +1,7 @@
package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import it.cavallium.dbengine.database.LLSnapshot;
import java.io.IOException;
import java.util.Objects;
@ -57,34 +59,28 @@ public class SnapshotsManager {
*/
private Mono<IndexCommit> takeLuceneSnapshot() {
return Mono
.<IndexCommit>create(sink -> Schedulers.boundedElastic().schedule(() -> {
try {
sink.success(snapshotter.snapshot());
} catch (Throwable ex) {
sink.error(ex);
}
}))
.fromCallable(snapshotter::snapshot)
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.onErrorResume(ex -> {
if (ex instanceof IllegalStateException && "No index commit to snapshot".equals(ex.getMessage())) {
return Mono.create(sink -> Schedulers.boundedElastic().schedule(() -> {
return Mono.fromCallable(() -> {
activeTasks.register();
try {
indexWriter.commit();
sink.success(snapshotter.snapshot());
} catch (Throwable e) {
sink.error(e);
return snapshotter.snapshot();
} finally {
activeTasks.arriveAndDeregister();
}
}));
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()));
} else {
return Mono.error(ex);
}
});
})
.publishOn(Schedulers.parallel());
}
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
return Mono.create(sink -> Schedulers.boundedElastic().schedule(() -> {
return Mono.<Void>fromCallable(() -> {
activeTasks.register();
try {
var indexSnapshot = this.snapshots.remove(snapshot.getSequenceNumber());
@ -98,13 +94,11 @@ public class SnapshotsManager {
snapshotter.release(luceneIndexSnapshot);
// Delete unused files after releasing the snapshot
indexWriter.deleteUnusedFiles();
sink.success();
} catch (Throwable ex) {
sink.error(ex);
return null;
} finally {
activeTasks.arriveAndDeregister();
}
}));
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())).publishOn(Schedulers.parallel());
}
public void close() {

View File

@ -1,5 +1,7 @@
package it.cavallium.dbengine.lucene;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
@ -27,6 +29,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -355,28 +358,27 @@ public class LuceneUtils {
);
}
/**
* HitsFlux must run on a blocking scheduler
*/
public static Flux<LLKeyScore> convertHits(Flux<ScoreDoc> hitsFlux,
List<IndexSearcher> indexSearchers,
String keyFieldName,
boolean preserveOrder) {
if (preserveOrder) {
return hitsFlux
.mapNotNull(hit -> mapHitBlocking(hit, indexSearchers, keyFieldName));
.publishOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.mapNotNull(hit -> mapHitBlocking(hit, indexSearchers, keyFieldName))
.publishOn(Schedulers.parallel());
} else {
// Compute parallelism
var availableProcessors = Runtime.getRuntime().availableProcessors();
var min = Queues.XS_BUFFER_SIZE;
var maxParallelGroups = Math.max(availableProcessors, min);
return hitsFlux
.groupBy(hit -> hit.shardIndex % maxParallelGroups) // Max n groups
.flatMap(shardHits -> shardHits
.mapNotNull(hit -> mapHitBlocking(hit, indexSearchers, keyFieldName)),
maxParallelGroups // Max n concurrency. Concurrency must be >= total groups count
);
.buffer(Queues.XS_BUFFER_SIZE, () -> new ArrayList<Object>(Queues.XS_BUFFER_SIZE))
.flatMap(shardHits -> Mono.fromCallable(() -> {
for (int i = 0, size = shardHits.size(); i < size; i++) {
shardHits.set(i, mapHitBlocking((ScoreDoc) shardHits.get(i), indexSearchers, keyFieldName));
}
//noinspection unchecked
return (List<LLKeyScore>) (List<?>) shardHits;
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())))
.flatMapIterable(a -> a)
.publishOn(Schedulers.parallel());
}
}
@ -384,16 +386,16 @@ public class LuceneUtils {
private static LLKeyScore mapHitBlocking(ScoreDoc hit,
List<IndexSearcher> indexSearchers,
String keyFieldName) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called mapHitBlocking in a nonblocking thread");
}
assert !Schedulers.isInNonBlockingThread();
int shardDocId = hit.doc;
int shardIndex = hit.shardIndex;
float score = hit.score;
IndexSearcher indexSearcher;
if (shardIndex == -1 && indexSearchers.size() == 1) {
shardIndex = 0;
indexSearcher = indexSearchers.get(0);
} else {
indexSearcher = indexSearchers.get(shardIndex);
}
var indexSearcher = indexSearchers.get(shardIndex);
try {
String collectedDoc = keyOfTopDoc(shardDocId, indexSearcher.getIndexReader(), keyFieldName);
return new LLKeyScore(shardDocId, score, collectedDoc);
@ -556,9 +558,9 @@ public class LuceneUtils {
localQueryParams.minCompetitiveScore(),
localQueryParams.sort(),
localQueryParams.computePreciseHitsCount(),
localQueryParams.timeout()
);
}).subscribeOn(Schedulers.boundedElastic()));
localQueryParams.timeout());
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())))
.publishOn(Schedulers.parallel());
}
public static Collector withTimeout(Collector collector, Duration timeout) {

View File

@ -1,5 +1,7 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
@ -120,7 +122,8 @@ public class CountMultiSearcher implements MultiSearcher {
LLUtils.ensureBlocking();
return is.getIndexSearcher().count(queryParams2.query());
}
}).subscribeOn(Schedulers.boundedElastic()))
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())))
.publishOn(Schedulers.parallel())
.timeout(queryParams.timeout());
},
is -> Mono.empty()

View File

@ -1,10 +1,12 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.EMPTY_STATUS;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT;
import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.UninterruptibleScheduler;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
@ -91,7 +93,9 @@ public class PagedLocalSearcher implements LocalSearcher {
.just(currentPageInfo)
.<PageData>handle((s, sink) -> this.searchPageSync(queryParams, indexSearchers, pagination, resultsOffset, s, sink))
//defaultIfEmpty(new PageData(new TopDocs(new TotalHits(0, Relation.EQUAL_TO), new ScoreDoc[0]), currentPageInfo))
.single();
.single()
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.publishOn(Schedulers.parallel());
}
/**
@ -144,7 +148,8 @@ public class PagedLocalSearcher implements LocalSearcher {
(s, sink) -> searchPageSync(queryParams, indexSearchers, true, 0, s, sink),
s -> {}
)
.subscribeOn(Schedulers.boundedElastic())
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.publishOn(Schedulers.parallel())
.map(PageData::topDocs)
.flatMapIterable(topDocs -> Arrays.asList(topDocs.scoreDocs))
.transform(topFieldDocFlux -> LuceneUtils.convertHits(topFieldDocFlux, indexSearchers,

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT;
import io.net5.buffer.api.Send;
@ -146,7 +147,8 @@ public class ScoredPagedMultiSearcher implements MultiSearcher {
.doOnNext(s -> currentPageInfoRef.set(s.nextPageInfo()))
.repeatWhen(s -> s.takeWhile(n -> n > 0));
})
.subscribeOn(Schedulers.boundedElastic())
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.publishOn(Schedulers.parallel())
.map(PageData::topDocs)
.flatMapIterable(topDocs -> Arrays.asList(topDocs.scoreDocs))
.transform(topFieldDocFlux -> LuceneUtils.convertHits(topFieldDocFlux, indexSearchers,
@ -194,7 +196,8 @@ public class ScoredPagedMultiSearcher implements MultiSearcher {
var cm = cmm.get(shard, index);
return shard.search(queryParams.query(), cm);
}))
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())))
.publishOn(Schedulers.parallel())
.collectList()
.flatMap(results -> Mono.fromCallable(() -> {
LLUtils.ensureBlocking();
@ -210,8 +213,9 @@ public class ScoredPagedMultiSearcher implements MultiSearcher {
var nextPageIndex = s.pageIndex() + 1;
var nextPageInfo = new CurrentPageInfo(pageLastDoc, nextRemainingLimit, nextPageIndex);
return new PageData(pageTopDocs, nextPageInfo);
}))
);
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())))
)
.publishOn(Schedulers.parallel());
}
@Override

View File

@ -33,7 +33,7 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher {
private static final int SEARCH_THREADS = Math.min(Math.max(8, Runtime.getRuntime().availableProcessors()), 128);
private static final ThreadFactory THREAD_FACTORY = new ShortNamedThreadFactory("UnscoredStreamingSearcher");
private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(); // Executors.newFixedThreadPool(SEARCH_THREADS, THREAD_FACTORY);
private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(THREAD_FACTORY); // Executors.newFixedThreadPool(SEARCH_THREADS, THREAD_FACTORY);
@Override
public Mono<LuceneSearchResult> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
@ -77,56 +77,53 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher {
}
private Flux<ScoreDoc> getScoreDocs(LocalQueryParams localQueryParams, List<IndexSearcher> shards) {
return Flux.<ScoreDoc>create(sink -> {
AtomicReference<Thread> threadAtomicReference = new AtomicReference<>();
return Flux
.<ScoreDoc>create(sink -> EXECUTOR.execute(() -> {
try {
LLUtils.ensureBlocking();
var thread = Thread.currentThread();
sink.onRequest(lc -> LockSupport.unpark(thread));
int shardIndexTemp = 0;
for (IndexSearcher shard : shards) {
if (sink.isCancelled()) break;
final int shardIndex = shardIndexTemp;
shard.search(localQueryParams.query(), withTimeout(new SimpleCollector() {
private LeafReaderContext leafReaderContext;
var disposable = EXECUTOR.submit(() -> {
try {
LLUtils.ensureBlocking();
threadAtomicReference.set(Thread.currentThread());
int shardIndexTemp = 0;
for (IndexSearcher shard : shards) {
if (sink.isCancelled()) break;
final int shardIndex = shardIndexTemp;
var collector = withTimeout(new SimpleCollector() {
private LeafReaderContext leafReaderContext;
@Override
protected void doSetNextReader(LeafReaderContext context) {
this.leafReaderContext = context;
}
@Override
public void collect(int i) {
// Assert that this is a non-blocking context
assert !Schedulers.isInNonBlockingThread();
var scoreDoc = new ScoreDoc(leafReaderContext.docBase + i, 0, shardIndex);
while (sink.requestedFromDownstream() <= 0 || sink.isCancelled()) {
if (sink.isCancelled()) {
throw new CollectionTerminatedException();
}
// 1000ms
LockSupport.parkNanos(1000000000L);
@Override
protected void doSetNextReader(LeafReaderContext context) {
this.leafReaderContext = context;
}
sink.next(scoreDoc);
}
@Override
public ScoreMode scoreMode() {
return ScoreMode.COMPLETE_NO_SCORES;
}
}, localQueryParams.timeout());
shard.search(localQueryParams.query(), collector);
shardIndexTemp++;
@Override
public void collect(int i) {
// Assert that this is a non-blocking context
assert !Schedulers.isInNonBlockingThread();
var scoreDoc = new ScoreDoc(leafReaderContext.docBase + i, 0, shardIndex);
if (sink.requestedFromDownstream() <= 0 || sink.isCancelled()) {
if (sink.isCancelled()) {
throw new CollectionTerminatedException();
} else {
// 1000ms
LockSupport.parkNanos(1000000000L);
}
}
sink.next(scoreDoc);
}
@Override
public ScoreMode scoreMode() {
return ScoreMode.COMPLETE_NO_SCORES;
}
}, localQueryParams.timeout()));
shardIndexTemp++;
}
} catch (Throwable e) {
sink.error(e);
}
} catch (Throwable e) {
sink.error(e);
}
sink.complete();
});
sink.onRequest(lc -> LockSupport.unpark(threadAtomicReference.get()));
sink.onDispose(() -> disposable.cancel(false));
}, OverflowStrategy.BUFFER).publishOn(Schedulers.boundedElastic());
sink.complete();
}), OverflowStrategy.BUFFER)
.publishOn(Schedulers.parallel());
}