diff --git a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java index 0c6f050..aa5992d 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java @@ -4,16 +4,15 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import io.net5.buffer.api.Send; -import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.database.LLSnapshot; +import it.cavallium.dbengine.lucene.searcher.ExecutorSearcherFactory; import java.io.IOException; -import java.io.UncheckedIOException; import java.time.Duration; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.SearcherFactory; @@ -24,16 +23,16 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.Empty; import reactor.core.scheduler.Schedulers; -import reactor.util.function.Tuples; public class CachedIndexSearcherManager implements IndexSearcherManager { private static final Logger logger = LoggerFactory.getLogger(CachedIndexSearcherManager.class); + private static final Executor SEARCH_EXECUTOR = ForkJoinPool.commonPool(); + private static final SearcherFactory SEARCHER_FACTORY = new ExecutorSearcherFactory(SEARCH_EXECUTOR); private final SnapshotsManager snapshotsManager; private final Similarity similarity; @@ -59,11 +58,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager { this.similarity = similarity; this.queryRefreshDebounceTime = queryRefreshDebounceTime; - this.searcherManager = new SearcherManager(indexWriter, - applyAllDeletes, - writeAllDeletes, - new SearcherFactory() - ); + this.searcherManager = new SearcherManager(indexWriter, applyAllDeletes, writeAllDeletes, SEARCHER_FACTORY); Mono .fromRunnable(() -> { @@ -145,7 +140,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager { indexSearcher = searcherManager.acquire(); decRef = true; } else { - indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(); + indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(SEARCH_EXECUTOR); decRef = false; } indexSearcher.setSimilarity(similarity); @@ -199,4 +194,5 @@ public class CachedIndexSearcherManager implements IndexSearcherManager { public Mono close() { return closeMono; } + } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LuceneIndexSnapshot.java b/src/main/java/it/cavallium/dbengine/database/disk/LuceneIndexSnapshot.java index 4ed387c..23197b8 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LuceneIndexSnapshot.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LuceneIndexSnapshot.java @@ -1,9 +1,12 @@ package it.cavallium.dbengine.database.disk; import java.io.IOException; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.search.IndexSearcher; +import org.jetbrains.annotations.Nullable; @SuppressWarnings("unused") public class LuceneIndexSnapshot { @@ -13,7 +16,6 @@ public class LuceneIndexSnapshot { private boolean failed; private boolean closed; - private DirectoryReader indexReader; private IndexSearcher indexSearcher; public LuceneIndexSnapshot(IndexCommit snapshot) { @@ -28,21 +30,12 @@ public class LuceneIndexSnapshot { * Can be called only if the snapshot has not been closed * @throws IllegalStateException if closed or failed */ - public synchronized DirectoryReader getIndexReader() throws IllegalStateException { - openDirectoryIfNeeded(); - return indexReader; - } - - /** - * Can be called only if the snapshot has not been closed - * @throws IllegalStateException if closed or failed - */ - public synchronized IndexSearcher getIndexSearcher() throws IllegalStateException { - openDirectoryIfNeeded(); + public synchronized IndexSearcher getIndexSearcher(@Nullable Executor searchExecutor) throws IllegalStateException { + openDirectoryIfNeeded(searchExecutor); return indexSearcher; } - private synchronized void openDirectoryIfNeeded() throws IllegalStateException { + private synchronized void openDirectoryIfNeeded(@Nullable Executor searchExecutor) throws IllegalStateException { if (closed) { throw new IllegalStateException("Snapshot is closed"); } @@ -51,8 +44,8 @@ public class LuceneIndexSnapshot { } if (!initialized) { try { - indexReader = DirectoryReader.open(snapshot); - indexSearcher = new IndexSearcher(indexReader); + var indexReader = DirectoryReader.open(snapshot); + indexSearcher = new IndexSearcher(indexReader, searchExecutor); initialized = true; } catch (IOException e) { @@ -66,9 +59,7 @@ public class LuceneIndexSnapshot { closed = true; if (initialized && !failed) { - indexReader.close(); - - indexReader = null; + indexSearcher.getIndexReader().close(); indexSearcher = null; } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java b/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java index 0608400..79f801c 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java @@ -4,6 +4,7 @@ import it.cavallium.dbengine.database.LLSnapshot; import java.io.IOException; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicLong; import org.apache.lucene.index.IndexCommit; diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ExecutorSearcherFactory.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ExecutorSearcherFactory.java new file mode 100644 index 0000000..55b2d1f --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ExecutorSearcherFactory.java @@ -0,0 +1,20 @@ +package it.cavallium.dbengine.lucene.searcher; + +import java.util.concurrent.Executor; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.SearcherFactory; + +public class ExecutorSearcherFactory extends SearcherFactory { + + private final Executor executor; + + public ExecutorSearcherFactory(Executor executor) { + this.executor = executor; + } + + @Override + public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) { + return new IndexSearcher(reader, executor); + } +}