From 66fa853272b3654f827db0f31e47703c5cdaf9ef Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 7 Sep 2021 02:36:11 +0200 Subject: [PATCH] Avoid memory leaks --- .../database/disk/CachedIndexSearcher.java | 45 ++++++++++++++----- .../disk/CachedIndexSearcherManager.java | 23 ++++++++-- .../database/disk/LLLocalLuceneIndex.java | 6 ++- 3 files changed, 57 insertions(+), 17 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcher.java b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcher.java index 34d3d04..1458df1 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcher.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcher.java @@ -5,9 +5,13 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.SearcherManager; import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CachedIndexSearcher { + private static final Logger logger = LoggerFactory.getLogger(CachedIndexSearcher.class); + private final IndexSearcher indexSearcher; private final SearcherManager associatedSearcherManager; private final Runnable afterFinalization; @@ -30,12 +34,14 @@ public class CachedIndexSearcher { public void decUsage() throws IOException { synchronized (this) { - usages--; - if (mustClose()) { - try { - close(); - } finally { - if (afterFinalization != null) afterFinalization.run(); + if (usages > 0) { + usages--; + if (mustClose()) { + try { + close(); + } finally { + if (afterFinalization != null) afterFinalization.run(); + } } } } @@ -43,12 +49,14 @@ public class CachedIndexSearcher { public void removeFromCache() throws IOException { synchronized (this) { - inCache = false; - if (mustClose()) { - try { - close(); - } finally { - if (afterFinalization != null) afterFinalization.run(); + if (inCache) { + inCache = false; + if (mustClose()) { + try { + close(); + } finally { + if (afterFinalization != null) afterFinalization.run(); + } } } } @@ -71,4 +79,17 @@ public class CachedIndexSearcher { public IndexSearcher getIndexSearcher() { return indexSearcher; } + + @Override + protected void finalize() throws Throwable { + if (usages > 0) { + logger.error("A cached index searcher has been garbage collected, but " + + usages + " usages have not been released"); + } + if (inCache) { + logger.error("A cached index searcher has been garbage collected, but it's marked" + + " as still actively cached"); + } + super.finalize(); + } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java index 68229e1..87d153a 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java @@ -7,6 +7,7 @@ import it.cavallium.dbengine.database.LLSnapshot; import java.io.IOException; import java.time.Duration; import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; import java.util.function.Function; @@ -184,15 +185,31 @@ public class CachedIndexSearcherManager { public Mono close() { return Mono - .fromRunnable(this.closeRequested::tryEmitEmpty) + .fromRunnable(() -> { + logger.info("Closing IndexSearcherManager..."); + this.closeRequested.tryEmitEmpty(); + }) .then(refresherClosed.asMono()) .then(Mono.fromRunnable(() -> { + logger.info("Closed IndexSearcherManager"); + logger.info("Closing refreshes..."); if (!activeRefreshes.isTerminated()) { - activeRefreshes.arriveAndAwaitAdvance(); + try { + activeRefreshes.awaitAdvanceInterruptibly(activeRefreshes.arrive(), 15, TimeUnit.SECONDS); + } catch (Exception ex) { + logger.error("Failed to terminate active refreshes", ex); + } } + logger.info("Closed refreshes..."); + logger.info("Closing active searchers..."); if (!activeSearchers.isTerminated()) { - activeSearchers.arriveAndAwaitAdvance(); + try { + activeSearchers.awaitAdvanceInterruptibly(activeSearchers.arrive(), 15, TimeUnit.SECONDS); + } catch (Exception ex) { + logger.error("Failed to terminate active searchers", ex); + } } + logger.info("Closed active searchers"); cachedSnapshotSearchers.invalidateAll(); cachedSnapshotSearchers.cleanUp(); })); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 034a2d8..c7eeb69 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -448,18 +448,20 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { public Mono close() { return Mono .fromCallable(() -> { - logger.debug("Closing IndexWriter..."); + logger.info("Waiting IndexWriter tasks..."); activeTasks.arriveAndAwaitAdvance(); + logger.info("IndexWriter tasks ended"); return null; }) .subscribeOn(luceneHeavyTasksScheduler) .then(searcherManager.close()) .then(Mono.fromCallable(() -> { + logger.info("Closing IndexWriter..."); //noinspection BlockingMethodInNonBlockingContext indexWriter.close(); //noinspection BlockingMethodInNonBlockingContext directory.close(); - logger.debug("IndexWriter closed"); + logger.info("IndexWriter closed"); return null; }).subscribeOn(luceneHeavyTasksScheduler)); }