From 1882e8b3005b23fe3939f06a64c70117370f9ef9 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 4 Sep 2021 16:42:47 +0200 Subject: [PATCH] Optimize performance by removing all superfluous subscribeOn calls --- .../database/disk/LLLocalDictionary.java | 32 ++++---- .../disk/LLLocalKeyValueDatabase.java | 36 +++------ .../database/disk/LLLocalLuceneIndex.java | 51 +++--------- .../disk/LLLocalMultiLuceneIndex.java | 4 +- .../database/disk/LLLocalSingleton.java | 9 +-- .../dbengine/lucene/LuceneUtils.java | 5 +- .../searcher/AdaptiveLuceneLocalSearcher.java | 10 +-- .../searcher/CountLuceneLocalSearcher.java | 7 +- .../searcher/CountLuceneMultiSearcher.java | 9 +-- .../lucene/searcher/LocalLuceneWrapper.java | 7 +- .../lucene/searcher/LuceneLocalSearcher.java | 4 +- .../lucene/searcher/LuceneShardSearcher.java | 7 +- .../ScoredSimpleLuceneShardSearcher.java | 50 ++++++------ .../searcher/SimpleLuceneLocalSearcher.java | 79 +++++++++---------- .../UnscoredPagedLuceneShardSearcher.java | 27 +++---- ...UnsortedContinuousLuceneMultiSearcher.java | 13 +-- 16 files changed, 133 insertions(+), 217 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 436c1e4..7de1d95 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -68,6 +68,7 @@ import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple2; import reactor.util.function.Tuple3; import reactor.util.function.Tuple4; @@ -142,7 +143,6 @@ public class LLLocalDictionary implements LLDictionary { private final ColumnFamilyHandle cfh; private final String databaseName; private final String columnName; - private final Scheduler dbScheduler; private final Function snapshotResolver; private final Striped itemsLock = Striped.readWriteStampedLock(STRIPES); private final UpdateMode updateMode; @@ -157,7 +157,6 @@ public class LLLocalDictionary implements LLDictionary { @NotNull ColumnFamilyHandle columnFamilyHandle, String databaseName, String columnName, - Scheduler dbScheduler, Function snapshotResolver, UpdateMode updateMode, DatabaseOptions databaseOptions) { @@ -167,7 +166,6 @@ public class LLLocalDictionary implements LLDictionary { this.cfh = columnFamilyHandle; this.databaseName = databaseName; this.columnName = columnName; - this.dbScheduler = dbScheduler; this.snapshotResolver = snapshotResolver; this.updateMode = updateMode; this.getRangeMultiDebugName = databaseName + "(" + columnName + ")" + "::getRangeMulti"; @@ -243,7 +241,7 @@ public class LLLocalDictionary implements LLDictionary { } private @NotNull Mono runOnDb(Callable<@Nullable T> callable) { - return Mono.fromCallable(callable).subscribeOn(dbScheduler); + return Mono.fromCallable(callable); } @Override @@ -327,6 +325,7 @@ public class LLLocalDictionary implements LLDictionary { // If it's smaller or equals it means that RocksDB is overwriting // the beginning of the result buffer. assert resultNioBuf.limit() > assertionReadData; + //noinspection ConstantConditions if (ASSERTIONS_ENABLED) { assertionReadData = resultNioBuf.limit(); } @@ -515,6 +514,8 @@ public class LLLocalDictionary implements LLDictionary { if (direct1 != null) PlatformDependent.freeDirectBuffer(direct1); if (direct2 != null) PlatformDependent.freeDirectBuffer(direct2); if (direct3 != null) PlatformDependent.freeDirectBuffer(direct3); + if (slice1 != null) slice1.close(); + if (slice2 != null) slice2.close(); } }).onErrorMap(cause -> new IOException("Failed to read range", cause)), rangeSend -> Mono.fromRunnable(rangeSend::close)); @@ -1386,7 +1387,7 @@ public class LLLocalDictionary implements LLDictionary { rangeSend -> Flux.using( () -> new LLLocalEntryReactiveRocksIterator(db, alloc, cfh, rangeSend, databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), getRangeMultiDebugName), - llLocalEntryReactiveRocksIterator -> llLocalEntryReactiveRocksIterator.flux().subscribeOn(dbScheduler), + LLLocalReactiveRocksIterator::flux, LLLocalReactiveRocksIterator::release ).transform(LLUtils::handleDiscard), rangeSend -> Mono.fromRunnable(rangeSend::close) @@ -1398,7 +1399,7 @@ public class LLLocalDictionary implements LLDictionary { rangeSend -> Flux.using( () -> new LLLocalGroupedEntryReactiveRocksIterator(db, alloc, cfh, prefixLength, rangeSend, databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), "getRangeMultiGrouped"), - reactiveRocksIterator -> reactiveRocksIterator.flux().subscribeOn(dbScheduler), + LLLocalGroupedReactiveRocksIterator::flux, LLLocalGroupedReactiveRocksIterator::release ).transform(LLUtils::handleDiscard), rangeSend -> Mono.fromRunnable(rangeSend::close) @@ -1429,7 +1430,7 @@ public class LLLocalDictionary implements LLDictionary { rangeSend -> Flux.using( () -> new LLLocalGroupedKeyReactiveRocksIterator(db, alloc, cfh, prefixLength, rangeSend, databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), "getRangeKeysGrouped"), - reactiveRocksIterator -> reactiveRocksIterator.flux().subscribeOn(dbScheduler), + LLLocalGroupedReactiveRocksIterator::flux, LLLocalGroupedReactiveRocksIterator::release ).transform(LLUtils::handleDiscard), rangeSend -> Mono.fromRunnable(rangeSend::close) @@ -1440,7 +1441,7 @@ public class LLLocalDictionary implements LLDictionary { public Flux badBlocks(Mono> rangeMono) { return Flux.usingWhen(rangeMono, rangeSend -> Flux - .create(sink -> { + .create(sink -> { var range = rangeSend.receive(); sink.onDispose(range::close); try (var ro = new ReadOptions(getReadOptions(null))) { @@ -1477,8 +1478,7 @@ public class LLLocalDictionary implements LLDictionary { } catch (Throwable ex) { sink.error(ex); } - }) - .subscribeOn(dbScheduler), + }), rangeSend -> Mono.fromRunnable(rangeSend::close) ); } @@ -1498,10 +1498,9 @@ public class LLLocalDictionary implements LLDictionary { true, "getRangeKeysGrouped" ), - it -> it.flux(), - it -> it.release() - ) - .subscribeOn(dbScheduler), + LLLocalKeyPrefixReactiveRocksIterator::flux, + LLLocalKeyPrefixReactiveRocksIterator::release + ), rangeSend -> Mono.fromRunnable(rangeSend::close) ); } @@ -1528,7 +1527,7 @@ public class LLLocalDictionary implements LLDictionary { rangeSend -> Flux.using( () -> new LLLocalKeyReactiveRocksIterator(db, alloc, cfh, rangeSend, databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), getRangeKeysMultiDebugName), - llLocalKeyReactiveRocksIterator -> llLocalKeyReactiveRocksIterator.flux().subscribeOn(dbScheduler), + LLLocalReactiveRocksIterator::flux, LLLocalReactiveRocksIterator::release ).transform(LLUtils::handleDiscard), rangeSend -> Mono.fromRunnable(rangeSend::close) @@ -1963,8 +1962,7 @@ public class LLLocalDictionary implements LLDictionary { return null; } }) - .onErrorMap(cause -> new IOException("Failed to clear", cause)) - .subscribeOn(dbScheduler); + .onErrorMap(cause -> new IOException("Failed to clear", cause)); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 8da4535..0fcf6d7 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -65,7 +66,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { RocksDB.DEFAULT_COLUMN_FAMILY); private final BufferAllocator allocator; - private final Scheduler dbScheduler; // Configurations @@ -112,6 +112,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } // Get databases directory path + Objects.requireNonNull(path); Path databasesDirPath = path.toAbsolutePath().getParent(); String dbPathString = databasesDirPath.toString() + File.separatorChar + path.getFileName(); Path dbPath = Paths.get(dbPathString); @@ -127,12 +128,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { // 8 or more threadCap = Math.max(8, Runtime.getRuntime().availableProcessors()); } - this.dbScheduler = Schedulers.newBoundedElastic(threadCap, - Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, - "db-" + name, - 60, - true - ); this.enableColumnsBug = "true".equals(databaseOptions.extraFlags().getOrDefault("enableColumnBug", "false")); createIfNotExists(descriptors, rocksdbOptions, databaseOptions, dbPath, dbPathString); @@ -300,6 +295,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { options.setAllowFAllocate(true); options.setRateLimiter(new RateLimiter(10L * 1024L * 1024L)); // 10MiB/s max compaction write speed + Objects.requireNonNull(databasesDirPath); + Objects.requireNonNull(path.getFileName()); List paths = List.of(new DbPath(databasesDirPath.resolve(path.getFileName() + "_hot"), 10L * 1024L * 1024L * 1024L), // 10GiB new DbPath(databasesDirPath.resolve(path.getFileName() + "_cold"), @@ -455,11 +452,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()), LLLocalKeyValueDatabase.this.name, name, - dbScheduler, defaultValue )) - .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause)) - .subscribeOn(dbScheduler); + .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause)); } @Override @@ -471,12 +466,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { getCfh(columnName), name, Column.toString(columnName), - dbScheduler, (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()), updateMode, databaseOptions - )) - .subscribeOn(dbScheduler); + )); } private ColumnFamilyHandle getCfh(byte[] columnName) throws RocksDBException { @@ -492,8 +485,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { @Override public Mono getProperty(String propertyName) { return Mono.fromCallable(() -> db.getAggregatedLongProperty(propertyName)) - .onErrorMap(cause -> new IOException("Failed to read " + propertyName, cause)) - .subscribeOn(dbScheduler); + .onErrorMap(cause -> new IOException("Failed to read " + propertyName, cause)); } @Override @@ -504,8 +496,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { return null; }) .onErrorMap(cause -> new IOException("Failed to verify checksum of database \"" - + getDatabaseName() + "\"", cause)) - .subscribeOn(dbScheduler); + + getDatabaseName() + "\"", cause)); } @Override @@ -521,22 +512,20 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { long currentSnapshotSequenceNumber = nextSnapshotNumbers.getAndIncrement(); this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot); return new LLSnapshot(currentSnapshotSequenceNumber); - }) - .subscribeOn(dbScheduler); + }); } @Override public Mono releaseSnapshot(LLSnapshot snapshot) { return Mono - .fromCallable(() -> { + .fromCallable(() -> { Snapshot dbSnapshot = this.snapshotsHandles.remove(snapshot.getSequenceNumber()); if (dbSnapshot == null) { throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!"); } db.releaseSnapshot(dbSnapshot); return null; - }) - .subscribeOn(dbScheduler); + }); } @Override @@ -551,8 +540,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } return null; }) - .onErrorMap(cause -> new IOException("Failed to close", cause)) - .subscribeOn(dbScheduler); + .onErrorMap(cause -> new IOException("Failed to close", cause)); } /** diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 1f83901..720354b 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -93,16 +93,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { Integer.MAX_VALUE, true ); - // Scheduler used to get callback values of LuceneStreamSearcher without creating deadlocks - private final Scheduler luceneSearcherScheduler = Schedulers.newBoundedElastic( - 4, - Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, - "lucene-searcher", - 60, - true - ); - // Scheduler used to get callback values of LuceneStreamSearcher without creating deadlocks - private static final Scheduler luceneWriterScheduler = Schedulers.boundedElastic(); private final String luceneIndexName; private final SnapshotDeletionPolicy snapshotter; @@ -209,11 +199,9 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND); indexWriterConfig.setIndexDeletionPolicy(snapshotter); indexWriterConfig.setCommitOnClose(true); - int writerSchedulerMaxThreadCount; MergeScheduler mergeScheduler; if (lowMemory) { mergeScheduler = new SerialMergeScheduler(); - writerSchedulerMaxThreadCount = 1; } else { var concurrentMergeScheduler = new ConcurrentMergeScheduler(); concurrentMergeScheduler.setDefaultMaxMergesAndThreads(false); @@ -222,7 +210,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } else { concurrentMergeScheduler.enableAutoIOThrottle(); } - writerSchedulerMaxThreadCount = concurrentMergeScheduler.getMaxThreadCount(); mergeScheduler = concurrentMergeScheduler; } indexWriterConfig.setMergeScheduler(mergeScheduler); @@ -236,14 +223,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { new SearcherFactory() ); - /*this.luceneWriterScheduler = Schedulers.newBoundedElastic( - writerSchedulerMaxThreadCount, - Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, - "lucene-writer", - 60, - true - );*/ - // Create scheduled tasks lifecycle manager this.scheduledTasksLifecycle = new ScheduledTaskLifecycle(); @@ -370,23 +349,21 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { @Override public Mono addDocument(LLTerm key, LLDocument doc) { - return Mono.fromCallable(() -> { + return Mono.fromCallable(() -> { scheduledTasksLifecycle.startScheduledTask(); try { - //noinspection BlockingMethodInNonBlockingContext indexWriter.addDocument(LLUtils.toDocument(doc)); return null; } finally { scheduledTasksLifecycle.endScheduledTask(); } - }).subscribeOn(luceneWriterScheduler); + }); } @Override public Mono addDocuments(Flux> documents) { return documents .collectList() - .publishOn(luceneWriterScheduler) .flatMap(documentsList -> Mono .fromCallable(() -> { scheduledTasksLifecycle.startScheduledTask(); @@ -403,30 +380,28 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { @Override public Mono deleteDocument(LLTerm id) { - return Mono.fromCallable(() -> { + return Mono.fromCallable(() -> { scheduledTasksLifecycle.startScheduledTask(); try { - //noinspection BlockingMethodInNonBlockingContext indexWriter.deleteDocuments(LLUtils.toTerm(id)); return null; } finally { scheduledTasksLifecycle.endScheduledTask(); } - }).subscribeOn(luceneWriterScheduler); + }); } @Override public Mono updateDocument(LLTerm id, LLDocument document) { - return Mono.fromCallable(() -> { + return Mono.fromCallable(() -> { scheduledTasksLifecycle.startScheduledTask(); try { - //noinspection BlockingMethodInNonBlockingContext indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document)); } finally { scheduledTasksLifecycle.endScheduledTask(); } return null; - }).subscribeOn(luceneWriterScheduler); + }); } @Override @@ -436,21 +411,19 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { private Mono updateDocuments(Map documentsMap) { return Mono - .fromCallable(() -> { + .fromCallable(() -> { scheduledTasksLifecycle.startScheduledTask(); try { for (Entry entry : documentsMap.entrySet()) { LLTerm key = entry.getKey(); LLDocument value = entry.getValue(); - //noinspection BlockingMethodInNonBlockingContext indexWriter.updateDocument(LLUtils.toTerm(key), LLUtils.toDocument(value)); } return null; } finally { scheduledTasksLifecycle.endScheduledTask(); } - }) - .subscribeOn(luceneWriterScheduler); + }); } @Override @@ -506,7 +479,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .flatMap(indexSearcher -> { Mono releaseMono = releaseSearcherWrapper(snapshot, indexSearcher); return localSearcher - .collect(indexSearcher, releaseMono, modifiedLocalQuery, keyFieldName, luceneSearcherScheduler) + .collect(indexSearcher, releaseMono, modifiedLocalQuery, keyFieldName) .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result.release())) .onErrorResume(ex -> releaseMono.then(Mono.error(ex))); }) @@ -522,7 +495,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .flatMap(indexSearcher -> { Mono releaseMono = releaseSearcherWrapper(snapshot, indexSearcher); return shardSearcher - .searchOn(indexSearcher, releaseMono, modifiedLocalQuery, luceneSearcherScheduler) + .searchOn(indexSearcher, releaseMono, modifiedLocalQuery) .onErrorResume(ex -> releaseMono.then(Mono.error(ex))); }) ); @@ -606,7 +579,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .flatMap(indexSearcher -> { Mono releaseMono = releaseSearcherWrapper(snapshot, indexSearcher); return localSearcher - .collect(indexSearcher, releaseMono, localQueryParams, keyFieldName, luceneSearcherScheduler) + .collect(indexSearcher, releaseMono, localQueryParams, keyFieldName) .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result.release())) .onErrorResume(ex -> releaseMono.then(Mono.error(ex))); }); @@ -619,7 +592,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return this.acquireSearcherWrapper(snapshot) .flatMap(indexSearcher -> { Mono releaseMono = releaseSearcherWrapper(snapshot, indexSearcher); - return shardSearcher.searchOn(indexSearcher, releaseMono, localQueryParams, luceneSearcherScheduler) + return shardSearcher.searchOn(indexSearcher, releaseMono, localQueryParams) .onErrorResume(ex -> releaseMono.then(Mono.error(ex))); }); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 2cca1f3..5f45881 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -218,7 +218,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { .flatMap(luceneIndexWithSnapshot -> luceneIndexWithSnapshot.luceneIndex() .distributedMoreLikeThis(luceneIndexWithSnapshot.snapshot.orElse(null), queryParams, mltDocumentFields, shardSearcher)) // Collect all the shards results into a single global result - .then(shardSearcher.collect(localQueryParams, keyFieldName, Schedulers.boundedElastic())) + .then(shardSearcher.collect(localQueryParams, keyFieldName)) ) // Fix the result type .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result.release())); @@ -246,7 +246,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { .flatMap(luceneIndexWithSnapshot -> luceneIndexWithSnapshot.luceneIndex() .distributedSearch(luceneIndexWithSnapshot.snapshot.orElse(null), queryParams, shardSearcher)) // Collect all the shards results into a single global result - .then(shardSearcher.collect(localQueryParams, keyFieldName, Schedulers.boundedElastic())) + .then(shardSearcher.collect(localQueryParams, keyFieldName)) ) // Fix the result type .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result.release())); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java index c0c860a..0f5cef4 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java @@ -22,20 +22,17 @@ public class LLLocalSingleton implements LLSingleton { private final Function snapshotResolver; private final byte[] name; private final String databaseName; - private final Scheduler dbScheduler; public LLLocalSingleton(RocksDB db, ColumnFamilyHandle singletonListColumn, Function snapshotResolver, String databaseName, byte[] name, - Scheduler dbScheduler, byte[] defaultValue) throws RocksDBException { this.db = db; this.cfh = singletonListColumn; this.databaseName = databaseName; this.snapshotResolver = snapshotResolver; this.name = name; - this.dbScheduler = dbScheduler; if (db.get(cfh, this.name) == null) { db.put(cfh, this.name, defaultValue); } @@ -53,8 +50,7 @@ public class LLLocalSingleton implements LLSingleton { public Mono get(@Nullable LLSnapshot snapshot) { return Mono .fromCallable(() -> db.get(cfh, resolveSnapshot(snapshot), name)) - .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause)) - .subscribeOn(dbScheduler); + .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause)); } @Override @@ -64,8 +60,7 @@ public class LLLocalSingleton implements LLSingleton { db.put(cfh, name, value); return null; }) - .onErrorMap(cause -> new IOException("Failed to write " + Arrays.toString(name), cause)) - .subscribeOn(dbScheduler); + .onErrorMap(cause -> new IOException("Failed to write " + Arrays.toString(name), cause)); } @Override diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index 123ad78..18227ea 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -69,6 +69,7 @@ import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; public class LuceneUtils { @@ -353,7 +354,6 @@ public class LuceneUtils { public static Flux convertHits(ScoreDoc[] hits, IndexSearchers indexSearchers, String keyFieldName, - Scheduler scheduler, boolean preserveOrder) { return Flux @@ -361,12 +361,11 @@ public class LuceneUtils { .transform(hitsFlux -> { if (preserveOrder) { return hitsFlux - .publishOn(scheduler) .mapNotNull(hit -> mapHitBlocking(hit, indexSearchers, keyFieldName)); } else { return hitsFlux .parallel() - .runOn(scheduler) + .runOn(Schedulers.parallel()) .map(hit -> { var result = mapHitBlocking(hit, indexSearchers, keyFieldName); // The "else" value is an errored key score, to filter out next diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java index 5c48b54..b6ddfff 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java @@ -16,20 +16,18 @@ public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher { public Mono collect(IndexSearcher indexSearcher, Mono releaseIndexSearcher, LocalQueryParams queryParams, - String keyFieldName, - Scheduler scheduler) { + String keyFieldName) { if (queryParams.limit() == 0) { - return countSearcher.collect(indexSearcher, releaseIndexSearcher, queryParams, keyFieldName, scheduler); + return countSearcher.collect(indexSearcher, releaseIndexSearcher, queryParams, keyFieldName); } else if (!queryParams.isScored() && queryParams.offset() == 0 && queryParams.limit() >= 2147483630 && !queryParams.isSorted()) { return unscoredPagedLuceneLocalSearcher.collect(indexSearcher, releaseIndexSearcher, queryParams, - keyFieldName, - scheduler + keyFieldName ); } else { - return localSearcher.collect(indexSearcher, releaseIndexSearcher, queryParams, keyFieldName, scheduler); + return localSearcher.collect(indexSearcher, releaseIndexSearcher, queryParams, keyFieldName); } } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java index e5766a4..d3ea3e6 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java @@ -14,15 +14,12 @@ public class CountLuceneLocalSearcher implements LuceneLocalSearcher { public Mono collect(IndexSearcher indexSearcher, Mono releaseIndexSearcher, LocalQueryParams queryParams, - String keyFieldName, - Scheduler scheduler) { - //noinspection BlockingMethodInNonBlockingContext + String keyFieldName) { return Mono .fromCallable(() -> new LuceneSearchResult( TotalHitsCount.of(indexSearcher.count(queryParams.query()), true), Flux.empty(), releaseIndexSearcher) - ) - .subscribeOn(scheduler); + ); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneMultiSearcher.java index d1e9330..80e9893 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneMultiSearcher.java @@ -24,20 +24,17 @@ public class CountLuceneMultiSearcher implements LuceneMultiSearcher { @Override public Mono searchOn(IndexSearcher indexSearcher, Mono releaseIndexSearcher, - LocalQueryParams queryParams, - Scheduler scheduler) { + LocalQueryParams queryParams) { return Mono .fromCallable(() -> { - //noinspection BlockingMethodInNonBlockingContext totalHits.addAndGet(indexSearcher.count(queryParams.query())); release.add(releaseIndexSearcher); return null; - }) - .subscribeOn(scheduler); + }); } @Override - public Mono collect(LocalQueryParams queryParams, String keyFieldName, Scheduler scheduler) { + public Mono collect(LocalQueryParams queryParams, String keyFieldName) { return Mono.fromCallable(() -> new LuceneSearchResult(TotalHitsCount.of(totalHits.get(), true), Flux.empty(), Mono.when(release))); } }; diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalLuceneWrapper.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalLuceneWrapper.java index 493cd6b..618387f 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalLuceneWrapper.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalLuceneWrapper.java @@ -16,13 +16,12 @@ public class LocalLuceneWrapper implements LuceneLocalSearcher { public Mono collect(IndexSearcher indexSearcher, Mono releaseIndexSearcher, LocalQueryParams queryParams, - String keyFieldName, - Scheduler scheduler) { + String keyFieldName) { var shardSearcher = luceneMultiSearcher.createShardSearcher(queryParams); return shardSearcher .flatMap(luceneShardSearcher -> luceneShardSearcher - .searchOn(indexSearcher, releaseIndexSearcher, queryParams, scheduler) - .then(luceneShardSearcher.collect(queryParams, keyFieldName, scheduler)) + .searchOn(indexSearcher, releaseIndexSearcher, queryParams) + .then(luceneShardSearcher.collect(queryParams, keyFieldName)) ); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneLocalSearcher.java index 32546e0..39ae902 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneLocalSearcher.java @@ -11,11 +11,9 @@ public interface LuceneLocalSearcher { * @param indexSearcher Lucene index searcher * @param queryParams the query parameters * @param keyFieldName the name of the key field - * @param scheduler a blocking scheduler */ Mono collect(IndexSearcher indexSearcher, Mono releaseIndexSearcher, LocalQueryParams queryParams, - String keyFieldName, - Scheduler scheduler); + String keyFieldName); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneShardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneShardSearcher.java index e9ab9ca..1f0102e 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneShardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneShardSearcher.java @@ -10,17 +10,14 @@ public interface LuceneShardSearcher { /** * @param indexSearcher the index searcher, which contains all the lucene data * @param queryParams the query parameters - * @param scheduler a blocking scheduler */ Mono searchOn(IndexSearcher indexSearcher, Mono indexSearcherRelease, - LocalQueryParams queryParams, - Scheduler scheduler); + LocalQueryParams queryParams); /** * @param queryParams the query parameters * @param keyFieldName the name of the key field - * @param collectorScheduler a blocking scheduler */ - Mono collect(LocalQueryParams queryParams, String keyFieldName, Scheduler collectorScheduler); + Mono collect(LocalQueryParams queryParams, String keyFieldName); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java index 24bd9a0..8e20179 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java @@ -48,25 +48,22 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { @Override public Mono searchOn(IndexSearcher indexSearcher, Mono releaseIndexSearcher, - LocalQueryParams queryParams, - Scheduler scheduler) { - return Mono.fromCallable(() -> { + LocalQueryParams queryParams) { + return Mono.fromCallable(() -> { TopFieldCollector collector; synchronized (lock) { - //noinspection BlockingMethodInNonBlockingContext collector = firstPageSharedManager.newCollector(); indexSearchersArray.add(indexSearcher); indexSearcherReleasersArray.add(releaseIndexSearcher); collectors.add(collector); } - //noinspection BlockingMethodInNonBlockingContext indexSearcher.search(luceneQuery, collector); return null; - }).subscribeOn(scheduler); + }); } @Override - public Mono collect(LocalQueryParams queryParams, String keyFieldName, Scheduler collectorScheduler) { + public Mono collect(LocalQueryParams queryParams, String keyFieldName) { if (!queryParams.isScored()) { return Mono.error( new UnsupportedOperationException("Can't execute an unscored query with a scored lucene shard searcher") @@ -86,7 +83,7 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { indexSearchers = IndexSearchers.of(indexSearchersArray); } Flux firstPageHits = LuceneUtils - .convertHits(result.scoreDocs, indexSearchers, keyFieldName, collectorScheduler, true); + .convertHits(result.scoreDocs, indexSearchers, keyFieldName, true); Flux nextHits = Flux.defer(() -> { if (paginationInfo.forceSinglePage() @@ -115,21 +112,24 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { TopDocs pageTopDocs = Flux .fromIterable(indexSearchersArray) .index() - .flatMapSequential(tuple -> Mono - .fromCallable(() -> { - long shardIndex = tuple.getT1(); - IndexSearcher indexSearcher = tuple.getT2(); - TopFieldCollector collector = sharedManager.newCollector(); - indexSearcher.search(luceneQuery, collector); - return collector; - }) - .subscribeOn(Schedulers.immediate()) - ) + .handle((tuple, sink) -> { + try { + IndexSearcher indexSearcher = tuple.getT2(); + TopFieldCollector collector = sharedManager.newCollector(); + indexSearcher.search(luceneQuery, collector); + sink.next(collector); + } catch (Exception ex) { + sink.error(ex); + } + }) .collect(Collectors.toCollection(ObjectArrayList::new)) - .flatMap(collectors -> Mono - .fromCallable(() -> sharedManager.reduce(collectors)) - .subscribeOn(Schedulers.immediate()) - ) + .handle((collectors, sink) -> { + try { + sink.next(sharedManager.reduce(collectors)); + } catch (Exception ex) { + sink.error(ex); + } + }) .single() .takeUntilOther(cancelEvent.asMono()) .block(); @@ -154,9 +154,8 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { emitter.onCancel(cancelEvent::tryEmitEmpty); }) - .subscribeOn(collectorScheduler) .flatMapSequential(topFieldDoc -> LuceneUtils - .convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, collectorScheduler, true) + .convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, true) ); }); @@ -166,8 +165,7 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { //.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)), release ); - }) - .subscribeOn(Schedulers.boundedElastic()); + }); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java index 3d30412..dc4fd3b 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java @@ -22,8 +22,7 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { public Mono collect(IndexSearcher indexSearcher, Mono releaseIndexSearcher, LocalQueryParams queryParams, - String keyFieldName, - Scheduler scheduler) { + String keyFieldName) { return Mono .fromCallable(() -> { Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null"); @@ -42,7 +41,6 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { LuceneUtils.totalHitsThreshold(), !paginationInfo.forceSinglePage(), queryParams.isScored()); - //noinspection BlockingMethodInNonBlockingContext indexSearcher.search(queryParams.query(), firstPageCollector); firstPageTopDocs = firstPageCollector.topDocs(LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset()), LuceneUtils.safeLongToInt(paginationInfo.firstPageLimit()) @@ -53,7 +51,6 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { firstPageTopDocs.scoreDocs, IndexSearchers.unsharded(indexSearcher), keyFieldName, - scheduler, true ) .take(queryParams.limit(), true); @@ -63,43 +60,40 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { if (paginationInfo.forceSinglePage() || paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) { nextHits = null; } else { - nextHits = Flux.defer(() -> { - return Flux - .generate( - () -> new CurrentPageInfo(LuceneUtils.getLastScoreDoc(firstPageTopDocs.scoreDocs), paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1), - (s, sink) -> { - if (s.last() != null && s.remainingLimit() > 0) { - TopDocs pageTopDocs; - try { - TopDocsCollector collector = TopDocsSearcher.getTopDocsCollector(queryParams.sort(), - s.currentPageLimit(), - s.last(), - LuceneUtils.totalHitsThreshold(), - true, - queryParams.isScored() - ); - //noinspection BlockingMethodInNonBlockingContext - indexSearcher.search(queryParams.query(), collector); - pageTopDocs = collector.topDocs(); - } catch (IOException e) { - sink.error(e); - return EMPTY_STATUS; - } - var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs); - sink.next(pageTopDocs); - return new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(), s.pageIndex() + 1); - } else { - sink.complete(); - return EMPTY_STATUS; - } - }, - s -> {} - ) - .subscribeOn(scheduler) - .flatMapSequential(topFieldDoc -> LuceneUtils - .convertHits(topFieldDoc.scoreDocs, IndexSearchers.unsharded(indexSearcher), keyFieldName, scheduler, true) - ); - }); + nextHits = Flux.defer(() -> Flux + .generate( + () -> new CurrentPageInfo(LuceneUtils.getLastScoreDoc(firstPageTopDocs.scoreDocs), paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1), + (s, sink) -> { + if (s.last() != null && s.remainingLimit() > 0) { + TopDocs pageTopDocs; + try { + TopDocsCollector collector = TopDocsSearcher.getTopDocsCollector(queryParams.sort(), + s.currentPageLimit(), + s.last(), + LuceneUtils.totalHitsThreshold(), + true, + queryParams.isScored() + ); + indexSearcher.search(queryParams.query(), collector); + pageTopDocs = collector.topDocs(); + } catch (IOException e) { + sink.error(e); + return EMPTY_STATUS; + } + var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs); + sink.next(pageTopDocs); + return new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(), s.pageIndex() + 1); + } else { + sink.complete(); + return EMPTY_STATUS; + } + }, + s -> {} + ) + .flatMapSequential(topFieldDoc -> LuceneUtils + .convertHits(topFieldDoc.scoreDocs, IndexSearchers.unsharded(indexSearcher), keyFieldName, true) + ) + ); } Flux combinedFlux; @@ -115,7 +109,6 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { //.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)), releaseIndexSearcher ); - }) - .subscribeOn(scheduler); + }); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneShardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneShardSearcher.java index dda79d8..46904a9 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneShardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneShardSearcher.java @@ -40,31 +40,28 @@ class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher { @Override public Mono searchOn(IndexSearcher indexSearcher, Mono releaseIndexSearcher, - LocalQueryParams queryParams, - Scheduler scheduler) { - return Mono.fromCallable(() -> { + LocalQueryParams queryParams) { + return Mono.fromCallable(() -> { TopDocsCollector collector; synchronized (lock) { - //noinspection BlockingMethodInNonBlockingContext collector = firstPageUnsortedCollectorManager.newCollector(); indexSearchersArray.add(indexSearcher); indexSearcherReleasersArray.add(releaseIndexSearcher); collectors.add(collector); } - //noinspection BlockingMethodInNonBlockingContext indexSearcher.search(luceneQuery, collector); return null; - }).subscribeOn(scheduler); + }); } @Override - public Mono collect(LocalQueryParams queryParams, String keyFieldName, Scheduler scheduler) { + public Mono collect(LocalQueryParams queryParams, String keyFieldName) { return Mono .fromCallable(() -> { TopDocs result; Mono release; synchronized (lock) { - //noinspection BlockingMethodInNonBlockingContext + result = firstPageUnsortedCollectorManager.reduce(collectors); release = Mono.when(indexSearcherReleasersArray); } @@ -73,7 +70,7 @@ class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher { indexSearchers = IndexSearchers.of(indexSearchersArray); } Flux firstPageHits = LuceneUtils - .convertHits(result.scoreDocs, indexSearchers, keyFieldName, scheduler, false); + .convertHits(result.scoreDocs, indexSearchers, keyFieldName, false); Flux nextHits = Flux.defer(() -> { if (paginationInfo.forceSinglePage() || paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) { @@ -91,23 +88,19 @@ class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher { () -> TopDocsSearcher.getTopDocsCollector(queryParams.sort(), s.currentPageLimit(), s.last(), LuceneUtils.totalHitsThreshold(), true, queryParams.isScored()), 0, s.currentPageLimit(), queryParams.sort()); - //noinspection BlockingMethodInNonBlockingContext + TopDocs pageTopDocs = Flux .fromIterable(indexSearchersArray) .flatMapSequential(indexSearcher -> Mono .fromCallable(() -> { - //noinspection BlockingMethodInNonBlockingContext var collector = currentPageUnsortedCollectorManager.newCollector(); - //noinspection BlockingMethodInNonBlockingContext indexSearcher.search(luceneQuery, collector); return collector; }) - .subscribeOn(scheduler) ) .collect(Collectors.toCollection(ObjectArrayList::new)) .flatMap(collectors -> Mono .fromCallable(() -> currentPageUnsortedCollectorManager.reduce(collectors)) - .subscribeOn(scheduler) ) .blockOptional().orElseThrow(); @@ -122,9 +115,8 @@ class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher { }, s -> {} ) - .subscribeOn(scheduler) .flatMapSequential(topFieldDoc -> LuceneUtils - .convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, scheduler, false) + .convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, false) ); }); @@ -133,8 +125,7 @@ class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher { //.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)), release ); - }) - .subscribeOn(scheduler); + }); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredUnsortedContinuousLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredUnsortedContinuousLuceneMultiSearcher.java index 537c4e1..8423c99 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredUnsortedContinuousLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredUnsortedContinuousLuceneMultiSearcher.java @@ -91,11 +91,9 @@ public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMult @Override public Mono searchOn(IndexSearcher indexSearcher, Mono releaseIndexSearcher, - LocalQueryParams queryParams, - Scheduler scheduler) { + LocalQueryParams queryParams) { return Mono - .fromCallable(() -> { - //noinspection BlockingMethodInNonBlockingContext + .fromCallable(() -> { var collector = cm.newCollector(); int collectorShardIndex; synchronized (lock) { @@ -117,14 +115,12 @@ public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMult } }); return null; - }) - .subscribeOn(scheduler); + }); } @Override public Mono collect(LocalQueryParams queryParams, - String keyFieldName, - Scheduler scheduler) { + String keyFieldName) { return Mono .fromCallable(() -> { synchronized (scoreDocsSink) { @@ -156,7 +152,6 @@ public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMult .flatMap(scoreDocs -> LuceneUtils.convertHits(scoreDocs.toArray(ScoreDoc[]::new), indexSearchers, keyFieldName, - scheduler, false ));