Bugfixes and updated reactor

Andrea Cavalli 2021-04-15 00:00:42 +02:00
parent 505de18ecb
commit df84562bb9
4 changed files with 108 additions and 156 deletions


@@ -178,17 +178,17 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
-<version>3.4.3</version>
+<version>3.4.5</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-tools</artifactId>
-<version>3.4.3</version>
+<version>3.4.5</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
-<version>3.4.3</version>
+<version>3.4.5</version>
</dependency>
<dependency>
<groupId>org.novasearch</groupId>


@@ -47,6 +47,9 @@ public class CountedStream<T> {
public static <T> Mono<CountedStream<T>> counted(Flux<T> flux) {
var publishedFlux = flux.cache();
-return publishedFlux.count().map(count -> new CountedStream<>(publishedFlux, count));
+return publishedFlux
+.count()
+.map(count -> new CountedStream<>(publishedFlux, count))
+.switchIfEmpty(Mono.fromSupplier(() -> new CountedStream<>(Flux.empty(), 0)));
}
}
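Note on the CountedStream change above: Flux#count() emits 0 for an empty source, so the added switchIfEmpty branch is a defensive fallback for a counting Mono that completes empty. A minimal standalone sketch of the same pattern (class name and test values are illustrative, not taken from this repository):

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

public class CountedFallbackSketch {
    public static void main(String[] args) {
        // cache() lets the source be counted once and then replayed to later subscribers
        Flux<String> published = Flux.just("a", "b", "c").cache();

        // count() emits the number of elements; switchIfEmpty supplies a default
        // value in the (normally unreachable) case of an empty counting Mono
        Mono<Long> counted = published
                .count()
                .switchIfEmpty(Mono.fromSupplier(() -> 0L));

        StepVerifier.create(counted)
                .expectNext(3L)
                .verifyComplete();
    }
}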


@@ -1,6 +1,5 @@
package it.cavallium.dbengine.database.disk;
-import com.google.common.base.Suppliers;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.LLSnapshot;
@@ -24,7 +23,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
import org.apache.commons.lang3.time.StopWatch;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
@@ -58,8 +56,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
protected static final Logger logger = LoggerFactory.getLogger(LLLocalKeyValueDatabase.class);
private static final ColumnFamilyDescriptor DEFAULT_COLUMN_FAMILY = new ColumnFamilyDescriptor(
RocksDB.DEFAULT_COLUMN_FAMILY);
-private static final Supplier<Scheduler> lowMemorySupplier = Suppliers.memoize(() ->
-Schedulers.newBoundedElastic(1, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "db-low-memory", Integer.MAX_VALUE))::get;
private final Scheduler dbScheduler;
private final Path dbPath;
@@ -88,16 +84,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
Path dbPath = Paths.get(dbPathString);
this.dbPath = dbPath;
this.name = name;
-if (lowMemory) {
-this.dbScheduler = lowMemorySupplier.get();
-} else {
-this.dbScheduler = Schedulers.newBoundedElastic(Math.max(8, Runtime.getRuntime().availableProcessors()),
-Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
-"db-" + name,
-60,
-true
-);
-}
+this.dbScheduler = Schedulers.newBoundedElastic(lowMemory ? Runtime.getRuntime().availableProcessors()
+: Math.max(8, Runtime.getRuntime().availableProcessors()),
+Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
+"db-" + name,
+60,
+true
+);
createIfNotExists(descriptors, options, dbPath, dbPathString);
// Create all column families that don't exist
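The scheduler change above drops the shared, memoized single-thread "db-low-memory" scheduler and instead sizes one bounded-elastic scheduler per database: the thread cap is the processor count in low-memory mode and at least 8 threads otherwise. A compact restatement of that construction as a standalone sketch (class and method names here are illustrative):

import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public final class DbSchedulerSketch {

    // Low-memory mode caps the pool at the number of available processors;
    // otherwise at least 8 worker threads are allowed.
    static Scheduler createDbScheduler(String name, boolean lowMemory) {
        int processors = Runtime.getRuntime().availableProcessors();
        int threadCap = lowMemory ? processors : Math.max(8, processors);
        return Schedulers.newBoundedElastic(
                threadCap,
                Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
                "db-" + name, // worker thread name prefix
                60,           // idle worker TTL, in seconds
                true          // daemon threads, so they do not block JVM shutdown
        );
    }

    public static void main(String[] args) throws InterruptedException {
        Scheduler scheduler = createDbScheduler("example", false);
        scheduler.schedule(() ->
                System.out.println("running on " + Thread.currentThread().getName()));
        Thread.sleep(200);
        scheduler.dispose();
    }
}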


@@ -1,6 +1,5 @@
package it.cavallium.dbengine.database.disk;
-import com.google.common.base.Suppliers;
import it.cavallium.dbengine.client.query.QueryParser;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.database.EnglishItalianStopFilter;
@@ -31,12 +30,11 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;
-import java.util.function.Supplier;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
@@ -58,6 +56,7 @@ import org.apache.lucene.store.FSDirectory;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
+import org.warp.commonutils.type.ShortNamedThreadFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.GroupedFlux;
@@ -69,7 +68,6 @@ import reactor.util.function.Tuples;
public class LLLocalLuceneIndex implements LLLuceneIndex {
-private static final boolean USE_STANDARD_SCHEDULERS = true;
protected static final Logger logger = LoggerFactory.getLogger(LLLocalLuceneIndex.class);
private static final LuceneStreamSearcher streamSearcher = new AdaptiveStreamSearcher();
private static final AllowOnlyQueryParsingCollectorStreamSearcher allowOnlyQueryParsingCollectorStreamSearcher
@@ -85,30 +83,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
Integer.MAX_VALUE,
true
);
-private final Scheduler luceneBlockingScheduler;
-private static final Function<String, Scheduler> boundedSchedulerSupplier = name ->
-Schedulers.newBoundedElastic(Runtime.getRuntime().availableProcessors(),
-Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
-"lucene-" + name,
-60
-);
-private static final Supplier<Scheduler> lowMemorySchedulerSupplier = Suppliers.memoize(() ->
-Schedulers.newBoundedElastic(1, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
-"lucene-low-memory", Integer.MAX_VALUE))::get;
-@SuppressWarnings("FieldCanBeLocal")
-private final Supplier<Scheduler> querySchedulerSupplier = USE_STANDARD_SCHEDULERS ?
-Schedulers::boundedElastic : Suppliers.memoize(() -> boundedSchedulerSupplier.apply("query"))::get;
-@SuppressWarnings("FieldCanBeLocal")
-private final Supplier<Scheduler> blockingSchedulerSupplier = USE_STANDARD_SCHEDULERS ?
-Schedulers::boundedElastic : Suppliers.memoize(() -> boundedSchedulerSupplier.apply("blocking"))::get;
-@SuppressWarnings("FieldCanBeLocal")
-private final Supplier<Scheduler> blockingLuceneSearchSchedulerSupplier = USE_STANDARD_SCHEDULERS ?
-Schedulers::boundedElastic : Suppliers.memoize(() -> boundedSchedulerSupplier.apply("search-blocking"))::get;
-/**
-* Lucene query scheduler.
-*/
-private final Scheduler luceneQueryScheduler;
-private final Scheduler blockingLuceneSearchScheduler;
+// Scheduler used to get callback values of LuceneStreamSearcher without creating deadlocks
+private static final Scheduler luceneSearcherScheduler = Schedulers
+.fromExecutorService(Executors
+.newCachedThreadPool(new ShortNamedThreadFactory("lucene-searcher")));
private final String luceneIndexName;
private final SnapshotDeletionPolicy snapshotter;
@@ -162,14 +140,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
this.indexWriter = new IndexWriter(directory, indexWriterConfig);
this.searcherManager
= new SearcherManager(indexWriter, false, false, null);
-if (lowMemory) {
-this.luceneQueryScheduler = this.luceneBlockingScheduler = blockingLuceneSearchScheduler
-= lowMemorySchedulerSupplier.get();
-} else {
-this.luceneBlockingScheduler = blockingSchedulerSupplier.get();
-this.luceneQueryScheduler = querySchedulerSupplier.get();
-this.blockingLuceneSearchScheduler = blockingLuceneSearchSchedulerSupplier.get();
-}
// Create scheduled tasks lifecycle manager
this.scheduledTasksLifecycle = new ScheduledTaskLifecycle();
@@ -208,7 +178,7 @@
this.snapshots.put(snapshotSeqNo, new LuceneIndexSnapshot(snapshot));
return new LLSnapshot(snapshotSeqNo);
})
-.subscribeOn(luceneBlockingScheduler)
+.subscribeOn(Schedulers.boundedElastic())
);
}
@@ -219,7 +189,7 @@
private Mono<IndexCommit> takeLuceneSnapshot() {
return Mono
.fromCallable(snapshotter::snapshot)
-.subscribeOn(luceneBlockingScheduler)
+.subscribeOn(Schedulers.boundedElastic())
.onErrorResume(ex -> Mono
.defer(() -> {
if (ex instanceof IllegalStateException && "No index commit to snapshot".equals(ex.getMessage())) {
@@ -244,26 +214,22 @@
throw new IOException("LLSnapshot " + snapshot.getSequenceNumber() + " not found!");
}
-//noinspection BlockingMethodInNonBlockingContext
indexSnapshot.close();
var luceneIndexSnapshot = indexSnapshot.getSnapshot();
-//noinspection BlockingMethodInNonBlockingContext
snapshotter.release(luceneIndexSnapshot);
// Delete unused files after releasing the snapshot
-//noinspection BlockingMethodInNonBlockingContext
indexWriter.deleteUnusedFiles();
return null;
-}).subscribeOn(luceneBlockingScheduler);
+}).subscribeOn(Schedulers.boundedElastic());
}
@Override
public Mono<Void> addDocument(LLTerm key, LLDocument doc) {
return Mono.<Void>fromCallable(() -> {
-//noinspection BlockingMethodInNonBlockingContext
indexWriter.addDocument(LLUtils.toDocument(doc));
return null;
-}).subscribeOn(luceneBlockingScheduler);
+}).subscribeOn(Schedulers.boundedElastic());
}
@Override
@@ -273,11 +239,10 @@
.collectList()
.flatMap(docs -> Mono
.<Void>fromCallable(() -> {
-//noinspection BlockingMethodInNonBlockingContext
indexWriter.addDocuments(LLUtils.toDocuments(docs));
return null;
})
-.subscribeOn(luceneBlockingScheduler))
+.subscribeOn(Schedulers.boundedElastic()))
)
.then();
}
@@ -286,19 +251,17 @@
@Override
public Mono<Void> deleteDocument(LLTerm id) {
return Mono.<Void>fromCallable(() -> {
-//noinspection BlockingMethodInNonBlockingContext
indexWriter.deleteDocuments(LLUtils.toTerm(id));
return null;
-}).subscribeOn(luceneBlockingScheduler);
+}).subscribeOn(Schedulers.boundedElastic());
}
@Override
public Mono<Void> updateDocument(LLTerm id, LLDocument document) {
return Mono.<Void>fromCallable(() -> {
-//noinspection BlockingMethodInNonBlockingContext
indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document));
return null;
-}).subscribeOn(luceneBlockingScheduler);
+}).subscribeOn(Schedulers.boundedElastic());
}
@Override
@@ -312,11 +275,10 @@
.collectList()
.flatMap(luceneDocuments -> Mono
.<Void>fromCallable(() -> {
-//noinspection BlockingMethodInNonBlockingContext
indexWriter.updateDocuments(LLUtils.toTerm(documents.key()), luceneDocuments);
return null;
})
-.subscribeOn(luceneBlockingScheduler)
+.subscribeOn(Schedulers.boundedElastic())
);
}
@@ -337,7 +299,6 @@
return Mono.fromCallable(() -> {
IndexSearcher indexSearcher;
if (snapshot == null) {
-//noinspection BlockingMethodInNonBlockingContext
indexSearcher = searcherManager.acquire();
indexSearcher.setSimilarity(getSimilarity());
} else {
@@ -352,20 +313,19 @@
} else {
return indexSearcher;
}
-}).subscribeOn(luceneBlockingScheduler);
+}).subscribeOn(Schedulers.boundedElastic());
}
private Mono<Void> releaseSearcherWrapper(LLSnapshot snapshot, IndexSearcher indexSearcher) {
return Mono.<Void>fromRunnable(() -> {
if (snapshot == null) {
try {
-//noinspection BlockingMethodInNonBlockingContext
searcherManager.release(indexSearcher);
} catch (IOException e) {
e.printStackTrace();
}
}
-}).subscribeOn(luceneBlockingScheduler);
+}).subscribeOn(Schedulers.boundedElastic());
}
@Override
@@ -449,7 +409,6 @@
}
// Get the reference doc and apply it to MoreLikeThis, to generate the query
-//noinspection BlockingMethodInNonBlockingContext
var mltQuery = mlt.like((Map) mltDocumentFields);
Query luceneQuery;
if (luceneAdditionalQuery != null) {
@@ -463,7 +422,7 @@
return luceneQuery;
})
-.subscribeOn(luceneQueryScheduler)
+.subscribeOn(Schedulers.boundedElastic())
.map(luceneQuery -> luceneSearch(doDistributedPre,
indexSearcher,
queryParams.getOffset(),
@@ -520,7 +479,7 @@
org.apache.lucene.search.ScoreMode luceneScoreMode = QueryParser.toScoreMode(queryParams.getScoreMode());
return Tuples.of(luceneQuery, Optional.ofNullable(luceneSort), luceneScoreMode);
})
-.subscribeOn(luceneQueryScheduler)
+.subscribeOn(Schedulers.boundedElastic())
.<LLSearchResult>flatMap(tuple -> Mono
.fromSupplier(() -> {
Query luceneQuery = tuple.getT1();
@@ -560,89 +519,87 @@
ScoreMode luceneScoreMode,
Mono<Void> successCleanup) {
return new LLSearchResult(Mono.<LLSearchResultShard>create(monoSink -> {
LuceneSearchInstance luceneSearchInstance;
long totalHitsCount;
try {
if (doDistributedPre) {
//noinspection BlockingMethodInNonBlockingContext
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery);
monoSink.success(new LLSearchResultShard(successCleanup.thenMany(Flux.empty()), 0));
return;
} else {
int boundedOffset = Math.max(0, offset > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) offset);
int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit);
//noinspection BlockingMethodInNonBlockingContext
luceneSearchInstance = streamSearcher.search(indexSearcher,
luceneQuery,
boundedOffset,
boundedLimit,
luceneSort,
luceneScoreMode,
minCompetitiveScore,
keyFieldName
);
//noinspection BlockingMethodInNonBlockingContext
totalHitsCount = luceneSearchInstance.getTotalHitsCount();
}
} catch (Exception ex) {
monoSink.error(ex);
return;
}
AtomicBoolean alreadySubscribed = new AtomicBoolean(false);
var resultsFlux = Flux.<LLKeyScore>create(sink -> {
if (!alreadySubscribed.compareAndSet(false, true)) {
sink.error(new IllegalStateException("Already subscribed to results"));
return;
}
AtomicBoolean cancelled = new AtomicBoolean();
Semaphore requests = new Semaphore(0);
sink.onDispose(() -> cancelled.set(true));
sink.onCancel(() -> cancelled.set(true));
sink.onRequest(delta -> requests.release((int) Math.min(delta, Integer.MAX_VALUE)));
LuceneSearchInstance luceneSearchInstance;
long totalHitsCount;
try {
//noinspection BlockingMethodInNonBlockingContext
luceneSearchInstance.getResults(keyScore -> {
try {
if (cancelled.get()) {
return HandleResult.HALT;
}
while (!requests.tryAcquire(500, TimeUnit.MILLISECONDS)) {
if (cancelled.get()) {
return HandleResult.HALT;
}
}
sink.next(fixKeyScore(keyScore, scoreDivisor));
if (cancelled.get()) {
return HandleResult.HALT;
} else {
return HandleResult.CONTINUE;
}
} catch (Exception ex) {
sink.error(ex);
cancelled.set(true);
requests.release(Integer.MAX_VALUE);
return HandleResult.HALT;
}
});
sink.complete();
if (doDistributedPre) {
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery);
monoSink.success(new LLSearchResultShard(successCleanup.thenMany(Flux.empty()), 0));
return;
} else {
int boundedOffset = Math.max(0, offset > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) offset);
int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit);
luceneSearchInstance = streamSearcher.search(indexSearcher,
luceneQuery,
boundedOffset,
boundedLimit,
luceneSort,
luceneScoreMode,
minCompetitiveScore,
keyFieldName
);
totalHitsCount = luceneSearchInstance.getTotalHitsCount();
}
} catch (Exception ex) {
sink.error(ex);
monoSink.error(ex);
return;
}
}, OverflowStrategy.ERROR).subscribeOn(blockingLuceneSearchScheduler).publishOn(luceneQueryScheduler);
AtomicBoolean alreadySubscribed = new AtomicBoolean(false);
var resultsFlux = Flux.<LLKeyScore>create(sink -> {
monoSink.success(new LLSearchResultShard(Flux
.usingWhen(
Mono.just(true),
b -> resultsFlux,
b -> successCleanup),
totalHitsCount));
}).subscribeOn(blockingLuceneSearchScheduler).publishOn(luceneQueryScheduler).flux());
if (!alreadySubscribed.compareAndSet(false, true)) {
sink.error(new IllegalStateException("Already subscribed to results"));
return;
}
AtomicBoolean cancelled = new AtomicBoolean();
Semaphore requests = new Semaphore(0);
sink.onDispose(() -> cancelled.set(true));
sink.onCancel(() -> cancelled.set(true));
sink.onRequest(delta -> requests.release((int) Math.min(delta, Integer.MAX_VALUE)));
luceneSearcherScheduler
.schedule(() -> {
try {
luceneSearchInstance.getResults(keyScore -> {
try {
if (cancelled.get()) {
return HandleResult.HALT;
}
while (!requests.tryAcquire(500, TimeUnit.MILLISECONDS)) {
if (cancelled.get()) {
return HandleResult.HALT;
}
}
sink.next(fixKeyScore(keyScore, scoreDivisor));
if (cancelled.get()) {
return HandleResult.HALT;
} else {
return HandleResult.CONTINUE;
}
} catch (Exception ex) {
sink.error(ex);
cancelled.set(true);
requests.release(Integer.MAX_VALUE);
return HandleResult.HALT;
}
});
sink.complete();
} catch (Exception ex) {
sink.error(ex);
}
});
}, OverflowStrategy.ERROR).subscribeOn(Schedulers.boundedElastic());
monoSink.success(new LLSearchResultShard(Flux
.usingWhen(
Mono.just(true),
b -> resultsFlux,
b -> successCleanup),
totalHitsCount));
}).subscribeOn(Schedulers.boundedElastic()).flux());
}
@Override
@@ -650,7 +607,6 @@
return Mono
.<Void>fromCallable(() -> {
logger.debug("Closing IndexWriter...");
-this.blockingLuceneSearchScheduler.dispose();
scheduledTasksLifecycle.cancelAndWait();
//noinspection BlockingMethodInNonBlockingContext
indexWriter.close();
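Taken together, the LLLocalLuceneIndex changes above replace the per-index query/blocking schedulers with Schedulers.boundedElastic() and move the blocking LuceneSearchInstance callback loop onto a dedicated cached-thread-pool scheduler (luceneSearcherScheduler), which the new comment describes as a way to consume LuceneStreamSearcher callbacks without creating deadlocks. The sketch below shows the same callback-to-Flux bridge in isolation: downstream demand is turned into Semaphore permits via sink.onRequest, and the blocking iteration runs on its own executor. All names in it (CallbackToFluxSketch, BlockingSearch, ResultHandler, bridge) are illustrative stand-ins, not APIs from this repository.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;

public class CallbackToFluxSketch {

    /** Stand-in for a blocking, callback-based search (in the spirit of LuceneSearchInstance). */
    interface BlockingSearch<T> {
        void getResults(ResultHandler<T> handler) throws Exception;
    }

    /** Return false to halt the iteration, mirroring HandleResult.HALT in the diff above. */
    interface ResultHandler<T> {
        boolean onResult(T result) throws Exception;
    }

    static <T> Flux<T> bridge(BlockingSearch<T> search, ExecutorService callbackExecutor) {
        return Flux.<T>create(sink -> {
            AtomicBoolean cancelled = new AtomicBoolean();
            Semaphore requests = new Semaphore(0);
            sink.onDispose(() -> cancelled.set(true));
            sink.onCancel(() -> cancelled.set(true));
            // Translate Reactive Streams demand into semaphore permits
            sink.onRequest(n -> requests.release((int) Math.min(n, Integer.MAX_VALUE)));

            // Run the blocking iteration on its own executor, so waiting for demand
            // never parks the subscriber's thread or a shared scheduler worker
            callbackExecutor.execute(() -> {
                try {
                    search.getResults(result -> {
                        // Wait for downstream demand, checking the cancellation flag periodically
                        while (!requests.tryAcquire(500, TimeUnit.MILLISECONDS)) {
                            if (cancelled.get()) {
                                return false;
                            }
                        }
                        if (cancelled.get()) {
                            return false;
                        }
                        sink.next(result);
                        return !cancelled.get();
                    });
                    sink.complete();
                } catch (Exception ex) {
                    sink.error(ex);
                }
            });
        }, OverflowStrategy.ERROR);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        BlockingSearch<Integer> fakeSearch = handler -> {
            for (int i = 0; i < 5 && handler.onResult(i); i++) {
                // each value is emitted only after the subscriber has requested it
            }
        };
        bridge(fakeSearch, executor)
                .doOnNext(value -> System.out.println("got " + value))
                .blockLast();
        executor.shutdown();
    }
}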