Throw exception if running in a nonblocking scope

This commit is contained in:
Andrea Cavalli 2021-09-05 14:23:46 +02:00
parent 1882e8b300
commit 4c348a6b2f
14 changed files with 171 additions and 12 deletions

View File

@ -287,6 +287,9 @@ public class LLLocalDictionary implements LLDictionary {
Send<Buffer> keySend, Send<Buffer> keySend,
boolean existsAlmostCertainly) throws RocksDBException { boolean existsAlmostCertainly) throws RocksDBException {
try (var key = keySend.receive()) { try (var key = keySend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called dbGet in a nonblocking thread");
}
if (databaseOptions.allowNettyDirect()) { if (databaseOptions.allowNettyDirect()) {
//todo: implement keyMayExist if existsAlmostCertainly is false. //todo: implement keyMayExist if existsAlmostCertainly is false.
@ -404,6 +407,9 @@ public class LLLocalDictionary implements LLDictionary {
try { try {
try (var key = keyToReceive.receive()) { try (var key = keyToReceive.receive()) {
try (var value = valueToReceive.receive()) { try (var value = valueToReceive.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called dbPut in a nonblocking thread");
}
if (databaseOptions.allowNettyDirect()) { if (databaseOptions.allowNettyDirect()) {
var keyNioBuffer = LLUtils.convertToDirect(alloc, key.send()); var keyNioBuffer = LLUtils.convertToDirect(alloc, key.send());
try (var ignored1 = keyNioBuffer.buffer().receive()) { try (var ignored1 = keyNioBuffer.buffer().receive()) {
@ -460,6 +466,9 @@ public class LLLocalDictionary implements LLDictionary {
AbstractSlice<?> slice2 = null; AbstractSlice<?> slice2 = null;
try { try {
try (var range = rangeSend.receive()) { try (var range = rangeSend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called containsRange in a nonblocking thread");
}
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
readOpts.setFillCache(false); readOpts.setFillCache(false);
@ -525,6 +534,9 @@ public class LLLocalDictionary implements LLDictionary {
return Mono.usingWhen(keyMono, return Mono.usingWhen(keyMono,
keySend -> runOnDb(() -> { keySend -> runOnDb(() -> {
try (var key = keySend.receive()) { try (var key = keySend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called containsKey in a nonblocking thread");
}
StampedLock lock; StampedLock lock;
long stamp; long stamp;
if (updateMode == UpdateMode.ALLOW) { if (updateMode == UpdateMode.ALLOW) {
@ -623,6 +635,9 @@ public class LLLocalDictionary implements LLDictionary {
return Mono.usingWhen(keyMono, return Mono.usingWhen(keyMono,
keySend -> runOnDb(() -> { keySend -> runOnDb(() -> {
try (var key = keySend.receive()) { try (var key = keySend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
if (updateMode == UpdateMode.DISALLOW) { if (updateMode == UpdateMode.DISALLOW) {
throw new UnsupportedOperationException("update() is disallowed"); throw new UnsupportedOperationException("update() is disallowed");
} }
@ -762,6 +777,9 @@ public class LLLocalDictionary implements LLDictionary {
return Mono.usingWhen(keyMono, return Mono.usingWhen(keyMono,
keySend -> this.runOnDb(() -> { keySend -> this.runOnDb(() -> {
try (var key = keySend.receive()) { try (var key = keySend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
if (updateMode == UpdateMode.DISALLOW) { if (updateMode == UpdateMode.DISALLOW) {
throw new UnsupportedOperationException("update() is disallowed"); throw new UnsupportedOperationException("update() is disallowed");
} }
@ -883,6 +901,9 @@ public class LLLocalDictionary implements LLDictionary {
private void dbDelete(ColumnFamilyHandle cfh, @Nullable WriteOptions writeOptions, Send<Buffer> keyToReceive) private void dbDelete(ColumnFamilyHandle cfh, @Nullable WriteOptions writeOptions, Send<Buffer> keyToReceive)
throws RocksDBException { throws RocksDBException {
try (var key = keyToReceive.receive()) { try (var key = keyToReceive.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called dbDelete in a nonblocking thread");
}
var validWriteOptions = Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS); var validWriteOptions = Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS);
if (databaseOptions.allowNettyDirect()) { if (databaseOptions.allowNettyDirect()) {
var keyNioBuffer = LLUtils.convertToDirect(alloc, key.send()); var keyNioBuffer = LLUtils.convertToDirect(alloc, key.send());
@ -947,6 +968,9 @@ public class LLLocalDictionary implements LLDictionary {
keySend -> this keySend -> this
.runOnDb(() -> { .runOnDb(() -> {
try (var key = keySend.receive()) { try (var key = keySend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called getPreviousData in a nonblocking thread");
}
StampedLock lock; StampedLock lock;
long stamp; long stamp;
if (updateMode == UpdateMode.ALLOW) { if (updateMode == UpdateMode.ALLOW) {
@ -1012,6 +1036,9 @@ public class LLLocalDictionary implements LLDictionary {
keyBufsWindow.add(bufferSend.receive()); keyBufsWindow.add(bufferSend.receive());
} }
try { try {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called getMulti in a nonblocking thread");
}
Iterable<StampedLock> locks; Iterable<StampedLock> locks;
ArrayList<Long> stamps; ArrayList<Long> stamps;
if (updateMode == UpdateMode.ALLOW) { if (updateMode == UpdateMode.ALLOW) {
@ -1088,6 +1115,9 @@ public class LLLocalDictionary implements LLDictionary {
entriesWindow.add(entrySend.receive()); entriesWindow.add(entrySend.receive());
} }
try { try {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called putMulti in a nonblocking thread");
}
Iterable<StampedLock> locks; Iterable<StampedLock> locks;
ArrayList<Long> stamps; ArrayList<Long> stamps;
if (updateMode == UpdateMode.ALLOW) { if (updateMode == UpdateMode.ALLOW) {
@ -1188,6 +1218,9 @@ public class LLLocalDictionary implements LLDictionary {
entriesWindow.add(tuple.mapT1(Send::receive)); entriesWindow.add(tuple.mapT1(Send::receive));
} }
try { try {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called updateMulti in a nonblocking thread");
}
List<Buffer> keyBufsWindow = new ArrayList<>(entriesWindow.size()); List<Buffer> keyBufsWindow = new ArrayList<>(entriesWindow.size());
for (Tuple2<Buffer, X> objects : entriesWindow) { for (Tuple2<Buffer, X> objects : entriesWindow) {
keyBufsWindow.add(objects.getT1()); keyBufsWindow.add(objects.getT1());
@ -1542,6 +1575,9 @@ public class LLLocalDictionary implements LLDictionary {
return this return this
.<Void>runOnDb(() -> { .<Void>runOnDb(() -> {
try (var range = rangeSend.receive()) { try (var range = rangeSend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called setRange in a nonblocking thread");
}
if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) { if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) {
assert EMPTY_READ_OPTIONS.isOwningHandle(); assert EMPTY_READ_OPTIONS.isOwningHandle();
try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) { try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) {
@ -1912,6 +1948,9 @@ public class LLLocalDictionary implements LLDictionary {
public Mono<Void> clear() { public Mono<Void> clear() {
return Mono return Mono
.<Void>fromCallable(() -> { .<Void>fromCallable(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called clear in a nonblocking thread");
}
try (var readOpts = new ReadOptions(getReadOptions(null))) { try (var readOpts = new ReadOptions(getReadOptions(null))) {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
@ -1971,6 +2010,9 @@ public class LLLocalDictionary implements LLDictionary {
return Mono.usingWhen(rangeMono, return Mono.usingWhen(rangeMono,
rangeSend -> { rangeSend -> {
try (var range = rangeSend.receive()) { try (var range = rangeSend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called sizeRange in a nonblocking thread");
}
if (range.isAll()) { if (range.isAll()) {
return this return this
.runOnDb(() -> fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot)) .runOnDb(() -> fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot))
@ -2044,6 +2086,9 @@ public class LLLocalDictionary implements LLDictionary {
return Mono.usingWhen(rangeMono, return Mono.usingWhen(rangeMono,
rangeSend -> runOnDb(() -> { rangeSend -> runOnDb(() -> {
try (var range = rangeSend.receive()) { try (var range = rangeSend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called getOne in a nonblocking thread");
}
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
ReleasableSlice minBound; ReleasableSlice minBound;
if (range.hasMin()) { if (range.hasMin()) {
@ -2102,6 +2147,9 @@ public class LLLocalDictionary implements LLDictionary {
return Mono.usingWhen(rangeMono, return Mono.usingWhen(rangeMono,
rangeSend -> runOnDb(() -> { rangeSend -> runOnDb(() -> {
try (var range = rangeSend.receive()) { try (var range = rangeSend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called getOneKey in a nonblocking thread");
}
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
ReleasableSlice minBound; ReleasableSlice minBound;
if (range.hasMin()) { if (range.hasMin()) {
@ -2183,6 +2231,9 @@ public class LLLocalDictionary implements LLDictionary {
} }
private long exactSizeAll(@Nullable LLSnapshot snapshot) { private long exactSizeAll(@Nullable LLSnapshot snapshot) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called exactSizeAll in a nonblocking thread");
}
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
readOpts.setFillCache(false); readOpts.setFillCache(false);
readOpts.setReadaheadSize(32 * 1024); // 32KiB readOpts.setReadaheadSize(32 * 1024); // 32KiB
@ -2264,6 +2315,9 @@ public class LLLocalDictionary implements LLDictionary {
return Mono.usingWhen(rangeMono, return Mono.usingWhen(rangeMono,
rangeSend -> runOnDb(() -> { rangeSend -> runOnDb(() -> {
try (var range = rangeSend.receive()) { try (var range = rangeSend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called removeOne in a nonblocking thread");
}
try (var readOpts = new ReadOptions(getReadOptions(null))) { try (var readOpts = new ReadOptions(getReadOptions(null))) {
ReleasableSlice minBound; ReleasableSlice minBound;
if (range.hasMin()) { if (range.hasMin()) {
@ -2325,6 +2379,9 @@ public class LLLocalDictionary implements LLDictionary {
RocksDB db, RocksDB db,
ColumnFamilyHandle cfh) { ColumnFamilyHandle cfh) {
try (var range = rangeToReceive.receive()) { try (var range = rangeToReceive.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called getRocksIterator in a nonblocking thread");
}
ReleasableSlice sliceMin; ReleasableSlice sliceMin;
ReleasableSlice sliceMax; ReleasableSlice sliceMax;
if (range.hasMin()) { if (range.hasMin()) {

View File

@ -213,6 +213,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
} }
private void flushDb(RocksDB db, List<ColumnFamilyHandle> handles) throws RocksDBException { private void flushDb(RocksDB db, List<ColumnFamilyHandle> handles) throws RocksDBException {
if (Schedulers.isInNonBlockingThread()) {
logger.error("Called flushDb in a nonblocking thread");
}
// force flush the database // force flush the database
try (var flushOptions = new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true)) { try (var flushOptions = new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true)) {
db.flush(flushOptions); db.flush(flushOptions);
@ -227,6 +230,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
@SuppressWarnings("unused") @SuppressWarnings("unused")
private void compactDb(RocksDB db, List<ColumnFamilyHandle> handles) { private void compactDb(RocksDB db, List<ColumnFamilyHandle> handles) {
if (Schedulers.isInNonBlockingThread()) {
logger.error("Called compactDb in a nonblocking thread");
}
// force compact the database // force compact the database
for (ColumnFamilyHandle cfh : handles) { for (ColumnFamilyHandle cfh : handles) {
var t = new Thread(() -> { var t = new Thread(() -> {

View File

@ -13,6 +13,7 @@ import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot; import org.rocksdb.Snapshot;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class LLLocalSingleton implements LLSingleton { public class LLLocalSingleton implements LLSingleton {
@ -33,6 +34,9 @@ public class LLLocalSingleton implements LLSingleton {
this.databaseName = databaseName; this.databaseName = databaseName;
this.snapshotResolver = snapshotResolver; this.snapshotResolver = snapshotResolver;
this.name = name; this.name = name;
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Initialized in a nonblocking thread");
}
if (db.get(cfh, this.name) == null) { if (db.get(cfh, this.name) == null) {
db.put(cfh, this.name, defaultValue); db.put(cfh, this.name, defaultValue);
} }
@ -49,7 +53,12 @@ public class LLLocalSingleton implements LLSingleton {
@Override @Override
public Mono<byte[]> get(@Nullable LLSnapshot snapshot) { public Mono<byte[]> get(@Nullable LLSnapshot snapshot) {
return Mono return Mono
.fromCallable(() -> db.get(cfh, resolveSnapshot(snapshot), name)) .fromCallable(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called get in a nonblocking thread");
}
return db.get(cfh, resolveSnapshot(snapshot), name);
})
.onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause)); .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause));
} }
@ -57,6 +66,9 @@ public class LLLocalSingleton implements LLSingleton {
public Mono<Void> set(byte[] value) { public Mono<Void> set(byte[] value) {
return Mono return Mono
.<Void>fromCallable(() -> { .<Void>fromCallable(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called set in a nonblocking thread");
}
db.put(cfh, name, value); db.put(cfh, name, value);
return null; return null;
}) })

View File

@ -190,6 +190,9 @@ public class LuceneUtils {
@NotNull @NotNull
public static String keyOfTopDoc(int docId, IndexReader indexReader, public static String keyOfTopDoc(int docId, IndexReader indexReader,
String keyFieldName) throws IOException, NoSuchElementException { String keyFieldName) throws IOException, NoSuchElementException {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called keyOfTopDoc in a nonblocking thread");
}
if (docId > indexReader.maxDoc()) { if (docId > indexReader.maxDoc()) {
throw new IOException("Document " + docId + " > maxDoc (" +indexReader.maxDoc() + ")"); throw new IOException("Document " + docId + " > maxDoc (" +indexReader.maxDoc() + ")");
} }
@ -269,7 +272,16 @@ public class LuceneUtils {
} }
} }
public static void readInternalAligned(Object ref, FileChannel channel, long pos, ByteBuffer b, int readLength, int usefulLength, long end) throws IOException { public static void readInternalAligned(Object ref,
FileChannel channel,
long pos,
ByteBuffer b,
int readLength,
int usefulLength,
long end) throws IOException {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called readInternalAligned in a nonblocking thread");
}
int startBufPosition = b.position(); int startBufPosition = b.position();
int readData = 0; int readData = 0;
int i; int i;
@ -365,7 +377,7 @@ public class LuceneUtils {
} else { } else {
return hitsFlux return hitsFlux
.parallel() .parallel()
.runOn(Schedulers.parallel()) .runOn(Schedulers.boundedElastic())
.map(hit -> { .map(hit -> {
var result = mapHitBlocking(hit, indexSearchers, keyFieldName); var result = mapHitBlocking(hit, indexSearchers, keyFieldName);
// The "else" value is an errored key score, to filter out next // The "else" value is an errored key score, to filter out next
@ -382,6 +394,9 @@ public class LuceneUtils {
private static LLKeyScore mapHitBlocking(ScoreDoc hit, private static LLKeyScore mapHitBlocking(ScoreDoc hit,
IndexSearchers indexSearchers, IndexSearchers indexSearchers,
String keyFieldName) { String keyFieldName) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called mapHitBlocking in a nonblocking thread");
}
int shardDocId = hit.doc; int shardDocId = hit.doc;
int shardIndex = hit.shardIndex; int shardIndex = hit.shardIndex;
float score = hit.score; float score = hit.score;
@ -413,7 +428,11 @@ public class LuceneUtils {
} }
} }
public static TopDocs mergeTopDocs(Sort sort, @Nullable Integer startN, @Nullable Integer topN, TopDocs[] topDocs, Comparator<ScoreDoc> tieBreaker) { public static TopDocs mergeTopDocs(Sort sort,
@Nullable Integer startN,
@Nullable Integer topN,
TopDocs[] topDocs,
Comparator<ScoreDoc> tieBreaker) {
if ((startN == null) != (topN == null)) { if ((startN == null) != (topN == null)) {
throw new IllegalArgumentException("You must pass startN and topN together or nothing"); throw new IllegalArgumentException("You must pass startN and topN together or nothing");
} }

View File

@ -3,6 +3,7 @@ package it.cavallium.dbengine.lucene.searcher;
import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.IndexSearcher;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher { public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher {
@ -17,6 +18,9 @@ public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher {
Mono<Void> releaseIndexSearcher, Mono<Void> releaseIndexSearcher,
LocalQueryParams queryParams, LocalQueryParams queryParams,
String keyFieldName) { String keyFieldName) {
if (Schedulers.isInNonBlockingThread()) {
return Mono.error(() -> new UnsupportedOperationException("Called collect in a nonblocking thread"));
}
if (queryParams.limit() == 0) { if (queryParams.limit() == 0) {
return countSearcher.collect(indexSearcher, releaseIndexSearcher, queryParams, keyFieldName); return countSearcher.collect(indexSearcher, releaseIndexSearcher, queryParams, keyFieldName);
} else if (!queryParams.isScored() && queryParams.offset() == 0 && queryParams.limit() >= 2147483630 } else if (!queryParams.isScored() && queryParams.offset() == 0 && queryParams.limit() >= 2147483630

View File

@ -7,6 +7,7 @@ import org.apache.lucene.search.IndexSearcher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class CountLuceneLocalSearcher implements LuceneLocalSearcher { public class CountLuceneLocalSearcher implements LuceneLocalSearcher {
@ -16,10 +17,15 @@ public class CountLuceneLocalSearcher implements LuceneLocalSearcher {
LocalQueryParams queryParams, LocalQueryParams queryParams,
String keyFieldName) { String keyFieldName) {
return Mono return Mono
.fromCallable(() -> new LuceneSearchResult( .fromCallable(() -> {
TotalHitsCount.of(indexSearcher.count(queryParams.query()), true), if (Schedulers.isInNonBlockingThread()) {
Flux.empty(), throw new UnsupportedOperationException("Called collect in a nonblocking thread");
releaseIndexSearcher) }
return new LuceneSearchResult(
TotalHitsCount.of(indexSearcher.count(queryParams.query()), true),
Flux.empty(),
releaseIndexSearcher);
}
); );
} }
} }

View File

@ -11,6 +11,7 @@ import org.apache.lucene.search.Query;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class CountLuceneMultiSearcher implements LuceneMultiSearcher { public class CountLuceneMultiSearcher implements LuceneMultiSearcher {
@ -35,7 +36,15 @@ public class CountLuceneMultiSearcher implements LuceneMultiSearcher {
@Override @Override
public Mono<LuceneSearchResult> collect(LocalQueryParams queryParams, String keyFieldName) { public Mono<LuceneSearchResult> collect(LocalQueryParams queryParams, String keyFieldName) {
return Mono.fromCallable(() -> new LuceneSearchResult(TotalHitsCount.of(totalHits.get(), true), Flux.empty(), Mono.when(release))); return Mono.fromCallable(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called collect in a nonblocking thread");
}
return new LuceneSearchResult(TotalHitsCount.of(totalHits.get(), true),
Flux.empty(),
Mono.when(release)
);
});
} }
}; };
}); });

View File

@ -50,6 +50,9 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
Mono<Void> releaseIndexSearcher, Mono<Void> releaseIndexSearcher,
LocalQueryParams queryParams) { LocalQueryParams queryParams) {
return Mono.fromCallable(() -> { return Mono.fromCallable(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called searchOn in a nonblocking thread");
}
TopFieldCollector collector; TopFieldCollector collector;
synchronized (lock) { synchronized (lock) {
collector = firstPageSharedManager.newCollector(); collector = firstPageSharedManager.newCollector();
@ -64,10 +67,12 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
@Override @Override
public Mono<LuceneSearchResult> collect(LocalQueryParams queryParams, String keyFieldName) { public Mono<LuceneSearchResult> collect(LocalQueryParams queryParams, String keyFieldName) {
if (Schedulers.isInNonBlockingThread()) {
return Mono.error(() -> new UnsupportedOperationException("Called collect in a nonblocking thread"));
}
if (!queryParams.isScored()) { if (!queryParams.isScored()) {
return Mono.error( return Mono.error(() -> new UnsupportedOperationException("Can't execute an unscored query"
new UnsupportedOperationException("Can't execute an unscored query with a scored lucene shard searcher") + " with a scored lucene shard searcher"));
);
} }
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {
@ -92,10 +97,19 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
} }
return Flux return Flux
.<TopDocs>create(emitter -> { .<TopDocs>create(emitter -> {
if (Schedulers.isInNonBlockingThread()) {
emitter.error(new UnsupportedOperationException("Called collect in a nonblocking thread"));
return;
}
Empty<Void> cancelEvent = Sinks.empty(); Empty<Void> cancelEvent = Sinks.empty();
AtomicReference<CurrentPageInfo> currentPageInfoAtomicReference = new AtomicReference<>(new CurrentPageInfo(LuceneUtils.getLastFieldDoc(result.scoreDocs), AtomicReference<CurrentPageInfo> currentPageInfoAtomicReference = new AtomicReference<>(new CurrentPageInfo(LuceneUtils.getLastFieldDoc(result.scoreDocs),
paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1)); paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1));
emitter.onRequest(requests -> { emitter.onRequest(requests -> {
if (Schedulers.isInNonBlockingThread()) {
emitter.error(new UnsupportedOperationException("Called collect"
+ ", onRequest in a nonblocking thread"));
return;
}
synchronized (currentPageInfoAtomicReference) { synchronized (currentPageInfoAtomicReference) {
var s = currentPageInfoAtomicReference.get(); var s = currentPageInfoAtomicReference.get();
while (requests > 0 && !emitter.isCancelled()) { while (requests > 0 && !emitter.isCancelled()) {

View File

@ -13,6 +13,7 @@ import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollector; import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopFieldDocs; import org.apache.lucene.search.TopFieldDocs;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import reactor.core.scheduler.Schedulers;
public class ScoringShardsCollectorManager implements CollectorManager<TopFieldCollector, TopDocs> { public class ScoringShardsCollectorManager implements CollectorManager<TopFieldCollector, TopDocs> {
@ -62,6 +63,9 @@ public class ScoringShardsCollectorManager implements CollectorManager<TopFieldC
@Override @Override
public TopDocs reduce(Collection<TopFieldCollector> collectors) throws IOException { public TopDocs reduce(Collection<TopFieldCollector> collectors) throws IOException {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called reduce in a nonblocking thread");
}
TopDocs result; TopDocs result;
if (sort != null) { if (sort != null) {
TopFieldDocs[] topDocs = new TopFieldDocs[collectors.size()]; TopFieldDocs[] topDocs = new TopFieldDocs[collectors.size()];

View File

@ -15,6 +15,7 @@ import org.apache.lucene.search.TopDocsCollector;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
@ -25,6 +26,9 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
String keyFieldName) { String keyFieldName) {
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called collect in a nonblocking thread");
}
Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null"); Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null");
PaginationInfo paginationInfo; PaginationInfo paginationInfo;
if (queryParams.limit() <= MAX_SINGLE_SEARCH_LIMIT) { if (queryParams.limit() <= MAX_SINGLE_SEARCH_LIMIT) {
@ -64,6 +68,9 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
.<TopDocs, CurrentPageInfo>generate( .<TopDocs, CurrentPageInfo>generate(
() -> new CurrentPageInfo(LuceneUtils.getLastScoreDoc(firstPageTopDocs.scoreDocs), paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1), () -> new CurrentPageInfo(LuceneUtils.getLastScoreDoc(firstPageTopDocs.scoreDocs), paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1),
(s, sink) -> { (s, sink) -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called collect in a nonblocking thread");
}
if (s.last() != null && s.remainingLimit() > 0) { if (s.last() != null && s.remainingLimit() > 0) {
TopDocs pageTopDocs; TopDocs pageTopDocs;
try { try {

View File

@ -23,6 +23,7 @@ import org.apache.lucene.search.TopDocsCollector;
import org.apache.lucene.search.TopFieldCollector; import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopScoreDocCollector; import org.apache.lucene.search.TopScoreDocCollector;
import org.apache.lucene.search.TotalHits.Relation; import org.apache.lucene.search.TotalHits.Relation;
import reactor.core.scheduler.Schedulers;
class TopDocsSearcher { class TopDocsSearcher {

View File

@ -18,6 +18,7 @@ import org.apache.lucene.search.TopDocsCollector;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher { class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher {
@ -42,6 +43,9 @@ class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher {
Mono<Void> releaseIndexSearcher, Mono<Void> releaseIndexSearcher,
LocalQueryParams queryParams) { LocalQueryParams queryParams) {
return Mono.fromCallable(() -> { return Mono.fromCallable(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called searchOn in a nonblocking thread");
}
TopDocsCollector<ScoreDoc> collector; TopDocsCollector<ScoreDoc> collector;
synchronized (lock) { synchronized (lock) {
collector = firstPageUnsortedCollectorManager.newCollector(); collector = firstPageUnsortedCollectorManager.newCollector();
@ -58,6 +62,9 @@ class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher {
public Mono<LuceneSearchResult> collect(LocalQueryParams queryParams, String keyFieldName) { public Mono<LuceneSearchResult> collect(LocalQueryParams queryParams, String keyFieldName) {
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called collect in a nonblocking thread");
}
TopDocs result; TopDocs result;
Mono<Void> release; Mono<Void> release;
synchronized (lock) { synchronized (lock) {

View File

@ -15,6 +15,7 @@ import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopDocsCollector; import org.apache.lucene.search.TopDocsCollector;
import org.apache.lucene.search.TopFieldDocs; import org.apache.lucene.search.TopFieldDocs;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import reactor.core.scheduler.Schedulers;
public class UnscoredTopDocsCollectorManager implements public class UnscoredTopDocsCollectorManager implements
CollectorManager<TopDocsCollector<ScoreDoc>, TopDocs> { CollectorManager<TopDocsCollector<ScoreDoc>, TopDocs> {
@ -41,6 +42,9 @@ public class UnscoredTopDocsCollectorManager implements
@Override @Override
public TopDocs reduce(Collection<TopDocsCollector<ScoreDoc>> collection) throws IOException { public TopDocs reduce(Collection<TopDocsCollector<ScoreDoc>> collection) throws IOException {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called reduce in a nonblocking thread");
}
int i = 0; int i = 0;
TopDocs[] topDocsArray; TopDocs[] topDocsArray;
if (sort != null) { if (sort != null) {

View File

@ -50,6 +50,9 @@ public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMult
@Override @Override
public void collect(int i) { public void collect(int i) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called collect in a nonblocking thread");
}
var scoreDoc = new ScoreDoc(context.docBase + i, 0, shardIndex); var scoreDoc = new ScoreDoc(context.docBase + i, 0, shardIndex);
synchronized (scoreDocsSink) { synchronized (scoreDocsSink) {
while (scoreDocsSink.tryEmitNext(scoreDoc) == EmitResult.FAIL_OVERFLOW) { while (scoreDocsSink.tryEmitNext(scoreDoc) == EmitResult.FAIL_OVERFLOW) {
@ -94,6 +97,9 @@ public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMult
LocalQueryParams queryParams) { LocalQueryParams queryParams) {
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called searchOn in a nonblocking thread");
}
var collector = cm.newCollector(); var collector = cm.newCollector();
int collectorShardIndex; int collectorShardIndex;
synchronized (lock) { synchronized (lock) {
@ -123,6 +129,9 @@ public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMult
String keyFieldName) { String keyFieldName) {
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called collect in a nonblocking thread");
}
synchronized (scoreDocsSink) { synchronized (scoreDocsSink) {
decrementRemainingCollectors(scoreDocsSink, remainingCollectors); decrementRemainingCollectors(scoreDocsSink, remainingCollectors);
} }