diff --git a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java index 63881b0..68229e1 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java @@ -18,6 +18,8 @@ import org.apache.lucene.search.similarities.Similarity; import org.apache.lucene.store.AlreadyClosedException; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; @@ -26,11 +28,14 @@ import reactor.core.scheduler.Schedulers; public class CachedIndexSearcherManager { + private static final Logger logger = LoggerFactory.getLogger(CachedIndexSearcherManager.class); + private final SnapshotsManager snapshotsManager; private final Similarity similarity; private final SearcherManager searcherManager; private final Duration queryRefreshDebounceTime; private final Phaser activeSearchers = new Phaser(1); + private final Phaser activeRefreshes = new Phaser(1); private final LoadingCache> cachedSnapshotSearchers; private final Mono cachedMainSearcher; @@ -55,7 +60,13 @@ public class CachedIndexSearcherManager { ); Mono - .fromRunnable(this::scheduledQueryRefresh) + .fromRunnable(() -> { + try { + maybeRefreshBlocking(); + } catch (Exception ex) { + logger.error("Failed to refresh the searcher manager", ex); + } + }) .repeatWhen(s -> s.delayElements(queryRefreshDebounceTime, Schedulers.boundedElastic())) .subscribeOn(Schedulers.boundedElastic()) .takeUntilOther(closeRequested.asMono()) @@ -98,37 +109,31 @@ public class CachedIndexSearcherManager { try { // Mark as removed from cache indexSearcher.removeFromCache(); - } catch (IOException e) { - e.printStackTrace(); + } catch (Exception ex) { + logger.error("Failed to release an old cached IndexSearcher", ex); } }); } - @SuppressWarnings("unused") - private void scheduledQueryRefresh() { - try { - boolean refreshStarted = searcherManager.maybeRefresh(); - // if refreshStarted == false, another thread is currently already refreshing - } catch (AlreadyClosedException ignored) { - - } catch (IOException ex) { - ex.printStackTrace(); - } - } - public void maybeRefreshBlocking() throws IOException { try { + activeRefreshes.register(); searcherManager.maybeRefreshBlocking(); } catch (AlreadyClosedException ignored) { + } finally { + activeRefreshes.arriveAndDeregister(); } } public void maybeRefresh() throws IOException { try { + activeRefreshes.register(); searcherManager.maybeRefresh(); } catch (AlreadyClosedException ignored) { + } finally { + activeRefreshes.arriveAndDeregister(); } } @@ -171,8 +176,8 @@ public class CachedIndexSearcherManager { try { // Decrement reference count indexSearcher.decUsage(); - } catch (IOException e) { - e.printStackTrace(); + } catch (Exception ex) { + logger.error("Failed to release an used IndexSearcher", ex); } }); } @@ -182,7 +187,12 @@ public class CachedIndexSearcherManager { .fromRunnable(this.closeRequested::tryEmitEmpty) .then(refresherClosed.asMono()) .then(Mono.fromRunnable(() -> { - activeSearchers.arriveAndAwaitAdvance(); + if (!activeRefreshes.isTerminated()) { + activeRefreshes.arriveAndAwaitAdvance(); + } + if (!activeSearchers.isTerminated()) { + activeSearchers.arriveAndAwaitAdvance(); + } cachedSnapshotSearchers.invalidateAll(); cachedSnapshotSearchers.cleanUp(); })); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 6082fb6..034a2d8 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -15,7 +15,6 @@ import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.lucene.AlwaysDirectIOFSDirectory; import it.cavallium.dbengine.lucene.LuceneUtils; -import it.cavallium.dbengine.lucene.ScheduledTaskLifecycle; import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneLocalSearcher; import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; import it.cavallium.dbengine.lucene.searcher.LuceneLocalSearcher; @@ -27,6 +26,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import org.apache.lucene.index.ConcurrentMergeScheduler; import org.apache.lucene.index.IndexWriter; @@ -85,7 +85,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { private final Directory directory; private final boolean lowMemory; - private final ScheduledTaskLifecycle scheduledTasksLifecycle; + private final Phaser activeTasks = new Phaser(1); public LLLocalLuceneIndex(@Nullable Path luceneBasePath, String name, @@ -170,9 +170,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { this.lowMemory = lowMemory; this.similarity = LuceneUtils.toPerFieldSimilarityWrapper(indicizerSimilarities); - // Create scheduled tasks lifecycle manager - this.scheduledTasksLifecycle = new ScheduledTaskLifecycle(); - IndexWriterConfig indexWriterConfig = new IndexWriterConfig(LuceneUtils.toPerFieldAnalyzerWrapper(indicizerAnalyzers)); indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND); indexWriterConfig.setIndexDeletionPolicy(snapshotter); @@ -195,7 +192,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { indexWriterConfig.setReaderPooling(false); indexWriterConfig.setSimilarity(getSimilarity()); this.indexWriter = new IndexWriter(directory, indexWriterConfig); - this.snapshotsManager = new SnapshotsManager(indexWriter, snapshotter, scheduledTasksLifecycle); + this.snapshotsManager = new SnapshotsManager(indexWriter, snapshotter); this.searcherManager = new CachedIndexSearcherManager(indexWriter, snapshotsManager, getSimilarity(), @@ -205,55 +202,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { ); // Start scheduled tasks - registerScheduledFixedTask(this::scheduledCommit, luceneOptions.commitDebounceTime()); + var commitMillis = luceneOptions.commitDebounceTime().toMillis(); + luceneHeavyTasksScheduler.schedulePeriodically(this::scheduledCommit, commitMillis, commitMillis, + TimeUnit.MILLISECONDS); } private Similarity getSimilarity() { return similarity; } - private void registerScheduledFixedTask(Runnable task, Duration duration) { - new PeriodicTask(task, duration).start(); - } - - private class PeriodicTask implements Runnable { - - private final Runnable task; - private final Duration duration; - private volatile boolean cancelled = false; - - public PeriodicTask(Runnable task, Duration duration) { - this.task = task; - this.duration = duration; - } - - public void start() { - luceneHeavyTasksScheduler.schedule(this, - duration.toMillis(), - TimeUnit.MILLISECONDS - ); - } - - @Override - public void run() { - if (!scheduledTasksLifecycle.tryStartScheduledTask()) { - return; - } - try { - if (scheduledTasksLifecycle.isCancelled() || cancelled) return; - task.run(); - if (scheduledTasksLifecycle.isCancelled() || cancelled) return; - luceneHeavyTasksScheduler.schedule(this, duration.toMillis(), TimeUnit.MILLISECONDS); - } finally { - scheduledTasksLifecycle.endScheduledTask(); - } - } - - public void cancel() { - cancelled = true; - } - } - @Override public String getLuceneIndexName() { return luceneIndexName; @@ -272,12 +229,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { @Override public Mono addDocument(LLTerm key, LLDocument doc) { return Mono.fromCallable(() -> { - scheduledTasksLifecycle.startScheduledTask(); + activeTasks.register(); try { indexWriter.addDocument(LLUtils.toDocument(doc)); return null; } finally { - scheduledTasksLifecycle.endScheduledTask(); + activeTasks.arriveAndDeregister(); } }); } @@ -288,12 +245,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .collectList() .flatMap(documentsList -> Mono .fromCallable(() -> { - scheduledTasksLifecycle.startScheduledTask(); + activeTasks.register(); try { indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList)); return null; } finally { - scheduledTasksLifecycle.endScheduledTask(); + activeTasks.arriveAndDeregister(); } }) ); @@ -303,12 +260,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { @Override public Mono deleteDocument(LLTerm id) { return Mono.fromCallable(() -> { - scheduledTasksLifecycle.startScheduledTask(); + activeTasks.register(); try { indexWriter.deleteDocuments(LLUtils.toTerm(id)); return null; } finally { - scheduledTasksLifecycle.endScheduledTask(); + activeTasks.arriveAndDeregister(); } }); } @@ -316,11 +273,11 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { @Override public Mono updateDocument(LLTerm id, LLDocument document) { return Mono.fromCallable(() -> { - scheduledTasksLifecycle.startScheduledTask(); + activeTasks.register(); try { indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document)); } finally { - scheduledTasksLifecycle.endScheduledTask(); + activeTasks.arriveAndDeregister(); } return null; }); @@ -334,7 +291,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { private Mono updateDocuments(Map documentsMap) { return Mono .fromCallable(() -> { - scheduledTasksLifecycle.startScheduledTask(); + activeTasks.register(); try { for (Entry entry : documentsMap.entrySet()) { LLTerm key = entry.getKey(); @@ -343,7 +300,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } return null; } finally { - scheduledTasksLifecycle.endScheduledTask(); + activeTasks.arriveAndDeregister(); } }); } @@ -351,7 +308,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { @Override public Mono deleteAll() { return Mono.fromCallable(() -> { - scheduledTasksLifecycle.startScheduledTask(); + activeTasks.register(); try { //noinspection BlockingMethodInNonBlockingContext indexWriter.deleteAll(); @@ -361,7 +318,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { indexWriter.commit(); return null; } finally { - scheduledTasksLifecycle.endScheduledTask(); + activeTasks.arriveAndDeregister(); } }).subscribeOn(luceneHeavyTasksScheduler); } @@ -492,7 +449,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return Mono .fromCallable(() -> { logger.debug("Closing IndexWriter..."); - scheduledTasksLifecycle.cancelAndWait(); + activeTasks.arriveAndAwaitAdvance(); return null; }) .subscribeOn(luceneHeavyTasksScheduler) @@ -511,13 +468,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { public Mono flush() { return Mono .fromCallable(() -> { - scheduledTasksLifecycle.startScheduledTask(); + activeTasks.register(); try { - if (scheduledTasksLifecycle.isCancelled()) return null; + if (activeTasks.isTerminated()) return null; //noinspection BlockingMethodInNonBlockingContext indexWriter.commit(); } finally { - scheduledTasksLifecycle.endScheduledTask(); + activeTasks.arriveAndDeregister(); } return null; }) @@ -528,11 +485,11 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { public Mono refresh(boolean force) { return Mono .fromCallable(() -> { - scheduledTasksLifecycle.startScheduledTask(); + activeTasks.register(); try { - if (scheduledTasksLifecycle.isCancelled()) return null; + if (activeTasks.isTerminated()) return null; if (force) { - if (scheduledTasksLifecycle.isCancelled()) return null; + if (activeTasks.isTerminated()) return null; //noinspection BlockingMethodInNonBlockingContext searcherManager.maybeRefreshBlocking(); } else { @@ -540,7 +497,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { searcherManager.maybeRefresh(); } } finally { - scheduledTasksLifecycle.endScheduledTask(); + activeTasks.arriveAndDeregister(); } return null; }) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java b/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java index ec163f3..0608400 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java @@ -1,10 +1,10 @@ package it.cavallium.dbengine.database.disk; import it.cavallium.dbengine.database.LLSnapshot; -import it.cavallium.dbengine.lucene.ScheduledTaskLifecycle; import java.io.IOException; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicLong; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexWriter; @@ -17,7 +17,7 @@ public class SnapshotsManager { private final IndexWriter indexWriter; private final SnapshotDeletionPolicy snapshotter; - private final ScheduledTaskLifecycle scheduledTasksLifecycle; + private final Phaser activeTasks = new Phaser(1); /** * Last snapshot sequence number. 0 is not used */ @@ -28,11 +28,9 @@ public class SnapshotsManager { private final ConcurrentHashMap snapshots = new ConcurrentHashMap<>(); public SnapshotsManager(IndexWriter indexWriter, - SnapshotDeletionPolicy snapshotter, - ScheduledTaskLifecycle scheduledTasksLifecycle) { + SnapshotDeletionPolicy snapshotter) { this.indexWriter = indexWriter; this.snapshotter = snapshotter; - this.scheduledTasksLifecycle = scheduledTasksLifecycle; } public LuceneIndexSnapshot resolveSnapshot(@Nullable LLSnapshot snapshot) { @@ -64,12 +62,12 @@ public class SnapshotsManager { .defer(() -> { if (ex instanceof IllegalStateException && "No index commit to snapshot".equals(ex.getMessage())) { return Mono.fromCallable(() -> { - scheduledTasksLifecycle.startScheduledTask(); + activeTasks.register(); try { indexWriter.commit(); return snapshotter.snapshot(); } finally { - scheduledTasksLifecycle.endScheduledTask(); + activeTasks.arriveAndDeregister(); } }); } else { @@ -81,7 +79,7 @@ public class SnapshotsManager { public Mono releaseSnapshot(LLSnapshot snapshot) { return Mono.fromCallable(() -> { - scheduledTasksLifecycle.startScheduledTask(); + activeTasks.register(); try { var indexSnapshot = this.snapshots.remove(snapshot.getSequenceNumber()); if (indexSnapshot == null) { @@ -96,8 +94,14 @@ public class SnapshotsManager { indexWriter.deleteUnusedFiles(); return null; } finally { - scheduledTasksLifecycle.endScheduledTask(); + activeTasks.arriveAndDeregister(); } }).subscribeOn(Schedulers.boundedElastic()); } + + public void close() { + if (!activeTasks.isTerminated()) { + activeTasks.arriveAndAwaitAdvance(); + } + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/ScheduledTaskLifecycle.java b/src/main/java/it/cavallium/dbengine/lucene/ScheduledTaskLifecycle.java deleted file mode 100644 index 4d64fa9..0000000 --- a/src/main/java/it/cavallium/dbengine/lucene/ScheduledTaskLifecycle.java +++ /dev/null @@ -1,63 +0,0 @@ -package it.cavallium.dbengine.lucene; - -import java.nio.channels.ClosedChannelException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.StampedLock; -import org.warp.commonutils.concurrency.atomicity.Atomic; -import reactor.core.Disposable; - -@Atomic -public class ScheduledTaskLifecycle { - - private final StampedLock lock; - private volatile boolean cancelled = false; - - public ScheduledTaskLifecycle() { - this.lock = new StampedLock(); - } - - /** - * Mark this task as running. - * After calling this method, please call {@method endScheduledTask} inside a finally block! - */ - public void startScheduledTask() { - if (cancelled) { - throw new IllegalStateException("Already closed"); - } - this.lock.readLock(); - } - - /** - * Mark this task as running. - * After calling this method, please call {@method endScheduledTask} inside a finally block! - * @return false if failed - */ - public boolean tryStartScheduledTask() { - if (cancelled) { - return false; - } - this.lock.readLock(); - return true; - } - - /** - * Mark this task as ended. Must be called after {@method startScheduledTask} - */ - public void endScheduledTask() { - this.lock.tryUnlockRead(); - } - - /** - * Cancel all scheduled tasks and wait all running methods to finish - */ - public void cancelAndWait() { - cancelled = true; - - // Acquire a write lock to wait all tasks to end - lock.unlockWrite(lock.writeLock()); - } - - public boolean isCancelled() { - return cancelled; - } -}