From 4c348a6b2f4bd58fd7b574ac4cedda20227cb523 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sun, 5 Sep 2021 14:23:46 +0200 Subject: [PATCH] Throw exception if running in a nonblocking scope --- .../database/disk/LLLocalDictionary.java | 57 +++++++++++++++++++ .../disk/LLLocalKeyValueDatabase.java | 6 ++ .../database/disk/LLLocalSingleton.java | 14 ++++- .../dbengine/lucene/LuceneUtils.java | 25 +++++++- .../searcher/AdaptiveLuceneLocalSearcher.java | 4 ++ .../searcher/CountLuceneLocalSearcher.java | 14 +++-- .../searcher/CountLuceneMultiSearcher.java | 11 +++- .../ScoredSimpleLuceneShardSearcher.java | 20 ++++++- .../ScoringShardsCollectorManager.java | 4 ++ .../searcher/SimpleLuceneLocalSearcher.java | 7 +++ .../lucene/searcher/TopDocsSearcher.java | 1 + .../UnscoredPagedLuceneShardSearcher.java | 7 +++ .../UnscoredTopDocsCollectorManager.java | 4 ++ ...UnsortedContinuousLuceneMultiSearcher.java | 9 +++ 14 files changed, 171 insertions(+), 12 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 7de1d95..4d2ba50 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -287,6 +287,9 @@ public class LLLocalDictionary implements LLDictionary { Send keySend, boolean existsAlmostCertainly) throws RocksDBException { try (var key = keySend.receive()) { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called dbGet in a nonblocking thread"); + } if (databaseOptions.allowNettyDirect()) { //todo: implement keyMayExist if existsAlmostCertainly is false. @@ -404,6 +407,9 @@ public class LLLocalDictionary implements LLDictionary { try { try (var key = keyToReceive.receive()) { try (var value = valueToReceive.receive()) { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called dbPut in a nonblocking thread"); + } if (databaseOptions.allowNettyDirect()) { var keyNioBuffer = LLUtils.convertToDirect(alloc, key.send()); try (var ignored1 = keyNioBuffer.buffer().receive()) { @@ -460,6 +466,9 @@ public class LLLocalDictionary implements LLDictionary { AbstractSlice slice2 = null; try { try (var range = rangeSend.receive()) { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called containsRange in a nonblocking thread"); + } try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); readOpts.setFillCache(false); @@ -525,6 +534,9 @@ public class LLLocalDictionary implements LLDictionary { return Mono.usingWhen(keyMono, keySend -> runOnDb(() -> { try (var key = keySend.receive()) { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called containsKey in a nonblocking thread"); + } StampedLock lock; long stamp; if (updateMode == UpdateMode.ALLOW) { @@ -623,6 +635,9 @@ public class LLLocalDictionary implements LLDictionary { return Mono.usingWhen(keyMono, keySend -> runOnDb(() -> { try (var key = keySend.receive()) { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called update in a nonblocking thread"); + } if (updateMode == UpdateMode.DISALLOW) { throw new UnsupportedOperationException("update() is disallowed"); } @@ -762,6 +777,9 @@ public class LLLocalDictionary implements LLDictionary { return Mono.usingWhen(keyMono, keySend -> this.runOnDb(() -> { try (var key = keySend.receive()) { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called update in a nonblocking thread"); + } if (updateMode == UpdateMode.DISALLOW) { throw new UnsupportedOperationException("update() is disallowed"); } @@ -883,6 +901,9 @@ public class LLLocalDictionary implements LLDictionary { private void dbDelete(ColumnFamilyHandle cfh, @Nullable WriteOptions writeOptions, Send keyToReceive) throws RocksDBException { try (var key = keyToReceive.receive()) { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called dbDelete in a nonblocking thread"); + } var validWriteOptions = Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS); if (databaseOptions.allowNettyDirect()) { var keyNioBuffer = LLUtils.convertToDirect(alloc, key.send()); @@ -947,6 +968,9 @@ public class LLLocalDictionary implements LLDictionary { keySend -> this .runOnDb(() -> { try (var key = keySend.receive()) { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called getPreviousData in a nonblocking thread"); + } StampedLock lock; long stamp; if (updateMode == UpdateMode.ALLOW) { @@ -1012,6 +1036,9 @@ public class LLLocalDictionary implements LLDictionary { keyBufsWindow.add(bufferSend.receive()); } try { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called getMulti in a nonblocking thread"); + } Iterable locks; ArrayList stamps; if (updateMode == UpdateMode.ALLOW) { @@ -1088,6 +1115,9 @@ public class LLLocalDictionary implements LLDictionary { entriesWindow.add(entrySend.receive()); } try { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called putMulti in a nonblocking thread"); + } Iterable locks; ArrayList stamps; if (updateMode == UpdateMode.ALLOW) { @@ -1188,6 +1218,9 @@ public class LLLocalDictionary implements LLDictionary { entriesWindow.add(tuple.mapT1(Send::receive)); } try { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called updateMulti in a nonblocking thread"); + } List keyBufsWindow = new ArrayList<>(entriesWindow.size()); for (Tuple2 objects : entriesWindow) { keyBufsWindow.add(objects.getT1()); @@ -1542,6 +1575,9 @@ public class LLLocalDictionary implements LLDictionary { return this .runOnDb(() -> { try (var range = rangeSend.receive()) { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called setRange in a nonblocking thread"); + } if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) { assert EMPTY_READ_OPTIONS.isOwningHandle(); try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) { @@ -1912,6 +1948,9 @@ public class LLLocalDictionary implements LLDictionary { public Mono clear() { return Mono .fromCallable(() -> { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called clear in a nonblocking thread"); + } try (var readOpts = new ReadOptions(getReadOptions(null))) { readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); @@ -1971,6 +2010,9 @@ public class LLLocalDictionary implements LLDictionary { return Mono.usingWhen(rangeMono, rangeSend -> { try (var range = rangeSend.receive()) { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called sizeRange in a nonblocking thread"); + } if (range.isAll()) { return this .runOnDb(() -> fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot)) @@ -2044,6 +2086,9 @@ public class LLLocalDictionary implements LLDictionary { return Mono.usingWhen(rangeMono, rangeSend -> runOnDb(() -> { try (var range = rangeSend.receive()) { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called getOne in a nonblocking thread"); + } try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { ReleasableSlice minBound; if (range.hasMin()) { @@ -2102,6 +2147,9 @@ public class LLLocalDictionary implements LLDictionary { return Mono.usingWhen(rangeMono, rangeSend -> runOnDb(() -> { try (var range = rangeSend.receive()) { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called getOneKey in a nonblocking thread"); + } try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { ReleasableSlice minBound; if (range.hasMin()) { @@ -2183,6 +2231,9 @@ public class LLLocalDictionary implements LLDictionary { } private long exactSizeAll(@Nullable LLSnapshot snapshot) { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called exactSizeAll in a nonblocking thread"); + } try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { readOpts.setFillCache(false); readOpts.setReadaheadSize(32 * 1024); // 32KiB @@ -2264,6 +2315,9 @@ public class LLLocalDictionary implements LLDictionary { return Mono.usingWhen(rangeMono, rangeSend -> runOnDb(() -> { try (var range = rangeSend.receive()) { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called removeOne in a nonblocking thread"); + } try (var readOpts = new ReadOptions(getReadOptions(null))) { ReleasableSlice minBound; if (range.hasMin()) { @@ -2325,6 +2379,9 @@ public class LLLocalDictionary implements LLDictionary { RocksDB db, ColumnFamilyHandle cfh) { try (var range = rangeToReceive.receive()) { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called getRocksIterator in a nonblocking thread"); + } ReleasableSlice sliceMin; ReleasableSlice sliceMax; if (range.hasMin()) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 0fcf6d7..6a7e3f0 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -213,6 +213,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } private void flushDb(RocksDB db, List handles) throws RocksDBException { + if (Schedulers.isInNonBlockingThread()) { + logger.error("Called flushDb in a nonblocking thread"); + } // force flush the database try (var flushOptions = new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true)) { db.flush(flushOptions); @@ -227,6 +230,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { @SuppressWarnings("unused") private void compactDb(RocksDB db, List handles) { + if (Schedulers.isInNonBlockingThread()) { + logger.error("Called compactDb in a nonblocking thread"); + } // force compact the database for (ColumnFamilyHandle cfh : handles) { var t = new Thread(() -> { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java index 0f5cef4..334295d 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java @@ -13,6 +13,7 @@ import org.rocksdb.RocksDBException; import org.rocksdb.Snapshot; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; public class LLLocalSingleton implements LLSingleton { @@ -33,6 +34,9 @@ public class LLLocalSingleton implements LLSingleton { this.databaseName = databaseName; this.snapshotResolver = snapshotResolver; this.name = name; + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Initialized in a nonblocking thread"); + } if (db.get(cfh, this.name) == null) { db.put(cfh, this.name, defaultValue); } @@ -49,7 +53,12 @@ public class LLLocalSingleton implements LLSingleton { @Override public Mono get(@Nullable LLSnapshot snapshot) { return Mono - .fromCallable(() -> db.get(cfh, resolveSnapshot(snapshot), name)) + .fromCallable(() -> { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called get in a nonblocking thread"); + } + return db.get(cfh, resolveSnapshot(snapshot), name); + }) .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause)); } @@ -57,6 +66,9 @@ public class LLLocalSingleton implements LLSingleton { public Mono set(byte[] value) { return Mono .fromCallable(() -> { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called set in a nonblocking thread"); + } db.put(cfh, name, value); return null; }) diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index 18227ea..b312bc3 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -190,6 +190,9 @@ public class LuceneUtils { @NotNull public static String keyOfTopDoc(int docId, IndexReader indexReader, String keyFieldName) throws IOException, NoSuchElementException { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called keyOfTopDoc in a nonblocking thread"); + } if (docId > indexReader.maxDoc()) { throw new IOException("Document " + docId + " > maxDoc (" +indexReader.maxDoc() + ")"); } @@ -269,7 +272,16 @@ public class LuceneUtils { } } - public static void readInternalAligned(Object ref, FileChannel channel, long pos, ByteBuffer b, int readLength, int usefulLength, long end) throws IOException { + public static void readInternalAligned(Object ref, + FileChannel channel, + long pos, + ByteBuffer b, + int readLength, + int usefulLength, + long end) throws IOException { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called readInternalAligned in a nonblocking thread"); + } int startBufPosition = b.position(); int readData = 0; int i; @@ -365,7 +377,7 @@ public class LuceneUtils { } else { return hitsFlux .parallel() - .runOn(Schedulers.parallel()) + .runOn(Schedulers.boundedElastic()) .map(hit -> { var result = mapHitBlocking(hit, indexSearchers, keyFieldName); // The "else" value is an errored key score, to filter out next @@ -382,6 +394,9 @@ public class LuceneUtils { private static LLKeyScore mapHitBlocking(ScoreDoc hit, IndexSearchers indexSearchers, String keyFieldName) { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called mapHitBlocking in a nonblocking thread"); + } int shardDocId = hit.doc; int shardIndex = hit.shardIndex; float score = hit.score; @@ -413,7 +428,11 @@ public class LuceneUtils { } } - public static TopDocs mergeTopDocs(Sort sort, @Nullable Integer startN, @Nullable Integer topN, TopDocs[] topDocs, Comparator tieBreaker) { + public static TopDocs mergeTopDocs(Sort sort, + @Nullable Integer startN, + @Nullable Integer topN, + TopDocs[] topDocs, + Comparator tieBreaker) { if ((startN == null) != (topN == null)) { throw new IllegalArgumentException("You must pass startN and topN together or nothing"); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java index b6ddfff..1e5364a 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java @@ -3,6 +3,7 @@ package it.cavallium.dbengine.lucene.searcher; import org.apache.lucene.search.IndexSearcher; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher { @@ -17,6 +18,9 @@ public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher { Mono releaseIndexSearcher, LocalQueryParams queryParams, String keyFieldName) { + if (Schedulers.isInNonBlockingThread()) { + return Mono.error(() -> new UnsupportedOperationException("Called collect in a nonblocking thread")); + } if (queryParams.limit() == 0) { return countSearcher.collect(indexSearcher, releaseIndexSearcher, queryParams, keyFieldName); } else if (!queryParams.isScored() && queryParams.offset() == 0 && queryParams.limit() >= 2147483630 diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java index d3ea3e6..5764bcb 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java @@ -7,6 +7,7 @@ import org.apache.lucene.search.IndexSearcher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; public class CountLuceneLocalSearcher implements LuceneLocalSearcher { @@ -16,10 +17,15 @@ public class CountLuceneLocalSearcher implements LuceneLocalSearcher { LocalQueryParams queryParams, String keyFieldName) { return Mono - .fromCallable(() -> new LuceneSearchResult( - TotalHitsCount.of(indexSearcher.count(queryParams.query()), true), - Flux.empty(), - releaseIndexSearcher) + .fromCallable(() -> { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called collect in a nonblocking thread"); + } + return new LuceneSearchResult( + TotalHitsCount.of(indexSearcher.count(queryParams.query()), true), + Flux.empty(), + releaseIndexSearcher); + } ); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneMultiSearcher.java index 80e9893..42814f3 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneMultiSearcher.java @@ -11,6 +11,7 @@ import org.apache.lucene.search.Query; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; public class CountLuceneMultiSearcher implements LuceneMultiSearcher { @@ -35,7 +36,15 @@ public class CountLuceneMultiSearcher implements LuceneMultiSearcher { @Override public Mono collect(LocalQueryParams queryParams, String keyFieldName) { - return Mono.fromCallable(() -> new LuceneSearchResult(TotalHitsCount.of(totalHits.get(), true), Flux.empty(), Mono.when(release))); + return Mono.fromCallable(() -> { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called collect in a nonblocking thread"); + } + return new LuceneSearchResult(TotalHitsCount.of(totalHits.get(), true), + Flux.empty(), + Mono.when(release) + ); + }); } }; }); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java index 8e20179..39f68d3 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java @@ -50,6 +50,9 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { Mono releaseIndexSearcher, LocalQueryParams queryParams) { return Mono.fromCallable(() -> { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called searchOn in a nonblocking thread"); + } TopFieldCollector collector; synchronized (lock) { collector = firstPageSharedManager.newCollector(); @@ -64,10 +67,12 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { @Override public Mono collect(LocalQueryParams queryParams, String keyFieldName) { + if (Schedulers.isInNonBlockingThread()) { + return Mono.error(() -> new UnsupportedOperationException("Called collect in a nonblocking thread")); + } if (!queryParams.isScored()) { - return Mono.error( - new UnsupportedOperationException("Can't execute an unscored query with a scored lucene shard searcher") - ); + return Mono.error(() -> new UnsupportedOperationException("Can't execute an unscored query" + + " with a scored lucene shard searcher")); } return Mono .fromCallable(() -> { @@ -92,10 +97,19 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { } return Flux .create(emitter -> { + if (Schedulers.isInNonBlockingThread()) { + emitter.error(new UnsupportedOperationException("Called collect in a nonblocking thread")); + return; + } Empty cancelEvent = Sinks.empty(); AtomicReference currentPageInfoAtomicReference = new AtomicReference<>(new CurrentPageInfo(LuceneUtils.getLastFieldDoc(result.scoreDocs), paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1)); emitter.onRequest(requests -> { + if (Schedulers.isInNonBlockingThread()) { + emitter.error(new UnsupportedOperationException("Called collect" + + ", onRequest in a nonblocking thread")); + return; + } synchronized (currentPageInfoAtomicReference) { var s = currentPageInfoAtomicReference.get(); while (requests > 0 && !emitter.isCancelled()) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoringShardsCollectorManager.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoringShardsCollectorManager.java index 76adb57..e9de6e5 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoringShardsCollectorManager.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoringShardsCollectorManager.java @@ -13,6 +13,7 @@ import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopFieldCollector; import org.apache.lucene.search.TopFieldDocs; import org.jetbrains.annotations.Nullable; +import reactor.core.scheduler.Schedulers; public class ScoringShardsCollectorManager implements CollectorManager { @@ -62,6 +63,9 @@ public class ScoringShardsCollectorManager implements CollectorManager collectors) throws IOException { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called reduce in a nonblocking thread"); + } TopDocs result; if (sort != null) { TopFieldDocs[] topDocs = new TopFieldDocs[collectors.size()]; diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java index dc4fd3b..385ac86 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java @@ -15,6 +15,7 @@ import org.apache.lucene.search.TopDocsCollector; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { @@ -25,6 +26,9 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { String keyFieldName) { return Mono .fromCallable(() -> { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called collect in a nonblocking thread"); + } Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null"); PaginationInfo paginationInfo; if (queryParams.limit() <= MAX_SINGLE_SEARCH_LIMIT) { @@ -64,6 +68,9 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { .generate( () -> new CurrentPageInfo(LuceneUtils.getLastScoreDoc(firstPageTopDocs.scoreDocs), paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1), (s, sink) -> { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called collect in a nonblocking thread"); + } if (s.last() != null && s.remainingLimit() > 0) { TopDocs pageTopDocs; try { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java index 8901011..372d574 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java @@ -23,6 +23,7 @@ import org.apache.lucene.search.TopDocsCollector; import org.apache.lucene.search.TopFieldCollector; import org.apache.lucene.search.TopScoreDocCollector; import org.apache.lucene.search.TotalHits.Relation; +import reactor.core.scheduler.Schedulers; class TopDocsSearcher { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneShardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneShardSearcher.java index 46904a9..4c2073a 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneShardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneShardSearcher.java @@ -18,6 +18,7 @@ import org.apache.lucene.search.TopDocsCollector; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher { @@ -42,6 +43,9 @@ class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher { Mono releaseIndexSearcher, LocalQueryParams queryParams) { return Mono.fromCallable(() -> { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called searchOn in a nonblocking thread"); + } TopDocsCollector collector; synchronized (lock) { collector = firstPageUnsortedCollectorManager.newCollector(); @@ -58,6 +62,9 @@ class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher { public Mono collect(LocalQueryParams queryParams, String keyFieldName) { return Mono .fromCallable(() -> { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called collect in a nonblocking thread"); + } TopDocs result; Mono release; synchronized (lock) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredTopDocsCollectorManager.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredTopDocsCollectorManager.java index 94940ef..f158c73 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredTopDocsCollectorManager.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredTopDocsCollectorManager.java @@ -15,6 +15,7 @@ import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopDocsCollector; import org.apache.lucene.search.TopFieldDocs; import org.jetbrains.annotations.Nullable; +import reactor.core.scheduler.Schedulers; public class UnscoredTopDocsCollectorManager implements CollectorManager, TopDocs> { @@ -41,6 +42,9 @@ public class UnscoredTopDocsCollectorManager implements @Override public TopDocs reduce(Collection> collection) throws IOException { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called reduce in a nonblocking thread"); + } int i = 0; TopDocs[] topDocsArray; if (sort != null) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredUnsortedContinuousLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredUnsortedContinuousLuceneMultiSearcher.java index 8423c99..4af29f3 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredUnsortedContinuousLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredUnsortedContinuousLuceneMultiSearcher.java @@ -50,6 +50,9 @@ public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMult @Override public void collect(int i) { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called collect in a nonblocking thread"); + } var scoreDoc = new ScoreDoc(context.docBase + i, 0, shardIndex); synchronized (scoreDocsSink) { while (scoreDocsSink.tryEmitNext(scoreDoc) == EmitResult.FAIL_OVERFLOW) { @@ -94,6 +97,9 @@ public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMult LocalQueryParams queryParams) { return Mono .fromCallable(() -> { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called searchOn in a nonblocking thread"); + } var collector = cm.newCollector(); int collectorShardIndex; synchronized (lock) { @@ -123,6 +129,9 @@ public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMult String keyFieldName) { return Mono .fromCallable(() -> { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called collect in a nonblocking thread"); + } synchronized (scoreDocsSink) { decrementRemainingCollectors(scoreDocsSink, remainingCollectors); }