diff --git a/pom.xml b/pom.xml index cf46ce3..b1c2337 100644 --- a/pom.xml +++ b/pom.xml @@ -38,7 +38,7 @@ org.warp common-utils - 1.1.1 + 1.1.2 javax.xml.bind diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 8144fbf..6d18565 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -34,6 +34,7 @@ import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.jetbrains.annotations.Nullable; +import org.warp.commonutils.concurrency.executor.ScheduledTaskLifecycle; import org.warp.commonutils.functional.IOFunction; import org.warp.commonutils.type.ShortNamedThreadFactory; import it.cavallium.dbengine.database.LLDocument; @@ -59,14 +60,19 @@ import reactor.util.function.Tuples; public class LLLocalLuceneIndex implements LLLuceneIndex { private static final LuceneStreamSearcher streamSearcher = new AdaptiveStreamSearcher(); - private final Duration queryRefreshDebounceTime; - private final Duration commitDebounceTime; + /** + * Global lucene index scheduler. + * There is only a single thread globally to not overwhelm the disk with + * parallel commits or parallel refreshes. + */ + private static final ScheduledExecutorService scheduler + = Executors.newSingleThreadScheduledExecutor(new ShortNamedThreadFactory("Lucene")); + private final String luceneIndexName; private final SnapshotDeletionPolicy snapshotter; private final IndexWriter indexWriter; private final SearcherManager searcherManager; private final Directory directory; - private final AtomicLong lastSearcherRefresh = new AtomicLong(0); /** * Last snapshot sequence number. 0 is not used */ @@ -75,9 +81,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { * Snapshot seq no to index commit point */ private final ConcurrentHashMap snapshots = new ConcurrentHashMap<>(); - private final ScheduledExecutorService scheduler; private final boolean lowMemory; + private final ScheduledTaskLifecycle scheduledTasksLifecycle; + public LLLocalLuceneIndex(Path luceneBasePath, String name, TextFieldsAnalyzer analyzer, @@ -105,20 +112,21 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } this.indexWriter = new IndexWriter(directory, indexWriterConfig); this.searcherManager = new SearcherManager(indexWriter, false, false, null); - this.queryRefreshDebounceTime = queryRefreshDebounceTime; - this.commitDebounceTime = commitDebounceTime; - this.lastSearcherRefresh.set(System.currentTimeMillis()); - this.scheduler = Executors.newSingleThreadScheduledExecutor(new ShortNamedThreadFactory("Lucene")); - scheduler.scheduleAtFixedRate(this::scheduledCommit, - commitDebounceTime.toMillis(), - commitDebounceTime.toMillis(), + + // Create scheduled tasks lifecycle manager + this.scheduledTasksLifecycle = new ScheduledTaskLifecycle(); + + // Start scheduled tasks + registerScheduledFixedTask(this::scheduledCommit, commitDebounceTime); + registerScheduledFixedTask(this::scheduledQueryRefresh, queryRefreshDebounceTime); + } + + private void registerScheduledFixedTask(Runnable task, Duration duration) { + scheduledTasksLifecycle.registerScheduledTask(scheduler.scheduleAtFixedRate(task, + duration.toMillis(), + duration.toMillis(), TimeUnit.MILLISECONDS - ); - scheduler.scheduleAtFixedRate(this::scheduledQueryRefresh, - queryRefreshDebounceTime.toMillis(), - queryRefreshDebounceTime.toMillis(), - TimeUnit.MILLISECONDS - ); + )); } @Override @@ -368,17 +376,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { @Override public void close() throws IOException { - scheduler.shutdown(); - try { - scheduler.awaitTermination(10, TimeUnit.MINUTES); - } catch (InterruptedException e) { - e.printStackTrace(); - } - if (!scheduler.isTerminated()) { - System.err.println("Terminating lucene scheduler"); - scheduler.shutdownNow(); - } - indexWriter.commit(); + scheduledTasksLifecycle.cancelAndWait(); indexWriter.close(); directory.close(); }