From eb02e0f18d29d9b6c2067cb484623d221550d271 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 15 Jun 2022 18:36:22 +0200 Subject: [PATCH] Test huge queries --- .../dbengine/lucene/EmptyPriorityQueue.java | 3 +- .../cavallium/dbengine/lucene/FullDocs.java | 8 +- .../dbengine/lucene/HugePqPriorityQueue.java | 57 +++++- .../lucene/LLDocElementScoreComparator.java | 2 +- .../dbengine/lucene/PriorityQueue.java | 2 +- .../search/HugePqFullFieldDocCollector.java | 2 +- .../search/HugePqFullScoreDocCollector.java | 15 +- .../lucene/searcher/ShardIndexSearcher.java | 35 ++-- .../searcher/SharedShardStatistics.java | 4 +- .../dbengine/PriorityQueueAdaptor.java | 4 +- .../dbengine/TestHugePqHitQueue.java | 6 +- .../HugePqFullFieldDocCollectorTest.java | 174 ++++++++++++++++++ .../HugePqFullScoreDocCollectorTest.java | 170 +++++++++++++++++ 13 files changed, 440 insertions(+), 42 deletions(-) create mode 100644 src/test/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullFieldDocCollectorTest.java create mode 100644 src/test/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullScoreDocCollectorTest.java diff --git a/src/main/java/it/cavallium/dbengine/lucene/EmptyPriorityQueue.java b/src/main/java/it/cavallium/dbengine/lucene/EmptyPriorityQueue.java index 46ec3a5..5875f3c 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/EmptyPriorityQueue.java +++ b/src/main/java/it/cavallium/dbengine/lucene/EmptyPriorityQueue.java @@ -24,7 +24,8 @@ public class EmptyPriorityQueue implements PriorityQueue { } @Override - public void replaceTop(T newTop) { + public void replaceTop(T oldTop, T newTop) { + assert oldTop == null; assert newTop == null; } diff --git a/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java b/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java index b99f3bc..13af4b7 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java +++ b/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java @@ -34,9 +34,7 @@ public interface FullDocs extends ResourceIterable { FullDocs docs = new FullDocs<>() { @Override public void close() { - for (FullDocs fullDoc : fullDocs) { - fullDoc.close(); - } + mergedIterable.close(); } @Override @@ -78,7 +76,7 @@ public interface FullDocs extends ResourceIterable { static ResourceIterable mergeResourceIterable( @Nullable Sort sort, FullDocs[] fullDocs) { - return new ResourceIterable() { + return new ResourceIterable<>() { @Override public void close() { for (FullDocs fullDoc : fullDocs) { @@ -138,7 +136,7 @@ public interface FullDocs extends ResourceIterable { @SuppressWarnings("unchecked") Flux[] fluxes = new Flux[fullDocs.length]; for (int i = 0; i < iterables.length; i++) { var shardIndex = i; - fluxes[i] = iterables[i].map(shard -> { + fluxes[i] = iterables[i].map(shard -> { if (shard instanceof LLScoreDoc scoreDoc) { //noinspection unchecked return (T) new LLScoreDoc(scoreDoc.doc(), scoreDoc.score(), shardIndex); diff --git a/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java b/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java index 4aa0183..b96720a 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java +++ b/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java @@ -10,7 +10,6 @@ import it.cavallium.dbengine.database.disk.RocksIterWithReadOpts; import it.cavallium.dbengine.database.disk.StandardRocksDBColumn; import it.cavallium.dbengine.database.disk.UpdateAtomicResultMode; import it.cavallium.dbengine.database.disk.UpdateAtomicResultPrevious; 
-import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -42,6 +41,9 @@ public class HugePqPriorityQueue implements PriorityQueue, Reversable codec) { this.tempEnv = env; this.env = env.getEnv(); @@ -74,6 +76,7 @@ public class HugePqPriorityQueue implements PriorityQueue, Reversable implements PriorityQueue, Reversable implements PriorityQueue, Reversable implements PriorityQueue, Reversable implements PriorityQueue, Reversable implements PriorityQueue, Reversable implements PriorityQueue, Reversable implements PriorityQueue, Reversable item); + }, rocksIterWithReadOpts -> { + if (rocksIterWithReadOpts != null) { + rocksIterWithReadOpts.close(); + } + }).concatMapIterable(item -> item); } @Override @@ -309,7 +353,6 @@ public class HugePqPriorityQueue implements PriorityQueue, Reversable { @Override public int compare(LLDoc hitA, LLDoc hitB) { - return Float.compare(hitA.score(), hitB.score()); + return Float.compare(hitB.score(), hitA.score()); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/PriorityQueue.java b/src/main/java/it/cavallium/dbengine/lucene/PriorityQueue.java index 9e00598..f52c932 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/PriorityQueue.java +++ b/src/main/java/it/cavallium/dbengine/lucene/PriorityQueue.java @@ -23,7 +23,7 @@ public interface PriorityQueue extends ResourceIterable, SafeCloseable { /** * Replace the top of the pq with {@code newTop} */ - void replaceTop(T newTop); + void replaceTop(T oldTop, T newTop); /** * Returns the number of elements currently stored in the PriorityQueue. diff --git a/src/main/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullFieldDocCollector.java b/src/main/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullFieldDocCollector.java index 6d50e15..9d4271c 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullFieldDocCollector.java +++ b/src/main/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullFieldDocCollector.java @@ -471,7 +471,7 @@ public abstract class HugePqFullFieldDocCollector extends final void updateBottom(int doc) { // bottom.score is already set to Float.NaN in add(). 
var bottom = pq.top(); - pq.replaceTop(new LLSlotDoc(docBase + doc, bottom.score(), bottom.shardIndex(), bottom.slot())); + pq.replaceTop(bottom, new LLSlotDoc(docBase + doc, bottom.score(), bottom.shardIndex(), bottom.slot())); } /* diff --git a/src/main/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullScoreDocCollector.java b/src/main/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullScoreDocCollector.java index aa88d6c..d5bc268 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullScoreDocCollector.java +++ b/src/main/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullScoreDocCollector.java @@ -33,6 +33,7 @@ import org.apache.lucene.search.CollectorManager; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.LeafCollector; import org.apache.lucene.search.Scorable; +import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.TotalHits; import org.jetbrains.annotations.NotNull; @@ -106,8 +107,7 @@ public abstract class HugePqFullScoreDocCollector extends // If there is a limit, and it's reached, use the replacement logic if (limit != null && pq.size() >= limit) { - var pqTop = pq.top(); - if (pqTop != null && score <= pqTop.score()) { + if (pq.top() != null && score <= pq.top().score()) { if (totalHitsRelation == TotalHits.Relation.EQUAL_TO) { // we just reached totalHitsThreshold, we can start setting the min // competitive score now @@ -119,7 +119,7 @@ public abstract class HugePqFullScoreDocCollector extends return; } else { // Remove the top element, then add the following element - pq.replaceTop(new LLScoreDoc(doc + docBase, score, -1)); + pq.replaceTop(pq.top(), new LLScoreDoc(doc + docBase, score, -1)); // The minimum competitive score will be updated later } } else { @@ -296,13 +296,12 @@ public abstract class HugePqFullScoreDocCollector extends } protected void updateMinCompetitiveScore(Scorable scorer) throws IOException { - var pqTop = pq.top(); if (hitsThresholdChecker.isThresholdReached(true) - && pqTop != null - && pqTop.score() != Float.NEGATIVE_INFINITY) { // -Infinity is the score of sentinels + && pq.top() != null + && pq.top().score() != Float.NEGATIVE_INFINITY) { // -Infinity is the score of sentinels // since we tie-break on doc id and collect in doc id order, we can require // the next float - float localMinScore = Math.nextUp(pqTop.score()); + float localMinScore = Math.nextUp(pq.top().score()); if (localMinScore > minCompetitiveScore) { scorer.setMinCompetitiveScore(localMinScore); totalHitsRelation = TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO; @@ -311,7 +310,7 @@ public abstract class HugePqFullScoreDocCollector extends // we don't use the next float but we register the document // id so that other leaves can require it if they are after // the current maximum - minScoreAcc.accumulate(docBase, pqTop.score()); + minScoreAcc.accumulate(docBase, pq.top().score()); } } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ShardIndexSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ShardIndexSearcher.java index 70fa473..b22c8a5 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ShardIndexSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ShardIndexSearcher.java @@ -8,6 +8,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Optional; import java.util.Set; import org.apache.lucene.index.Term; 
import org.apache.lucene.index.TermStates; @@ -16,17 +17,16 @@ import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.QueryVisitor; import org.apache.lucene.search.TermStatistics; +import org.jetbrains.annotations.Nullable; public class ShardIndexSearcher extends IndexSearcher { public final int myNodeID; private final IndexSearcher[] searchers; - private final Map<FieldAndShar, CollectionStatistics> collectionStatsCache; + private final Map<FieldAndShar, CachedCollectionStatistics> collectionStatsCache; private final Map<TermAndShard, TermStatistics> termStatsCache; - public ShardIndexSearcher(SharedShardStatistics sharedShardStatistics, - List<IndexSearcher> searchers, - int nodeID) { + public ShardIndexSearcher(SharedShardStatistics sharedShardStatistics, List<IndexSearcher> searchers, int nodeID) { super(searchers.get(nodeID).getIndexReader(), searchers.get(nodeID).getExecutor()); this.collectionStatsCache = sharedShardStatistics.collectionStatsCache; this.termStatsCache = sharedShardStatistics.termStatsCache; @@ -93,15 +93,13 @@ final Set<Term> missing = new HashSet<>(); for (Term term : terms) { - final TermAndShard key = - new TermAndShard(nodeID, term); + final TermAndShard key = new TermAndShard(nodeID, term); if (!termStatsCache.containsKey(key)) { missing.add(term); } } if (missing.size() != 0) { - for (Map.Entry<Term, TermStatistics> ent : - getNodeTermStats(missing, nodeID).entrySet()) { + for (Map.Entry<Term, TermStatistics> ent : getNodeTermStats(missing, nodeID).entrySet()) { if (ent.getValue() != null) { final TermAndShard key = new TermAndShard(nodeID, ent.getKey()); termStatsCache.put(key, ent.getValue()); @@ -115,8 +113,7 @@ // Mock: in a real env, this would hit the wire and get // term stats from remote node - Map<Term, TermStatistics> getNodeTermStats(Set<Term> terms, int nodeID) - throws IOException { + Map<Term, TermStatistics> getNodeTermStats(Set<Term> terms, int nodeID) throws IOException { var s = searchers[nodeID]; final Map<Term, TermStatistics> stats = new HashMap<>(); if (s == null) { @@ -143,8 +140,7 @@ if (nodeID == myNodeID) { subStats = super.termStatistics(term, docFreq, totalTermFreq); } else { - final TermAndShard key = - new TermAndShard(nodeID, term); + final TermAndShard key = new TermAndShard(nodeID, term); subStats = termStatsCache.get(key); if (subStats == null) { continue; // term not found @@ -176,8 +172,14 @@ final CollectionStatistics nodeStats; if (nodeID == myNodeID) { nodeStats = super.collectionStatistics(field); + collectionStatsCache.put(key, new CachedCollectionStatistics(nodeStats)); } else { - nodeStats = collectionStatsCache.get(key); + var nodeStatsOptional = collectionStatsCache.get(key); + if (nodeStatsOptional == null) { + nodeStatsOptional = new CachedCollectionStatistics(computeNodeCollectionStatistics(key)); + collectionStatsCache.put(key, nodeStatsOptional); + } + nodeStats = nodeStatsOptional.collectionStatistics(); } if (nodeStats == null) { continue; // field not in sub at all @@ -203,6 +205,13 @@ } } + private CollectionStatistics computeNodeCollectionStatistics(FieldAndShar fieldAndShard) throws IOException { + var searcher = searchers[fieldAndShard.nodeID]; + return searcher.collectionStatistics(fieldAndShard.field); + } + + public record CachedCollectionStatistics(@Nullable CollectionStatistics collectionStatistics) {} + public static class TermAndShard { private final int nodeID; private final Term term; diff --git 
a/src/main/java/it/cavallium/dbengine/lucene/searcher/SharedShardStatistics.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SharedShardStatistics.java index 51280db..bfb9d75 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SharedShardStatistics.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SharedShardStatistics.java @@ -1,14 +1,16 @@ package it.cavallium.dbengine.lucene.searcher; +import it.cavallium.dbengine.lucene.searcher.ShardIndexSearcher.CachedCollectionStatistics; import it.cavallium.dbengine.lucene.searcher.ShardIndexSearcher.FieldAndShar; import it.cavallium.dbengine.lucene.searcher.ShardIndexSearcher.TermAndShard; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import org.apache.lucene.search.CollectionStatistics; import org.apache.lucene.search.TermStatistics; public class SharedShardStatistics { - public final Map<FieldAndShar, CollectionStatistics> collectionStatsCache = new ConcurrentHashMap<>(); + public final Map<FieldAndShar, CachedCollectionStatistics> collectionStatsCache = new ConcurrentHashMap<>(); public final Map<TermAndShard, TermStatistics> termStatsCache = new ConcurrentHashMap<>(); } diff --git a/src/test/java/it/cavallium/dbengine/PriorityQueueAdaptor.java b/src/test/java/it/cavallium/dbengine/PriorityQueueAdaptor.java index d8b40de..42d5362 100644 --- a/src/test/java/it/cavallium/dbengine/PriorityQueueAdaptor.java +++ b/src/test/java/it/cavallium/dbengine/PriorityQueueAdaptor.java @@ -4,6 +4,7 @@ import it.cavallium.dbengine.lucene.PriorityQueue; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import org.apache.lucene.search.HitQueue; import reactor.core.publisher.Flux; @@ -35,7 +36,8 @@ public class PriorityQueueAdaptor<T extends ScoreDoc> implements PriorityQueue<T> { } @Override - public void replaceTop(T newTop) { + public void replaceTop(T oldTop, T newTop) { + assert Objects.equals(oldTop, hitQueue.top()); hitQueue.updateTop(newTop); } @Override diff --git a/src/test/java/it/cavallium/dbengine/TestHugePqHitQueue.java b/src/test/java/it/cavallium/dbengine/TestHugePqHitQueue.java index 7e02e27..6fccb72 100644 --- a/src/test/java/it/cavallium/dbengine/TestHugePqHitQueue.java +++ b/src/test/java/it/cavallium/dbengine/TestHugePqHitQueue.java @@ -311,9 +311,9 @@ public class TestHugePqHitQueue { } @Override - public void replaceTop(ScoreDoc newTop) { - referenceQueue.replaceTop(newTop); - myQueue.replaceTop(newTop); + public void replaceTop(ScoreDoc oldTop, ScoreDoc newTop) { + referenceQueue.replaceTop(oldTop, newTop); + myQueue.replaceTop(oldTop, newTop); } @Override diff --git a/src/test/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullFieldDocCollectorTest.java b/src/test/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullFieldDocCollectorTest.java new file mode 100644 index 0000000..20642b0 --- /dev/null +++ b/src/test/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullFieldDocCollectorTest.java @@ -0,0 +1,171 @@ +package it.cavallium.dbengine.lucene.hugepq.search; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; +import it.cavallium.dbengine.database.disk.LLTempHugePqEnv; +import it.cavallium.dbengine.lucene.LLFieldDoc; +import it.cavallium.dbengine.lucene.LLScoreDoc; +import it.cavallium.dbengine.lucene.analyzer.WordAnalyzer; +import it.cavallium.dbengine.lucene.searcher.ShardIndexSearcher; +import it.cavallium.dbengine.lucene.searcher.SharedShardStatistics; +import java.io.IOException; +import java.util.Arrays; +import 
java.util.List; +import org.apache.lucene.document.Field.Store; +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortField.Type; +import org.apache.lucene.search.SortedNumericSortField; +import org.apache.lucene.search.TopFieldCollector; +import org.apache.lucene.search.TopScoreDocCollector; +import org.apache.lucene.search.TotalHits.Relation; +import org.apache.lucene.store.ByteBuffersDirectory; +import org.apache.lucene.util.QueryBuilder; +import org.junit.jupiter.api.Test; + +public class HugePqFullFieldDocCollectorTest { + Sort sort = new Sort(new SortedNumericSortField("number_sort", Type.LONG)); + Query luceneQuery = LongPoint.newRangeQuery("number", -100, 100); + + @Test + public void testSingleShard() throws IOException { + try (var dir = new ByteBuffersDirectory(); var env = new LLTempHugePqEnv()) { + var analyzer = new WordAnalyzer(true, true); + var writer = new IndexWriter(dir, new IndexWriterConfig(analyzer)); + writer.updateDocument(new Term("id", "00"), List.of(new SortedNumericDocValuesField("number_sort", 1), new LongPoint("number", 1))); + writer.updateDocument(new Term("id", "01"), List.of(new SortedNumericDocValuesField("number_sort", 44), new LongPoint("number", 44))); + writer.updateDocument(new Term("id", "02"), List.of(new SortedNumericDocValuesField("number_sort", 203), new LongPoint("number", 203))); + writer.updateDocument(new Term("id", "03"), List.of(new SortedNumericDocValuesField("number_sort", 209), new LongPoint("number", 209))); + writer.updateDocument(new Term("id", "04"), List.of(new SortedNumericDocValuesField("number_sort", -33), new LongPoint("number", -33))); + writer.updateDocument(new Term("id", "05"), List.of(new SortedNumericDocValuesField("number_sort", 0), new LongPoint("number", 0))); + writer.updateDocument(new Term("id", "06"), List.of(new SortedNumericDocValuesField("number_sort", 933), new LongPoint("number", 933))); + writer.updateDocument(new Term("id", "07"), List.of(new SortedNumericDocValuesField("number_sort", 6), new LongPoint("number", 6))); + writer.updateDocument(new Term("id", "08"), List.of(new SortedNumericDocValuesField("number_sort", -11), new LongPoint("number", -11))); + writer.updateDocument(new Term("id", "09"), List.of(new SortedNumericDocValuesField("number_sort", 9996), new LongPoint("number", 9996))); + writer.updateDocument(new Term("id", "10"), List.of(new SortedNumericDocValuesField("number_sort", 9), new LongPoint("number", 9))); + writer.updateDocument(new Term("id", "11"), List.of(new SortedNumericDocValuesField("number_sort", 66), new LongPoint("number", 66))); + writer.updateDocument(new Term("id", "12"), List.of(new SortedNumericDocValuesField("number_sort", 88), new LongPoint("number", 88))); + writer.updateDocument(new Term("id", "13"), List.of(new SortedNumericDocValuesField("number_sort", 
222), new LongPoint("number", 222))); + writer.updateDocument(new Term("id", "14"), List.of(new SortedNumericDocValuesField("number_sort", -2), new LongPoint("number", -2))); + writer.updateDocument(new Term("id", "15"), List.of(new SortedNumericDocValuesField("number_sort", 7), new LongPoint("number", 7))); + writer.updateDocument(new Term("id", "16"), List.of(new SortedNumericDocValuesField("number_sort", 1010912093), new LongPoint("number", 1010912093))); + writer.updateDocument(new Term("id", "17"), List.of(new SortedNumericDocValuesField("number_sort", -3894789), new LongPoint("number", -3894789))); + writer.updateDocument(new Term("id", "18"), List.of(new SortedNumericDocValuesField("number_sort", 122), new LongPoint("number", 122))); + writer.updateDocument(new Term("id", "19"), List.of(new SortedNumericDocValuesField("number_sort", 2), new LongPoint("number", 2))); + writer.flush(); + writer.commit(); + try (var reader = DirectoryReader.open(writer, true, true)) { + var searcher = new IndexSearcher(reader); + var expectedResults = searcher.search(luceneQuery, 20, sort, false); + var expectedTotalHits = new TotalHitsCount(expectedResults.totalHits.value, expectedResults.totalHits.relation == Relation.EQUAL_TO); + var expectedDocs = Arrays + .stream(expectedResults.scoreDocs) + .map(sd -> (FieldDoc) sd) + .map(fieldDoc -> new LLFieldDoc(fieldDoc.doc, fieldDoc.score, fieldDoc.shardIndex, Arrays.asList(fieldDoc.fields))) + .toList(); + try (var collector = HugePqFullFieldDocCollector.create(env, sort, 20, Integer.MAX_VALUE)) { + searcher.search(luceneQuery, collector); + var docs = collector.fullDocs().iterate().collectList().blockOptional().orElseThrow(); + System.out.println("Expected docs:"); + for (var expectedDoc : expectedDocs) { + System.out.println(expectedDoc); + } + System.out.println(""); + System.out.println("Obtained docs:"); + for (var doc : docs) { + System.out.println(doc); + } + assertEquals(expectedDocs, + docs.stream().map(elem -> new LLFieldDoc(elem.doc(), elem.score(), -1, elem.fields())).toList() + ); + assertEquals(expectedTotalHits, new TotalHitsCount(collector.getTotalHits(), true)); + } + } + } + } + + @Test + public void testMultiShard() throws IOException { + try (var dir1 = new ByteBuffersDirectory(); var dir2 = new ByteBuffersDirectory(); var env = new LLTempHugePqEnv()) { + var analyzer = new WordAnalyzer(true, true); + var writer1 = new IndexWriter(dir1, new IndexWriterConfig(analyzer)); + var writer2 = new IndexWriter(dir2, new IndexWriterConfig(analyzer)); + writer1.updateDocument(new Term("id", "00"), List.of(new SortedNumericDocValuesField("number_sort", 1), new LongPoint("number", 1))); + writer1.updateDocument(new Term("id", "01"), List.of(new SortedNumericDocValuesField("number_sort", 44), new LongPoint("number", 44))); + writer1.updateDocument(new Term("id", "02"), List.of(new SortedNumericDocValuesField("number_sort", 203), new LongPoint("number", 203))); + writer1.updateDocument(new Term("id", "03"), List.of(new SortedNumericDocValuesField("number_sort", 209), new LongPoint("number", 209))); + writer1.updateDocument(new Term("id", "04"), List.of(new SortedNumericDocValuesField("number_sort", -33), new LongPoint("number", -33))); + writer1.updateDocument(new Term("id", "05"), List.of(new SortedNumericDocValuesField("number_sort", 0), new LongPoint("number", 0))); + writer1.updateDocument(new Term("id", "06"), List.of(new SortedNumericDocValuesField("number_sort", 933), new LongPoint("number", 933))); + writer1.updateDocument(new Term("id", 
"07"), List.of(new SortedNumericDocValuesField("number_sort", 6), new LongPoint("number", 6))); + writer1.updateDocument(new Term("id", "08"), List.of(new SortedNumericDocValuesField("number_sort", -11), new LongPoint("number", -11))); + writer1.updateDocument(new Term("id", "09"), List.of(new SortedNumericDocValuesField("number_sort", 9996), new LongPoint("number", 9996))); + writer2.updateDocument(new Term("id", "10"), List.of(new SortedNumericDocValuesField("number_sort", 9), new LongPoint("number", 9))); + writer2.updateDocument(new Term("id", "11"), List.of(new SortedNumericDocValuesField("number_sort", 66), new LongPoint("number", 66))); + writer2.updateDocument(new Term("id", "12"), List.of(new SortedNumericDocValuesField("number_sort", 88), new LongPoint("number", 88))); + writer2.updateDocument(new Term("id", "13"), List.of(new SortedNumericDocValuesField("number_sort", 222), new LongPoint("number", 222))); + writer2.updateDocument(new Term("id", "14"), List.of(new SortedNumericDocValuesField("number_sort", -2), new LongPoint("number", -2))); + writer2.updateDocument(new Term("id", "15"), List.of(new SortedNumericDocValuesField("number_sort", 7), new LongPoint("number", 7))); + writer2.updateDocument(new Term("id", "16"), List.of(new SortedNumericDocValuesField("number_sort", 1010912093), new LongPoint("number", 1010912093))); + writer2.updateDocument(new Term("id", "17"), List.of(new SortedNumericDocValuesField("number_sort", -3894789), new LongPoint("number", -3894789))); + writer2.updateDocument(new Term("id", "18"), List.of(new SortedNumericDocValuesField("number_sort", 122), new LongPoint("number", 122))); + writer2.updateDocument(new Term("id", "19"), List.of(new SortedNumericDocValuesField("number_sort", 2), new LongPoint("number", 2))); + writer1.flush(); + writer2.flush(); + writer1.commit(); + writer2.commit(); + var sharedStats = new SharedShardStatistics(); + try (var reader1 = DirectoryReader.open(writer1, true, true); + var reader2 = DirectoryReader.open(writer2, true, true)) { + var searcher1 = new IndexSearcher(reader1); + var searcher2 = new IndexSearcher(reader2); + var shardSearcher1 = new ShardIndexSearcher(sharedStats, List.of(searcher1, searcher2), 0); + var shardSearcher2 = new ShardIndexSearcher(sharedStats, List.of(searcher1, searcher2), 1); + var standardSharedManager = TopFieldCollector.createSharedManager(sort, 20, null, Integer.MAX_VALUE); + var standardCollector1 = standardSharedManager.newCollector(); + var standardCollector2 = standardSharedManager.newCollector(); + shardSearcher1.search(luceneQuery, standardCollector1); + shardSearcher2.search(luceneQuery, standardCollector2); + var expectedResults = standardSharedManager.reduce(List.of(standardCollector1, standardCollector2)); + var expectedTotalHits = new TotalHitsCount(expectedResults.totalHits.value, expectedResults.totalHits.relation == Relation.EQUAL_TO); + var expectedDocs = Arrays + .stream(expectedResults.scoreDocs) + .map(sd -> (FieldDoc) sd) + .map(fieldDoc -> new LLFieldDoc(fieldDoc.doc, fieldDoc.score, fieldDoc.shardIndex, Arrays.asList(fieldDoc.fields))) + .toList(); + var collectorManager = HugePqFullFieldDocCollector.createSharedManager(env, sort, 20, Integer.MAX_VALUE); + var collector1 = collectorManager.newCollector(); + var collector2 = collectorManager.newCollector(); + shardSearcher1.search(luceneQuery, collector1); + shardSearcher2.search(luceneQuery, collector2); + try (var results = collectorManager.reduce(List.of(collector1, collector2))) { + var docs = 
results.iterate().collectList().blockOptional().orElseThrow(); + System.out.println("Expected docs:"); + for (var expectedDoc : expectedDocs) { + System.out.println(expectedDoc); + } + System.out.println(""); + System.out.println("Obtained docs:"); + for (var doc : docs) { + System.out.println(doc); + } + assertEquals(expectedDocs, + docs.stream().map(elem -> new LLFieldDoc(elem.doc(), elem.score(), -1, elem.fields())).toList() + ); + assertEquals(expectedTotalHits, new TotalHitsCount(results.totalHits().value, results.totalHits().relation == Relation.EQUAL_TO)); + } + } + } + } +} \ No newline at end of file diff --git a/src/test/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullScoreDocCollectorTest.java b/src/test/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullScoreDocCollectorTest.java new file mode 100644 index 0000000..7ea4fd0 --- /dev/null +++ b/src/test/java/it/cavallium/dbengine/lucene/hugepq/search/HugePqFullScoreDocCollectorTest.java @@ -0,0 +1,170 @@ +package it.cavallium.dbengine.lucene.hugepq.search; + +import static org.junit.jupiter.api.Assertions.*; + +import it.cavallium.dbengine.client.query.QueryUtils; +import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; +import it.cavallium.dbengine.database.disk.IndexSearcherManager; +import it.cavallium.dbengine.database.disk.LLTempHugePqEnv; +import it.cavallium.dbengine.lucene.LLScoreDoc; +import it.cavallium.dbengine.lucene.LuceneUtils; +import it.cavallium.dbengine.lucene.analyzer.LegacyWordAnalyzer; +import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer; +import it.cavallium.dbengine.lucene.analyzer.WordAnalyzer; +import it.cavallium.dbengine.lucene.searcher.ShardIndexSearcher; +import it.cavallium.dbengine.lucene.searcher.SharedShardStatistics; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.lucene.document.Field.Store; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.CollectorManager; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.PhraseQuery; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.SearcherManager; +import org.apache.lucene.search.SimpleCollector; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TopDocsCollector; +import org.apache.lucene.search.TopScoreDocCollector; +import org.apache.lucene.search.TotalHits.Relation; +import org.apache.lucene.store.ByteBuffersDirectory; +import org.apache.lucene.util.QueryBuilder; +import org.junit.jupiter.api.Test; + +public class HugePqFullScoreDocCollectorTest { + + @Test + public void testSingleShard() throws IOException { + try (var dir = new ByteBuffersDirectory(); var env = new LLTempHugePqEnv()) { + var analyzer = new WordAnalyzer(true, true); + var writer = new IndexWriter(dir, new IndexWriterConfig(analyzer)); + writer.updateDocument(new Term("id", "00"), List.of(new TextField("text", "Mario Rossi", Store.YES))); + writer.updateDocument(new Term("id", "01"), List.of(new TextField("text", "Mario Rossi", Store.YES))); + writer.updateDocument(new Term("id", "02"), List.of(new TextField("text", "Mario Rossi", Store.YES))); + writer.updateDocument(new Term("id", "03"), List.of(new TextField("text", "Marios Rossi", Store.YES))); + 
writer.updateDocument(new Term("id", "04"), List.of(new TextField("text", "Rossi", Store.YES))); + writer.updateDocument(new Term("id", "05"), List.of(new TextField("text", "Rossi", Store.YES))); + writer.updateDocument(new Term("id", "06"), List.of(new TextField("text", "Rossi", Store.YES))); + writer.updateDocument(new Term("id", "07"), List.of(new TextField("text", "Rossi", Store.YES))); + writer.updateDocument(new Term("id", "08"), List.of(new TextField("text", "Rossi", Store.YES))); + writer.updateDocument(new Term("id", "09"), List.of(new TextField("text", "Rossi", Store.YES))); + writer.updateDocument(new Term("id", "10"), List.of(new TextField("text", "ROSSI UA", Store.YES))); + writer.updateDocument(new Term("id", "11"), List.of(new TextField("text", "Mario Barman", Store.YES))); + writer.updateDocument(new Term("id", "12"), List.of(new TextField("text", "Mario batman", Store.YES))); + writer.updateDocument(new Term("id", "13"), List.of(new TextField("text", "Admin Rossi desk", Store.YES))); + writer.updateDocument(new Term("id", "14"), List.of(new TextField("text", "MRI Marios bot", Store.YES))); + writer.updateDocument(new Term("id", "15"), List.of(new TextField("text", "Mario Rossi [beta]", Store.YES))); + writer.updateDocument(new Term("id", "16"), List.of(new TextField("text", "Mario Music Bot", Store.YES))); + writer.updateDocument(new Term("id", "17"), List.of(new TextField("text", "Mario night mode", Store.YES))); + writer.updateDocument(new Term("id", "18"), List.of(new TextField("text", "Mario stats bot", Store.YES))); + writer.updateDocument(new Term("id", "19"), List.of(new TextField("text", "Very very long text with Mario Giovanni and Rossi inside", Store.YES))); + writer.flush(); + writer.commit(); + try (var reader = DirectoryReader.open(writer, true, true)) { + var searcher = new IndexSearcher(reader); + var qb = new QueryBuilder(analyzer); + var luceneQuery = qb.createMinShouldMatchQuery("text", "Mario rossi", 0.3f); + var expectedResults = searcher.search(luceneQuery, 20); + var expectedTotalHits = new TotalHitsCount(expectedResults.totalHits.value, expectedResults.totalHits.relation == Relation.EQUAL_TO); + var expectedDocs = Arrays + .stream(expectedResults.scoreDocs) + .map(scoreDoc -> new LLScoreDoc(scoreDoc.doc, scoreDoc.score, scoreDoc.shardIndex)) + .toList(); + try (var collector = HugePqFullScoreDocCollector.create(env, 20)) { + searcher.search(luceneQuery, collector); + var docs = collector.fullDocs().iterate().collectList().blockOptional().orElseThrow(); + System.out.println("Expected docs:"); + for (LLScoreDoc expectedDoc : expectedDocs) { + System.out.println(expectedDoc); + } + System.out.println(""); + System.out.println("Obtained docs:"); + for (LLScoreDoc doc : docs) { + System.out.println(doc); + } + assertEquals(expectedDocs, docs.stream().map(elem -> new LLScoreDoc(elem.doc(), elem.score(), -1)).toList()); + assertEquals(expectedTotalHits, new TotalHitsCount(collector.getTotalHits(), true)); + } + } + } + } + + @Test + public void testMultiShard() throws IOException { + try (var dir1 = new ByteBuffersDirectory(); var dir2 = new ByteBuffersDirectory(); var env = new LLTempHugePqEnv()) { + var analyzer = new WordAnalyzer(true, true); + var writer1 = new IndexWriter(dir1, new IndexWriterConfig(analyzer)); + var writer2 = new IndexWriter(dir2, new IndexWriterConfig(analyzer)); + writer1.updateDocument(new Term("id", "00"), List.of(new TextField("text", "Mario Rossi", Store.YES))); + writer1.updateDocument(new Term("id", "01"), List.of(new 
TextField("text", "Mario Rossi", Store.YES))); + writer1.updateDocument(new Term("id", "02"), List.of(new TextField("text", "Mario Rossi", Store.YES))); + writer1.updateDocument(new Term("id", "03"), List.of(new TextField("text", "Marios Rossi", Store.YES))); + writer1.updateDocument(new Term("id", "04"), List.of(new TextField("text", "Rossi", Store.YES))); + writer1.updateDocument(new Term("id", "05"), List.of(new TextField("text", "Rossi", Store.YES))); + writer1.updateDocument(new Term("id", "06"), List.of(new TextField("text", "Rossi", Store.YES))); + writer1.updateDocument(new Term("id", "07"), List.of(new TextField("text", "Rossi", Store.YES))); + writer1.updateDocument(new Term("id", "08"), List.of(new TextField("text", "Rossi", Store.YES))); + writer1.updateDocument(new Term("id", "09"), List.of(new TextField("text", "Rossi", Store.YES))); + writer2.updateDocument(new Term("id", "10"), List.of(new TextField("text", "ROSSI UA", Store.YES))); + writer2.updateDocument(new Term("id", "11"), List.of(new TextField("text", "Mario Barman", Store.YES))); + writer2.updateDocument(new Term("id", "12"), List.of(new TextField("text", "Mario batman", Store.YES))); + writer2.updateDocument(new Term("id", "13"), List.of(new TextField("text", "Admin Rossi desk", Store.YES))); + writer2.updateDocument(new Term("id", "14"), List.of(new TextField("text", "MRI Marios bot", Store.YES))); + writer2.updateDocument(new Term("id", "15"), List.of(new TextField("text", "Mario Rossi [beta]", Store.YES))); + writer2.updateDocument(new Term("id", "16"), List.of(new TextField("text", "Mario Music Bot", Store.YES))); + writer2.updateDocument(new Term("id", "17"), List.of(new TextField("text", "Mario night mode", Store.YES))); + writer2.updateDocument(new Term("id", "18"), List.of(new TextField("text", "Mario stats bot", Store.YES))); + writer2.updateDocument(new Term("id", "19"), List.of(new TextField("text", "Very very long text with Mario Giovanni and Rossi inside", Store.YES))); + writer1.flush(); + writer2.flush(); + writer1.commit(); + writer2.commit(); + var sharedStats = new SharedShardStatistics(); + try (var reader1 = DirectoryReader.open(writer1, true, true); + var reader2 = DirectoryReader.open(writer2, true, true)) { + var searcher1 = new IndexSearcher(reader1); + var searcher2 = new IndexSearcher(reader2); + var shardSearcher1 = new ShardIndexSearcher(sharedStats, List.of(searcher1, searcher2), 0); + var shardSearcher2 = new ShardIndexSearcher(sharedStats, List.of(searcher1, searcher2), 1); + var qb = new QueryBuilder(analyzer); + var luceneQuery = qb.createMinShouldMatchQuery("text", "Mario rossi", 0.3f); + var standardSharedManager = TopScoreDocCollector.createSharedManager(20, null, Integer.MAX_VALUE); + var standardCollector1 = standardSharedManager.newCollector(); + var standardCollector2 = standardSharedManager.newCollector(); + shardSearcher1.search(luceneQuery, standardCollector1); + shardSearcher2.search(luceneQuery, standardCollector2); + var expectedResults = standardSharedManager.reduce(List.of(standardCollector1, standardCollector2)); + var expectedTotalHits = new TotalHitsCount(expectedResults.totalHits.value, expectedResults.totalHits.relation == Relation.EQUAL_TO); + var expectedDocs = Arrays + .stream(expectedResults.scoreDocs) + .map(scoreDoc -> new LLScoreDoc(scoreDoc.doc, scoreDoc.score, scoreDoc.shardIndex)) + .toList(); + var collectorManager = HugePqFullScoreDocCollector.createSharedManager(env, 20, Integer.MAX_VALUE); + var collector1 = collectorManager.newCollector(); + var 
collector2 = collectorManager.newCollector(); + shardSearcher1.search(luceneQuery, collector1); + shardSearcher2.search(luceneQuery, collector2); + try (var results = collectorManager.reduce(List.of(collector1, collector2))) { + var docs = results.iterate().collectList().blockOptional().orElseThrow(); + System.out.println("Expected docs:"); + for (LLScoreDoc expectedDoc : expectedDocs) { + System.out.println(expectedDoc); + } + System.out.println(""); + System.out.println("Obtained docs:"); + for (LLScoreDoc doc : docs) { + System.out.println(doc); + } + assertEquals(expectedDocs, docs.stream().map(elem -> new LLScoreDoc(elem.doc(), elem.score(), -1)).toList()); + assertEquals(expectedTotalHits, new TotalHitsCount(results.totalHits().value, results.totalHits().relation == Relation.EQUAL_TO)); + } + } + } + } +} \ No newline at end of file
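
Note on the new PriorityQueue#replaceTop(oldTop, newTop) contract introduced above: callers are expected to pass back the exact element they last read via top(), together with its replacement. Below is a minimal caller sketch; it is hypothetical code, not part of the patch, assuming only the interface change shown above and the LLScoreDoc record the collectors already use (pq, doc, docBase and score stand in for a collector's local state):

    // Read the current top once and keep the reference.
    LLScoreDoc oldTop = pq.top();
    if (oldTop != null && score > oldTop.score()) {
        // Hand the old element back together with its replacement: a disk-backed
        // implementation such as HugePqPriorityQueue can use the value to delete
        // the previous entry without a second lookup, while the in-memory
        // PriorityQueueAdaptor used in tests simply asserts that oldTop still
        // equals hitQueue.top() before calling updateTop(newTop).
        pq.replaceTop(oldTop, new LLScoreDoc(docBase + doc, score, -1));
    }

This is the same pattern HugePqFullFieldDocCollector#updateBottom and HugePqFullScoreDocCollector#collect follow in the hunks above, and EmptyPriorityQueue covers the degenerate case by asserting that both arguments are null.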