Execute searches on its own executor
This commit is contained in:
parent
3742eedd7f
commit
d96b5a168b
|
@ -4,16 +4,15 @@ import com.google.common.cache.CacheBuilder;
|
||||||
import com.google.common.cache.CacheLoader;
|
import com.google.common.cache.CacheLoader;
|
||||||
import com.google.common.cache.LoadingCache;
|
import com.google.common.cache.LoadingCache;
|
||||||
import io.net5.buffer.api.Send;
|
import io.net5.buffer.api.Send;
|
||||||
import io.net5.buffer.api.internal.ResourceSupport;
|
|
||||||
import it.cavallium.dbengine.database.LLSnapshot;
|
import it.cavallium.dbengine.database.LLSnapshot;
|
||||||
|
import it.cavallium.dbengine.lucene.searcher.ExecutorSearcherFactory;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.UncheckedIOException;
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.ForkJoinPool;
|
||||||
import java.util.concurrent.Phaser;
|
import java.util.concurrent.Phaser;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import java.util.function.Function;
|
|
||||||
import org.apache.lucene.index.IndexWriter;
|
import org.apache.lucene.index.IndexWriter;
|
||||||
import org.apache.lucene.search.IndexSearcher;
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
import org.apache.lucene.search.SearcherFactory;
|
import org.apache.lucene.search.SearcherFactory;
|
||||||
|
@ -24,16 +23,16 @@ import org.jetbrains.annotations.NotNull;
|
||||||
import org.jetbrains.annotations.Nullable;
|
import org.jetbrains.annotations.Nullable;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import reactor.core.publisher.Flux;
|
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
import reactor.core.publisher.Sinks;
|
import reactor.core.publisher.Sinks;
|
||||||
import reactor.core.publisher.Sinks.Empty;
|
import reactor.core.publisher.Sinks.Empty;
|
||||||
import reactor.core.scheduler.Schedulers;
|
import reactor.core.scheduler.Schedulers;
|
||||||
import reactor.util.function.Tuples;
|
|
||||||
|
|
||||||
public class CachedIndexSearcherManager implements IndexSearcherManager {
|
public class CachedIndexSearcherManager implements IndexSearcherManager {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(CachedIndexSearcherManager.class);
|
private static final Logger logger = LoggerFactory.getLogger(CachedIndexSearcherManager.class);
|
||||||
|
private static final Executor SEARCH_EXECUTOR = ForkJoinPool.commonPool();
|
||||||
|
private static final SearcherFactory SEARCHER_FACTORY = new ExecutorSearcherFactory(SEARCH_EXECUTOR);
|
||||||
|
|
||||||
private final SnapshotsManager snapshotsManager;
|
private final SnapshotsManager snapshotsManager;
|
||||||
private final Similarity similarity;
|
private final Similarity similarity;
|
||||||
|
@ -59,11 +58,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
|
||||||
this.similarity = similarity;
|
this.similarity = similarity;
|
||||||
this.queryRefreshDebounceTime = queryRefreshDebounceTime;
|
this.queryRefreshDebounceTime = queryRefreshDebounceTime;
|
||||||
|
|
||||||
this.searcherManager = new SearcherManager(indexWriter,
|
this.searcherManager = new SearcherManager(indexWriter, applyAllDeletes, writeAllDeletes, SEARCHER_FACTORY);
|
||||||
applyAllDeletes,
|
|
||||||
writeAllDeletes,
|
|
||||||
new SearcherFactory()
|
|
||||||
);
|
|
||||||
|
|
||||||
Mono
|
Mono
|
||||||
.fromRunnable(() -> {
|
.fromRunnable(() -> {
|
||||||
|
@ -145,7 +140,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
|
||||||
indexSearcher = searcherManager.acquire();
|
indexSearcher = searcherManager.acquire();
|
||||||
decRef = true;
|
decRef = true;
|
||||||
} else {
|
} else {
|
||||||
indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher();
|
indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(SEARCH_EXECUTOR);
|
||||||
decRef = false;
|
decRef = false;
|
||||||
}
|
}
|
||||||
indexSearcher.setSimilarity(similarity);
|
indexSearcher.setSimilarity(similarity);
|
||||||
|
@ -199,4 +194,5 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
|
||||||
public Mono<Void> close() {
|
public Mono<Void> close() {
|
||||||
return closeMono;
|
return closeMono;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,12 @@
|
||||||
package it.cavallium.dbengine.database.disk;
|
package it.cavallium.dbengine.database.disk;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.ForkJoinPool;
|
||||||
import org.apache.lucene.index.DirectoryReader;
|
import org.apache.lucene.index.DirectoryReader;
|
||||||
import org.apache.lucene.index.IndexCommit;
|
import org.apache.lucene.index.IndexCommit;
|
||||||
import org.apache.lucene.search.IndexSearcher;
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
|
import org.jetbrains.annotations.Nullable;
|
||||||
|
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
public class LuceneIndexSnapshot {
|
public class LuceneIndexSnapshot {
|
||||||
|
@ -13,7 +16,6 @@ public class LuceneIndexSnapshot {
|
||||||
private boolean failed;
|
private boolean failed;
|
||||||
private boolean closed;
|
private boolean closed;
|
||||||
|
|
||||||
private DirectoryReader indexReader;
|
|
||||||
private IndexSearcher indexSearcher;
|
private IndexSearcher indexSearcher;
|
||||||
|
|
||||||
public LuceneIndexSnapshot(IndexCommit snapshot) {
|
public LuceneIndexSnapshot(IndexCommit snapshot) {
|
||||||
|
@ -28,21 +30,12 @@ public class LuceneIndexSnapshot {
|
||||||
* Can be called only if the snapshot has not been closed
|
* Can be called only if the snapshot has not been closed
|
||||||
* @throws IllegalStateException if closed or failed
|
* @throws IllegalStateException if closed or failed
|
||||||
*/
|
*/
|
||||||
public synchronized DirectoryReader getIndexReader() throws IllegalStateException {
|
public synchronized IndexSearcher getIndexSearcher(@Nullable Executor searchExecutor) throws IllegalStateException {
|
||||||
openDirectoryIfNeeded();
|
openDirectoryIfNeeded(searchExecutor);
|
||||||
return indexReader;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Can be called only if the snapshot has not been closed
|
|
||||||
* @throws IllegalStateException if closed or failed
|
|
||||||
*/
|
|
||||||
public synchronized IndexSearcher getIndexSearcher() throws IllegalStateException {
|
|
||||||
openDirectoryIfNeeded();
|
|
||||||
return indexSearcher;
|
return indexSearcher;
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void openDirectoryIfNeeded() throws IllegalStateException {
|
private synchronized void openDirectoryIfNeeded(@Nullable Executor searchExecutor) throws IllegalStateException {
|
||||||
if (closed) {
|
if (closed) {
|
||||||
throw new IllegalStateException("Snapshot is closed");
|
throw new IllegalStateException("Snapshot is closed");
|
||||||
}
|
}
|
||||||
|
@ -51,8 +44,8 @@ public class LuceneIndexSnapshot {
|
||||||
}
|
}
|
||||||
if (!initialized) {
|
if (!initialized) {
|
||||||
try {
|
try {
|
||||||
indexReader = DirectoryReader.open(snapshot);
|
var indexReader = DirectoryReader.open(snapshot);
|
||||||
indexSearcher = new IndexSearcher(indexReader);
|
indexSearcher = new IndexSearcher(indexReader, searchExecutor);
|
||||||
|
|
||||||
initialized = true;
|
initialized = true;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -66,9 +59,7 @@ public class LuceneIndexSnapshot {
|
||||||
closed = true;
|
closed = true;
|
||||||
|
|
||||||
if (initialized && !failed) {
|
if (initialized && !failed) {
|
||||||
indexReader.close();
|
indexSearcher.getIndexReader().close();
|
||||||
|
|
||||||
indexReader = null;
|
|
||||||
indexSearcher = null;
|
indexSearcher = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ import it.cavallium.dbengine.database.LLSnapshot;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.Phaser;
|
import java.util.concurrent.Phaser;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import org.apache.lucene.index.IndexCommit;
|
import org.apache.lucene.index.IndexCommit;
|
||||||
|
|
|
@ -0,0 +1,20 @@
|
||||||
|
package it.cavallium.dbengine.lucene.searcher;
|
||||||
|
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import org.apache.lucene.index.IndexReader;
|
||||||
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
|
import org.apache.lucene.search.SearcherFactory;
|
||||||
|
|
||||||
|
public class ExecutorSearcherFactory extends SearcherFactory {
|
||||||
|
|
||||||
|
private final Executor executor;
|
||||||
|
|
||||||
|
public ExecutorSearcherFactory(Executor executor) {
|
||||||
|
this.executor = executor;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) {
|
||||||
|
return new IndexSearcher(reader, executor);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user