Test huge queries

Andrea Cavalli 2022-06-15 18:36:22 +02:00
parent 4a08a876ca
commit eb02e0f18d
13 changed files with 440 additions and 42 deletions

View File

@@ -24,7 +24,8 @@ public class EmptyPriorityQueue<T> implements PriorityQueue<T> {
}
@Override
public void replaceTop(T newTop) {
public void replaceTop(T oldTop, T newTop) {
assert oldTop == null;
assert newTop == null;
}

View File

@@ -34,9 +34,7 @@ public interface FullDocs<T extends LLDoc> extends ResourceIterable<T> {
FullDocs<T> docs = new FullDocs<>() {
@Override
public void close() {
for (FullDocs<T> fullDoc : fullDocs) {
fullDoc.close();
}
mergedIterable.close();
}
@Override
@@ -78,7 +76,7 @@ public interface FullDocs<T extends LLDoc> extends ResourceIterable<T> {
static <T extends LLDoc> ResourceIterable<T> mergeResourceIterable(
@Nullable Sort sort,
FullDocs<T>[] fullDocs) {
return new ResourceIterable<T>() {
return new ResourceIterable<>() {
@Override
public void close() {
for (FullDocs<T> fullDoc : fullDocs) {
@@ -138,7 +136,7 @@ public interface FullDocs<T extends LLDoc> extends ResourceIterable<T> {
@SuppressWarnings("unchecked") Flux<T>[] fluxes = new Flux[fullDocs.length];
for (int i = 0; i < iterables.length; i++) {
var shardIndex = i;
fluxes[i] = iterables[i].<T>map(shard -> {
fluxes[i] = iterables[i].map(shard -> {
if (shard instanceof LLScoreDoc scoreDoc) {
//noinspection unchecked
return (T) new LLScoreDoc(scoreDoc.doc(), scoreDoc.score(), shardIndex);
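The mapping above re-stamps each merged hit with the ordinal of the shard it came from, so consumers of the merged stream can trace a result back to its source searcher. A minimal sketch of the idea, assuming Project Reactor's Flux and the repo's LLScoreDoc(doc, score, shardIndex) record shape; the tagShards helper name is hypothetical:

import reactor.core.publisher.Flux;

// Hedged sketch: re-stamp every element of N per-shard streams with its
// shard ordinal before merging. The lambda captures an effectively-final
// copy of the loop index, exactly as the diff does with shardIndex.
static Flux<LLScoreDoc>[] tagShards(Flux<LLScoreDoc>[] shards) {
	@SuppressWarnings("unchecked")
	Flux<LLScoreDoc>[] fluxes = new Flux[shards.length];
	for (int i = 0; i < shards.length; i++) {
		int shardIndex = i;
		fluxes[i] = shards[i].map(d -> new LLScoreDoc(d.doc(), d.score(), shardIndex));
	}
	return fluxes;
}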

View File

@@ -10,7 +10,6 @@ import it.cavallium.dbengine.database.disk.RocksIterWithReadOpts;
import it.cavallium.dbengine.database.disk.StandardRocksDBColumn;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultMode;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultPrevious;
import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -42,6 +41,9 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
private long size = 0;
private T cachedTop;
private boolean cachedTopSet = false;
public HugePqPriorityQueue(LLTempHugePqEnv env, HugePqCodec<T> codec) {
this.tempEnv = env;
this.env = env.getEnv();
@@ -74,6 +76,7 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
public void add(T element) {
ensureThread();
cachedTopSet = false;
var keyBuf = serializeKey(element);
try (keyBuf) {
try (var readOptions = newReadOptions(); var writeOptions = newWriteOptions()) {
@@ -107,6 +110,9 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
@Override
public T top() {
if (cachedTopSet) {
return cachedTop;
}
ensureThread();
return databaseTop();
}
@@ -118,9 +124,14 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
if (it.isValid()) {
var key = it.key();
try (var keyBuf = rocksDB.getAllocator().copyOf(key)) {
return deserializeKey(keyBuf);
var top = deserializeKey(keyBuf);
cachedTop = top;
cachedTopSet = true;
return top;
}
} else {
cachedTop = null;
cachedTopSet = true;
return null;
}
} catch (RocksDBException e) {
@@ -131,6 +142,7 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
@Override
public T pop() {
ensureThread();
cachedTopSet = false;
try (var readOptions = newReadOptions();
var writeOptions = newWriteOptions();
var it = rocksDB.newRocksIterator(true, readOptions, LLRange.all(), false)) {
@@ -177,10 +189,36 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
}
@Override
public void replaceTop(T newTop) {
public void replaceTop(T oldTop, T newTop) {
ensureThread();
this.pop();
this.add(newTop);
cachedTopSet = false;
if (oldTop == null) {
add(newTop);
cachedTop = newTop;
cachedTopSet = true;
} else {
try (var readOptions = newReadOptions();
var writeOptions = newWriteOptions();
var oldKeyBuf = serializeKey(oldTop);
var newKeyBuf = serializeKey(newTop);
var ignored = rocksDB.updateAtomic(readOptions,
writeOptions,
oldKeyBuf,
this::reduceOrRemove,
UpdateAtomicResultMode.NOTHING
);
var ignored2 = rocksDB.updateAtomic(readOptions,
writeOptions,
newKeyBuf,
this::incrementOrAdd,
UpdateAtomicResultMode.NOTHING
)) {
cachedTop = newTop;
cachedTopSet = true;
} catch (IOException ex) {
throw new IllegalStateException(ex);
}
}
}
@Override
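The repeated cachedTop/cachedTopSet writes in this file implement a small memoization of the queue head: top() serves the cached value when it is set, and every mutation (add, pop, remove, clear, replaceTop) invalidates it. A standalone sketch of the invariant, assuming a hypothetical loadTopFromDb() that seeks the smallest RocksDB key:

private T cachedTop;
private boolean cachedTopSet;

public T top() {
	if (cachedTopSet) {
		return cachedTop; // serve the memoized head, no RocksDB seek
	}
	T top = loadTopFromDb(); // hypothetical: iterator seek to the first key
	cachedTop = top;
	cachedTopSet = true;
	return top;
}

public void add(T element) {
	cachedTopSet = false; // any write may change the head, so drop the cache
	// ... RocksDB write, as in the diff above ...
}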
@@ -192,6 +230,7 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
@Override
public void clear() {
ensureThread();
cachedTopSet = false;
try (var wb = new WriteBatch(); var wo = newWriteOptions()) {
wb.deleteRange(rocksDB.getColumnFamilyHandle(), new byte[0], getBiggestKey());
size = 0;
@@ -211,6 +250,7 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
public boolean remove(@NotNull T element) {
ensureThread();
Objects.requireNonNull(element);
cachedTopSet = false;
try (var readOptions = newReadOptions();
var writeOptions = newWriteOptions();
var keyBuf = serializeKey(element)) {
@@ -294,7 +334,11 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
}
return t;
}, RocksIterWithReadOpts::close).concatMapIterable(item -> item);
}, rocksIterWithReadOpts -> {
if (rocksIterWithReadOpts != null) {
rocksIterWithReadOpts.close();
}
}).concatMapIterable(item -> item);
}
@Override
@@ -309,7 +353,6 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
ensureThread();
this.tempEnv.freeDb(hugePqId);
if (this.codec instanceof SafeCloseable closeable) {
closeable.close();

View File

@@ -8,6 +8,6 @@ class LLDocElementScoreComparator implements Comparator<LLDoc> {
@Override
public int compare(LLDoc hitA, LLDoc hitB) {
return Float.compare(hitA.score(), hitB.score());
return Float.compare(hitB.score(), hitA.score());
}
}
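The fix swaps the arguments to Float.compare, turning the ascending comparison into a descending one so that higher-scoring hits order first. A quick demonstration of the semantics:

import java.util.Arrays;

// Float.compare(b, a) imposes descending order.
Float[] scores = {0.5f, 2.0f, 1.0f};
Arrays.sort(scores, (a, b) -> Float.compare(b, a));
System.out.println(Arrays.toString(scores)); // prints [2.0, 1.0, 0.5]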

View File

@@ -23,7 +23,7 @@ public interface PriorityQueue<T> extends ResourceIterable<T>, SafeCloseable {
/**
* Replace the top of the pq with {@code newTop}
*/
void replaceTop(T newTop);
void replaceTop(T oldTop, T newTop);
/**
* Returns the number of elements currently stored in the PriorityQueue.
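For callers, the widened signature means passing the element being displaced along with its replacement, which lets disk-backed implementations swap the two keys in place instead of doing a pop() followed by an add(). A hedged usage sketch; the pq variable and the LLScoreDoc values are hypothetical:

// Assuming a non-empty PriorityQueue<LLScoreDoc> named pq:
LLScoreDoc oldTop = pq.top();                 // the element about to be displaced
LLScoreDoc newTop = new LLScoreDoc(42, 1.5f, -1);
pq.replaceTop(oldTop, newTop);                // one replacement instead of pop() + add()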

View File

@@ -471,7 +471,7 @@ public abstract class HugePqFullFieldDocCollector extends
final void updateBottom(int doc) {
// bottom.score is already set to Float.NaN in add().
var bottom = pq.top();
pq.replaceTop(new LLSlotDoc(docBase + doc, bottom.score(), bottom.shardIndex(), bottom.slot()));
pq.replaceTop(bottom, new LLSlotDoc(docBase + doc, bottom.score(), bottom.shardIndex(), bottom.slot()));
}
/*

View File

@@ -33,6 +33,7 @@ import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.TotalHits;
import org.jetbrains.annotations.NotNull;
@@ -106,8 +107,7 @@ public abstract class HugePqFullScoreDocCollector extends
// If there is a limit, and it's reached, use the replacement logic
if (limit != null && pq.size() >= limit) {
var pqTop = pq.top();
if (pqTop != null && score <= pqTop.score()) {
if (pq.top() != null && score <= pq.top().score()) {
if (totalHitsRelation == TotalHits.Relation.EQUAL_TO) {
// we just reached totalHitsThreshold, we can start setting the min
// competitive score now
@@ -119,7 +119,7 @@ public abstract class HugePqFullScoreDocCollector extends
return;
} else {
// Remove the top element, then add the following element
pq.replaceTop(new LLScoreDoc(doc + docBase, score, -1));
pq.replaceTop(pq.top(), new LLScoreDoc(doc + docBase, score, -1));
// The minimum competitive score will be updated later
}
} else {
@@ -296,13 +296,12 @@ public abstract class HugePqFullScoreDocCollector extends
}
protected void updateMinCompetitiveScore(Scorable scorer) throws IOException {
var pqTop = pq.top();
if (hitsThresholdChecker.isThresholdReached(true)
&& pqTop != null
&& pqTop.score() != Float.NEGATIVE_INFINITY) { // -Infinity is the score of sentinels
&& pq.top() != null
&& pq.top().score() != Float.NEGATIVE_INFINITY) { // -Infinity is the score of sentinels
// since we tie-break on doc id and collect in doc id order, we can require
// the next float
float localMinScore = Math.nextUp(pqTop.score());
float localMinScore = Math.nextUp(pq.top().score());
if (localMinScore > minCompetitiveScore) {
scorer.setMinCompetitiveScore(localMinScore);
totalHitsRelation = TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO;
@@ -311,7 +310,7 @@ public abstract class HugePqFullScoreDocCollector extends
// we don't use the next float but we register the document
// id so that other leaves can require it if they are after
// the current maximum
minScoreAcc.accumulate(docBase, pqTop.score());
minScoreAcc.accumulate(docBase, pq.top().score());
}
}
}
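Math.nextUp returns the smallest float strictly greater than its argument, so it is a safe minimum competitive score when score ties are broken by doc id, as the comment above notes. For illustration:

float score = 1.0f;
float min = Math.nextUp(score);  // smallest representable float above 1.0f
System.out.println(min > score); // true
System.out.println(Math.nextUp(Float.NEGATIVE_INFINITY)); // -3.4028235E38, i.e. -Float.MAX_VALUE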

View File

@@ -8,6 +8,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermStates;
@@ -16,17 +17,16 @@ import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryVisitor;
import org.apache.lucene.search.TermStatistics;
import org.jetbrains.annotations.Nullable;
public class ShardIndexSearcher extends IndexSearcher {
public final int myNodeID;
private final IndexSearcher[] searchers;
private final Map<FieldAndShar, CollectionStatistics> collectionStatsCache;
private final Map<FieldAndShar, CachedCollectionStatistics> collectionStatsCache;
private final Map<TermAndShard, TermStatistics> termStatsCache;
public ShardIndexSearcher(SharedShardStatistics sharedShardStatistics,
List<IndexSearcher> searchers,
int nodeID) {
public ShardIndexSearcher(SharedShardStatistics sharedShardStatistics, List<IndexSearcher> searchers, int nodeID) {
super(searchers.get(nodeID).getIndexReader(), searchers.get(nodeID).getExecutor());
this.collectionStatsCache = sharedShardStatistics.collectionStatsCache;
this.termStatsCache = sharedShardStatistics.termStatsCache;
@@ -93,15 +93,13 @@ public class ShardIndexSearcher extends IndexSearcher {
final Set<Term> missing = new HashSet<>();
for (Term term : terms) {
final TermAndShard key =
new TermAndShard(nodeID, term);
final TermAndShard key = new TermAndShard(nodeID, term);
if (!termStatsCache.containsKey(key)) {
missing.add(term);
}
}
if (missing.size() != 0) {
for (Map.Entry<Term, TermStatistics> ent :
getNodeTermStats(missing, nodeID).entrySet()) {
for (Map.Entry<Term, TermStatistics> ent : getNodeTermStats(missing, nodeID).entrySet()) {
if (ent.getValue() != null) {
final TermAndShard key = new TermAndShard(nodeID, ent.getKey());
termStatsCache.put(key, ent.getValue());
@@ -115,8 +113,7 @@ public class ShardIndexSearcher extends IndexSearcher {
// Mock: in a real env, this would hit the wire and get
// term stats from remote node
Map<Term, TermStatistics> getNodeTermStats(Set<Term> terms, int nodeID)
throws IOException {
Map<Term, TermStatistics> getNodeTermStats(Set<Term> terms, int nodeID) throws IOException {
var s = searchers[nodeID];
final Map<Term, TermStatistics> stats = new HashMap<>();
if (s == null) {
@@ -143,8 +140,7 @@ public class ShardIndexSearcher extends IndexSearcher {
if (nodeID == myNodeID) {
subStats = super.termStatistics(term, docFreq, totalTermFreq);
} else {
final TermAndShard key =
new TermAndShard(nodeID, term);
final TermAndShard key = new TermAndShard(nodeID, term);
subStats = termStatsCache.get(key);
if (subStats == null) {
continue; // term not found
@@ -176,8 +172,14 @@ public class ShardIndexSearcher extends IndexSearcher {
final CollectionStatistics nodeStats;
if (nodeID == myNodeID) {
nodeStats = super.collectionStatistics(field);
collectionStatsCache.put(key, new CachedCollectionStatistics(nodeStats));
} else {
nodeStats = collectionStatsCache.get(key);
var nodeStatsOptional = collectionStatsCache.get(key);
if (nodeStatsOptional == null) {
nodeStatsOptional = new CachedCollectionStatistics(computeNodeCollectionStatistics(key));
collectionStatsCache.put(key, nodeStatsOptional);
}
nodeStats = nodeStatsOptional.collectionStatistics();
}
if (nodeStats == null) {
continue; // field not in sub at all
@@ -203,6 +205,13 @@ public class ShardIndexSearcher extends IndexSearcher {
}
}
private CollectionStatistics computeNodeCollectionStatistics(FieldAndShar fieldAndShard) throws IOException {
var searcher = searchers[fieldAndShard.nodeID];
return searcher.collectionStatistics(fieldAndShard.field);
}
public record CachedCollectionStatistics(@Nullable CollectionStatistics collectionStatistics) {}
public static class TermAndShard {
private final int nodeID;
private final Term term;
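The CachedCollectionStatistics record exists because ConcurrentHashMap rejects null values, while "this field is absent in that shard" is exactly the kind of negative result worth caching. A generic sketch of the same null-wrapping pattern; all names here are hypothetical:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Wrapping a nullable payload in a record lets "absent" be cached too.
record Cached<V>(V valueOrNull) {}

class NullableCache<K, V> {
	private final Map<K, Cached<V>> cache = new ConcurrentHashMap<>();

	V get(K key, Function<K, V> fetch) {
		// The wrapper is stored even when fetch returns null, so the
		// expensive lookup runs at most once per key.
		return cache.computeIfAbsent(key, k -> new Cached<>(fetch.apply(k))).valueOrNull();
	}
}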

View File

@@ -1,14 +1,16 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.lucene.searcher.ShardIndexSearcher.CachedCollectionStatistics;
import it.cavallium.dbengine.lucene.searcher.ShardIndexSearcher.FieldAndShar;
import it.cavallium.dbengine.lucene.searcher.ShardIndexSearcher.TermAndShard;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.TermStatistics;
public class SharedShardStatistics {
public final Map<FieldAndShar, CollectionStatistics> collectionStatsCache = new ConcurrentHashMap<>();
public final Map<FieldAndShar, CachedCollectionStatistics> collectionStatsCache = new ConcurrentHashMap<>();
public final Map<TermAndShard, TermStatistics> termStatsCache = new ConcurrentHashMap<>();
}

View File

@@ -4,6 +4,7 @@ import it.cavallium.dbengine.lucene.PriorityQueue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.lucene.search.HitQueue;
import reactor.core.publisher.Flux;
@@ -35,7 +36,8 @@ public class PriorityQueueAdaptor<T> implements PriorityQueue<T> {
}
@Override
public void replaceTop(T newTop) {
public void replaceTop(T oldTop, T newTop) {
assert Objects.equals(oldTop, hitQueue.top());
hitQueue.updateTop(newTop);
}
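The adaptor only needs oldTop for the sanity assertion because Lucene's PriorityQueue.updateTop(T) already overwrites the root and sifts it back down in one call. A hedged sketch against Lucene's public API:

import org.apache.lucene.search.HitQueue;
import org.apache.lucene.search.ScoreDoc;

// HitQueue keeps the least competitive hit at the top.
HitQueue queue = new HitQueue(8, false);
queue.add(new ScoreDoc(1, 0.5f));
queue.add(new ScoreDoc(2, 2.0f));
ScoreDoc displaced = queue.top();        // the 0.5f hit
queue.updateTop(new ScoreDoc(3, 1.0f));  // overwrite the root, re-heapify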

View File

@@ -311,9 +311,9 @@ public class TestHugePqHitQueue {
}
@Override
public void replaceTop(ScoreDoc newTop) {
referenceQueue.replaceTop(newTop);
myQueue.replaceTop(newTop);
public void replaceTop(ScoreDoc oldTop, ScoreDoc newTop) {
referenceQueue.replaceTop(oldTop, newTop);
myQueue.replaceTop(oldTop, newTop);
}
@Override

View File

@@ -0,0 +1,174 @@
package it.cavallium.dbengine.lucene.hugepq.search;
import static org.junit.jupiter.api.Assertions.assertEquals;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.lucene.LLFieldDoc;
import it.cavallium.dbengine.lucene.LLScoreDoc;
import it.cavallium.dbengine.lucene.analyzer.WordAnalyzer;
import it.cavallium.dbengine.lucene.searcher.ShardIndexSearcher;
import it.cavallium.dbengine.lucene.searcher.SharedShardStatistics;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortField.Type;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopScoreDocCollector;
import org.apache.lucene.search.TotalHits.Relation;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.util.QueryBuilder;
import org.junit.jupiter.api.Test;
public class HugePqFullFieldDocCollectorTest {
Sort sort = new Sort(new SortedNumericSortField("number_sort", Type.LONG));
Query luceneQuery = LongPoint.newRangeQuery("number", -100, 100);
@Test
public void testSingleShard() throws IOException {
try (var dir = new ByteBuffersDirectory(); var env = new LLTempHugePqEnv()) {
var analyzer = new WordAnalyzer(true, true);
var writer = new IndexWriter(dir, new IndexWriterConfig(analyzer));
writer.updateDocument(new Term("id", "00"), List.of(new SortedNumericDocValuesField("number_sort", 1), new LongPoint("number", 1)));
writer.updateDocument(new Term("id", "01"), List.of(new SortedNumericDocValuesField("number_sort", 44), new LongPoint("number", 44)));
writer.updateDocument(new Term("id", "02"), List.of(new SortedNumericDocValuesField("number_sort", 203), new LongPoint("number", 203)));
writer.updateDocument(new Term("id", "03"), List.of(new SortedNumericDocValuesField("number_sort", 209), new LongPoint("number", 209)));
writer.updateDocument(new Term("id", "04"), List.of(new SortedNumericDocValuesField("number_sort", -33), new LongPoint("number", -33)));
writer.updateDocument(new Term("id", "05"), List.of(new SortedNumericDocValuesField("number_sort", 0), new LongPoint("number", 0)));
writer.updateDocument(new Term("id", "06"), List.of(new SortedNumericDocValuesField("number_sort", 933), new LongPoint("number", 933)));
writer.updateDocument(new Term("id", "07"), List.of(new SortedNumericDocValuesField("number_sort", 6), new LongPoint("number", 6)));
writer.updateDocument(new Term("id", "08"), List.of(new SortedNumericDocValuesField("number_sort", -11), new LongPoint("number", -11)));
writer.updateDocument(new Term("id", "09"), List.of(new SortedNumericDocValuesField("number_sort", 9996), new LongPoint("number", 9996)));
writer.updateDocument(new Term("id", "10"), List.of(new SortedNumericDocValuesField("number_sort", 9), new LongPoint("number", 9)));
writer.updateDocument(new Term("id", "11"), List.of(new SortedNumericDocValuesField("number_sort", 66), new LongPoint("number", 66)));
writer.updateDocument(new Term("id", "12"), List.of(new SortedNumericDocValuesField("number_sort", 88), new LongPoint("number", 88)));
writer.updateDocument(new Term("id", "13"), List.of(new SortedNumericDocValuesField("number_sort", 222), new LongPoint("number", 222)));
writer.updateDocument(new Term("id", "14"), List.of(new SortedNumericDocValuesField("number_sort", -2), new LongPoint("number", -2)));
writer.updateDocument(new Term("id", "15"), List.of(new SortedNumericDocValuesField("number_sort", 7), new LongPoint("number", 7)));
writer.updateDocument(new Term("id", "16"), List.of(new SortedNumericDocValuesField("number_sort", 1010912093), new LongPoint("number", 1010912093)));
writer.updateDocument(new Term("id", "17"), List.of(new SortedNumericDocValuesField("number_sort", -3894789), new LongPoint("number", -3894789)));
writer.updateDocument(new Term("id", "18"), List.of(new SortedNumericDocValuesField("number_sort", 122), new LongPoint("number", 122)));
writer.updateDocument(new Term("id", "19"), List.of(new SortedNumericDocValuesField("number_sort", 2), new LongPoint("number", 2)));
writer.flush();
writer.commit();
try (var reader = DirectoryReader.open(writer, true, true)) {
var searcher = new IndexSearcher(reader);
var expectedResults = searcher.search(luceneQuery, 20, sort, false);
var expectedTotalHits = new TotalHitsCount(expectedResults.totalHits.value, expectedResults.totalHits.relation == Relation.EQUAL_TO);
var expectedDocs = Arrays
.stream(expectedResults.scoreDocs)
.map(sd -> (FieldDoc) sd)
.map(fieldDoc -> new LLFieldDoc(fieldDoc.doc, fieldDoc.score, fieldDoc.shardIndex, Arrays.asList(fieldDoc.fields)))
.toList();
try (var collector = HugePqFullFieldDocCollector.create(env, sort, 20, Integer.MAX_VALUE)) {
searcher.search(luceneQuery, collector);
var docs = collector.fullDocs().iterate().collectList().blockOptional().orElseThrow();
System.out.println("Expected docs:");
for (var expectedDoc : expectedDocs) {
System.out.println(expectedDoc);
}
System.out.println("");
System.out.println("Obtained docs:");
for (var doc : docs) {
System.out.println(doc);
}
assertEquals(expectedDocs,
docs.stream().map(elem -> new LLFieldDoc(elem.doc(), elem.score(), -1, elem.fields())).toList()
);
assertEquals(expectedTotalHits, new TotalHitsCount(collector.getTotalHits(), true));
}
}
}
}
@Test
public void testMultiShard() throws IOException {
try (var dir1 = new ByteBuffersDirectory(); var dir2 = new ByteBuffersDirectory(); var env = new LLTempHugePqEnv()) {
var analyzer = new WordAnalyzer(true, true);
var writer1 = new IndexWriter(dir1, new IndexWriterConfig(analyzer));
var writer2 = new IndexWriter(dir2, new IndexWriterConfig(analyzer));
writer1.updateDocument(new Term("id", "00"), List.of(new SortedNumericDocValuesField("number_sort", 1), new LongPoint("number", 1)));
writer1.updateDocument(new Term("id", "01"), List.of(new SortedNumericDocValuesField("number_sort", 44), new LongPoint("number", 44)));
writer1.updateDocument(new Term("id", "02"), List.of(new SortedNumericDocValuesField("number_sort", 203), new LongPoint("number", 203)));
writer1.updateDocument(new Term("id", "03"), List.of(new SortedNumericDocValuesField("number_sort", 209), new LongPoint("number", 209)));
writer1.updateDocument(new Term("id", "04"), List.of(new SortedNumericDocValuesField("number_sort", -33), new LongPoint("number", -33)));
writer1.updateDocument(new Term("id", "05"), List.of(new SortedNumericDocValuesField("number_sort", 0), new LongPoint("number", 0)));
writer1.updateDocument(new Term("id", "06"), List.of(new SortedNumericDocValuesField("number_sort", 933), new LongPoint("number", 933)));
writer1.updateDocument(new Term("id", "07"), List.of(new SortedNumericDocValuesField("number_sort", 6), new LongPoint("number", 6)));
writer1.updateDocument(new Term("id", "08"), List.of(new SortedNumericDocValuesField("number_sort", -11), new LongPoint("number", -11)));
writer1.updateDocument(new Term("id", "09"), List.of(new SortedNumericDocValuesField("number_sort", 9996), new LongPoint("number", 9996)));
writer2.updateDocument(new Term("id", "10"), List.of(new SortedNumericDocValuesField("number_sort", 9), new LongPoint("number", 9)));
writer2.updateDocument(new Term("id", "11"), List.of(new SortedNumericDocValuesField("number_sort", 66), new LongPoint("number", 66)));
writer2.updateDocument(new Term("id", "12"), List.of(new SortedNumericDocValuesField("number_sort", 88), new LongPoint("number", 88)));
writer2.updateDocument(new Term("id", "13"), List.of(new SortedNumericDocValuesField("number_sort", 222), new LongPoint("number", 222)));
writer2.updateDocument(new Term("id", "14"), List.of(new SortedNumericDocValuesField("number_sort", -2), new LongPoint("number", -2)));
writer2.updateDocument(new Term("id", "15"), List.of(new SortedNumericDocValuesField("number_sort", 7), new LongPoint("number", 7)));
writer2.updateDocument(new Term("id", "16"), List.of(new SortedNumericDocValuesField("number_sort", 1010912093), new LongPoint("number", 1010912093)));
writer2.updateDocument(new Term("id", "17"), List.of(new SortedNumericDocValuesField("number_sort", -3894789), new LongPoint("number", -3894789)));
writer2.updateDocument(new Term("id", "18"), List.of(new SortedNumericDocValuesField("number_sort", 122), new LongPoint("number", 122)));
writer2.updateDocument(new Term("id", "19"), List.of(new SortedNumericDocValuesField("number_sort", 2), new LongPoint("number", 2)));
writer1.flush();
writer2.flush();
writer1.commit();
writer2.commit();
var sharedStats = new SharedShardStatistics();
try (var reader1 = DirectoryReader.open(writer1, true, true);
var reader2 = DirectoryReader.open(writer2, true, true)) {
var searcher1 = new IndexSearcher(reader1);
var searcher2 = new IndexSearcher(reader2);
var shardSearcher1 = new ShardIndexSearcher(sharedStats, List.of(searcher1, searcher2), 0);
var shardSearcher2 = new ShardIndexSearcher(sharedStats, List.of(searcher1, searcher2), 1);
var standardSharedManager = TopFieldCollector.createSharedManager(sort, 20, null, Integer.MAX_VALUE);
var standardCollector1 = standardSharedManager.newCollector();
var standardCollector2 = standardSharedManager.newCollector();
shardSearcher1.search(luceneQuery, standardCollector1);
shardSearcher2.search(luceneQuery, standardCollector2);
var expectedResults = standardSharedManager.reduce(List.of(standardCollector1, standardCollector2));
var expectedTotalHits = new TotalHitsCount(expectedResults.totalHits.value, expectedResults.totalHits.relation == Relation.EQUAL_TO);
var expectedDocs = Arrays
.stream(expectedResults.scoreDocs)
.map(sd -> (FieldDoc) sd)
.map(fieldDoc -> new LLFieldDoc(fieldDoc.doc, fieldDoc.score, fieldDoc.shardIndex, Arrays.asList(fieldDoc.fields)))
.toList();
var collectorManager = HugePqFullFieldDocCollector.createSharedManager(env, sort, 20, Integer.MAX_VALUE);
var collector1 = collectorManager.newCollector();
var collector2 = collectorManager.newCollector();
shardSearcher1.search(luceneQuery, collector1);
shardSearcher2.search(luceneQuery, collector2);
try (var results = collectorManager.reduce(List.of(collector1, collector2))) {
var docs = results.iterate().collectList().blockOptional().orElseThrow();
System.out.println("Expected docs:");
for (var expectedDoc : expectedDocs) {
System.out.println(expectedDoc);
}
System.out.println("");
System.out.println("Obtained docs:");
for (var doc : docs) {
System.out.println(doc);
}
assertEquals(expectedDocs,
docs.stream().map(elem -> new LLFieldDoc(elem.doc(), elem.score(), -1, elem.fields())).toList()
);
assertEquals(expectedTotalHits, new TotalHitsCount(results.totalHits().value, results.totalHits().relation == Relation.EQUAL_TO));
}
}
}
}
}

View File

@@ -0,0 +1,170 @@
package it.cavallium.dbengine.lucene.hugepq.search;
import static org.junit.jupiter.api.Assertions.*;
import it.cavallium.dbengine.client.query.QueryUtils;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.disk.IndexSearcherManager;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.lucene.LLScoreDoc;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.analyzer.LegacyWordAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.WordAnalyzer;
import it.cavallium.dbengine.lucene.searcher.ShardIndexSearcher;
import it.cavallium.dbengine.lucene.searcher.SharedShardStatistics;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.PhraseQuery;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.SimpleCollector;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopDocsCollector;
import org.apache.lucene.search.TopScoreDocCollector;
import org.apache.lucene.search.TotalHits.Relation;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.util.QueryBuilder;
import org.junit.jupiter.api.Test;
public class HugePqFullScoreDocCollectorTest {
@Test
public void testSingleShard() throws IOException {
try (var dir = new ByteBuffersDirectory(); var env = new LLTempHugePqEnv()) {
var analyzer = new WordAnalyzer(true, true);
var writer = new IndexWriter(dir, new IndexWriterConfig(analyzer));
writer.updateDocument(new Term("id", "00"), List.of(new TextField("text", "Mario Rossi", Store.YES)));
writer.updateDocument(new Term("id", "01"), List.of(new TextField("text", "Mario Rossi", Store.YES)));
writer.updateDocument(new Term("id", "02"), List.of(new TextField("text", "Mario Rossi", Store.YES)));
writer.updateDocument(new Term("id", "03"), List.of(new TextField("text", "Marios Rossi", Store.YES)));
writer.updateDocument(new Term("id", "04"), List.of(new TextField("text", "Rossi", Store.YES)));
writer.updateDocument(new Term("id", "05"), List.of(new TextField("text", "Rossi", Store.YES)));
writer.updateDocument(new Term("id", "06"), List.of(new TextField("text", "Rossi", Store.YES)));
writer.updateDocument(new Term("id", "07"), List.of(new TextField("text", "Rossi", Store.YES)));
writer.updateDocument(new Term("id", "08"), List.of(new TextField("text", "Rossi", Store.YES)));
writer.updateDocument(new Term("id", "09"), List.of(new TextField("text", "Rossi", Store.YES)));
writer.updateDocument(new Term("id", "10"), List.of(new TextField("text", "ROSSI UA", Store.YES)));
writer.updateDocument(new Term("id", "11"), List.of(new TextField("text", "Mario Barman", Store.YES)));
writer.updateDocument(new Term("id", "12"), List.of(new TextField("text", "Mario batman", Store.YES)));
writer.updateDocument(new Term("id", "13"), List.of(new TextField("text", "Admin Rossi desk", Store.YES)));
writer.updateDocument(new Term("id", "14"), List.of(new TextField("text", "MRI Marios bot", Store.YES)));
writer.updateDocument(new Term("id", "15"), List.of(new TextField("text", "Mario Rossi [beta]", Store.YES)));
writer.updateDocument(new Term("id", "16"), List.of(new TextField("text", "Mario Music Bot", Store.YES)));
writer.updateDocument(new Term("id", "17"), List.of(new TextField("text", "Mario night mode", Store.YES)));
writer.updateDocument(new Term("id", "18"), List.of(new TextField("text", "Mario stats bot", Store.YES)));
writer.updateDocument(new Term("id", "19"), List.of(new TextField("text", "Very very long text with Mario Giovanni and Rossi inside", Store.YES)));
writer.flush();
writer.commit();
try (var reader = DirectoryReader.open(writer, true, true)) {
var searcher = new IndexSearcher(reader);
var qb = new QueryBuilder(analyzer);
var luceneQuery = qb.createMinShouldMatchQuery("text", "Mario rossi", 0.3f);
var expectedResults = searcher.search(luceneQuery, 20);
var expectedTotalHits = new TotalHitsCount(expectedResults.totalHits.value, expectedResults.totalHits.relation == Relation.EQUAL_TO);
var expectedDocs = Arrays
.stream(expectedResults.scoreDocs)
.map(scoreDoc -> new LLScoreDoc(scoreDoc.doc, scoreDoc.score, scoreDoc.shardIndex))
.toList();
try (var collector = HugePqFullScoreDocCollector.create(env, 20)) {
searcher.search(luceneQuery, collector);
var docs = collector.fullDocs().iterate().collectList().blockOptional().orElseThrow();
System.out.println("Expected docs:");
for (LLScoreDoc expectedDoc : expectedDocs) {
System.out.println(expectedDoc);
}
System.out.println("");
System.out.println("Obtained docs:");
for (LLScoreDoc doc : docs) {
System.out.println(doc);
}
assertEquals(expectedDocs, docs.stream().map(elem -> new LLScoreDoc(elem.doc(), elem.score(), -1)).toList());
assertEquals(expectedTotalHits, new TotalHitsCount(collector.getTotalHits(), true));
}
}
}
}
@Test
public void testMultiShard() throws IOException {
try (var dir1 = new ByteBuffersDirectory(); var dir2 = new ByteBuffersDirectory(); var env = new LLTempHugePqEnv()) {
var analyzer = new WordAnalyzer(true, true);
var writer1 = new IndexWriter(dir1, new IndexWriterConfig(analyzer));
var writer2 = new IndexWriter(dir2, new IndexWriterConfig(analyzer));
writer1.updateDocument(new Term("id", "00"), List.of(new TextField("text", "Mario Rossi", Store.YES)));
writer1.updateDocument(new Term("id", "01"), List.of(new TextField("text", "Mario Rossi", Store.YES)));
writer1.updateDocument(new Term("id", "02"), List.of(new TextField("text", "Mario Rossi", Store.YES)));
writer1.updateDocument(new Term("id", "03"), List.of(new TextField("text", "Marios Rossi", Store.YES)));
writer1.updateDocument(new Term("id", "04"), List.of(new TextField("text", "Rossi", Store.YES)));
writer1.updateDocument(new Term("id", "05"), List.of(new TextField("text", "Rossi", Store.YES)));
writer1.updateDocument(new Term("id", "06"), List.of(new TextField("text", "Rossi", Store.YES)));
writer1.updateDocument(new Term("id", "07"), List.of(new TextField("text", "Rossi", Store.YES)));
writer1.updateDocument(new Term("id", "08"), List.of(new TextField("text", "Rossi", Store.YES)));
writer1.updateDocument(new Term("id", "09"), List.of(new TextField("text", "Rossi", Store.YES)));
writer2.updateDocument(new Term("id", "10"), List.of(new TextField("text", "ROSSI UA", Store.YES)));
writer2.updateDocument(new Term("id", "11"), List.of(new TextField("text", "Mario Barman", Store.YES)));
writer2.updateDocument(new Term("id", "12"), List.of(new TextField("text", "Mario batman", Store.YES)));
writer2.updateDocument(new Term("id", "13"), List.of(new TextField("text", "Admin Rossi desk", Store.YES)));
writer2.updateDocument(new Term("id", "14"), List.of(new TextField("text", "MRI Marios bot", Store.YES)));
writer2.updateDocument(new Term("id", "15"), List.of(new TextField("text", "Mario Rossi [beta]", Store.YES)));
writer2.updateDocument(new Term("id", "16"), List.of(new TextField("text", "Mario Music Bot", Store.YES)));
writer2.updateDocument(new Term("id", "17"), List.of(new TextField("text", "Mario night mode", Store.YES)));
writer2.updateDocument(new Term("id", "18"), List.of(new TextField("text", "Mario stats bot", Store.YES)));
writer2.updateDocument(new Term("id", "19"), List.of(new TextField("text", "Very very long text with Mario Giovanni and Rossi inside", Store.YES)));
writer1.flush();
writer2.flush();
writer1.commit();
writer2.commit();
var sharedStats = new SharedShardStatistics();
try (var reader1 = DirectoryReader.open(writer1, true, true);
var reader2 = DirectoryReader.open(writer2, true, true)) {
var searcher1 = new IndexSearcher(reader1);
var searcher2 = new IndexSearcher(reader2);
var shardSearcher1 = new ShardIndexSearcher(sharedStats, List.of(searcher1, searcher2), 0);
var shardSearcher2 = new ShardIndexSearcher(sharedStats, List.of(searcher1, searcher2), 1);
var qb = new QueryBuilder(analyzer);
var luceneQuery = qb.createMinShouldMatchQuery("text", "Mario rossi", 0.3f);
var standardSharedManager = TopScoreDocCollector.createSharedManager(20, null, Integer.MAX_VALUE);
var standardCollector1 = standardSharedManager.newCollector();
var standardCollector2 = standardSharedManager.newCollector();
shardSearcher1.search(luceneQuery, standardCollector1);
shardSearcher2.search(luceneQuery, standardCollector2);
var expectedResults = standardSharedManager.reduce(List.of(standardCollector1, standardCollector2));
var expectedTotalHits = new TotalHitsCount(expectedResults.totalHits.value, expectedResults.totalHits.relation == Relation.EQUAL_TO);
var expectedDocs = Arrays
.stream(expectedResults.scoreDocs)
.map(scoreDoc -> new LLScoreDoc(scoreDoc.doc, scoreDoc.score, scoreDoc.shardIndex))
.toList();
var collectorManager = HugePqFullScoreDocCollector.createSharedManager(env, 20, Integer.MAX_VALUE);
var collector1 = collectorManager.newCollector();
var collector2 = collectorManager.newCollector();
shardSearcher1.search(luceneQuery, collector1);
shardSearcher2.search(luceneQuery, collector2);
try (var results = collectorManager.reduce(List.of(collector1, collector2))) {
var docs = results.iterate().collectList().blockOptional().orElseThrow();
System.out.println("Expected docs:");
for (LLScoreDoc expectedDoc : expectedDocs) {
System.out.println(expectedDoc);
}
System.out.println("");
System.out.println("Obtained docs:");
for (LLScoreDoc doc : docs) {
System.out.println(doc);
}
assertEquals(expectedDocs, docs.stream().map(elem -> new LLScoreDoc(elem.doc(), elem.score(), -1)).toList());
assertEquals(expectedTotalHits, new TotalHitsCount(results.totalHits().value, results.totalHits().relation == Relation.EQUAL_TO));
}
}
}
}
}