Partial sorted implementation
This commit is contained in:
parent
e1d1e1fb05
commit
36df18796b
@ -0,0 +1,22 @@
|
|||||||
|
package it.cavallium.dbengine.lucene;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.lucene.index.LeafReaderContext;
|
||||||
|
import org.apache.lucene.search.FieldComparator;
|
||||||
|
import org.apache.lucene.search.FieldDoc;
|
||||||
|
import org.apache.lucene.search.FieldValueHitQueue.Entry;
|
||||||
|
import org.apache.lucene.search.LeafFieldComparator;
|
||||||
|
import org.apache.lucene.search.SortField;
|
||||||
|
|
||||||
|
public interface FieldValueHitQueue {
|
||||||
|
|
||||||
|
FieldComparator<?>[] getComparators();
|
||||||
|
|
||||||
|
int[] getReverseMul();
|
||||||
|
|
||||||
|
LeafFieldComparator[] getComparators(LeafReaderContext context) throws IOException;
|
||||||
|
|
||||||
|
FieldDoc fillFields(Entry entry);
|
||||||
|
|
||||||
|
SortField[] getFields();
|
||||||
|
}
|
@ -1,6 +1,6 @@
|
|||||||
package it.cavallium.dbengine.lucene;
|
package it.cavallium.dbengine.lucene;
|
||||||
|
|
||||||
public sealed interface LLDocElement permits LLFieldDoc, LLScoreDoc {
|
public sealed interface LLDocElement permits LLSlotDoc, LLFieldDoc, LLScoreDoc {
|
||||||
|
|
||||||
int doc();
|
int doc();
|
||||||
|
|
||||||
|
@ -1,5 +1,16 @@
|
|||||||
package it.cavallium.dbengine.lucene;
|
package it.cavallium.dbengine.lucene;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.StringJoiner;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
public record LLFieldDoc(int doc, float score, int shardIndex, List<Object> fields) implements LLDocElement {}
|
public record LLFieldDoc(int doc, float score, int shardIndex, List<Object> fields) implements LLDocElement {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "doc=" + doc + " score=" + score + " shardIndex=" + shardIndex + " fields="+ fields.stream()
|
||||||
|
.map(Objects::toString).collect(Collectors.joining(",", "[", "]"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -4,7 +4,7 @@ import io.net5.buffer.ByteBuf;
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
public class LLFieldDocCodec implements LMDBCodec<LLFieldDoc> {
|
public class LLFieldDocCodec implements LMDBSortedCodec<LLFieldDoc> {
|
||||||
|
|
||||||
private enum FieldType {
|
private enum FieldType {
|
||||||
FLOAT,
|
FLOAT,
|
||||||
|
@ -3,7 +3,7 @@ package it.cavallium.dbengine.lucene;
|
|||||||
import io.net5.buffer.ByteBuf;
|
import io.net5.buffer.ByteBuf;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
public class LLScoreDocCodec implements LMDBCodec<LLScoreDoc> {
|
public class LLScoreDocCodec implements LMDBSortedCodec<LLScoreDoc> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ByteBuf serialize(Function<Integer, ByteBuf> allocator, LLScoreDoc data) {
|
public ByteBuf serialize(Function<Integer, ByteBuf> allocator, LLScoreDoc data) {
|
||||||
@ -11,7 +11,7 @@ public class LLScoreDocCodec implements LMDBCodec<LLScoreDoc> {
|
|||||||
setScore(buf, data.score());
|
setScore(buf, data.score());
|
||||||
setDoc(buf, data.doc());
|
setDoc(buf, data.doc());
|
||||||
setShardIndex(buf, data.shardIndex());
|
setShardIndex(buf, data.shardIndex());
|
||||||
buf.writerIndex(Float.BYTES + Integer.BYTES + Integer.BYTES);
|
buf.writerIndex(Float.BYTES + Integer.BYTES + Integer.BYTES + Integer.BYTES);
|
||||||
return buf.asReadOnly();
|
return buf.asReadOnly();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
25
src/main/java/it/cavallium/dbengine/lucene/LLSlotDoc.java
Normal file
25
src/main/java/it/cavallium/dbengine/lucene/LLSlotDoc.java
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
package it.cavallium.dbengine.lucene;
|
||||||
|
|
||||||
|
import org.apache.lucene.search.FieldComparator;
|
||||||
|
import org.apache.lucene.search.FieldValueHitQueue;
|
||||||
|
import org.apache.lucene.search.FieldValueHitQueue.Entry;
|
||||||
|
import org.apache.lucene.search.ScoreDoc;
|
||||||
|
|
||||||
|
/** Extension of ScoreDoc to also store the {@link FieldComparator} slot. */
|
||||||
|
public record LLSlotDoc(int doc, float score, int shardIndex, int slot) implements LLDocElement {
|
||||||
|
|
||||||
|
public ScoreDoc toScoreDoc() {
|
||||||
|
return new ScoreDoc(doc, score, shardIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ScoreDoc toEntry() {
|
||||||
|
var entry = new Entry(doc, slot);
|
||||||
|
entry.shardIndex = shardIndex;
|
||||||
|
return entry;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "slot:" + slot + " doc=" + doc + " score=" + score + " shardIndex=" + shardIndex;
|
||||||
|
}
|
||||||
|
}
|
173
src/main/java/it/cavallium/dbengine/lucene/LLSlotDocCodec.java
Normal file
173
src/main/java/it/cavallium/dbengine/lucene/LLSlotDocCodec.java
Normal file
@ -0,0 +1,173 @@
|
|||||||
|
package it.cavallium.dbengine.lucene;
|
||||||
|
|
||||||
|
import io.net5.buffer.ByteBuf;
|
||||||
|
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.function.Function;
|
||||||
|
import org.apache.lucene.index.LeafReaderContext;
|
||||||
|
import org.apache.lucene.search.FieldComparator;
|
||||||
|
import org.apache.lucene.search.FieldDoc;
|
||||||
|
import org.apache.lucene.search.FieldValueHitQueue.Entry;
|
||||||
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
|
import org.apache.lucene.search.LeafFieldComparator;
|
||||||
|
import org.apache.lucene.search.Query;
|
||||||
|
import org.apache.lucene.search.Sort;
|
||||||
|
import org.apache.lucene.search.SortField;
|
||||||
|
|
||||||
|
public class LLSlotDocCodec implements LMDBSortedCodec<LLSlotDoc>, FieldValueHitQueue {
|
||||||
|
|
||||||
|
private final SortField[] fields;
|
||||||
|
|
||||||
|
protected final FieldComparator<?>[] comparators;
|
||||||
|
protected final int[] reverseMul;
|
||||||
|
|
||||||
|
public LLSlotDocCodec(LLTempLMDBEnv env, SortField[] fields) {
|
||||||
|
// When we get here, fields.length is guaranteed to be > 0, therefore no
|
||||||
|
// need to check it again.
|
||||||
|
|
||||||
|
// All these are required by this class's API - need to return arrays.
|
||||||
|
// Therefore even in the case of a single comparator, create an array
|
||||||
|
// anyway.
|
||||||
|
this.fields = fields;
|
||||||
|
int numComparators = fields.length;
|
||||||
|
comparators = new FieldComparator<?>[numComparators];
|
||||||
|
reverseMul = new int[numComparators];
|
||||||
|
for (int i = 0; i < numComparators; ++i) {
|
||||||
|
SortField field = fields[i];
|
||||||
|
reverseMul[i] = field.getReverse() ? -1 : 1;
|
||||||
|
comparators[i] = LMDBComparator.getComparator(env, field, i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ByteBuf serialize(Function<Integer, ByteBuf> allocator, LLSlotDoc data) {
|
||||||
|
var buf = allocator.apply(Float.BYTES + Integer.BYTES + Integer.BYTES + Integer.BYTES);
|
||||||
|
setScore(buf, data.score());
|
||||||
|
setDoc(buf, data.doc());
|
||||||
|
setShardIndex(buf, data.shardIndex());
|
||||||
|
setSlot(buf, data.slot());
|
||||||
|
buf.writerIndex(Float.BYTES + Integer.BYTES + Integer.BYTES + Integer.BYTES + Integer.BYTES);
|
||||||
|
return buf.asReadOnly();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LLSlotDoc deserialize(ByteBuf buf) {
|
||||||
|
return new LLSlotDoc(getDoc(buf), getScore(buf), getShardIndex(buf), getSlot(buf));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compare(LLSlotDoc hitA, LLSlotDoc hitB) {
|
||||||
|
|
||||||
|
assert hitA != hitB;
|
||||||
|
assert hitA.slot() != hitB.slot();
|
||||||
|
|
||||||
|
int numComparators = comparators.length;
|
||||||
|
for (int i = 0; i < numComparators; ++i) {
|
||||||
|
final int c = reverseMul[i] * comparators[i].compare(hitA.slot(), hitB.slot());
|
||||||
|
if (c != 0) {
|
||||||
|
// Short circuit
|
||||||
|
return -c;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// avoid random sort order that could lead to duplicates (bug #31241):
|
||||||
|
return Integer.compare(hitB.doc(), hitA.doc());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareDirect(ByteBuf hitA, ByteBuf hitB) {
|
||||||
|
|
||||||
|
assert hitA != hitB;
|
||||||
|
assert getSlot(hitA) != getSlot(hitB);
|
||||||
|
|
||||||
|
int numComparators = comparators.length;
|
||||||
|
for (int i = 0; i < numComparators; ++i) {
|
||||||
|
final int c = reverseMul[i] * comparators[i].compare(getSlot(hitA), getSlot(hitB));
|
||||||
|
if (c != 0) {
|
||||||
|
// Short circuit
|
||||||
|
return -c;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// avoid random sort order that could lead to duplicates (bug #31241):
|
||||||
|
return Integer.compare(getDoc(hitB), getDoc(hitA));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static float getScore(ByteBuf hit) {
|
||||||
|
return hit.getFloat(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int getDoc(ByteBuf hit) {
|
||||||
|
return hit.getInt(Float.BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int getShardIndex(ByteBuf hit) {
|
||||||
|
return hit.getInt(Float.BYTES + Integer.BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int getSlot(ByteBuf hit) {
|
||||||
|
return hit.getInt(Float.BYTES + Integer.BYTES + Integer.BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void setScore(ByteBuf hit, float score) {
|
||||||
|
hit.setFloat(0, score);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void setDoc(ByteBuf hit, int doc) {
|
||||||
|
hit.setInt(Float.BYTES, doc);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void setShardIndex(ByteBuf hit, int shardIndex) {
|
||||||
|
hit.setInt(Float.BYTES + Integer.BYTES, shardIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void setSlot(ByteBuf hit, int slot) {
|
||||||
|
hit.setInt(Float.BYTES + Integer.BYTES + Integer.BYTES, slot);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FieldComparator<?>[] getComparators() {
|
||||||
|
return comparators;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int[] getReverseMul() {
|
||||||
|
return reverseMul;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LeafFieldComparator[] getComparators(LeafReaderContext context) throws IOException {
|
||||||
|
LeafFieldComparator[] comparators = new LeafFieldComparator[this.comparators.length];
|
||||||
|
for (int i = 0; i < comparators.length; ++i) {
|
||||||
|
comparators[i] = this.comparators[i].getLeafComparator(context);
|
||||||
|
}
|
||||||
|
return comparators;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given a queue Entry, creates a corresponding FieldDoc that contains the values used to sort the
|
||||||
|
* given document. These values are not the raw values out of the index, but the internal
|
||||||
|
* representation of them. This is so the given search hit can be collated by a MultiSearcher with
|
||||||
|
* other search hits.
|
||||||
|
*
|
||||||
|
* @param entry The Entry used to create a FieldDoc
|
||||||
|
* @return The newly created FieldDoc
|
||||||
|
* @see IndexSearcher#search(Query,int, Sort)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public FieldDoc fillFields(final Entry entry) {
|
||||||
|
final int n = comparators.length;
|
||||||
|
final Object[] fields = new Object[n];
|
||||||
|
for (int i = 0; i < n; ++i) {
|
||||||
|
fields[i] = comparators[i].value(entry.slot);
|
||||||
|
}
|
||||||
|
// if (maxscore > 1.0f) doc.score /= maxscore; // normalize scores
|
||||||
|
return new FieldDoc(entry.doc, entry.score, fields);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns the SortFields being used by this hit queue. */
|
||||||
|
@Override
|
||||||
|
public SortField[] getFields() {
|
||||||
|
return fields;
|
||||||
|
}
|
||||||
|
}
|
@ -1,7 +1,6 @@
|
|||||||
package it.cavallium.dbengine.lucene;
|
package it.cavallium.dbengine.lucene;
|
||||||
|
|
||||||
import io.net5.buffer.ByteBuf;
|
import io.net5.buffer.ByteBuf;
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
public interface LMDBCodec<T> {
|
public interface LMDBCodec<T> {
|
||||||
@ -9,9 +8,4 @@ public interface LMDBCodec<T> {
|
|||||||
ByteBuf serialize(Function<Integer, ByteBuf> allocator, T data);
|
ByteBuf serialize(Function<Integer, ByteBuf> allocator, T data);
|
||||||
|
|
||||||
T deserialize(ByteBuf b);
|
T deserialize(ByteBuf b);
|
||||||
|
|
||||||
int compare(T o1, T o2);
|
|
||||||
|
|
||||||
int compareDirect(ByteBuf o1, ByteBuf o2);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,13 @@
|
|||||||
|
package it.cavallium.dbengine.lucene;
|
||||||
|
|
||||||
|
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import org.apache.lucene.search.FieldComparator;
|
||||||
|
import org.apache.lucene.search.SortField;
|
||||||
|
|
||||||
|
public class LMDBComparator {
|
||||||
|
|
||||||
|
public static FieldComparator<?> getComparator(LLTempLMDBEnv env, SortField field, int sortPos) {
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
}
|
@ -37,7 +37,7 @@ public class LMDBPriorityQueue<T> implements PriorityQueue<T> {
|
|||||||
|
|
||||||
private final AtomicBoolean closed = new AtomicBoolean();
|
private final AtomicBoolean closed = new AtomicBoolean();
|
||||||
private final Runnable onClose;
|
private final Runnable onClose;
|
||||||
private final LMDBCodec<T> codec;
|
private final LMDBSortedCodec<T> codec;
|
||||||
private final Env<ByteBuf> env;
|
private final Env<ByteBuf> env;
|
||||||
private final Dbi<ByteBuf> lmdb;
|
private final Dbi<ByteBuf> lmdb;
|
||||||
private final Scheduler scheduler = Schedulers.newBoundedElastic(1,
|
private final Scheduler scheduler = Schedulers.newBoundedElastic(1,
|
||||||
@ -53,7 +53,7 @@ public class LMDBPriorityQueue<T> implements PriorityQueue<T> {
|
|||||||
private T top = null;
|
private T top = null;
|
||||||
private long size = 0;
|
private long size = 0;
|
||||||
|
|
||||||
public LMDBPriorityQueue(LLTempLMDBEnv env, LMDBCodec<T> codec) {
|
public LMDBPriorityQueue(LLTempLMDBEnv env, LMDBSortedCodec<T> codec) {
|
||||||
this.onClose = env::decrementRef;
|
this.onClose = env::decrementRef;
|
||||||
var name = "$queue_" + NEXT_LMDB_QUEUE_ID.getAndIncrement();
|
var name = "$queue_" + NEXT_LMDB_QUEUE_ID.getAndIncrement();
|
||||||
this.codec = codec;
|
this.codec = codec;
|
||||||
@ -71,6 +71,10 @@ public class LMDBPriorityQueue<T> implements PriorityQueue<T> {
|
|||||||
this.cur = null;
|
this.cur = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public LMDBSortedCodec<T> getCodec() {
|
||||||
|
return codec;
|
||||||
|
}
|
||||||
|
|
||||||
private ByteBuf allocate(int size) {
|
private ByteBuf allocate(int size) {
|
||||||
return PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
|
return PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,13 @@
|
|||||||
|
package it.cavallium.dbengine.lucene;
|
||||||
|
|
||||||
|
import io.net5.buffer.ByteBuf;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
public interface LMDBSortedCodec<T> extends LMDBCodec<T> {
|
||||||
|
|
||||||
|
int compare(T o1, T o2);
|
||||||
|
|
||||||
|
int compareDirect(ByteBuf o1, ByteBuf o2);
|
||||||
|
|
||||||
|
}
|
@ -36,13 +36,6 @@ import org.apache.lucene.search.TotalHits;
|
|||||||
*/
|
*/
|
||||||
public abstract class FullDocsCollector<T extends LLDocElement> implements Collector, AutoCloseable {
|
public abstract class FullDocsCollector<T extends LLDocElement> implements Collector, AutoCloseable {
|
||||||
|
|
||||||
/**
|
|
||||||
* This is used in case topDocs() is called with illegal parameters, or there simply aren't
|
|
||||||
* (enough) results.
|
|
||||||
*/
|
|
||||||
private static final FullDocs<?> EMPTY_FULLDOCS =
|
|
||||||
new PqFullDocs(new EmptyPriorityQueue<>(), new TotalHits(0, TotalHits.Relation.EQUAL_TO));
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The priority queue which holds the top documents. Note that different implementations of
|
* The priority queue which holds the top documents. Note that different implementations of
|
||||||
* PriorityQueue give different meaning to 'top documents'. HitQueue for example aggregates the
|
* PriorityQueue give different meaning to 'top documents'. HitQueue for example aggregates the
|
||||||
|
@ -0,0 +1,252 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package it.cavallium.dbengine.lucene.collector;
|
||||||
|
|
||||||
|
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
|
||||||
|
import it.cavallium.dbengine.lucene.FieldValueHitQueue;
|
||||||
|
import it.cavallium.dbengine.lucene.FullDocs;
|
||||||
|
import it.cavallium.dbengine.lucene.LLDocElement;
|
||||||
|
import it.cavallium.dbengine.lucene.LLScoreDoc;
|
||||||
|
import it.cavallium.dbengine.lucene.LLSlotDoc;
|
||||||
|
import it.cavallium.dbengine.lucene.LLSlotDocCodec;
|
||||||
|
import it.cavallium.dbengine.lucene.LMDBPriorityQueue;
|
||||||
|
import it.cavallium.dbengine.lucene.MaxScoreAccumulator;
|
||||||
|
import it.cavallium.dbengine.lucene.MaxScoreAccumulator.DocAndScore;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
|
import org.apache.lucene.index.LeafReaderContext;
|
||||||
|
import org.apache.lucene.search.CollectorManager;
|
||||||
|
import org.apache.lucene.search.LeafCollector;
|
||||||
|
import org.apache.lucene.search.Scorable;
|
||||||
|
import org.apache.lucene.search.ScoreMode;
|
||||||
|
import org.apache.lucene.search.SortField;
|
||||||
|
import org.apache.lucene.search.TotalHits;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
import org.jetbrains.annotations.Nullable;
|
||||||
|
|
||||||
|
public abstract class LMDBFullFieldDocCollector extends FullDocsCollector<LLDocElement> {
|
||||||
|
|
||||||
|
private final FieldValueHitQueue fieldValueHitQueue;
|
||||||
|
|
||||||
|
public abstract static class ScorerLeafCollector implements LeafCollector {
|
||||||
|
|
||||||
|
protected Scorable scorer;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setScorer(Scorable scorer) throws IOException {
|
||||||
|
this.scorer = scorer;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class SimpleLMDBFullScoreDocCollector extends LMDBFullFieldDocCollector {
|
||||||
|
|
||||||
|
SimpleLMDBFullScoreDocCollector(LLTempLMDBEnv env, @Nullable Long limit,
|
||||||
|
HitsThresholdChecker hitsThresholdChecker, MaxScoreAccumulator minScoreAcc) {
|
||||||
|
super(env, limit, hitsThresholdChecker, minScoreAcc);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LeafCollector getLeafCollector(LeafReaderContext context) {
|
||||||
|
docBase = context.docBase;
|
||||||
|
return new ScorerLeafCollector() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setScorer(Scorable scorer) throws IOException {
|
||||||
|
super.setScorer(scorer);
|
||||||
|
minCompetitiveScore = 0f;
|
||||||
|
updateMinCompetitiveScore(scorer);
|
||||||
|
if (minScoreAcc != null) {
|
||||||
|
updateGlobalMinCompetitiveScore(scorer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void collect(int doc) throws IOException {
|
||||||
|
float score = scorer.score();
|
||||||
|
|
||||||
|
assert score >= 0;
|
||||||
|
|
||||||
|
totalHits++;
|
||||||
|
hitsThresholdChecker.incrementHitCount();
|
||||||
|
|
||||||
|
if (minScoreAcc != null && (totalHits & minScoreAcc.modInterval) == 0) {
|
||||||
|
updateGlobalMinCompetitiveScore(scorer);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (limit != null && pq.size() >= limit) {
|
||||||
|
var pqTop = pq.top();
|
||||||
|
if (pqTop != null && score <= pqTop.score()) {
|
||||||
|
if (totalHitsRelation == TotalHits.Relation.EQUAL_TO) {
|
||||||
|
updateMinCompetitiveScore(scorer);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
pq.replaceTop(new LLScoreDoc(doc + docBase, score, -1));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pq.add(new LLScoreDoc(doc + docBase, score, -1));
|
||||||
|
}
|
||||||
|
updateMinCompetitiveScore(scorer);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static LMDBFullFieldDocCollector create(LLTempLMDBEnv env, long numHits, int totalHitsThreshold) {
|
||||||
|
return create(env, numHits, HitsThresholdChecker.create(totalHitsThreshold), null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static LMDBFullFieldDocCollector create(LLTempLMDBEnv env, int totalHitsThreshold) {
|
||||||
|
return create(env, HitsThresholdChecker.create(totalHitsThreshold), null);
|
||||||
|
}
|
||||||
|
|
||||||
|
static LMDBFullFieldDocCollector create(
|
||||||
|
LLTempLMDBEnv env,
|
||||||
|
HitsThresholdChecker hitsThresholdChecker,
|
||||||
|
MaxScoreAccumulator minScoreAcc) {
|
||||||
|
|
||||||
|
if (hitsThresholdChecker == null) {
|
||||||
|
throw new IllegalArgumentException("hitsThresholdChecker must be non null");
|
||||||
|
}
|
||||||
|
|
||||||
|
return new SimpleLMDBFullScoreDocCollector(env, null, hitsThresholdChecker, minScoreAcc);
|
||||||
|
}
|
||||||
|
|
||||||
|
static LMDBFullFieldDocCollector create(
|
||||||
|
LLTempLMDBEnv env,
|
||||||
|
@NotNull Long numHits,
|
||||||
|
HitsThresholdChecker hitsThresholdChecker,
|
||||||
|
MaxScoreAccumulator minScoreAcc) {
|
||||||
|
|
||||||
|
if (hitsThresholdChecker == null) {
|
||||||
|
throw new IllegalArgumentException("hitsThresholdChecker must be non null");
|
||||||
|
}
|
||||||
|
|
||||||
|
return new SimpleLMDBFullScoreDocCollector(env,
|
||||||
|
(numHits < 0 || numHits >= 2147483630L) ? null : numHits,
|
||||||
|
hitsThresholdChecker,
|
||||||
|
minScoreAcc
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static CollectorManager<LMDBFullFieldDocCollector, FullDocs<LLScoreDoc>> createSharedManager(
|
||||||
|
LLTempLMDBEnv env,
|
||||||
|
long numHits,
|
||||||
|
int totalHitsThreshold) {
|
||||||
|
return new CollectorManager<>() {
|
||||||
|
|
||||||
|
private final HitsThresholdChecker hitsThresholdChecker =
|
||||||
|
HitsThresholdChecker.createShared(totalHitsThreshold);
|
||||||
|
private final MaxScoreAccumulator minScoreAcc = new MaxScoreAccumulator();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LMDBFullFieldDocCollector newCollector() {
|
||||||
|
return LMDBFullFieldDocCollector.create(env, numHits, hitsThresholdChecker, minScoreAcc);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FullDocs<LLScoreDoc> reduce(Collection<LMDBFullFieldDocCollector> collectors) {
|
||||||
|
return reduceShared(collectors);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public static CollectorManager<LMDBFullFieldDocCollector, FullDocs<LLScoreDoc>> createSharedManager(
|
||||||
|
LLTempLMDBEnv env,
|
||||||
|
int totalHitsThreshold) {
|
||||||
|
return new CollectorManager<>() {
|
||||||
|
|
||||||
|
private final HitsThresholdChecker hitsThresholdChecker =
|
||||||
|
HitsThresholdChecker.createShared(totalHitsThreshold);
|
||||||
|
private final MaxScoreAccumulator minScoreAcc = new MaxScoreAccumulator();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LMDBFullFieldDocCollector newCollector() {
|
||||||
|
return LMDBFullFieldDocCollector.create(env, hitsThresholdChecker, minScoreAcc);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FullDocs<LLScoreDoc> reduce(Collection<LMDBFullFieldDocCollector> collectors) {
|
||||||
|
return reduceShared(collectors);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static FullDocs<LLSlotDoc> reduceShared(Collection<LMDBFullFieldDocCollector> collectors) {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
final FullDocs<LLSlotDoc>[] fullDocs = new FullDocs[collectors.size()];
|
||||||
|
int i = 0;
|
||||||
|
for (LMDBFullFieldDocCollector collector : collectors) {
|
||||||
|
fullDocs[i++] = collector.fullDocs();
|
||||||
|
}
|
||||||
|
return FullDocs.merge(null, fullDocs);
|
||||||
|
}
|
||||||
|
|
||||||
|
int docBase;
|
||||||
|
final @Nullable Long limit;
|
||||||
|
final HitsThresholdChecker hitsThresholdChecker;
|
||||||
|
final MaxScoreAccumulator minScoreAcc;
|
||||||
|
float minCompetitiveScore;
|
||||||
|
|
||||||
|
// prevents instantiation
|
||||||
|
LMDBFullFieldDocCollector(LLTempLMDBEnv env, SortField[] fields, @Nullable Long limit,
|
||||||
|
HitsThresholdChecker hitsThresholdChecker, MaxScoreAccumulator minScoreAcc) {
|
||||||
|
//noinspection unchecked
|
||||||
|
super(new LMDBPriorityQueue(env, new LLSlotDocCodec(env, fields)));
|
||||||
|
this.fieldValueHitQueue = ((FieldValueHitQueue) ((LMDBPriorityQueue<LLDocElement>) pq).getCodec());
|
||||||
|
assert hitsThresholdChecker != null;
|
||||||
|
this.limit = limit;
|
||||||
|
this.hitsThresholdChecker = hitsThresholdChecker;
|
||||||
|
this.minScoreAcc = minScoreAcc;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ScoreMode scoreMode() {
|
||||||
|
return hitsThresholdChecker.scoreMode();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void updateGlobalMinCompetitiveScore(Scorable scorer) throws IOException {
|
||||||
|
assert minScoreAcc != null;
|
||||||
|
DocAndScore maxMinScore = minScoreAcc.get();
|
||||||
|
if (maxMinScore != null) {
|
||||||
|
float score = docBase > maxMinScore.docID ? Math.nextUp(maxMinScore.score) : maxMinScore.score;
|
||||||
|
if (score > minCompetitiveScore) {
|
||||||
|
assert hitsThresholdChecker.isThresholdReached();
|
||||||
|
scorer.setMinCompetitiveScore(score);
|
||||||
|
minCompetitiveScore = score;
|
||||||
|
totalHitsRelation = TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void updateMinCompetitiveScore(Scorable scorer) throws IOException {
|
||||||
|
var pqTop = pq.top();
|
||||||
|
if (hitsThresholdChecker.isThresholdReached()
|
||||||
|
&& pqTop != null
|
||||||
|
&& pqTop.score() != Float.NEGATIVE_INFINITY) {
|
||||||
|
float localMinScore = Math.nextUp(pqTop.score());
|
||||||
|
if (localMinScore > minCompetitiveScore) {
|
||||||
|
scorer.setMinCompetitiveScore(localMinScore);
|
||||||
|
totalHitsRelation = TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO;
|
||||||
|
minCompetitiveScore = localMinScore;
|
||||||
|
if (minScoreAcc != null) {
|
||||||
|
minScoreAcc.accumulate(pqTop.doc(), pqTop.score());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -2,7 +2,7 @@ package it.cavallium.dbengine;
|
|||||||
|
|
||||||
import io.net5.buffer.ByteBuf;
|
import io.net5.buffer.ByteBuf;
|
||||||
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
|
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
|
||||||
import it.cavallium.dbengine.lucene.LMDBCodec;
|
import it.cavallium.dbengine.lucene.LMDBSortedCodec;
|
||||||
import it.cavallium.dbengine.lucene.LMDBPriorityQueue;
|
import it.cavallium.dbengine.lucene.LMDBPriorityQueue;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@ -21,7 +21,7 @@ public class TestLMDB {
|
|||||||
@BeforeEach
|
@BeforeEach
|
||||||
public void beforeEach() throws IOException {
|
public void beforeEach() throws IOException {
|
||||||
this.env = new LLTempLMDBEnv();
|
this.env = new LLTempLMDBEnv();
|
||||||
this.queue = new LMDBPriorityQueue<>(env, new LMDBCodec<Integer>() {
|
this.queue = new LMDBPriorityQueue<>(env, new LMDBSortedCodec<Integer>() {
|
||||||
@Override
|
@Override
|
||||||
public ByteBuf serialize(Function<Integer, ByteBuf> allocator, Integer data) {
|
public ByteBuf serialize(Function<Integer, ByteBuf> allocator, Integer data) {
|
||||||
return allocator.apply(Integer.BYTES).writeInt(data).asReadOnly();
|
return allocator.apply(Integer.BYTES).writeInt(data).asReadOnly();
|
||||||
|
@ -4,7 +4,7 @@ import com.google.common.collect.Lists;
|
|||||||
import io.net5.buffer.ByteBuf;
|
import io.net5.buffer.ByteBuf;
|
||||||
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
|
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
|
||||||
import it.cavallium.dbengine.lucene.LLScoreDoc;
|
import it.cavallium.dbengine.lucene.LLScoreDoc;
|
||||||
import it.cavallium.dbengine.lucene.LMDBCodec;
|
import it.cavallium.dbengine.lucene.LMDBSortedCodec;
|
||||||
import it.cavallium.dbengine.lucene.LMDBPriorityQueue;
|
import it.cavallium.dbengine.lucene.LMDBPriorityQueue;
|
||||||
import it.cavallium.dbengine.lucene.PriorityQueue;
|
import it.cavallium.dbengine.lucene.PriorityQueue;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
@ -67,7 +67,7 @@ public class TestLMDBHitQueue {
|
|||||||
@BeforeEach
|
@BeforeEach
|
||||||
public void beforeEach() throws IOException {
|
public void beforeEach() throws IOException {
|
||||||
this.env = new LLTempLMDBEnv();
|
this.env = new LLTempLMDBEnv();
|
||||||
var lmdbQueue = new LMDBPriorityQueue<ScoreDoc>(env, new LMDBCodec<>() {
|
var lmdbQueue = new LMDBPriorityQueue<ScoreDoc>(env, new LMDBSortedCodec<>() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ByteBuf serialize(Function<Integer, ByteBuf> allocator, ScoreDoc data) {
|
public ByteBuf serialize(Function<Integer, ByteBuf> allocator, ScoreDoc data) {
|
||||||
|
Loading…
Reference in New Issue
Block a user