From df84562bb90e673c51f59c2fa66b816497f38b36 Mon Sep 17 00:00:00 2001
From: Andrea Cavalli
Date: Thu, 15 Apr 2021 00:00:42 +0200
Subject: [PATCH] Bugfixes and updated reactor

---
 pom.xml                                        |   6 +-
 .../dbengine/client/CountedStream.java         |   5 +-
 .../disk/LLLocalKeyValueDatabase.java          |  21 +-
 .../database/disk/LLLocalLuceneIndex.java      | 232 +++++++-----------
 4 files changed, 108 insertions(+), 156 deletions(-)

diff --git a/pom.xml b/pom.xml
index 7533cff..8da0641 100644
--- a/pom.xml
+++ b/pom.xml
@@ -178,17 +178,17 @@
 		<dependency>
 			<groupId>io.projectreactor</groupId>
 			<artifactId>reactor-core</artifactId>
-			<version>3.4.3</version>
+			<version>3.4.5</version>
 		</dependency>
 		<dependency>
 			<groupId>io.projectreactor</groupId>
 			<artifactId>reactor-tools</artifactId>
-			<version>3.4.3</version>
+			<version>3.4.5</version>
 		</dependency>
 		<dependency>
 			<groupId>io.projectreactor</groupId>
 			<artifactId>reactor-test</artifactId>
-			<version>3.4.3</version>
+			<version>3.4.5</version>
 		</dependency>
 		<dependency>
 			<groupId>org.novasearch</groupId>
diff --git a/src/main/java/it/cavallium/dbengine/client/CountedStream.java b/src/main/java/it/cavallium/dbengine/client/CountedStream.java
index 2262558..8aa3493 100644
--- a/src/main/java/it/cavallium/dbengine/client/CountedStream.java
+++ b/src/main/java/it/cavallium/dbengine/client/CountedStream.java
@@ -47,6 +47,9 @@ public class CountedStream<T> {
 
 	public static <T> Mono<CountedStream<T>> counted(Flux<T> flux) {
 		var publishedFlux = flux.cache();
-		return publishedFlux.count().map(count -> new CountedStream<>(publishedFlux, count));
+		return publishedFlux
+				.count()
+				.map(count -> new CountedStream<>(publishedFlux, count))
+				.switchIfEmpty(Mono.fromSupplier(() -> new CountedStream<>(Flux.empty(), 0)));
 	}
 }
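
Note on the CountedStream hunk above: count() is now chained on the cached flux and a switchIfEmpty fallback guarantees that the returned Mono always emits a CountedStream, falling back to an empty stream counted as 0. A minimal verification sketch, assuming the reactor-test dependency declared in pom.xml above (the class and method names below are illustrative, not part of the patch):

	import it.cavallium.dbengine.client.CountedStream;
	import reactor.core.publisher.Flux;
	import reactor.test.StepVerifier;

	class CountedStreamSketch {
		void emptySourceStillEmitsACountedStream() {
			// Exactly one CountedStream is emitted, wrapping Flux.empty() with count 0
			StepVerifier.create(CountedStream.counted(Flux.empty()))
					.expectNextCount(1)
					.verifyComplete();
		}
	}
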
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java
index 564b151..5978a1d 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java
@@ -1,6 +1,5 @@
 package it.cavallium.dbengine.database.disk;
 
-import com.google.common.base.Suppliers;
 import it.cavallium.dbengine.database.Column;
 import it.cavallium.dbengine.database.LLKeyValueDatabase;
 import it.cavallium.dbengine.database.LLSnapshot;
@@ -24,7 +23,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
 import org.apache.commons.lang3.time.StopWatch;
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.BloomFilter;
@@ -58,8 +56,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
 	protected static final Logger logger = LoggerFactory.getLogger(LLLocalKeyValueDatabase.class);
 	private static final ColumnFamilyDescriptor DEFAULT_COLUMN_FAMILY = new ColumnFamilyDescriptor(
 			RocksDB.DEFAULT_COLUMN_FAMILY);
-	private static final Supplier<Scheduler> lowMemorySupplier = Suppliers.memoize(() ->
-			Schedulers.newBoundedElastic(1, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "db-low-memory", Integer.MAX_VALUE))::get;
 
 	private final Scheduler dbScheduler;
 	private final Path dbPath;
@@ -88,16 +84,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
 		Path dbPath = Paths.get(dbPathString);
 		this.dbPath = dbPath;
 		this.name = name;
-		if (lowMemory) {
-			this.dbScheduler = lowMemorySupplier.get();
-		} else {
-			this.dbScheduler = Schedulers.newBoundedElastic(Math.max(8, Runtime.getRuntime().availableProcessors()),
-					Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
-					"db-" + name,
-					60,
-					true
-			);
-		}
+		this.dbScheduler = Schedulers.newBoundedElastic(lowMemory ? Runtime.getRuntime().availableProcessors()
+				: Math.max(8, Runtime.getRuntime().availableProcessors()),
+				Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
+				"db-" + name,
+				60,
+				true
+		);
 		createIfNotExists(descriptors, options, dbPath, dbPathString);
 
 		// Create all column families that don't exist
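
Note on the LLLocalKeyValueDatabase hunks above: the memoized "db-low-memory" scheduler is removed and a bounded-elastic scheduler is always created, sized by the lowMemory flag. The same sizing rule in isolation (a sketch; lowMemory and name are assumed to be in scope as they are in the constructor above):

	import reactor.core.scheduler.Scheduler;
	import reactor.core.scheduler.Schedulers;

	int cores = Runtime.getRuntime().availableProcessors();
	// Low-memory databases are capped at one thread per core, otherwise at least 8 threads,
	// with the default queue size, a 60s idle TTL and daemon threads.
	Scheduler dbScheduler = Schedulers.newBoundedElastic(
			lowMemory ? cores : Math.max(8, cores),
			Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
			"db-" + name,
			60,
			true
	);
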
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
index 5929372..e56e15f 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
@@ -1,6 +1,5 @@
 package it.cavallium.dbengine.database.disk;
 
-import com.google.common.base.Suppliers;
 import it.cavallium.dbengine.client.query.QueryParser;
 import it.cavallium.dbengine.client.query.current.data.QueryParams;
 import it.cavallium.dbengine.database.EnglishItalianStopFilter;
@@ -31,12 +30,11 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;
-import java.util.function.Supplier;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
@@ -58,6 +56,7 @@ import org.apache.lucene.store.FSDirectory;
 import org.jetbrains.annotations.Nullable;
 import org.warp.commonutils.log.Logger;
 import org.warp.commonutils.log.LoggerFactory;
+import org.warp.commonutils.type.ShortNamedThreadFactory;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink.OverflowStrategy;
 import reactor.core.publisher.GroupedFlux;
@@ -69,7 +68,6 @@ import reactor.util.function.Tuples;
 
 public class LLLocalLuceneIndex implements LLLuceneIndex {
 
-	private static final boolean USE_STANDARD_SCHEDULERS = true;
 	protected static final Logger logger = LoggerFactory.getLogger(LLLocalLuceneIndex.class);
 	private static final LuceneStreamSearcher streamSearcher = new AdaptiveStreamSearcher();
 	private static final AllowOnlyQueryParsingCollectorStreamSearcher allowOnlyQueryParsingCollectorStreamSearcher
@@ -85,30 +83,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
 			Integer.MAX_VALUE,
 			true
 	);
-	private final Scheduler luceneBlockingScheduler;
-	private static final Function<String, Scheduler> boundedSchedulerSupplier = name ->
-			Schedulers.newBoundedElastic(Runtime.getRuntime().availableProcessors(),
-					Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
-					"lucene-" + name,
-					60
-			);
-	private static final Supplier<Scheduler> lowMemorySchedulerSupplier = Suppliers.memoize(() ->
-			Schedulers.newBoundedElastic(1, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
-					"lucene-low-memory", Integer.MAX_VALUE))::get;
-	@SuppressWarnings("FieldCanBeLocal")
-	private final Supplier<Scheduler> querySchedulerSupplier = USE_STANDARD_SCHEDULERS ?
-			Schedulers::boundedElastic : Suppliers.memoize(() -> boundedSchedulerSupplier.apply("query"))::get;
-	@SuppressWarnings("FieldCanBeLocal")
-	private final Supplier<Scheduler> blockingSchedulerSupplier = USE_STANDARD_SCHEDULERS ?
-			Schedulers::boundedElastic : Suppliers.memoize(() -> boundedSchedulerSupplier.apply("blocking"))::get;
-	@SuppressWarnings("FieldCanBeLocal")
-	private final Supplier<Scheduler> blockingLuceneSearchSchedulerSupplier = USE_STANDARD_SCHEDULERS ?
-			Schedulers::boundedElastic : Suppliers.memoize(() -> boundedSchedulerSupplier.apply("search-blocking"))::get;
-	/**
-	 * Lucene query scheduler.
-	 */
-	private final Scheduler luceneQueryScheduler;
-	private final Scheduler blockingLuceneSearchScheduler;
+	// Scheduler used to get callback values of LuceneStreamSearcher without creating deadlocks
+	private static final Scheduler luceneSearcherScheduler = Schedulers
+			.fromExecutorService(Executors
+					.newCachedThreadPool(new ShortNamedThreadFactory("lucene-searcher")));
 
 	private final String luceneIndexName;
 	private final SnapshotDeletionPolicy snapshotter;
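
Note on the field hunk above: the per-index Lucene schedulers are replaced by one static scheduler backed by a plain cached thread pool, used only to run LuceneStreamSearcher callbacks. The same construction in isolation (a sketch; ShortNamedThreadFactory is the org.warp.commonutils class imported above):

	import java.util.concurrent.Executors;
	import org.warp.commonutils.type.ShortNamedThreadFactory;
	import reactor.core.scheduler.Scheduler;
	import reactor.core.scheduler.Schedulers;

	// An unbounded cached pool lets the result-draining task block on downstream demand
	// without tying up Reactor's shared workers (the deadlock the comment above refers to).
	Scheduler luceneSearcherScheduler = Schedulers.fromExecutorService(
			Executors.newCachedThreadPool(new ShortNamedThreadFactory("lucene-searcher")));
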
@@ -162,14 +140,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
 		this.indexWriter = new IndexWriter(directory, indexWriterConfig);
 		this.searcherManager = new SearcherManager(indexWriter, false, false, null);
 
-		if (lowMemory) {
-			this.luceneQueryScheduler = this.luceneBlockingScheduler = blockingLuceneSearchScheduler
-					= lowMemorySchedulerSupplier.get();
-		} else {
-			this.luceneBlockingScheduler = blockingSchedulerSupplier.get();
-			this.luceneQueryScheduler = querySchedulerSupplier.get();
-			this.blockingLuceneSearchScheduler = blockingLuceneSearchSchedulerSupplier.get();
-		}
 
 		// Create scheduled tasks lifecycle manager
 		this.scheduledTasksLifecycle = new ScheduledTaskLifecycle();
@@ -208,7 +178,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
 					this.snapshots.put(snapshotSeqNo, new LuceneIndexSnapshot(snapshot));
 					return new LLSnapshot(snapshotSeqNo);
 				})
-				.subscribeOn(luceneBlockingScheduler)
+				.subscribeOn(Schedulers.boundedElastic())
 		);
 	}
 
@@ -219,7 +189,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
 	private Mono<IndexCommit> takeLuceneSnapshot() {
 		return Mono
 				.fromCallable(snapshotter::snapshot)
-				.subscribeOn(luceneBlockingScheduler)
+				.subscribeOn(Schedulers.boundedElastic())
 				.onErrorResume(ex -> Mono
 						.defer(() -> {
 							if (ex instanceof IllegalStateException && "No index commit to snapshot".equals(ex.getMessage())) {
@@ -244,26 +214,22 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
 				throw new IOException("LLSnapshot " + snapshot.getSequenceNumber() + " not found!");
 			}
 
-			//noinspection BlockingMethodInNonBlockingContext
 			indexSnapshot.close();
 
 			var luceneIndexSnapshot = indexSnapshot.getSnapshot();
-			//noinspection BlockingMethodInNonBlockingContext
 			snapshotter.release(luceneIndexSnapshot);
 
 			// Delete unused files after releasing the snapshot
-			//noinspection BlockingMethodInNonBlockingContext
 			indexWriter.deleteUnusedFiles();
 			return null;
-		}).subscribeOn(luceneBlockingScheduler);
+		}).subscribeOn(Schedulers.boundedElastic());
 	}
 
 	@Override
 	public Mono<Void> addDocument(LLTerm key, LLDocument doc) {
 		return Mono.fromCallable(() -> {
-			//noinspection BlockingMethodInNonBlockingContext
 			indexWriter.addDocument(LLUtils.toDocument(doc));
 			return null;
-		}).subscribeOn(luceneBlockingScheduler);
+		}).subscribeOn(Schedulers.boundedElastic());
 	}
 
@@ -273,11 +239,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
 				.collectList()
 				.flatMap(docs -> Mono
 						.fromCallable(() -> {
-							//noinspection BlockingMethodInNonBlockingContext
 							indexWriter.addDocuments(LLUtils.toDocuments(docs));
 							return null;
 						})
-						.subscribeOn(luceneBlockingScheduler))
+						.subscribeOn(Schedulers.boundedElastic()))
 				)
 				.then();
 	}
@@ -286,19 +251,17 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
 	@Override
 	public Mono<Void> deleteDocument(LLTerm id) {
 		return Mono.fromCallable(() -> {
-			//noinspection BlockingMethodInNonBlockingContext
 			indexWriter.deleteDocuments(LLUtils.toTerm(id));
 			return null;
-		}).subscribeOn(luceneBlockingScheduler);
+		}).subscribeOn(Schedulers.boundedElastic());
 	}
 
 	@Override
 	public Mono<Void> updateDocument(LLTerm id, LLDocument document) {
 		return Mono.fromCallable(() -> {
-			//noinspection BlockingMethodInNonBlockingContext
 			indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document));
 			return null;
-		}).subscribeOn(luceneBlockingScheduler);
+		}).subscribeOn(Schedulers.boundedElastic());
 	}
 
 	@Override
@@ -312,11 +275,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
 				.collectList()
 				.flatMap(luceneDocuments -> Mono
 						.fromCallable(() -> {
-							//noinspection BlockingMethodInNonBlockingContext
 							indexWriter.updateDocuments(LLUtils.toTerm(documents.key()), luceneDocuments);
 							return null;
 						})
-						.subscribeOn(luceneBlockingScheduler)
+						.subscribeOn(Schedulers.boundedElastic())
 				);
 	}
 
@@ -337,7 +299,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
 		return Mono.fromCallable(() -> {
 			IndexSearcher indexSearcher;
 			if (snapshot == null) {
-				//noinspection BlockingMethodInNonBlockingContext
 				indexSearcher = searcherManager.acquire();
 				indexSearcher.setSimilarity(getSimilarity());
 			} else {
@@ -352,20 +313,19 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
 			} else {
 				return indexSearcher;
 			}
-		}).subscribeOn(luceneBlockingScheduler);
+		}).subscribeOn(Schedulers.boundedElastic());
 	}
 
 	private Mono<Void> releaseSearcherWrapper(LLSnapshot snapshot, IndexSearcher indexSearcher) {
 		return Mono.fromRunnable(() -> {
 			if (snapshot == null) {
 				try {
-					//noinspection BlockingMethodInNonBlockingContext
 					searcherManager.release(indexSearcher);
 				} catch (IOException e) {
 					e.printStackTrace();
 				}
 			}
-		}).subscribeOn(luceneBlockingScheduler);
+		}).subscribeOn(Schedulers.boundedElastic());
 	}
 
 	@Override
@@ -449,7 +409,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
 			}
 
 			// Get the reference doc and apply it to MoreLikeThis, to generate the query
-			//noinspection BlockingMethodInNonBlockingContext
 			var mltQuery = mlt.like((Map) mltDocumentFields);
 			Query luceneQuery;
 			if (luceneAdditionalQuery != null) {
@@ -463,7 +422,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
 
 					return luceneQuery;
 				})
-				.subscribeOn(luceneQueryScheduler)
+				.subscribeOn(Schedulers.boundedElastic())
 				.map(luceneQuery -> luceneSearch(doDistributedPre,
 						indexSearcher,
 						queryParams.getOffset(),
@@ -520,7 +479,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
 					org.apache.lucene.search.ScoreMode luceneScoreMode = QueryParser.toScoreMode(queryParams.getScoreMode());
 					return Tuples.of(luceneQuery, Optional.ofNullable(luceneSort), luceneScoreMode);
 				})
-				.subscribeOn(luceneQueryScheduler)
+				.subscribeOn(Schedulers.boundedElastic())
 				.flatMap(tuple -> Mono
 						.fromSupplier(() -> {
 							Query luceneQuery = tuple.getT1();
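
Note on the hunks above: every blocking IndexWriter/SearcherManager call keeps the same shape, Mono.fromCallable(...) subscribed on Reactor's shared boundedElastic() scheduler, instead of the removed luceneBlockingScheduler/luceneQueryScheduler fields. The recurring pattern as a standalone sketch (LLTerm, LLDocument and LLUtils are the project classes used above):

	import org.apache.lucene.index.IndexWriter;
	import reactor.core.publisher.Mono;
	import reactor.core.scheduler.Schedulers;

	Mono<Void> addDocumentSketch(IndexWriter indexWriter, LLTerm key, LLDocument doc) {
		return Mono.<Void>fromCallable(() -> {
			indexWriter.addDocument(LLUtils.toDocument(doc)); // blocking Lucene write
			return null; // completes the Mono without emitting a value
		}).subscribeOn(Schedulers.boundedElastic()); // keep blocking work off parallel/event-loop threads
	}
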
@@ -560,89 +519,87 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
 			ScoreMode luceneScoreMode,
 			Mono<Void> successCleanup) {
 		return new LLSearchResult(Mono.create(monoSink -> {
-
-			LuceneSearchInstance luceneSearchInstance;
-			long totalHitsCount;
-			try {
-				if (doDistributedPre) {
-					//noinspection BlockingMethodInNonBlockingContext
-					allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery);
-					monoSink.success(new LLSearchResultShard(successCleanup.thenMany(Flux.empty()), 0));
-					return;
-				} else {
-					int boundedOffset = Math.max(0, offset > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) offset);
-					int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit);
-					//noinspection BlockingMethodInNonBlockingContext
-					luceneSearchInstance = streamSearcher.search(indexSearcher,
-							luceneQuery,
-							boundedOffset,
-							boundedLimit,
-							luceneSort,
-							luceneScoreMode,
-							minCompetitiveScore,
-							keyFieldName
-					);
-					//noinspection BlockingMethodInNonBlockingContext
-					totalHitsCount = luceneSearchInstance.getTotalHitsCount();
-				}
-			} catch (Exception ex) {
-				monoSink.error(ex);
-				return;
-			}
-
-			AtomicBoolean alreadySubscribed = new AtomicBoolean(false);
-			var resultsFlux = Flux.create(sink -> {
-
-				if (!alreadySubscribed.compareAndSet(false, true)) {
-					sink.error(new IllegalStateException("Already subscribed to results"));
-					return;
-				}
-
-				AtomicBoolean cancelled = new AtomicBoolean();
-				Semaphore requests = new Semaphore(0);
-				sink.onDispose(() -> cancelled.set(true));
-				sink.onCancel(() -> cancelled.set(true));
-				sink.onRequest(delta -> requests.release((int) Math.min(delta, Integer.MAX_VALUE)));
-
+			LuceneSearchInstance luceneSearchInstance;
+			long totalHitsCount;
 			try {
-				//noinspection BlockingMethodInNonBlockingContext
-				luceneSearchInstance.getResults(keyScore -> {
-					try {
-						if (cancelled.get()) {
-							return HandleResult.HALT;
-						}
-						while (!requests.tryAcquire(500, TimeUnit.MILLISECONDS)) {
-							if (cancelled.get()) {
-								return HandleResult.HALT;
-							}
-						}
-						sink.next(fixKeyScore(keyScore, scoreDivisor));
-						if (cancelled.get()) {
-							return HandleResult.HALT;
-						} else {
-							return HandleResult.CONTINUE;
-						}
-					} catch (Exception ex) {
-						sink.error(ex);
-						cancelled.set(true);
-						requests.release(Integer.MAX_VALUE);
-						return HandleResult.HALT;
-					}
-				});
-				sink.complete();
+				if (doDistributedPre) {
+					allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery);
+					monoSink.success(new LLSearchResultShard(successCleanup.thenMany(Flux.empty()), 0));
+					return;
+				} else {
+					int boundedOffset = Math.max(0, offset > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) offset);
+					int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit);
+					luceneSearchInstance = streamSearcher.search(indexSearcher,
+							luceneQuery,
+							boundedOffset,
+							boundedLimit,
+							luceneSort,
+							luceneScoreMode,
+							minCompetitiveScore,
+							keyFieldName
+					);
+					totalHitsCount = luceneSearchInstance.getTotalHitsCount();
+				}
 			} catch (Exception ex) {
-				sink.error(ex);
+				monoSink.error(ex);
+				return;
 			}
-			}, OverflowStrategy.ERROR).subscribeOn(blockingLuceneSearchScheduler).publishOn(luceneQueryScheduler);
 
-			monoSink.success(new LLSearchResultShard(Flux
-					.usingWhen(
-							Mono.just(true),
-							b -> resultsFlux,
-							b -> successCleanup),
-					totalHitsCount));
-		}).subscribeOn(blockingLuceneSearchScheduler).publishOn(luceneQueryScheduler).flux());
+			AtomicBoolean alreadySubscribed = new AtomicBoolean(false);
+			var resultsFlux = Flux.create(sink -> {
+
+				if (!alreadySubscribed.compareAndSet(false, true)) {
+					sink.error(new IllegalStateException("Already subscribed to results"));
+					return;
+				}
+
+				AtomicBoolean cancelled = new AtomicBoolean();
+				Semaphore requests = new Semaphore(0);
+				sink.onDispose(() -> cancelled.set(true));
+				sink.onCancel(() -> cancelled.set(true));
+				sink.onRequest(delta -> requests.release((int) Math.min(delta, Integer.MAX_VALUE)));
+
+				luceneSearcherScheduler
+						.schedule(() -> {
+							try {
+								luceneSearchInstance.getResults(keyScore -> {
+									try {
+										if (cancelled.get()) {
+											return HandleResult.HALT;
+										}
+										while (!requests.tryAcquire(500, TimeUnit.MILLISECONDS)) {
+											if (cancelled.get()) {
+												return HandleResult.HALT;
+											}
+										}
+										sink.next(fixKeyScore(keyScore, scoreDivisor));
+										if (cancelled.get()) {
+											return HandleResult.HALT;
+										} else {
+											return HandleResult.CONTINUE;
+										}
+									} catch (Exception ex) {
+										sink.error(ex);
+										cancelled.set(true);
+										requests.release(Integer.MAX_VALUE);
+										return HandleResult.HALT;
+									}
+								});
+								sink.complete();
+							} catch (Exception ex) {
+								sink.error(ex);
+							}
+						});
+
+			}, OverflowStrategy.ERROR).subscribeOn(Schedulers.boundedElastic());
+
+			monoSink.success(new LLSearchResultShard(Flux
+					.usingWhen(
+							Mono.just(true),
+							b -> resultsFlux,
+							b -> successCleanup),
+					totalHitsCount));
+		}).subscribeOn(Schedulers.boundedElastic()).flux());
 	}
 
 	@Override
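
Note on the luceneSearch hunk above: the rewritten body adapts LuceneStreamSearcher's push-style callback to a backpressured Flux. Downstream demand is tracked with a Semaphore, cancellation with an AtomicBoolean, and the blocking drain loop is scheduled on luceneSearcherScheduler so it cannot deadlock the subscriber's thread. The core of that bridge, reduced to a generic sketch (BlockingSource and its produce method are illustrative stand-ins, not the project's LuceneSearchInstance API):

	import java.util.concurrent.Semaphore;
	import java.util.concurrent.TimeUnit;
	import java.util.concurrent.atomic.AtomicBoolean;
	import java.util.function.Predicate;
	import reactor.core.publisher.Flux;
	import reactor.core.publisher.FluxSink.OverflowStrategy;
	import reactor.core.scheduler.Scheduler;

	final class BackpressureBridgeSketch {

		interface BlockingSource<T> {
			// Calls consumer for each item; stops when the consumer returns false.
			void produce(Predicate<T> consumer) throws Exception;
		}

		<T> Flux<T> bridge(BlockingSource<T> source, Scheduler scheduler) {
			return Flux.<T>create(sink -> {
				AtomicBoolean cancelled = new AtomicBoolean();
				Semaphore requests = new Semaphore(0);
				sink.onCancel(() -> cancelled.set(true));
				sink.onRequest(delta -> requests.release((int) Math.min(delta, Integer.MAX_VALUE)));
				scheduler.schedule(() -> {
					try {
						source.produce(item -> {
							try {
								// Block until the subscriber has requested more, rechecking cancellation
								while (!requests.tryAcquire(500, TimeUnit.MILLISECONDS)) {
									if (cancelled.get()) {
										return false;
									}
								}
							} catch (InterruptedException e) {
								Thread.currentThread().interrupt();
								return false;
							}
							sink.next(item);
							return !cancelled.get();
						});
						sink.complete();
					} catch (Exception ex) {
						sink.error(ex);
					}
				});
			}, OverflowStrategy.ERROR);
		}
	}
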
@@ -650,7 +607,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
 		return Mono
 				.fromCallable(() -> {
 					logger.debug("Closing IndexWriter...");
-					this.blockingLuceneSearchScheduler.dispose();
 					scheduledTasksLifecycle.cancelAndWait();
 					//noinspection BlockingMethodInNonBlockingContext
 					indexWriter.close();
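
Note on the close() hunk above: the per-index blockingLuceneSearchScheduler no longer exists, so there is nothing index-specific left to dispose. Schedulers.boundedElastic() is a process-wide scheduler owned by Reactor and must not be disposed by a single index; if an application wants to tear Reactor's shared schedulers down, it does so once at shutdown (an application-level choice, not something this patch does):

	import reactor.core.scheduler.Schedulers;

	// Only at full process shutdown, never per index:
	Schedulers.shutdownNow();
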