From 717d13ef1fa596205f87fae8d7ab894b8dea59b4 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 7 Sep 2021 11:26:10 +0200 Subject: [PATCH] Revert commit 1882e8b3005b23fe3939f06a64c70117370f9ef9 --- .../database/disk/LLLocalDictionary.java | 28 +++++++++------ .../disk/LLLocalKeyValueDatabase.java | 32 ++++++++++++----- .../database/disk/LLLocalLuceneIndex.java | 35 ++++++++++++++----- .../disk/LLLocalMultiLuceneIndex.java | 6 ++-- .../database/disk/LLLocalSingleton.java | 9 +++-- .../dbengine/lucene/LuceneUtils.java | 5 +-- .../searcher/AdaptiveLuceneLocalSearcher.java | 10 +++--- .../searcher/CountLuceneLocalSearcher.java | 7 ++-- .../searcher/CountLuceneMultiSearcher.java | 9 +++-- .../lucene/searcher/LocalLuceneWrapper.java | 7 ++-- .../lucene/searcher/LuceneLocalSearcher.java | 4 ++- .../lucene/searcher/LuceneShardSearcher.java | 7 ++-- .../ScoredSimpleLuceneShardSearcher.java | 17 +++++---- .../searcher/SimpleLuceneLocalSearcher.java | 11 ++++-- .../UnscoredPagedLuceneShardSearcher.java | 25 ++++++++----- ...UnsortedContinuousLuceneMultiSearcher.java | 10 ++++-- 16 files changed, 154 insertions(+), 68 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 321c733..4883cb7 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -143,6 +143,7 @@ public class LLLocalDictionary implements LLDictionary { private final ColumnFamilyHandle cfh; private final String databaseName; private final String columnName; + private final Scheduler dbScheduler; private final Function snapshotResolver; private final Striped itemsLock = Striped.readWriteStampedLock(STRIPES); private final UpdateMode updateMode; @@ -157,6 +158,7 @@ public class LLLocalDictionary implements LLDictionary { @NotNull ColumnFamilyHandle columnFamilyHandle, String databaseName, String columnName, + Scheduler dbScheduler, Function snapshotResolver, UpdateMode updateMode, DatabaseOptions databaseOptions) { @@ -166,6 +168,7 @@ public class LLLocalDictionary implements LLDictionary { this.cfh = columnFamilyHandle; this.databaseName = databaseName; this.columnName = columnName; + this.dbScheduler = dbScheduler; this.snapshotResolver = snapshotResolver; this.updateMode = updateMode; this.getRangeMultiDebugName = databaseName + "(" + columnName + ")" + "::getRangeMulti"; @@ -241,7 +244,7 @@ public class LLLocalDictionary implements LLDictionary { } private @NotNull Mono runOnDb(Callable<@Nullable T> callable) { - return Mono.fromCallable(callable); + return Mono.fromCallable(callable).subscribeOn(dbScheduler); } @Override @@ -1420,7 +1423,7 @@ public class LLLocalDictionary implements LLDictionary { rangeSend -> Flux.using( () -> new LLLocalEntryReactiveRocksIterator(db, alloc, cfh, rangeSend, databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), getRangeMultiDebugName), - LLLocalReactiveRocksIterator::flux, + llLocalEntryReactiveRocksIterator -> llLocalEntryReactiveRocksIterator.flux().subscribeOn(dbScheduler), LLLocalReactiveRocksIterator::release ).transform(LLUtils::handleDiscard), rangeSend -> Mono.fromRunnable(rangeSend::close) @@ -1432,7 +1435,7 @@ public class LLLocalDictionary implements LLDictionary { rangeSend -> Flux.using( () -> new LLLocalGroupedEntryReactiveRocksIterator(db, alloc, cfh, prefixLength, rangeSend, databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), "getRangeMultiGrouped"), - LLLocalGroupedReactiveRocksIterator::flux, + reactiveRocksIterator -> reactiveRocksIterator.flux().subscribeOn(dbScheduler), LLLocalGroupedReactiveRocksIterator::release ).transform(LLUtils::handleDiscard), rangeSend -> Mono.fromRunnable(rangeSend::close) @@ -1463,7 +1466,7 @@ public class LLLocalDictionary implements LLDictionary { rangeSend -> Flux.using( () -> new LLLocalGroupedKeyReactiveRocksIterator(db, alloc, cfh, prefixLength, rangeSend, databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), "getRangeKeysGrouped"), - LLLocalGroupedReactiveRocksIterator::flux, + reactiveRocksIterator -> reactiveRocksIterator.flux().subscribeOn(dbScheduler), LLLocalGroupedReactiveRocksIterator::release ).transform(LLUtils::handleDiscard), rangeSend -> Mono.fromRunnable(rangeSend::close) @@ -1474,7 +1477,7 @@ public class LLLocalDictionary implements LLDictionary { public Flux badBlocks(Mono> rangeMono) { return Flux.usingWhen(rangeMono, rangeSend -> Flux - .create(sink -> { + .create(sink -> { var range = rangeSend.receive(); sink.onDispose(range::close); try (var ro = new ReadOptions(getReadOptions(null))) { @@ -1511,7 +1514,8 @@ public class LLLocalDictionary implements LLDictionary { } catch (Throwable ex) { sink.error(ex); } - }), + }) + .subscribeOn(dbScheduler), rangeSend -> Mono.fromRunnable(rangeSend::close) ); } @@ -1531,9 +1535,10 @@ public class LLLocalDictionary implements LLDictionary { true, "getRangeKeysGrouped" ), - LLLocalKeyPrefixReactiveRocksIterator::flux, - LLLocalKeyPrefixReactiveRocksIterator::release - ), + it -> it.flux(), + it -> it.release() + ) + .subscribeOn(dbScheduler), rangeSend -> Mono.fromRunnable(rangeSend::close) ); } @@ -1560,7 +1565,7 @@ public class LLLocalDictionary implements LLDictionary { rangeSend -> Flux.using( () -> new LLLocalKeyReactiveRocksIterator(db, alloc, cfh, rangeSend, databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), getRangeKeysMultiDebugName), - LLLocalReactiveRocksIterator::flux, + llLocalKeyReactiveRocksIterator -> llLocalKeyReactiveRocksIterator.flux().subscribeOn(dbScheduler), LLLocalReactiveRocksIterator::release ).transform(LLUtils::handleDiscard), rangeSend -> Mono.fromRunnable(rangeSend::close) @@ -2001,7 +2006,8 @@ public class LLLocalDictionary implements LLDictionary { return null; } }) - .onErrorMap(cause -> new IOException("Failed to clear", cause)); + .onErrorMap(cause -> new IOException("Failed to clear", cause)) + .subscribeOn(dbScheduler); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 6a7e3f0..a7ba9eb 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -66,6 +66,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { RocksDB.DEFAULT_COLUMN_FAMILY); private final BufferAllocator allocator; + private final Scheduler dbScheduler; // Configurations @@ -128,6 +129,12 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { // 8 or more threadCap = Math.max(8, Runtime.getRuntime().availableProcessors()); } + this.dbScheduler = Schedulers.newBoundedElastic(threadCap, + Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, + "db-" + name, + 60, + true + ); this.enableColumnsBug = "true".equals(databaseOptions.extraFlags().getOrDefault("enableColumnBug", "false")); createIfNotExists(descriptors, rocksdbOptions, databaseOptions, dbPath, dbPathString); @@ -458,9 +465,11 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()), LLLocalKeyValueDatabase.this.name, name, + dbScheduler, defaultValue )) - .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause)); + .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause)) + .subscribeOn(dbScheduler); } @Override @@ -472,10 +481,12 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { getCfh(columnName), name, Column.toString(columnName), + dbScheduler, (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()), updateMode, databaseOptions - )); + )) + .subscribeOn(dbScheduler); } private ColumnFamilyHandle getCfh(byte[] columnName) throws RocksDBException { @@ -491,7 +502,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { @Override public Mono getProperty(String propertyName) { return Mono.fromCallable(() -> db.getAggregatedLongProperty(propertyName)) - .onErrorMap(cause -> new IOException("Failed to read " + propertyName, cause)); + .onErrorMap(cause -> new IOException("Failed to read " + propertyName, cause)) + .subscribeOn(dbScheduler); } @Override @@ -502,7 +514,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { return null; }) .onErrorMap(cause -> new IOException("Failed to verify checksum of database \"" - + getDatabaseName() + "\"", cause)); + + getDatabaseName() + "\"", cause)) + .subscribeOn(dbScheduler); } @Override @@ -518,20 +531,22 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { long currentSnapshotSequenceNumber = nextSnapshotNumbers.getAndIncrement(); this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot); return new LLSnapshot(currentSnapshotSequenceNumber); - }); + }) + .subscribeOn(dbScheduler); } @Override public Mono releaseSnapshot(LLSnapshot snapshot) { return Mono - .fromCallable(() -> { + .fromCallable(() -> { Snapshot dbSnapshot = this.snapshotsHandles.remove(snapshot.getSequenceNumber()); if (dbSnapshot == null) { throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!"); } db.releaseSnapshot(dbSnapshot); return null; - }); + }) + .subscribeOn(dbScheduler); } @Override @@ -546,7 +561,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } return null; }) - .onErrorMap(cause -> new IOException("Failed to close", cause)); + .onErrorMap(cause -> new IOException("Failed to close", cause)) + .subscribeOn(dbScheduler); } /** diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index c7eeb69..cf8b79d 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -76,6 +76,16 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { Integer.MAX_VALUE, true ); + // Scheduler used to get callback values of LuceneStreamSearcher without creating deadlocks + protected static final Scheduler luceneSearcherScheduler = Schedulers.newBoundedElastic( + 4, + Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, + "lucene-searcher", + 60, + true + ); + // Scheduler used to get callback values of LuceneStreamSearcher without creating deadlocks + private static final Scheduler luceneWriterScheduler = Schedulers.boundedElastic(); private final String luceneIndexName; private final IndexWriter indexWriter; @@ -174,9 +184,11 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND); indexWriterConfig.setIndexDeletionPolicy(snapshotter); indexWriterConfig.setCommitOnClose(true); + int writerSchedulerMaxThreadCount; MergeScheduler mergeScheduler; if (lowMemory) { mergeScheduler = new SerialMergeScheduler(); + writerSchedulerMaxThreadCount = 1; } else { var concurrentMergeScheduler = new ConcurrentMergeScheduler(); concurrentMergeScheduler.setDefaultMaxMergesAndThreads(false); @@ -185,6 +197,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } else { concurrentMergeScheduler.enableAutoIOThrottle(); } + writerSchedulerMaxThreadCount = concurrentMergeScheduler.getMaxThreadCount(); mergeScheduler = concurrentMergeScheduler; } indexWriterConfig.setMergeScheduler(mergeScheduler); @@ -231,18 +244,20 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return Mono.fromCallable(() -> { activeTasks.register(); try { + //noinspection BlockingMethodInNonBlockingContext indexWriter.addDocument(LLUtils.toDocument(doc)); return null; } finally { activeTasks.arriveAndDeregister(); } - }); + }).subscribeOn(luceneWriterScheduler); } @Override public Mono addDocuments(Flux> documents) { return documents .collectList() + .publishOn(luceneWriterScheduler) .flatMap(documentsList -> Mono .fromCallable(() -> { activeTasks.register(); @@ -262,12 +277,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return Mono.fromCallable(() -> { activeTasks.register(); try { + //noinspection BlockingMethodInNonBlockingContext indexWriter.deleteDocuments(LLUtils.toTerm(id)); return null; } finally { activeTasks.arriveAndDeregister(); } - }); + }).subscribeOn(luceneWriterScheduler); } @Override @@ -275,12 +291,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return Mono.fromCallable(() -> { activeTasks.register(); try { + //noinspection BlockingMethodInNonBlockingContext indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document)); } finally { activeTasks.arriveAndDeregister(); } return null; - }); + }).subscribeOn(luceneWriterScheduler); } @Override @@ -296,13 +313,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { for (Entry entry : documentsMap.entrySet()) { LLTerm key = entry.getKey(); LLDocument value = entry.getValue(); + //noinspection BlockingMethodInNonBlockingContext indexWriter.updateDocument(LLUtils.toTerm(key), LLUtils.toDocument(value)); } return null; } finally { activeTasks.arriveAndDeregister(); } - }); + }) + .subscribeOn(luceneWriterScheduler); } @Override @@ -333,7 +352,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .flatMap(indexSearcher -> { Mono releaseMono = searcherManager.releaseUsedIndexSearcher(indexSearcher); return localSearcher - .collect(indexSearcher.getIndexSearcher(), releaseMono, modifiedLocalQuery, keyFieldName) + .collect(indexSearcher.getIndexSearcher(), releaseMono, modifiedLocalQuery, keyFieldName, luceneSearcherScheduler) .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result.release())) .onErrorResume(ex -> releaseMono.then(Mono.error(ex))); }) @@ -349,7 +368,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .flatMap(indexSearcher -> { Mono releaseMono = searcherManager.releaseUsedIndexSearcher(indexSearcher); return shardSearcher - .searchOn(indexSearcher.getIndexSearcher(), releaseMono, modifiedLocalQuery) + .searchOn(indexSearcher.getIndexSearcher(), releaseMono, modifiedLocalQuery, luceneSearcherScheduler) .onErrorResume(ex -> releaseMono.then(Mono.error(ex))); }) ); @@ -426,7 +445,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return searcherManager.captureIndexSearcher(snapshot).flatMap(indexSearcher -> { Mono releaseMono = searcherManager.releaseUsedIndexSearcher(indexSearcher); return localSearcher - .collect(indexSearcher.getIndexSearcher(), releaseMono, localQueryParams, keyFieldName) + .collect(indexSearcher.getIndexSearcher(), releaseMono, localQueryParams, keyFieldName, luceneSearcherScheduler) .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result.release())) .onErrorResume(ex -> releaseMono.then(Mono.error(ex))); }); @@ -439,7 +458,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return searcherManager.captureIndexSearcher(snapshot) .flatMap(indexSearcher -> { Mono releaseMono = searcherManager.releaseUsedIndexSearcher(indexSearcher); - return shardSearcher.searchOn(indexSearcher.getIndexSearcher(), releaseMono, localQueryParams) + return shardSearcher.searchOn(indexSearcher.getIndexSearcher(), releaseMono, localQueryParams, luceneSearcherScheduler) .onErrorResume(ex -> releaseMono.then(Mono.error(ex))); }); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 5f45881..466ea11 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -1,5 +1,7 @@ package it.cavallium.dbengine.database.disk; +import static it.cavallium.dbengine.database.disk.LLLocalLuceneIndex.luceneSearcherScheduler; + import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader.InvalidCacheLoadException; @@ -218,7 +220,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { .flatMap(luceneIndexWithSnapshot -> luceneIndexWithSnapshot.luceneIndex() .distributedMoreLikeThis(luceneIndexWithSnapshot.snapshot.orElse(null), queryParams, mltDocumentFields, shardSearcher)) // Collect all the shards results into a single global result - .then(shardSearcher.collect(localQueryParams, keyFieldName)) + .then(shardSearcher.collect(localQueryParams, keyFieldName, luceneSearcherScheduler)) ) // Fix the result type .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result.release())); @@ -246,7 +248,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { .flatMap(luceneIndexWithSnapshot -> luceneIndexWithSnapshot.luceneIndex() .distributedSearch(luceneIndexWithSnapshot.snapshot.orElse(null), queryParams, shardSearcher)) // Collect all the shards results into a single global result - .then(shardSearcher.collect(localQueryParams, keyFieldName)) + .then(shardSearcher.collect(localQueryParams, keyFieldName, luceneSearcherScheduler)) ) // Fix the result type .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result.release())); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java index 334295d..71d49e3 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java @@ -23,17 +23,20 @@ public class LLLocalSingleton implements LLSingleton { private final Function snapshotResolver; private final byte[] name; private final String databaseName; + private final Scheduler dbScheduler; public LLLocalSingleton(RocksDB db, ColumnFamilyHandle singletonListColumn, Function snapshotResolver, String databaseName, byte[] name, + Scheduler dbScheduler, byte[] defaultValue) throws RocksDBException { this.db = db; this.cfh = singletonListColumn; this.databaseName = databaseName; this.snapshotResolver = snapshotResolver; this.name = name; + this.dbScheduler = dbScheduler; if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Initialized in a nonblocking thread"); } @@ -59,7 +62,8 @@ public class LLLocalSingleton implements LLSingleton { } return db.get(cfh, resolveSnapshot(snapshot), name); }) - .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause)); + .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause)) + .subscribeOn(dbScheduler); } @Override @@ -72,7 +76,8 @@ public class LLLocalSingleton implements LLSingleton { db.put(cfh, name, value); return null; }) - .onErrorMap(cause -> new IOException("Failed to write " + Arrays.toString(name), cause)); + .onErrorMap(cause -> new IOException("Failed to write " + Arrays.toString(name), cause)) + .subscribeOn(dbScheduler); } @Override diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index b312bc3..2485afe 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -69,7 +69,6 @@ import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; public class LuceneUtils { @@ -366,6 +365,7 @@ public class LuceneUtils { public static Flux convertHits(ScoreDoc[] hits, IndexSearchers indexSearchers, String keyFieldName, + Scheduler scheduler, boolean preserveOrder) { return Flux @@ -373,11 +373,12 @@ public class LuceneUtils { .transform(hitsFlux -> { if (preserveOrder) { return hitsFlux + .publishOn(scheduler) .mapNotNull(hit -> mapHitBlocking(hit, indexSearchers, keyFieldName)); } else { return hitsFlux .parallel() - .runOn(Schedulers.boundedElastic()) + .runOn(scheduler) .map(hit -> { var result = mapHitBlocking(hit, indexSearchers, keyFieldName); // The "else" value is an errored key score, to filter out next diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java index b392185..d06a072 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java @@ -17,22 +17,24 @@ public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher { public Mono collect(IndexSearcher indexSearcher, Mono releaseIndexSearcher, LocalQueryParams queryParams, - String keyFieldName) { + String keyFieldName, + Scheduler scheduler) { if (Schedulers.isInNonBlockingThread()) { return releaseIndexSearcher .then(Mono.error(() -> new UnsupportedOperationException("Called collect in a nonblocking thread"))); } if (queryParams.limit() == 0) { - return countSearcher.collect(indexSearcher, releaseIndexSearcher, queryParams, keyFieldName); + return countSearcher.collect(indexSearcher, releaseIndexSearcher, queryParams, keyFieldName, scheduler); } else if (!queryParams.isScored() && queryParams.offset() == 0 && queryParams.limit() >= 2147483630 && !queryParams.isSorted()) { return unscoredPagedLuceneLocalSearcher.collect(indexSearcher, releaseIndexSearcher, queryParams, - keyFieldName + keyFieldName, + scheduler ); } else { - return localSearcher.collect(indexSearcher, releaseIndexSearcher, queryParams, keyFieldName); + return localSearcher.collect(indexSearcher, releaseIndexSearcher, queryParams, keyFieldName, scheduler); } } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java index 5764bcb..215950d 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java @@ -15,7 +15,9 @@ public class CountLuceneLocalSearcher implements LuceneLocalSearcher { public Mono collect(IndexSearcher indexSearcher, Mono releaseIndexSearcher, LocalQueryParams queryParams, - String keyFieldName) { + String keyFieldName, + Scheduler scheduler) { + //noinspection BlockingMethodInNonBlockingContext return Mono .fromCallable(() -> { if (Schedulers.isInNonBlockingThread()) { @@ -26,6 +28,7 @@ public class CountLuceneLocalSearcher implements LuceneLocalSearcher { Flux.empty(), releaseIndexSearcher); } - ); + ) + .subscribeOn(scheduler); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneMultiSearcher.java index 42814f3..835558b 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneMultiSearcher.java @@ -25,17 +25,20 @@ public class CountLuceneMultiSearcher implements LuceneMultiSearcher { @Override public Mono searchOn(IndexSearcher indexSearcher, Mono releaseIndexSearcher, - LocalQueryParams queryParams) { + LocalQueryParams queryParams, + Scheduler scheduler) { return Mono .fromCallable(() -> { + //noinspection BlockingMethodInNonBlockingContext totalHits.addAndGet(indexSearcher.count(queryParams.query())); release.add(releaseIndexSearcher); return null; - }); + }) + .subscribeOn(scheduler); } @Override - public Mono collect(LocalQueryParams queryParams, String keyFieldName) { + public Mono collect(LocalQueryParams queryParams, String keyFieldName, Scheduler scheduler) { return Mono.fromCallable(() -> { if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called collect in a nonblocking thread"); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalLuceneWrapper.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalLuceneWrapper.java index 618387f..493cd6b 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalLuceneWrapper.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalLuceneWrapper.java @@ -16,12 +16,13 @@ public class LocalLuceneWrapper implements LuceneLocalSearcher { public Mono collect(IndexSearcher indexSearcher, Mono releaseIndexSearcher, LocalQueryParams queryParams, - String keyFieldName) { + String keyFieldName, + Scheduler scheduler) { var shardSearcher = luceneMultiSearcher.createShardSearcher(queryParams); return shardSearcher .flatMap(luceneShardSearcher -> luceneShardSearcher - .searchOn(indexSearcher, releaseIndexSearcher, queryParams) - .then(luceneShardSearcher.collect(queryParams, keyFieldName)) + .searchOn(indexSearcher, releaseIndexSearcher, queryParams, scheduler) + .then(luceneShardSearcher.collect(queryParams, keyFieldName, scheduler)) ); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneLocalSearcher.java index 39ae902..32546e0 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneLocalSearcher.java @@ -11,9 +11,11 @@ public interface LuceneLocalSearcher { * @param indexSearcher Lucene index searcher * @param queryParams the query parameters * @param keyFieldName the name of the key field + * @param scheduler a blocking scheduler */ Mono collect(IndexSearcher indexSearcher, Mono releaseIndexSearcher, LocalQueryParams queryParams, - String keyFieldName); + String keyFieldName, + Scheduler scheduler); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneShardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneShardSearcher.java index 1f0102e..e9ab9ca 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneShardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneShardSearcher.java @@ -10,14 +10,17 @@ public interface LuceneShardSearcher { /** * @param indexSearcher the index searcher, which contains all the lucene data * @param queryParams the query parameters + * @param scheduler a blocking scheduler */ Mono searchOn(IndexSearcher indexSearcher, Mono indexSearcherRelease, - LocalQueryParams queryParams); + LocalQueryParams queryParams, + Scheduler scheduler); /** * @param queryParams the query parameters * @param keyFieldName the name of the key field + * @param collectorScheduler a blocking scheduler */ - Mono collect(LocalQueryParams queryParams, String keyFieldName); + Mono collect(LocalQueryParams queryParams, String keyFieldName, Scheduler collectorScheduler); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java index 39f68d3..fb97c38 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java @@ -48,25 +48,28 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { @Override public Mono searchOn(IndexSearcher indexSearcher, Mono releaseIndexSearcher, - LocalQueryParams queryParams) { + LocalQueryParams queryParams, + Scheduler scheduler) { return Mono.fromCallable(() -> { if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called searchOn in a nonblocking thread"); } TopFieldCollector collector; synchronized (lock) { + //noinspection BlockingMethodInNonBlockingContext collector = firstPageSharedManager.newCollector(); indexSearchersArray.add(indexSearcher); indexSearcherReleasersArray.add(releaseIndexSearcher); collectors.add(collector); } + //noinspection BlockingMethodInNonBlockingContext indexSearcher.search(luceneQuery, collector); return null; - }); + }).subscribeOn(scheduler); } @Override - public Mono collect(LocalQueryParams queryParams, String keyFieldName) { + public Mono collect(LocalQueryParams queryParams, String keyFieldName, Scheduler collectorScheduler) { if (Schedulers.isInNonBlockingThread()) { return Mono.error(() -> new UnsupportedOperationException("Called collect in a nonblocking thread")); } @@ -88,7 +91,7 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { indexSearchers = IndexSearchers.of(indexSearchersArray); } Flux firstPageHits = LuceneUtils - .convertHits(result.scoreDocs, indexSearchers, keyFieldName, true); + .convertHits(result.scoreDocs, indexSearchers, keyFieldName, collectorScheduler, true); Flux nextHits = Flux.defer(() -> { if (paginationInfo.forceSinglePage() @@ -168,8 +171,9 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { emitter.onCancel(cancelEvent::tryEmitEmpty); }) + .subscribeOn(collectorScheduler) .flatMapSequential(topFieldDoc -> LuceneUtils - .convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, true) + .convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, collectorScheduler, true) ); }); @@ -179,7 +183,8 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { //.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)), release ); - }); + }) + .subscribeOn(collectorScheduler); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java index 385ac86..def9f5a 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java @@ -23,7 +23,8 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { public Mono collect(IndexSearcher indexSearcher, Mono releaseIndexSearcher, LocalQueryParams queryParams, - String keyFieldName) { + String keyFieldName, + Scheduler scheduler) { return Mono .fromCallable(() -> { if (Schedulers.isInNonBlockingThread()) { @@ -45,6 +46,7 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { LuceneUtils.totalHitsThreshold(), !paginationInfo.forceSinglePage(), queryParams.isScored()); + //noinspection BlockingMethodInNonBlockingContext indexSearcher.search(queryParams.query(), firstPageCollector); firstPageTopDocs = firstPageCollector.topDocs(LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset()), LuceneUtils.safeLongToInt(paginationInfo.firstPageLimit()) @@ -55,6 +57,7 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { firstPageTopDocs.scoreDocs, IndexSearchers.unsharded(indexSearcher), keyFieldName, + scheduler, true ) .take(queryParams.limit(), true); @@ -97,8 +100,9 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { }, s -> {} ) + .subscribeOn(scheduler) .flatMapSequential(topFieldDoc -> LuceneUtils - .convertHits(topFieldDoc.scoreDocs, IndexSearchers.unsharded(indexSearcher), keyFieldName, true) + .convertHits(topFieldDoc.scoreDocs, IndexSearchers.unsharded(indexSearcher), keyFieldName, scheduler, true) ) ); } @@ -116,6 +120,7 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { //.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)), releaseIndexSearcher ); - }); + }) + .subscribeOn(scheduler); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneShardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneShardSearcher.java index 4c2073a..6583cf0 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneShardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneShardSearcher.java @@ -41,25 +41,28 @@ class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher { @Override public Mono searchOn(IndexSearcher indexSearcher, Mono releaseIndexSearcher, - LocalQueryParams queryParams) { + LocalQueryParams queryParams, + Scheduler scheduler) { return Mono.fromCallable(() -> { if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called searchOn in a nonblocking thread"); } TopDocsCollector collector; synchronized (lock) { + //noinspection BlockingMethodInNonBlockingContext collector = firstPageUnsortedCollectorManager.newCollector(); indexSearchersArray.add(indexSearcher); indexSearcherReleasersArray.add(releaseIndexSearcher); collectors.add(collector); } + //noinspection BlockingMethodInNonBlockingContext indexSearcher.search(luceneQuery, collector); return null; - }); + }).subscribeOn(scheduler); } @Override - public Mono collect(LocalQueryParams queryParams, String keyFieldName) { + public Mono collect(LocalQueryParams queryParams, String keyFieldName, Scheduler scheduler) { return Mono .fromCallable(() -> { if (Schedulers.isInNonBlockingThread()) { @@ -68,7 +71,7 @@ class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher { TopDocs result; Mono release; synchronized (lock) { - + //noinspection BlockingMethodInNonBlockingContext result = firstPageUnsortedCollectorManager.reduce(collectors); release = Mono.when(indexSearcherReleasersArray); } @@ -77,7 +80,7 @@ class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher { indexSearchers = IndexSearchers.of(indexSearchersArray); } Flux firstPageHits = LuceneUtils - .convertHits(result.scoreDocs, indexSearchers, keyFieldName, false); + .convertHits(result.scoreDocs, indexSearchers, keyFieldName, scheduler, false); Flux nextHits = Flux.defer(() -> { if (paginationInfo.forceSinglePage() || paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) { @@ -95,19 +98,23 @@ class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher { () -> TopDocsSearcher.getTopDocsCollector(queryParams.sort(), s.currentPageLimit(), s.last(), LuceneUtils.totalHitsThreshold(), true, queryParams.isScored()), 0, s.currentPageLimit(), queryParams.sort()); - + //noinspection BlockingMethodInNonBlockingContext TopDocs pageTopDocs = Flux .fromIterable(indexSearchersArray) .flatMapSequential(indexSearcher -> Mono .fromCallable(() -> { + //noinspection BlockingMethodInNonBlockingContext var collector = currentPageUnsortedCollectorManager.newCollector(); + //noinspection BlockingMethodInNonBlockingContext indexSearcher.search(luceneQuery, collector); return collector; }) + .subscribeOn(scheduler) ) .collect(Collectors.toCollection(ObjectArrayList::new)) .flatMap(collectors -> Mono .fromCallable(() -> currentPageUnsortedCollectorManager.reduce(collectors)) + .subscribeOn(scheduler) ) .blockOptional().orElseThrow(); @@ -122,8 +129,9 @@ class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher { }, s -> {} ) + .subscribeOn(scheduler) .flatMapSequential(topFieldDoc -> LuceneUtils - .convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, false) + .convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, scheduler, false) ); }); @@ -132,7 +140,8 @@ class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher { //.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)), release ); - }); + }) + .subscribeOn(scheduler); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredUnsortedContinuousLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredUnsortedContinuousLuceneMultiSearcher.java index 34828b0..c54d14f 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredUnsortedContinuousLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredUnsortedContinuousLuceneMultiSearcher.java @@ -99,7 +99,8 @@ public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMult @Override public Mono searchOn(IndexSearcher indexSearcher, Mono releaseIndexSearcher, - LocalQueryParams queryParams) { + LocalQueryParams queryParams, + Scheduler scheduler) { return Mono .fromCallable(() -> { if (Schedulers.isInNonBlockingThread()) { @@ -126,12 +127,14 @@ public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMult } }); return null; - }); + }) + .subscribeOn(scheduler); } @Override public Mono collect(LocalQueryParams queryParams, - String keyFieldName) { + String keyFieldName, + Scheduler scheduler) { return Mono .fromCallable(() -> { if (Schedulers.isInNonBlockingThread()) { @@ -166,6 +169,7 @@ public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMult .flatMap(scoreDocs -> LuceneUtils.convertHits(scoreDocs.toArray(ScoreDoc[]::new), indexSearchers, keyFieldName, + scheduler, false ));