diff --git a/src/example/java/it.cavallium.dbengine.client/CodecsExample.java b/src/example/java/it.cavallium.dbengine.client/CodecsExample.java index 48fb563..62924ad 100644 --- a/src/example/java/it.cavallium.dbengine.client/CodecsExample.java +++ b/src/example/java/it.cavallium.dbengine.client/CodecsExample.java @@ -80,7 +80,7 @@ public class CodecsExample { .then(), SpeedExample.numRepeats, tuple -> tuple.getT1().close() - )).subscribeOn(Schedulers.parallel()).blockOptional(); + )).transform(LLUtils::handleDiscard).subscribeOn(Schedulers.parallel()).blockOptional(); } private static void testConversion() { @@ -88,6 +88,7 @@ public class CodecsExample { .then() .then(readNew()) .subscribeOn(Schedulers.parallel()) + .transform(LLUtils::handleDiscard) .blockOptional(); } diff --git a/src/example/java/it.cavallium.dbengine.client/IndicizationExample.java b/src/example/java/it.cavallium.dbengine.client/IndicizationExample.java index b9ba410..3e525f8 100644 --- a/src/example/java/it.cavallium.dbengine.client/IndicizationExample.java +++ b/src/example/java/it.cavallium.dbengine.client/IndicizationExample.java @@ -62,6 +62,7 @@ public class IndicizationExample { .then(index.close()) ) .subscribeOn(Schedulers.parallel()) + .transform(LLUtils::handleDiscard) .block(); tempIndex(true) .flatMap(index -> @@ -138,6 +139,7 @@ public class IndicizationExample { .then(index.close()) ) .subscribeOn(Schedulers.parallel()) + .transform(LLUtils::handleDiscard) .block(); } diff --git a/src/example/java/it.cavallium.dbengine.client/SpeedExample.java b/src/example/java/it.cavallium.dbengine.client/SpeedExample.java index 34f7768..525560c 100644 --- a/src/example/java/it.cavallium.dbengine.client/SpeedExample.java +++ b/src/example/java/it.cavallium.dbengine.client/SpeedExample.java @@ -56,6 +56,7 @@ public class SpeedExample { .then(test3LevelPut()) .then(test4LevelPut()) .subscribeOn(Schedulers.parallel()) + .transform(LLUtils::handleDiscard) .blockOptional(); } diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java index 0c28a0f..95a68d3 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java @@ -35,6 +35,7 @@ import reactor.core.publisher.SignalType; public class LuceneIndexImpl implements LuceneIndex { + private static final Duration MAX_COUNT_TIME = Duration.ofSeconds(30); private final LLLuceneIndex luceneIndex; private final Indicizer indicizer; @@ -149,13 +150,9 @@ public class LuceneIndexImpl implements LuceneIndex { @Override public Mono count(@Nullable CompositeSnapshot snapshot, Query query) { - return Mono.usingWhen(this.search(ClientQueryParams - .builder() - .snapshot(snapshot) - .query(query) - .timeout(Duration.ofSeconds(30)) - .limit(0) - .build()), searchResultKeys -> Mono.just(searchResultKeys.totalHitsCount()), LLUtils::finalizeResource); + return luceneIndex + .count(resolveSnapshot(snapshot), query, MAX_COUNT_TIME) + .doOnDiscard(DiscardingCloseable.class, DiscardingCloseable::close); } @Override @@ -205,8 +202,7 @@ public class LuceneIndexImpl implements LuceneIndex { } @Nullable - private static LLSearchResultShard mergeResults(ClientQueryParams queryParams, - List shards) { + private static LLSearchResultShard mergeResults(ClientQueryParams queryParams, List shards) { if (shards.size() == 0) { return null; } else if (shards.size() == 1) { diff --git a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java index 02f148a..e96c490 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java @@ -7,6 +7,7 @@ import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.lucene.collector.Buckets; import it.cavallium.dbengine.lucene.searcher.BucketParams; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -60,16 +61,20 @@ public interface LLLuceneIndex extends LLSnapshottable, SafeCloseable { @Nullable Query normalizationQuery, BucketParams bucketParams); - default Mono count(@Nullable LLSnapshot snapshot, Query query) { - QueryParams params = QueryParams.of(query, 0, 0, NoSort.of(), false, Long.MAX_VALUE); - return Mono.from(this.search(snapshot, params, null) - .map(llSearchResultShard -> { - try (llSearchResultShard) { - return llSearchResultShard.totalHitsCount(); - } - }) - .defaultIfEmpty(TotalHitsCount.of(0, true)) + default Mono count(@Nullable LLSnapshot snapshot, Query query, @Nullable Duration timeout) { + QueryParams params = QueryParams.of(query, + 0, + 0, + NoSort.of(), + false, + timeout == null ? Long.MAX_VALUE : timeout.toMillis() ); + return Mono + .usingWhen(this.search(snapshot, params, null).singleOrEmpty(), + llSearchResultShard -> Mono.just(llSearchResultShard.totalHitsCount()), + LLUtils::finalizeResource + ) + .defaultIfEmpty(TotalHitsCount.of(0, true)); } boolean isLowMemoryMode(); diff --git a/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java index 2afbd4a..a9a04e4 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java @@ -156,14 +156,14 @@ public class LLMultiDatabaseConnection implements LLDatabaseConnection { var connectionIndexStructure = indexStructure .setActiveShards(new IntArrayList(entry.getValue())); - var connIndex = entry.getKey() + Flux connIndex = entry.getKey() .getLuceneIndex(clusterName, connectionIndexStructure, indicizerAnalyzers, indicizerSimilarities, luceneOptions, luceneHacks - ).cache().repeat(); + ).cast(LLLuceneIndex.class).cache().repeat(); return Flux .fromIterable(entry.getValue()) .zipWith(connIndex); @@ -171,7 +171,7 @@ public class LLMultiDatabaseConnection implements LLDatabaseConnection { .collectList() .map(indices -> { var luceneIndices = new LLLuceneIndex[indexStructure.totalShards()]; - for (Tuple2 index : indices) { + for (var index : indices) { luceneIndices[index.getT1()] = index.getT2(); } return new LLMultiLuceneIndex(clusterName, diff --git a/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java index 33d8b8e..9cb7986 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java @@ -131,7 +131,7 @@ public class LLMultiLuceneIndex implements LLLuceneIndex { queryParams, keyFieldName, mltDocumentFields - )); + )).doOnDiscard(DiscardingCloseable.class, SafeCloseable::close); } private Mono mergeShards(List shards) { @@ -167,7 +167,7 @@ public class LLMultiLuceneIndex implements LLLuceneIndex { return luceneIndicesFlux.flatMap(luceneIndex -> luceneIndex.search(snapshot, queryParams, keyFieldName - )); + )).doOnDiscard(DiscardingCloseable.class, SafeCloseable::close); } @Override @@ -179,12 +179,7 @@ public class LLMultiLuceneIndex implements LLLuceneIndex { queries, normalizationQuery, bucketParams - )).collectList().flatMap(this::mergeShards); - } - - @Override - public Mono count(@Nullable LLSnapshot snapshot, Query query) { - return LLLuceneIndex.super.count(snapshot, query); + )).collectList().flatMap(this::mergeShards).doOnDiscard(DiscardingCloseable.class, SafeCloseable::close); } @Override @@ -195,7 +190,7 @@ public class LLMultiLuceneIndex implements LLLuceneIndex { @Override public void close() { Iterable> it = () -> luceneIndicesSet.stream().map(e -> Mono.fromRunnable(e::close)).iterator(); - Mono.whenDelayError(it).block(); + Mono.whenDelayError(it).transform(LLUtils::handleDiscard).block(); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 2e0c5aa..48ebf96 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -647,7 +647,7 @@ public class LLUtils { return Mono.fromRunnable(() -> LLUtils.finalizeResourceNow(resource)); } - public static Mono finalizeResource(SimpleResource resource) { + public static Mono finalizeResource(SafeCloseable resource) { return Mono.fromRunnable(resource::close); } @@ -657,7 +657,7 @@ public class LLUtils { } } - public static void finalizeResourceNow(SimpleResource resource) { + public static void finalizeResourceNow(SafeCloseable resource) { resource.close(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java index 336d815..e6327e5 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java @@ -74,7 +74,7 @@ public class DatabaseMapDictionaryHashed extends SerializerFixedBinaryLength keySuffixHashSerializer, Runnable onClose) { super((Drop>) (Drop) DROP); - if (dictionary.getUpdateMode().block() != UpdateMode.ALLOW) { + if (dictionary.getUpdateMode().transform(LLUtils::handleDiscard).block() != UpdateMode.ALLOW) { throw new IllegalArgumentException("Hashed maps only works when UpdateMode is ALLOW"); } this.alloc = dictionary.getAllocator(); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index 32720ab..561497f 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -263,7 +263,7 @@ public interface DatabaseStageMap> extends * Value getter doesn't lock data. Please make sure to lock before getting data. */ default ValueGetterBlocking getDbValueGetter(@Nullable CompositeSnapshot snapshot) { - return k -> getValue(snapshot, k).block(); + return k -> getValue(snapshot, k).transform(LLUtils::handleDiscard).block(); } default ValueGetter getAsyncDbValueGetter(@Nullable CompositeSnapshot snapshot) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java index e92425b..de2a277 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java @@ -238,10 +238,6 @@ public class CachedIndexSearcherManager extends SimpleResource implements IndexS private class SnapshotIndexSearcher extends LLIndexSearcher { - public SnapshotIndexSearcher(IndexSearcher indexSearcher, AtomicBoolean closed) { - super(indexSearcher, closed); - } - public SnapshotIndexSearcher(IndexSearcher indexSearcher) { super(indexSearcher); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 0efbd09..7c8fce9 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -17,6 +17,7 @@ import io.micrometer.core.instrument.Timer; import it.cavallium.dbengine.client.query.QueryParser; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; +import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLIndexRequest; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLSearchResultShard; @@ -37,6 +38,7 @@ import it.cavallium.dbengine.lucene.searcher.BucketParams; import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher; import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; import it.cavallium.dbengine.lucene.searcher.LocalSearcher; +import it.cavallium.dbengine.lucene.searcher.LuceneSearchResult; import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers; import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities; import it.cavallium.dbengine.rpc.current.data.LuceneOptions; @@ -508,13 +510,28 @@ public class LLLocalLuceneIndex extends SimpleResource implements LLLuceneIndex @Override public Flux search(@Nullable LLSnapshot snapshot, QueryParams queryParams, @Nullable String keyFieldName) { + return searchInternal(snapshot, queryParams, keyFieldName) + .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)) + .flux(); + } + + public Mono searchInternal(@Nullable LLSnapshot snapshot, QueryParams queryParams, + @Nullable String keyFieldName) { LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer); var searcher = searcherManager.retrieveSearcher(snapshot); - return localSearcher - .collect(searcher, localQueryParams, keyFieldName, NO_REWRITE) - .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)) - .flux(); + return localSearcher.collect(searcher, localQueryParams, keyFieldName, NO_REWRITE); + } + + @Override + public Mono count(@Nullable LLSnapshot snapshot, Query query, @Nullable Duration timeout) { + var params = LuceneUtils.getCountQueryParams(query); + return Mono + .usingWhen(this.searchInternal(snapshot, params, null), + result -> Mono.just(result.totalHitsCount()), + LLUtils::finalizeResource + ) + .defaultIfEmpty(TotalHitsCount.of(0, true)); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 09ede71..003e4c5 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -6,8 +6,10 @@ import com.google.common.collect.Multimap; import io.micrometer.core.instrument.MeterRegistry; import io.netty5.buffer.api.Send; import it.cavallium.dbengine.client.query.QueryParser; +import it.cavallium.dbengine.client.query.current.data.NoSort; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; +import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLIndexRequest; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLSearchResultShard; @@ -25,6 +27,7 @@ import it.cavallium.dbengine.lucene.searcher.BucketParams; import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher; import it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite; import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; +import it.cavallium.dbengine.lucene.searcher.LuceneSearchResult; import it.cavallium.dbengine.lucene.searcher.MultiSearcher; import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers; import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities; @@ -34,6 +37,7 @@ import it.unimi.dsi.fastutil.ints.IntList; import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; +import java.time.Duration; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -288,15 +292,31 @@ public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneI public Flux search(@Nullable LLSnapshot snapshot, QueryParams queryParams, @Nullable String keyFieldName) { + return searchInternal(snapshot, queryParams, keyFieldName) + // Transform the result type + .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)) + .flux(); + } + + private Mono searchInternal(@Nullable LLSnapshot snapshot, + QueryParams queryParams, + @Nullable String keyFieldName) { LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer); var searchers = getIndexSearchers(snapshot); // Collect all the shards results into a single global result - return multiSearcher - .collectMulti(searchers, localQueryParams, keyFieldName, GlobalQueryRewrite.NO_REWRITE) - // Transform the result type - .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)) - .flux(); + return multiSearcher.collectMulti(searchers, localQueryParams, keyFieldName, GlobalQueryRewrite.NO_REWRITE); + } + + @Override + public Mono count(@Nullable LLSnapshot snapshot, Query query, @Nullable Duration timeout) { + var params = LuceneUtils.getCountQueryParams(query); + return Mono + .usingWhen(this.searchInternal(snapshot, params, null), + result -> Mono.just(result.totalHitsCount()), + LLUtils::finalizeResource + ) + .defaultIfEmpty(TotalHitsCount.of(0, true)); } @Override @@ -336,6 +356,7 @@ public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneI }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))) .publishOn(Schedulers.parallel()) .then() + .transform(LLUtils::handleDiscard) .block(); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java index d2e9421..3acc75e 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.disk; import static com.google.common.collect.Lists.partition; +import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.rpc.current.data.Column; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -92,7 +93,7 @@ public class RocksDBUtils { } } return null; - }).subscribeOn(Schedulers.boundedElastic())).toList()).block(); + }).subscribeOn(Schedulers.boundedElastic())).toList()).transform(LLUtils::handleDiscard).block(); } } } diff --git a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java index 2d56f94..719159f 100644 --- a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java @@ -22,6 +22,7 @@ import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.database.LLUpdateDocument; +import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.RocksDBLongProperty; import it.cavallium.dbengine.database.RocksDBMapProperty; import it.cavallium.dbengine.database.RocksDBStringProperty; @@ -510,7 +511,7 @@ public class LLQuicConnection implements LLDatabaseConnection { @Override public void close() { - sendRequest(new CloseLuceneIndex(id)).then().block(); + sendRequest(new CloseLuceneIndex(id)).then().transform(LLUtils::handleDiscard).block(); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/BufferDataInputShared.java b/src/main/java/it/cavallium/dbengine/database/serialization/BufferDataInputShared.java index ae48e26..dd48c26 100644 --- a/src/main/java/it/cavallium/dbengine/database/serialization/BufferDataInputShared.java +++ b/src/main/java/it/cavallium/dbengine/database/serialization/BufferDataInputShared.java @@ -69,7 +69,7 @@ public class BufferDataInputShared implements BufferDataInput { } @Override - public int readUnsignedByte() { + public int readUnsignedByte() {/* if (StackWalker.getInstance().walk(s -> s.limit(16).anyMatch(frame -> frame.getMethodName().contains("updateAndGetDelta")))) {throw new TempException();}*/ if (buf == null) throw new IndexOutOfBoundsException(); return buf.readUnsignedByte(); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index a5325ab..cf8a457 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -1,6 +1,8 @@ package it.cavallium.dbengine.lucene; import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; +import static it.cavallium.dbengine.database.LLUtils.singleOrClose; +import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRITE; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; @@ -9,6 +11,7 @@ import it.cavallium.data.generator.nativedata.Nullableint; import it.cavallium.data.generator.nativedata.Nullablelong; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.client.query.QueryParser; +import it.cavallium.dbengine.client.query.current.data.NoSort; import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLKeyScore; @@ -18,7 +21,9 @@ import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep; import it.cavallium.dbengine.database.collections.DatabaseStageEntry; import it.cavallium.dbengine.database.collections.DatabaseStageMap; import it.cavallium.dbengine.database.collections.ValueGetter; +import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.database.disk.LLIndexSearchers.UnshardedIndexSearchers; import it.cavallium.dbengine.lucene.analyzer.LegacyWordAnalyzer; import it.cavallium.dbengine.lucene.analyzer.NCharGramAnalyzer; import it.cavallium.dbengine.lucene.analyzer.NCharGramEdgeAnalyzer; @@ -28,7 +33,12 @@ import it.cavallium.dbengine.lucene.analyzer.WordAnalyzer; import it.cavallium.dbengine.lucene.directory.RocksdbDirectory; import it.cavallium.dbengine.lucene.mlt.BigCompositeReader; import it.cavallium.dbengine.lucene.mlt.MultiMoreLikeThis; +import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher; +import it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite; import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; +import it.cavallium.dbengine.lucene.searcher.LocalSearcher; +import it.cavallium.dbengine.lucene.searcher.LuceneSearchResult; +import it.cavallium.dbengine.lucene.searcher.MultiSearcher; import it.cavallium.dbengine.lucene.similarity.NGramSimilarity; import it.cavallium.dbengine.rpc.current.data.ByteBuffersDirectory; import it.cavallium.dbengine.rpc.current.data.DirectIOFSDirectory; @@ -104,7 +114,6 @@ import org.novasearch.lucene.search.similarities.BM25Similarity.BM25Model; import org.novasearch.lucene.search.similarities.LdpSimilarity; import org.novasearch.lucene.search.similarities.LtcSimilarity; import org.novasearch.lucene.search.similarities.RobertsonSimilarity; -import org.rocksdb.util.SizeUnit; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -743,4 +752,39 @@ public class LuceneUtils { public static it.cavallium.dbengine.rpc.current.data.TieredMergePolicy getDefaultMergePolicy() { return DEFAULT_MERGE_POLICY; } + + public static QueryParams getCountQueryParams(it.cavallium.dbengine.client.query.current.data.Query query) { + return QueryParams.of(query, 0, 0, NoSort.of(), false, Long.MAX_VALUE); + } + + public static Mono rewrite(LocalSearcher localSearcher, + Mono indexSearcherMono, + LocalQueryParams queryParams, + String keyFieldName, + GlobalQueryRewrite transformer) { + return Mono.usingWhen(indexSearcherMono, indexSearcher -> { + try (UnshardedIndexSearchers indexSearchers = LLIndexSearchers.unsharded(indexSearcher)) { + return Mono + .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) + .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())) + .flatMap(queryParams2 -> + localSearcher.collect(indexSearcherMono, queryParams2, keyFieldName, NO_REWRITE)); + } + }, LLUtils::finalizeResource); + } + + public static Mono rewriteMulti(MultiSearcher multiSearcher, + Mono indexSearchersMono, + LocalQueryParams queryParams, + String keyFieldName, + GlobalQueryRewrite transformer) { + return Mono.usingWhen(indexSearchersMono, + indexSearchers -> Mono + .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) + .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())) + .flatMap(queryParams2 -> + multiSearcher.collectMulti(indexSearchersMono, queryParams2, keyFieldName, NO_REWRITE)), + LLUtils::finalizeResource + ); + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java index 11743c2..3ba7249 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java @@ -7,7 +7,9 @@ import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRIT import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.database.disk.LLIndexSearchers.UnshardedIndexSearchers; import it.cavallium.dbengine.database.disk.LLTempHugePqEnv; +import it.cavallium.dbengine.lucene.LuceneUtils; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -47,18 +49,10 @@ public class AdaptiveLocalSearcher implements LocalSearcher { LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { - return singleOrClose(indexSearcherMono, indexSearcher -> { - var indexSearchers = LLIndexSearchers.unsharded(indexSearcher); - - if (transformer == NO_REWRITE) { - return transformedCollect(indexSearcher, queryParams, keyFieldName, transformer); - } else { - return Mono - .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())) - .flatMap(queryParams2 -> transformedCollect(indexSearcher, queryParams2, keyFieldName, NO_REWRITE)); - } - }); + if (transformer != NO_REWRITE) { + return LuceneUtils.rewrite(this, indexSearcherMono, queryParams, keyFieldName, transformer); + } + return transformedCollect(indexSearcherMono, queryParams, keyFieldName, transformer); } @Override @@ -67,7 +61,7 @@ public class AdaptiveLocalSearcher implements LocalSearcher { } // Remember to change also AdaptiveMultiSearcher - public Mono transformedCollect(LLIndexSearcher indexSearcher, + public Mono transformedCollect(Mono indexSearcherMono, LocalQueryParams queryParams, String keyFieldName, GlobalQueryRewrite transformer) { @@ -77,36 +71,36 @@ public class AdaptiveLocalSearcher implements LocalSearcher { = Math.max(maxInMemoryResultEntries, (long) queryParams.pageLimits().getPageLimit(0)); if (!FORCE_HUGE_PQ && queryParams.limitLong() == 0) { - return countSearcher.collect(Mono.just(indexSearcher), queryParams, keyFieldName, transformer); + return countSearcher.collect(indexSearcherMono, queryParams, keyFieldName, transformer); } else if (!FORCE_HUGE_PQ && realLimit <= maxInMemoryResultEntries) { - return standardSearcher.collect(Mono.just(indexSearcher), queryParams, keyFieldName, transformer); + return standardSearcher.collect(indexSearcherMono, queryParams, keyFieldName, transformer); } else if (FORCE_HUGE_PQ || queryParams.isSorted()) { if (!FORCE_HUGE_PQ && realLimit <= maxAllowedInMemoryLimit) { - return scoredPaged.collect(Mono.just(indexSearcher), queryParams, keyFieldName, transformer); + return scoredPaged.collect(indexSearcherMono, queryParams, keyFieldName, transformer); } else { if (queryParams.isSortedByScore()) { if (!FORCE_HUGE_PQ && queryParams.limitLong() < maxInMemoryResultEntries) { throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater"); } if (sortedByScoreFull != null) { - return sortedByScoreFull.collect(Mono.just(indexSearcher), queryParams, keyFieldName, transformer); + return sortedByScoreFull.collect(indexSearcherMono, queryParams, keyFieldName, transformer); } else { - return scoredPaged.collect(Mono.just(indexSearcher), queryParams, keyFieldName, transformer); + return scoredPaged.collect(indexSearcherMono, queryParams, keyFieldName, transformer); } } else { if (!FORCE_HUGE_PQ && queryParams.limitLong() < maxInMemoryResultEntries) { throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater"); } if (sortedScoredFull != null) { - return sortedScoredFull.collect(Mono.just(indexSearcher), queryParams, keyFieldName, transformer); + return sortedScoredFull.collect(indexSearcherMono, queryParams, keyFieldName, transformer); } else { - return scoredPaged.collect(Mono.just(indexSearcher), queryParams, keyFieldName, transformer); + return scoredPaged.collect(indexSearcherMono, queryParams, keyFieldName, transformer); } } } } else { // Run large/unbounded searches using the continuous multi searcher - return unsortedUnscoredContinuous.collect(Mono.just(indexSearcher), queryParams, keyFieldName, transformer); + return unsortedUnscoredContinuous.collect(indexSearcherMono, queryParams, keyFieldName, transformer); } } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java index eb8354c..a8b5fbb 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java @@ -9,6 +9,7 @@ import io.netty5.buffer.api.Send; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.database.disk.LLTempHugePqEnv; +import it.cavallium.dbengine.lucene.LuceneUtils; import java.io.IOException; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; @@ -46,19 +47,14 @@ public class AdaptiveMultiSearcher implements MultiSearcher { LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { - return singleOrClose(indexSearchersMono, indexSearchers -> { - if (transformer == NO_REWRITE) { - return transformedCollectMulti(indexSearchers, queryParams, keyFieldName, transformer); - } else { - return Mono.fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())) - .flatMap(queryParams2 -> transformedCollectMulti(indexSearchers, queryParams2, keyFieldName, NO_REWRITE)); - } - }); + if (transformer != NO_REWRITE) { + return LuceneUtils.rewriteMulti(this, indexSearchersMono, queryParams, keyFieldName, transformer); + } + return transformedCollectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); } // Remember to change also AdaptiveLocalSearcher - public Mono transformedCollectMulti(LLIndexSearchers indexSearchers, + public Mono transformedCollectMulti(Mono indexSearchers, LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { @@ -68,36 +64,36 @@ public class AdaptiveMultiSearcher implements MultiSearcher { = Math.max(maxInMemoryResultEntries, (long) queryParams.pageLimits().getPageLimit(0)); if (!FORCE_HUGE_PQ && queryParams.limitLong() == 0) { - return count.collectMulti(Mono.just(indexSearchers), queryParams, keyFieldName, transformer); + return count.collectMulti(indexSearchers, queryParams, keyFieldName, transformer); } else if (!FORCE_HUGE_PQ && realLimit <= maxInMemoryResultEntries) { - return standardSearcher.collectMulti(Mono.just(indexSearchers), queryParams, keyFieldName, transformer); + return standardSearcher.collectMulti(indexSearchers, queryParams, keyFieldName, transformer); } else if (FORCE_HUGE_PQ || queryParams.isSorted()) { if (!FORCE_HUGE_PQ && realLimit <= maxAllowedInMemoryLimit) { - return scoredPaged.collectMulti(Mono.just(indexSearchers), queryParams, keyFieldName, transformer); + return scoredPaged.collectMulti(indexSearchers, queryParams, keyFieldName, transformer); } else { if (queryParams.isSortedByScore()) { if (!FORCE_HUGE_PQ && queryParams.limitLong() < maxInMemoryResultEntries) { throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater"); } if (sortedByScoreFull != null) { - return sortedByScoreFull.collectMulti(Mono.just(indexSearchers), queryParams, keyFieldName, transformer); + return sortedByScoreFull.collectMulti(indexSearchers, queryParams, keyFieldName, transformer); } else { - return scoredPaged.collectMulti(Mono.just(indexSearchers), queryParams, keyFieldName, transformer); + return scoredPaged.collectMulti(indexSearchers, queryParams, keyFieldName, transformer); } } else { if (!FORCE_HUGE_PQ && queryParams.limitLong() < maxInMemoryResultEntries) { throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater"); } if (sortedScoredFull != null) { - return sortedScoredFull.collectMulti(Mono.just(indexSearchers), queryParams, keyFieldName, transformer); + return sortedScoredFull.collectMulti(indexSearchers, queryParams, keyFieldName, transformer); } else { - return scoredPaged.collectMulti(Mono.just(indexSearchers), queryParams, keyFieldName, transformer); + return scoredPaged.collectMulti(indexSearchers, queryParams, keyFieldName, transformer); } } } } else { // Run large/unbounded searches using the continuous multi searcher - return unsortedUnscoredContinuous.collectMulti(Mono.just(indexSearchers), queryParams, keyFieldName, transformer); + return unsortedUnscoredContinuous.collectMulti(indexSearchers, queryParams, keyFieldName, transformer); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java index b803c86..07ae092 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine.lucene.searcher; import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; import static it.cavallium.dbengine.database.LLUtils.singleOrClose; +import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRITE; import io.netty5.buffer.api.Send; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; @@ -9,6 +10,8 @@ import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.database.disk.LLIndexSearchers.UnshardedIndexSearchers; +import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.utils.SimpleResource; import java.io.IOException; import java.io.UncheckedIOException; @@ -16,6 +19,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.search.IndexSearcher; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -30,61 +34,36 @@ public class CountMultiSearcher implements MultiSearcher { LocalQueryParams queryParams, String keyFieldName, GlobalQueryRewrite transformer) { - return singleOrClose(indexSearchersMono, indexSearchers -> { - Mono queryParamsMono; - if (transformer == GlobalQueryRewrite.NO_REWRITE) { - queryParamsMono = Mono.just(queryParams); - } else { - queryParamsMono = Mono - .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())); - } + if (transformer != GlobalQueryRewrite.NO_REWRITE) { + return LuceneUtils.rewriteMulti(this, indexSearchersMono, queryParams, keyFieldName, transformer); + } + if (queryParams.isSorted() && queryParams.limitLong() > 0) { + throw new UnsupportedOperationException( + "Sorted queries are not supported by SimpleUnsortedUnscoredLuceneMultiSearcher"); + } + if (queryParams.needsScores() && queryParams.limitLong() > 0) { + throw new UnsupportedOperationException( + "Scored queries are not supported by SimpleUnsortedUnscoredLuceneMultiSearcher"); + } - return queryParamsMono.flatMap(queryParams2 -> { - var localQueryParams = getLocalQueryParams(queryParams2); - return Mono - .fromRunnable(() -> { - if (queryParams2.isSorted() && queryParams2.limitLong() > 0) { - throw new UnsupportedOperationException( - "Sorted queries are not supported by SimpleUnsortedUnscoredLuceneMultiSearcher"); - } - if (queryParams2.needsScores() && queryParams2.limitLong() > 0) { - throw new UnsupportedOperationException( - "Scored queries are not supported by SimpleUnsortedUnscoredLuceneMultiSearcher"); - } - }) - .thenMany(Flux.fromIterable(indexSearchers.llShards())) - .flatMap(searcher -> this.collect(Mono.just(searcher), localQueryParams, keyFieldName, transformer)) - .collectList() - .map(results -> { - List resultsToDrop = new ArrayList<>(results.size()); - List> resultsFluxes = new ArrayList<>(results.size()); - boolean exactTotalHitsCount = true; - long totalHitsCountValue = 0; - for (LuceneSearchResult result : results) { - resultsToDrop.add(result); - resultsFluxes.add(result.results()); - exactTotalHitsCount &= result.totalHitsCount().exact(); - totalHitsCountValue += result.totalHitsCount().value(); - } + return Mono.usingWhen(indexSearchersMono, searchers -> Flux + .fromIterable(searchers.llShards()) + .flatMap(searcher -> this.collect(Mono.just(searcher), queryParams, keyFieldName, transformer)) + .collectList() + .map(results -> { + boolean exactTotalHitsCount = true; + long totalHitsCountValue = 0; + for (LuceneSearchResult result : results) { + exactTotalHitsCount &= result.totalHitsCount().exact(); + totalHitsCountValue += result.totalHitsCount().value(); + result.close(); + } - var totalHitsCount = new TotalHitsCount(totalHitsCountValue, exactTotalHitsCount); - Flux mergedFluxes = Flux - .merge(resultsFluxes) - .skip(queryParams2.offsetLong()) - .take(queryParams2.limitLong(), true); + var totalHitsCount = new TotalHitsCount(totalHitsCountValue, exactTotalHitsCount); - return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> { - resultsToDrop.forEach(LLUtils::finalizeResourceNow); - try { - indexSearchers.close(); - } catch (UncheckedIOException e) { - LOG.error("Can't close index searchers", e); - } - }); - }); - }); - }); + return new LuceneSearchResult(totalHitsCount, Flux.empty(), null); + }) + .doOnDiscard(LuceneSearchResult.class, SimpleResource::close), LLUtils::finalizeResource); } private LocalQueryParams getLocalQueryParams(LocalQueryParams queryParams) { @@ -103,31 +82,17 @@ public class CountMultiSearcher implements MultiSearcher { LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { - return singleOrClose(indexSearcherMono, indexSearcher -> { - Mono queryParamsMono; - if (transformer == GlobalQueryRewrite.NO_REWRITE) { - queryParamsMono = Mono.just(queryParams); - } else { - queryParamsMono = Mono - .fromCallable(() -> transformer.rewrite(LLIndexSearchers.unsharded(indexSearcher), queryParams)) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())); - } + if (transformer != GlobalQueryRewrite.NO_REWRITE) { + return LuceneUtils.rewrite(this, indexSearcherMono, queryParams, keyFieldName, transformer); + } - return queryParamsMono - .flatMap(queryParams2 -> Mono.fromCallable(() -> { - LLUtils.ensureBlocking(); - return (long) indexSearcher.getIndexSearcher().count(queryParams2.query()); - }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))) - .publishOn(Schedulers.parallel()) - .transform(TimeoutUtil.timeoutMono(queryParams.timeout())) - .map(count -> new LuceneSearchResult(TotalHitsCount.of(count, true), Flux.empty(), () -> { - try { - indexSearcher.close(); - } catch (UncheckedIOException e) { - LOG.error("Can't close index searchers", e); - } - })); - }); + return Mono.usingWhen(indexSearcherMono, indexSearcher -> Mono.fromCallable(() -> { + LLUtils.ensureBlocking(); + return (long) indexSearcher.getIndexSearcher().count(queryParams.query()); + }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())), LLUtils::finalizeResource) + .publishOn(Schedulers.parallel()) + .transform(TimeoutUtil.timeoutMono(queryParams.timeout())) + .map(count -> new LuceneSearchResult(TotalHitsCount.of(count, true), Flux.empty(), null)); } @Override diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/MultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/MultiSearcher.java index 173b9fe..f73b5e2 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/MultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/MultiSearcher.java @@ -32,8 +32,7 @@ public interface MultiSearcher extends LocalSearcher { LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { - Mono searchers = singleOrClose(indexSearcherMono, indexSearcher -> - Mono.just(LLIndexSearchers.unsharded(indexSearcher))); + Mono searchers = indexSearcherMono.map(LLIndexSearchers::unsharded); return this.collectMulti(searchers, queryParams, keyFieldName, transformer); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java index 58fdcd6..3325ab4 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java @@ -39,45 +39,37 @@ public class PagedLocalSearcher implements LocalSearcher { LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { + if (transformer != GlobalQueryRewrite.NO_REWRITE) { + return LuceneUtils.rewrite(this, indexSearcherMono, queryParams, keyFieldName, transformer); + } PaginationInfo paginationInfo = getPaginationInfo(queryParams); - return singleOrClose(indexSearcherMono, indexSearcher -> { - var indexSearchers = LLIndexSearchers.unsharded(indexSearcher); + var indexSearchersMono = indexSearcherMono.map(LLIndexSearchers::unsharded); - Mono queryParamsMono; - if (transformer == GlobalQueryRewrite.NO_REWRITE) { - queryParamsMono = Mono.just(queryParams); - } else { - queryParamsMono = Mono - .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())); - } - - return queryParamsMono.flatMap(queryParams2 -> this - // Search first page results - .searchFirstPage(indexSearchers.shards(), queryParams2, paginationInfo) - // Compute the results of the first page - .transform(firstPageTopDocsMono -> this.computeFirstPageResults(firstPageTopDocsMono, - indexSearchers.shards(), - keyFieldName, - queryParams2 - )) - // Compute other results - .transform(firstResult -> this.computeOtherResults(firstResult, - indexSearchers.shards(), - queryParams2, - keyFieldName, - () -> { - try { - indexSearcher.close(); - } catch (UncheckedIOException e) { - LOG.error(e); - } + return singleOrClose(indexSearchersMono, indexSearchers -> this + // Search first page results + .searchFirstPage(indexSearchers.shards(), queryParams, paginationInfo) + // Compute the results of the first page + .transform(firstPageTopDocsMono -> this.computeFirstPageResults(firstPageTopDocsMono, + indexSearchers.shards(), + keyFieldName, + queryParams + )) + // Compute other results + .transform(firstResult -> this.computeOtherResults(firstResult, + indexSearchers.shards(), + queryParams, + keyFieldName, + () -> { + try { + indexSearchers.close(); + } catch (UncheckedIOException e) { + LOG.error(e); } - )) - // Ensure that one LuceneSearchResult is always returned - .single()); - }); + } + )) + // Ensure that one LuceneSearchResult is always returned + .single()); } @Override diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java index 8c0342d..85b44bb 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java @@ -39,45 +39,31 @@ public class ScoredPagedMultiSearcher implements MultiSearcher { LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { - return singleOrClose(indexSearchersMono, indexSearchers -> { - Mono queryParamsMono; - if (transformer == GlobalQueryRewrite.NO_REWRITE) { - queryParamsMono = Mono.just(queryParams); - } else { - queryParamsMono = Mono - .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())); - } + if (transformer != GlobalQueryRewrite.NO_REWRITE) { + return LuceneUtils.rewriteMulti(this, indexSearchersMono, queryParams, keyFieldName, transformer); + } - return queryParamsMono.flatMap(queryParams2 -> { - PaginationInfo paginationInfo = getPaginationInfo(queryParams2); + PaginationInfo paginationInfo = getPaginationInfo(queryParams); - return this - // Search first page results - .searchFirstPage(indexSearchers.shards(), queryParams2, paginationInfo) - // Compute the results of the first page - .transform(firstPageTopDocsMono -> this.computeFirstPageResults(firstPageTopDocsMono, - indexSearchers, - keyFieldName, - queryParams2 - )) - // Compute other results - .map(firstResult -> this.computeOtherResults(firstResult, - indexSearchers.shards(), - queryParams2, - keyFieldName, - () -> { - try { - indexSearchers.close(); - } catch (UncheckedIOException e) { - LOG.error("Can't close index searchers", e); - } - } - )) - // Ensure that one LuceneSearchResult is always returned - .single(); - }); - }); + return singleOrClose(indexSearchersMono, indexSearchers -> this + // Search first page results + .searchFirstPage(indexSearchers.shards(), queryParams, paginationInfo) + // Compute the results of the first page + .transform(firstPageTopDocsMono -> + this.computeFirstPageResults(firstPageTopDocsMono, indexSearchers, keyFieldName, queryParams + )) + // Compute other results + .map(firstResult -> this.computeOtherResults(firstResult, indexSearchers.shards(), queryParams, keyFieldName, + () -> { + try { + indexSearchers.close(); + } catch (UncheckedIOException e) { + LOG.error("Can't close index searchers", e); + } + } + )) + // Ensure that one LuceneSearchResult is always returned + .single()); } private Sort getSort(LocalQueryParams queryParams) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedByScoreFullMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedByScoreFullMultiSearcher.java index 31af292..4b64b7a 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedByScoreFullMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedByScoreFullMultiSearcher.java @@ -37,31 +37,20 @@ public class SortedByScoreFullMultiSearcher implements MultiSearcher { LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { - return singleOrClose(indexSearchersMono, indexSearchers -> { - Mono queryParamsMono; - if (transformer == GlobalQueryRewrite.NO_REWRITE) { - queryParamsMono = Mono.just(queryParams); - } else { - queryParamsMono = Mono - .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())); - } - - return queryParamsMono.flatMap(queryParams2 -> { - if (queryParams2.isSorted() && !queryParams2.isSortedByScore()) { - throw new IllegalArgumentException(SortedByScoreFullMultiSearcher.this.getClass().getSimpleName() - + " doesn't support sorted queries"); - } - - return this - // Search results - .search(indexSearchers.shards(), queryParams2) - // Compute the results - .transform(fullDocsMono -> this.computeResults(fullDocsMono, indexSearchers, keyFieldName, queryParams2)) - // Ensure that one LuceneSearchResult is always returned - .single(); - }); - }); + if (transformer != GlobalQueryRewrite.NO_REWRITE) { + return LuceneUtils.rewriteMulti(this, indexSearchersMono, queryParams, keyFieldName, transformer); + } + if (queryParams.isSorted() && !queryParams.isSortedByScore()) { + throw new IllegalArgumentException(SortedByScoreFullMultiSearcher.this.getClass().getSimpleName() + + " doesn't support sorted queries"); + } + return singleOrClose(indexSearchersMono, indexSearchers -> this + // Search results + .search(indexSearchers.shards(), queryParams) + // Compute the results + .transform(fullDocsMono -> this.computeResults(fullDocsMono, indexSearchers, keyFieldName, queryParams)) + // Ensure that one LuceneSearchResult is always returned + .single()); } /** @@ -71,7 +60,6 @@ public class SortedByScoreFullMultiSearcher implements MultiSearcher { LocalQueryParams queryParams) { return Mono .fromCallable(() -> { - LLUtils.ensureBlocking(); var totalHitsThreshold = queryParams.getTotalHitsThresholdLong(); return HugePqFullScoreDocCollector.createSharedManager(env, queryParams.limitLong(), totalHitsThreshold); }) @@ -91,7 +79,7 @@ public class SortedByScoreFullMultiSearcher implements MultiSearcher { collector.close(); throw ex; } - })) + }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))) .collectList() .flatMap(collectors -> Mono.fromCallable(() -> { try { @@ -104,7 +92,8 @@ public class SortedByScoreFullMultiSearcher implements MultiSearcher { throw ex; } })) - ); + ) + .publishOn(Schedulers.parallel()); } /** diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java index 092adba..86c4023 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java @@ -37,24 +37,16 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher { LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { - return singleOrClose(indexSearchersMono, indexSearchers -> { - Mono queryParamsMono; - if (transformer == GlobalQueryRewrite.NO_REWRITE) { - queryParamsMono = Mono.just(queryParams); - } else { - queryParamsMono = Mono - .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())); - } - - return queryParamsMono.flatMap(queryParams2 -> this - // Search results - .search(indexSearchers.shards(), queryParams2) - // Compute the results - .transform(fullDocsMono -> this.computeResults(fullDocsMono, indexSearchers, keyFieldName, queryParams2)) - // Ensure that one LuceneSearchResult is always returned - .single()); - }); + if (transformer != GlobalQueryRewrite.NO_REWRITE) { + return LuceneUtils.rewriteMulti(this, indexSearchersMono, queryParams, keyFieldName, transformer); + } + return singleOrClose(indexSearchersMono, indexSearchers -> this + // Search results + .search(indexSearchers.shards(), queryParams) + // Compute the results + .transform(fullDocsMono -> this.computeResults(fullDocsMono, indexSearchers, keyFieldName, queryParams)) + // Ensure that one LuceneSearchResult is always returned + .single()); } /** @@ -98,7 +90,9 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher { throw ex; } })) - ); + ) + .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())) + .publishOn(Schedulers.parallel()); } /** diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/StandardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/StandardSearcher.java index ad77ea0..cbc72b2 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/StandardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/StandardSearcher.java @@ -37,24 +37,16 @@ public class StandardSearcher implements MultiSearcher { LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { - return singleOrClose(indexSearchersMono, indexSearchers -> { - Mono queryParamsMono; - if (transformer == GlobalQueryRewrite.NO_REWRITE) { - queryParamsMono = Mono.just(queryParams); - } else { - queryParamsMono = Mono - .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())); - } - - return queryParamsMono.flatMap(queryParams2 -> this - // Search results - .search(indexSearchers.shards(), queryParams2) - // Compute the results - .transform(fullDocsMono -> this.computeResults(fullDocsMono, indexSearchers, keyFieldName, queryParams2)) - // Ensure that one LuceneSearchResult is always returned - .single()); - }); + if (transformer != GlobalQueryRewrite.NO_REWRITE) { + return LuceneUtils.rewriteMulti(this, indexSearchersMono, queryParams, keyFieldName, transformer); + } + return singleOrClose(indexSearchersMono, indexSearchers -> this + // Search results + .search(indexSearchers.shards(), queryParams) + // Compute the results + .transform(fullDocsMono -> this.computeResults(fullDocsMono, indexSearchers, keyFieldName, queryParams)) + // Ensure that one LuceneSearchResult is always returned + .single()); } /** diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedStreamingMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedStreamingMultiSearcher.java index 9614cff..f000fac 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedStreamingMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedStreamingMultiSearcher.java @@ -33,42 +33,31 @@ public class UnsortedStreamingMultiSearcher implements MultiSearcher { LocalQueryParams queryParams, @Nullable String keyFieldName, GlobalQueryRewrite transformer) { - return singleOrClose(indexSearchersMono, indexSearchers -> { - Mono queryParamsMono; - if (transformer == GlobalQueryRewrite.NO_REWRITE) { - queryParamsMono = Mono.just(queryParams); - } else { - queryParamsMono = Mono - .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())); - } + if (transformer != GlobalQueryRewrite.NO_REWRITE) { + return LuceneUtils.rewriteMulti(this, indexSearchersMono, queryParams, keyFieldName, transformer); + } + if (queryParams.isSorted() && queryParams.limitLong() > 0) { + throw new UnsupportedOperationException("Sorted queries are not supported" + " by UnsortedContinuousLuceneMultiSearcher"); + } + var localQueryParams = getLocalQueryParams(queryParams); + return singleOrClose(indexSearchersMono, indexSearchers -> Mono.fromCallable(() -> { + var shards = indexSearchers.shards(); - return queryParamsMono.map(queryParams2 -> { - var localQueryParams = getLocalQueryParams(queryParams2); - if (queryParams2.isSorted() && queryParams2.limitLong() > 0) { - throw new UnsupportedOperationException("Sorted queries are not supported" - + " by UnsortedContinuousLuceneMultiSearcher"); + Flux scoreDocsFlux = getScoreDocs(localQueryParams, shards); + + Flux resultsFlux = LuceneUtils.convertHits(scoreDocsFlux, shards, keyFieldName, false); + + var totalHitsCount = new TotalHitsCount(0, false); + Flux mergedFluxes = resultsFlux.skip(queryParams.offsetLong()).take(queryParams.limitLong(), true); + + return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> { + try { + indexSearchers.close(); + } catch (UncheckedIOException e) { + LOG.error("Can't close index searchers", e); } - var shards = indexSearchers.shards(); - - Flux scoreDocsFlux = getScoreDocs(localQueryParams, shards); - - Flux resultsFlux = LuceneUtils.convertHits(scoreDocsFlux, shards, keyFieldName, false); - - var totalHitsCount = new TotalHitsCount(0, false); - Flux mergedFluxes = resultsFlux - .skip(queryParams2.offsetLong()) - .take(queryParams2.limitLong(), true); - - return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> { - try { - indexSearchers.close(); - } catch (UncheckedIOException e) { - LOG.error("Can't close index searchers", e); - } - }); }); - }); + })); } private Flux getScoreDocs(LocalQueryParams localQueryParams, List shards) { diff --git a/src/test/java/it/cavallium/dbengine/TestLuceneIndex.java b/src/test/java/it/cavallium/dbengine/TestLuceneIndex.java index 3be2966..34817c6 100644 --- a/src/test/java/it/cavallium/dbengine/TestLuceneIndex.java +++ b/src/test/java/it/cavallium/dbengine/TestLuceneIndex.java @@ -15,6 +15,7 @@ import it.cavallium.dbengine.client.Sort; import it.cavallium.dbengine.client.query.current.data.MatchAllDocsQuery; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLScoreMode; +import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLTempHugePqEnv; import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher; import it.cavallium.dbengine.lucene.searcher.AdaptiveMultiSearcher; @@ -136,7 +137,11 @@ public class TestLuceneIndex { index.updateDocument("test-key-13", "2111").block(); index.updateDocument("test-key-14", "2999").block(); index.updateDocument("test-key-15", "3902").block(); - Flux.range(1, 1000).concatMap(i -> index.updateDocument("test-key-" + (15 + i), "" + i)).blockLast(); + Flux + .range(1, 1000) + .concatMap(i -> index.updateDocument("test-key-" + (15 + i), "" + i)) + .transform(LLUtils::handleDiscard) + .blockLast(); tempDb.swappableLuceneSearcher().setSingle(new CountMultiSearcher()); tempDb.swappableLuceneSearcher().setMulti(new CountMultiSearcher()); assertCount(index, 1000 + 15); diff --git a/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java b/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java index 754b7b2..0ec086f 100644 --- a/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java +++ b/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java @@ -10,6 +10,7 @@ import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite; import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; import it.cavallium.dbengine.lucene.searcher.LocalSearcher; @@ -42,64 +43,51 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher { LocalQueryParams queryParams, String keyFieldName, GlobalQueryRewrite transformer) { + if (transformer != NO_REWRITE) { + return LuceneUtils.rewriteMulti(this, indexSearchersMono, queryParams, keyFieldName, transformer); + } + if (queryParams.isSorted() && queryParams.limitLong() > 0) { + throw new UnsupportedOperationException( + "Sorted queries are not supported" + " by SimpleUnsortedUnscoredLuceneMultiSearcher"); + } + if (queryParams.needsScores() && queryParams.limitLong() > 0) { + throw new UnsupportedOperationException( + "Scored queries are not supported" + " by SimpleUnsortedUnscoredLuceneMultiSearcher"); + } return singleOrClose(indexSearchersMono, indexSearchers -> { - Mono queryParamsMono; - if (transformer == NO_REWRITE) { - queryParamsMono = Mono.just(queryParams); - } else { - queryParamsMono = Mono - .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())); - } + var localQueryParams = getLocalQueryParams(queryParams); + return Flux + .fromIterable(indexSearchers.llShards()) + .flatMap(searcher -> localSearcher.collect(Mono.just(searcher), localQueryParams, keyFieldName, transformer)) + .collectList() + .map(results -> { + List resultsToDrop = new ArrayList<>(results.size()); + List> resultsFluxes = new ArrayList<>(results.size()); + boolean exactTotalHitsCount = true; + long totalHitsCountValue = 0; + for (LuceneSearchResult result : results) { + resultsToDrop.add(result); + resultsFluxes.add(result.results()); + exactTotalHitsCount &= result.totalHitsCount().exact(); + totalHitsCountValue += result.totalHitsCount().value(); + } - return queryParamsMono.flatMap(queryParams2 -> { - var localQueryParams = getLocalQueryParams(queryParams2); - return Flux - .fromIterable(indexSearchers.llShards()) - .flatMap(searcher -> - localSearcher.collect(Mono.just(searcher), localQueryParams, keyFieldName, transformer)) - .collectList() - .map(results -> { - List resultsToDrop = new ArrayList<>(results.size()); - List> resultsFluxes = new ArrayList<>(results.size()); - boolean exactTotalHitsCount = true; - long totalHitsCountValue = 0; - for (LuceneSearchResult result : results) { - resultsToDrop.add(result); - resultsFluxes.add(result.results()); - exactTotalHitsCount &= result.totalHitsCount().exact(); - totalHitsCountValue += result.totalHitsCount().value(); - } + var totalHitsCount = new TotalHitsCount(totalHitsCountValue, exactTotalHitsCount); + Flux mergedFluxes = Flux + .merge(resultsFluxes) + .skip(queryParams.offsetLong()) + .take(queryParams.limitLong(), true); - var totalHitsCount = new TotalHitsCount(totalHitsCountValue, exactTotalHitsCount); - Flux mergedFluxes = Flux - .merge(resultsFluxes) - .skip(queryParams2.offsetLong()) - .take(queryParams2.limitLong(), true); - - return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> { - resultsToDrop.forEach(SimpleResource::close); - try { - indexSearchers.close(); - } catch (UncheckedIOException e) { - LOG.error("Can't close index searchers", e); - } - }); - }) - .doFirst(() -> { - LLUtils.ensureBlocking(); - if (queryParams2.isSorted() && queryParams2.limitLong() > 0) { - throw new UnsupportedOperationException("Sorted queries are not supported" - + " by SimpleUnsortedUnscoredLuceneMultiSearcher"); - } - if (queryParams2.needsScores() && queryParams2.limitLong() > 0) { - throw new UnsupportedOperationException("Scored queries are not supported" - + " by SimpleUnsortedUnscoredLuceneMultiSearcher"); - } - }); - } - ); + return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> { + resultsToDrop.forEach(SimpleResource::close); + try { + indexSearchers.close(); + } catch (UncheckedIOException e) { + LOG.error("Can't close index searchers", e); + } + }); + }); }); } diff --git a/src/test/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullScoreDocCollectorTest.java b/src/test/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullScoreDocCollectorTest.java index 7ea4fd0..4c36ca1 100644 --- a/src/test/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullScoreDocCollectorTest.java +++ b/src/test/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullScoreDocCollectorTest.java @@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.*; import it.cavallium.dbengine.client.query.QueryUtils; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; +import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.IndexSearcherManager; import it.cavallium.dbengine.database.disk.LLTempHugePqEnv; import it.cavallium.dbengine.lucene.LLScoreDoc; @@ -78,7 +79,13 @@ public class HugePqFullScoreDocCollectorTest { .toList(); try (var collector = HugePqFullScoreDocCollector.create(env, 20)) { searcher.search(luceneQuery, collector); - var docs = collector.fullDocs().iterate().collectList().blockOptional().orElseThrow(); + var docs = collector + .fullDocs() + .iterate() + .collectList() + .transform(LLUtils::handleDiscard) + .blockOptional() + .orElseThrow(); System.out.println("Expected docs:"); for (LLScoreDoc expectedDoc : expectedDocs) { System.out.println(expectedDoc); @@ -146,23 +153,24 @@ public class HugePqFullScoreDocCollectorTest { .map(scoreDoc -> new LLScoreDoc(scoreDoc.doc, scoreDoc.score, scoreDoc.shardIndex)) .toList(); var collectorManager = HugePqFullScoreDocCollector.createSharedManager(env, 20, Integer.MAX_VALUE); - var collector1 = collectorManager.newCollector(); - var collector2 = collectorManager.newCollector(); - shardSearcher1.search(luceneQuery, collector1); - shardSearcher2.search(luceneQuery, collector2); - try (var results = collectorManager.reduce(List.of(collector1, collector2))) { - var docs = results.iterate().collectList().blockOptional().orElseThrow(); - System.out.println("Expected docs:"); - for (LLScoreDoc expectedDoc : expectedDocs) { - System.out.println(expectedDoc); + try (var collector1 = collectorManager.newCollector(); + var collector2 = collectorManager.newCollector()) { + shardSearcher1.search(luceneQuery, collector1); + shardSearcher2.search(luceneQuery, collector2); + try (var results = collectorManager.reduce(List.of(collector1, collector2))) { + var docs = results.iterate().collectList().blockOptional().orElseThrow(); + System.out.println("Expected docs:"); + for (LLScoreDoc expectedDoc : expectedDocs) { + System.out.println(expectedDoc); + } + System.out.println(""); + System.out.println("Obtained docs:"); + for (LLScoreDoc doc : docs) { + System.out.println(doc); + } + assertEquals(expectedDocs, docs.stream().map(elem -> new LLScoreDoc(elem.doc(), elem.score(), -1)).toList()); + assertEquals(expectedTotalHits, new TotalHitsCount(results.totalHits().value, results.totalHits().relation == Relation.EQUAL_TO)); } - System.out.println(""); - System.out.println("Obtained docs:"); - for (LLScoreDoc doc : docs) { - System.out.println(doc); - } - assertEquals(expectedDocs, docs.stream().map(elem -> new LLScoreDoc(elem.doc(), elem.score(), -1)).toList()); - assertEquals(expectedTotalHits, new TotalHitsCount(results.totalHits().value, results.totalHits().relation == Relation.EQUAL_TO)); } } }