Implement memory-mapped sorted searcher for streaming huge query results

This commit is contained in:
Andrea Cavalli 2021-10-13 00:23:56 +02:00
parent ab9a8a0da1
commit 09f60a3a99
57 changed files with 2792 additions and 77 deletions

View File

@ -224,6 +224,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.novasearch</groupId>
@ -248,6 +249,11 @@
<artifactId>micrometer-registry-jmx</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.lmdbjava</groupId>
<artifactId>lmdbjava</artifactId>
<version>0.8.2</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>

View File

@ -0,0 +1,81 @@
package it.cavallium.dbengine;
import static java.util.Objects.requireNonNull;
import static java.util.Objects.requireNonNullElseGet;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer;
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
import it.cavallium.dbengine.lucene.searcher.LuceneLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneSearchResult;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Mono;
public class SwappableLuceneSearcher implements LuceneLocalSearcher, LuceneMultiSearcher, Closeable {
private final AtomicReference<LuceneLocalSearcher> single = new AtomicReference<>(null);
private final AtomicReference<LuceneMultiSearcher> multi = new AtomicReference<>(null);
public SwappableLuceneSearcher() {
}
@Override
public Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexSearcher>> indexSearcherMono,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer) {
var single = requireNonNullElseGet(this.single.get(), this.multi::get);
requireNonNull(single, "LuceneLocalSearcher not set");
return single.collect(indexSearcherMono, queryParams, keyFieldName, transformer);
}
@Override
public String getName() {
var single = this.single.get();
var multi = this.multi.get();
if (single == multi) {
if (single == null) {
return "swappable";
} else {
return single.getName();
}
} else {
return "swappable[single=" + single.getName() + ",multi=" + multi.getName() + "]";
}
}
@Override
public Mono<Send<LuceneSearchResult>> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer) {
var multi = requireNonNull(this.multi.get(), "LuceneMultiSearcher not set");
return multi.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
}
public void setSingle(LuceneLocalSearcher single) {
this.single.set(single);
}
public void setMulti(LuceneMultiSearcher multi) {
this.multi.set(multi);
}
@Override
public void close() throws IOException {
if (this.single.get() instanceof Closeable closeable) {
closeable.close();
}
if (this.multi.get() instanceof Closeable closeable) {
closeable.close();
}
}
}
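Note: a minimal usage sketch (hypothetical wiring, not part of this commit): the swappable wrapper is created first and the real searchers are injected later, e.g. by a test harness.

    SwappableLuceneSearcher swappable = new SwappableLuceneSearcher();
    // calls made now would fail with "LuceneLocalSearcher not set"
    swappable.setSingle(new AdaptiveLuceneLocalSearcher());
    swappable.setMulti(new AdaptiveLuceneMultiSearcher());
    // subsequent collect()/collectMulti() calls delegate to the searchers set above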

View File

@ -188,7 +188,9 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
queryParams.toQueryParams(),
indicizer.getKeyFieldName()
)
.transform(this::transformLuceneResultWithTransformer);
.single()
.transform(this::transformLuceneResultWithTransformer)
.single();
}
@Override
@ -217,6 +219,7 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
public Mono<TotalHitsCount> count(@Nullable CompositeSnapshot snapshot, Query query) {
return this
.search(ClientQueryParams.<SearchResultKey<T>>builder().snapshot(snapshot).query(query).limit(0).build())
.single()
.map(searchResultKeysSend -> {
try (var searchResultKeys = searchResultKeysSend.receive()) {
return searchResultKeys.totalHitsCount();

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine.client;
import it.cavallium.dbengine.client.query.current.data.DocSort;
import it.cavallium.dbengine.client.query.current.data.NoSort;
import it.cavallium.dbengine.client.query.current.data.NumericSort;
import it.cavallium.dbengine.client.query.current.data.RandomSort;
@ -7,6 +8,8 @@ import it.cavallium.dbengine.client.query.current.data.ScoreSort;
import it.cavallium.dbengine.client.query.current.data.Sort;
import it.cavallium.dbengine.database.LLKeyScore;
import java.util.Comparator;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.function.Function;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
@ -63,6 +66,18 @@ public class MultiSort<T> {
return new MultiSort<>(ScoreSort.of());
}
public static <T> MultiSort<SearchResultKey<T>> noSort() {
return new MultiSort<>(NoSort.of());
}
public static <T> MultiSort<SearchResultKey<T>> docSort() {
return new MultiSort<>(DocSort.of());
}
public static <T> MultiSort<SearchResultKey<T>> numericSort(String field, boolean reverse) {
return new MultiSort<>(NumericSort.of(field, reverse));
}
public static <T, U> MultiSort<SearchResultItem<T, U>> topScoreWithValues() {
return new MultiSort<>(ScoreSort.of());
}
@ -74,4 +89,26 @@ public class MultiSort<T> {
public Sort getQuerySort() {
return querySort;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MultiSort<?> multiSort = (MultiSort<?>) o;
return Objects.equals(querySort, multiSort.querySort);
}
@Override
public int hashCode() {
return Objects.hash(querySort);
}
@Override
public String toString() {
return querySort.toString();
}
}
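Note: the new factory methods map directly onto the query DSL's Sort variants; a short usage sketch (the field name "longsort" is illustrative):

    MultiSort<SearchResultKey<String>> byDoc = MultiSort.docSort();        // index order
    MultiSort<SearchResultKey<String>> unsorted = MultiSort.noSort();      // no ordering guarantee
    MultiSort<SearchResultKey<String>> byNumber = MultiSort.numericSort("longsort", true); // descending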

View File

@ -18,6 +18,7 @@ import it.cavallium.dbengine.client.query.current.data.TermAndBoost;
import it.cavallium.dbengine.client.query.current.data.TermPosition;
import it.cavallium.dbengine.client.query.current.data.TermQuery;
import it.cavallium.dbengine.client.query.current.data.WildcardQuery;
import it.cavallium.dbengine.lucene.RandomSortField;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.SortedNumericDocValuesField;
@ -146,6 +147,8 @@ public class QueryParser {
case NumericSort:
NumericSort numericSort = (NumericSort) sort;
return new Sort(new SortedNumericSortField(numericSort.field(), Type.LONG, numericSort.reverse()));
case RandomSort:
return new Sort(new RandomSortField());
default:
throw new IllegalStateException("Unexpected value: " + sort.getBasicType$());
}

View File

@ -5,7 +5,9 @@ import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.LuceneOptions;
import it.cavallium.dbengine.database.lucene.LuceneHacks;
import java.util.List;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
@SuppressWarnings("UnusedReturnValue")
@ -23,7 +25,8 @@ public interface LLDatabaseConnection {
int instancesCount,
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,
LuceneOptions luceneOptions);
LuceneOptions luceneOptions,
@Nullable LuceneHacks luceneHacks);
Mono<Void> disconnect();
}

View File

@ -122,7 +122,7 @@ public class LLUtils {
case COMPLETE -> ScoreMode.COMPLETE;
case TOP_SCORES -> ScoreMode.TOP_SCORES;
case COMPLETE_NO_SCORES -> ScoreMode.COMPLETE_NO_SCORES;
default -> throw new IllegalStateException("Unexpected value: " + scoreMode);
case NO_SCORES -> ScoreMode.TOP_DOCS;
};
}

View File

@ -8,11 +8,14 @@ import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.lucene.LuceneHacks;
import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneMultiSearcher;
import it.cavallium.dbengine.netty.JMXNettyMonitoringManager;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedList;
import java.util.List;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@ -68,7 +71,8 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
int instancesCount,
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,
LuceneOptions luceneOptions) {
LuceneOptions luceneOptions,
@Nullable LuceneHacks luceneHacks) {
return Mono
.fromCallable(() -> {
if (instancesCount != 1) {
@ -77,14 +81,16 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
instancesCount,
indicizerAnalyzers,
indicizerSimilarities,
luceneOptions
luceneOptions,
luceneHacks
);
} else {
return new LLLocalLuceneIndex(basePath.resolve("lucene"),
name,
indicizerAnalyzers,
indicizerSimilarities,
luceneOptions
luceneOptions,
luceneHacks
);
}
})

View File

@ -16,6 +16,7 @@ import it.cavallium.dbengine.database.LLSearchResultShard;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.lucene.LuceneHacks;
import it.cavallium.dbengine.lucene.AlwaysDirectIOFSDirectory;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneLocalSearcher;
@ -61,7 +62,7 @@ import reactor.util.function.Tuple2;
public class LLLocalLuceneIndex implements LLLuceneIndex {
protected static final Logger logger = LoggerFactory.getLogger(LLLocalLuceneIndex.class);
private static final LuceneLocalSearcher localSearcher = new AdaptiveLuceneLocalSearcher();
private final LuceneLocalSearcher localSearcher;
/**
* Global lucene index scheduler.
* There is only a single thread globally to not overwhelm the disk with
@ -85,7 +86,8 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
String name,
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,
LuceneOptions luceneOptions) throws IOException {
LuceneOptions luceneOptions,
@Nullable LuceneHacks luceneHacks) throws IOException {
Path directoryPath;
if (luceneOptions.inMemory() != (luceneBasePath == null)) {
throw new IllegalArgumentException();
@ -165,6 +167,11 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
this.lowMemory = lowMemory;
this.luceneAnalyzer = LuceneUtils.toPerFieldAnalyzerWrapper(indicizerAnalyzers);
this.luceneSimilarity = LuceneUtils.toPerFieldSimilarityWrapper(indicizerSimilarities);
if (luceneHacks != null && luceneHacks.customLocalSearcher() != null) {
localSearcher = luceneHacks.customLocalSearcher().get();
} else {
localSearcher = new AdaptiveLuceneLocalSearcher();
}
var indexWriterConfig = new IndexWriterConfig(luceneAnalyzer);
indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
@ -188,7 +195,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}
logger.trace("WriterSchedulerMaxThreadCount: {}", writerSchedulerMaxThreadCount);
indexWriterConfig.setMergeScheduler(mergeScheduler);
indexWriterConfig.setRAMBufferSizeMB(luceneOptions.indexWriterBufferSize() / 1024D / 1024D);
if (luceneOptions.indexWriterBufferSize() == -1) {
//todo: allow to configure maxbuffereddocs fallback
indexWriterConfig.setMaxBufferedDocs(1000);
// disable ram buffer size after enabling maxBufferedDocs
indexWriterConfig.setRAMBufferSizeMB(-1);
} else {
indexWriterConfig.setRAMBufferSizeMB(luceneOptions.indexWriterBufferSize() / 1024D / 1024D);
}
indexWriterConfig.setReaderPooling(false);
indexWriterConfig.setSimilarity(getLuceneSimilarity());
this.indexWriter = new IndexWriter(directory, indexWriterConfig);

View File

@ -10,11 +10,13 @@ import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSearchResultShard;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.lucene.LuceneHacks;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer;
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
import it.cavallium.dbengine.lucene.searcher.LuceneMultiSearcher;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
@ -33,6 +35,7 @@ import org.apache.lucene.search.similarities.PerFieldSimilarityWrapper;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
@ -43,14 +46,15 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
private final PerFieldAnalyzerWrapper luceneAnalyzer;
private final PerFieldSimilarityWrapper luceneSimilarity;
private final LuceneMultiSearcher multiSearcher = new AdaptiveLuceneMultiSearcher();
private final LuceneMultiSearcher multiSearcher;
public LLLocalMultiLuceneIndex(Path lucene,
String name,
int instancesCount,
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,
LuceneOptions luceneOptions) throws IOException {
LuceneOptions luceneOptions,
@Nullable LuceneHacks luceneHacks) throws IOException {
if (instancesCount <= 1 || instancesCount > 100) {
throw new IOException("Unsupported instances count: " + instancesCount);
@ -68,12 +72,19 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
instanceName,
indicizerAnalyzers,
indicizerSimilarities,
luceneOptions
luceneOptions,
luceneHacks
);
}
this.luceneIndices = luceneIndices;
this.luceneAnalyzer = LuceneUtils.toPerFieldAnalyzerWrapper(indicizerAnalyzers);
this.luceneSimilarity = LuceneUtils.toPerFieldSimilarityWrapper(indicizerSimilarities);
if (luceneHacks != null && luceneHacks.customMultiSearcher() != null) {
multiSearcher = luceneHacks.customMultiSearcher().get();
} else {
multiSearcher = new AdaptiveLuceneMultiSearcher();
}
}
private LLLocalLuceneIndex getLuceneIndex(LLTerm id) {
@ -234,6 +245,12 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
return Flux
.fromArray(luceneIndices)
.flatMap(LLLocalLuceneIndex::close)
.then(Mono.fromCallable(() -> {
if (multiSearcher instanceof Closeable closeable) {
closeable.close();
}
return null;
}).subscribeOn(Schedulers.boundedElastic()))
.then();
}

View File

@ -0,0 +1,54 @@
package it.cavallium.dbengine.database.disk;
import io.net5.buffer.ByteBuf;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.concurrent.Phaser;
import org.lmdbjava.Net5ByteBufProxy;
import org.lmdbjava.Env;
import static org.lmdbjava.EnvFlags.*;
public class LLTempLMDBEnv implements Closeable {
private static final long TEN_MEBIBYTES = 10_485_760;
private static final int MAX_DATABASES = 1024;
private final Phaser resources = new Phaser(1);
private final Path tempDirectory;
private final Env<ByteBuf> env;
public LLTempLMDBEnv() throws IOException {
tempDirectory = Files.createTempDirectory("lmdb");
var envBuilder = Env.create(Net5ByteBufProxy.PROXY_NETTY)
.setMapSize(TEN_MEBIBYTES)
.setMaxDbs(MAX_DATABASES);
//env = envBuilder.open(tempDirectory.toFile(), MDB_NOLOCK, MDB_NOSYNC, MDB_NOTLS, MDB_NORDAHEAD, MDB_WRITEMAP);
env = envBuilder.open(tempDirectory.toFile(), MDB_NOTLS, MDB_WRITEMAP, MDB_NORDAHEAD);
}
public Env<ByteBuf> getEnvAndIncrementRef() {
resources.register();
return env;
}
public void decrementRef() {
resources.arriveAndDeregister();
}
@Override
public void close() throws IOException {
resources.arriveAndAwaitAdvance();
env.close();
//noinspection ResultOfMethodCallIgnored
try (var walk = Files.walk(tempDirectory)) {
walk.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(File::delete);
}
}
}
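Note: LLTempLMDBEnv hands out one shared LMDB environment and uses a Phaser to delay close() until every borrower has called decrementRef(). A lifecycle sketch (one borrower, checked exceptions and error handling elided):

    LLTempLMDBEnv tempEnv = new LLTempLMDBEnv();
    Env<ByteBuf> env = tempEnv.getEnvAndIncrementRef(); // registers a Phaser party
    try {
        // ... open Dbi instances and run transactions against env ...
    } finally {
        tempEnv.decrementRef(); // deregisters; close() blocks until all parties have left
    }
    tempEnv.close(); // waits for borrowers, closes the env, deletes the temp directory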

View File

@ -0,0 +1,10 @@
package it.cavallium.dbengine.database.lucene;
import it.cavallium.dbengine.lucene.searcher.LuceneLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneMultiSearcher;
import java.util.function.Supplier;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public record LuceneHacks(@Nullable Supplier<@NotNull LuceneLocalSearcher> customLocalSearcher,
@Nullable Supplier<@NotNull LuceneMultiSearcher> customMultiSearcher) {}
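Note: LuceneHacks is the injection point threaded through the getLuceneIndex() signature change above; either supplier may be null to keep the adaptive default. A hypothetical construction:

    LuceneHacks hacks = new LuceneHacks(
            null, // keep the default AdaptiveLuceneLocalSearcher
            () -> new AdaptiveLuceneMultiSearcher()); // or any custom LuceneMultiSearcher
    // passed as the new last argument, roughly:
    // connection.getLuceneIndex(name, instancesCount, analyzers, similarities, luceneOptions, hacks);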

View File

@ -10,8 +10,10 @@ import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.disk.LLLocalLuceneIndex;
import it.cavallium.dbengine.database.lucene.LuceneHacks;
import it.cavallium.dbengine.netty.JMXNettyMonitoringManager;
import java.util.List;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@ -55,13 +57,15 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection {
int instancesCount,
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,
LuceneOptions luceneOptions) {
LuceneOptions luceneOptions,
@Nullable LuceneHacks luceneHacks) {
return Mono
.<LLLuceneIndex>fromCallable(() -> new LLLocalLuceneIndex(null,
name,
indicizerAnalyzers,
indicizerSimilarities,
luceneOptions
luceneOptions,
luceneHacks
))
.subscribeOn(Schedulers.boundedElastic());
}

View File

@ -0,0 +1,16 @@
package it.cavallium.dbengine.lucene;
import java.io.Closeable;
import java.util.Iterator;
import org.jetbrains.annotations.NotNull;
public interface CloseableIterable<T> extends Iterable<T>, Closeable {
@Override
void close();
@NotNull
@Override
Iterator<T> iterator();
}

View File

@ -0,0 +1,60 @@
package it.cavallium.dbengine.lucene;
import java.io.IOException;
import reactor.core.publisher.Flux;
public class EmptyPriorityQueue<T> implements PriorityQueue<T> {
@Override
public void add(T element) {
throw new UnsupportedOperationException();
}
@Override
public T top() {
return null;
}
@Override
public T pop() {
return null;
}
@Override
public void updateTop() {
}
@Override
public void updateTop(T newTop) {
assert newTop == null;
}
@Override
public long size() {
return 0;
}
@Override
public void clear() {
}
@Override
public boolean remove(T element) {
throw new UnsupportedOperationException();
}
@Override
public Flux<T> iterate() {
return Flux.empty();
}
@Override
public void close() throws IOException {
}
}

View File

@ -0,0 +1,148 @@
package it.cavallium.dbengine.lucene;
import static it.cavallium.dbengine.lucene.LLDocElementScoreComparator.SCORE_DOC_SCORE_ELEM_COMPARATOR;
import static org.apache.lucene.search.TotalHits.Relation.*;
import java.util.Comparator;
import org.apache.lucene.search.FieldComparator;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.search.TotalHits.Relation;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
public interface FullDocs<T extends LLDocElement> extends ResourceIterable<T> {
Comparator<LLDocElement> SHARD_INDEX_TIE_BREAKER = Comparator.comparingInt(LLDocElement::shardIndex);
Comparator<LLDocElement> DOC_ID_TIE_BREAKER = Comparator.comparingInt(LLDocElement::doc);
Comparator<LLDocElement> DEFAULT_TIE_BREAKER = SHARD_INDEX_TIE_BREAKER.thenComparing(DOC_ID_TIE_BREAKER);
@Override
Flux<T> iterate();
@Override
Flux<T> iterate(long skips);
TotalHits totalHits();
static <T extends LLDocElement> FullDocs<T> merge(@Nullable Sort sort, FullDocs<T>[] fullDocs) {
ResourceIterable<T> mergedIterable = mergeResourceIterable(sort, fullDocs);
TotalHits mergedTotalHits = mergeTotalHits(fullDocs);
return new FullDocs<>() {
@Override
public Flux<T> iterate() {
return mergedIterable.iterate();
}
@Override
public Flux<T> iterate(long skips) {
return mergedIterable.iterate(skips);
}
@Override
public TotalHits totalHits() {
return mergedTotalHits;
}
};
}
static <T extends LLDocElement> int tieBreakCompare(
T firstDoc,
T secondDoc,
Comparator<T> tieBreaker) {
assert tieBreaker != null;
int value = tieBreaker.compare(firstDoc, secondDoc);
if (value == 0) {
throw new IllegalStateException();
} else {
return value;
}
}
static <T extends LLDocElement> ResourceIterable<T> mergeResourceIterable(
@Nullable Sort sort,
FullDocs<T>[] fullDocs) {
return () -> {
@SuppressWarnings("unchecked")
Flux<T>[] iterables = new Flux[fullDocs.length];
for (int i = 0; i < fullDocs.length; i++) {
var singleFullDocs = fullDocs[i].iterate();
iterables[i] = singleFullDocs;
}
Comparator<LLDocElement> comp;
if (sort == null) {
// Merge maintaining sorting order (Algorithm taken from TopDocs.ScoreMergeSortQueue)
comp = SCORE_DOC_SCORE_ELEM_COMPARATOR.thenComparing(DEFAULT_TIE_BREAKER);
} else {
// Merge maintaining sorting order (Algorithm taken from TopDocs.MergeSortQueue)
SortField[] sortFields = sort.getSort();
var comparators = new FieldComparator[sortFields.length];
var reverseMul = new int[sortFields.length];
for(int compIDX = 0; compIDX < sortFields.length; ++compIDX) {
SortField sortField = sortFields[compIDX];
comparators[compIDX] = sortField.getComparator(1, compIDX);
reverseMul[compIDX] = sortField.getReverse() ? -1 : 1;
}
comp = (first, second) -> {
assert first != second;
LLFieldDoc firstFD = (LLFieldDoc) first;
LLFieldDoc secondFD = (LLFieldDoc) second;
for(int compIDX = 0; compIDX < comparators.length; ++compIDX) {
//noinspection rawtypes
FieldComparator fieldComp = comparators[compIDX];
//noinspection unchecked
int cmp = reverseMul[compIDX] * fieldComp.compareValues(firstFD.fields().get(compIDX), secondFD.fields().get(compIDX));
if (cmp != 0) {
return cmp;
}
}
return tieBreakCompare(first, second, DEFAULT_TIE_BREAKER);
};
}
@SuppressWarnings("unchecked")
Flux<T>[] fluxes = new Flux[fullDocs.length];
for (int i = 0; i < iterables.length; i++) {
var shardIndex = i;
fluxes[i] = iterables[i].<T>map(shard -> {
if (shard instanceof LLScoreDoc scoreDoc) {
//noinspection unchecked
return (T) new LLScoreDoc(scoreDoc.doc(), scoreDoc.score(), shardIndex);
} else {
throw new UnsupportedOperationException("Unsupported type " + shard.getClass());
}
});
if (fullDocs[i].totalHits().relation == EQUAL_TO) {
fluxes[i] = fluxes[i].take(fullDocs[i].totalHits().value, true);
}
}
return Flux.mergeComparing(comp, fluxes);
};
}
static <T extends LLDocElement> TotalHits mergeTotalHits(FullDocs<T>[] fullDocs) {
long totalCount = 0;
Relation totalRelation = EQUAL_TO;
for (FullDocs<T> fullDoc : fullDocs) {
var totalHits = fullDoc.totalHits();
totalCount += totalHits.value;
totalRelation = switch (totalHits.relation) {
case EQUAL_TO -> totalRelation;
case GREATER_THAN_OR_EQUAL_TO -> totalRelation == EQUAL_TO ? GREATER_THAN_OR_EQUAL_TO : totalRelation;
};
}
return new TotalHits(totalCount, totalRelation);
}
}
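Note: merge() re-labels each shard's stream with its shard index and combines the streams with Flux.mergeComparing, so global ordering is preserved without materializing all hits. A small sketch (assuming two already-collected per-shard results):

    static FullDocs<LLScoreDoc> mergeByScore(FullDocs<LLScoreDoc> shard0, FullDocs<LLScoreDoc> shard1) {
        @SuppressWarnings("unchecked")
        FullDocs<LLScoreDoc>[] shards = new FullDocs[] {shard0, shard1};
        // sort == null -> merge by score, ties broken by (shardIndex, docId)
        return FullDocs.merge(null, shards);
    }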

View File

@ -0,0 +1,10 @@
package it.cavallium.dbengine.lucene;
public sealed interface LLDocElement permits LLFieldDoc, LLScoreDoc {
int doc();
float score();
int shardIndex();
}

View File

@ -0,0 +1,13 @@
package it.cavallium.dbengine.lucene;
import java.util.Comparator;
class LLDocElementScoreComparator implements Comparator<LLDocElement> {
public static final Comparator<LLDocElement> SCORE_DOC_SCORE_ELEM_COMPARATOR = new LLDocElementScoreComparator();
@Override
public int compare(LLDocElement hitA, LLDocElement hitB) {
return Float.compare(hitA.score(), hitB.score());
}
}

View File

@ -0,0 +1,5 @@
package it.cavallium.dbengine.lucene;
import java.util.List;
public record LLFieldDoc(int doc, float score, int shardIndex, List<Object> fields) implements LLDocElement {}

View File

@ -0,0 +1,151 @@
package it.cavallium.dbengine.lucene;
import io.net5.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.function.Function;
public class LLFieldDocCodec implements LMDBCodec<LLFieldDoc> {
private enum FieldType {
FLOAT,
DOUBLE,
INT,
LONG;
public byte ordinalByte() {
return (byte) ordinal();
}
}
@Override
public ByteBuf serialize(Function<Integer, ByteBuf> allocator, LLFieldDoc data) {
int fieldsDataSize = 0;
byte[] fieldTypes = new byte[data.fields().size()];
int fieldId = 0;
for (Object field : data.fields()) {
assert field != null;
if (field instanceof Float) {
fieldsDataSize += Float.BYTES;
fieldTypes[fieldId] = FieldType.FLOAT.ordinalByte();
} else if (field instanceof Double) {
fieldsDataSize += Double.BYTES;
fieldTypes[fieldId] = FieldType.DOUBLE.ordinalByte();
} else if (field instanceof Integer) {
fieldsDataSize += Integer.BYTES;
fieldTypes[fieldId] = FieldType.INT.ordinalByte();
} else if (field instanceof Long) {
fieldsDataSize += Long.BYTES;
fieldTypes[fieldId] = FieldType.LONG.ordinalByte();
} else {
throw new UnsupportedOperationException("Unsupported field type " + field.getClass());
}
fieldId++;
}
int size = Float.BYTES + Integer.BYTES + Integer.BYTES + Character.BYTES + (data.fields().size() * Byte.BYTES) + fieldsDataSize;
var buf = allocator.apply(size);
setScore(buf, data.score());
setDoc(buf, data.doc());
setShardIndex(buf, data.shardIndex());
setFieldsCount(buf, data.fields().size());
// start appending the per-field data right after the fixed-size header
buf.writerIndex(Float.BYTES + Integer.BYTES + Integer.BYTES + Character.BYTES);
fieldId = 0;
for (Object field : data.fields()) {
assert field != null;
buf.writeByte(fieldTypes[fieldId]);
if (field instanceof Float val) {
buf.writeFloat(val);
} else if (field instanceof Double val) {
buf.writeDouble(val);
} else if (field instanceof Integer val) {
buf.writeInt(val);
} else if (field instanceof Long val) {
buf.writeLong(val);
} else {
throw new UnsupportedOperationException("Unsupported field type " + field.getClass());
}
fieldId++;
}
assert buf.writableBytes() == 0;
return buf.asReadOnly();
}
@Override
public LLFieldDoc deserialize(ByteBuf buf) {
var fieldsCount = getFieldsCount(buf);
ArrayList<Object> fields = new ArrayList<>(fieldsCount);
buf.readerIndex(Float.BYTES + Integer.BYTES + Integer.BYTES + Character.BYTES);
for (char i = 0; i < fieldsCount; i++) {
fields.add(switch (FieldType.values()[buf.readByte()]) {
case FLOAT -> buf.readFloat();
case DOUBLE -> buf.readDouble();
case INT -> buf.readInt();
case LONG -> buf.readLong();
});
}
assert buf.readableBytes() == 0;
return new LLFieldDoc(getDoc(buf), getScore(buf), getShardIndex(buf), fields);
}
@Override
public int compare(LLFieldDoc hitA, LLFieldDoc hitB) {
if (hitA.score() == hitB.score()) {
if (hitA.doc() == hitB.doc()) {
return Integer.compare(hitA.shardIndex(), hitB.shardIndex());
} else {
return Integer.compare(hitB.doc(), hitA.doc());
}
} else {
return Float.compare(hitA.score(), hitB.score());
}
}
@Override
public int compareDirect(ByteBuf hitA, ByteBuf hitB) {
var scoreA = getScore(hitA);
var scoreB = getScore(hitB);
if (scoreA == scoreB) {
var docA = getDoc(hitA);
var docB = getDoc(hitB);
if (docA == docB) {
return Integer.compare(getShardIndex(hitA), getShardIndex(hitB));
} else {
return Integer.compare(docB, docA);
}
} else {
return Float.compare(scoreA, scoreB);
}
}
private static float getScore(ByteBuf hit) {
return hit.getFloat(0);
}
private static int getDoc(ByteBuf hit) {
return hit.getInt(Float.BYTES);
}
private static int getShardIndex(ByteBuf hit) {
return hit.getInt(Float.BYTES + Integer.BYTES);
}
private char getFieldsCount(ByteBuf hit) {
return hit.getChar(Float.BYTES + Integer.BYTES + Integer.BYTES);
}
private static void setScore(ByteBuf hit, float score) {
hit.setFloat(0, score);
}
private static void setDoc(ByteBuf hit, int doc) {
hit.setInt(Float.BYTES, doc);
}
private static void setShardIndex(ByteBuf hit, int shardIndex) {
hit.setInt(Float.BYTES + Integer.BYTES, shardIndex);
}
private void setFieldsCount(ByteBuf hit, int size) {
hit.setChar(Float.BYTES + Integer.BYTES + Integer.BYTES, (char) size);
}
}

View File

@ -0,0 +1,10 @@
package it.cavallium.dbengine.lucene;
import org.apache.lucene.search.ScoreDoc;
public record LLScoreDoc(int doc, float score, int shardIndex) implements LLDocElement {
public ScoreDoc toScoreDoc() {
return new ScoreDoc(doc, score, shardIndex);
}
}

View File

@ -0,0 +1,76 @@
package it.cavallium.dbengine.lucene;
import io.net5.buffer.ByteBuf;
import java.util.function.Function;
public class LLScoreDocCodec implements LMDBCodec<LLScoreDoc> {
@Override
public ByteBuf serialize(Function<Integer, ByteBuf> allocator, LLScoreDoc data) {
var buf = allocator.apply(Float.BYTES + Integer.BYTES + Integer.BYTES);
setScore(buf, data.score());
setDoc(buf, data.doc());
setShardIndex(buf, data.shardIndex());
buf.writerIndex(Float.BYTES + Integer.BYTES + Integer.BYTES);
return buf.asReadOnly();
}
@Override
public LLScoreDoc deserialize(ByteBuf buf) {
return new LLScoreDoc(getDoc(buf), getScore(buf), getShardIndex(buf));
}
@Override
public int compare(LLScoreDoc hitA, LLScoreDoc hitB) {
if (hitA.score() == hitB.score()) {
if (hitA.doc() == hitB.doc()) {
return Integer.compare(hitA.shardIndex(), hitB.shardIndex());
} else {
return Integer.compare(hitB.doc(), hitA.doc());
}
} else {
return Float.compare(hitA.score(), hitB.score());
}
}
@Override
public int compareDirect(ByteBuf hitA, ByteBuf hitB) {
var scoreA = getScore(hitA);
var scoreB = getScore(hitB);
if (scoreA == scoreB) {
var docA = getDoc(hitA);
var docB = getDoc(hitB);
if (docA == docB) {
return Integer.compare(getShardIndex(hitA), getShardIndex(hitB));
} else {
return Integer.compare(docB, docA);
}
} else {
return Float.compare(scoreA, scoreB);
}
}
private static float getScore(ByteBuf hit) {
return hit.getFloat(0);
}
private static int getDoc(ByteBuf hit) {
return hit.getInt(Float.BYTES);
}
private static int getShardIndex(ByteBuf hit) {
return hit.getInt(Float.BYTES + Integer.BYTES);
}
private static void setScore(ByteBuf hit, float score) {
hit.setFloat(0, score);
}
private static void setDoc(ByteBuf hit, int doc) {
hit.setInt(Float.BYTES, doc);
}
private static void setShardIndex(ByteBuf hit, int shardIndex) {
hit.setInt(Float.BYTES + Integer.BYTES, shardIndex);
}
}

View File

@ -0,0 +1,17 @@
package it.cavallium.dbengine.lucene;
import io.net5.buffer.ByteBuf;
import java.util.function.Function;
public interface LMDBCodec<T> {
ByteBuf serialize(Function<Integer, ByteBuf> allocator, T data);
T deserialize(ByteBuf b);
int compare(T o1, T o2);
int compareDirect(ByteBuf o1, ByteBuf o2);
}
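Note: a codec produces fixed-layout direct buffers that double as LMDB keys, and compareDirect must induce the same order as compare. A round-trip sketch of the contract using LLScoreDocCodec (the Unpooled allocator is used here only for illustration):

    LMDBCodec<LLScoreDoc> codec = new LLScoreDocCodec();
    ByteBuf key = codec.serialize(len -> Unpooled.directBuffer(len, len), new LLScoreDoc(42, 1.5f, 0));
    LLScoreDoc restored = codec.deserialize(key); // doc=42, score=1.5, shardIndex=0
    key.release();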

View File

@ -0,0 +1,403 @@
package it.cavallium.dbengine.lucene;
import static org.lmdbjava.DbiFlags.*;
import io.net5.buffer.ByteBuf;
import io.net5.buffer.PooledByteBufAllocator;
import io.net5.buffer.Unpooled;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import java.io.IOException;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.jetbrains.annotations.NotNull;
import org.lmdbjava.Cursor;
import org.lmdbjava.CursorIterable;
import org.lmdbjava.CursorIterable.KeyVal;
import org.lmdbjava.Dbi;
import org.lmdbjava.Env;
import org.lmdbjava.PutFlags;
import org.lmdbjava.Txn;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
public class LMDBPriorityQueue<T> implements PriorityQueue<T> {
private static final boolean FORCE_SYNC = false;
private static final boolean FORCE_THREAD_LOCAL = true;
private static final AtomicLong NEXT_LMDB_QUEUE_ID = new AtomicLong(0);
private static final ByteBuf EMPTY = Unpooled.directBuffer(1, 1).writeByte(1).asReadOnly();
private final AtomicBoolean closed = new AtomicBoolean();
private final Runnable onClose;
private final LMDBCodec<T> codec;
private final Env<ByteBuf> env;
private final Dbi<ByteBuf> lmdb;
private final Scheduler scheduler = Schedulers.newBoundedElastic(1,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, LMDBThread::new, Integer.MAX_VALUE);
private boolean writing;
private boolean iterating;
private Txn<ByteBuf> readTxn;
private Txn<ByteBuf> rwTxn;
private Cursor<ByteBuf> cur;
private boolean topValid = true;
private T top = null;
private long size = 0;
public LMDBPriorityQueue(LLTempLMDBEnv env, LMDBCodec<T> codec) {
this.onClose = env::decrementRef;
var name = "$queue_" + NEXT_LMDB_QUEUE_ID.getAndIncrement();
this.codec = codec;
this.env = env.getEnvAndIncrementRef();
this.lmdb = this.env.openDbi(name, codec::compareDirect, MDB_CREATE);
this.writing = true;
this.iterating = false;
if (FORCE_THREAD_LOCAL) {
this.rwTxn = null;
} else {
this.rwTxn = this.env.txnWrite();
}
this.readTxn = null;
this.cur = null;
}
private ByteBuf allocate(int size) {
return PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
}
private void switchToMode(boolean write, boolean wantCursor) {
if (iterating) {
throw new IllegalStateException("Tried to " + (write ? "write" : "read") + " while still iterating");
}
boolean changedMode = false;
if (write) {
if (!writing) {
changedMode = true;
writing = true;
if (cur != null) {
cur.close();
cur = null;
}
readTxn.close();
readTxn = null;
assert rwTxn == null;
rwTxn = env.txnWrite();
} else if (rwTxn == null) {
assert readTxn == null;
rwTxn = env.txnWrite();
}
} else {
if (writing) {
changedMode = true;
writing = false;
if (cur != null) {
cur.close();
cur = null;
}
if (rwTxn != null) {
rwTxn.commit();
rwTxn.close();
rwTxn = null;
}
if (FORCE_SYNC) {
env.sync(true);
}
assert rwTxn == null;
assert readTxn == null;
readTxn = env.txnRead();
}
}
if (cur == null) {
if (wantCursor) {
cur = lmdb.openCursor(Objects.requireNonNull(writing ? rwTxn : readTxn));
}
} else {
if (changedMode) {
cur.close();
cur = null;
}
}
}
private void endMode() {
if (FORCE_THREAD_LOCAL) {
if (cur != null) {
cur.close();
cur = null;
}
writing = true;
if (readTxn != null) {
readTxn.commit();
readTxn.close();
readTxn = null;
}
if (rwTxn != null) {
rwTxn.commit();
rwTxn.close();
rwTxn = null;
}
}
assert cur == null;
assert rwTxn == null;
assert readTxn == null;
}
private static void ensureThread() {
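// deliberately a no-op: add/top/pop may run on any caller thread; only iteration
// is pinned to the LMDB scheduler (see ensureItThread below)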
}
private static void ensureItThread() {
if (!(Thread.currentThread() instanceof LMDBThread)) {
throw new IllegalStateException("Must run in LMDB scheduler");
}
}
@Override
public void add(T element) {
ensureThread();
switchToMode(true, false);
var buf = codec.serialize(this::allocate, element);
try {
if (lmdb.put(rwTxn, buf, EMPTY, PutFlags.MDB_NOOVERWRITE)) {
if (++size == 1) {
topValid = true;
top = element;
} else {
topValid = false;
}
}
} finally {
buf.release(); // release the pooled serialization buffer once LMDB has copied it
endMode();
}
assert topSingleValid(element);
}
private boolean topSingleValid(T element) {
if (size == 1) {
var top = databaseTop();
return codec.compare(top, element) == 0;
} else {
return true;
}
}
@Override
public T top() {
ensureThread();
if (topValid) {
return top;
} else {
var top = databaseTop();
this.top = top;
topValid = true;
return top;
}
}
private T databaseTop() {
ensureThread();
switchToMode(false, true);
try {
if (cur.first()) {
return codec.deserialize(cur.key());
} else {
return null;
}
} finally {
endMode();
}
}
@Override
public T pop() {
ensureThread();
switchToMode(true, true);
try {
if (cur.first()) {
var data = codec.deserialize(cur.key());
if (--size == 0) {
topValid = true;
top = null;
} else {
topValid = false;
}
cur.delete();
return data;
} else {
return null;
}
} finally {
endMode();
}
}
@Override
public void updateTop() {
// do nothing
}
@Override
public void updateTop(T newTop) {
ensureThread();
assert codec.compare(newTop, databaseTop()) == 0;
}
@Override
public long size() {
ensureThread();
return size;
}
@Override
public void clear() {
ensureThread();
switchToMode(true, false);
try {
lmdb.drop(rwTxn);
topValid = true;
top = null;
size = 0;
} finally {
endMode();
}
}
@Override
public boolean remove(@NotNull T element) {
ensureThread();
Objects.requireNonNull(element);
switchToMode(true, false);
var buf = codec.serialize(this::allocate, element);
try {
var deleted = lmdb.delete(rwTxn, buf);
if (deleted) {
if (topValid && codec.compare(top, element) == 0) {
if (--size == 0) {
top = null;
}
} else {
if (--size == 0) {
topValid = true;
top = null;
} else {
topValid = false;
}
}
}
return deleted;
} finally {
buf.release(); // release the pooled serialization buffer
endMode();
}
}
@Override
public Flux<T> iterate() {
return Flux
.<T, Tuple2<CursorIterable<ByteBuf>, Iterator<KeyVal<ByteBuf>>>>generate(() -> {
ensureItThread();
switchToMode(false, false);
iterating = true;
if (cur != null) {
cur.close();
cur = null;
}
CursorIterable<ByteBuf> cit = lmdb.iterate(readTxn);
var it = cit.iterator();
return Tuples.of(cit, it);
}, (t, sink) -> {
ensureItThread();
var it = t.getT2();
if (it.hasNext()) {
sink.next(codec.deserialize(it.next().key()));
} else {
sink.complete();
}
return t;
}, t -> {
ensureItThread();
var cit = t.getT1();
cit.close();
iterating = false;
endMode();
})
.subscribeOn(scheduler, false);
}
@Override
public Flux<T> iterate(long skips) {
return Flux
.<T, Tuple3<CursorIterable<ByteBuf>, Iterator<KeyVal<ByteBuf>>, Long>>generate(() -> {
ensureItThread();
switchToMode(false, false);
iterating = true;
if (cur != null) {
cur.close();
cur = null;
}
CursorIterable<ByteBuf> cit = lmdb.iterate(readTxn);
var it = cit.iterator();
return Tuples.of(cit, it, skips);
}, (t, sink) -> {
ensureItThread();
var it = t.getT2();
var remainingSkips = t.getT3();
while (remainingSkips-- > 0 && it.hasNext()) {
it.next();
}
if (it.hasNext()) {
sink.next(codec.deserialize(it.next().key()));
} else {
sink.complete();
}
return t.getT3() == 0L ? t : t.mapT3(s -> 0L);
}, t -> {
ensureItThread();
var cit = t.getT1();
cit.close();
iterating = false;
endMode();
})
.subscribeOn(scheduler, false);
}
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
try {
ensureThread();
if (cur != null) {
cur.close();
}
if (rwTxn != null) {
rwTxn.close();
}
if (readTxn != null) {
readTxn.close();
}
try (var txn = env.txnWrite()) {
lmdb.drop(txn, true);
txn.commit();
}
lmdb.close();
} finally {
onClose.run();
}
}
scheduler.dispose();
}
public Scheduler getScheduler() {
return scheduler;
}
}
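Note: the queue spills its elements into a dedicated LMDB database; iteration runs on the queue's single-threaded scheduler (enforced by ensureItThread), while add/top/pop may run on the caller's thread. A usage sketch (checked exceptions elided):

    LLTempLMDBEnv tempEnv = new LLTempLMDBEnv();
    LMDBPriorityQueue<LLScoreDoc> queue = new LMDBPriorityQueue<>(tempEnv, new LLScoreDocCodec());
    queue.add(new LLScoreDoc(10, 0.5f, -1));
    queue.add(new LLScoreDoc(3, 2.0f, -1));
    LLScoreDoc least = queue.top(); // least element per the codec's ordering (here: score 0.5)
    queue.iterate()                 // already subscribed on the queue's LMDBThread scheduler
         .doOnNext(System.out::println)
         .blockLast();
    queue.close();
    tempEnv.close();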

View File

@ -0,0 +1,8 @@
package it.cavallium.dbengine.lucene;
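/**
* Marker thread type: the queue's scheduler creates only LMDBThread instances, so
* ensureItThread can verify that iteration stays on the dedicated LMDB scheduler.
*/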
public class LMDBThread extends Thread {
public LMDBThread(Runnable r) {
super(r);
}
}

View File

@ -406,7 +406,8 @@ public class LuceneUtils {
}
}
public static TopDocs mergeTopDocs(Sort sort,
public static TopDocs mergeTopDocs(
@Nullable Sort sort,
@Nullable Integer startN,
@Nullable Integer topN,
TopDocs[] topDocs,

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package it.cavallium.dbengine.lucene;
import java.util.Objects;
import java.util.concurrent.atomic.LongAccumulator;
/** Maintains the maximum score and its corresponding document id concurrently */
public final class MaxScoreAccumulator {
// we use 2^10-1 to check the remainder with a bitwise operation
static final int DEFAULT_INTERVAL = 0x3ff;
// scores are always positive
final LongAccumulator acc = new LongAccumulator(Long::max, Long.MIN_VALUE);
// non-final and visible for tests
public long modInterval;
public MaxScoreAccumulator() {
this.modInterval = DEFAULT_INTERVAL;
}
public void accumulate(int docID, float score) {
assert docID >= 0 && score >= 0;
long encode = (((long) Float.floatToIntBits(score)) << 32) | docID;
acc.accumulate(encode);
}
public DocAndScore get() {
long value = acc.get();
if (value == Long.MIN_VALUE) {
return null;
}
float score = Float.intBitsToFloat((int) (value >> 32));
int docID = (int) value;
return new DocAndScore(docID, score);
}
public static class DocAndScore implements Comparable<DocAndScore> {
public final int docID;
public final float score;
public DocAndScore(int docID, float score) {
this.docID = docID;
this.score = score;
}
@Override
public int compareTo(DocAndScore o) {
int cmp = Float.compare(score, o.score);
if (cmp == 0) {
return Integer.compare(docID, o.docID);
}
return cmp;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DocAndScore result = (DocAndScore) o;
return docID == result.docID && Float.compare(result.score, score) == 0;
}
@Override
public int hashCode() {
return Objects.hash(docID, score);
}
@Override
public String toString() {
return "DocAndScore{" + "docID=" + docID + ", score=" + score + '}';
}
}
}
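Note: accumulate() packs the score into the upper 32 bits and the docID into the lower 32, so LongAccumulator(Long::max) keeps the entry with the highest score (positive float bits order like ints; ties resolve toward the higher docID). A worked example:

    MaxScoreAccumulator acc = new MaxScoreAccumulator();
    acc.accumulate(7, 0.25f);
    acc.accumulate(3, 4.0f);
    MaxScoreAccumulator.DocAndScore best = acc.get(); // docID=3, score=4.0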

View File

@ -0,0 +1,30 @@
package it.cavallium.dbengine.lucene;
import org.apache.lucene.search.TotalHits;
import reactor.core.publisher.Flux;
public class PqFullDocs<T extends LLDocElement> implements FullDocs<T> {
private final PriorityQueue<T> pq;
private final TotalHits totalHits;
public PqFullDocs(PriorityQueue<T> pq, TotalHits totalHits) {
this.pq = pq;
this.totalHits = totalHits;
}
@Override
public Flux<T> iterate() {
return pq.iterate();
}
@Override
public Flux<T> iterate(long skips) {
return pq.iterate(skips);
}
@Override
public TotalHits totalHits() {
return totalHits;
}
}

View File

@ -0,0 +1,64 @@
package it.cavallium.dbengine.lucene;
import java.io.Closeable;
public interface PriorityQueue<T> extends ResourceIterable<T>, Closeable {
/**
* Adds an Object to a PriorityQueue in log(size) time. If more objects than the queue's capacity
* are added, an {@link ArrayIndexOutOfBoundsException} may be thrown.
*/
void add(T element);
/**
* Returns the least element of the PriorityQueue in constant time.
*/
T top();
/**
* Removes and returns the least element of the PriorityQueue in log(size) time.
*/
T pop();
/**
* Should be called when the Object at top changes values. Still log(n) worst case, but it's at least twice as fast
* to
*
* <pre class="prettyprint">
* pq.top().change();
* pq.updateTop();
* </pre>
* <p>
* instead of
*
* <pre class="prettyprint">
* o = pq.pop();
* o.change();
* pq.push(o);
* </pre>
*/
void updateTop();
/**
* Replace the top of the pq with {@code newTop} and run {@link #updateTop()}.
*/
void updateTop(T newTop);
/**
* Returns the number of elements currently stored in the PriorityQueue.
*/
long size();
/**
* Removes all entries from the PriorityQueue.
*/
void clear();
/**
* Removes an existing element currently stored in the PriorityQueue. Cost is linear with the size of the queue. (A
* specialization of PriorityQueue which tracks element positions would provide a constant remove time but the
* trade-off would be extra cost to all additions/insertions)
*/
boolean remove(T element);
}

View File

@ -74,11 +74,7 @@ public class RandomFieldComparator extends FieldComparator<Float> implements Lea
return scorer.docID();
}
};
if (!(scorer instanceof ScoreCachingWrappingScorer)) {
this.scorer = new ScoreCachingWrappingScorer(randomizedScorer);
} else {
this.scorer = randomizedScorer;
}
this.scorer = ScoreCachingWrappingScorer.wrap(randomizedScorer);
}
@SuppressWarnings("RedundantCast")

View File

@ -0,0 +1,24 @@
package it.cavallium.dbengine.lucene;
import java.util.Iterator;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Flux;
public interface ResourceIterable<T> {
/**
* Iterate this {@link ResourceIterable}
*/
Flux<T> iterate();
/**
* Iterate this {@link ResourceIterable}, skipping the first {@code skips} elements
*/
default Flux<T> iterate(long skips) {
if (skips == 0) {
return iterate();
} else {
return iterate().skip(skips);
}
}
}

View File

@ -0,0 +1,18 @@
package it.cavallium.dbengine.lucene;
import java.util.Comparator;
import org.apache.lucene.search.ScoreDoc;
class ScoreDocPartialComparator implements Comparator<ScoreDoc> {
public static final Comparator<ScoreDoc> SCORE_DOC_PARTIAL_COMPARATOR = new ScoreDocPartialComparator();
@Override
public int compare(ScoreDoc hitA, ScoreDoc hitB) {
if (hitA.score == hitB.score) {
return Integer.compare(hitB.doc, hitA.doc);
} else {
return Float.compare(hitA.score, hitB.score);
}
}
}

View File

@ -0,0 +1,21 @@
package it.cavallium.dbengine.lucene;
import java.util.Comparator;
class ScoreDocShardComparator implements Comparator<LLScoreDoc> {
public static final Comparator<LLScoreDoc> SCORE_DOC_SHARD_COMPARATOR = new ScoreDocShardComparator();
@Override
public int compare(LLScoreDoc hitA, LLScoreDoc hitB) {
if (hitA.score() == hitB.score()) {
if (hitA.doc() == hitB.doc()) {
return Integer.compare(hitA.shardIndex(), hitB.shardIndex());
} else {
return Integer.compare(hitB.doc(), hitA.doc());
}
} else {
return Float.compare(hitA.score(), hitB.score());
}
}
}

View File

@ -0,0 +1,68 @@
package it.cavallium.dbengine.lucene;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import it.cavallium.dbengine.client.Indicizer;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.database.LLDocument;
import it.cavallium.dbengine.database.LLItem;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import java.util.LinkedList;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Store;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Mono;
public class StringIndicizer extends Indicizer<String, String> {
@Override
public @NotNull Mono<LLDocument> toDocument(@NotNull String key, @NotNull String value) {
return Mono.fromCallable(() -> {
var fields = new LinkedList<LLItem>();
fields.add(LLItem.newStringField("uid", key, Field.Store.YES));
fields.add(LLItem.newTextField("text", value, Store.NO));
@SuppressWarnings("UnstableApiUsage")
var numInt = Ints.tryParse(value);
if (numInt != null) {
fields.add(LLItem.newIntPoint("intpoint", numInt));
fields.add(LLItem.newSortedNumericDocValuesField("intsort", numInt));
}
@SuppressWarnings("UnstableApiUsage")
var numLong = Longs.tryParse(value);
if (numLong != null) {
fields.add(LLItem.newLongPoint("longpoint", numLong));
fields.add(LLItem.newSortedNumericDocValuesField("longsort", numLong));
}
return new LLDocument(fields.toArray(LLItem[]::new));
});
}
@Override
public @NotNull LLTerm toIndex(@NotNull String key) {
return new LLTerm("uid", key);
}
@Override
public @NotNull String getKeyFieldName() {
return "uid";
}
@Override
public @NotNull String getKey(String key) {
return key;
}
@Override
public IndicizerAnalyzers getPerFieldAnalyzer() {
return IndicizerAnalyzers.of(TextFieldsAnalyzer.WordSimple);
}
@Override
public IndicizerSimilarities getPerFieldSimilarity() {
return IndicizerSimilarities.of(TextFieldsSimilarity.Boolean);
}
}
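Note: purely numeric values are indexed both as text and as numeric point/doc-values fields, which is what makes numericSort(...) above usable in tests. A sketch of the produced fields:

    Mono<LLDocument> doc = new StringIndicizer().toDocument("key1", "42");
    // resulting LLItems: uid="key1" (stored), text="42",
    // intpoint/intsort = 42 and longpoint/longsort = 42L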

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package it.cavallium.dbengine.lucene.collector;
import it.cavallium.dbengine.lucene.EmptyPriorityQueue;
import it.cavallium.dbengine.lucene.FullDocs;
import it.cavallium.dbengine.lucene.LLDocElement;
import it.cavallium.dbengine.lucene.PqFullDocs;
import it.cavallium.dbengine.lucene.PriorityQueue;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
/**
* A base class for all collectors that return a {@link TopDocs} output. This collector allows easy
* extension by providing a single constructor which accepts a {@link PriorityQueue} as well as
* protected members for that priority queue and a counter of the number of total hits.<br>
* Extending classes can override any of the methods to provide their own implementation, as well as
* avoid the use of the priority queue entirely by passing null to {@link
* #FullDocsCollector(PriorityQueue)}. In that case however, you might want to consider overriding
* all methods, in order to avoid a NullPointerException.
*/
public abstract class FullDocsCollector<T extends LLDocElement> implements Collector, AutoCloseable {
/**
* This is used in case fullDocs() is called with illegal parameters, or there simply aren't
* (enough) results.
*/
private static final FullDocs<?> EMPTY_FULLDOCS =
new PqFullDocs<>(new EmptyPriorityQueue<>(), new TotalHits(0, TotalHits.Relation.EQUAL_TO));
/**
* The priority queue which holds the top documents. Note that different implementations of
* PriorityQueue give different meaning to 'top documents'. HitQueue for example aggregates the
* top scoring documents, while other PQ implementations may hold documents sorted by other
* criteria.
*/
protected final PriorityQueue<T> pq;
/** The total number of documents that the collector encountered. */
protected int totalHits;
/** Whether {@link #totalHits} is exact or a lower bound. */
protected TotalHits.Relation totalHitsRelation = TotalHits.Relation.EQUAL_TO;
protected FullDocsCollector(PriorityQueue<T> pq) {
this.pq = pq;
}
/** The total number of documents that matched this query. */
public int getTotalHits() {
return totalHits;
}
/** Returns the top docs that were collected by this collector. */
public FullDocs<T> fullDocs() {
return new PqFullDocs<>(this.pq, new TotalHits(totalHits, totalHitsRelation));
}
@Override
public void close() throws Exception {
pq.close();
}
}

View File

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package it.cavallium.dbengine.lucene.collector;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.search.ScoreMode;
/** Used for defining custom algorithms to allow searches to early terminate */
abstract class HitsThresholdChecker {
/** Implementation of HitsThresholdChecker which allows global hit counting */
private static class GlobalHitsThresholdChecker extends HitsThresholdChecker {
private final int totalHitsThreshold;
private final AtomicLong globalHitCount;
public GlobalHitsThresholdChecker(int totalHitsThreshold) {
if (totalHitsThreshold < 0) {
throw new IllegalArgumentException(
"totalHitsThreshold must be >= 0, got " + totalHitsThreshold);
}
this.totalHitsThreshold = totalHitsThreshold;
this.globalHitCount = new AtomicLong();
}
@Override
public void incrementHitCount() {
globalHitCount.incrementAndGet();
}
@Override
public boolean isThresholdReached() {
return globalHitCount.getAcquire() > totalHitsThreshold;
}
@Override
public ScoreMode scoreMode() {
return totalHitsThreshold == Integer.MAX_VALUE ? ScoreMode.COMPLETE : ScoreMode.TOP_SCORES;
}
@Override
public int getHitsThreshold() {
return totalHitsThreshold;
}
}
/** Default implementation of HitsThresholdChecker to be used for single threaded execution */
private static class LocalHitsThresholdChecker extends HitsThresholdChecker {
private final int totalHitsThreshold;
private int hitCount;
public LocalHitsThresholdChecker(int totalHitsThreshold) {
if (totalHitsThreshold < 0) {
throw new IllegalArgumentException(
"totalHitsThreshold must be >= 0, got " + totalHitsThreshold);
}
this.totalHitsThreshold = totalHitsThreshold;
}
@Override
public void incrementHitCount() {
++hitCount;
}
@Override
public boolean isThresholdReached() {
return hitCount > totalHitsThreshold;
}
@Override
public ScoreMode scoreMode() {
return totalHitsThreshold == Integer.MAX_VALUE ? ScoreMode.COMPLETE : ScoreMode.TOP_SCORES;
}
@Override
public int getHitsThreshold() {
return totalHitsThreshold;
}
}
/*
* Returns a threshold checker that is useful for single threaded searches
*/
public static HitsThresholdChecker create(final int totalHitsThreshold) {
return new LocalHitsThresholdChecker(totalHitsThreshold);
}
/*
* Returns a threshold checker that is based on a shared counter
*/
public static HitsThresholdChecker createShared(final int totalHitsThreshold) {
return new GlobalHitsThresholdChecker(totalHitsThreshold);
}
public abstract void incrementHitCount();
public abstract ScoreMode scoreMode();
public abstract int getHitsThreshold();
public abstract boolean isThresholdReached();
}
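Note: the checker only reports whether more than totalHitsThreshold hits have been seen; the collector uses that to downgrade the total-hit relation to a lower bound. A usage sketch (within this package, since the class is package-private):

    HitsThresholdChecker checker = HitsThresholdChecker.create(1000);
    checker.incrementHitCount();        // called once per collected hit
    if (checker.isThresholdReached()) {
        // from here on the reported total hit count is only a lower bound
    }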

View File

@ -0,0 +1,238 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package it.cavallium.dbengine.lucene.collector;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.FullDocs;
import it.cavallium.dbengine.lucene.LLScoreDoc;
import it.cavallium.dbengine.lucene.LLScoreDocCodec;
import it.cavallium.dbengine.lucene.LMDBPriorityQueue;
import it.cavallium.dbengine.lucene.MaxScoreAccumulator;
import java.io.IOException;
import java.util.Collection;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.LeafCollector;
import it.cavallium.dbengine.lucene.MaxScoreAccumulator.DocAndScore;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.TotalHits;
/**
* A {@link Collector} implementation that collects the top-scoring hits, returning them as a {@link
* FullDocs}. This is used by {@link IndexSearcher} to implement {@link FullDocs}-based search. Hits
* are sorted by score descending and then (when the scores are tied) docID ascending. When you
* create an instance of this collector you should know in advance whether documents are going to be
* collected in doc Id order or not.
*
* <p><b>NOTE</b>: The values {@link Float#NaN} and {@link Float#NEGATIVE_INFINITY} are not valid
* scores. This collector will not properly collect hits with such scores.
*/
public abstract class LMDBFullScoreDocCollector extends FullDocsCollector<LLScoreDoc> {
/** Scorable leaf collector */
public abstract static class ScorerLeafCollector implements LeafCollector {
protected Scorable scorer;
@Override
public void setScorer(Scorable scorer) throws IOException {
this.scorer = scorer;
}
}
private static class SimpleLMDBFullScoreDocCollector extends LMDBFullScoreDocCollector {
SimpleLMDBFullScoreDocCollector(LLTempLMDBEnv env,
HitsThresholdChecker hitsThresholdChecker, MaxScoreAccumulator minScoreAcc) {
super(env, hitsThresholdChecker, minScoreAcc);
}
@Override
public LeafCollector getLeafCollector(LeafReaderContext context) {
// reset the minimum competitive score
docBase = context.docBase;
return new ScorerLeafCollector() {
@Override
public void setScorer(Scorable scorer) throws IOException {
super.setScorer(scorer);
minCompetitiveScore = 0f;
updateMinCompetitiveScore(scorer);
if (minScoreAcc != null) {
updateGlobalMinCompetitiveScore(scorer);
}
}
@Override
public void collect(int doc) throws IOException {
float score = scorer.score();
// This collector relies on the fact that scorers produce positive values:
assert score >= 0; // NOTE: false for NaN
totalHits++;
hitsThresholdChecker.incrementHitCount();
if (minScoreAcc != null && (totalHits & minScoreAcc.modInterval) == 0) {
updateGlobalMinCompetitiveScore(scorer);
}
var pqTop = pq.top();
if (pqTop != null) {
if (score <= pqTop.score()) {
if (totalHitsRelation == TotalHits.Relation.EQUAL_TO) {
// we just reached totalHitsThreshold, we can start setting the min
// competitive score now
updateMinCompetitiveScore(scorer);
}
// Since docs are returned in-order (i.e., increasing doc Id), a document
// with equal score to pqTop.score cannot compete since HitQueue favors
// documents with lower doc Ids. Therefore reject those docs too.
return;
}
}
pq.add(new LLScoreDoc(doc + docBase, score, -1));
pq.updateTop();
updateMinCompetitiveScore(scorer);
}
};
}
}
/**
* Creates a new {@link LMDBFullScoreDocCollector} given the number of hits to count accurately.
*
* <p><b>NOTE</b>: If the total hit count of the top docs is less than or exactly {@code
* totalHitsThreshold} then this value is accurate. On the other hand, if the {@link
* FullDocs#totalHits} value is greater than {@code totalHitsThreshold} then its value is a lower
* bound of the hit count. A value of {@link Integer#MAX_VALUE} will make the hit count accurate
* but will also likely make query processing slower.
*
* <p><b>NOTE</b>: Unlike the array-based Lucene collectors this class was adapted from, the
* returned instances buffer hits in an LMDB-backed priority queue, so no {@code numHits}-sized
* sentinel array is pre-allocated.
*/
public static LMDBFullScoreDocCollector create(LLTempLMDBEnv env, int totalHitsThreshold) {
return create(env, HitsThresholdChecker.create(totalHitsThreshold), null);
}
static LMDBFullScoreDocCollector create(
LLTempLMDBEnv env,
HitsThresholdChecker hitsThresholdChecker,
MaxScoreAccumulator minScoreAcc) {
if (hitsThresholdChecker == null) {
throw new IllegalArgumentException("hitsThresholdChecker must be non null");
}
return new SimpleLMDBFullScoreDocCollector(env, hitsThresholdChecker, minScoreAcc);
}
/**
* Create a CollectorManager which uses a shared hit counter to maintain the number of hits and a
* shared {@link MaxScoreAccumulator} to propagate the minimum score across segments
*/
public static CollectorManager<LMDBFullScoreDocCollector, FullDocs<LLScoreDoc>> createSharedManager(
LLTempLMDBEnv env,
int totalHitsThreshold) {
return new CollectorManager<>() {
private final HitsThresholdChecker hitsThresholdChecker =
HitsThresholdChecker.createShared(totalHitsThreshold);
private final MaxScoreAccumulator minScoreAcc = new MaxScoreAccumulator();
@Override
public LMDBFullScoreDocCollector newCollector() {
return LMDBFullScoreDocCollector.create(env, hitsThresholdChecker, minScoreAcc);
}
@Override
public FullDocs<LLScoreDoc> reduce(Collection<LMDBFullScoreDocCollector> collectors) {
@SuppressWarnings("unchecked")
final FullDocs<LLScoreDoc>[] fullDocs = new FullDocs[collectors.size()];
int i = 0;
for (LMDBFullScoreDocCollector collector : collectors) {
fullDocs[i++] = collector.fullDocs();
}
return FullDocs.merge(null, fullDocs);
}
};
}
int docBase;
final HitsThresholdChecker hitsThresholdChecker;
final MaxScoreAccumulator minScoreAcc;
float minCompetitiveScore;
// package-private: instances are created only through the factory methods above
LMDBFullScoreDocCollector(LLTempLMDBEnv env,
HitsThresholdChecker hitsThresholdChecker, MaxScoreAccumulator minScoreAcc) {
super(new LMDBPriorityQueue<>(env, new LLScoreDocCodec()));
assert hitsThresholdChecker != null;
this.hitsThresholdChecker = hitsThresholdChecker;
this.minScoreAcc = minScoreAcc;
}
@Override
public ScoreMode scoreMode() {
return hitsThresholdChecker.scoreMode();
}
protected void updateGlobalMinCompetitiveScore(Scorable scorer) throws IOException {
assert minScoreAcc != null;
DocAndScore maxMinScore = minScoreAcc.get();
if (maxMinScore != null) {
// since we tie-break on doc id and collect in doc id order we can require
// the next float if the global minimum score is set on a document id that is
// smaller than the ids in the current leaf
float score =
docBase > maxMinScore.docID ? Math.nextUp(maxMinScore.score) : maxMinScore.score;
if (score > minCompetitiveScore) {
assert hitsThresholdChecker.isThresholdReached();
scorer.setMinCompetitiveScore(score);
minCompetitiveScore = score;
totalHitsRelation = TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO;
}
}
}
protected void updateMinCompetitiveScore(Scorable scorer) throws IOException {
var pqTop = pq.top();
if (hitsThresholdChecker.isThresholdReached()
&& pqTop != null
&& pqTop.score() != Float.NEGATIVE_INFINITY) { // -Infinity is the score of sentinels
// since we tie-break on doc id and collect in doc id order, we can require
// the next float
float localMinScore = Math.nextUp(pqTop.score());
if (localMinScore > minCompetitiveScore) {
scorer.setMinCompetitiveScore(localMinScore);
totalHitsRelation = TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO;
minCompetitiveScore = localMinScore;
if (minScoreAcc != null) {
// we don't use the next float but we register the document
// id so that other leaves can require it if they are after
// the current maximum
minScoreAcc.accumulate(pqTop.doc(), pqTop.score());
}
}
}
}
}
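A minimal sketch of driving the shared manager above across shards, mirroring the loop that UnsortedScoredFullLuceneMultiSearcher runs reactively further down in this commit (env, query and shardSearchers are assumed to exist; checked-exception handling omitted):

// One manager, one collector per shard, one merged FullDocs at the end.
CollectorManager<LMDBFullScoreDocCollector, FullDocs<LLScoreDoc>> manager =
LMDBFullScoreDocCollector.createSharedManager(env, 1000);
List<LMDBFullScoreDocCollector> collectors = new ArrayList<>();
for (IndexSearcher shard : shardSearchers) {
var collector = manager.newCollector();
shard.search(query, collector);
collectors.add(collector);
}
// Hits live in LMDB-backed queues, so the merged result can be streamed
// without keeping the whole top-docs set on the Java heap.
FullDocs<LLScoreDoc> merged = manager.reduce(collectors);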

View File

@ -1,4 +1,4 @@
package it.cavallium.dbengine.lucene;
package it.cavallium.dbengine.lucene.collector;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.ALLOW_UNSCORED_PAGINATION_MODE;

View File

@ -35,6 +35,12 @@ public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher {
true);
}
}
@Override
public String getName() {
return "adaptivelocal";
}
public Mono<Send<LuceneSearchResult>> transformedCollect(Mono<Send<LLIndexSearcher>> indexSearcher,
LocalQueryParams queryParams,
String keyFieldName,

View File

@ -4,15 +4,16 @@ import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.io.Closeable;
import java.io.IOException;
import reactor.core.publisher.Mono;
public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher {
public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher, Closeable {
private static final LuceneMultiSearcher count
= new SimpleUnsortedUnscoredLuceneMultiSearcher(new CountLuceneLocalSearcher());
private static final LuceneMultiSearcher scoredSimple
= new ScoredSimpleLuceneShardSearcher();
private static final LuceneMultiSearcher scoredSimple = new ScoredSimpleLuceneMultiSearcher();
private static final LuceneMultiSearcher unsortedUnscoredPaged
= new SimpleUnsortedUnscoredLuceneMultiSearcher(new SimpleLuceneLocalSearcher());
@ -20,6 +21,12 @@ public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher {
private static final LuceneMultiSearcher unsortedUnscoredContinuous
= new UnsortedUnscoredContinuousLuceneMultiSearcher();
private final UnsortedScoredFullLuceneMultiSearcher scoredFull;
public AdaptiveLuceneMultiSearcher() throws IOException {
scoredFull = new UnsortedScoredFullLuceneMultiSearcher();
}
@Override
public Mono<Send<LuceneSearchResult>> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
LocalQueryParams queryParams,
@ -47,7 +54,11 @@ public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher {
if (queryParams.limit() == 0) {
return count.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
} else if (queryParams.isSorted() || queryParams.isScored()) {
return scoredSimple.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
if (queryParams.isSorted() || realLimit <= (long) queryParams.pageLimits().getPageLimit(0)) {
return scoredSimple.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
} else {
return scoredFull.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
}
} else if (realLimit <= (long) queryParams.pageLimits().getPageLimit(0)) {
// Run single-page searches using the paged multi searcher
return unsortedUnscoredPaged.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
@ -57,4 +68,14 @@ public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher {
}
}, true);
}
@Override
public void close() throws IOException {
scoredFull.close();
}
@Override
public String getName() {
return "adaptivemulti";
}
}
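A condensed restatement of the dispatch above (realLimit is assumed to be the combined offset plus limit computed earlier in the method, and the final fallback to the continuous searcher is inferred from the fields and the truncated hunk, so treat this as a sketch, not a spec):

// Which sub-searcher handles a multi-shard query.
LuceneMultiSearcher pick(LocalQueryParams p, long realLimit) {
long firstPage = p.pageLimits().getPageLimit(0);
if (p.limit() == 0) return count; // no hits materialized, count only
if (p.isSorted() || p.isScored()) {
// Sorted, or scored within one page: the paged searcher is enough.
if (p.isSorted() || realLimit <= firstPage) return scoredSimple;
return scoredFull; // scored beyond one page: LMDB-backed full collection
}
if (realLimit <= firstPage) return unsortedUnscoredPaged;
return unsortedUnscoredContinuous;
}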

View File

@ -0,0 +1,7 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore;
import reactor.core.publisher.Flux;
record CalculatedResults(TotalHitsCount totalHitsCount, Flux<LLKeyScore> firstPageHitsFlux) {}

View File

@ -42,4 +42,9 @@ public class CountLuceneLocalSearcher implements LuceneLocalSearcher {
.map(count -> new LuceneSearchResult(TotalHitsCount.of(count, true), Flux.empty(), null).send())
.doOnDiscard(Send.class, Send::close);
}
@Override
public String getName() {
return "count";
}
}

View File

@ -16,4 +16,10 @@ public interface LuceneLocalSearcher {
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer);
/**
* Get the name of this searcher type
* @return searcher type name
*/
String getName();
}

View File

@ -1,34 +1,38 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.FIRST_PAGE_LIMIT;
import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.EMPTY_STATUS;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.database.disk.LLLocalGroupedReactiveRocksIterator;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.search.TotalHits.Relation;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher {
public class ScoredSimpleLuceneMultiSearcher implements LuceneMultiSearcher {
protected static final Logger logger = LoggerFactory.getLogger(ScoredSimpleLuceneShardSearcher.class);
protected static final Logger logger = LoggerFactory.getLogger(ScoredSimpleLuceneMultiSearcher.class);
public ScoredSimpleLuceneShardSearcher() {
public ScoredSimpleLuceneMultiSearcher() {
}
@Override
@ -64,11 +68,7 @@ public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher {
}
private Sort getSort(LocalQueryParams queryParams) {
Sort luceneSort = queryParams.sort();
if (luceneSort == null) {
luceneSort = Sort.RELEVANCE;
}
return luceneSort;
return queryParams.sort();
}
/**
@ -175,8 +175,8 @@ public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher {
if (resultsOffset < 0) {
throw new IndexOutOfBoundsException(resultsOffset);
}
if ((s.pageIndex() == 0 || s.last() != null) && s.remainingLimit() > 0) {
var sort = getSort(queryParams);
if (s.pageIndex() == 0 || (s.last() != null && s.remainingLimit() > 0)) {
@Nullable var sort = getSort(queryParams);
var pageLimit = pageLimits.getPageLimit(s.pageIndex());
var after = (FieldDoc) s.last();
var totalHitsThreshold = LuceneUtils.totalHitsThreshold();
@ -211,4 +211,9 @@ public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher {
}))
);
}
@Override
public String getName() {
return "scoredsimplemulti";
}
}

View File

@ -18,6 +18,7 @@ import reactor.core.scheduler.Schedulers;
public class ScoringShardsCollectorManager implements CollectorManager<TopFieldCollector, TopDocs> {
@Nullable
private final Sort sort;
private final int numHits;
private final FieldDoc after;
@ -26,7 +27,7 @@ public class ScoringShardsCollectorManager implements CollectorManager<TopFieldC
private final @Nullable Integer topN;
private final CollectorManager<TopFieldCollector, TopFieldDocs> sharedCollectorManager;
public ScoringShardsCollectorManager(final Sort sort,
public ScoringShardsCollectorManager(@Nullable final Sort sort,
final int numHits,
final FieldDoc after,
final int totalHitsThreshold,
@ -35,7 +36,7 @@ public class ScoringShardsCollectorManager implements CollectorManager<TopFieldC
this(sort, numHits, after, totalHitsThreshold, (Integer) startN, (Integer) topN);
}
public ScoringShardsCollectorManager(final Sort sort,
public ScoringShardsCollectorManager(@Nullable final Sort sort,
final int numHits,
final FieldDoc after,
final int totalHitsThreshold,
@ -43,14 +44,14 @@ public class ScoringShardsCollectorManager implements CollectorManager<TopFieldC
this(sort, numHits, after, totalHitsThreshold, (Integer) startN, (Integer) 2147483630);
}
public ScoringShardsCollectorManager(final Sort sort,
public ScoringShardsCollectorManager(@Nullable final Sort sort,
final int numHits,
final FieldDoc after,
final int totalHitsThreshold) {
this(sort, numHits, after, totalHitsThreshold, null, null);
}
private ScoringShardsCollectorManager(final Sort sort,
private ScoringShardsCollectorManager(@Nullable final Sort sort,
final int numHits,
final FieldDoc after,
final int totalHitsThreshold,
@ -68,7 +69,7 @@ public class ScoringShardsCollectorManager implements CollectorManager<TopFieldC
} else {
this.topN = topN;
}
this.sharedCollectorManager = TopFieldCollector.createSharedManager(sort, numHits, after, totalHitsThreshold);
this.sharedCollectorManager = TopFieldCollector.createSharedManager(sort == null ? Sort.RELEVANCE : sort, numHits, after, totalHitsThreshold);
}
@Override

View File

@ -16,11 +16,14 @@ import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInpu
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopDocsCollector;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.search.TotalHits.Relation;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
@ -64,6 +67,11 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
false);
}
@Override
public String getName() {
return "simplelocal";
}
/**
* Get the pagination info
*/
@ -84,9 +92,12 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
var limit = paginationInfo.totalLimit();
var pagination = !paginationInfo.forceSinglePage();
var resultsOffset = LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset());
var currentPageInfo = new CurrentPageInfo(null, limit, 0);
return Mono
.fromSupplier(() -> new CurrentPageInfo(null, limit, 0))
.handle((s, sink) -> this.searchPageSync(queryParams, indexSearchers, pagination, resultsOffset, s, sink));
.just(currentPageInfo)
.<PageData>handle((s, sink) -> this.searchPageSync(queryParams, indexSearchers, pagination, resultsOffset, s, sink))
//defaultIfEmpty(new PageData(new TopDocs(new TotalHits(0, Relation.EQUAL_TO), new ScoreDoc[0]), currentPageInfo))
.single();
}
/**
@ -108,7 +119,7 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
CurrentPageInfo nextPageInfo = firstPageData.nextPageInfo();
return new FirstPageResults(totalHitsCount, firstPageHitsFlux, nextPageInfo);
});
}).single();
}
private Mono<Send<LuceneSearchResult>> computeOtherResults(Mono<FirstPageResults> firstResultMono,
@ -125,7 +136,7 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
Flux<LLKeyScore> combinedFlux = firstPageHitsFlux.concatWith(nextHitsFlux);
return new LuceneSearchResult(totalHitsCount, combinedFlux, onClose).send();
});
}).single();
}
/**
@ -162,7 +173,18 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
throw new IndexOutOfBoundsException(resultsOffset);
}
var currentPageLimit = queryParams.pageLimits().getPageLimit(s.pageIndex());
if ((s.pageIndex() == 0 || s.last() != null) && s.remainingLimit() > 0) {
if (s.pageIndex() == 0 && s.remainingLimit() == 0) {
int count;
try {
count = indexSearchers.get(0).count(queryParams.query());
} catch (IOException e) {
sink.error(e);
return EMPTY_STATUS;
}
var nextPageInfo = new CurrentPageInfo(null, 0, 1);
sink.next(new PageData(new TopDocs(new TotalHits(count, Relation.EQUAL_TO), new ScoreDoc[0]), nextPageInfo));
return EMPTY_STATUS;
} else if (s.pageIndex() == 0 || (s.last() != null && s.remainingLimit() > 0)) {
TopDocs pageTopDocs;
try {
TopDocsCollector<ScoreDoc> collector = TopDocsSearcher.getTopDocsCollector(queryParams.sort(),

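The new zero-limit branch above replaces hit collection with a plain count; the equivalent Lucene calls in isolation (indexSearcher and query are assumed to exist, exception handling omitted):

// Count-only fast path: with limit == 0 no documents need to be collected,
// so IndexSearcher#count is enough and the TopDocs carries an exact total.
int count = indexSearcher.count(query);
TopDocs countOnly = new TopDocs(new TotalHits(count, Relation.EQUAL_TO), new ScoreDoc[0]);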
View File

@ -100,4 +100,9 @@ public class SimpleUnsortedUnscoredLuceneMultiSearcher implements LuceneMultiSea
queryParams.scoreMode()
);
}
@Override
public String getName() {
return "simpleunsortedunscoredmulti";
}
}

View File

@ -2,28 +2,13 @@ package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.ALLOW_UNSCORED_PAGINATION_MODE;
import it.cavallium.dbengine.lucene.UnscoredCollector;
import java.io.IOException;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.misc.search.DiversifiedTopDocsCollector;
import org.apache.lucene.search.BulkScorer;
import org.apache.lucene.search.Collector;
import it.cavallium.dbengine.lucene.collector.UnscoredCollector;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.HitQueue;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopDocsCollector;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopScoreDocCollector;
import org.apache.lucene.search.TotalHits.Relation;
import reactor.core.scheduler.Schedulers;
class TopDocsSearcher {

View File

@ -0,0 +1,120 @@
package it.cavallium.dbengine.lucene.searcher;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.FullDocs;
import it.cavallium.dbengine.lucene.LLScoreDoc;
import it.cavallium.dbengine.lucene.collector.LMDBFullScoreDocCollector;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Sort;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class UnsortedScoredFullLuceneMultiSearcher implements LuceneMultiSearcher, Closeable {
protected static final Logger logger = LoggerFactory.getLogger(UnsortedScoredFullLuceneMultiSearcher.class);
private final LLTempLMDBEnv env;
public UnsortedScoredFullLuceneMultiSearcher() throws IOException {
this.env = new LLTempLMDBEnv();
}
@Override
public Mono<Send<LuceneSearchResult>> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
LocalQueryParams queryParams,
String keyFieldName,
LLSearchTransformer transformer) {
Mono<LocalQueryParams> queryParamsMono;
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
queryParamsMono = Mono.just(queryParams);
} else {
queryParamsMono = LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> transformer.transform(Mono
.fromSupplier(() -> new TransformerInput(indexSearchers, queryParams))), true);
}
return queryParamsMono.flatMap(queryParams2 -> {
Objects.requireNonNull(queryParams2.scoreMode(), "ScoreMode must not be null");
if (queryParams2.sort() != null && queryParams2.sort() != Sort.RELEVANCE) {
throw new IllegalArgumentException(UnsortedScoredFullLuceneMultiSearcher.this.getClass().getSimpleName()
+ " doesn't support sorted queries");
}
return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> this
// Search results
.search(indexSearchers.shards(), queryParams2)
// Compute the results
.transform(fullDocsMono -> this.computeResults(fullDocsMono, indexSearchers,
keyFieldName, queryParams2))
// Ensure that one LuceneSearchResult is always returned
.single(),
false);
});
}
/**
* Search effectively the raw results
*/
private Mono<FullDocs<LLScoreDoc>> search(Iterable<IndexSearcher> indexSearchers,
LocalQueryParams queryParams) {
return Mono
.fromCallable(() -> {
LLUtils.ensureBlocking();
var totalHitsThreshold = LuceneUtils.totalHitsThreshold();
return LMDBFullScoreDocCollector.createSharedManager(env, totalHitsThreshold);
})
.flatMap(sharedManager -> Flux
.fromIterable(indexSearchers)
.flatMap(shard -> Mono.fromCallable(() -> {
LLUtils.ensureBlocking();
var collector = sharedManager.newCollector();
shard.search(queryParams.query(), collector);
return collector;
}))
.collectList()
.flatMap(collectors -> Mono.fromCallable(() -> {
LLUtils.ensureBlocking();
return sharedManager.reduce(collectors);
}))
);
}
/**
* Compute the results, extracting useful data
*/
private Mono<Send<LuceneSearchResult>> computeResults(Mono<FullDocs<LLScoreDoc>> dataMono,
LLIndexSearchers indexSearchers,
String keyFieldName,
LocalQueryParams queryParams) {
return dataMono.map(data -> {
var totalHitsCount = LuceneUtils.convertTotalHitsCount(data.totalHits());
Flux<LLKeyScore> hitsFlux = LuceneUtils
.convertHits(data.iterate(queryParams.offset()).map(LLScoreDoc::toScoreDoc),
indexSearchers.shards(), keyFieldName, true)
.take(queryParams.limit(), true);
return new LuceneSearchResult(totalHitsCount, hitsFlux, indexSearchers::close).send();
});
}
@Override
public void close() throws IOException {
env.close();
}
@Override
public String getName() {
return "scoredfullmulti";
}
}
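A minimal wiring sketch for the searcher above (indexSearchersMono and queryParams are assumed to come from the index; only names visible in this file are used, and IOException handling is omitted):

// The query must be scored and either unsorted or sorted by relevance,
// otherwise collectMulti signals IllegalArgumentException.
var searcher = new UnsortedScoredFullLuceneMultiSearcher();
Mono<Send<LuceneSearchResult>> resultMono = searcher.collectMulti(
indexSearchersMono, // Mono<Send<LLIndexSearchers>>
queryParams, // unsorted, scored LocalQueryParams
"key", // key field name
LLSearchTransformer.NO_TRANSFORMATION);
// Closing the searcher tears down the temporary LMDB environment.
searcher.close();

The hits flux inside the result is fed from the LMDB-backed queue, which is what lets very large result sets stream without exhausting the heap.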

View File

@ -114,4 +114,9 @@ public class UnsortedUnscoredContinuousLuceneMultiSearcher implements LuceneMult
queryParams.scoreMode()
);
}
@Override
public String getName() {
return "unsortedunscoredcontinuousmulti";
}
}

View File

@ -0,0 +1,154 @@
/*-
* #%L
* LmdbJava
* %%
* Copyright (C) 2016 - 2021 The LmdbJava Open Source Project
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package org.lmdbjava;
import static io.net5.buffer.PooledByteBufAllocator.DEFAULT;
import static java.lang.Class.forName;
import static org.lmdbjava.UnsafeAccess.UNSAFE;
import io.net5.buffer.ByteBuf;
import io.net5.buffer.PooledByteBufAllocator;
import java.lang.reflect.Field;
import jnr.ffi.Pointer;
/**
* A buffer proxy backed by Netty's {@link ByteBuf}.
*
* <p>
* This class requires {@link UnsafeAccess}, and netty-buffer must be on the
* classpath.
*/
public final class Net5ByteBufProxy extends BufferProxy<ByteBuf> {
/**
* A proxy for using Netty {@link ByteBuf}. Guaranteed to never be null,
* although a class initialization exception will occur if an attempt is made
* to access this field when Netty is unavailable.
*/
public static final BufferProxy<ByteBuf> PROXY_NETTY = new Net5ByteBufProxy();
private static final int BUFFER_RETRIES = 10;
private static final String FIELD_NAME_ADDRESS = "memoryAddress";
private static final String FIELD_NAME_LENGTH = "length";
private static final String NAME = "io.net5.buffer.PooledUnsafeDirectByteBuf";
private final long lengthOffset;
private final long addressOffset;
private final PooledByteBufAllocator nettyAllocator;
private Net5ByteBufProxy() {
this(DEFAULT);
}
public Net5ByteBufProxy(final PooledByteBufAllocator allocator) {
this.nettyAllocator = allocator;
try {
final ByteBuf initBuf = this.allocate();
initBuf.release();
final Field address = findField(NAME, FIELD_NAME_ADDRESS);
final Field length = findField(NAME, FIELD_NAME_LENGTH);
addressOffset = UNSAFE.objectFieldOffset(address);
lengthOffset = UNSAFE.objectFieldOffset(length);
} catch (final SecurityException e) {
throw new LmdbException("Field access error", e);
}
}
static Field findField(final String c, final String name) {
Class<?> clazz;
try {
clazz = forName(c);
} catch (final ClassNotFoundException e) {
throw new LmdbException(c + " class unavailable", e);
}
do {
try {
final Field field = clazz.getDeclaredField(name);
field.setAccessible(true);
return field;
} catch (final NoSuchFieldException e) {
clazz = clazz.getSuperclass();
}
} while (clazz != null);
throw new LmdbException(name + " not found");
}
@Override
protected ByteBuf allocate() {
for (int i = 0; i < BUFFER_RETRIES; i++) {
final ByteBuf bb = nettyAllocator.directBuffer();
if (NAME.equals(bb.getClass().getName())) {
return bb;
} else {
bb.release();
}
}
throw new IllegalStateException("Netty buffer must be " + NAME);
}
@Override
protected int compare(final ByteBuf o1, final ByteBuf o2) {
return o1.compareTo(o2);
}
@Override
protected void deallocate(final ByteBuf buff) {
buff.release();
}
@Override
protected byte[] getBytes(final ByteBuf buffer) {
final byte[] dest = new byte[buffer.capacity()];
buffer.getBytes(0, dest);
return dest;
}
@Override
protected void in(final ByteBuf buffer, final Pointer ptr, final long ptrAddr) {
UNSAFE.putLong(ptrAddr + STRUCT_FIELD_OFFSET_SIZE,
buffer.writerIndex() - buffer.readerIndex());
UNSAFE.putLong(ptrAddr + STRUCT_FIELD_OFFSET_DATA,
buffer.memoryAddress() + buffer.readerIndex());
}
@Override
protected void in(final ByteBuf buffer, final int size, final Pointer ptr,
final long ptrAddr) {
UNSAFE.putLong(ptrAddr + STRUCT_FIELD_OFFSET_SIZE,
size);
UNSAFE.putLong(ptrAddr + STRUCT_FIELD_OFFSET_DATA,
buffer.memoryAddress() + buffer.readerIndex());
}
@Override
protected ByteBuf out(final ByteBuf buffer, final Pointer ptr,
final long ptrAddr) {
final long addr = UNSAFE.getLong(ptrAddr + STRUCT_FIELD_OFFSET_DATA);
final long size = UNSAFE.getLong(ptrAddr + STRUCT_FIELD_OFFSET_SIZE);
UNSAFE.putLong(buffer, addressOffset, addr);
UNSAFE.putInt(buffer, lengthOffset, (int) size);
buffer.writerIndex((int) size).readerIndex(0);
return buffer;
}
}
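A minimal sketch of plugging this proxy into lmdbjava (path and sizes are arbitrary; Env.create(BufferProxy) is lmdbjava's standard entry point for custom buffer types):

// Open an environment keyed by io.net5 ByteBufs: reads point pooled direct
// buffers straight at LMDB's memory-mapped pages, with no extra copy.
Env<ByteBuf> env = Env.create(Net5ByteBufProxy.PROXY_NETTY)
.setMapSize(1024L * 1024L * 1024L) // 1 GiB map
.setMaxDbs(1)
.open(new File("/tmp/lmdb-example"));
Dbi<ByteBuf> dbi = env.openDbi("queue", DbiFlags.MDB_CREATE);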

View File

@ -9,9 +9,12 @@ import io.net5.buffer.api.pool.MetricUtils;
import io.net5.buffer.api.pool.PoolArenaMetric;
import io.net5.buffer.api.pool.PooledBufferAllocator;
import io.net5.util.internal.PlatformDependent;
import it.cavallium.dbengine.client.LuceneIndex;
import it.cavallium.dbengine.client.LuceneIndexImpl;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
@ -23,6 +26,7 @@ import it.cavallium.dbengine.database.collections.SubStageGetterMap;
import it.cavallium.dbengine.database.disk.MemorySegmentUtils;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import it.cavallium.dbengine.lucene.StringIndicizer;
import java.nio.file.Path;
import java.util.Map;
import java.util.Objects;
@ -121,6 +125,9 @@ public class DbTestUtils {
}
public static record TempDb(TestAllocator allocator, LLDatabaseConnection connection, LLKeyValueDatabase db,
LLLuceneIndex luceneSingle,
LLLuceneIndex luceneMulti,
SwappableLuceneSearcher swappableLuceneSearcher,
Path path) {}
static boolean computeCanUseNettyDirect() {
@ -166,6 +173,10 @@ public class DbTestUtils {
return database.getDictionary(name, updateMode);
}
public static Mono<? extends LuceneIndex<String, String>> tempLuceneIndex(LLLuceneIndex index) {
return Mono.fromCallable(() -> new LuceneIndexImpl<>(index, new StringIndicizer()));
}
public enum MapType {
MAP,

View File

@ -0,0 +1,3 @@
package it.cavallium.dbengine;
record ExpectedQueryType(boolean shard, boolean sorted, boolean scored, boolean unlimited, boolean onlyCount) {}

View File

@ -5,15 +5,26 @@ import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks;
import it.cavallium.dbengine.DbTestUtils.TempDb;
import it.cavallium.dbengine.DbTestUtils.TestAllocator;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.LuceneOptions;
import it.cavallium.dbengine.client.NRTCachingOptions;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
import it.cavallium.dbengine.database.lucene.LuceneHacks;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneMultiSearcher;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;
@ -23,6 +34,10 @@ public class LocalTemporaryDbGenerator implements TemporaryDbGenerator {
private static final AtomicInteger dbId = new AtomicInteger(0);
private static final Optional<NRTCachingOptions> NRT = Optional.empty();
private static final LuceneOptions LUCENE_OPTS = new LuceneOptions(Map.of(), Duration.ofSeconds(5), Duration.ofSeconds(5),
false, true, Optional.empty(), true, NRT, -1, true, true);
@Override
public Mono<TempDb> openTempDb(TestAllocator allocator) {
boolean canUseNettyDirect = DbTestUtils.computeCanUseNettyDirect();
@ -44,13 +59,33 @@ public class LocalTemporaryDbGenerator implements TemporaryDbGenerator {
})
.subscribeOn(Schedulers.boundedElastic())
.then(new LLLocalDatabaseConnection(allocator.allocator(), wrkspcPath).connect())
.flatMap(conn -> conn
.getDatabase("testdb",
List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")),
new DatabaseOptions(Map.of(), true, false, true, false, true, canUseNettyDirect, canUseNettyDirect, -1)
)
.map(db -> new TempDb(allocator, conn, db, wrkspcPath))
);
.flatMap(conn -> {
SwappableLuceneSearcher searcher = new SwappableLuceneSearcher();
var luceneHacks = new LuceneHacks(() -> searcher, () -> searcher);
return Mono.zip(
conn.getDatabase("testdb",
List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")),
new DatabaseOptions(Map.of(), true, false, true, false,
true, canUseNettyDirect, canUseNettyDirect, -1)
),
conn.getLuceneIndex("testluceneindex1",
1,
IndicizerAnalyzers.of(TextFieldsAnalyzer.WordSimple),
IndicizerSimilarities.of(TextFieldsSimilarity.Boolean),
LUCENE_OPTS,
luceneHacks
),
conn.getLuceneIndex("testluceneindex16",
1,
IndicizerAnalyzers.of(TextFieldsAnalyzer.WordSimple),
IndicizerSimilarities.of(TextFieldsSimilarity.Boolean),
LUCENE_OPTS,
luceneHacks
),
Mono.just(searcher)
)
.map(tuple -> new TempDb(allocator, conn, tuple.getT1(), tuple.getT2(), tuple.getT3(), tuple.getT4(), wrkspcPath));
});
});
}

View File

@ -3,25 +3,59 @@ package it.cavallium.dbengine;
import it.cavallium.dbengine.DbTestUtils.TempDb;
import it.cavallium.dbengine.DbTestUtils.TestAllocator;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.LuceneOptions;
import it.cavallium.dbengine.client.NRTCachingOptions;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.lucene.LuceneHacks;
import it.cavallium.dbengine.database.memory.LLMemoryDatabaseConnection;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import reactor.core.publisher.Mono;
public class MemoryTemporaryDbGenerator implements TemporaryDbGenerator {
private static final Optional<NRTCachingOptions> NRT = Optional.empty();
private static final LuceneOptions LUCENE_OPTS = new LuceneOptions(Map.of(), Duration.ofSeconds(5), Duration.ofSeconds(5),
false, true, Optional.empty(), true, NRT, -1, true, true);
@Override
public Mono<TempDb> openTempDb(TestAllocator allocator) {
boolean canUseNettyDirect = DbTestUtils.computeCanUseNettyDirect();
return Mono
.fromCallable(() -> new LLMemoryDatabaseConnection(allocator.allocator()))
.flatMap(conn -> conn
.getDatabase("testdb",
List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")),
new DatabaseOptions(Map.of(), true, false, true, false, true, canUseNettyDirect, canUseNettyDirect, -1)
)
.map(db -> new TempDb(allocator, conn, db, null)));
.flatMap(conn -> {
SwappableLuceneSearcher searcher = new SwappableLuceneSearcher();
var luceneHacks = new LuceneHacks(() -> searcher, () -> searcher);
return Mono
.zip(
conn.getDatabase("testdb",
List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")),
new DatabaseOptions(Map.of(), true, false, true, false, true, canUseNettyDirect, canUseNettyDirect, -1)
),
conn.getLuceneIndex("testluceneindex1",
1,
IndicizerAnalyzers.of(TextFieldsAnalyzer.WordSimple),
IndicizerSimilarities.of(TextFieldsSimilarity.Boolean),
LUCENE_OPTS,
luceneHacks
),
conn.getLuceneIndex("testluceneindex16",
1,
IndicizerAnalyzers.of(TextFieldsAnalyzer.WordSimple),
IndicizerSimilarities.of(TextFieldsSimilarity.Boolean),
LUCENE_OPTS,
luceneHacks
),
Mono.just(searcher)
)
.map(tuple -> new TempDb(allocator, conn, tuple.getT1(), tuple.getT2(), tuple.getT3(), tuple.getT4(), null));
});
}
@Override

View File

@ -0,0 +1,3 @@
package it.cavallium.dbengine;
record Scored(String key, float score) {}

View File

@ -0,0 +1,385 @@
package it.cavallium.dbengine;
import static it.cavallium.dbengine.DbTestUtils.destroyAllocator;
import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks;
import static it.cavallium.dbengine.DbTestUtils.newAllocator;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import it.cavallium.dbengine.DbTestUtils.TempDb;
import it.cavallium.dbengine.DbTestUtils.TestAllocator;
import it.cavallium.dbengine.client.LuceneIndex;
import it.cavallium.dbengine.client.MultiSort;
import it.cavallium.dbengine.client.SearchResultKey;
import it.cavallium.dbengine.client.SearchResultKeys;
import it.cavallium.dbengine.client.query.ClientQueryParams;
import it.cavallium.dbengine.client.query.ClientQueryParamsBuilder;
import it.cavallium.dbengine.client.query.QueryParser;
import it.cavallium.dbengine.client.query.current.data.MatchAllDocsQuery;
import it.cavallium.dbengine.client.query.current.data.MatchNoDocsQuery;
import it.cavallium.dbengine.client.query.current.data.NoSort;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLScoreMode;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.CountLuceneLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.UnsortedScoredFullLuceneMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.ScoredSimpleLuceneMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.SimpleLuceneLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.SimpleUnsortedUnscoredLuceneMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.UnsortedUnscoredContinuousLuceneMultiSearcher;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuples;
public class TestLuceneIndex {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private TestAllocator allocator;
private TempDb tempDb;
private LLLuceneIndex luceneSingle;
private LLLuceneIndex luceneMulti;
protected TemporaryDbGenerator getTempDbGenerator() {
return new MemoryTemporaryDbGenerator();
}
@BeforeEach
public void beforeEach() {
this.allocator = newAllocator();
ensureNoLeaks(allocator.allocator(), false, false);
tempDb = Objects.requireNonNull(getTempDbGenerator().openTempDb(allocator).block(), "TempDB");
luceneSingle = tempDb.luceneSingle();
luceneMulti = tempDb.luceneMulti();
}
public static Stream<Arguments> provideArguments() {
return Stream.of(false, true).map(Arguments::of);
}
private static final Flux<Boolean> multi = Flux.just(false, true);
private static final Flux<LLScoreMode> scoreModes = Flux.just(LLScoreMode.NO_SCORES,
LLScoreMode.TOP_SCORES,
LLScoreMode.COMPLETE_NO_SCORES,
LLScoreMode.COMPLETE
);
private static final Flux<MultiSort<SearchResultKey<String>>> multiSort = Flux.just(MultiSort.topScore(),
MultiSort.randomSortField(),
MultiSort.noSort(),
MultiSort.docSort(),
MultiSort.numericSort("longsort", false),
MultiSort.numericSort("longsort", true)
);
private static Flux<LuceneLocalSearcher> getSearchers(ExpectedQueryType info) {
return Flux.push(sink -> {
try {
if (info.shard()) {
sink.next(new AdaptiveLuceneMultiSearcher());
if (info.onlyCount()) {
sink.next(new SimpleUnsortedUnscoredLuceneMultiSearcher(new CountLuceneLocalSearcher()));
} else {
sink.next(new ScoredSimpleLuceneMultiSearcher());
if (!info.sorted()) {
sink.next(new UnsortedScoredFullLuceneMultiSearcher());
}
if (!info.scored() && !info.sorted()) {
sink.next(new SimpleUnsortedUnscoredLuceneMultiSearcher(new SimpleLuceneLocalSearcher()));
sink.next(new UnsortedUnscoredContinuousLuceneMultiSearcher());
}
}
} else {
sink.next(new AdaptiveLuceneLocalSearcher());
if (info.onlyCount()) {
sink.next(new CountLuceneLocalSearcher());
} else {
sink.next(new SimpleLuceneLocalSearcher());
}
}
sink.complete();
} catch (IOException e) {
sink.error(e);
}
}, OverflowStrategy.BUFFER);
}
public static Stream<Arguments> provideQueryArgumentsScoreMode() {
return multi
.concatMap(shard -> scoreModes.map(scoreMode -> Tuples.of(shard, scoreMode)))
.map(tuple -> Arguments.of(tuple.toArray()))
.toStream();
}
public static Stream<Arguments> provideQueryArgumentsSort() {
return multi
.concatMap(shard -> multiSort.map(multiSort -> Tuples.of(shard, multiSort)))
.map(tuple -> Arguments.of(tuple.toArray()))
.toStream();
}
public static Stream<Arguments> provideQueryArgumentsScoreModeAndSort() {
return multi
.concatMap(shard -> scoreModes.map(scoreMode -> Tuples.of(shard, scoreMode)))
.concatMap(tuple -> multiSort.map(multiSort -> Tuples.of(tuple.getT1(), tuple.getT2(), multiSort)))
.map(tuple -> Arguments.of(tuple.toArray()))
.toStream();
}
@AfterEach
public void afterEach() {
getTempDbGenerator().closeTempDb(tempDb).block();
ensureNoLeaks(allocator.allocator(), true, false);
destroyAllocator(allocator);
}
private LuceneIndex<String, String> getLuceneIndex(boolean shards, @Nullable LuceneLocalSearcher customSearcher) {
LuceneIndex<String, String> index = run(DbTestUtils.tempLuceneIndex(shards ? luceneMulti : luceneSingle));
index.updateDocument("test-key-1", "0123456789").block();
index.updateDocument("test-key-2", "test 0123456789 test word").block();
index.updateDocument("test-key-3", "0123456789 test example string").block();
index.updateDocument("test-key-4", "hello world the quick brown fox jumps over the lazy dog").block();
index.updateDocument("test-key-5", "hello the quick brown fox jumps over the lazy dog").block();
index.updateDocument("test-key-6", "hello the quick brown fox jumps over the world dog").block();
index.updateDocument("test-key-7", "the quick brown fox jumps over the world dog").block();
index.updateDocument("test-key-8", "the quick brown fox jumps over the lazy dog").block();
index.updateDocument("test-key-9", "Example1").block();
index.updateDocument("test-key-10", "Example2").block();
index.updateDocument("test-key-11", "Example3").block();
index.updateDocument("test-key-12", "-234").block();
index.updateDocument("test-key-13", "2111").block();
index.updateDocument("test-key-14", "2999").block();
index.updateDocument("test-key-15", "3902").block();
Flux.range(1, 1000).concatMap(i -> index.updateDocument("test-key-" + (15 + i), "" + i)).blockLast();
tempDb.swappableLuceneSearcher().setSingle(new CountLuceneLocalSearcher());
tempDb.swappableLuceneSearcher().setMulti(new SimpleUnsortedUnscoredLuceneMultiSearcher(new CountLuceneLocalSearcher()));
assertCount(index, 1000 + 15);
try {
if (customSearcher != null) {
tempDb.swappableLuceneSearcher().setSingle(customSearcher);
if (shards) {
if (customSearcher instanceof LuceneMultiSearcher multiSearcher) {
tempDb.swappableLuceneSearcher().setMulti(multiSearcher);
} else {
throw new IllegalArgumentException("Expected a LuceneMultiSearcher, got a LuceneLocalSearcher: " + customSearcher.getName());
}
}
} else {
tempDb.swappableLuceneSearcher().setSingle(new AdaptiveLuceneLocalSearcher());
tempDb.swappableLuceneSearcher().setMulti(new AdaptiveLuceneMultiSearcher());
}
} catch (IOException e) {
fail(e);
}
return index;
}
private void run(Flux<?> publisher) {
publisher.subscribeOn(Schedulers.immediate()).blockLast();
}
private void runVoid(Mono<Void> publisher) {
publisher.then().subscribeOn(Schedulers.immediate()).block();
}
private <T> T run(Mono<T> publisher) {
return publisher.subscribeOn(Schedulers.immediate()).block();
}
private <T> T run(boolean shouldFail, Mono<T> publisher) {
return publisher.subscribeOn(Schedulers.immediate()).transform(mono -> {
if (shouldFail) {
return mono.onErrorResume(ex -> Mono.empty());
} else {
return mono;
}
}).block();
}
private void runVoid(boolean shouldFail, Mono<Void> publisher) {
publisher.then().subscribeOn(Schedulers.immediate()).transform(mono -> {
if (shouldFail) {
return mono.onErrorResume(ex -> Mono.empty());
} else {
return mono;
}
}).block();
}
private void assertCount(LuceneIndex<String, String> luceneIndex, long expected) {
Assertions.assertEquals(expected, getCount(luceneIndex));
}
private long getCount(LuceneIndex<String, String> luceneIndex) {
luceneIndex.refresh(true).block();
var totalHitsCount = run(luceneIndex.count(null, new MatchAllDocsQuery()));
Assertions.assertTrue(totalHitsCount.exact(), "Can't get count because the total hits count is not exact");
return totalHitsCount.value();
}
@Test
public void testNoOp() {
}
@Test
public void testNoOpAllocation() {
for (int i = 0; i < 10; i++) {
var a = allocator.allocator().allocate(i * 512);
a.send().receive().close();
}
}
@ParameterizedTest
@MethodSource("provideArguments")
public void testGetLuceneIndex(boolean shards) {
var luceneIndex = getLuceneIndex(shards, null);
Assertions.assertNotNull(luceneIndex);
}
@ParameterizedTest
@MethodSource("provideArguments")
public void testDeleteAll(boolean shards) {
var luceneIndex = getLuceneIndex(shards, null);
runVoid(luceneIndex.deleteAll());
assertCount(luceneIndex, 0);
}
@ParameterizedTest
@MethodSource("provideArguments")
public void testDelete(boolean shards) {
var luceneIndex = getLuceneIndex(shards, null);
var prevCount = getCount(luceneIndex);
runVoid(luceneIndex.deleteDocument("test-key-1"));
assertCount(luceneIndex, prevCount - 1);
}
@ParameterizedTest
@MethodSource("provideArguments")
public void testUpdateSameDoc(boolean shards) {
var luceneIndex = getLuceneIndex(shards, null);
var prevCount = getCount(luceneIndex);
runVoid(luceneIndex.updateDocument("test-key-1", "new-value"));
assertCount(luceneIndex, prevCount);
}
@ParameterizedTest
@MethodSource("provideArguments")
public void testUpdateNewDoc(boolean shards) {
var luceneIndex = getLuceneIndex(shards, null);
var prevCount = getCount(luceneIndex);
runVoid(luceneIndex.updateDocument("test-key-new", "new-value"));
assertCount(luceneIndex, prevCount + 1);
}
@ParameterizedTest
@MethodSource("provideQueryArgumentsScoreModeAndSort")
public void testSearchNoDocs(boolean shards, LLScoreMode scoreMode, MultiSort<SearchResultKey<String>> multiSort) {
var searchers = run(getSearchers(new ExpectedQueryType(shards, isSorted(multiSort), isScored(scoreMode, multiSort), true, false)).collectList());
for (LuceneLocalSearcher searcher : searchers) {
log.info("Using searcher \"{}\"", searcher.getName());
var luceneIndex = getLuceneIndex(shards, searcher);
ClientQueryParamsBuilder<SearchResultKey<String>> queryBuilder = ClientQueryParams.builder();
queryBuilder.query(new MatchNoDocsQuery());
queryBuilder.snapshot(null);
queryBuilder.scoreMode(scoreMode);
queryBuilder.sort(multiSort);
var query = queryBuilder.build();
try (var results = run(luceneIndex.search(query)).receive()) {
var hits = results.totalHitsCount();
if (supportsPreciseHitsCount(searcher, query)) {
assertEquals(new TotalHitsCount(0, true), hits);
}
var keys = getResults(results);
assertEquals(List.of(), keys);
}
}
}
private boolean supportsPreciseHitsCount(LuceneLocalSearcher searcher,
ClientQueryParams<SearchResultKey<String>> query) {
if (searcher instanceof UnsortedUnscoredContinuousLuceneMultiSearcher) {
return false;
}
var scored = isScored(query.scoreMode(), Objects.requireNonNullElse(query.sort(), MultiSort.noSort()));
var sorted = isSorted(Objects.requireNonNullElse(query.sort(), MultiSort.noSort()));
if (!sorted && !scored) {
if (searcher instanceof AdaptiveLuceneMultiSearcher || searcher instanceof AdaptiveLuceneLocalSearcher) {
return false;
}
}
return true;
}
@ParameterizedTest
@MethodSource("provideQueryArgumentsScoreModeAndSort")
public void testSearchAllDocs(boolean shards, LLScoreMode scoreMode, MultiSort<SearchResultKey<String>> multiSort) {
var searchers = run(getSearchers(new ExpectedQueryType(shards, isSorted(multiSort), isScored(scoreMode, multiSort), true, false)).collectList());
for (LuceneLocalSearcher searcher : searchers) {
log.info("Using searcher \"{}\"", searcher.getName());
var luceneIndex = getLuceneIndex(shards, searcher);
ClientQueryParamsBuilder<SearchResultKey<String>> queryBuilder = ClientQueryParams.builder();
queryBuilder.query(new MatchAllDocsQuery());
queryBuilder.snapshot(null);
queryBuilder.scoreMode(scoreMode);
queryBuilder.sort(multiSort);
var query = queryBuilder.build();
try (var results = run(luceneIndex.search(query)).receive()) {
var hits = results.totalHitsCount();
if (hits.exact() && supportsPreciseHitsCount(searcher, query)) {
assertEquals(new TotalHitsCount(1000 + 15, true), hits);
}
var keys = getResults(results);
assertEquals(1000 + 15, keys.size());
}
}
}
private boolean isSorted(MultiSort<SearchResultKey<String>> multiSort) {
return !(multiSort.getQuerySort() instanceof NoSort);
}
private boolean isScored(LLScoreMode scoreMode, MultiSort<SearchResultKey<String>> multiSort) {
var needsScores = LLUtils.toScoreMode(scoreMode).needsScores();
var sort = QueryParser.toSort(multiSort.getQuerySort());
if (sort != null) {
needsScores |= sort.needsScores();
}
return needsScores;
}
private List<Scored> getResults(SearchResultKeys<String> results) {
return run(results
.results()
.flatMapSequential(searchResultKey -> searchResultKey
.key()
.single()
.map(key -> new Scored(key, searchResultKey.score()))
)
.collectList());
}
}