From e6c0f14fdaebe4b66a0f30579952b1ea7d1b94c5 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 15 Oct 2021 22:03:53 +0200 Subject: [PATCH] Implement some sort codecs --- .gitignore | 4 +- .../dbengine/lucene/ByteArrayCodec.java | 23 ++ .../dbengine/lucene/BytesRefCodec.java | 24 ++ .../dbengine/lucene/DoubleCodec.java | 17 + .../cavallium/dbengine/lucene/FloatCodec.java | 17 + .../cavallium/dbengine/lucene/FullDocs.java | 14 +- .../it/cavallium/dbengine/lucene/IArray.java | 20 ++ .../cavallium/dbengine/lucene/IntCodec.java | 17 + .../cavallium/dbengine/lucene/LLFieldDoc.java | 5 + .../dbengine/lucene/LLScoreDocCodec.java | 2 +- .../dbengine/lucene/LLSlotDocCodec.java | 12 +- .../cavallium/dbengine/lucene/LMDBArray.java | 234 ++++++++++++++ .../dbengine/lucene/LMDBComparator.java | 112 ++++++- .../dbengine/lucene/LMDBPriorityQueue.java | 116 ++++++- .../cavallium/dbengine/lucene/LongCodec.java | 17 + .../dbengine/lucene/LuceneUtils.java | 16 +- .../cavallium/dbengine/lucene/Reversable.java | 6 + .../lucene/ReversableResourceIterable.java | 3 + .../dbengine/lucene/SortFieldCodec.java | 78 +++++ .../lucene/collector/FullDocsCollector.java | 12 +- .../collector/HitsThresholdChecker.java | 60 ++-- .../collector/LMDBFullFieldDocCollector.java | 31 +- .../collector/LMDBFullScoreDocCollector.java | 10 +- .../ScoringShardsCollectorManager.java | 36 ++- .../lucene/collector/UnscoredCollector.java | 2 +- .../lucene/comparators/DocComparator.java | 190 +++++++++++ .../lucene/comparators/DoubleComparator.java | 119 +++++++ .../lucene/comparators/FloatComparator.java | 119 +++++++ .../lucene/comparators/IntComparator.java | 121 +++++++ .../lucene/comparators/LongComparator.java | 121 +++++++ .../lucene/comparators/MinDocIterator.java | 63 ++++ .../lucene/comparators/NumericComparator.java | 297 +++++++++++++++++ .../comparators/RelevanceComparator.java | 117 +++++++ .../comparators/TermOrdValComparator.java | 298 ++++++++++++++++++ .../searcher/AdaptiveLocalSearcher.java | 2 +- .../searcher/AdaptiveMultiSearcher.java | 21 +- .../lucene/searcher/LocalQueryParams.java | 32 +- .../lucene/searcher/OfficialSearcher.java | 129 ++++++++ .../lucene/searcher/PagedLocalSearcher.java | 16 +- .../searcher/ScoredPagedMultiSearcher.java | 21 +- .../SortedScoredFullMultiSearcher.java | 119 +++++++ .../UnsortedScoredFullMultiSearcher.java | 8 +- .../UnsortedUnscoredSimpleMultiSearcher.java | 12 +- ...nsortedUnscoredStreamingMultiSearcher.java | 12 +- .../dbengine/TestLuceneSearches.java | 48 ++- 45 files changed, 2613 insertions(+), 140 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/lucene/ByteArrayCodec.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/BytesRefCodec.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/DoubleCodec.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/FloatCodec.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/IArray.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/IntCodec.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/LMDBArray.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/LongCodec.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/Reversable.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/ReversableResourceIterable.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/SortFieldCodec.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/comparators/DocComparator.java create 
mode 100644 src/main/java/it/cavallium/dbengine/lucene/comparators/DoubleComparator.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/comparators/FloatComparator.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/comparators/IntComparator.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/comparators/LongComparator.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/comparators/MinDocIterator.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/comparators/NumericComparator.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/comparators/RelevanceComparator.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/comparators/TermOrdValComparator.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/searcher/OfficialSearcher.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java diff --git a/.gitignore b/.gitignore index ea04831..d7e66fe 100644 --- a/.gitignore +++ b/.gitignore @@ -84,7 +84,8 @@ $RECYCLE.BIN/ .LSOverride # Icon must end with two \r -Icon +Icon + # Thumbnails ._* @@ -179,3 +180,4 @@ fabric.properties .idea/caches/build_file_checksums.ser dbengine.iml +/.idea/ diff --git a/src/main/java/it/cavallium/dbengine/lucene/ByteArrayCodec.java b/src/main/java/it/cavallium/dbengine/lucene/ByteArrayCodec.java new file mode 100644 index 0000000..1f2f917 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/ByteArrayCodec.java @@ -0,0 +1,23 @@ +package it.cavallium.dbengine.lucene; + +import io.net5.buffer.ByteBuf; +import java.util.function.Function; + +public class ByteArrayCodec implements LMDBCodec { + + @Override + public ByteBuf serialize(Function allocator, byte[] data) { + var buf = allocator.apply(data.length + Integer.BYTES); + buf.writeInt(data.length); + buf.writeBytes(data); + return buf; + } + + @Override + public byte[] deserialize(ByteBuf b) { + var length = b.readInt(); + byte[] data = new byte[length]; + b.readBytes(data); + return data; + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/BytesRefCodec.java b/src/main/java/it/cavallium/dbengine/lucene/BytesRefCodec.java new file mode 100644 index 0000000..2cd0160 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/BytesRefCodec.java @@ -0,0 +1,24 @@ +package it.cavallium.dbengine.lucene; + +import io.net5.buffer.ByteBuf; +import java.util.function.Function; +import org.apache.lucene.util.BytesRef; + +public class BytesRefCodec implements LMDBCodec { + + @Override + public ByteBuf serialize(Function allocator, BytesRef data) { + var buf = allocator.apply(data.length + Integer.BYTES); + buf.writeInt(data.length); + buf.writeBytes(data.bytes, data.offset, data.length); + return buf; + } + + @Override + public BytesRef deserialize(ByteBuf b) { + var length = b.readInt(); + var bytes = new byte[length]; + b.readBytes(bytes, 0, length); + return new BytesRef(bytes, 0, length); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/DoubleCodec.java b/src/main/java/it/cavallium/dbengine/lucene/DoubleCodec.java new file mode 100644 index 0000000..ffa36e8 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/DoubleCodec.java @@ -0,0 +1,17 @@ +package it.cavallium.dbengine.lucene; + +import io.net5.buffer.ByteBuf; +import java.util.function.Function; + +public class DoubleCodec implements LMDBCodec { + + @Override + public ByteBuf serialize(Function allocator, Double data) { + return 
allocator.apply(Double.BYTES).writeDouble(data);
+	}
+
+	@Override
+	public Double deserialize(ByteBuf b) {
+		return b.readDouble();
+	}
+}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/FloatCodec.java b/src/main/java/it/cavallium/dbengine/lucene/FloatCodec.java
new file mode 100644
index 0000000..d626c7d
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/lucene/FloatCodec.java
@@ -0,0 +1,17 @@
+package it.cavallium.dbengine.lucene;
+
+import io.net5.buffer.ByteBuf;
+import java.util.function.Function;
+
+public class FloatCodec implements LMDBCodec<Float> {
+
+	@Override
+	public ByteBuf serialize(Function<Integer, ByteBuf> allocator, Float data) {
+		return allocator.apply(Float.BYTES).writeFloat(data);
+	}
+
+	@Override
+	public Float deserialize(ByteBuf b) {
+		return b.readFloat();
+	}
+}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java b/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java
index dc6a1f9..68523db 100644
--- a/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java
+++ b/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java
@@ -3,6 +3,7 @@ package it.cavallium.dbengine.lucene;
 import static it.cavallium.dbengine.lucene.LLDocElementScoreComparator.SCORE_DOC_SCORE_ELEM_COMPARATOR;
 import static org.apache.lucene.search.TotalHits.Relation.*;
+import it.cavallium.dbengine.lucene.collector.FullFieldDocs;
 import java.util.Comparator;
 import org.apache.lucene.search.FieldComparator;
 import org.apache.lucene.search.Sort;
@@ -29,7 +30,7 @@ public interface FullDocs<T extends LLDoc> extends ResourceIterable<T> {
 	static <T extends LLDoc> FullDocs<T> merge(@Nullable Sort sort, FullDocs<T>[] fullDocs) {
 		ResourceIterable<T> mergedIterable = mergeResourceIterable(sort, fullDocs);
 		TotalHits mergedTotalHits = mergeTotalHits(fullDocs);
-		return new FullDocs<>() {
+		FullDocs<T> docs = new FullDocs<>() {
 			@Override
 			public Flux<T> iterate() {
 				return mergedIterable.iterate();
@@ -45,6 +46,11 @@ public interface FullDocs<T extends LLDoc> extends ResourceIterable<T> {
 				return mergedTotalHits;
 			}
 		};
+		if (sort != null) {
+			return new FullFieldDocs<>(docs, sort.getSort());
+		} else {
+			return docs;
+		}
 	}
 	static int tieBreakCompare(
@@ -119,6 +125,12 @@ public interface FullDocs<T extends LLDoc> extends ResourceIterable<T> {
 		if (shard instanceof LLScoreDoc scoreDoc) {
 			//noinspection unchecked
 			return (T) new LLScoreDoc(scoreDoc.doc(), scoreDoc.score(), shardIndex);
+		} else if (shard instanceof LLFieldDoc fieldDoc) {
+			//noinspection unchecked
+			return (T) new LLFieldDoc(fieldDoc.doc(), fieldDoc.score(), shardIndex, fieldDoc.fields());
+		} else if (shard instanceof LLSlotDoc slotDoc) {
+			//noinspection unchecked
+			return (T) new LLSlotDoc(slotDoc.doc(), slotDoc.score(), shardIndex, slotDoc.slot());
 		} else {
 			throw new UnsupportedOperationException("Unsupported type " + shard.getClass());
 		}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/IArray.java b/src/main/java/it/cavallium/dbengine/lucene/IArray.java
new file mode 100644
index 0000000..0621809
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/lucene/IArray.java
@@ -0,0 +1,20 @@
+package it.cavallium.dbengine.lucene;
+
+import java.util.Objects;
+import org.jetbrains.annotations.Nullable;
+
+public interface IArray<T> {
+
+	@Nullable T get(long index);
+
+	void set(long index, @Nullable T value);
+
+	void reset(long index);
+
+	long size();
+
+	default T getOrDefault(int slot, T defaultValue) {
+		return Objects.requireNonNullElse(get(slot), defaultValue);
+	}
+
+}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/IntCodec.java b/src/main/java/it/cavallium/dbengine/lucene/IntCodec.java
new file mode 100644
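A sketch, not part of this patch: every codec added here implements LMDBCodec, whose definition is not shown in this diff. The interface shape below is inferred from how these implementations use it, and the round trip through IntCodec is purely illustrative.

	public interface LMDBCodec<T> {
		// the allocator receives the exact number of bytes to reserve and returns a buffer
		ByteBuf serialize(Function<Integer, ByteBuf> allocator, T data);

		T deserialize(ByteBuf b);
	}

	// Illustrative round trip with the IntCodec introduced below:
	LMDBCodec<Integer> codec = new IntCodec();
	ByteBuf buf = codec.serialize(len -> PooledByteBufAllocator.DEFAULT.directBuffer(len, len), 42);
	int restored = codec.deserialize(buf); // 42
	buf.release();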
index 0000000..51db990 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/IntCodec.java @@ -0,0 +1,17 @@ +package it.cavallium.dbengine.lucene; + +import io.net5.buffer.ByteBuf; +import java.util.function.Function; + +public class IntCodec implements LMDBCodec { + + @Override + public ByteBuf serialize(Function allocator, Integer data) { + return allocator.apply(Integer.BYTES).writeInt(data); + } + + @Override + public Integer deserialize(ByteBuf b) { + return b.readInt(); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/LLFieldDoc.java b/src/main/java/it/cavallium/dbengine/lucene/LLFieldDoc.java index cc10b7b..7c396d6 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LLFieldDoc.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LLFieldDoc.java @@ -3,6 +3,7 @@ package it.cavallium.dbengine.lucene; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; +import org.apache.lucene.search.FieldDoc; public record LLFieldDoc(int doc, float score, int shardIndex, List fields) implements LLDoc { @@ -11,4 +12,8 @@ public record LLFieldDoc(int doc, float score, int shardIndex, List fiel return "doc=" + doc + " score=" + score + " shardIndex=" + shardIndex + " fields="+ fields.stream() .map(Objects::toString).collect(Collectors.joining(",", "[", "]")); } + + public FieldDoc toFieldDoc() { + return new FieldDoc(doc, score, fields.toArray(Object[]::new), shardIndex); + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/LLScoreDocCodec.java b/src/main/java/it/cavallium/dbengine/lucene/LLScoreDocCodec.java index 2bf9372..be1cdd8 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LLScoreDocCodec.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LLScoreDocCodec.java @@ -11,7 +11,7 @@ public class LLScoreDocCodec implements LMDBSortedCodec { setScore(buf, data.score()); setDoc(buf, data.doc()); setShardIndex(buf, data.shardIndex()); - buf.writerIndex(Float.BYTES + Integer.BYTES + Integer.BYTES + Integer.BYTES); + buf.writerIndex(Float.BYTES + Integer.BYTES + Integer.BYTES); return buf.asReadOnly(); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/LLSlotDocCodec.java b/src/main/java/it/cavallium/dbengine/lucene/LLSlotDocCodec.java index f9e12a0..e203837 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LLSlotDocCodec.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LLSlotDocCodec.java @@ -8,8 +8,6 @@ import java.util.List; import java.util.function.Function; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.FieldComparator; -import org.apache.lucene.search.FieldDoc; -import org.apache.lucene.search.FieldValueHitQueue.Entry; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.LeafFieldComparator; import org.apache.lucene.search.Query; @@ -23,7 +21,7 @@ public class LLSlotDocCodec implements LMDBSortedCodec, FieldValueHit protected final FieldComparator[] comparators; protected final int[] reverseMul; - public LLSlotDocCodec(LLTempLMDBEnv env, SortField[] fields) { + public LLSlotDocCodec(LLTempLMDBEnv env, int numHits, SortField[] fields) { // When we get here, fields.length is guaranteed to be > 0, therefore no // need to check it again. @@ -37,7 +35,7 @@ public class LLSlotDocCodec implements LMDBSortedCodec, FieldValueHit for (int i = 0; i < numComparators; ++i) { SortField field = fields[i]; reverseMul[i] = field.getReverse() ? 
-1 : 1; - comparators[i] = LMDBComparator.getComparator(env, field, i); + comparators[i] = LMDBComparator.getComparator(env, field, numHits, i); } } @@ -48,7 +46,7 @@ public class LLSlotDocCodec implements LMDBSortedCodec, FieldValueHit setDoc(buf, data.doc()); setShardIndex(buf, data.shardIndex()); setSlot(buf, data.slot()); - buf.writerIndex(Float.BYTES + Integer.BYTES + Integer.BYTES + Integer.BYTES + Integer.BYTES); + buf.writerIndex(Float.BYTES + Integer.BYTES + Integer.BYTES + Integer.BYTES); return buf.asReadOnly(); } @@ -59,10 +57,6 @@ public class LLSlotDocCodec implements LMDBSortedCodec, FieldValueHit @Override public int compare(LLSlotDoc hitA, LLSlotDoc hitB) { - - assert hitA != hitB; - assert hitA.slot() != hitB.slot(); - int numComparators = comparators.length; for (int i = 0; i < numComparators; ++i) { final int c = reverseMul[i] * comparators[i].compare(hitA.slot(), hitB.slot()); diff --git a/src/main/java/it/cavallium/dbengine/lucene/LMDBArray.java b/src/main/java/it/cavallium/dbengine/lucene/LMDBArray.java new file mode 100644 index 0000000..03ac722 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/LMDBArray.java @@ -0,0 +1,234 @@ +package it.cavallium.dbengine.lucene; + +import static org.lmdbjava.DbiFlags.MDB_CREATE; + +import io.net5.buffer.ByteBuf; +import io.net5.buffer.PooledByteBufAllocator; +import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; +import java.io.Closeable; +import java.io.IOException; +import java.util.Iterator; +import java.util.Objects; +import java.util.StringJoiner; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.lmdbjava.CursorIterable; +import org.lmdbjava.CursorIterable.KeyVal; +import org.lmdbjava.Dbi; +import org.lmdbjava.Env; +import org.lmdbjava.GetOp; +import org.lmdbjava.Txn; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Scheduler; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuple3; +import reactor.util.function.Tuples; + +public class LMDBArray implements IArray, Closeable { + + private static final boolean FORCE_SYNC = false; + private static final boolean FORCE_THREAD_LOCAL = true; + + private static final AtomicLong NEXT_LMDB_ARRAY_ID = new AtomicLong(0); + + private final AtomicBoolean closed = new AtomicBoolean(); + private final Runnable onClose; + private final LMDBCodec valueCodec; + private final Env env; + private final Dbi lmdb; + private final V defaultValue; + + private boolean writing; + private Txn readTxn; + private Txn rwTxn; + + private long allocatedSize = 0; + private final long virtualSize; + + public LMDBArray(LLTempLMDBEnv env, LMDBCodec codec, long size, @Nullable V defaultValue) { + this.onClose = env::decrementRef; + var name = "$array_" + NEXT_LMDB_ARRAY_ID.getAndIncrement(); + this.valueCodec = codec; + this.env = env.getEnvAndIncrementRef(); + this.lmdb = this.env.openDbi(name, MDB_CREATE); + this.defaultValue = defaultValue; + + this.writing = true; + if (FORCE_THREAD_LOCAL) { + this.rwTxn = null; + } else { + this.rwTxn = this.env.txnWrite(); + } + this.readTxn = null; + this.virtualSize = size; + } + + public LMDBCodec getValueCodec() { + return valueCodec; + } + + private ByteBuf allocate(int size) { + return PooledByteBufAllocator.DEFAULT.directBuffer(size, size); + } + + private void switchToMode(boolean write) { + if (write) { + if (!writing) 
{ + writing = true; + readTxn.close(); + readTxn = null; + assert rwTxn == null; + rwTxn = env.txnWrite(); + } else if (rwTxn == null) { + assert readTxn == null; + rwTxn = env.txnWrite(); + } + } else { + if (writing) { + writing = false; + if (rwTxn != null) { + rwTxn.commit(); + rwTxn.close(); + rwTxn = null; + } + if (FORCE_SYNC) { + env.sync(true); + } + assert rwTxn == null; + assert readTxn == null; + readTxn = env.txnRead(); + } + } + } + + private void endMode() { + if (FORCE_THREAD_LOCAL) { + writing = true; + if (readTxn != null) { + readTxn.commit(); + readTxn.close(); + readTxn = null; + } + if (rwTxn != null) { + rwTxn.commit(); + rwTxn.close(); + rwTxn = null; + } + } + assert rwTxn == null; + assert readTxn == null; + } + + private static void ensureThread() { + LLUtils.ensureBlocking(); + } + + private static void ensureItThread() { + ensureThread(); + //if (!(Thread.currentThread() instanceof LMDBThread)) { + // throw new IllegalStateException("Must run in LMDB scheduler"); + //} + } + + @Override + public void set(long index, @Nullable V value) { + ensureBounds(index); + ensureThread(); + switchToMode(true); + var keyBuf = allocate(Long.BYTES); + var valueBuf = valueCodec.serialize(this::allocate, value); + keyBuf.writeLong(index); + try { + if (lmdb.put(rwTxn, keyBuf, valueBuf)) { + allocatedSize++; + } + } finally { + endMode(); + + keyBuf.release(); + valueBuf.release(); + } + } + + @Override + public void reset(long index) { + ensureBounds(index); + ensureThread(); + switchToMode(true); + var keyBuf = allocate(Long.BYTES); + keyBuf.writeLong(index); + try { + if (lmdb.delete(rwTxn, keyBuf)) { + allocatedSize--; + } + } finally { + endMode(); + keyBuf.release(); + } + } + + @Override + public @Nullable V get(long index) { + ensureBounds(index); + ensureThread(); + switchToMode(false); + var keyBuf = allocate(Long.BYTES); + keyBuf.writeLong(index); + try { + var value = lmdb.get(readTxn, keyBuf); + if (value != null) { + return valueCodec.deserialize(value); + } else { + return defaultValue; + } + } finally { + endMode(); + keyBuf.release(); + } + } + + private void ensureBounds(long index) { + if (index < 0 || index >= virtualSize) throw new IndexOutOfBoundsException(); + } + + @Override + public long size() { + ensureThread(); + return virtualSize; + } + + public long allocatedSize() { + return allocatedSize; + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + try { + ensureThread(); + if (rwTxn != null) { + rwTxn.close(); + } + if (readTxn != null) { + readTxn.close(); + } + try (var txn = env.txnWrite()) { + lmdb.drop(txn, true); + txn.commit(); + } + lmdb.close(); + } finally { + onClose.run(); + } + } + } + + + @Override + public String toString() { + return "lmdb_array[" + virtualSize + " (allocated=" + allocatedSize + ")]"; + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/LMDBComparator.java b/src/main/java/it/cavallium/dbengine/lucene/LMDBComparator.java index 481fb58..96c01e8 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LMDBComparator.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LMDBComparator.java @@ -1,13 +1,119 @@ package it.cavallium.dbengine.lucene; +import static org.apache.lucene.search.SortField.STRING_LAST; + import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; -import java.util.Comparator; +import it.cavallium.dbengine.lucene.comparators.DocComparator; +import it.cavallium.dbengine.lucene.comparators.DoubleComparator; +import 
it.cavallium.dbengine.lucene.comparators.FloatComparator; +import it.cavallium.dbengine.lucene.comparators.IntComparator; +import it.cavallium.dbengine.lucene.comparators.LongComparator; +import it.cavallium.dbengine.lucene.comparators.RelevanceComparator; +import it.cavallium.dbengine.lucene.comparators.TermOrdValComparator; +import java.io.IOException; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.search.FieldComparator; +import org.apache.lucene.search.LeafFieldComparator; import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortedNumericSelector; +import org.apache.lucene.search.SortedNumericSortField; public class LMDBComparator { - public static FieldComparator getComparator(LLTempLMDBEnv env, SortField field, int sortPos) { - throw new UnsupportedOperationException("not implemented"); + public static FieldComparator getComparator(LLTempLMDBEnv env, SortField sortField, + int numHits, int sortPos) { + var sortFieldClass = sortField.getClass(); + if (sortFieldClass == org.apache.lucene.search.SortedNumericSortField.class) { + var nf = (org.apache.lucene.search.SortedNumericSortField) sortField; + var type = nf.getNumericType(); + var missingValue = nf.getMissingValue(); + var reverse = nf.getReverse(); + var selector = nf.getSelector(); + final FieldComparator fieldComparator = switch (type) { + case INT -> new IntComparator(env, numHits, nf.getField(), (Integer) missingValue, reverse, sortPos) { + @Override + public LeafFieldComparator getLeafComparator(LeafReaderContext context) throws IOException { + return new IntLeafComparator(context) { + @Override + protected NumericDocValues getNumericDocValues(LeafReaderContext context, String field) + throws IOException { + return SortedNumericSelector.wrap(DocValues.getSortedNumeric(context.reader(), field), selector, type); + } + }; + } + }; + case FLOAT -> new FloatComparator(env, numHits, nf.getField(), (Float) missingValue, reverse, sortPos) { + @Override + public LeafFieldComparator getLeafComparator(LeafReaderContext context) throws IOException { + return new FloatLeafComparator(context) { + @Override + protected NumericDocValues getNumericDocValues(LeafReaderContext context, String field) + throws IOException { + return SortedNumericSelector.wrap(DocValues.getSortedNumeric(context.reader(), field), selector, type); + } + }; + } + }; + case LONG -> new LongComparator(env, numHits, nf.getField(), (Long) missingValue, reverse, sortPos) { + @Override + public LeafFieldComparator getLeafComparator(LeafReaderContext context) throws IOException { + return new LongLeafComparator(context) { + @Override + protected NumericDocValues getNumericDocValues(LeafReaderContext context, String field) + throws IOException { + return SortedNumericSelector.wrap(DocValues.getSortedNumeric(context.reader(), field), selector, type); + } + }; + } + }; + case DOUBLE -> new DoubleComparator(env, numHits, nf.getField(), (Double) missingValue, reverse, sortPos) { + @Override + public LeafFieldComparator getLeafComparator(LeafReaderContext context) throws IOException { + return new DoubleLeafComparator(context) { + @Override + protected NumericDocValues getNumericDocValues(LeafReaderContext context, String field) + throws IOException { + return SortedNumericSelector.wrap(DocValues.getSortedNumeric(context.reader(), field), selector, type); + } + }; + } + }; + case 
CUSTOM, DOC, REWRITEABLE, STRING_VAL, SCORE, STRING -> throw new AssertionError(); + }; + if (!nf.getOptimizeSortWithPoints()) { + fieldComparator.disableSkipping(); + } + return fieldComparator; + } else if (sortFieldClass == SortField.class) { + var missingValue = sortField.getMissingValue(); + var reverse = sortField.getReverse(); + var field = sortField.getField(); + var comparatorSource = sortField.getComparatorSource(); + return switch (sortField.getType()) { + case SCORE -> new RelevanceComparator(env, numHits); + case DOC -> new DocComparator(env, numHits, reverse, sortPos); + case INT -> new IntComparator(env, numHits, field, (Integer) missingValue, + reverse, sortPos); + case FLOAT -> new FloatComparator(env, numHits, field, (Float) missingValue, + reverse, sortPos); + case LONG -> new LongComparator(env, numHits, field, (Long) missingValue, + reverse, sortPos); + case DOUBLE -> new DoubleComparator(env, numHits, field, (Double) missingValue, + reverse, sortPos); + case CUSTOM -> { + assert comparatorSource != null; + yield comparatorSource.newComparator(field, numHits, sortPos, reverse); + } + case STRING -> new TermOrdValComparator(env, numHits, field, missingValue == STRING_LAST); + case STRING_VAL -> throw new NotImplementedException("String val sort field not implemented"); + case REWRITEABLE -> throw new IllegalStateException( + "SortField needs to be rewritten through Sort.rewrite(..) and SortField.rewrite(..)"); + }; + } else { + throw new NotImplementedException("SortField type not implemented: " + sortFieldClass.getName()); + } } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/LMDBPriorityQueue.java b/src/main/java/it/cavallium/dbengine/lucene/LMDBPriorityQueue.java index a1e512d..d17d32d 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LMDBPriorityQueue.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LMDBPriorityQueue.java @@ -9,10 +9,12 @@ import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; import java.io.IOException; import java.util.Iterator; import java.util.Objects; +import java.util.Optional; import java.util.StringJoiner; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.lmdbjava.Cursor; import org.lmdbjava.CursorIterable; import org.lmdbjava.CursorIterable.KeyVal; @@ -27,7 +29,7 @@ import reactor.util.function.Tuple2; import reactor.util.function.Tuple3; import reactor.util.function.Tuples; -public class LMDBPriorityQueue implements PriorityQueue { +public class LMDBPriorityQueue implements PriorityQueue, Reversable>, ReversableResourceIterable { private static final boolean FORCE_SYNC = false; private static final boolean FORCE_THREAD_LOCAL = true; @@ -40,8 +42,6 @@ public class LMDBPriorityQueue implements PriorityQueue { private final LMDBSortedCodec codec; private final Env env; private final Dbi lmdb; - private final Scheduler scheduler = Schedulers.newBoundedElastic(1, - Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, LMDBThread::new, Integer.MAX_VALUE); private boolean writing; private boolean iterating; @@ -186,6 +186,8 @@ public class LMDBPriorityQueue implements PriorityQueue { } } finally { endMode(); + buf.release(); + uid.release(); } assert topSingleValid(element); @@ -309,6 +311,39 @@ public class LMDBPriorityQueue implements PriorityQueue { } } + public Flux reverseIterate() { + return Flux + .generate(() -> { + ensureItThread(); + switchToMode(false, true); + iterating = true; 
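A sketch, not part of this patch: a typical call into the LMDBComparator factory defined above, for a reversed long-valued sort field; the field name and hit count are illustrative, and an already-open LLTempLMDBEnv is assumed to be supplied by the caller.

	FieldComparator<?> newTimestampComparator(LLTempLMDBEnv tempEnv) {
		// a SortedNumericSortField of type LONG maps to the LMDB-backed LongComparator
		SortField sortField = new SortedNumericSortField("timestamp", SortField.Type.LONG, true);
		return LMDBComparator.getComparator(tempEnv, sortField, 1000, 0);
	}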
+ return true; + }, (isLastKey, sink) -> { + try { + ensureItThread(); + boolean found; + if (isLastKey) { + found = cur.last(); + } else { + found = cur.prev(); + } + if (found) { + sink.next(codec.deserialize(cur.key())); + } else { + sink.complete(); + } + return false; + } catch (Throwable ex) { + sink.error(ex); + return false; + } + }, t -> { + ensureItThread(); + iterating = false; + endMode(); + }); + } + @Override public Flux iterate() { return Flux @@ -379,8 +414,54 @@ public class LMDBPriorityQueue implements PriorityQueue { cit.close(); iterating = false; endMode(); - }) - .subscribeOn(scheduler, false); + }); + } + + public Flux reverseIterate(long skips) { + return Flux + .generate(() -> { + ensureItThread(); + switchToMode(false, true); + iterating = true; + return true; + }, (isLastKey, sink) -> { + try { + ensureItThread(); + boolean found; + if (isLastKey) { + found = cur.last(); + } else { + found = cur.prev(); + } + if (found) { + + // Skip elements + if (isLastKey) { + long remainingSkips = skips; + while (remainingSkips > 0) { + if (cur.prev()) { + remainingSkips--; + } else { + sink.complete(); + return false; + } + } + } + + sink.next(codec.deserialize(cur.key())); + } else { + sink.complete(); + } + return false; + } catch (Throwable ex) { + sink.error(ex); + return false; + } + }, t -> { + ensureItThread(); + iterating = false; + endMode(); + }); } @Override @@ -406,11 +487,6 @@ public class LMDBPriorityQueue implements PriorityQueue { onClose.run(); } } - scheduler.dispose(); - } - - public Scheduler getScheduler() { - return scheduler; } @Override @@ -419,4 +495,24 @@ public class LMDBPriorityQueue implements PriorityQueue { .add("size=" + size) .toString(); } + + @Override + public ReversableResourceIterable reverse() { + return new ReversableResourceIterable<>() { + @Override + public Flux iterate() { + return reverseIterate(); + } + + @Override + public Flux iterate(long skips) { + return reverseIterate(skips); + } + + @Override + public ReversableResourceIterable reverse() { + return LMDBPriorityQueue.this; + } + }; + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/LongCodec.java b/src/main/java/it/cavallium/dbengine/lucene/LongCodec.java new file mode 100644 index 0000000..b984685 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/LongCodec.java @@ -0,0 +1,17 @@ +package it.cavallium.dbengine.lucene; + +import io.net5.buffer.ByteBuf; +import java.util.function.Function; + +public class LongCodec implements LMDBCodec { + + @Override + public ByteBuf serialize(Function allocator, Long data) { + return allocator.apply(Long.BYTES).writeLong(data); + } + + @Override + public Long deserialize(ByteBuf b) { + return b.readLong(); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index 56e7229..9375b62 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -342,8 +342,8 @@ public class LuceneUtils { public static LocalQueryParams toLocalQueryParams(QueryParams queryParams) { return new LocalQueryParams(QueryParser.toQuery(queryParams.query()), - safeLongToInt(queryParams.offset()), - safeLongToInt(queryParams.limit()), + queryParams.offset(), + queryParams.limit(), DEFAULT_PAGE_LIMITS, queryParams.minCompetitiveScore().getNullable(), QueryParser.toSort(queryParams.sort()), @@ -457,6 +457,10 @@ public class LuceneUtils { return complete ? 
Integer.MAX_VALUE : 1; } + public static long totalHitsThresholdLong(boolean complete) { + return complete ? Long.MAX_VALUE : 1; + } + public static TotalHitsCount convertTotalHitsCount(TotalHits totalHits) { return switch (totalHits.relation) { case EQUAL_TO -> TotalHitsCount.of(totalHits.value, true); @@ -498,8 +502,8 @@ public class LuceneUtils { mltDocumentFields.entrySet().removeIf(entry -> entry.getValue().isEmpty()); if (mltDocumentFields.isEmpty()) { return new LocalQueryParams(new MatchNoDocsQuery(), - localQueryParams.offset(), - localQueryParams.limit(), + localQueryParams.offsetLong(), + localQueryParams.limitLong(), DEFAULT_PAGE_LIMITS, localQueryParams.minCompetitiveScore(), localQueryParams.sort(), @@ -543,8 +547,8 @@ public class LuceneUtils { } return new LocalQueryParams(luceneQuery, - localQueryParams.offset(), - localQueryParams.limit(), + localQueryParams.offsetLong(), + localQueryParams.limitLong(), DEFAULT_PAGE_LIMITS, localQueryParams.minCompetitiveScore(), localQueryParams.sort(), diff --git a/src/main/java/it/cavallium/dbengine/lucene/Reversable.java b/src/main/java/it/cavallium/dbengine/lucene/Reversable.java new file mode 100644 index 0000000..247cf08 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/Reversable.java @@ -0,0 +1,6 @@ +package it.cavallium.dbengine.lucene; + +public interface Reversable> { + + T reverse(); +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/ReversableResourceIterable.java b/src/main/java/it/cavallium/dbengine/lucene/ReversableResourceIterable.java new file mode 100644 index 0000000..7ce6943 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/ReversableResourceIterable.java @@ -0,0 +1,3 @@ +package it.cavallium.dbengine.lucene; + +public interface ReversableResourceIterable extends ResourceIterable, Reversable> {} diff --git a/src/main/java/it/cavallium/dbengine/lucene/SortFieldCodec.java b/src/main/java/it/cavallium/dbengine/lucene/SortFieldCodec.java new file mode 100644 index 0000000..b10fa6d --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/SortFieldCodec.java @@ -0,0 +1,78 @@ +package it.cavallium.dbengine.lucene; + +import io.net5.buffer.ByteBuf; +import io.net5.buffer.PooledByteBufAllocator; +import java.io.IOException; +import java.util.function.Function; +import org.apache.lucene.search.SortField; +import org.apache.lucene.store.DataInput; +import org.apache.lucene.store.DataOutput; + +public class SortFieldCodec implements LMDBCodec { + + @Override + public ByteBuf serialize(Function allocator, SortField data) { + var out = new ByteBufDataOutput(); + try { + var provider = data.getIndexSorter().getProviderName(); + out.writeString(provider); + SortField.Provider.forName(provider).writeSortField(data, out); + } catch (IOException e) { + throw new RuntimeException(e); + } + return out.buf; + } + + @Override + public SortField deserialize(ByteBuf b) { + var in = new ByteBufDataInput(b); + try { + return SortField.Provider.forName(in.readString()).readSortField(in); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static class ByteBufDataOutput extends DataOutput { + + private final ByteBuf buf; + + public ByteBufDataOutput() { + this.buf = PooledByteBufAllocator.DEFAULT.directBuffer(); + } + + @Override + public void writeByte(byte b) { + buf.writeByte(b); + } + + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + buf.writeBytes(b, offset, length); + } + } + + private static class ByteBufDataInput extends 
DataInput { + + private final ByteBuf buf; + + public ByteBufDataInput(ByteBuf b) { + this.buf = b; + } + + @Override + public byte readByte() { + return buf.readByte(); + } + + @Override + public void readBytes(byte[] b, int offset, int len) { + buf.readBytes(b, offset, len); + } + + @Override + public void skipBytes(long numBytes) { + buf.skipBytes((int) numBytes); + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/FullDocsCollector.java b/src/main/java/it/cavallium/dbengine/lucene/collector/FullDocsCollector.java index fed8509..42b7094 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/collector/FullDocsCollector.java +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/FullDocsCollector.java @@ -21,6 +21,9 @@ import it.cavallium.dbengine.lucene.LLDoc; import it.cavallium.dbengine.lucene.LazyFullDocs; import it.cavallium.dbengine.lucene.PriorityQueue; import it.cavallium.dbengine.lucene.ResourceIterable; +import it.cavallium.dbengine.lucene.Reversable; +import it.cavallium.dbengine.lucene.ReversableResourceIterable; +import java.io.Closeable; import org.apache.lucene.search.Collector; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TotalHits; @@ -34,7 +37,8 @@ import org.apache.lucene.search.TotalHits; * #FullDocsCollector(PriorityQueue)}. In that case however, you might want to consider overriding * all methods, in order to avoid a NullPointerException. */ -public abstract class FullDocsCollector implements Collector, AutoCloseable { +public abstract class FullDocsCollector & Reversable>, INTERNAL extends LLDoc, + EXTERNAL extends LLDoc> implements Collector, AutoCloseable { /** * The priority queue which holds the top documents. Note that different implementations of @@ -42,7 +46,7 @@ public abstract class FullDocsCollector pq; + protected final PQ pq; /** The total number of documents that the collector encountered. 
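A sketch, not part of this patch: SortFieldCodec above round-trips a sort field through Lucene's SortField.Provider registry, so a provider-backed field is expected to survive serialization; the field name is illustrative and buffer lifecycle handling is simplified.

	SortFieldCodec sortFieldCodec = new SortFieldCodec();
	SortField original = new SortField("price", SortField.Type.LONG, true);
	ByteBuf serialized = sortFieldCodec.serialize(len -> PooledByteBufAllocator.DEFAULT.directBuffer(len, len), original);
	SortField restored = sortFieldCodec.deserialize(serialized);
	// restored should compare equal to original for sort fields that expose an index sorter
	serialized.release();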
*/ protected int totalHits; @@ -50,7 +54,7 @@ public abstract class FullDocsCollector pq) { + protected FullDocsCollector(PQ pq) { this.pq = pq; } @@ -61,7 +65,7 @@ public abstract class FullDocsCollector fullDocs() { - return new LazyFullDocs<>(mapResults(this.pq), new TotalHits(totalHits, totalHitsRelation)); + return new LazyFullDocs<>(mapResults(this.pq.reverse()), new TotalHits(totalHits, totalHitsRelation)); } public abstract ResourceIterable mapResults(ResourceIterable it); diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/HitsThresholdChecker.java b/src/main/java/it/cavallium/dbengine/lucene/collector/HitsThresholdChecker.java index 768be13..84f3dd9 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/collector/HitsThresholdChecker.java +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/HitsThresholdChecker.java @@ -24,10 +24,10 @@ import org.apache.lucene.search.ScoreMode; abstract class HitsThresholdChecker { /** Implementation of HitsThresholdChecker which allows global hit counting */ private static class GlobalHitsThresholdChecker extends HitsThresholdChecker { - private final int totalHitsThreshold; + private final long totalHitsThreshold; private final AtomicLong globalHitCount; - public GlobalHitsThresholdChecker(int totalHitsThreshold) { + public GlobalHitsThresholdChecker(long totalHitsThreshold) { if (totalHitsThreshold < 0) { throw new IllegalArgumentException( @@ -44,27 +44,38 @@ abstract class HitsThresholdChecker { } @Override - public boolean isThresholdReached() { - return globalHitCount.getAcquire() > totalHitsThreshold; + public boolean isThresholdReached(boolean supports64Bit) { + if (supports64Bit) { + return globalHitCount.getAcquire() > totalHitsThreshold; + } else { + return Math.min(globalHitCount.getAcquire(), Integer.MAX_VALUE) > Math.min(totalHitsThreshold, Integer.MAX_VALUE); + } } @Override public ScoreMode scoreMode() { - return totalHitsThreshold == Integer.MAX_VALUE ? ScoreMode.COMPLETE : ScoreMode.TOP_SCORES; + if (totalHitsThreshold == Long.MAX_VALUE) { + return ScoreMode.COMPLETE; + } + return ScoreMode.TOP_SCORES; } @Override - public int getHitsThreshold() { - return totalHitsThreshold; + public long getHitsThreshold(boolean supports64Bit) { + if (supports64Bit) { + return totalHitsThreshold; + } else { + return Math.min(totalHitsThreshold, Integer.MAX_VALUE); + } } } /** Default implementation of HitsThresholdChecker to be used for single threaded execution */ private static class LocalHitsThresholdChecker extends HitsThresholdChecker { - private final int totalHitsThreshold; - private int hitCount; + private final long totalHitsThreshold; + private long hitCount; - public LocalHitsThresholdChecker(int totalHitsThreshold) { + public LocalHitsThresholdChecker(long totalHitsThreshold) { if (totalHitsThreshold < 0) { throw new IllegalArgumentException( @@ -80,32 +91,43 @@ abstract class HitsThresholdChecker { } @Override - public boolean isThresholdReached() { - return hitCount > totalHitsThreshold; + public boolean isThresholdReached(boolean supports64Bit) { + if (supports64Bit) { + return hitCount > totalHitsThreshold; + } else { + return Math.min(hitCount, Integer.MAX_VALUE) > Math.min(totalHitsThreshold, Integer.MAX_VALUE); + } } @Override public ScoreMode scoreMode() { - return totalHitsThreshold == Integer.MAX_VALUE ? 
ScoreMode.COMPLETE : ScoreMode.TOP_SCORES; + if (totalHitsThreshold == Long.MAX_VALUE) { + return ScoreMode.COMPLETE; + } + return ScoreMode.TOP_SCORES; } @Override - public int getHitsThreshold() { - return totalHitsThreshold; + public long getHitsThreshold(boolean supports64Bit) { + if (supports64Bit) { + return totalHitsThreshold; + } else { + return Math.min(totalHitsThreshold, Integer.MAX_VALUE); + } } } /* * Returns a threshold checker that is useful for single threaded searches */ - public static HitsThresholdChecker create(final int totalHitsThreshold) { + public static HitsThresholdChecker create(final long totalHitsThreshold) { return new LocalHitsThresholdChecker(totalHitsThreshold); } /* * Returns a threshold checker that is based on a shared counter */ - public static HitsThresholdChecker createShared(final int totalHitsThreshold) { + public static HitsThresholdChecker createShared(final long totalHitsThreshold) { return new GlobalHitsThresholdChecker(totalHitsThreshold); } @@ -113,7 +135,7 @@ abstract class HitsThresholdChecker { public abstract ScoreMode scoreMode(); - public abstract int getHitsThreshold(); + public abstract long getHitsThreshold(boolean supports64Bit); - public abstract boolean isThresholdReached(); + public abstract boolean isThresholdReached(boolean supports64Bit); } \ No newline at end of file diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullFieldDocCollector.java b/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullFieldDocCollector.java index 2805235..1647e1e 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullFieldDocCollector.java +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullFieldDocCollector.java @@ -61,7 +61,8 @@ import reactor.core.publisher.Flux; * TopFieldCollector. 
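A sketch, not part of this patch: the reworked HitsThresholdChecker above now stores the threshold as a long, and each caller states whether it can handle 64-bit counts; thresholds above Integer.MAX_VALUE are clamped for 32-bit callers. The example assumes it runs from within the same package, since the class is package-private.

	HitsThresholdChecker checker = HitsThresholdChecker.create(10_000_000_000L);
	long full = checker.getHitsThreshold(true);     // 10_000_000_000
	long clamped = checker.getHitsThreshold(false); // 2_147_483_647 (Integer.MAX_VALUE)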
* */ -public abstract class LMDBFullFieldDocCollector extends FullDocsCollector { +public abstract class LMDBFullFieldDocCollector + extends FullDocsCollector, LLSlotDoc, LLFieldDoc> { // TODO: one optimization we could do is to pre-fill // the queue with sentinel value that guaranteed to @@ -109,7 +110,7 @@ public abstract class LMDBFullFieldDocCollector extends FullDocsCollector queue, + LMDBPriorityQueue queue, FieldValueHitQueue fieldValueHitQueue, - int numHits, + long numHits, HitsThresholdChecker hitsThresholdChecker, MaxScoreAccumulator minScoreAcc) { super(queue, fieldValueHitQueue, numHits, hitsThresholdChecker, sort.needsScores(), minScoreAcc); @@ -257,9 +258,7 @@ public abstract class LMDBFullFieldDocCollector extends FullDocsCollector firstComparator; final boolean canSetMinScore; @@ -283,9 +282,9 @@ public abstract class LMDBFullFieldDocCollector extends FullDocsCollector pq, + LMDBPriorityQueue pq, FieldValueHitQueue fieldValueHitQueue, - int numHits, + long numHits, HitsThresholdChecker hitsThresholdChecker, boolean needsScores, MaxScoreAccumulator minScoreAcc) { @@ -299,12 +298,12 @@ public abstract class LMDBFullFieldDocCollector extends FullDocsCollector minCompetitiveScore) { @@ -401,7 +400,7 @@ public abstract class LMDBFullFieldDocCollector extends FullDocsCollector(env, fieldValueHitQueue); // inform a comparator that sort is based on this single field @@ -420,7 +419,7 @@ public abstract class LMDBFullFieldDocCollector extends FullDocsCollector> createSharedManager( - LLTempLMDBEnv env, Sort sort, int numHits, FieldDoc after, int totalHitsThreshold) { + LLTempLMDBEnv env, Sort sort, int numHits, long totalHitsThreshold) { return new CollectorManager<>() { private final HitsThresholdChecker hitsThresholdChecker = diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullScoreDocCollector.java b/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullScoreDocCollector.java index e25885b..909d3a8 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullScoreDocCollector.java +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullScoreDocCollector.java @@ -49,7 +49,7 @@ import org.jetbrains.annotations.Nullable; *
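A sketch, not part of this patch: after the signature change above, a shared manager for this collector takes a long hit threshold and no longer takes an after-document. tempEnv, sort, query and indexSearcher are assumed to be in scope, IOException handling is omitted, and the result types are left to inference because the generic parameters are elided in this diff.

	var manager = LMDBFullFieldDocCollector.createSharedManager(tempEnv, sort, 1000,
			LuceneUtils.totalHitsThresholdLong(false));
	var collector = manager.newCollector();
	indexSearcher.search(query, collector);
	var fullFieldDocs = manager.reduce(List.of(collector));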

NOTE: The values {@link Float#NaN} and {@link Float#NEGATIVE_INFINITY} are not valid * scores. This collector will not properly collect hits with such scores. */ -public abstract class LMDBFullScoreDocCollector extends FullDocsCollector { +public abstract class LMDBFullScoreDocCollector extends FullDocsCollector, LLScoreDoc, LLScoreDoc> { /** Scorable leaf collector */ public abstract static class ScorerLeafCollector implements LeafCollector { @@ -197,7 +197,7 @@ public abstract class LMDBFullScoreDocCollector extends FullDocsCollector> createSharedManager( LLTempLMDBEnv env, long numHits, - int totalHitsThreshold) { + long totalHitsThreshold) { return new CollectorManager<>() { private final HitsThresholdChecker hitsThresholdChecker = @@ -222,7 +222,7 @@ public abstract class LMDBFullScoreDocCollector extends FullDocsCollector> createSharedManager( LLTempLMDBEnv env, - int totalHitsThreshold) { + long totalHitsThreshold) { return new CollectorManager<>() { private final HitsThresholdChecker hitsThresholdChecker = @@ -282,7 +282,7 @@ public abstract class LMDBFullScoreDocCollector extends FullDocsCollector maxMinScore.docID ? Math.nextUp(maxMinScore.score) : maxMinScore.score; if (score > minCompetitiveScore) { - assert hitsThresholdChecker.isThresholdReached(); + assert hitsThresholdChecker.isThresholdReached(true); scorer.setMinCompetitiveScore(score); minCompetitiveScore = score; totalHitsRelation = TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO; @@ -292,7 +292,7 @@ public abstract class LMDBFullScoreDocCollector extends FullDocsCollector { + private final Query query; @Nullable private final Sort sort; private final int numHits; @@ -25,37 +30,43 @@ public class ScoringShardsCollectorManager implements CollectorManager sharedCollectorManager; + private List indexSearchers; - public ScoringShardsCollectorManager(@Nullable final Sort sort, + public ScoringShardsCollectorManager(Query query, + @Nullable final Sort sort, final int numHits, final FieldDoc after, final int totalHitsThreshold, int startN, int topN) { - this(sort, numHits, after, totalHitsThreshold, (Integer) startN, (Integer) topN); + this(query, sort, numHits, after, totalHitsThreshold, (Integer) startN, (Integer) topN); } - public ScoringShardsCollectorManager(@Nullable final Sort sort, + public ScoringShardsCollectorManager(Query query, + @Nullable final Sort sort, final int numHits, final FieldDoc after, final int totalHitsThreshold, int startN) { - this(sort, numHits, after, totalHitsThreshold, (Integer) startN, (Integer) 2147483630); + this(query, sort, numHits, after, totalHitsThreshold, (Integer) startN, (Integer) 2147483630); } - public ScoringShardsCollectorManager(@Nullable final Sort sort, + public ScoringShardsCollectorManager(Query query, + @Nullable final Sort sort, final int numHits, final FieldDoc after, final int totalHitsThreshold) { - this(sort, numHits, after, totalHitsThreshold, null, null); + this(query, sort, numHits, after, totalHitsThreshold, null, null); } - private ScoringShardsCollectorManager(@Nullable final Sort sort, + private ScoringShardsCollectorManager(Query query, + @Nullable final Sort sort, final int numHits, final FieldDoc after, final int totalHitsThreshold, @Nullable Integer startN, @Nullable Integer topN) { + this.query = query; this.sort = sort; this.numHits = numHits; this.after = after; @@ -76,6 +87,10 @@ public class ScoringShardsCollectorManager implements CollectorManager indexSearcher) { + this.indexSearchers = indexSearcher; + } + @Override public TopDocs reduce(Collection 
collectors) throws IOException { if (Schedulers.isInNonBlockingThread()) { @@ -87,6 +102,13 @@ public class ScoringShardsCollectorManager implements CollectorManager 0 && Float.isNaN(topDocs[i].scoreDocs[0].score) && sort.needsScores()) { + Objects.requireNonNull(indexSearchers, "You must call setIndexSearchers before calling reduce!"); + TopFieldCollector.populateScores(topDocs[i].scoreDocs, indexSearchers.get(i), query); + } + for (ScoreDoc scoreDoc : topDocs[i].scoreDocs) { scoreDoc.shardIndex = i; } diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/UnscoredCollector.java b/src/main/java/it/cavallium/dbengine/lucene/collector/UnscoredCollector.java index 898a8b0..1eb930b 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/collector/UnscoredCollector.java +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/UnscoredCollector.java @@ -92,7 +92,7 @@ public class UnscoredCollector extends TopDocsCollector implements Lea private void populateResults(ScoreDoc[] results, int start, int howMany) { int i = 0; for (int docId : docIds.subList(start, start + howMany)) { - results[i] = new ScoreDoc(docId, 1.0f); + results[i] = new ScoreDoc(docId, Float.NaN); i++; } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/comparators/DocComparator.java b/src/main/java/it/cavallium/dbengine/lucene/comparators/DocComparator.java new file mode 100644 index 0000000..dcc5b7f --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/comparators/DocComparator.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
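A sketch, not part of this patch: ScoringShardsCollectorManager now carries the query and the per-shard searchers so that reduce() can re-populate the NaN scores left by sorted collection. query, sort, totalHitsThreshold, shardSearchers and the collected collectors are assumed to be in scope, and IOException handling is omitted.

	ScoringShardsCollectorManager manager =
			new ScoringShardsCollectorManager(query, sort, 10, null, totalHitsThreshold);
	// required when the sort needs scores, because reduce() re-populates NaN scores per shard
	manager.setIndexSearchers(shardSearchers);
	TopDocs merged = manager.reduce(collectedCollectors);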
+ */ + +package it.cavallium.dbengine.lucene.comparators; + +import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; +import it.cavallium.dbengine.lucene.IArray; +import it.cavallium.dbengine.lucene.IntCodec; +import it.cavallium.dbengine.lucene.LMDBArray; +import it.cavallium.dbengine.lucene.LMDBPriorityQueue; +import it.cavallium.dbengine.lucene.LongCodec; +import java.io.IOException; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.FieldComparator; +import org.apache.lucene.search.LeafFieldComparator; +import org.apache.lucene.search.Scorable; + +/** Comparator that sorts by asc _doc */ +public class DocComparator extends FieldComparator { + private final IArray docIDs; + private final boolean enableSkipping; // if skipping functionality should be enabled + private int bottom; + private int topValue; + private boolean topValueSet; + private boolean bottomValueSet; + private boolean hitsThresholdReached; + + /** Creates a new comparator based on document ids for {@code numHits} */ + public DocComparator(LLTempLMDBEnv env, int numHits, boolean reverse, int sortPost) { + this.docIDs = new LMDBArray<>(env, new IntCodec(), numHits, 0); + // skipping functionality is enabled if we are sorting by _doc in asc order as a primary sort + this.enableSkipping = (!reverse && sortPost == 0); + } + + @Override + public int compare(int slot1, int slot2) { + // No overflow risk because docIDs are non-negative + return docIDs.getOrDefault(slot1, 0) - docIDs.getOrDefault(slot2, 0); + } + + @Override + public LeafFieldComparator getLeafComparator(LeafReaderContext context) { + // TODO: can we "map" our docIDs to the current + // reader? saves having to then subtract on every + // compare call + return new DocLeafComparator(context); + } + + @Override + public void setTopValue(Integer value) { + topValue = value; + topValueSet = true; + } + + @Override + public Integer value(int slot) { + return docIDs.getOrDefault(slot, 0); + } + + /** + * DocLeafComparator with skipping functionality. When sort by _doc asc, after collecting top N + * matches and enough hits, the comparator can skip all the following documents. When sort by _doc + * asc and "top" document is set after which search should start, the comparator provides an + * iterator that can quickly skip to the desired "top" document. + */ + private class DocLeafComparator implements LeafFieldComparator { + private final int docBase; + private final int minDoc; + private final int maxDoc; + private DocIdSetIterator competitiveIterator; // iterator that starts from topValue + + public DocLeafComparator(LeafReaderContext context) { + this.docBase = context.docBase; + if (enableSkipping) { + // Skip docs before topValue, but include docs starting with topValue. + // Including topValue is necessary when doing sort on [_doc, other fields] + // in a distributed search where there are docs from different indices + // with the same docID. 
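A sketch, not part of this patch: the DocComparator above stores its per-slot doc ids in the LMDB-backed array added earlier in this patch rather than in an on-heap int[]. A standalone illustration of that array's contract, assuming an open LLTempLMDBEnv and with closing omitted:

	IArray<Integer> slots = new LMDBArray<>(tempEnv, new IntCodec(), 128, 0);
	slots.set(3, 42);
	int written = slots.getOrDefault(3, 0);   // 42
	int untouched = slots.getOrDefault(7, 0); // 0, the default value passed to the constructor
	slots.reset(3);                           // deletes the entry, so slot 3 falls back to the default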
+ this.minDoc = topValue; + this.maxDoc = context.reader().maxDoc(); + this.competitiveIterator = DocIdSetIterator.all(maxDoc); + } else { + this.minDoc = -1; + this.maxDoc = -1; + this.competitiveIterator = null; + } + } + + @Override + public void setBottom(int slot) { + bottom = docIDs.getOrDefault(slot, 0); + bottomValueSet = true; + updateIterator(); + } + + @Override + public int compareBottom(int doc) { + // No overflow risk because docIDs are non-negative + return bottom - (docBase + doc); + } + + @Override + public int compareTop(int doc) { + int docValue = docBase + doc; + return Integer.compare(topValue, docValue); + } + + @Override + public void copy(int slot, int doc) throws IOException { + docIDs.set(slot, docBase + doc); + } + + @Override + public void setScorer(Scorable scorer) throws IOException { + // update an iterator on a new segment + updateIterator(); + } + + @Override + public DocIdSetIterator competitiveIterator() { + if (enableSkipping == false) { + return null; + } else { + return new DocIdSetIterator() { + private int docID = competitiveIterator.docID(); + + @Override + public int nextDoc() throws IOException { + return advance(docID + 1); + } + + @Override + public int docID() { + return docID; + } + + @Override + public long cost() { + return competitiveIterator.cost(); + } + + @Override + public int advance(int target) throws IOException { + return docID = competitiveIterator.advance(target); + } + }; + } + } + + @Override + public void setHitsThresholdReached() { + hitsThresholdReached = true; + updateIterator(); + } + + private void updateIterator() { + if (enableSkipping == false || hitsThresholdReached == false) return; + if (bottomValueSet) { + // since we've collected top N matches, we can early terminate + // Currently early termination on _doc is also implemented in TopFieldCollector, but this + // will be removed + // once all bulk scores uses collectors' iterators + competitiveIterator = DocIdSetIterator.empty(); + } else if (topValueSet) { + // skip to the desired top doc + if (docBase + maxDoc <= minDoc) { + competitiveIterator = DocIdSetIterator.empty(); // skip this segment + } else { + int segmentMinDoc = Math.max(competitiveIterator.docID(), minDoc - docBase); + competitiveIterator = new MinDocIterator(segmentMinDoc, maxDoc); + } + } + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/comparators/DoubleComparator.java b/src/main/java/it/cavallium/dbengine/lucene/comparators/DoubleComparator.java new file mode 100644 index 0000000..34a2b2a --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/comparators/DoubleComparator.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
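A sketch, not part of this patch: the numeric comparators that follow decide whether documents missing the sort field are still competitive; for an ascending double sort the check reduces to this small worked example.

	double missingValue = 0.0; // value substituted for documents without the field
	double bottom = -1.5;      // weakest entry currently kept by the collector
	boolean reverse = false;   // ascending sort
	int result = Double.compare(missingValue, bottom);
	boolean missingIsCompetitive = reverse ? (result >= 0) : (result <= 0); // false: such docs can be skipped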
+ */ + +package it.cavallium.dbengine.lucene.comparators; + +import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; +import it.cavallium.dbengine.lucene.DoubleCodec; +import it.cavallium.dbengine.lucene.IArray; +import it.cavallium.dbengine.lucene.LMDBArray; +import it.cavallium.dbengine.lucene.SortFieldCodec; +import java.io.IOException; +import org.apache.lucene.document.DoublePoint; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.LeafFieldComparator; + +/** + * Comparator based on {@link Double#compare} for {@code numHits}. This comparator provides a + * skipping functionality - an iterator that can skip over non-competitive documents. + */ +public class DoubleComparator extends NumericComparator { + private final IArray values; + protected double topValue; + protected double bottom; + + public DoubleComparator(LLTempLMDBEnv env, + int numHits, String field, Double missingValue, boolean reverse, int sortPos) { + super(field, missingValue != null ? missingValue : 0.0, reverse, sortPos, Double.BYTES); + values = new LMDBArray<>(env, new DoubleCodec(), numHits, 0d); + } + + @Override + public int compare(int slot1, int slot2) { + return Double.compare(values.getOrDefault(slot1, 0d), values.getOrDefault(slot2, 0d)); + } + + @Override + public void setTopValue(Double value) { + super.setTopValue(value); + topValue = value; + } + + @Override + public Double value(int slot) { + return values.getOrDefault(slot, 0d); + } + + @Override + public LeafFieldComparator getLeafComparator(LeafReaderContext context) throws IOException { + return new DoubleLeafComparator(context); + } + + /** Leaf comparator for {@link DoubleComparator} that provides skipping functionality */ + public class DoubleLeafComparator extends NumericLeafComparator { + + public DoubleLeafComparator(LeafReaderContext context) throws IOException { + super(context); + } + + private double getValueForDoc(int doc) throws IOException { + if (docValues.advanceExact(doc)) { + return Double.longBitsToDouble(docValues.longValue()); + } else { + return missingValue; + } + } + + @Override + public void setBottom(int slot) throws IOException { + bottom = values.getOrDefault(slot, 0d); + super.setBottom(slot); + } + + @Override + public int compareBottom(int doc) throws IOException { + return Double.compare(bottom, getValueForDoc(doc)); + } + + @Override + public int compareTop(int doc) throws IOException { + return Double.compare(topValue, getValueForDoc(doc)); + } + + @Override + public void copy(int slot, int doc) throws IOException { + values.set(slot, getValueForDoc(doc)); + super.copy(slot, doc); + } + + @Override + protected boolean isMissingValueCompetitive() { + int result = Double.compare(missingValue, bottom); + return reverse ? (result >= 0) : (result <= 0); + } + + @Override + protected void encodeBottom(byte[] packedValue) { + DoublePoint.encodeDimension(bottom, packedValue, 0); + } + + @Override + protected void encodeTop(byte[] packedValue) { + DoublePoint.encodeDimension(topValue, packedValue, 0); + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/comparators/FloatComparator.java b/src/main/java/it/cavallium/dbengine/lucene/comparators/FloatComparator.java new file mode 100644 index 0000000..dd1b401 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/comparators/FloatComparator.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package it.cavallium.dbengine.lucene.comparators; + +import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; +import it.cavallium.dbengine.lucene.DoubleCodec; +import it.cavallium.dbengine.lucene.FloatCodec; +import it.cavallium.dbengine.lucene.IArray; +import it.cavallium.dbengine.lucene.LMDBArray; +import java.io.IOException; +import org.apache.lucene.document.FloatPoint; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.LeafFieldComparator; + +/** + * Comparator based on {@link Float#compare} for {@code numHits}. This comparator provides a + * skipping functionality – an iterator that can skip over non-competitive documents. + */ +public class FloatComparator extends NumericComparator { + private final IArray values; + protected float topValue; + protected float bottom; + + public FloatComparator(LLTempLMDBEnv env, + int numHits, String field, Float missingValue, boolean reverse, int sortPos) { + super(field, missingValue != null ? missingValue : 0.0f, reverse, sortPos, Float.BYTES); + values = new LMDBArray<>(env, new FloatCodec(), numHits, 0f); + } + + @Override + public int compare(int slot1, int slot2) { + return Float.compare(values.getOrDefault(slot1, 0f), values.getOrDefault(slot2, 0f)); + } + + @Override + public void setTopValue(Float value) { + super.setTopValue(value); + topValue = value; + } + + @Override + public Float value(int slot) { + return values.getOrDefault(slot, 0f); + } + + @Override + public LeafFieldComparator getLeafComparator(LeafReaderContext context) throws IOException { + return new FloatLeafComparator(context); + } + + /** Leaf comparator for {@link FloatComparator} that provides skipping functionality */ + public class FloatLeafComparator extends NumericLeafComparator { + + public FloatLeafComparator(LeafReaderContext context) throws IOException { + super(context); + } + + private float getValueForDoc(int doc) throws IOException { + if (docValues.advanceExact(doc)) { + return Float.intBitsToFloat((int) docValues.longValue()); + } else { + return missingValue; + } + } + + @Override + public void setBottom(int slot) throws IOException { + bottom = values.getOrDefault(slot, 0f); + super.setBottom(slot); + } + + @Override + public int compareBottom(int doc) throws IOException { + return Float.compare(bottom, getValueForDoc(doc)); + } + + @Override + public int compareTop(int doc) throws IOException { + return Float.compare(topValue, getValueForDoc(doc)); + } + + @Override + public void copy(int slot, int doc) throws IOException { + values.set(slot, getValueForDoc(doc)); + super.copy(slot, doc); + } + + @Override + protected boolean isMissingValueCompetitive() { + int result = Float.compare(missingValue, bottom); + return reverse ? 
(result >= 0) : (result <= 0); + } + + @Override + protected void encodeBottom(byte[] packedValue) { + FloatPoint.encodeDimension(bottom, packedValue, 0); + } + + @Override + protected void encodeTop(byte[] packedValue) { + FloatPoint.encodeDimension(topValue, packedValue, 0); + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/comparators/IntComparator.java b/src/main/java/it/cavallium/dbengine/lucene/comparators/IntComparator.java new file mode 100644 index 0000000..9cccc6c --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/comparators/IntComparator.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package it.cavallium.dbengine.lucene.comparators; + +import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; +import it.cavallium.dbengine.lucene.FloatCodec; +import it.cavallium.dbengine.lucene.IArray; +import it.cavallium.dbengine.lucene.IntCodec; +import it.cavallium.dbengine.lucene.LMDBArray; +import java.io.IOException; +import org.apache.lucene.document.IntPoint; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.LeafFieldComparator; + +/** + * Comparator based on {@link Integer#compare} for {@code numHits}. This comparator provides a + * skipping functionality – an iterator that can skip over non-competitive documents. + */ +public class IntComparator extends NumericComparator { + private final IArray values; + protected int topValue; + protected int bottom; + + public IntComparator(LLTempLMDBEnv env, + int numHits, String field, Integer missingValue, boolean reverse, int sortPos) { + super(field, missingValue != null ? 
missingValue : 0, reverse, sortPos, Integer.BYTES); + values = new LMDBArray<>(env, new IntCodec(), numHits, 0); + } + + @Override + public int compare(int slot1, int slot2) { + return Integer.compare(values.getOrDefault(slot1, 0), values.getOrDefault(slot2, 0)); + } + + @Override + public void setTopValue(Integer value) { + super.setTopValue(value); + topValue = value; + } + + @Override + public Integer value(int slot) { + return values.getOrDefault(slot, 0); + } + + @Override + public LeafFieldComparator getLeafComparator(LeafReaderContext context) throws IOException { + return new IntLeafComparator(context); + } + + /** Leaf comparator for {@link IntComparator} that provides skipping functionality */ + public class IntLeafComparator extends NumericLeafComparator { + + public IntLeafComparator(LeafReaderContext context) throws IOException { + super(context); + } + + private int getValueForDoc(int doc) throws IOException { + if (docValues.advanceExact(doc)) { + return (int) docValues.longValue(); + } else { + return missingValue; + } + } + + @Override + public void setBottom(int slot) throws IOException { + bottom = values.getOrDefault(slot, 0); + super.setBottom(slot); + } + + @Override + public int compareBottom(int doc) throws IOException { + return Integer.compare(bottom, getValueForDoc(doc)); + } + + @Override + public int compareTop(int doc) throws IOException { + return Integer.compare(topValue, getValueForDoc(doc)); + } + + @Override + public void copy(int slot, int doc) throws IOException { + values.set(slot, getValueForDoc(doc)); + super.copy(slot, doc); + } + + @Override + protected boolean isMissingValueCompetitive() { + int result = Integer.compare(missingValue, bottom); + // in reverse (desc) sort missingValue is competitive when it's greater or equal to bottom, + // in asc sort missingValue is competitive when it's smaller or equal to bottom + return reverse ? (result >= 0) : (result <= 0); + } + + @Override + protected void encodeBottom(byte[] packedValue) { + IntPoint.encodeDimension(bottom, packedValue, 0); + } + + @Override + protected void encodeTop(byte[] packedValue) { + IntPoint.encodeDimension(topValue, packedValue, 0); + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/comparators/LongComparator.java b/src/main/java/it/cavallium/dbengine/lucene/comparators/LongComparator.java new file mode 100644 index 0000000..b5d6e8d --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/comparators/LongComparator.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package it.cavallium.dbengine.lucene.comparators; + +import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; +import it.cavallium.dbengine.lucene.IArray; +import it.cavallium.dbengine.lucene.IntCodec; +import it.cavallium.dbengine.lucene.LMDBArray; +import it.cavallium.dbengine.lucene.LongCodec; +import java.io.IOException; +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.LeafFieldComparator; + +/** + * Comparator based on {@link Long#compare} for {@code numHits}. This comparator provides a skipping + * functionality – an iterator that can skip over non-competitive documents. + */ +public class LongComparator extends NumericComparator { + private final IArray values; + protected long topValue; + protected long bottom; + + public LongComparator(LLTempLMDBEnv env, + int numHits, String field, Long missingValue, boolean reverse, int sortPos) { + super(field, missingValue != null ? missingValue : 0L, reverse, sortPos, Long.BYTES); + values = new LMDBArray<>(env, new LongCodec(), numHits, 0L); + } + + @Override + public int compare(int slot1, int slot2) { + return Long.compare(values.getOrDefault(slot1, 0L), values.getOrDefault(slot2, 0L)); + } + + @Override + public void setTopValue(Long value) { + super.setTopValue(value); + topValue = value; + } + + @Override + public Long value(int slot) { + return values.getOrDefault(slot, 0L); + } + + @Override + public LeafFieldComparator getLeafComparator(LeafReaderContext context) throws IOException { + return new LongLeafComparator(context); + } + + /** Leaf comparator for {@link LongComparator} that provides skipping functionality */ + public class LongLeafComparator extends NumericLeafComparator { + + public LongLeafComparator(LeafReaderContext context) throws IOException { + super(context); + } + + private long getValueForDoc(int doc) throws IOException { + if (docValues.advanceExact(doc)) { + return docValues.longValue(); + } else { + return missingValue; + } + } + + @Override + public void setBottom(int slot) throws IOException { + bottom = values.getOrDefault(slot, 0L); + super.setBottom(slot); + } + + @Override + public int compareBottom(int doc) throws IOException { + return Long.compare(bottom, getValueForDoc(doc)); + } + + @Override + public int compareTop(int doc) throws IOException { + return Long.compare(topValue, getValueForDoc(doc)); + } + + @Override + public void copy(int slot, int doc) throws IOException { + values.set(slot, getValueForDoc(doc)); + super.copy(slot, doc); + } + + @Override + protected boolean isMissingValueCompetitive() { + int result = Long.compare(missingValue, bottom); + // in reverse (desc) sort missingValue is competitive when it's greater or equal to bottom, + // in asc sort missingValue is competitive when it's smaller or equal to bottom + return reverse ? 
(result >= 0) : (result <= 0); + } + + @Override + protected void encodeBottom(byte[] packedValue) { + LongPoint.encodeDimension(bottom, packedValue, 0); + } + + @Override + protected void encodeTop(byte[] packedValue) { + LongPoint.encodeDimension(topValue, packedValue, 0); + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/comparators/MinDocIterator.java b/src/main/java/it/cavallium/dbengine/lucene/comparators/MinDocIterator.java new file mode 100644 index 0000000..15a7bba --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/comparators/MinDocIterator.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package it.cavallium.dbengine.lucene.comparators; + +import java.io.IOException; +import org.apache.lucene.search.DocIdSetIterator; + +/** Docs iterator that starts iterating from a configurable minimum document */ +public class MinDocIterator extends DocIdSetIterator { + final int segmentMinDoc; + final int maxDoc; + int doc = -1; + + MinDocIterator(int segmentMinDoc, int maxDoc) { + this.segmentMinDoc = segmentMinDoc; + this.maxDoc = maxDoc; + } + + @Override + public int docID() { + return doc; + } + + @Override + public int nextDoc() throws IOException { + return advance(doc + 1); + } + + @Override + public int advance(int target) throws IOException { + assert target > doc; + if (doc == -1) { + // skip directly to minDoc + doc = Math.max(target, segmentMinDoc); + } else { + doc = target; + } + if (doc >= maxDoc) { + doc = NO_MORE_DOCS; + } + return doc; + } + + @Override + public long cost() { + return maxDoc - segmentMinDoc; + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/comparators/NumericComparator.java b/src/main/java/it/cavallium/dbengine/lucene/comparators/NumericComparator.java new file mode 100644 index 0000000..721e6ab --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/comparators/NumericComparator.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package it.cavallium.dbengine.lucene.comparators; + +import java.io.IOException; +import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.PointValues; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.FieldComparator; +import org.apache.lucene.search.LeafFieldComparator; +import org.apache.lucene.search.Scorable; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.ArrayUtil.ByteArrayComparator; +import org.apache.lucene.util.DocIdSetBuilder; + +/** + * Abstract numeric comparator for comparing numeric values. This comparator provides a skipping + * functionality – an iterator that can skip over non-competitive documents. + */ +public abstract class NumericComparator extends FieldComparator { + protected final T missingValue; + protected final String field; + protected final boolean reverse; + private final int bytesCount; // how many bytes are used to encode this number + private final ByteArrayComparator bytesComparator; + + protected boolean topValueSet; + protected boolean singleSort; // singleSort is true, if sort is based on a single sort field. + protected boolean hitsThresholdReached; + protected boolean queueFull; + private boolean canSkipDocuments; + + protected NumericComparator( + String field, T missingValue, boolean reverse, int sortPos, int bytesCount) { + this.field = field; + this.missingValue = missingValue; + this.reverse = reverse; + // skipping functionality is only relevant for primary sort + this.canSkipDocuments = (sortPos == 0); + this.bytesCount = bytesCount; + this.bytesComparator = ArrayUtil.getUnsignedComparator(bytesCount); + } + + @Override + public void setTopValue(T value) { + topValueSet = true; + } + + @Override + public void setSingleSort() { + singleSort = true; + } + + @Override + public void disableSkipping() { + canSkipDocuments = false; + } + + /** Leaf comparator for {@link NumericComparator} that provides skipping functionality */ + public abstract class NumericLeafComparator implements LeafFieldComparator { + protected final NumericDocValues docValues; + private final PointValues pointValues; + // if skipping functionality should be enabled on this segment + private final boolean enableSkipping; + private final int maxDoc; + private final byte[] minValueAsBytes; + private final byte[] maxValueAsBytes; + + private DocIdSetIterator competitiveIterator; + private long iteratorCost; + private int maxDocVisited = -1; + private int updateCounter = 0; + + public NumericLeafComparator(LeafReaderContext context) throws IOException { + this.docValues = getNumericDocValues(context, field); + this.pointValues = canSkipDocuments ? 
context.reader().getPointValues(field) : null; + if (pointValues != null) { + FieldInfo info = context.reader().getFieldInfos().fieldInfo(field); + if (info == null || info.getPointDimensionCount() == 0) { + throw new IllegalStateException( + "Field " + + field + + " doesn't index points according to FieldInfos yet returns non-null PointValues"); + } else if (info.getPointDimensionCount() > 1) { + throw new IllegalArgumentException( + "Field " + field + " is indexed with multiple dimensions, sorting is not supported"); + } else if (info.getPointNumBytes() != bytesCount) { + throw new IllegalArgumentException( + "Field " + + field + + " is indexed with " + + info.getPointNumBytes() + + " bytes per dimension, but " + + NumericComparator.this + + " expected " + + bytesCount); + } + this.enableSkipping = true; // skipping is enabled when points are available + this.maxDoc = context.reader().maxDoc(); + this.maxValueAsBytes = + reverse == false ? new byte[bytesCount] : topValueSet ? new byte[bytesCount] : null; + this.minValueAsBytes = + reverse ? new byte[bytesCount] : topValueSet ? new byte[bytesCount] : null; + this.competitiveIterator = DocIdSetIterator.all(maxDoc); + this.iteratorCost = maxDoc; + } else { + this.enableSkipping = false; + this.maxDoc = 0; + this.maxValueAsBytes = null; + this.minValueAsBytes = null; + } + } + + /** Retrieves the NumericDocValues for the field in this segment */ + protected NumericDocValues getNumericDocValues(LeafReaderContext context, String field) + throws IOException { + return DocValues.getNumeric(context.reader(), field); + } + + @Override + public void setBottom(int slot) throws IOException { + queueFull = true; // if we are setting bottom, it means that we have collected enough hits + updateCompetitiveIterator(); // update an iterator if we set a new bottom + } + + @Override + public void copy(int slot, int doc) throws IOException { + maxDocVisited = doc; + } + + @Override + public void setScorer(Scorable scorer) throws IOException { + if (scorer instanceof Scorer) { + iteratorCost = + ((Scorer) scorer).iterator().cost(); // starting iterator cost is the scorer's cost + updateCompetitiveIterator(); // update an iterator when we have a new segment + } + } + + @Override + public void setHitsThresholdReached() throws IOException { + hitsThresholdReached = true; + updateCompetitiveIterator(); + } + + // update its iterator to include possibly only docs that are "stronger" than the current bottom + // entry + private void updateCompetitiveIterator() throws IOException { + if (enableSkipping == false || hitsThresholdReached == false || queueFull == false) return; + // if some documents have missing points, check that missing values prohibits optimization + if ((pointValues.getDocCount() < maxDoc) && isMissingValueCompetitive()) { + return; // we can't filter out documents, as documents with missing values are competitive + } + + updateCounter++; + if (updateCounter > 256 + && (updateCounter & 0x1f) != 0x1f) { // Start sampling if we get called too much + return; + } + if (reverse == false) { + encodeBottom(maxValueAsBytes); + if (topValueSet) { + encodeTop(minValueAsBytes); + } + } else { + encodeBottom(minValueAsBytes); + if (topValueSet) { + encodeTop(maxValueAsBytes); + } + } + + DocIdSetBuilder result = new DocIdSetBuilder(maxDoc); + PointValues.IntersectVisitor visitor = + new PointValues.IntersectVisitor() { + DocIdSetBuilder.BulkAdder adder; + + @Override + public void grow(int count) { + adder = result.grow(count); + } + + @Override + public void 
visit(int docID) { + if (docID <= maxDocVisited) { + return; // Already visited or skipped + } + adder.add(docID); + } + + @Override + public void visit(int docID, byte[] packedValue) { + if (docID <= maxDocVisited) { + return; // already visited or skipped + } + if (maxValueAsBytes != null) { + int cmp = bytesComparator.compare(packedValue, 0, maxValueAsBytes, 0); + // if doc's value is too high or for single sort even equal, it is not competitive + // and the doc can be skipped + if (cmp > 0 || (singleSort && cmp == 0)) return; + } + if (minValueAsBytes != null) { + int cmp = bytesComparator.compare(packedValue, 0, minValueAsBytes, 0); + // if doc's value is too low or for single sort even equal, it is not competitive + // and the doc can be skipped + if (cmp < 0 || (singleSort && cmp == 0)) return; + } + adder.add(docID); // doc is competitive + } + + @Override + public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) { + if (maxValueAsBytes != null) { + int cmp = bytesComparator.compare(minPackedValue, 0, maxValueAsBytes, 0); + if (cmp > 0 || (singleSort && cmp == 0)) + return PointValues.Relation.CELL_OUTSIDE_QUERY; + } + if (minValueAsBytes != null) { + int cmp = bytesComparator.compare(maxPackedValue, 0, minValueAsBytes, 0); + if (cmp < 0 || (singleSort && cmp == 0)) + return PointValues.Relation.CELL_OUTSIDE_QUERY; + } + if ((maxValueAsBytes != null + && bytesComparator.compare(maxPackedValue, 0, maxValueAsBytes, 0) > 0) + || (minValueAsBytes != null + && bytesComparator.compare(minPackedValue, 0, minValueAsBytes, 0) < 0)) { + return PointValues.Relation.CELL_CROSSES_QUERY; + } + return PointValues.Relation.CELL_INSIDE_QUERY; + } + }; + final long threshold = iteratorCost >>> 3; + long estimatedNumberOfMatches = + pointValues.estimatePointCount(visitor); // runs in O(log(numPoints)) + if (estimatedNumberOfMatches >= threshold) { + // the new range is not selective enough to be worth materializing, it doesn't reduce number + // of docs at least 8x + return; + } + pointValues.intersect(visitor); + competitiveIterator = result.build().iterator(); + iteratorCost = competitiveIterator.cost(); + } + + @Override + public DocIdSetIterator competitiveIterator() { + if (enableSkipping == false) return null; + return new DocIdSetIterator() { + private int docID = competitiveIterator.docID(); + + @Override + public int nextDoc() throws IOException { + return advance(docID + 1); + } + + @Override + public int docID() { + return docID; + } + + @Override + public long cost() { + return competitiveIterator.cost(); + } + + @Override + public int advance(int target) throws IOException { + return docID = competitiveIterator.advance(target); + } + }; + } + + protected abstract boolean isMissingValueCompetitive(); + + protected abstract void encodeBottom(byte[] packedValue); + + protected abstract void encodeTop(byte[] packedValue); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/comparators/RelevanceComparator.java b/src/main/java/it/cavallium/dbengine/lucene/comparators/RelevanceComparator.java new file mode 100644 index 0000000..11414b3 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/comparators/RelevanceComparator.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package it.cavallium.dbengine.lucene.comparators; + +import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; +import it.cavallium.dbengine.lucene.FloatCodec; +import it.cavallium.dbengine.lucene.IArray; +import it.cavallium.dbengine.lucene.LMDBArray; +import it.cavallium.dbengine.lucene.LongCodec; +import java.io.IOException; +import org.apache.lucene.index.BinaryDocValues; +import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.search.FieldComparator; +import org.apache.lucene.search.LeafFieldComparator; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.Scorable; +import org.apache.lucene.search.ScoreCachingWrappingScorer; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefBuilder; + +/** + * Sorts by descending relevance. NOTE: if you are sorting only by descending relevance and then secondarily by + * ascending docID, performance is faster using {@link org.apache.lucene.search.TopScoreDocCollector} directly (which {@link + * org.apache.lucene.search.IndexSearcher#search(Query, int)} uses when no {@link org.apache.lucene.search.Sort} is specified). + */ +public final class RelevanceComparator extends FieldComparator implements LeafFieldComparator { + + private final IArray scores; + private float bottom; + private Scorable scorer; + private float topValue; + + /** + * Creates a new comparator based on relevance for {@code numHits}. + */ + public RelevanceComparator(LLTempLMDBEnv env, int numHits) { + scores = new LMDBArray<>(env, new FloatCodec(), numHits, 0f); + } + + @Override + public int compare(int slot1, int slot2) { + return Float.compare(scores.getOrDefault(slot2, 0f), scores.getOrDefault(slot1, 0f)); + } + + @Override + public int compareBottom(int doc) throws IOException { + float score = scorer.score(); + assert !Float.isNaN(score); + return Float.compare(score, bottom); + } + + @Override + public void copy(int slot, int doc) throws IOException { + var score = scorer.score(); + scores.set(slot, score); + assert !Float.isNaN(score); + } + + @Override + public LeafFieldComparator getLeafComparator(LeafReaderContext context) { + return this; + } + + @Override + public void setBottom(final int bottom) { + this.bottom = scores.getOrDefault(bottom, 0f); + } + + @Override + public void setTopValue(Float value) { + topValue = value; + } + + @Override + public void setScorer(Scorable scorer) { + // wrap with a ScoreCachingWrappingScorer so that successive calls to + // score() will not incur score computation over and + // over again. 
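+    // In this comparator both compareBottom(int) and copy(int, int) read scorer.score() for
+    // the same hit, so without the caching wrapper the score of a competitive document could
+    // be computed twice per collected doc.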
+ this.scorer = ScoreCachingWrappingScorer.wrap(scorer); + } + + @Override + public Float value(int slot) { + return scores.getOrDefault(slot, 0f); + } + + // Override because we sort reverse of natural Float order: + @Override + public int compareValues(Float first, Float second) { + // Reversed intentionally because relevance by default + // sorts descending: + return second.compareTo(first); + } + + @Override + public int compareTop(int doc) throws IOException { + float docValue = scorer.score(); + assert !Float.isNaN(docValue); + return Float.compare(docValue, topValue); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/comparators/TermOrdValComparator.java b/src/main/java/it/cavallium/dbengine/lucene/comparators/TermOrdValComparator.java new file mode 100644 index 0000000..057f594 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/comparators/TermOrdValComparator.java @@ -0,0 +1,298 @@ +package it.cavallium.dbengine.lucene.comparators; + +import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; +import it.cavallium.dbengine.lucene.ByteArrayCodec; +import it.cavallium.dbengine.lucene.BytesRefCodec; +import it.cavallium.dbengine.lucene.FloatCodec; +import it.cavallium.dbengine.lucene.IArray; +import it.cavallium.dbengine.lucene.IntCodec; +import it.cavallium.dbengine.lucene.LMDBArray; +import java.io.IOException; +import java.util.Arrays; +import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.search.FieldComparator; +import org.apache.lucene.search.LeafFieldComparator; +import org.apache.lucene.search.Scorable; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefBuilder; + +/** + * Sorts by field's natural Term sort order, using ordinals. This is functionally equivalent to + * {@link org.apache.lucene.search.FieldComparator.TermValComparator}, but it first resolves the + * string to their relative ordinal positions (using the index returned by {@link + * org.apache.lucene.index.LeafReader#getSortedDocValues(String)}), and does most comparisons + * using the ordinals. For medium to large results, this comparator will be much faster than + * {@link org.apache.lucene.search.FieldComparator.TermValComparator}. For very small result sets + * it may be slower. + */ +public class TermOrdValComparator extends FieldComparator implements LeafFieldComparator { + /* Ords for each slot. + @lucene.internal */ + final IArray ords; + + /* Values for each slot. + @lucene.internal */ + final IArray values; + + /* Which reader last copied a value into the slot. When + we compare two slots, we just compare-by-ord if the + readerGen is the same; else we must compare the + values (slower). + @lucene.internal */ + final IArray readerGen; + + /* Gen of current reader we are on. + @lucene.internal */ + int currentReaderGen = -1; + + /* Current reader's doc ord/values. + @lucene.internal */ + SortedDocValues termsIndex; + + private final String field; + + /* Bottom slot, or -1 if queue isn't full yet + @lucene.internal */ + int bottomSlot = -1; + + /* Bottom ord (same as ords[bottomSlot] once bottomSlot + is set). Cached for faster compares. + @lucene.internal */ + int bottomOrd; + + /* True if current bottom slot matches the current + reader. + @lucene.internal */ + boolean bottomSameReader; + + /* Bottom value (same as values[bottomSlot] once + bottomSlot is set). Cached for faster compares. 
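+     In this LMDB-backed variant the value is a plain byte[] produced by ByteArrayCodec from
+     the values array, presumably so that large slot arrays do not have to live on the Java heap.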
+ @lucene.internal */ + byte[] bottomValue; + + /** Set by setTopValue. */ + byte[] topValue; + + boolean topSameReader; + int topOrd; + + /** -1 if missing values are sorted first, 1 if they are sorted last */ + final int missingSortCmp; + + /** Which ordinal to use for a missing value. */ + final int missingOrd; + + /** Creates this, sorting missing values first. */ + public TermOrdValComparator(LLTempLMDBEnv env, int numHits, String field) { + this(env, numHits, field, false); + } + + /** + * Creates this, with control over how missing values are sorted. Pass sortMissingLast=true to + * put missing values at the end. + */ + public TermOrdValComparator(LLTempLMDBEnv env, int numHits, String field, boolean sortMissingLast) { + ords = new LMDBArray<>(env, new IntCodec(), numHits, 0); + values = new LMDBArray<>(env, new ByteArrayCodec(), numHits, null); + readerGen = new LMDBArray<>(env, new IntCodec(), numHits, 0); + this.field = field; + if (sortMissingLast) { + missingSortCmp = 1; + missingOrd = Integer.MAX_VALUE; + } else { + missingSortCmp = -1; + missingOrd = -1; + } + } + + private int getOrdForDoc(int doc) throws IOException { + if (termsIndex.advanceExact(doc)) { + return termsIndex.ordValue(); + } else { + return -1; + } + } + + @Override + public int compare(int slot1, int slot2) { + if ((int) readerGen.getOrDefault(slot2, 0) == readerGen.getOrDefault(slot1, 0)) { + return ords.getOrDefault(slot1, 0) - ords.getOrDefault(slot2, 0); + } + + final var val1 = values.get(slot1); + final var val2 = values.get(slot2); + if (val1 == null) { + if (val2 == null) { + return 0; + } + return missingSortCmp; + } else if (val2 == null) { + return -missingSortCmp; + } + return Arrays.compare(val1, val2); + } + + @Override + public int compareBottom(int doc) throws IOException { + assert bottomSlot != -1; + int docOrd = getOrdForDoc(doc); + if (docOrd == -1) { + docOrd = missingOrd; + } + if (bottomSameReader) { + // ord is precisely comparable, even in the equal case + return bottomOrd - docOrd; + } else if (bottomOrd >= docOrd) { + // the equals case always means bottom is > doc + // (because we set bottomOrd to the lower bound in + // setBottom): + return 1; + } else { + return -1; + } + } + + @Override + public void copy(int slot, int doc) throws IOException { + int ord = getOrdForDoc(doc); + if (ord == -1) { + ord = missingOrd; + values.reset(slot); + } else { + assert ord >= 0; + values.set(slot, copyBytes(termsIndex.lookupOrd(ord))); + } + ords.set(slot, ord); + readerGen.set(slot, currentReaderGen); + } + + private byte[] copyBytes(BytesRef lookupOrd) { + if (lookupOrd == null) return null; + return Arrays.copyOfRange(lookupOrd.bytes, lookupOrd.offset, lookupOrd.length); + } + + /** Retrieves the SortedDocValues for the field in this segment */ + protected SortedDocValues getSortedDocValues(LeafReaderContext context, String field) + throws IOException { + return DocValues.getSorted(context.reader(), field); + } + + @Override + public LeafFieldComparator getLeafComparator(LeafReaderContext context) throws IOException { + termsIndex = getSortedDocValues(context, field); + currentReaderGen++; + + if (topValue != null) { + // Recompute topOrd/SameReader + int ord = termsIndex.lookupTerm(new BytesRef(topValue)); + if (ord >= 0) { + topSameReader = true; + topOrd = ord; + } else { + topSameReader = false; + topOrd = -ord - 2; + } + } else { + topOrd = missingOrd; + topSameReader = true; + } + // System.out.println(" getLeafComparator topOrd=" + topOrd + " topSameReader=" + + // topSameReader); 
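+    // lookupTerm returns -insertionPoint - 1 when topValue is absent from this segment's
+    // terms dictionary, so topOrd = -ord - 2 above is the ordinal of the greatest term that is
+    // still smaller than topValue. Illustrative example: an insertion point of 5 yields
+    // ord = -6 and topOrd = 4.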
+ + if (bottomSlot != -1) { + // Recompute bottomOrd/SameReader + setBottom(bottomSlot); + } + + return this; + } + + @Override + public void setBottom(final int bottom) throws IOException { + bottomSlot = bottom; + + bottomValue = values.get(bottomSlot); + if (currentReaderGen == readerGen.getOrDefault(bottomSlot, 0)) { + bottomOrd = ords.getOrDefault(bottomSlot, 0); + bottomSameReader = true; + } else { + if (bottomValue == null) { + // missingOrd is null for all segments + assert ords.getOrDefault(bottomSlot, 0) == missingOrd; + bottomOrd = missingOrd; + bottomSameReader = true; + readerGen.set(bottomSlot, currentReaderGen); + } else { + final int ord = termsIndex.lookupTerm(new BytesRef(bottomValue)); + if (ord < 0) { + bottomOrd = -ord - 2; + bottomSameReader = false; + } else { + bottomOrd = ord; + // exact value match + bottomSameReader = true; + readerGen.set(bottomSlot, currentReaderGen); + ords.set(bottomSlot, bottomOrd); + } + } + } + } + + @Override + public void setTopValue(BytesRef value) { + // null is fine: it means the last doc of the prior + // search was missing this value + topValue = copyBytes(value); + // System.out.println("setTopValue " + topValue); + } + + @Override + public BytesRef value(int slot) { + return getBytesRef(values.get(slot)); + } + + private BytesRef getBytesRef(byte[] bytes) { + if (bytes == null) return null; + return new BytesRef(bytes); + } + + @Override + public int compareTop(int doc) throws IOException { + + int ord = getOrdForDoc(doc); + if (ord == -1) { + ord = missingOrd; + } + + if (topSameReader) { + // ord is precisely comparable, even in the equal + // case + // System.out.println("compareTop doc=" + doc + " ord=" + ord + " ret=" + (topOrd-ord)); + return topOrd - ord; + } else if (ord <= topOrd) { + // the equals case always means doc is < value + // (because we set lastOrd to the lower bound) + return 1; + } else { + return -1; + } + } + + @Override + public int compareValues(BytesRef val1, BytesRef val2) { + if (val1 == null) { + if (val2 == null) { + return 0; + } + return missingSortCmp; + } else if (val2 == null) { + return -missingSortCmp; + } + return val1.compareTo(val2); + } + + @Override + public void setScorer(Scorable scorer) {} +} \ No newline at end of file diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java index d06a2be..7be152b 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java @@ -43,7 +43,7 @@ public class AdaptiveLocalSearcher implements LocalSearcher { LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer) { - if (queryParams.limit() == 0) { + if (queryParams.limitLong() == 0) { return countSearcher.collect(indexSearcher, queryParams, keyFieldName, transformer); } else { return localSearcher.collect(indexSearcher, queryParams, keyFieldName, transformer); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java index 6a77d75..6d4d072 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java @@ -13,7 +13,7 @@ public class AdaptiveMultiSearcher implements MultiSearcher, Closeable { private static final MultiSearcher 
count = new UnsortedUnscoredSimpleMultiSearcher(new CountLocalSearcher()); - private static final MultiSearcher scoredSimple = new ScoredPagedMultiSearcher(); + private static final MultiSearcher scoredPaged = new ScoredPagedMultiSearcher(); private static final MultiSearcher unsortedUnscoredPaged = new UnsortedUnscoredSimpleMultiSearcher(new PagedLocalSearcher()); @@ -23,8 +23,11 @@ public class AdaptiveMultiSearcher implements MultiSearcher, Closeable { private final UnsortedScoredFullMultiSearcher unsortedScoredFull; + private final SortedScoredFullMultiSearcher sortedScoredFull; + public AdaptiveMultiSearcher() throws IOException { unsortedScoredFull = new UnsortedScoredFullMultiSearcher(); + sortedScoredFull = new SortedScoredFullMultiSearcher(); } @Override @@ -48,17 +51,20 @@ public class AdaptiveMultiSearcher implements MultiSearcher, Closeable { String keyFieldName, LLSearchTransformer transformer) { // offset + limit - long realLimit = ((long) queryParams.offset() + (long) queryParams.limit()); + long realLimit = queryParams.offsetLong() + queryParams.limitLong(); return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> { - if (queryParams.limit() == 0) { + if (queryParams.limitLong() == 0) { return count.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); } else if (queryParams.isSorted() || queryParams.needsScores()) { - if ((queryParams.isSorted() && !queryParams.isSortedByScore()) - || realLimit <= (long) queryParams.pageLimits().getPageLimit(0)) { - return scoredSimple.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); + if (realLimit <= (long) queryParams.pageLimits().getPageLimit(0)) { + return scoredPaged.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); } else { - return unsortedScoredFull.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); + if ((queryParams.isSorted() && !queryParams.isSortedByScore())) { + return sortedScoredFull.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); + } else { + return unsortedScoredFull.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); + } } } else if (realLimit <= (long) queryParams.pageLimits().getPageLimit(0)) { // Run single-page searches using the paged multi searcher @@ -72,6 +78,7 @@ public class AdaptiveMultiSearcher implements MultiSearcher, Closeable { @Override public void close() throws IOException { + sortedScoredFull.close(); unsortedScoredFull.close(); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalQueryParams.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalQueryParams.java index fdc36d1..be97ef9 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalQueryParams.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalQueryParams.java @@ -1,5 +1,7 @@ package it.cavallium.dbengine.lucene.searcher; +import static it.cavallium.dbengine.lucene.LuceneUtils.safeLongToInt; + import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.PageLimits; import java.util.Objects; @@ -10,9 +12,31 @@ import org.apache.lucene.search.Sort; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -public record LocalQueryParams(@NotNull Query query, int offset, int limit, @NotNull PageLimits pageLimits, +public record LocalQueryParams(@NotNull Query query, int offsetInt, long offsetLong, int limitInt, long limitLong, + @NotNull PageLimits pageLimits, @Nullable Float 
minCompetitiveScore, @Nullable Sort sort, boolean complete) { + public LocalQueryParams(@NotNull Query query, + long offsetLong, + long limitLong, + @NotNull PageLimits pageLimits, + @Nullable Float minCompetitiveScore, + @Nullable Sort sort, + boolean complete) { + this(query, safeLongToInt(offsetLong), offsetLong, safeLongToInt(limitLong), limitLong, pageLimits, + minCompetitiveScore, sort, complete); + } + + public LocalQueryParams(@NotNull Query query, + int offsetInt, + int limitInt, + @NotNull PageLimits pageLimits, + @Nullable Float minCompetitiveScore, + @Nullable Sort sort, + boolean complete) { + this(query, offsetInt, offsetInt, limitInt, limitInt, pageLimits, minCompetitiveScore, sort, complete); + } + public boolean isSorted() { return sort != null; } @@ -46,7 +70,11 @@ public record LocalQueryParams(@NotNull Query query, int offset, int limit, @Not } } - public int getTotalHitsThreshold() { + public int getTotalHitsThresholdInt() { return LuceneUtils.totalHitsThreshold(this.complete); } + + public long getTotalHitsThresholdLong() { + return LuceneUtils.totalHitsThresholdLong(this.complete); + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/OfficialSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/OfficialSearcher.java new file mode 100644 index 0000000..1ade91a --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/OfficialSearcher.java @@ -0,0 +1,129 @@ +package it.cavallium.dbengine.lucene.searcher; + +import io.net5.buffer.api.Send; +import it.cavallium.dbengine.database.LLKeyScore; +import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; +import it.cavallium.dbengine.lucene.FullDocs; +import it.cavallium.dbengine.lucene.LLFieldDoc; +import it.cavallium.dbengine.lucene.LuceneUtils; +import it.cavallium.dbengine.lucene.collector.LMDBFullFieldDocCollector; +import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TopDocsCollector; +import org.apache.lucene.search.TopFieldCollector; +import org.apache.lucene.search.TopScoreDocCollector; +import org.warp.commonutils.log.Logger; +import org.warp.commonutils.log.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class OfficialSearcher implements MultiSearcher, Closeable { + + protected static final Logger logger = LoggerFactory.getLogger(OfficialSearcher.class); + + private final LLTempLMDBEnv env; + + public OfficialSearcher() throws IOException { + this.env = new LLTempLMDBEnv(); + } + + @Override + public Mono> collectMulti(Mono> indexSearchersMono, + LocalQueryParams queryParams, + String keyFieldName, + LLSearchTransformer transformer) { + Mono queryParamsMono; + if (transformer == LLSearchTransformer.NO_TRANSFORMATION) { + queryParamsMono = Mono.just(queryParams); + } else { + queryParamsMono = LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> transformer.transform(Mono + .fromSupplier(() -> new TransformerInput(indexSearchers, queryParams))), true); + } + + return queryParamsMono.flatMap(queryParams2 -> LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> this + // Search results + .search(indexSearchers.shards(), 
queryParams2) + // Compute the results + .transform(fullDocsMono -> this.computeResults(fullDocsMono, indexSearchers, + keyFieldName, queryParams2)) + // Ensure that one LuceneSearchResult is always returned + .single(), + false)); + } + + /** + * Search effectively the raw results + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + private Mono search(Iterable indexSearchers, + LocalQueryParams queryParams) { + return Mono + .fromCallable(() -> { + LLUtils.ensureBlocking(); + var totalHitsThreshold = queryParams.getTotalHitsThresholdInt(); + if (queryParams.isSorted() && !queryParams.isSortedByScore()) { + return TopFieldCollector.createSharedManager(queryParams.sort(), queryParams.limitInt(), null, + totalHitsThreshold); + } else { + return TopScoreDocCollector.createSharedManager(queryParams.limitInt(), null, totalHitsThreshold); + } + }) + .flatMap(sharedManager -> Flux + .fromIterable(indexSearchers) + .flatMap(shard -> Mono.fromCallable(() -> { + LLUtils.ensureBlocking(); + + var collector = sharedManager.newCollector(); + assert queryParams.complete() == collector.scoreMode().isExhaustive(); + queryParams.getScoreModeOptional().ifPresent(scoreMode -> { + assert scoreMode == collector.scoreMode(); + }); + + shard.search(queryParams.query(), collector); + return collector; + })) + .collectList() + .flatMap(collectors -> Mono.fromCallable(() -> { + LLUtils.ensureBlocking(); + return sharedManager.reduce((List) collectors); + })) + ); + } + + /** + * Compute the results, extracting useful data + */ + private Mono> computeResults(Mono dataMono, + LLIndexSearchers indexSearchers, + String keyFieldName, + LocalQueryParams queryParams) { + return dataMono.map(data -> { + var totalHitsCount = LuceneUtils.convertTotalHitsCount(data.totalHits); + + Flux hitsFlux = LuceneUtils + .convertHits(Flux.fromArray(data.scoreDocs), + indexSearchers.shards(), keyFieldName, true) + .skip(queryParams.offsetLong()) + .take(queryParams.limitLong(), true); + + return new LuceneSearchResult(totalHitsCount, hitsFlux, indexSearchers::close).send(); + }); + } + + @Override + public void close() throws IOException { + env.close(); + } + + @Override + public String getName() { + return "official"; + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java index 468e70d..89cc7a3 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java @@ -15,10 +15,12 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Objects; +import org.apache.lucene.search.Collector; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopDocsCollector; +import org.apache.lucene.search.TopFieldCollector; import org.apache.lucene.search.TotalHits; import org.apache.lucene.search.TotalHits.Relation; import reactor.core.publisher.Flux; @@ -71,10 +73,10 @@ public class PagedLocalSearcher implements LocalSearcher { * Get the pagination info */ private PaginationInfo getPaginationInfo(LocalQueryParams queryParams) { - if (queryParams.limit() <= MAX_SINGLE_SEARCH_LIMIT) { - return new PaginationInfo(queryParams.limit(), queryParams.offset(), queryParams.pageLimits(), true); + if (queryParams.limitInt() <= MAX_SINGLE_SEARCH_LIMIT) { + return new 
PaginationInfo(queryParams.limitInt(), queryParams.offsetInt(), queryParams.pageLimits(), true); } else { - return new PaginationInfo(queryParams.limit(), queryParams.offset(), queryParams.pageLimits(), false); + return new PaginationInfo(queryParams.limitInt(), queryParams.offsetInt(), queryParams.pageLimits(), false); } } @@ -109,7 +111,7 @@ public class PagedLocalSearcher implements LocalSearcher { Flux firstPageHitsFlux = LuceneUtils.convertHits(Flux.fromArray(scoreDocs), indexSearchers, keyFieldName, true) - .take(queryParams.limit(), true); + .take(queryParams.limitInt(), true); CurrentPageInfo nextPageInfo = firstPageData.nextPageInfo(); @@ -183,7 +185,7 @@ public class PagedLocalSearcher implements LocalSearcher { TopDocs pageTopDocs; try { TopDocsCollector collector = TopDocsCollectorUtils.getTopDocsCollector(queryParams.sort(), - currentPageLimit, s.last(), queryParams.getTotalHitsThreshold(), + currentPageLimit, s.last(), queryParams.getTotalHitsThresholdInt(), allowPagination, queryParams.needsScores()); assert queryParams.complete() == collector.scoreMode().isExhaustive(); queryParams.getScoreModeOptional().ifPresent(scoreMode -> { @@ -196,6 +198,10 @@ public class PagedLocalSearcher implements LocalSearcher { } else { pageTopDocs = collector.topDocs(); } + // Populate scores of topfieldcollector. By default it doesn't popupate the scores + if (queryParams.needsScores() && ((Collector) collector) instanceof TopFieldCollector) { + TopFieldCollector.populateScores(pageTopDocs.scoreDocs, indexSearchers.get(0), queryParams.query()); + } } catch (IOException e) { sink.error(e); return EMPTY_STATUS; diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java index 66c51ed..0d6f8ac 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java @@ -12,11 +12,9 @@ import it.cavallium.dbengine.lucene.collector.ScoringShardsCollectorManager; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; import java.util.Arrays; import java.util.List; -import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Sort; import org.jetbrains.annotations.Nullable; import org.warp.commonutils.log.Logger; @@ -71,17 +69,17 @@ public class ScoredPagedMultiSearcher implements MultiSearcher { * Get the pagination info */ private PaginationInfo getPaginationInfo(LocalQueryParams queryParams) { - if (queryParams.limit() <= MAX_SINGLE_SEARCH_LIMIT) { - return new PaginationInfo(queryParams.limit(), queryParams.offset(), queryParams.pageLimits(), true); + if (queryParams.limitInt() <= MAX_SINGLE_SEARCH_LIMIT) { + return new PaginationInfo(queryParams.limitInt(), queryParams.offsetInt(), queryParams.pageLimits(), true); } else { - return new PaginationInfo(queryParams.limit(), queryParams.offset(), queryParams.pageLimits(), false); + return new PaginationInfo(queryParams.limitInt(), queryParams.offsetInt(), queryParams.pageLimits(), false); } } /** * Search effectively the raw results of the first page */ - private Mono searchFirstPage(Iterable indexSearchers, + private Mono searchFirstPage(List indexSearchers, LocalQueryParams queryParams, PaginationInfo paginationInfo) 
{ var limit = paginationInfo.totalLimit(); @@ -107,7 +105,7 @@ public class ScoredPagedMultiSearcher implements MultiSearcher { Flux firstPageHitsFlux = LuceneUtils.convertHits(Flux.fromArray(scoreDocs), indexSearchers.shards(), keyFieldName, true) - .take(queryParams.limit(), true); + .take(queryParams.limitInt(), true); CurrentPageInfo nextPageInfo = firstPageData.nextPageInfo(); @@ -160,7 +158,7 @@ public class ScoredPagedMultiSearcher implements MultiSearcher { * skip the first n results in the first page */ private Mono searchPage(LocalQueryParams queryParams, - Iterable indexSearchers, + List indexSearchers, boolean allowPagination, PageLimits pageLimits, int resultsOffset, @@ -172,11 +170,12 @@ public class ScoredPagedMultiSearcher implements MultiSearcher { throw new IndexOutOfBoundsException(resultsOffset); } if (s.pageIndex() == 0 || (s.last() != null && s.remainingLimit() > 0)) { + var query = queryParams.query(); @Nullable var sort = getSort(queryParams); var pageLimit = pageLimits.getPageLimit(s.pageIndex()); var after = (FieldDoc) s.last(); - var totalHitsThreshold = queryParams.getTotalHitsThreshold(); - return new ScoringShardsCollectorManager(sort, pageLimit, after, totalHitsThreshold, + var totalHitsThreshold = queryParams.getTotalHitsThresholdInt(); + return new ScoringShardsCollectorManager(query, sort, pageLimit, after, totalHitsThreshold, resultsOffset); } else { return null; @@ -199,7 +198,9 @@ public class ScoredPagedMultiSearcher implements MultiSearcher { .collectList() .flatMap(collectors -> Mono.fromCallable(() -> { LLUtils.ensureBlocking(); + sharedManager.setIndexSearchers(indexSearchers); var pageTopDocs = sharedManager.reduce(collectors); + var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs); long nextRemainingLimit; if (allowPagination) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java new file mode 100644 index 0000000..d9b0c48 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java @@ -0,0 +1,119 @@ +package it.cavallium.dbengine.lucene.searcher; + +import io.net5.buffer.api.Send; +import it.cavallium.dbengine.database.LLKeyScore; +import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; +import it.cavallium.dbengine.lucene.FullDocs; +import it.cavallium.dbengine.lucene.LLFieldDoc; +import it.cavallium.dbengine.lucene.LLScoreDoc; +import it.cavallium.dbengine.lucene.LuceneUtils; +import it.cavallium.dbengine.lucene.collector.LMDBFullFieldDocCollector; +import it.cavallium.dbengine.lucene.collector.LMDBFullScoreDocCollector; +import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; +import java.io.Closeable; +import java.io.IOException; +import org.apache.lucene.search.IndexSearcher; +import org.warp.commonutils.log.Logger; +import org.warp.commonutils.log.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class SortedScoredFullMultiSearcher implements MultiSearcher, Closeable { + + protected static final Logger logger = LoggerFactory.getLogger(SortedScoredFullMultiSearcher.class); + + private final LLTempLMDBEnv env; + + public SortedScoredFullMultiSearcher() throws IOException { + this.env = new LLTempLMDBEnv(); + } + + @Override + public Mono> collectMulti(Mono> 
indexSearchersMono, + LocalQueryParams queryParams, + String keyFieldName, + LLSearchTransformer transformer) { + Mono queryParamsMono; + if (transformer == LLSearchTransformer.NO_TRANSFORMATION) { + queryParamsMono = Mono.just(queryParams); + } else { + queryParamsMono = LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> transformer.transform(Mono + .fromSupplier(() -> new TransformerInput(indexSearchers, queryParams))), true); + } + + return queryParamsMono.flatMap(queryParams2 -> LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> this + // Search results + .search(indexSearchers.shards(), queryParams2) + // Compute the results + .transform(fullDocsMono -> this.computeResults(fullDocsMono, indexSearchers, + keyFieldName, queryParams2)) + // Ensure that one LuceneSearchResult is always returned + .single(), + false)); + } + + /** + * Search effectively the raw results + */ + private Mono> search(Iterable indexSearchers, + LocalQueryParams queryParams) { + return Mono + .fromCallable(() -> { + LLUtils.ensureBlocking(); + var totalHitsThreshold = queryParams.getTotalHitsThresholdLong(); + return LMDBFullFieldDocCollector.createSharedManager(env, queryParams.sort(), queryParams.limitInt(), + totalHitsThreshold); + }) + .flatMap(sharedManager -> Flux + .fromIterable(indexSearchers) + .flatMap(shard -> Mono.fromCallable(() -> { + LLUtils.ensureBlocking(); + + var collector = sharedManager.newCollector(); + assert queryParams.complete() == collector.scoreMode().isExhaustive(); + queryParams.getScoreModeOptional().ifPresent(scoreMode -> { + assert scoreMode == collector.scoreMode(); + }); + + shard.search(queryParams.query(), collector); + return collector; + })) + .collectList() + .flatMap(collectors -> Mono.fromCallable(() -> { + LLUtils.ensureBlocking(); + return sharedManager.reduce(collectors); + })) + ); + } + + /** + * Compute the results, extracting useful data + */ + private Mono> computeResults(Mono> dataMono, + LLIndexSearchers indexSearchers, + String keyFieldName, + LocalQueryParams queryParams) { + return dataMono.map(data -> { + var totalHitsCount = LuceneUtils.convertTotalHitsCount(data.totalHits()); + + Flux hitsFlux = LuceneUtils + .convertHits(data.iterate(queryParams.offsetLong()).map(LLFieldDoc::toFieldDoc), + indexSearchers.shards(), keyFieldName, true) + .take(queryParams.limitLong(), true); + + return new LuceneSearchResult(totalHitsCount, hitsFlux, indexSearchers::close).send(); + }); + } + + @Override + public void close() throws IOException { + env.close(); + } + + @Override + public String getName() { + return "sorted scored full multi"; + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedScoredFullMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedScoredFullMultiSearcher.java index 8330048..0ca8e66 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedScoredFullMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedScoredFullMultiSearcher.java @@ -69,8 +69,8 @@ public class UnsortedScoredFullMultiSearcher implements MultiSearcher, Closeable return Mono .fromCallable(() -> { LLUtils.ensureBlocking(); - var totalHitsThreshold = queryParams.getTotalHitsThreshold(); - return LMDBFullScoreDocCollector.createSharedManager(env, queryParams.limit(), totalHitsThreshold); + var totalHitsThreshold = queryParams.getTotalHitsThresholdLong(); + return LMDBFullScoreDocCollector.createSharedManager(env, queryParams.limitLong(), totalHitsThreshold); }) 
.flatMap(sharedManager -> Flux .fromIterable(indexSearchers) @@ -105,9 +105,9 @@ public class UnsortedScoredFullMultiSearcher implements MultiSearcher, Closeable var totalHitsCount = LuceneUtils.convertTotalHitsCount(data.totalHits()); Flux hitsFlux = LuceneUtils - .convertHits(data.iterate(queryParams.offset()).map(LLScoreDoc::toScoreDoc), + .convertHits(data.iterate(queryParams.offsetLong()).map(LLScoreDoc::toScoreDoc), indexSearchers.shards(), keyFieldName, true) - .take(queryParams.limit(), true); + .take(queryParams.limitLong(), true); return new LuceneSearchResult(totalHitsCount, hitsFlux, indexSearchers::close).send(); }); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredSimpleMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredSimpleMultiSearcher.java index 5d8326b..63293d4 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredSimpleMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredSimpleMultiSearcher.java @@ -42,11 +42,11 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher { return Mono .fromRunnable(() -> { LLUtils.ensureBlocking(); - if (queryParams2.isSorted() && queryParams2.limit() > 0) { + if (queryParams2.isSorted() && queryParams2.limitLong() > 0) { throw new UnsupportedOperationException("Sorted queries are not supported" + " by SimpleUnsortedUnscoredLuceneMultiSearcher"); } - if (queryParams2.needsScores() && queryParams2.limit() > 0) { + if (queryParams2.needsScores() && queryParams2.limitLong() > 0) { throw new UnsupportedOperationException("Scored queries are not supported" + " by SimpleUnsortedUnscoredLuceneMultiSearcher"); } @@ -73,8 +73,8 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher { var totalHitsCount = new TotalHitsCount(totalHitsCountValue, exactTotalHitsCount); Flux mergedFluxes = Flux .merge(resultsFluxes) - .skip(queryParams2.offset()) - .take(queryParams2.limit(), true); + .skip(queryParams2.offsetLong()) + .take(queryParams2.limitLong(), true); return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> { for (LuceneSearchResult luceneSearchResult : resultsToDrop) { @@ -92,8 +92,8 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher { private LocalQueryParams getLocalQueryParams(LocalQueryParams queryParams) { return new LocalQueryParams(queryParams.query(), - 0, - LuceneUtils.safeLongToInt((long) queryParams.offset() + (long) queryParams.limit()), + 0L, + queryParams.offsetLong() + queryParams.limitLong(), queryParams.pageLimits(), queryParams.minCompetitiveScore(), queryParams.sort(), diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java index a86ff7a..e216cac 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java @@ -50,11 +50,11 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher { return queryParamsMono .flatMap(queryParams2 -> { var localQueryParams = getLocalQueryParams(queryParams2); - if (queryParams2.isSorted() && queryParams2.limit() > 0) { + if (queryParams2.isSorted() && queryParams2.limitLong() > 0) { return Mono.error(new UnsupportedOperationException("Sorted queries are not supported" + " 
by UnsortedUnscoredContinuousLuceneMultiSearcher")); } - if (queryParams2.needsScores() && queryParams2.limit() > 0) { + if (queryParams2.needsScores() && queryParams2.limitLong() > 0) { return Mono.error(new UnsupportedOperationException("Scored queries are not supported" + " by UnsortedUnscoredContinuousLuceneMultiSearcher")); } @@ -101,8 +101,8 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher { var totalHitsCount = new TotalHitsCount(0, false); Flux mergedFluxes = resultsFlux - .skip(queryParams2.offset()) - .take(queryParams2.limit(), true); + .skip(queryParams2.offsetLong()) + .take(queryParams2.limitLong(), true); return new LuceneSearchResult(totalHitsCount, mergedFluxes, indexSearchers::close).send(); }); @@ -112,8 +112,8 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher { private LocalQueryParams getLocalQueryParams(LocalQueryParams queryParams) { return new LocalQueryParams(queryParams.query(), - 0, - LuceneUtils.safeLongToInt((long) queryParams.offset() + (long) queryParams.limit()), + 0L, + queryParams.offsetLong() + queryParams.limitLong(), queryParams.pageLimits(), queryParams.minCompetitiveScore(), queryParams.sort(), diff --git a/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java b/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java index b5faad8..fbe5406 100644 --- a/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java +++ b/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java @@ -7,6 +7,7 @@ import static it.cavallium.dbengine.SyncUtils.*; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; +import io.net5.buffer.PooledByteBufAllocator; import it.cavallium.dbengine.DbTestUtils.TempDb; import it.cavallium.dbengine.DbTestUtils.TestAllocator; import it.cavallium.dbengine.client.LuceneIndex; @@ -30,8 +31,10 @@ import it.cavallium.dbengine.lucene.searcher.AdaptiveMultiSearcher; import it.cavallium.dbengine.lucene.searcher.CountLocalSearcher; import it.cavallium.dbengine.lucene.searcher.LocalSearcher; import it.cavallium.dbengine.lucene.searcher.MultiSearcher; +import it.cavallium.dbengine.lucene.searcher.OfficialSearcher; import it.cavallium.dbengine.lucene.searcher.ScoredPagedMultiSearcher; import it.cavallium.dbengine.lucene.searcher.PagedLocalSearcher; +import it.cavallium.dbengine.lucene.searcher.SortedScoredFullMultiSearcher; import it.cavallium.dbengine.lucene.searcher.UnsortedUnscoredSimpleMultiSearcher; import it.cavallium.dbengine.lucene.searcher.UnsortedScoredFullMultiSearcher; import it.cavallium.dbengine.lucene.searcher.UnsortedUnscoredStreamingMultiSearcher; @@ -76,6 +79,9 @@ public class TestLuceneSearches { private static final Map ELEMENTS; static { + // Start the pool by creating and deleting a direct buffer + PooledByteBufAllocator.DEFAULT.directBuffer().release(); + var modifiableElements = new HashMap(); modifiableElements.put("test-key-1", "0123456789"); modifiableElements.put("test-key-2", "test 0123456789 test word"); @@ -148,7 +154,9 @@ public class TestLuceneSearches { sink.next(new UnsortedUnscoredSimpleMultiSearcher(new CountLocalSearcher())); } else { sink.next(new ScoredPagedMultiSearcher()); - if (!info.sorted() || info.sortedByScore()) { + if (info.sorted() && !info.sortedByScore()) { + sink.next(new SortedScoredFullMultiSearcher()); + } else { sink.next(new UnsortedScoredFullMultiSearcher()); } if (!info.sorted()) { @@ -183,15 +191,12 @@ public class TestLuceneSearches { .toStream(); } - 
private static void runSearchers(ExpectedQueryType expectedQueryType, FailableConsumer consumer) { + private static void runSearchers(ExpectedQueryType expectedQueryType, FailableConsumer consumer) + throws Throwable { var searchers = run(getSearchers(expectedQueryType).collectList()); for (LocalSearcher searcher : searchers) { log.info("Using searcher \"{}\"", searcher.getName()); - try { - consumer.accept(searcher); - } catch (Throwable e) { - Assertions.fail(e); - } + consumer.accept(searcher); } } @@ -248,7 +253,7 @@ public class TestLuceneSearches { @ParameterizedTest @MethodSource("provideQueryArgumentsScoreModeAndSort") - public void testSearchNoDocs(boolean shards, MultiSort> multiSort) { + public void testSearchNoDocs(boolean shards, MultiSort> multiSort) throws Throwable { var sorted = multiSort.isSorted(); var sortedByScore = multiSort.getQuerySort().getBasicType$() == BasicType.ScoreSort; runSearchers(new ExpectedQueryType(shards, sorted, sortedByScore, true, false), searcher -> { @@ -271,7 +276,7 @@ public class TestLuceneSearches { @ParameterizedTest @MethodSource("provideQueryArgumentsScoreModeAndSort") - public void testSearchAllDocs(boolean shards, MultiSort> multiSort) { + public void testSearchAllDocs(boolean shards, MultiSort> multiSort) throws Throwable { var sorted = multiSort.isSorted(); var sortedByScore = multiSort.getQuerySort().getBasicType$() == BasicType.ScoreSort; runSearchers(new ExpectedQueryType(shards, sorted, sortedByScore, true, false), (LocalSearcher searcher) -> { @@ -287,12 +292,21 @@ public class TestLuceneSearches { assertHitsIfPossible(ELEMENTS.size(), hits); var keys = getResults(results); - assertResults(ELEMENTS.keySet().stream().toList(), keys, false, sortedByScore); + + var officialSearcher = new OfficialSearcher(); + luceneIndex = getLuceneIndex(shards, officialSearcher); + var officialQuery = queryBuilder.limit(ELEMENTS.size() * 2L).build(); + try (var officialResults = run(luceneIndex.search(officialQuery)).receive()) { + var officialKeys = getResults(officialResults).stream().toList(); + + assertResults(officialKeys, keys, sorted, sortedByScore); + } + } }); } - private void assertResults(List expectedKeys, List resultKeys, boolean sorted, boolean sortedByScore) { + private void assertResults(List expectedKeys, List resultKeys, boolean sorted, boolean sortedByScore) { if (sortedByScore) { float lastScore = Float.NEGATIVE_INFINITY; @@ -304,12 +318,14 @@ public class TestLuceneSearches { } } - if (!sorted) { - var results = resultKeys.stream().map(Scored::key).collect(Collectors.toSet()); - Assertions.assertEquals(new HashSet<>(expectedKeys), results); - } else { + if (sortedByScore) { + Assertions.assertEquals(expectedKeys, resultKeys); + } else if (sorted) { var results = resultKeys.stream().map(Scored::key).toList(); - Assertions.assertEquals(expectedKeys, results); + Assertions.assertEquals(expectedKeys.stream().map(Scored::key).toList(), results); + } else { + var results = resultKeys.stream().map(Scored::key).collect(Collectors.toSet()); + Assertions.assertEquals(new HashSet<>(expectedKeys.stream().map(Scored::key).toList()), results); } }
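A note on the populateScores call added to PagedLocalSearcher above: a TopFieldCollector that sorts on a field does not compute scores on its own, so the patch fills them in afterwards when the query needs scores. The stand-alone sketch below shows the same Lucene pattern; the class name and the searcher/query/sort parameters are placeholders, not taken from this codebase.

import java.io.IOException;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollector;

// Sketch only: collect field-sorted hits, then fill in their scores afterwards.
final class PopulateScoresSketch {

    static TopDocs searchSortedWithScores(IndexSearcher searcher, Query query, Sort sort,
            int numHits, int totalHitsThreshold) throws IOException {
        TopFieldCollector collector = TopFieldCollector.create(sort, numHits, totalHitsThreshold);
        searcher.search(query, collector);
        TopDocs topDocs = collector.topDocs();
        // A field-sorted collector leaves ScoreDoc.score unset; populate it explicitly.
        TopFieldCollector.populateScores(topDocs.scoreDocs, searcher, query);
        return topDocs;
    }
}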
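SortedScoredFullMultiSearcher (and the unsorted variant) follow Lucene's CollectorManager contract: one shared manager is created per query, newCollector() is called once per shard, and reduce() merges the per-shard results. A minimal sketch of that flow, using Lucene's own TopFieldCollector in place of the LMDB-backed collectors introduced by this patch; the class and method names are placeholders.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopFieldDocs;

// Sketch of the shared-manager flow: one manager, one collector per shard, one reduce.
final class SharedManagerSketch {

    static TopFieldDocs searchShards(List<IndexSearcher> shards, Query query, Sort sort,
            int limit, int totalHitsThreshold) throws IOException {
        CollectorManager<TopFieldCollector, TopFieldDocs> sharedManager =
                TopFieldCollector.createSharedManager(sort, limit, null, totalHitsThreshold);
        List<TopFieldCollector> collectors = new ArrayList<>();
        for (IndexSearcher shard : shards) {
            TopFieldCollector collector = sharedManager.newCollector(); // one collector per shard
            shard.search(query, collector);
            collectors.add(collector);
        }
        return sharedManager.reduce(collectors); // merge the per-shard results
    }
}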
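Several hunks replace limit()/offset()/getTotalHitsThreshold() with paired int/long accessors and drop the LuceneUtils.safeLongToInt calls at the call sites. The hypothetical record below illustrates the assumed shape of such accessors; it is not the project's LocalQueryParams, and the clamping behaviour is an assumption.

// Hypothetical illustration: values are stored as longs, and the *Int() accessors are assumed
// to clamp to Integer.MAX_VALUE for APIs (such as Lucene collectors) that only accept int.
record QueryWindow(long offsetLong, long limitLong) {

    int offsetInt() {
        return (int) Math.min(offsetLong, Integer.MAX_VALUE);
    }

    int limitInt() {
        return (int) Math.min(limitLong, Integer.MAX_VALUE);
    }
}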
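The unsorted multi-searchers apply pagination once, after merging the per-shard fluxes, via skip(offsetLong()) followed by take(limitLong(), true). A minimal Reactor sketch of that windowing step, with a generic element type and a hypothetical class name:

import reactor.core.publisher.Flux;

// Sketch: skip the requested offset, then take at most `limit` hits while bounding the request.
final class OffsetLimitSketch {

    static <T> Flux<T> window(Flux<T> mergedHits, long offset, long limit) {
        return mergedHits.skip(offset).take(limit, true);
    }
}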
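TestLuceneSearches now warms up the pooled allocator in its static initializer by allocating and immediately releasing one direct buffer, so the pool is initialised before any searcher runs. The same warm-up, extracted into a hypothetical helper for clarity:

import io.net5.buffer.PooledByteBufAllocator;

// Sketch: force the pooled allocator to initialise by creating and releasing a direct buffer.
final class AllocatorWarmUp {

    static void warmUp() {
        PooledByteBufAllocator.DEFAULT.directBuffer().release();
    }
}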