diff --git a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java index d13310c..5f3708a 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java @@ -9,6 +9,7 @@ import io.netty5.buffer.api.Send; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; import java.io.IOException; +import java.lang.ref.Cleaner; import java.time.Duration; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -52,13 +53,15 @@ public class CachedIndexSearcherManager implements IndexSearcherManager { private final AtomicLong activeSearchers = new AtomicLong(0); private final AtomicLong activeRefreshes = new AtomicLong(0); - private final LoadingCache>> cachedSnapshotSearchers; - private final Mono> cachedMainSearcher; + private final LoadingCache> cachedSnapshotSearchers; + private final Mono cachedMainSearcher; private final AtomicBoolean closeRequested = new AtomicBoolean(); private final Empty closeRequestedMono = Sinks.empty(); private final Mono closeMono; + private final Cleaner cleaner = Cleaner.create(); + public CachedIndexSearcherManager(IndexWriter indexWriter, SnapshotsManager snapshotsManager, Scheduler luceneHeavyTasksScheduler, @@ -96,7 +99,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager { .maximumSize(3) .build(new CacheLoader<>() { @Override - public Mono> load(@NotNull LLSnapshot snapshot) { + public Mono load(@NotNull LLSnapshot snapshot) { return CachedIndexSearcherManager.this.generateCachedSearcher(snapshot); } }); @@ -141,25 +144,42 @@ public class CachedIndexSearcherManager implements IndexSearcherManager { .cache(); } - private Mono> generateCachedSearcher(@Nullable LLSnapshot snapshot) { + private Mono generateCachedSearcher(@Nullable LLSnapshot snapshot) { return Mono.fromCallable(() -> { - if (closeRequested.get()) { - return null; + if (closeRequested.get()) { + return null; + } + activeSearchers.incrementAndGet(); + IndexSearcher indexSearcher; + boolean fromSnapshot; + if (snapshot == null) { + indexSearcher = searcherManager.acquire(); + fromSnapshot = false; + } else { + indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(searchExecutor); + fromSnapshot = true; + } + indexSearcher.setSimilarity(similarity); + assert indexSearcher.getIndexReader().getRefCount() > 0; + LLIndexSearcher llIndexSearcher; + if (fromSnapshot) { + llIndexSearcher = new SnapshotIndexSearcher(indexSearcher); + } else { + var released = new AtomicBoolean(); + llIndexSearcher = new MainIndexSearcher(indexSearcher, released); + cleaner.register(llIndexSearcher, () -> { + if (released.compareAndSet(false, true)) { + logger.warn("An index searcher was not closed!"); + try { + searcherManager.release(indexSearcher); + } catch (IOException e) { + logger.error("Failed to release the index searcher", e); + } } - activeSearchers.incrementAndGet(); - IndexSearcher indexSearcher; - boolean decRef; - if (snapshot == null) { - indexSearcher = searcherManager.acquire(); - decRef = true; - } else { - indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(searchExecutor); - decRef = false; - } - indexSearcher.setSimilarity(similarity); - assert indexSearcher.getIndexReader().getRefCount() > 0; - return new LLIndexSearcher(indexSearcher, decRef, this::dropCachedIndexSearcher).send(); }); + } + return llIndexSearcher; + }); } private void dropCachedIndexSearcher() { @@ -192,7 +212,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager { } @Override - public Mono> retrieveSearcher(@Nullable LLSnapshot snapshot) { + public Mono retrieveSearcher(@Nullable LLSnapshot snapshot) { if (snapshot == null) { return this.cachedMainSearcher; } else { @@ -212,4 +232,34 @@ public class CachedIndexSearcherManager implements IndexSearcherManager { public long getActiveRefreshes() { return activeRefreshes.get(); } + + private class MainIndexSearcher extends LLIndexSearcher { + + private final AtomicBoolean released; + + public MainIndexSearcher(IndexSearcher indexSearcher, AtomicBoolean released) { + super(indexSearcher); + this.released = released; + } + + @Override + public void onClose() throws IOException { + dropCachedIndexSearcher(); + if (released.compareAndSet(false, true)) { + searcherManager.release(indexSearcher); + } + } + } + + private class SnapshotIndexSearcher extends LLIndexSearcher { + + public SnapshotIndexSearcher(IndexSearcher indexSearcher) { + super(indexSearcher); + } + + @Override + public void onClose() { + dropCachedIndexSearcher(); + } + } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/IndexSearcherManager.java b/src/main/java/it/cavallium/dbengine/database/disk/IndexSearcherManager.java index 8584a3f..c5973b3 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/IndexSearcherManager.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/IndexSearcherManager.java @@ -15,7 +15,7 @@ public interface IndexSearcherManager { void maybeRefresh() throws IOException; - Mono> retrieveSearcher(@Nullable LLSnapshot snapshot); + Mono retrieveSearcher(@Nullable LLSnapshot snapshot); Mono close(); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java index 597db38..38a8b5e 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java @@ -3,79 +3,42 @@ package it.cavallium.dbengine.database.disk; import io.netty5.buffer.api.Drop; import io.netty5.buffer.api.Owned; import io.netty5.buffer.api.internal.ResourceSupport; +import it.cavallium.dbengine.database.SafeCloseable; +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.index.IndexReader; import org.apache.lucene.search.IndexSearcher; -public class LLIndexSearcher extends ResourceSupport { +public abstract class LLIndexSearcher implements Closeable { - private static final Logger logger = LogManager.getLogger(LLIndexSearcher.class); + protected static final Logger LOG = LogManager.getLogger(LLIndexSearcher.class); - private static final Drop DROP = new Drop<>() { - @Override - public void drop(LLIndexSearcher obj) { - try { - if (obj.onClose != null) { - obj.onClose.run(); - } - } catch (Throwable ex) { - logger.error("Failed to close onClose", ex); - } - } + protected final IndexSearcher indexSearcher; + private final AtomicBoolean closed = new AtomicBoolean(); - @Override - public Drop fork() { - return this; - } - - @Override - public void attach(LLIndexSearcher obj) { - - } - }; - - private IndexSearcher indexSearcher; - private final boolean decRef; - - private Runnable onClose; - - public LLIndexSearcher(IndexSearcher indexSearcher, boolean decRef, Runnable onClose) { - super(DROP); + public LLIndexSearcher(IndexSearcher indexSearcher) { this.indexSearcher = indexSearcher; - this.decRef = decRef; - this.onClose = onClose; } public IndexReader getIndexReader() { - if (!isOwned()) { - throw attachTrace(new IllegalStateException("LLIndexSearcher must be owned to be used")); - } + if (closed.get()) throw new IllegalStateException("Closed"); return indexSearcher.getIndexReader(); } public IndexSearcher getIndexSearcher() { - if (!isOwned()) { - throw attachTrace(new IllegalStateException("LLIndexSearcher must be owned to be used")); - } + if (closed.get()) throw new IllegalStateException("Closed"); return indexSearcher; } @Override - protected RuntimeException createResourceClosedException() { - return new IllegalStateException("Closed"); - } - - @Override - protected Owned prepareSend() { - var indexSearcher = this.indexSearcher; - var onClose = this.onClose; - return drop -> new LLIndexSearcher(indexSearcher, decRef, onClose); - } - - protected void makeInaccessible() { - this.indexSearcher = null; - this.onClose = null; + public final void close() throws IOException { + if (closed.compareAndSet(false, true)) { + onClose(); + } } + protected abstract void onClose() throws IOException; } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java index 81b8d7d..19579f3 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java @@ -6,6 +6,7 @@ import io.netty5.buffer.api.Resource; import io.netty5.buffer.api.Send; import io.netty5.buffer.api.internal.ResourceSupport; import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; +import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; import java.util.ArrayList; @@ -18,64 +19,32 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.MultiReader; import org.apache.lucene.search.IndexSearcher; -public interface LLIndexSearchers extends Resource { +public interface LLIndexSearchers extends Closeable { - static LLIndexSearchers of(List> indexSearchers) { - return new ShardedIndexSearchers(indexSearchers, null); + static LLIndexSearchers of(List indexSearchers) { + return new ShardedIndexSearchers(indexSearchers); } - static UnshardedIndexSearchers unsharded(Send indexSearcher) { - return new UnshardedIndexSearchers(indexSearcher, null); + static UnshardedIndexSearchers unsharded(LLIndexSearcher indexSearcher) { + return new UnshardedIndexSearchers(indexSearcher); } List shards(); + List llShards(); + IndexSearcher shard(int shardIndex); + LLIndexSearcher llShard(int shardIndex); + IndexReader allShards(); - class UnshardedIndexSearchers extends ResourceSupport - implements LLIndexSearchers { + class UnshardedIndexSearchers implements LLIndexSearchers { - private static final Logger logger = LogManager.getLogger(UnshardedIndexSearchers.class); + private final LLIndexSearcher indexSearcher; - private static final Drop DROP = new Drop<>() { - @Override - public void drop(UnshardedIndexSearchers obj) { - try { - if (obj.indexSearcher != null) { - obj.indexSearcher.close(); - } - } catch (Throwable ex) { - logger.error("Failed to close indexSearcher", ex); - } - try { - if (obj.onClose != null) { - obj.onClose.run(); - } - } catch (Throwable ex) { - logger.error("Failed to close onClose", ex); - } - } - - @Override - public Drop fork() { - return this; - } - - @Override - public void attach(UnshardedIndexSearchers obj) { - - } - }; - - private LLIndexSearcher indexSearcher; - private Runnable onClose; - - public UnshardedIndexSearchers(Send indexSearcher, Runnable onClose) { - super(DROP); - this.indexSearcher = indexSearcher.receive(); - this.onClose = onClose; + public UnshardedIndexSearchers(LLIndexSearcher indexSearcher) { + this.indexSearcher = indexSearcher; } @Override @@ -83,17 +52,27 @@ public interface LLIndexSearchers extends Resource { return List.of(indexSearcher.getIndexSearcher()); } + @Override + public List llShards() { + return Collections.singletonList(indexSearcher); + } + @Override public IndexSearcher shard(int shardIndex) { - if (!isOwned()) { - throw attachTrace(new IllegalStateException("UnshardedIndexSearchers must be owned to be used")); - } if (shardIndex != -1) { throw new IndexOutOfBoundsException("Shard index " + shardIndex + " is invalid, this is a unsharded index"); } return indexSearcher.getIndexSearcher(); } + @Override + public LLIndexSearcher llShard(int shardIndex) { + if (shardIndex != -1) { + throw new IndexOutOfBoundsException("Shard index " + shardIndex + " is invalid, this is a unsharded index"); + } + return indexSearcher; + } + @Override public IndexReader allShards() { return indexSearcher.getIndexReader(); @@ -103,92 +82,42 @@ public interface LLIndexSearchers extends Resource { return this.shard(-1); } - @Override - protected RuntimeException createResourceClosedException() { - return new IllegalStateException("Closed"); + public LLIndexSearcher llShard() { + return this.llShard(-1); } @Override - protected Owned prepareSend() { - Send indexSearcher = this.indexSearcher.send(); - var onClose = this.onClose; - return drop -> { - var instance = new UnshardedIndexSearchers(indexSearcher, onClose); - drop.attach(instance); - return instance; - }; - } - - protected void makeInaccessible() { - this.indexSearcher = null; - this.onClose = null; + public void close() throws IOException { + indexSearcher.close(); } } - class ShardedIndexSearchers extends ResourceSupport - implements LLIndexSearchers { + class ShardedIndexSearchers implements LLIndexSearchers { - private static final Logger logger = LogManager.getLogger(ShardedIndexSearchers.class); + private final List indexSearchers; + private final List indexSearchersVals; - private static final Drop DROP = new Drop<>() { - @Override - public void drop(ShardedIndexSearchers obj) { - try { - for (LLIndexSearcher indexSearcher : obj.indexSearchers) { - indexSearcher.close(); - } - } catch (Throwable ex) { - logger.error("Failed to close indexSearcher", ex); - } - try { - if (obj.onClose != null) { - obj.onClose.run(); - } - } catch (Throwable ex) { - logger.error("Failed to close onClose", ex); - } - } - - @Override - public Drop fork() { - return this; - } - - @Override - public void attach(ShardedIndexSearchers obj) { - - } - }; - - private List indexSearchers; - private List indexSearchersVals; - private Runnable onClose; - - public ShardedIndexSearchers(List> indexSearchers, Runnable onClose) { - super(DROP); + public ShardedIndexSearchers(List indexSearchers) { this.indexSearchers = new ArrayList<>(indexSearchers.size()); this.indexSearchersVals = new ArrayList<>(indexSearchers.size()); - for (Send llIndexSearcher : indexSearchers) { - var indexSearcher = llIndexSearcher.receive(); + for (LLIndexSearcher indexSearcher : indexSearchers) { this.indexSearchers.add(indexSearcher); this.indexSearchersVals.add(indexSearcher.getIndexSearcher()); } - this.onClose = onClose; } @Override public List shards() { - if (!isOwned()) { - throw attachTrace(new IllegalStateException("ShardedIndexSearchers must be owned to be used")); - } return Collections.unmodifiableList(indexSearchersVals); } + @Override + public List llShards() { + return Collections.unmodifiableList(indexSearchers); + } + @Override public IndexSearcher shard(int shardIndex) { - if (!isOwned()) { - throw attachTrace(new IllegalStateException("ShardedIndexSearchers must be owned to be used")); - } if (shardIndex < 0) { throw new IndexOutOfBoundsException("Shard index " + shardIndex + " is invalid"); } @@ -196,10 +125,15 @@ public interface LLIndexSearchers extends Resource { } @Override - public IndexReader allShards() { - if (!isOwned()) { - throw attachTrace(new IllegalStateException("ShardedIndexSearchers must be owned to be used")); + public LLIndexSearcher llShard(int shardIndex) { + if (shardIndex < 0) { + throw new IndexOutOfBoundsException("Shard index " + shardIndex + " is invalid"); } + return indexSearchers.get(shardIndex); + } + + @Override + public IndexReader allShards() { var irs = new IndexReader[indexSearchersVals.size()]; for (int i = 0, s = indexSearchersVals.size(); i < s; i++) { irs[i] = indexSearchersVals.get(i).getIndexReader(); @@ -217,28 +151,10 @@ public interface LLIndexSearchers extends Resource { } @Override - protected RuntimeException createResourceClosedException() { - return new IllegalStateException("Closed"); - } - - @Override - protected Owned prepareSend() { - List> indexSearchers = new ArrayList<>(this.indexSearchers.size()); - for (LLIndexSearcher indexSearcher : this.indexSearchers) { - indexSearchers.add(indexSearcher.send()); + public void close() throws IOException { + for (LLIndexSearcher indexSearcher : indexSearchers) { + indexSearcher.close(); } - var onClose = this.onClose; - return drop -> { - var instance = new ShardedIndexSearchers(indexSearchers, onClose); - drop.attach(instance); - return instance; - }; - } - - protected void makeInaccessible() { - this.indexSearchers = null; - this.indexSearchersVals = null; - this.onClose = null; } } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index bc21e7a..4a92467 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -27,6 +27,7 @@ import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.database.LLUpdateDocument; import it.cavallium.dbengine.database.LLUpdateFields; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.LLIndexSearchers.UnshardedIndexSearchers; import it.cavallium.dbengine.lucene.LuceneHacks; import it.cavallium.dbengine.lucene.LuceneRocksDBManager; import it.cavallium.dbengine.lucene.LuceneUtils; @@ -510,14 +511,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { localQueries.add(QueryParser.toQuery(query, luceneAnalyzer)); } var localNormalizationQuery = QueryParser.toQuery(normalizationQuery, luceneAnalyzer); - var searchers = searcherManager + Mono searchers = searcherManager .retrieveSearcher(snapshot) - .map(indexSearcher -> LLIndexSearchers.unsharded(indexSearcher).send()); + .map(LLIndexSearchers::unsharded); return decimalBucketMultiSearcher.collectMulti(searchers, bucketParams, localQueries, localNormalizationQuery); } - public Mono> retrieveSearcher(@Nullable LLSnapshot snapshot) { + public Mono retrieveSearcher(@Nullable LLSnapshot snapshot) { return searcherManager.retrieveSearcher(snapshot); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 66e0597..97e812e 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -146,7 +146,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { return clusterName; } - private Mono> getIndexSearchers(LLSnapshot snapshot) { + private Mono getIndexSearchers(LLSnapshot snapshot) { return luceneIndicesFlux .index() // Resolve the snapshot of each shard @@ -155,7 +155,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { .flatMap(luceneSnapshot -> tuple.getT2().retrieveSearcher(luceneSnapshot.orElse(null))) ) .collectList() - .map(searchers -> LLIndexSearchers.of(searchers).send()); + .map(LLIndexSearchers::of); } @Override diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java index 35ba530..3f967d0 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java @@ -12,6 +12,7 @@ import java.io.IOException; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import reactor.util.function.Tuples; public class AdaptiveLocalSearcher implements LocalSearcher { @@ -41,28 +42,22 @@ public class AdaptiveLocalSearcher implements LocalSearcher { } @Override - public Mono collect(Mono> indexSearcher, + public Mono collect(Mono indexSearcherMono, LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { - Mono> indexSearchersMono = indexSearcher - .map(LLIndexSearchers::unsharded) - .map(ResourceSupport::send); + return indexSearcherMono.flatMap(indexSearcher -> { + var indexSearchers = LLIndexSearchers.unsharded(indexSearcher); - if (transformer == NO_REWRITE) { - return transformedCollect(indexSearcher, queryParams, keyFieldName, transformer); - } else { - return indexSearchersMono - .publishOn(uninterruptibleScheduler(Schedulers.boundedElastic())) - .handle((indexSearchers, sink) -> { - try { - sink.next(transformer.rewrite(indexSearchers.receive(), queryParams)); - } catch (IOException ex) { - sink.error(ex); - } - }) - .flatMap(queryParams2 -> transformedCollect(indexSearcher, queryParams2, keyFieldName, NO_REWRITE)); - } + if (transformer == NO_REWRITE) { + return transformedCollect(indexSearcher, queryParams, keyFieldName, transformer); + } else { + return Mono + .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) + .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())) + .flatMap(queryParams2 -> transformedCollect(indexSearcher, queryParams2, keyFieldName, NO_REWRITE)); + } + }); } @Override @@ -71,7 +66,7 @@ public class AdaptiveLocalSearcher implements LocalSearcher { } // Remember to change also AdaptiveMultiSearcher - public Mono transformedCollect(Mono> indexSearcher, + public Mono transformedCollect(LLIndexSearcher indexSearcher, LocalQueryParams queryParams, String keyFieldName, GlobalQueryRewrite transformer) { @@ -81,36 +76,36 @@ public class AdaptiveLocalSearcher implements LocalSearcher { = Math.max(maxInMemoryResultEntries, (long) queryParams.pageLimits().getPageLimit(0)); if (queryParams.limitLong() == 0) { - return countSearcher.collect(indexSearcher, queryParams, keyFieldName, transformer); + return countSearcher.collect(Mono.just(indexSearcher), queryParams, keyFieldName, transformer); } else if (realLimit <= maxInMemoryResultEntries) { - return standardSearcher.collect(indexSearcher, queryParams, keyFieldName, transformer); + return standardSearcher.collect(Mono.just(indexSearcher), queryParams, keyFieldName, transformer); } else if (queryParams.isSorted()) { if (realLimit <= maxAllowedInMemoryLimit) { - return scoredPaged.collect(indexSearcher, queryParams, keyFieldName, transformer); + return scoredPaged.collect(Mono.just(indexSearcher), queryParams, keyFieldName, transformer); } else { if (queryParams.isSortedByScore()) { if (queryParams.limitLong() < maxInMemoryResultEntries) { throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater"); } if (sortedByScoreFull != null) { - return sortedByScoreFull.collect(indexSearcher, queryParams, keyFieldName, transformer); + return sortedByScoreFull.collect(Mono.just(indexSearcher), queryParams, keyFieldName, transformer); } else { - return scoredPaged.collect(indexSearcher, queryParams, keyFieldName, transformer); + return scoredPaged.collect(Mono.just(indexSearcher), queryParams, keyFieldName, transformer); } } else { if (queryParams.limitLong() < maxInMemoryResultEntries) { throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater"); } if (sortedScoredFull != null) { - return sortedScoredFull.collect(indexSearcher, queryParams, keyFieldName, transformer); + return sortedScoredFull.collect(Mono.just(indexSearcher), queryParams, keyFieldName, transformer); } else { - return scoredPaged.collect(indexSearcher, queryParams, keyFieldName, transformer); + return scoredPaged.collect(Mono.just(indexSearcher), queryParams, keyFieldName, transformer); } } } } else { // Run large/unbounded searches using the continuous multi searcher - return unsortedUnscoredContinuous.collect(indexSearcher, queryParams, keyFieldName, transformer); + return unsortedUnscoredContinuous.collect(Mono.just(indexSearcher), queryParams, keyFieldName, transformer); } } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java index 5a77369..94bcaf9 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java @@ -40,28 +40,23 @@ public class AdaptiveMultiSearcher implements MultiSearcher { } @Override - public Mono collectMulti(Mono> indexSearchersMono, + public Mono collectMulti(Mono indexSearchersMono, LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { - if (transformer == NO_REWRITE) { - return transformedCollectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); - } else { - return indexSearchersMono - .publishOn(uninterruptibleScheduler(Schedulers.boundedElastic())) - .handle((indexSearchers, sink) -> { - try { - sink.next(transformer.rewrite(indexSearchers.receive(), queryParams)); - } catch (IOException ex) { - sink.error(ex); - } - }) - .flatMap(queryParams2 -> transformedCollectMulti(indexSearchersMono, queryParams2, keyFieldName, NO_REWRITE)); - } + return indexSearchersMono.flatMap(indexSearchers -> { + if (transformer == NO_REWRITE) { + return transformedCollectMulti(indexSearchers, queryParams, keyFieldName, transformer); + } else { + return Mono.fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) + .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())) + .flatMap(queryParams2 -> transformedCollectMulti(indexSearchers, queryParams2, keyFieldName, NO_REWRITE)); + } + }); } // Remember to change also AdaptiveLocalSearcher - public Mono transformedCollectMulti(Mono> indexSearchersMono, + public Mono transformedCollectMulti(LLIndexSearchers indexSearchers, LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { @@ -70,40 +65,38 @@ public class AdaptiveMultiSearcher implements MultiSearcher { long maxAllowedInMemoryLimit = Math.max(maxInMemoryResultEntries, (long) queryParams.pageLimits().getPageLimit(0)); - return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> { - if (queryParams.limitLong() == 0) { - return count.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); - } else if (realLimit <= maxInMemoryResultEntries) { - return standardSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); - } else if (queryParams.isSorted()) { - if (realLimit <= maxAllowedInMemoryLimit) { - return scoredPaged.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); - } else { - if (queryParams.isSortedByScore()) { - if (queryParams.limitLong() < maxInMemoryResultEntries) { - throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater"); - } - if (sortedByScoreFull != null) { - return sortedByScoreFull.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); - } else { - return scoredPaged.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); - } + if (queryParams.limitLong() == 0) { + return count.collectMulti(Mono.just(indexSearchers), queryParams, keyFieldName, transformer); + } else if (realLimit <= maxInMemoryResultEntries) { + return standardSearcher.collectMulti(Mono.just(indexSearchers), queryParams, keyFieldName, transformer); + } else if (queryParams.isSorted()) { + if (realLimit <= maxAllowedInMemoryLimit) { + return scoredPaged.collectMulti(Mono.just(indexSearchers), queryParams, keyFieldName, transformer); + } else { + if (queryParams.isSortedByScore()) { + if (queryParams.limitLong() < maxInMemoryResultEntries) { + throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater"); + } + if (sortedByScoreFull != null) { + return sortedByScoreFull.collectMulti(Mono.just(indexSearchers), queryParams, keyFieldName, transformer); } else { - if (queryParams.limitLong() < maxInMemoryResultEntries) { - throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater"); - } - if (sortedScoredFull != null) { - return sortedScoredFull.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); - } else { - return scoredPaged.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); - } + return scoredPaged.collectMulti(Mono.just(indexSearchers), queryParams, keyFieldName, transformer); + } + } else { + if (queryParams.limitLong() < maxInMemoryResultEntries) { + throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater"); + } + if (sortedScoredFull != null) { + return sortedScoredFull.collectMulti(Mono.just(indexSearchers), queryParams, keyFieldName, transformer); + } else { + return scoredPaged.collectMulti(Mono.just(indexSearchers), queryParams, keyFieldName, transformer); } } - } else { - // Run large/unbounded searches using the continuous multi searcher - return unsortedUnscoredContinuous.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); } - }, true); + } else { + // Run large/unbounded searches using the continuous multi searcher + return unsortedUnscoredContinuous.collectMulti(Mono.just(indexSearchers), queryParams, keyFieldName, transformer); + } } @Override diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java index 19f89f7..0ca09bc 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java @@ -11,6 +11,8 @@ import it.cavallium.dbengine.database.disk.LLIndexSearchers; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -18,71 +20,72 @@ import reactor.core.scheduler.Schedulers; public class CountMultiSearcher implements MultiSearcher { + protected static final Logger LOG = LogManager.getLogger(CountMultiSearcher.class); @Override - public Mono collectMulti(Mono> indexSearchersMono, + public Mono collectMulti(Mono indexSearchersMono, LocalQueryParams queryParams, String keyFieldName, GlobalQueryRewrite transformer) { - Mono queryParamsMono; - if (transformer == GlobalQueryRewrite.NO_REWRITE) { - queryParamsMono = Mono.just(queryParams); - } else { - queryParamsMono = indexSearchersMono - .publishOn(uninterruptibleScheduler(Schedulers.boundedElastic())) - .handle((indexSearchers, sink) -> { - try { - sink.next(transformer.rewrite(indexSearchers.receive(), queryParams)); - } catch (IOException ex) { - sink.error(ex); - } - }); - } + return indexSearchersMono.flatMap(indexSearchers -> { + Mono queryParamsMono; + if (transformer == GlobalQueryRewrite.NO_REWRITE) { + queryParamsMono = Mono.just(queryParams); + } else { + queryParamsMono = Mono + .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) + .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())); + } - return queryParamsMono.flatMap(queryParams2 -> LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> { - var localQueryParams = getLocalQueryParams(queryParams2); - return Mono.fromRunnable(() -> { - if (queryParams2.isSorted() && queryParams2.limitLong() > 0) { - throw new UnsupportedOperationException( - "Sorted queries are not supported by SimpleUnsortedUnscoredLuceneMultiSearcher"); - } - if (queryParams2.needsScores() && queryParams2.limitLong() > 0) { - throw new UnsupportedOperationException( - "Scored queries are not supported by SimpleUnsortedUnscoredLuceneMultiSearcher"); - } - }).thenMany(Flux.fromIterable(indexSearchers.shards())).flatMap(searcher -> { - var llSearcher = Mono.fromCallable(() -> new LLIndexSearcher(searcher, false, null).send()); - return this.collect(llSearcher, localQueryParams, keyFieldName, transformer); - }).collectList().map(results -> { - List resultsToDrop = new ArrayList<>(results.size()); - List> resultsFluxes = new ArrayList<>(results.size()); - boolean exactTotalHitsCount = true; - long totalHitsCountValue = 0; - for (LuceneSearchResult result : results) { - resultsToDrop.add(result); - resultsFluxes.add(result.results()); - exactTotalHitsCount &= result.totalHitsCount().exact(); - totalHitsCountValue += result.totalHitsCount().value(); - } + return queryParamsMono.flatMap(queryParams2 -> { + var localQueryParams = getLocalQueryParams(queryParams2); + return Mono + .fromRunnable(() -> { + if (queryParams2.isSorted() && queryParams2.limitLong() > 0) { + throw new UnsupportedOperationException( + "Sorted queries are not supported by SimpleUnsortedUnscoredLuceneMultiSearcher"); + } + if (queryParams2.needsScores() && queryParams2.limitLong() > 0) { + throw new UnsupportedOperationException( + "Scored queries are not supported by SimpleUnsortedUnscoredLuceneMultiSearcher"); + } + }) + .thenMany(Flux.fromIterable(indexSearchers.llShards())) + .flatMap(searcher -> this.collect(Mono.just(searcher), localQueryParams, keyFieldName, transformer)) + .collectList() + .map(results -> { + List resultsToDrop = new ArrayList<>(results.size()); + List> resultsFluxes = new ArrayList<>(results.size()); + boolean exactTotalHitsCount = true; + long totalHitsCountValue = 0; + for (LuceneSearchResult result : results) { + resultsToDrop.add(result); + resultsFluxes.add(result.results()); + exactTotalHitsCount &= result.totalHitsCount().exact(); + totalHitsCountValue += result.totalHitsCount().value(); + } - var totalHitsCount = new TotalHitsCount(totalHitsCountValue, exactTotalHitsCount); - Flux mergedFluxes = Flux - .merge(resultsFluxes) - .skip(queryParams2.offsetLong()) - .take(queryParams2.limitLong(), true); + var totalHitsCount = new TotalHitsCount(totalHitsCountValue, exactTotalHitsCount); + Flux mergedFluxes = Flux + .merge(resultsFluxes) + .skip(queryParams2.offsetLong()) + .take(queryParams2.limitLong(), true); - return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> { - for (LuceneSearchResult luceneSearchResult : resultsToDrop) { - if (luceneSearchResult.isAccessible()) { - luceneSearchResult.close(); - } - } - if (indexSearchers.isAccessible()) { - indexSearchers.close(); - } - }); + return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> { + for (LuceneSearchResult luceneSearchResult : resultsToDrop) { + if (luceneSearchResult.isAccessible()) { + luceneSearchResult.close(); + } + } + try { + indexSearchers.close(); + } catch (IOException e) { + LOG.error("Can't close index searchers", e); + } + }); + }); }); - }, false)); + }); } private LocalQueryParams getLocalQueryParams(LocalQueryParams queryParams) { @@ -97,37 +100,35 @@ public class CountMultiSearcher implements MultiSearcher { } @Override - public Mono collect(Mono> indexSearcherMono, + public Mono collect(Mono indexSearcherMono, LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { - return Mono - .usingWhen( - indexSearcherMono, - indexSearcher -> { - Mono queryParamsMono; - if (transformer == GlobalQueryRewrite.NO_REWRITE) { - queryParamsMono = Mono.just(queryParams); - } else { - queryParamsMono = Mono - .fromCallable(() -> transformer.rewrite(LLIndexSearchers.unsharded(indexSearcher), queryParams)) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())); - } + return indexSearcherMono.flatMap(indexSearcher -> { + Mono queryParamsMono; + if (transformer == GlobalQueryRewrite.NO_REWRITE) { + queryParamsMono = Mono.just(queryParams); + } else { + queryParamsMono = Mono + .fromCallable(() -> transformer.rewrite(LLIndexSearchers.unsharded(indexSearcher), queryParams)) + .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())); + } - return queryParamsMono - .flatMap(queryParams2 -> Mono.fromCallable(() -> { - try (var is = indexSearcher.receive()) { - LLUtils.ensureBlocking(); - - return (long) is.getIndexSearcher().count(queryParams2.query()); - } - }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))) - .publishOn(Schedulers.parallel()) - .transform(TimeoutUtil.timeoutMono(queryParams.timeout())); - }, - is -> Mono.empty() - ) - .map(count -> new LuceneSearchResult(TotalHitsCount.of(count, true), Flux.empty(), null)); + return queryParamsMono + .flatMap(queryParams2 -> Mono.fromCallable(() -> { + LLUtils.ensureBlocking(); + return (long) indexSearcher.getIndexSearcher().count(queryParams2.query()); + }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))) + .publishOn(Schedulers.parallel()) + .transform(TimeoutUtil.timeoutMono(queryParams.timeout())) + .map(count -> new LuceneSearchResult(TotalHitsCount.of(count, true), Flux.empty(), () -> { + try { + indexSearcher.close(); + } catch (IOException e) { + LOG.error("Can't close index searchers", e); + } + })); + }); } @Override diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java index 1af74ca..ab60fc8 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java @@ -14,22 +14,25 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class DecimalBucketMultiSearcher { protected static final Logger logger = LogManager.getLogger(DecimalBucketMultiSearcher.class); - public Mono collectMulti(Mono> indexSearchersMono, + public Mono collectMulti(Mono indexSearchersMono, BucketParams bucketParams, @NotNull List queries, @Nullable Query normalizationQuery) { - return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> this - // Search results - .search(indexSearchers.shards(), bucketParams, queries, normalizationQuery) - // Ensure that one result is always returned - .single(), - true); + return Mono.usingWhen(indexSearchersMono, indexSearchers -> this + // Search results + .search(indexSearchers.shards(), bucketParams, queries, normalizationQuery) + // Ensure that one result is always returned + .single(), indexSearchers -> Mono.fromCallable(() -> { + indexSearchers.close(); + return null; + }).subscribeOn(Schedulers.boundedElastic())); } private Mono search(Iterable indexSearchers, diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalSearcher.java index 437eef8..1e29bf4 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalSearcher.java @@ -13,7 +13,7 @@ public interface LocalSearcher { * @param keyFieldName the name of the key field * @param transformer the search query transformer */ - Mono collect(Mono> indexSearcherMono, + Mono collect(Mono indexSearcherMono, LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/MultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/MultiSearcher.java index 3a2dc4c..08caf45 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/MultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/MultiSearcher.java @@ -14,7 +14,7 @@ public interface MultiSearcher extends LocalSearcher { * @param keyFieldName the name of the key field * @param transformer the search query transformer */ - Mono collectMulti(Mono> indexSearchersMono, + Mono collectMulti(Mono indexSearchersMono, LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer); @@ -26,11 +26,11 @@ public interface MultiSearcher extends LocalSearcher { * @param transformer the search query transformer */ @Override - default Mono collect(Mono> indexSearcherMono, + default Mono collect(Mono indexSearcherMono, LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { - var searchers = indexSearcherMono.map(a -> LLIndexSearchers.unsharded(a).send()); + Mono searchers = indexSearcherMono.map(LLIndexSearchers::unsharded); return this.collectMulti(searchers, queryParams, keyFieldName, transformer); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java index 0d54904..a4891a6 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java @@ -15,6 +15,8 @@ import it.cavallium.dbengine.lucene.collector.TopDocsCollectorMultiManager; import java.io.IOException; import java.util.Arrays; import java.util.List; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; @@ -28,47 +30,52 @@ import reactor.core.scheduler.Schedulers; public class PagedLocalSearcher implements LocalSearcher { + private static final Logger LOG = LogManager.getLogger(PagedLocalSearcher.class); + @Override - public Mono collect(Mono> indexSearcherMono, + public Mono collect(Mono indexSearcherMono, LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { PaginationInfo paginationInfo = getPaginationInfo(queryParams); - var indexSearchersMono = indexSearcherMono.map(LLIndexSearchers::unsharded).map(ResourceSupport::send); + return indexSearcherMono.flatMap(indexSearcher -> { + var indexSearchers = LLIndexSearchers.unsharded(indexSearcher); - return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> { - Mono queryParamsMono; - if (transformer == GlobalQueryRewrite.NO_REWRITE) { - queryParamsMono = Mono.just(queryParams); - } else { - queryParamsMono = Mono - .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())); - } + Mono queryParamsMono; + if (transformer == GlobalQueryRewrite.NO_REWRITE) { + queryParamsMono = Mono.just(queryParams); + } else { + queryParamsMono = Mono + .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) + .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())); + } - return queryParamsMono.flatMap(queryParams2 -> this + return queryParamsMono.flatMap(queryParams2 -> this // Search first page results .searchFirstPage(indexSearchers.shards(), queryParams2, paginationInfo) // Compute the results of the first page - .transform(firstPageTopDocsMono -> this.computeFirstPageResults(firstPageTopDocsMono, indexSearchers.shards(), - keyFieldName, queryParams2)) + .transform(firstPageTopDocsMono -> this.computeFirstPageResults(firstPageTopDocsMono, + indexSearchers.shards(), + keyFieldName, + queryParams2 + )) // Compute other results .transform(firstResult -> this.computeOtherResults(firstResult, indexSearchers.shards(), queryParams2, keyFieldName, () -> { - if (indexSearchers.isAccessible()) { - indexSearchers.close(); + try { + indexSearcher.close(); + } catch (IOException e) { + LOG.error(e); } } )) // Ensure that one LuceneSearchResult is always returned - .single() - ); - }, - false); + .single()); + }); } @Override diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java index 1688d55..4c28229 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java @@ -27,54 +27,54 @@ import reactor.core.scheduler.Schedulers; public class ScoredPagedMultiSearcher implements MultiSearcher { - protected static final Logger logger = LogManager.getLogger(ScoredPagedMultiSearcher.class); + protected static final Logger LOG = LogManager.getLogger(ScoredPagedMultiSearcher.class); public ScoredPagedMultiSearcher() { } @Override - public Mono collectMulti(Mono> indexSearchersMono, + public Mono collectMulti(Mono indexSearchersMono, LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { - Mono queryParamsMono; - if (transformer == GlobalQueryRewrite.NO_REWRITE) { - queryParamsMono = Mono.just(queryParams); - } else { - queryParamsMono = indexSearchersMono - .publishOn(uninterruptibleScheduler(Schedulers.boundedElastic())) - .handle((indexSearchers, sink) -> { - try { - sink.next(transformer.rewrite(indexSearchers.receive(), queryParams)); - } catch (IOException ex) { - sink.error(ex); - } - }); - } + return indexSearchersMono.flatMap(indexSearchers -> { + Mono queryParamsMono; + if (transformer == GlobalQueryRewrite.NO_REWRITE) { + queryParamsMono = Mono.just(queryParams); + } else { + queryParamsMono = Mono + .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) + .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())); + } - return queryParamsMono.flatMap(queryParams2 -> { - PaginationInfo paginationInfo = getPaginationInfo(queryParams2); + return queryParamsMono.flatMap(queryParams2 -> { + PaginationInfo paginationInfo = getPaginationInfo(queryParams2); - return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> this - // Search first page results - .searchFirstPage(indexSearchers.shards(), queryParams2, paginationInfo) - // Compute the results of the first page - .transform(firstPageTopDocsMono -> this.computeFirstPageResults(firstPageTopDocsMono, indexSearchers, - keyFieldName, queryParams2)) - // Compute other results - .map(firstResult -> this.computeOtherResults(firstResult, - indexSearchers.shards(), - queryParams2, - keyFieldName, - () -> { - if (indexSearchers.isAccessible()) { - indexSearchers.close(); - } + return this + // Search first page results + .searchFirstPage(indexSearchers.shards(), queryParams2, paginationInfo) + // Compute the results of the first page + .transform(firstPageTopDocsMono -> this.computeFirstPageResults(firstPageTopDocsMono, + indexSearchers, + keyFieldName, + queryParams2 + )) + // Compute other results + .map(firstResult -> this.computeOtherResults(firstResult, + indexSearchers.shards(), + queryParams2, + keyFieldName, + () -> { + try { + indexSearchers.close(); + } catch (IOException e) { + LOG.error("Can't close index searchers", e); } - )) - // Ensure that one LuceneSearchResult is always returned - .single(), - false); + } + )) + // Ensure that one LuceneSearchResult is always returned + .single(); + }); }); } @@ -155,10 +155,10 @@ public class ScoredPagedMultiSearcher implements MultiSearcher { AtomicReference currentPageInfoRef = new AtomicReference<>(secondPageInfo); return Mono .fromSupplier(currentPageInfoRef::get) - .doOnNext(s -> logger.trace("Current page info: {}", s)) + .doOnNext(s -> LOG.trace("Current page info: {}", s)) .flatMap(currentPageInfo -> this.searchPage(queryParams, indexSearchers, true, queryParams.pageLimits(), 0, currentPageInfo)) - .doOnNext(s -> logger.trace("Next page info: {}", s.nextPageInfo())) + .doOnNext(s -> LOG.trace("Next page info: {}", s.nextPageInfo())) .doOnNext(s -> currentPageInfoRef.set(s.nextPageInfo())) .repeatWhen(s -> s.takeWhile(n -> n > 0)); }) diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedByScoreFullMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedByScoreFullMultiSearcher.java index a8e95ec..82d60c3 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedByScoreFullMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedByScoreFullMultiSearcher.java @@ -22,7 +22,7 @@ import reactor.core.scheduler.Schedulers; public class SortedByScoreFullMultiSearcher implements MultiSearcher { - protected static final Logger logger = LogManager.getLogger(SortedByScoreFullMultiSearcher.class); + protected static final Logger LOG = LogManager.getLogger(SortedByScoreFullMultiSearcher.class); private final LLTempHugePqEnv env; @@ -31,40 +31,34 @@ public class SortedByScoreFullMultiSearcher implements MultiSearcher { } @Override - public Mono collectMulti(Mono> indexSearchersMono, + public Mono collectMulti(Mono indexSearchersMono, LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { - Mono queryParamsMono; - if (transformer == GlobalQueryRewrite.NO_REWRITE) { - queryParamsMono = Mono.just(queryParams); - } else { - queryParamsMono = indexSearchersMono - .publishOn(uninterruptibleScheduler(Schedulers.boundedElastic())) - .handle((indexSearchers, sink) -> { - try { - sink.next(transformer.rewrite(indexSearchers.receive(), queryParams)); - } catch (IOException ex) { - sink.error(ex); - } - }); - } - - return queryParamsMono.flatMap(queryParams2 -> { - if (queryParams2.isSorted() && !queryParams2.isSortedByScore()) { - throw new IllegalArgumentException(SortedByScoreFullMultiSearcher.this.getClass().getSimpleName() - + " doesn't support sorted queries"); + return indexSearchersMono.flatMap(indexSearchers -> { + Mono queryParamsMono; + if (transformer == GlobalQueryRewrite.NO_REWRITE) { + queryParamsMono = Mono.just(queryParams); + } else { + queryParamsMono = Mono + .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) + .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())); } - return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> this - // Search results - .search(indexSearchers.shards(), queryParams2) - // Compute the results - .transform(fullDocsMono -> this.computeResults(fullDocsMono, indexSearchers, - keyFieldName, queryParams2)) - // Ensure that one LuceneSearchResult is always returned - .single(), - false); + return queryParamsMono.flatMap(queryParams2 -> { + if (queryParams2.isSorted() && !queryParams2.isSortedByScore()) { + throw new IllegalArgumentException(SortedByScoreFullMultiSearcher.this.getClass().getSimpleName() + + " doesn't support sorted queries"); + } + + return this + // Search results + .search(indexSearchers.shards(), queryParams2) + // Compute the results + .transform(fullDocsMono -> this.computeResults(fullDocsMono, indexSearchers, keyFieldName, queryParams2)) + // Ensure that one LuceneSearchResult is always returned + .single(); + }); }); } @@ -127,13 +121,15 @@ public class SortedByScoreFullMultiSearcher implements MultiSearcher { .take(queryParams.limitLong(), true); return new LuceneSearchResult(totalHitsCount, hitsFlux, () -> { - if (indexSearchers.isAccessible()) { + try { indexSearchers.close(); + } catch (IOException e) { + LOG.error("Can't close index searchers", e); } try { data.close(); } catch (Exception e) { - logger.error("Failed to discard data", e); + LOG.error("Failed to discard data", e); } }); }); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java index 6cbc49d..f549a03 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java @@ -22,7 +22,7 @@ import reactor.core.scheduler.Schedulers; public class SortedScoredFullMultiSearcher implements MultiSearcher { - protected static final Logger logger = LogManager.getLogger(SortedScoredFullMultiSearcher.class); + protected static final Logger LOG = LogManager.getLogger(SortedScoredFullMultiSearcher.class); private final LLTempHugePqEnv env; @@ -31,34 +31,28 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher { } @Override - public Mono collectMulti(Mono> indexSearchersMono, + public Mono collectMulti(Mono indexSearchersMono, LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { - Mono queryParamsMono; - if (transformer == GlobalQueryRewrite.NO_REWRITE) { - queryParamsMono = Mono.just(queryParams); - } else { - queryParamsMono = indexSearchersMono - .publishOn(uninterruptibleScheduler(Schedulers.boundedElastic())) - .handle((indexSearchers, sink) -> { - try { - sink.next(transformer.rewrite(indexSearchers.receive(), queryParams)); - } catch (IOException ex) { - sink.error(ex); - } - }); - } + return indexSearchersMono.flatMap(indexSearchers -> { + Mono queryParamsMono; + if (transformer == GlobalQueryRewrite.NO_REWRITE) { + queryParamsMono = Mono.just(queryParams); + } else { + queryParamsMono = Mono + .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) + .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())); + } - return queryParamsMono.flatMap(queryParams2 -> LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> this - // Search results - .search(indexSearchers.shards(), queryParams2) - // Compute the results - .transform(fullDocsMono -> this.computeResults(fullDocsMono, indexSearchers, - keyFieldName, queryParams2)) - // Ensure that one LuceneSearchResult is always returned - .single(), - false)); + return queryParamsMono.flatMap(queryParams2 -> this + // Search results + .search(indexSearchers.shards(), queryParams2) + // Compute the results + .transform(fullDocsMono -> this.computeResults(fullDocsMono, indexSearchers, keyFieldName, queryParams2)) + // Ensure that one LuceneSearchResult is always returned + .single()); + }); } /** @@ -121,8 +115,10 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher { .take(queryParams.limitLong(), true); return new LuceneSearchResult(totalHitsCount, hitsFlux, () -> { - if (indexSearchers.isAccessible()) { + try { indexSearchers.close(); + } catch (IOException e) { + LOG.error("Can't close index searchers", e); } data.close(); }); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/StandardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/StandardSearcher.java index 9f8ab08..c5c4245 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/StandardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/StandardSearcher.java @@ -26,40 +26,34 @@ import reactor.core.scheduler.Schedulers; public class StandardSearcher implements MultiSearcher { - protected static final Logger logger = LogManager.getLogger(StandardSearcher.class); + protected static final Logger LOG = LogManager.getLogger(StandardSearcher.class); public StandardSearcher() { } @Override - public Mono collectMulti(Mono> indexSearchersMono, + public Mono collectMulti(Mono indexSearchersMono, LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { - Mono queryParamsMono; - if (transformer == GlobalQueryRewrite.NO_REWRITE) { - queryParamsMono = Mono.just(queryParams); - } else { - queryParamsMono = indexSearchersMono - .publishOn(uninterruptibleScheduler(Schedulers.boundedElastic())) - .handle((indexSearchers, sink) -> { - try { - sink.next(transformer.rewrite(indexSearchers.receive(), queryParams)); - } catch (IOException ex) { - sink.error(ex); - } - }); - } + return indexSearchersMono.flatMap(indexSearchers -> { + Mono queryParamsMono; + if (transformer == GlobalQueryRewrite.NO_REWRITE) { + queryParamsMono = Mono.just(queryParams); + } else { + queryParamsMono = Mono + .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) + .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())); + } - return queryParamsMono.flatMap(queryParams2 -> LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> this - // Search results - .search(indexSearchers.shards(), queryParams2) - // Compute the results - .transform(fullDocsMono -> this.computeResults(fullDocsMono, indexSearchers, - keyFieldName, queryParams2)) - // Ensure that one LuceneSearchResult is always returned - .single(), - false)); + return queryParamsMono.flatMap(queryParams2 -> this + // Search results + .search(indexSearchers.shards(), queryParams2) + // Compute the results + .transform(fullDocsMono -> this.computeResults(fullDocsMono, indexSearchers, keyFieldName, queryParams2)) + // Ensure that one LuceneSearchResult is always returned + .single()); + }); } /** @@ -144,8 +138,10 @@ public class StandardSearcher implements MultiSearcher { .take(queryParams.limitLong(), true); return new LuceneSearchResult(totalHitsCount, hitsFlux, () -> { - if (indexSearchers.isAccessible()) { + try { indexSearchers.close(); + } catch (IOException e) { + LOG.error("Can't close index searchers", e); } }); }); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedStreamingMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedStreamingMultiSearcher.java index e54a86b..f9bb940 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedStreamingMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedStreamingMultiSearcher.java @@ -9,8 +9,11 @@ import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.MaxScoreAccumulator; +import java.io.IOException; import java.util.List; import it.cavallium.dbengine.lucene.hugepq.search.CustomHitsThresholdChecker; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.ScoreDoc; import org.jetbrains.annotations.Nullable; @@ -20,13 +23,15 @@ import reactor.core.scheduler.Schedulers; public class UnsortedStreamingMultiSearcher implements MultiSearcher { + + protected static final Logger LOG = LogManager.getLogger(UnsortedStreamingMultiSearcher.class); + @Override - public Mono collectMulti(Mono> indexSearchersMono, + public Mono collectMulti(Mono indexSearchersMono, LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { - - return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> { + return indexSearchersMono.flatMap(indexSearchers -> { Mono queryParamsMono; if (transformer == GlobalQueryRewrite.NO_REWRITE) { queryParamsMono = Mono.just(queryParams); @@ -54,12 +59,14 @@ public class UnsortedStreamingMultiSearcher implements MultiSearcher { .take(queryParams2.limitLong(), true); return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> { - if (indexSearchers.isAccessible()) { + try { indexSearchers.close(); + } catch (IOException e) { + LOG.error("Can't close index searchers", e); } }); }); - }, false); + }); } private Flux getScoreDocs(LocalQueryParams localQueryParams, List shards) { diff --git a/src/test/java/it/cavallium/dbengine/SwappableLuceneSearcher.java b/src/test/java/it/cavallium/dbengine/SwappableLuceneSearcher.java index f9275d6..3ad6752 100644 --- a/src/test/java/it/cavallium/dbengine/SwappableLuceneSearcher.java +++ b/src/test/java/it/cavallium/dbengine/SwappableLuceneSearcher.java @@ -27,7 +27,7 @@ public class SwappableLuceneSearcher implements LocalSearcher, MultiSearcher, Cl } @Override - public Mono collect(Mono> indexSearcherMono, + public Mono collect(Mono indexSearcherMono, LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { @@ -52,7 +52,7 @@ public class SwappableLuceneSearcher implements LocalSearcher, MultiSearcher, Cl } @Override - public Mono collectMulti(Mono> indexSearchersMono, + public Mono collectMulti(Mono indexSearchersMono, LocalQueryParams queryParams, String keyFieldName, GlobalQueryRewrite transformer) { diff --git a/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java b/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java index 9216628..e4eb58f 100644 --- a/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java +++ b/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java @@ -14,14 +14,19 @@ import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; import it.cavallium.dbengine.lucene.searcher.LocalSearcher; import it.cavallium.dbengine.lucene.searcher.LuceneSearchResult; import it.cavallium.dbengine.lucene.searcher.MultiSearcher; +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher { + private static final Logger LOG = LogManager.getLogger(UnsortedUnscoredSimpleMultiSearcher.class); + private final LocalSearcher localSearcher; public UnsortedUnscoredSimpleMultiSearcher(LocalSearcher localSearcher) { @@ -29,30 +34,27 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher { } @Override - public Mono collectMulti(Mono> indexSearchersMono, + public Mono collectMulti(Mono indexSearchersMono, LocalQueryParams queryParams, String keyFieldName, GlobalQueryRewrite transformer) { - return LLUtils.usingSendResource(indexSearchersMono, - indexSearchers -> { - Mono queryParamsMono; - if (transformer == NO_REWRITE) { - queryParamsMono = Mono.just(queryParams); - } else { - queryParamsMono = Mono - .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())); - } + return indexSearchersMono.flatMap(indexSearchers -> { + Mono queryParamsMono; + if (transformer == NO_REWRITE) { + queryParamsMono = Mono.just(queryParams); + } else { + queryParamsMono = Mono + .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) + .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())); + } - return queryParamsMono.flatMap(queryParams2 -> { + return queryParamsMono.flatMap(queryParams2 -> { var localQueryParams = getLocalQueryParams(queryParams2); return Flux - .fromIterable(indexSearchers.shards()) - .flatMap(searcher -> { - var llSearcher = Mono.fromCallable(() -> new LLIndexSearcher(searcher, false, null).send()); - return localSearcher.collect(llSearcher, localQueryParams, keyFieldName, transformer); - }) + .fromIterable(indexSearchers.llShards()) + .flatMap(searcher -> + localSearcher.collect(Mono.just(searcher), localQueryParams, keyFieldName, transformer)) .collectList() .map(results -> { List resultsToDrop = new ArrayList<>(results.size()); @@ -78,8 +80,10 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher { luceneSearchResult.close(); } } - if (indexSearchers.isAccessible()) { + try { indexSearchers.close(); + } catch (IOException e) { + LOG.error("Can't close index searchers", e); } }); }) @@ -94,11 +98,9 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher { + " by SimpleUnsortedUnscoredLuceneMultiSearcher"); } }); - } - ); - }, - false - ); + } + ); + }); } private LocalQueryParams getLocalQueryParams(LocalQueryParams queryParams) {