Implement more phasers

This commit is contained in:
Andrea Cavalli 2021-09-06 18:52:21 +02:00
parent 51b60168f7
commit 936c07406e
4 changed files with 67 additions and 159 deletions

View File

@ -18,6 +18,8 @@ import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.AlreadyClosedException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
@ -26,11 +28,14 @@ import reactor.core.scheduler.Schedulers;
public class CachedIndexSearcherManager {
private static final Logger logger = LoggerFactory.getLogger(CachedIndexSearcherManager.class);
private final SnapshotsManager snapshotsManager;
private final Similarity similarity;
private final SearcherManager searcherManager;
private final Duration queryRefreshDebounceTime;
private final Phaser activeSearchers = new Phaser(1);
private final Phaser activeRefreshes = new Phaser(1);
private final LoadingCache<LLSnapshot, Mono<CachedIndexSearcher>> cachedSnapshotSearchers;
private final Mono<CachedIndexSearcher> cachedMainSearcher;
@ -55,7 +60,13 @@ public class CachedIndexSearcherManager {
);
Mono
.fromRunnable(this::scheduledQueryRefresh)
.fromRunnable(() -> {
try {
maybeRefreshBlocking();
} catch (Exception ex) {
logger.error("Failed to refresh the searcher manager", ex);
}
})
.repeatWhen(s -> s.delayElements(queryRefreshDebounceTime, Schedulers.boundedElastic()))
.subscribeOn(Schedulers.boundedElastic())
.takeUntilOther(closeRequested.asMono())
@ -98,37 +109,31 @@ public class CachedIndexSearcherManager {
try {
// Mark as removed from cache
indexSearcher.removeFromCache();
} catch (IOException e) {
e.printStackTrace();
} catch (Exception ex) {
logger.error("Failed to release an old cached IndexSearcher", ex);
}
});
}
@SuppressWarnings("unused")
private void scheduledQueryRefresh() {
try {
boolean refreshStarted = searcherManager.maybeRefresh();
// if refreshStarted == false, another thread is currently already refreshing
} catch (AlreadyClosedException ignored) {
} catch (IOException ex) {
ex.printStackTrace();
}
}
public void maybeRefreshBlocking() throws IOException {
try {
activeRefreshes.register();
searcherManager.maybeRefreshBlocking();
} catch (AlreadyClosedException ignored) {
} finally {
activeRefreshes.arriveAndDeregister();
}
}
public void maybeRefresh() throws IOException {
try {
activeRefreshes.register();
searcherManager.maybeRefresh();
} catch (AlreadyClosedException ignored) {
} finally {
activeRefreshes.arriveAndDeregister();
}
}
@ -171,8 +176,8 @@ public class CachedIndexSearcherManager {
try {
// Decrement reference count
indexSearcher.decUsage();
} catch (IOException e) {
e.printStackTrace();
} catch (Exception ex) {
logger.error("Failed to release an used IndexSearcher", ex);
}
});
}
@ -182,7 +187,12 @@ public class CachedIndexSearcherManager {
.fromRunnable(this.closeRequested::tryEmitEmpty)
.then(refresherClosed.asMono())
.then(Mono.fromRunnable(() -> {
activeSearchers.arriveAndAwaitAdvance();
if (!activeRefreshes.isTerminated()) {
activeRefreshes.arriveAndAwaitAdvance();
}
if (!activeSearchers.isTerminated()) {
activeSearchers.arriveAndAwaitAdvance();
}
cachedSnapshotSearchers.invalidateAll();
cachedSnapshotSearchers.cleanUp();
}));

View File

@ -15,7 +15,6 @@ import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.lucene.AlwaysDirectIOFSDirectory;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.ScheduledTaskLifecycle;
import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
import it.cavallium.dbengine.lucene.searcher.LuceneLocalSearcher;
@ -27,6 +26,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
@ -85,7 +85,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private final Directory directory;
private final boolean lowMemory;
private final ScheduledTaskLifecycle scheduledTasksLifecycle;
private final Phaser activeTasks = new Phaser(1);
public LLLocalLuceneIndex(@Nullable Path luceneBasePath,
String name,
@ -170,9 +170,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
this.lowMemory = lowMemory;
this.similarity = LuceneUtils.toPerFieldSimilarityWrapper(indicizerSimilarities);
// Create scheduled tasks lifecycle manager
this.scheduledTasksLifecycle = new ScheduledTaskLifecycle();
IndexWriterConfig indexWriterConfig = new IndexWriterConfig(LuceneUtils.toPerFieldAnalyzerWrapper(indicizerAnalyzers));
indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
indexWriterConfig.setIndexDeletionPolicy(snapshotter);
@ -195,7 +192,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
indexWriterConfig.setReaderPooling(false);
indexWriterConfig.setSimilarity(getSimilarity());
this.indexWriter = new IndexWriter(directory, indexWriterConfig);
this.snapshotsManager = new SnapshotsManager(indexWriter, snapshotter, scheduledTasksLifecycle);
this.snapshotsManager = new SnapshotsManager(indexWriter, snapshotter);
this.searcherManager = new CachedIndexSearcherManager(indexWriter,
snapshotsManager,
getSimilarity(),
@ -205,55 +202,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
);
// Start scheduled tasks
registerScheduledFixedTask(this::scheduledCommit, luceneOptions.commitDebounceTime());
var commitMillis = luceneOptions.commitDebounceTime().toMillis();
luceneHeavyTasksScheduler.schedulePeriodically(this::scheduledCommit, commitMillis, commitMillis,
TimeUnit.MILLISECONDS);
}
private Similarity getSimilarity() {
return similarity;
}
private void registerScheduledFixedTask(Runnable task, Duration duration) {
new PeriodicTask(task, duration).start();
}
private class PeriodicTask implements Runnable {
private final Runnable task;
private final Duration duration;
private volatile boolean cancelled = false;
public PeriodicTask(Runnable task, Duration duration) {
this.task = task;
this.duration = duration;
}
public void start() {
luceneHeavyTasksScheduler.schedule(this,
duration.toMillis(),
TimeUnit.MILLISECONDS
);
}
@Override
public void run() {
if (!scheduledTasksLifecycle.tryStartScheduledTask()) {
return;
}
try {
if (scheduledTasksLifecycle.isCancelled() || cancelled) return;
task.run();
if (scheduledTasksLifecycle.isCancelled() || cancelled) return;
luceneHeavyTasksScheduler.schedule(this, duration.toMillis(), TimeUnit.MILLISECONDS);
} finally {
scheduledTasksLifecycle.endScheduledTask();
}
}
public void cancel() {
cancelled = true;
}
}
@Override
public String getLuceneIndexName() {
return luceneIndexName;
@ -272,12 +229,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@Override
public Mono<Void> addDocument(LLTerm key, LLDocument doc) {
return Mono.fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
activeTasks.register();
try {
indexWriter.addDocument(LLUtils.toDocument(doc));
return null;
} finally {
scheduledTasksLifecycle.endScheduledTask();
activeTasks.arriveAndDeregister();
}
});
}
@ -288,12 +245,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.collectList()
.flatMap(documentsList -> Mono
.fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
activeTasks.register();
try {
indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList));
return null;
} finally {
scheduledTasksLifecycle.endScheduledTask();
activeTasks.arriveAndDeregister();
}
})
);
@ -303,12 +260,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@Override
public Mono<Void> deleteDocument(LLTerm id) {
return Mono.fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
activeTasks.register();
try {
indexWriter.deleteDocuments(LLUtils.toTerm(id));
return null;
} finally {
scheduledTasksLifecycle.endScheduledTask();
activeTasks.arriveAndDeregister();
}
});
}
@ -316,11 +273,11 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@Override
public Mono<Void> updateDocument(LLTerm id, LLDocument document) {
return Mono.fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
activeTasks.register();
try {
indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document));
} finally {
scheduledTasksLifecycle.endScheduledTask();
activeTasks.arriveAndDeregister();
}
return null;
});
@ -334,7 +291,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private Mono<Void> updateDocuments(Map<LLTerm, LLDocument> documentsMap) {
return Mono
.fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
activeTasks.register();
try {
for (Entry<LLTerm, LLDocument> entry : documentsMap.entrySet()) {
LLTerm key = entry.getKey();
@ -343,7 +300,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}
return null;
} finally {
scheduledTasksLifecycle.endScheduledTask();
activeTasks.arriveAndDeregister();
}
});
}
@ -351,7 +308,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@Override
public Mono<Void> deleteAll() {
return Mono.<Void>fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
activeTasks.register();
try {
//noinspection BlockingMethodInNonBlockingContext
indexWriter.deleteAll();
@ -361,7 +318,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
indexWriter.commit();
return null;
} finally {
scheduledTasksLifecycle.endScheduledTask();
activeTasks.arriveAndDeregister();
}
}).subscribeOn(luceneHeavyTasksScheduler);
}
@ -492,7 +449,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return Mono
.<Void>fromCallable(() -> {
logger.debug("Closing IndexWriter...");
scheduledTasksLifecycle.cancelAndWait();
activeTasks.arriveAndAwaitAdvance();
return null;
})
.subscribeOn(luceneHeavyTasksScheduler)
@ -511,13 +468,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
public Mono<Void> flush() {
return Mono
.<Void>fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
activeTasks.register();
try {
if (scheduledTasksLifecycle.isCancelled()) return null;
if (activeTasks.isTerminated()) return null;
//noinspection BlockingMethodInNonBlockingContext
indexWriter.commit();
} finally {
scheduledTasksLifecycle.endScheduledTask();
activeTasks.arriveAndDeregister();
}
return null;
})
@ -528,11 +485,11 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
public Mono<Void> refresh(boolean force) {
return Mono
.<Void>fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
activeTasks.register();
try {
if (scheduledTasksLifecycle.isCancelled()) return null;
if (activeTasks.isTerminated()) return null;
if (force) {
if (scheduledTasksLifecycle.isCancelled()) return null;
if (activeTasks.isTerminated()) return null;
//noinspection BlockingMethodInNonBlockingContext
searcherManager.maybeRefreshBlocking();
} else {
@ -540,7 +497,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
searcherManager.maybeRefresh();
}
} finally {
scheduledTasksLifecycle.endScheduledTask();
activeTasks.arriveAndDeregister();
}
return null;
})

View File

@ -1,10 +1,10 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.lucene.ScheduledTaskLifecycle;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
@ -17,7 +17,7 @@ public class SnapshotsManager {
private final IndexWriter indexWriter;
private final SnapshotDeletionPolicy snapshotter;
private final ScheduledTaskLifecycle scheduledTasksLifecycle;
private final Phaser activeTasks = new Phaser(1);
/**
* Last snapshot sequence number. 0 is not used
*/
@ -28,11 +28,9 @@ public class SnapshotsManager {
private final ConcurrentHashMap<Long, LuceneIndexSnapshot> snapshots = new ConcurrentHashMap<>();
public SnapshotsManager(IndexWriter indexWriter,
SnapshotDeletionPolicy snapshotter,
ScheduledTaskLifecycle scheduledTasksLifecycle) {
SnapshotDeletionPolicy snapshotter) {
this.indexWriter = indexWriter;
this.snapshotter = snapshotter;
this.scheduledTasksLifecycle = scheduledTasksLifecycle;
}
public LuceneIndexSnapshot resolveSnapshot(@Nullable LLSnapshot snapshot) {
@ -64,12 +62,12 @@ public class SnapshotsManager {
.defer(() -> {
if (ex instanceof IllegalStateException && "No index commit to snapshot".equals(ex.getMessage())) {
return Mono.fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
activeTasks.register();
try {
indexWriter.commit();
return snapshotter.snapshot();
} finally {
scheduledTasksLifecycle.endScheduledTask();
activeTasks.arriveAndDeregister();
}
});
} else {
@ -81,7 +79,7 @@ public class SnapshotsManager {
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
return Mono.<Void>fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
activeTasks.register();
try {
var indexSnapshot = this.snapshots.remove(snapshot.getSequenceNumber());
if (indexSnapshot == null) {
@ -96,8 +94,14 @@ public class SnapshotsManager {
indexWriter.deleteUnusedFiles();
return null;
} finally {
scheduledTasksLifecycle.endScheduledTask();
activeTasks.arriveAndDeregister();
}
}).subscribeOn(Schedulers.boundedElastic());
}
public void close() {
if (!activeTasks.isTerminated()) {
activeTasks.arriveAndAwaitAdvance();
}
}
}

View File

@ -1,63 +0,0 @@
package it.cavallium.dbengine.lucene;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.StampedLock;
import org.warp.commonutils.concurrency.atomicity.Atomic;
import reactor.core.Disposable;
@Atomic
public class ScheduledTaskLifecycle {
private final StampedLock lock;
private volatile boolean cancelled = false;
public ScheduledTaskLifecycle() {
this.lock = new StampedLock();
}
/**
* Mark this task as running.
* After calling this method, please call {@method endScheduledTask} inside a finally block!
*/
public void startScheduledTask() {
if (cancelled) {
throw new IllegalStateException("Already closed");
}
this.lock.readLock();
}
/**
* Mark this task as running.
* After calling this method, please call {@method endScheduledTask} inside a finally block!
* @return false if failed
*/
public boolean tryStartScheduledTask() {
if (cancelled) {
return false;
}
this.lock.readLock();
return true;
}
/**
* Mark this task as ended. Must be called after {@method startScheduledTask}
*/
public void endScheduledTask() {
this.lock.tryUnlockRead();
}
/**
* Cancel all scheduled tasks and wait all running methods to finish
*/
public void cancelAndWait() {
cancelled = true;
// Acquire a write lock to wait all tasks to end
lock.unlockWrite(lock.writeLock());
}
public boolean isCancelled() {
return cancelled;
}
}