Performance bugfix

Andrea Cavalli 2021-09-08 21:34:52 +02:00
parent 047a471bf7
commit 09c7e4f730
5 changed files with 79 additions and 60 deletions

View File

@@ -6,9 +6,11 @@ import com.google.common.cache.LoadingCache;
import it.cavallium.dbengine.database.LLSnapshot;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Function;
@@ -44,6 +46,7 @@ public class CachedIndexSearcherManager {
private final Empty<Void> closeRequested = Sinks.empty();
private final Empty<Void> refresherClosed = Sinks.empty();
private final Mono<Void> closeMono;
public CachedIndexSearcherManager(IndexWriter indexWriter,
SnapshotsManager snapshotsManager,
@@ -64,7 +67,7 @@ public class CachedIndexSearcherManager {
Mono
.fromRunnable(() -> {
try {
maybeRefreshBlocking();
maybeRefresh();
} catch (Exception ex) {
logger.error("Failed to refresh the searcher manager", ex);
}
@@ -86,10 +89,49 @@ public class CachedIndexSearcherManager {
}
});
this.cachedMainSearcher = this.generateCachedSearcher(null);
this.closeMono = Mono
.fromRunnable(() -> {
logger.info("Closing IndexSearcherManager...");
this.closeRequested.tryEmitEmpty();
})
.then(refresherClosed.asMono())
.then(Mono.<Void>fromRunnable(() -> {
logger.info("Closed IndexSearcherManager");
logger.info("Closing refreshes...");
if (!activeRefreshes.isTerminated()) {
try {
activeRefreshes.awaitAdvanceInterruptibly(activeRefreshes.arrive(), 15, TimeUnit.SECONDS);
} catch (Exception ex) {
if (ex instanceof TimeoutException) {
logger.error("Failed to terminate active refreshes: timeout");
} else {
logger.error("Failed to terminate active refreshes", ex);
}
}
}
logger.info("Closed refreshes...");
logger.info("Closing active searchers...");
if (!activeSearchers.isTerminated()) {
try {
activeSearchers.awaitAdvanceInterruptibly(activeSearchers.arrive(), 15, TimeUnit.SECONDS);
} catch (Exception ex) {
if (ex instanceof TimeoutException) {
logger.error("Failed to terminate active searchers: timeout");
} else {
logger.error("Failed to terminate active searchers", ex);
}
}
}
logger.info("Closed active searchers");
cachedSnapshotSearchers.invalidateAll();
cachedSnapshotSearchers.cleanUp();
})).cache();
}
private Mono<CachedIndexSearcher> generateCachedSearcher(@Nullable LLSnapshot snapshot) {
return Mono.fromCallable(() -> {
activeSearchers.register();
IndexSearcher indexSearcher;
SearcherManager associatedSearcherManager;
if (snapshot == null) {
@@ -100,7 +142,18 @@ public class CachedIndexSearcherManager {
indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher();
associatedSearcherManager = null;
}
return new CachedIndexSearcher(indexSearcher, associatedSearcherManager, activeSearchers::arriveAndDeregister);
AtomicBoolean alreadyDeregistered = new AtomicBoolean(false);
return new CachedIndexSearcher(indexSearcher, associatedSearcherManager,
() -> {
// Deregistration should never happen more than once;
// this AtomicBoolean guarantees that it cannot run twice.
if (alreadyDeregistered.compareAndSet(false, true)) {
activeSearchers.arriveAndDeregister();
} else {
logger.error("Disposed CachedIndexSearcher twice! This is an implementation bug!");
}
}
);
})
.cacheInvalidateWhen(indexSearcher -> Mono
.firstWithSignal(
@@ -159,10 +212,7 @@ public class CachedIndexSearcherManager {
return this
.retrieveCachedIndexSearcher(snapshot)
// Increment reference count
.doOnNext(indexSearcher -> {
activeSearchers.register();
indexSearcher.incUsage();
});
.doOnNext(CachedIndexSearcher::incUsage);
}
private Mono<CachedIndexSearcher> retrieveCachedIndexSearcher(LLSnapshot snapshot) {
@@ -185,42 +235,6 @@ public class CachedIndexSearcherManager {
}
public Mono<Void> close() {
return Mono
.fromRunnable(() -> {
logger.info("Closing IndexSearcherManager...");
this.closeRequested.tryEmitEmpty();
})
.then(refresherClosed.asMono())
.then(Mono.fromRunnable(() -> {
logger.info("Closed IndexSearcherManager");
logger.info("Closing refreshes...");
if (!activeRefreshes.isTerminated()) {
try {
activeRefreshes.awaitAdvanceInterruptibly(activeRefreshes.arrive(), 15, TimeUnit.SECONDS);
} catch (Exception ex) {
if (ex instanceof TimeoutException) {
logger.error("Failed to terminate active refreshes: timeout");
} else {
logger.error("Failed to terminate active refreshes", ex);
}
}
}
logger.info("Closed refreshes...");
logger.info("Closing active searchers...");
if (!activeSearchers.isTerminated()) {
try {
activeSearchers.awaitAdvanceInterruptibly(activeSearchers.arrive(), 15, TimeUnit.SECONDS);
} catch (Exception ex) {
if (ex instanceof TimeoutException) {
logger.error("Failed to terminate active searchers: timeout");
} else {
logger.error("Failed to terminate active searchers", ex);
}
}
}
logger.info("Closed active searchers");
cachedSnapshotSearchers.invalidateAll();
cachedSnapshotSearchers.cleanUp();
}));
return closeMono;
}
}
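Note: the shutdown logic above is now assembled once in the constructor and wrapped with .cache(), so every close() call reuses the same Mono instead of re-running the sequence, and each searcher's dispose callback is guarded so the Phaser is deregistered exactly once. A minimal standalone sketch of that pattern, with illustrative class and method names that are not part of this codebase:

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Mono;

// Hypothetical example class, not part of the commit.
public class GracefulCloser {

	// One unregistered "owner" party keeps the phaser from terminating early.
	private final Phaser activeTasks = new Phaser(1);

	// Built once and cached: every close() call returns this same Mono,
	// so the shutdown sequence runs at most once.
	private final Mono<Void> closeMono = Mono
			.<Void>fromRunnable(() -> {
				if (!activeTasks.isTerminated()) {
					try {
						// Wait (bounded) for all registered tasks to arrive and deregister.
						activeTasks.awaitAdvanceInterruptibly(activeTasks.arrive(), 15, TimeUnit.SECONDS);
					} catch (InterruptedException ex) {
						Thread.currentThread().interrupt(); // preserve the interrupt flag
					} catch (TimeoutException ex) {
						// Give up after the timeout instead of hanging forever.
					}
				}
			})
			.cache();

	// Registers a task and returns a release callback that is safe to call more than once.
	public Runnable registerTask() {
		activeTasks.register();
		AtomicBoolean released = new AtomicBoolean(false);
		return () -> {
			// compareAndSet guarantees the phaser is deregistered exactly once,
			// even if the callback is (incorrectly) invoked twice.
			if (released.compareAndSet(false, true)) {
				activeTasks.arriveAndDeregister();
			}
		};
	}

	public Mono<Void> close() {
		return closeMono;
	}
}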

View File

@@ -1111,7 +1111,6 @@ public class LLLocalDictionary implements LLDictionary {
public Flux<Send<LLEntry>> putMulti(Flux<Send<LLEntry>> entries, boolean getOldValues) {
return entries
.buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
.publishOn(dbScheduler)
.flatMapSequential(ew -> Mono
.<List<Send<LLEntry>>>fromCallable(() -> {
var entriesWindow = new ArrayList<LLEntry>(ew.size());
@@ -1206,7 +1205,7 @@ public class LLLocalDictionary implements LLDictionary {
llEntry.close();
}
}
}), 2) // Max concurrency is 2 to read data while preparing the next segment
}).subscribeOn(dbScheduler), 2) // Max concurrency is 2 to read data while preparing the next segment
.flatMapIterable(oldValuesList -> oldValuesList)
.transform(LLUtils::handleDiscard);
}
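Note: the putMulti change replaces a single publishOn(dbScheduler) on the outer flux with a subscribeOn(dbScheduler) on each inner Mono inside flatMapSequential, so up to two write windows can run on the database scheduler concurrently while results stay in order. A rough illustration of the two shapes; the scheduler, types and writeBatch helper below are placeholders, not the project's API:

import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class PublishOnVsSubscribeOn {

	// Placeholder standing in for dbScheduler.
	private static final Scheduler dbScheduler = Schedulers.boundedElastic();

	// Before: publishOn moves the downstream onto dbScheduler, but each window's
	// blocking callable runs on that single hop, so windows execute one after another.
	static Flux<String> before(Flux<String> entries) {
		return entries
				.buffer(128)
				.publishOn(dbScheduler)
				.flatMapSequential(window -> Mono.fromCallable(() -> writeBatch(window)), 2);
	}

	// After: each window's blocking work is wrapped in its own Mono and subscribed
	// on dbScheduler, so flatMapSequential can prepare the next window while the
	// current one is being written, still emitting results in order.
	static Flux<String> after(Flux<String> entries) {
		return entries
				.buffer(128)
				.flatMapSequential(window -> Mono
						.fromCallable(() -> writeBatch(window))
						.subscribeOn(dbScheduler), 2);
	}

	// Placeholder for the blocking write-batch call.
	private static String writeBatch(List<String> window) {
		return "wrote " + window.size() + " entries";
	}
}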

View File

@@ -77,13 +77,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
true
);
// Scheduler used to get callback values of LuceneStreamSearcher without creating deadlocks
protected static final Scheduler luceneSearcherScheduler = Schedulers.newBoundedElastic(
4,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
"lucene-searcher",
60,
true
);
protected final Scheduler luceneSearcherScheduler = LuceneUtils.newLuceneSearcherScheduler(false);
// Scheduler used to get callback values of LuceneStreamSearcher without creating deadlocks
private static final Scheduler luceneWriterScheduler = Schedulers.boundedElastic();
@@ -257,9 +251,8 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
public Mono<Void> addDocuments(Flux<Entry<LLTerm, LLDocument>> documents) {
return documents
.collectList()
.publishOn(luceneWriterScheduler)
.flatMap(documentsList -> Mono
.fromCallable(() -> {
.<Void>fromCallable(() -> {
activeTasks.register();
try {
indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList));
@@ -267,7 +260,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
} finally {
activeTasks.arriveAndDeregister();
}
})
}).subscribeOn(luceneWriterScheduler)
);
}
@@ -510,7 +503,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
try {
if (activeTasks.isTerminated()) return null;
if (force) {
if (activeTasks.isTerminated()) return null;
//noinspection BlockingMethodInNonBlockingContext
searcherManager.maybeRefreshBlocking();
} else {

View File

@@ -1,7 +1,5 @@
package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.database.disk.LLLocalLuceneIndex.luceneSearcherScheduler;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader.InvalidCacheLoadException;
@@ -53,12 +51,16 @@ import org.warp.commonutils.functional.IOBiConsumer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
// Scheduler used to get callback values of LuceneStreamSearcher without creating deadlocks
protected final Scheduler luceneSearcherScheduler = LuceneUtils.newLuceneSearcherScheduler(true);
private final ConcurrentHashMap<Long, LLSnapshot[]> registeredSnapshots = new ConcurrentHashMap<>();
private final AtomicLong nextSnapshotNumber = new AtomicLong(1);
private final LLLocalLuceneIndex[] luceneIndices;

View File

@@ -374,8 +374,10 @@ public class LuceneUtils {
.transform(hitsFlux -> {
if (preserveOrder) {
return hitsFlux
.publishOn(scheduler)
.mapNotNull(hit -> mapHitBlocking(hit, indexSearchers, keyFieldName));
.flatMapSequential(hit -> Mono
.fromCallable(() -> mapHitBlocking(hit, indexSearchers, keyFieldName))
.subscribeOn(scheduler)
);
} else {
return hitsFlux
.parallel()
@@ -508,4 +510,14 @@ public class LuceneUtils {
return totalHitsCount.value() + "+";
}
}
public static Scheduler newLuceneSearcherScheduler(boolean multi) {
return Schedulers.newBoundedElastic(
4,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
multi ? "lucene-searcher-multi" : "lucene-searcher-shard",
60,
true
);
}
}
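Note: with this helper each index builds its own bounded-elastic searcher scheduler instead of sharing the static pool that LLLocalLuceneIndex used to expose; the boolean only selects the thread-name prefix so multi-index and per-shard workers can be told apart in thread dumps. A small self-contained sketch of the intended usage; the demo class and field names are illustrative, and the factory body is copied from the diff above:

import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo; the real code keeps these schedulers as fields of
// LLLocalLuceneIndex and LLLocalMultiLuceneIndex.
public class SearcherSchedulerDemo {

	// Per-shard index: worker threads are named roughly "lucene-searcher-shard-N".
	final Scheduler shardSearcherScheduler = newLuceneSearcherScheduler(false);

	// Multi-index wrapper: worker threads are named roughly "lucene-searcher-multi-N".
	final Scheduler multiSearcherScheduler = newLuceneSearcherScheduler(true);

	// Same body as LuceneUtils.newLuceneSearcherScheduler in the diff above.
	static Scheduler newLuceneSearcherScheduler(boolean multi) {
		return Schedulers.newBoundedElastic(
				4,
				Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
				multi ? "lucene-searcher-multi" : "lucene-searcher-shard",
				60,
				true);
	}
}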