diff --git a/pom.xml b/pom.xml index 9f15c95..9ed8d6f 100644 --- a/pom.xml +++ b/pom.xml @@ -224,6 +224,7 @@ io.projectreactor reactor-test + test org.novasearch @@ -248,6 +249,11 @@ micrometer-registry-jmx true + + org.lmdbjava + lmdbjava + 0.8.2 + diff --git a/src/main/java/it/cavallium/dbengine/SwappableLuceneSearcher.java b/src/main/java/it/cavallium/dbengine/SwappableLuceneSearcher.java new file mode 100644 index 0000000..bc14b08 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/SwappableLuceneSearcher.java @@ -0,0 +1,81 @@ +package it.cavallium.dbengine; + +import static java.util.Objects.requireNonNull; +import static java.util.Objects.requireNonNullElseGet; + +import io.net5.buffer.api.Send; +import it.cavallium.dbengine.database.disk.LLIndexSearcher; +import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.database.disk.LLLocalSingleton; +import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer; +import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; +import it.cavallium.dbengine.lucene.searcher.LuceneLocalSearcher; +import it.cavallium.dbengine.lucene.searcher.LuceneMultiSearcher; +import it.cavallium.dbengine.lucene.searcher.LuceneSearchResult; +import java.io.Closeable; +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; +import reactor.core.publisher.Mono; + +public class SwappableLuceneSearcher implements LuceneLocalSearcher, LuceneMultiSearcher, Closeable { + + private final AtomicReference single = new AtomicReference<>(null); + private final AtomicReference multi = new AtomicReference<>(null); + + public SwappableLuceneSearcher() { + + } + + @Override + public Mono> collect(Mono> indexSearcherMono, + LocalQueryParams queryParams, + String keyFieldName, + LLSearchTransformer transformer) { + var single = requireNonNullElseGet(this.single.get(), this.multi::get); + requireNonNull(single, "LuceneLocalSearcher not set"); + return single.collect(indexSearcherMono, queryParams, keyFieldName, transformer); + } + + @Override + public String getName() { + var single = this.single.get(); + var multi = this.multi.get(); + if (single == multi) { + if (single == null) { + return "swappable"; + } else { + return single.getName(); + } + } else { + return "swappable[single=" + single.getName() + ",multi=" + multi.getName() + "]"; + } + } + + @Override + public Mono> collectMulti(Mono> indexSearchersMono, + LocalQueryParams queryParams, + String keyFieldName, + LLSearchTransformer transformer) { + var multi = requireNonNull(this.multi.get(), "LuceneMultiSearcher not set"); + return multi.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); + } + + public void setSingle(LuceneLocalSearcher single) { + this.single.set(single); + } + + public void setMulti(LuceneMultiSearcher multi) { + this.multi.set(multi); + } + + @Override + public void close() throws IOException { + if (this.single.get() instanceof Closeable closeable) { + closeable.close(); + } + if (this.multi.get() instanceof Closeable closeable) { + closeable.close(); + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java index f34168b..cb9e349 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java @@ -188,7 +188,9 @@ public class LuceneIndexImpl implements LuceneIndex { queryParams.toQueryParams(), indicizer.getKeyFieldName() ) - .transform(this::transformLuceneResultWithTransformer); + .single() + .transform(this::transformLuceneResultWithTransformer) + .single(); } @Override @@ -217,6 +219,7 @@ public class LuceneIndexImpl implements LuceneIndex { public Mono count(@Nullable CompositeSnapshot snapshot, Query query) { return this .search(ClientQueryParams.>builder().snapshot(snapshot).query(query).limit(0).build()) + .single() .map(searchResultKeysSend -> { try (var searchResultKeys = searchResultKeysSend.receive()) { return searchResultKeys.totalHitsCount(); diff --git a/src/main/java/it/cavallium/dbengine/client/MultiSort.java b/src/main/java/it/cavallium/dbengine/client/MultiSort.java index 8acfadf..86d42ff 100644 --- a/src/main/java/it/cavallium/dbengine/client/MultiSort.java +++ b/src/main/java/it/cavallium/dbengine/client/MultiSort.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.client; +import it.cavallium.dbengine.client.query.current.data.DocSort; import it.cavallium.dbengine.client.query.current.data.NoSort; import it.cavallium.dbengine.client.query.current.data.NumericSort; import it.cavallium.dbengine.client.query.current.data.RandomSort; @@ -7,6 +8,8 @@ import it.cavallium.dbengine.client.query.current.data.ScoreSort; import it.cavallium.dbengine.client.query.current.data.Sort; import it.cavallium.dbengine.database.LLKeyScore; import java.util.Comparator; +import java.util.Objects; +import java.util.StringJoiner; import java.util.function.Function; import java.util.function.ToIntFunction; import java.util.function.ToLongFunction; @@ -63,6 +66,18 @@ public class MultiSort { return new MultiSort<>(ScoreSort.of()); } + public static MultiSort> noSort() { + return new MultiSort<>(NoSort.of()); + } + + public static MultiSort> docSort() { + return new MultiSort<>(DocSort.of()); + } + + public static MultiSort> numericSort(String field, boolean reverse) { + return new MultiSort<>(NumericSort.of(field, reverse)); + } + public static MultiSort> topScoreWithValues() { return new MultiSort<>(ScoreSort.of()); } @@ -74,4 +89,26 @@ public class MultiSort { public Sort getQuerySort() { return querySort; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MultiSort multiSort = (MultiSort) o; + return Objects.equals(querySort, multiSort.querySort); + } + + @Override + public int hashCode() { + return Objects.hash(querySort); + } + + @Override + public String toString() { + return querySort.toString(); + } } diff --git a/src/main/java/it/cavallium/dbengine/client/query/QueryParser.java b/src/main/java/it/cavallium/dbengine/client/query/QueryParser.java index 7b60568..7b4052b 100644 --- a/src/main/java/it/cavallium/dbengine/client/query/QueryParser.java +++ b/src/main/java/it/cavallium/dbengine/client/query/QueryParser.java @@ -18,6 +18,7 @@ import it.cavallium.dbengine.client.query.current.data.TermAndBoost; import it.cavallium.dbengine.client.query.current.data.TermPosition; import it.cavallium.dbengine.client.query.current.data.TermQuery; import it.cavallium.dbengine.client.query.current.data.WildcardQuery; +import it.cavallium.dbengine.lucene.RandomSortField; import org.apache.lucene.document.IntPoint; import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.SortedNumericDocValuesField; @@ -146,6 +147,8 @@ public class QueryParser { case NumericSort: NumericSort numericSort = (NumericSort) sort; return new Sort(new SortedNumericSortField(numericSort.field(), Type.LONG, numericSort.reverse())); + case RandomSort: + return new Sort(new RandomSortField()); default: throw new IllegalStateException("Unexpected value: " + sort.getBasicType$()); } diff --git a/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java index 2743398..ce293d4 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java @@ -5,7 +5,9 @@ import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; import it.cavallium.dbengine.client.LuceneOptions; +import it.cavallium.dbengine.database.lucene.LuceneHacks; import java.util.List; +import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; @SuppressWarnings("UnusedReturnValue") @@ -23,7 +25,8 @@ public interface LLDatabaseConnection { int instancesCount, IndicizerAnalyzers indicizerAnalyzers, IndicizerSimilarities indicizerSimilarities, - LuceneOptions luceneOptions); + LuceneOptions luceneOptions, + @Nullable LuceneHacks luceneHacks); Mono disconnect(); } diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 5d1249b..00e97f1 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -122,7 +122,7 @@ public class LLUtils { case COMPLETE -> ScoreMode.COMPLETE; case TOP_SCORES -> ScoreMode.TOP_SCORES; case COMPLETE_NO_SCORES -> ScoreMode.COMPLETE_NO_SCORES; - default -> throw new IllegalStateException("Unexpected value: " + scoreMode); + case NO_SCORES -> ScoreMode.TOP_DOCS; }; } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java index 4b263d9..a3c853c 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java @@ -8,11 +8,14 @@ import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.database.LLDatabaseConnection; import it.cavallium.dbengine.database.LLLuceneIndex; +import it.cavallium.dbengine.database.lucene.LuceneHacks; +import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneMultiSearcher; import it.cavallium.dbengine.netty.JMXNettyMonitoringManager; import java.nio.file.Files; import java.nio.file.Path; import java.util.LinkedList; import java.util.List; +import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -68,7 +71,8 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection { int instancesCount, IndicizerAnalyzers indicizerAnalyzers, IndicizerSimilarities indicizerSimilarities, - LuceneOptions luceneOptions) { + LuceneOptions luceneOptions, + @Nullable LuceneHacks luceneHacks) { return Mono .fromCallable(() -> { if (instancesCount != 1) { @@ -77,14 +81,16 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection { instancesCount, indicizerAnalyzers, indicizerSimilarities, - luceneOptions + luceneOptions, + luceneHacks ); } else { return new LLLocalLuceneIndex(basePath.resolve("lucene"), name, indicizerAnalyzers, indicizerSimilarities, - luceneOptions + luceneOptions, + luceneHacks ); } }) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index a2480a9..ba109c0 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -16,6 +16,7 @@ import it.cavallium.dbengine.database.LLSearchResultShard; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.lucene.LuceneHacks; import it.cavallium.dbengine.lucene.AlwaysDirectIOFSDirectory; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneLocalSearcher; @@ -61,7 +62,7 @@ import reactor.util.function.Tuple2; public class LLLocalLuceneIndex implements LLLuceneIndex { protected static final Logger logger = LoggerFactory.getLogger(LLLocalLuceneIndex.class); - private static final LuceneLocalSearcher localSearcher = new AdaptiveLuceneLocalSearcher(); + private final LuceneLocalSearcher localSearcher; /** * Global lucene index scheduler. * There is only a single thread globally to not overwhelm the disk with @@ -85,7 +86,8 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { String name, IndicizerAnalyzers indicizerAnalyzers, IndicizerSimilarities indicizerSimilarities, - LuceneOptions luceneOptions) throws IOException { + LuceneOptions luceneOptions, + @Nullable LuceneHacks luceneHacks) throws IOException { Path directoryPath; if (luceneOptions.inMemory() != (luceneBasePath == null)) { throw new IllegalArgumentException(); @@ -165,6 +167,11 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { this.lowMemory = lowMemory; this.luceneAnalyzer = LuceneUtils.toPerFieldAnalyzerWrapper(indicizerAnalyzers); this.luceneSimilarity = LuceneUtils.toPerFieldSimilarityWrapper(indicizerSimilarities); + if (luceneHacks != null && luceneHacks.customLocalSearcher() != null) { + localSearcher = luceneHacks.customLocalSearcher().get(); + } else { + localSearcher = new AdaptiveLuceneLocalSearcher(); + } var indexWriterConfig = new IndexWriterConfig(luceneAnalyzer); indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND); @@ -188,7 +195,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } logger.trace("WriterSchedulerMaxThreadCount: {}", writerSchedulerMaxThreadCount); indexWriterConfig.setMergeScheduler(mergeScheduler); - indexWriterConfig.setRAMBufferSizeMB(luceneOptions.indexWriterBufferSize() / 1024D / 1024D); + if (luceneOptions.indexWriterBufferSize() == -1) { + //todo: allow to configure maxbuffereddocs fallback + indexWriterConfig.setMaxBufferedDocs(1000); + // disable ram buffer size after enabling maxBufferedDocs + indexWriterConfig.setRAMBufferSizeMB(-1); + } else { + indexWriterConfig.setRAMBufferSizeMB(luceneOptions.indexWriterBufferSize() / 1024D / 1024D); + } indexWriterConfig.setReaderPooling(false); indexWriterConfig.setSimilarity(getLuceneSimilarity()); this.indexWriter = new IndexWriter(directory, indexWriterConfig); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 319551d..aa218fa 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -10,11 +10,13 @@ import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLSearchResultShard; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLTerm; +import it.cavallium.dbengine.database.lucene.LuceneHacks; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneMultiSearcher; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer; import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; import it.cavallium.dbengine.lucene.searcher.LuceneMultiSearcher; +import java.io.Closeable; import java.io.IOException; import java.nio.file.Path; import java.time.Duration; @@ -33,6 +35,7 @@ import org.apache.lucene.search.similarities.PerFieldSimilarityWrapper; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple2; public class LLLocalMultiLuceneIndex implements LLLuceneIndex { @@ -43,14 +46,15 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { private final PerFieldAnalyzerWrapper luceneAnalyzer; private final PerFieldSimilarityWrapper luceneSimilarity; - private final LuceneMultiSearcher multiSearcher = new AdaptiveLuceneMultiSearcher(); + private final LuceneMultiSearcher multiSearcher; public LLLocalMultiLuceneIndex(Path lucene, String name, int instancesCount, IndicizerAnalyzers indicizerAnalyzers, IndicizerSimilarities indicizerSimilarities, - LuceneOptions luceneOptions) throws IOException { + LuceneOptions luceneOptions, + @Nullable LuceneHacks luceneHacks) throws IOException { if (instancesCount <= 1 || instancesCount > 100) { throw new IOException("Unsupported instances count: " + instancesCount); @@ -68,12 +72,19 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { instanceName, indicizerAnalyzers, indicizerSimilarities, - luceneOptions + luceneOptions, + luceneHacks ); } this.luceneIndices = luceneIndices; this.luceneAnalyzer = LuceneUtils.toPerFieldAnalyzerWrapper(indicizerAnalyzers); this.luceneSimilarity = LuceneUtils.toPerFieldSimilarityWrapper(indicizerSimilarities); + + if (luceneHacks != null && luceneHacks.customMultiSearcher() != null) { + multiSearcher = luceneHacks.customMultiSearcher().get(); + } else { + multiSearcher = new AdaptiveLuceneMultiSearcher(); + } } private LLLocalLuceneIndex getLuceneIndex(LLTerm id) { @@ -234,6 +245,12 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { return Flux .fromArray(luceneIndices) .flatMap(LLLocalLuceneIndex::close) + .then(Mono.fromCallable(() -> { + if (multiSearcher instanceof Closeable closeable) { + closeable.close(); + } + return null; + }).subscribeOn(Schedulers.boundedElastic())) .then(); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLTempLMDBEnv.java b/src/main/java/it/cavallium/dbengine/database/disk/LLTempLMDBEnv.java new file mode 100644 index 0000000..af19fb9 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLTempLMDBEnv.java @@ -0,0 +1,54 @@ +package it.cavallium.dbengine.database.disk; + +import io.net5.buffer.ByteBuf; +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; +import java.util.concurrent.Phaser; +import org.lmdbjava.Net5ByteBufProxy; +import org.lmdbjava.Env; +import static org.lmdbjava.EnvFlags.*; + +public class LLTempLMDBEnv implements Closeable { + + private static final long TEN_MEBYBYTES = 10_485_760; + private static final int MAX_DATABASES = 1024; + + private final Phaser resources = new Phaser(1); + + private final Path tempDirectory; + private final Env env; + + public LLTempLMDBEnv() throws IOException { + tempDirectory = Files.createTempDirectory("lmdb"); + var envBuilder = Env.create(Net5ByteBufProxy.PROXY_NETTY) + .setMapSize(TEN_MEBYBYTES) + .setMaxDbs(MAX_DATABASES); + //env = envBuilder.open(tempDirectory.toFile(), MDB_NOLOCK, MDB_NOSYNC, MDB_NOTLS, MDB_NORDAHEAD, MDB_WRITEMAP); + env = envBuilder.open(tempDirectory.toFile(), MDB_NOTLS, MDB_WRITEMAP, MDB_NORDAHEAD); + } + + public Env getEnvAndIncrementRef() { + resources.register(); + return env; + } + + public void decrementRef() { + resources.arriveAndDeregister(); + } + + @Override + public void close() throws IOException { + resources.arriveAndAwaitAdvance(); + + env.close(); + //noinspection ResultOfMethodCallIgnored + Files.walk(tempDirectory) + .sorted(Comparator.reverseOrder()) + .map(Path::toFile) + .forEach(File::delete); + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/lucene/LuceneHacks.java b/src/main/java/it/cavallium/dbengine/database/lucene/LuceneHacks.java new file mode 100644 index 0000000..3552a73 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/lucene/LuceneHacks.java @@ -0,0 +1,10 @@ +package it.cavallium.dbengine.database.lucene; + +import it.cavallium.dbengine.lucene.searcher.LuceneLocalSearcher; +import it.cavallium.dbengine.lucene.searcher.LuceneMultiSearcher; +import java.util.function.Supplier; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +public record LuceneHacks(@Nullable Supplier<@NotNull LuceneLocalSearcher> customLocalSearcher, + @Nullable Supplier<@NotNull LuceneMultiSearcher> customMultiSearcher) {} diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java index 7c09987..84d4d34 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java @@ -10,8 +10,10 @@ import it.cavallium.dbengine.database.LLDatabaseConnection; import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.disk.LLLocalLuceneIndex; +import it.cavallium.dbengine.database.lucene.LuceneHacks; import it.cavallium.dbengine.netty.JMXNettyMonitoringManager; import java.util.List; +import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -55,13 +57,15 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection { int instancesCount, IndicizerAnalyzers indicizerAnalyzers, IndicizerSimilarities indicizerSimilarities, - LuceneOptions luceneOptions) { + LuceneOptions luceneOptions, + @Nullable LuceneHacks luceneHacks) { return Mono .fromCallable(() -> new LLLocalLuceneIndex(null, name, indicizerAnalyzers, indicizerSimilarities, - luceneOptions + luceneOptions, + luceneHacks )) .subscribeOn(Schedulers.boundedElastic()); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/CloseableIterable.java b/src/main/java/it/cavallium/dbengine/lucene/CloseableIterable.java new file mode 100644 index 0000000..f9ba6b6 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/CloseableIterable.java @@ -0,0 +1,16 @@ +package it.cavallium.dbengine.lucene; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Iterator; +import org.jetbrains.annotations.NotNull; + +public interface CloseableIterable extends Iterable, Closeable { + + @Override + void close(); + + @NotNull + @Override + Iterator iterator(); +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/EmptyPriorityQueue.java b/src/main/java/it/cavallium/dbengine/lucene/EmptyPriorityQueue.java new file mode 100644 index 0000000..2a0c690 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/EmptyPriorityQueue.java @@ -0,0 +1,60 @@ +package it.cavallium.dbengine.lucene; + +import java.io.IOException; +import java.util.Iterator; +import java.util.NoSuchElementException; +import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.Flux; + +public class EmptyPriorityQueue implements PriorityQueue { + + @Override + public void add(T element) { + throw new UnsupportedOperationException(); + } + + @Override + public T top() { + return null; + } + + @Override + public T pop() { + return null; + } + + @Override + public void updateTop() { + + } + + @Override + public void updateTop(T newTop) { + assert newTop == null; + } + + @Override + public long size() { + return 0; + } + + @Override + public void clear() { + + } + + @Override + public boolean remove(T element) { + throw new UnsupportedOperationException(); + } + + @Override + public Flux iterate() { + return Flux.empty(); + } + + @Override + public void close() throws IOException { + + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java b/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java new file mode 100644 index 0000000..053e1ec --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java @@ -0,0 +1,148 @@ +package it.cavallium.dbengine.lucene; + +import static it.cavallium.dbengine.lucene.LLDocElementScoreComparator.SCORE_DOC_SCORE_ELEM_COMPARATOR; +import static org.apache.lucene.search.TotalHits.Relation.*; + +import java.util.Comparator; +import org.apache.lucene.search.FieldComparator; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.TotalHits; +import org.apache.lucene.search.TotalHits.Relation; +import org.jetbrains.annotations.Nullable; +import reactor.core.publisher.Flux; + +public interface FullDocs extends ResourceIterable { + + Comparator SHARD_INDEX_TIE_BREAKER = Comparator.comparingInt(LLDocElement::shardIndex); + Comparator DOC_ID_TIE_BREAKER = Comparator.comparingInt(LLDocElement::doc); + Comparator DEFAULT_TIE_BREAKER = SHARD_INDEX_TIE_BREAKER.thenComparing(DOC_ID_TIE_BREAKER); + + @Override + Flux iterate(); + + @Override + Flux iterate(long skips); + + TotalHits totalHits(); + + static FullDocs merge(@Nullable Sort sort, FullDocs[] fullDocs) { + ResourceIterable mergedIterable = mergeResourceIterable(sort, fullDocs); + TotalHits mergedTotalHits = mergeTotalHits(fullDocs); + return new FullDocs<>() { + @Override + public Flux iterate() { + return mergedIterable.iterate(); + } + + @Override + public Flux iterate(long skips) { + return mergedIterable.iterate(skips); + } + + @Override + public TotalHits totalHits() { + return mergedTotalHits; + } + }; + } + + static int tieBreakCompare( + T firstDoc, + T secondDoc, + Comparator tieBreaker) { + assert tieBreaker != null; + + int value = tieBreaker.compare(firstDoc, secondDoc); + if (value == 0) { + throw new IllegalStateException(); + } else { + return value; + } + } + + static ResourceIterable mergeResourceIterable( + @Nullable Sort sort, + FullDocs[] fullDocs) { + return () -> { + @SuppressWarnings("unchecked") + Flux[] iterables = new Flux[fullDocs.length]; + + for (int i = 0; i < fullDocs.length; i++) { + var singleFullDocs = fullDocs[i].iterate(); + iterables[i] = singleFullDocs; + } + + Comparator comp; + if (sort == null) { + // Merge maintaining sorting order (Algorithm taken from TopDocs.ScoreMergeSortQueue) + + comp = SCORE_DOC_SCORE_ELEM_COMPARATOR.thenComparing(DEFAULT_TIE_BREAKER); + } else { + // Merge maintaining sorting order (Algorithm taken from TopDocs.MergeSortQueue) + + SortField[] sortFields = sort.getSort(); + var comparators = new FieldComparator[sortFields.length]; + var reverseMul = new int[sortFields.length]; + + for(int compIDX = 0; compIDX < sortFields.length; ++compIDX) { + SortField sortField = sortFields[compIDX]; + comparators[compIDX] = sortField.getComparator(1, compIDX); + reverseMul[compIDX] = sortField.getReverse() ? -1 : 1; + } + + comp = (first, second) -> { + assert first != second; + + LLFieldDoc firstFD = (LLFieldDoc) first; + LLFieldDoc secondFD = (LLFieldDoc) second; + + for(int compIDX = 0; compIDX < comparators.length; ++compIDX) { + //noinspection rawtypes + FieldComparator fieldComp = comparators[compIDX]; + //noinspection unchecked + int cmp = reverseMul[compIDX] * fieldComp.compareValues(firstFD.fields().get(compIDX), secondFD.fields().get(compIDX)); + if (cmp != 0) { + return cmp; + } + } + + return tieBreakCompare(first, second, DEFAULT_TIE_BREAKER); + }; + } + + @SuppressWarnings("unchecked") + Flux[] fluxes = new Flux[fullDocs.length]; + for (int i = 0; i < iterables.length; i++) { + var shardIndex = i; + fluxes[i] = iterables[i].map(shard -> { + if (shard instanceof LLScoreDoc scoreDoc) { + //noinspection unchecked + return (T) new LLScoreDoc(scoreDoc.doc(), scoreDoc.score(), shardIndex); + } else { + throw new UnsupportedOperationException("Unsupported type " + shard.getClass()); + } + }); + if (fullDocs[i].totalHits().relation == EQUAL_TO) { + fluxes[i] = fluxes[i].take(fullDocs[i].totalHits().value, true); + } + } + + return Flux.mergeComparing(comp, fluxes); + }; + } + + static TotalHits mergeTotalHits(FullDocs[] fullDocs) { + long totalCount = 0; + Relation totalRelation = EQUAL_TO; + for (FullDocs fullDoc : fullDocs) { + var totalHits = fullDoc.totalHits(); + totalCount += totalHits.value; + totalRelation = switch (totalHits.relation) { + case EQUAL_TO -> totalRelation; + case GREATER_THAN_OR_EQUAL_TO -> totalRelation == EQUAL_TO ? GREATER_THAN_OR_EQUAL_TO : totalRelation; + }; + } + return new TotalHits(totalCount, totalRelation); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/LLDocElement.java b/src/main/java/it/cavallium/dbengine/lucene/LLDocElement.java new file mode 100644 index 0000000..88437cd --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/LLDocElement.java @@ -0,0 +1,10 @@ +package it.cavallium.dbengine.lucene; + +public sealed interface LLDocElement permits LLFieldDoc, LLScoreDoc { + + int doc(); + + float score(); + + int shardIndex(); +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/LLDocElementScoreComparator.java b/src/main/java/it/cavallium/dbengine/lucene/LLDocElementScoreComparator.java new file mode 100644 index 0000000..ba7f2a4 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/LLDocElementScoreComparator.java @@ -0,0 +1,13 @@ +package it.cavallium.dbengine.lucene; + +import java.util.Comparator; + +class LLDocElementScoreComparator implements Comparator { + + public static final Comparator SCORE_DOC_SCORE_ELEM_COMPARATOR = new LLDocElementScoreComparator(); + + @Override + public int compare(LLDocElement hitA, LLDocElement hitB) { + return Float.compare(hitA.score(), hitB.score()); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/LLFieldDoc.java b/src/main/java/it/cavallium/dbengine/lucene/LLFieldDoc.java new file mode 100644 index 0000000..05b20ba --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/LLFieldDoc.java @@ -0,0 +1,5 @@ +package it.cavallium.dbengine.lucene; + +import java.util.List; + +public record LLFieldDoc(int doc, float score, int shardIndex, List fields) implements LLDocElement {} diff --git a/src/main/java/it/cavallium/dbengine/lucene/LLFieldDocCodec.java b/src/main/java/it/cavallium/dbengine/lucene/LLFieldDocCodec.java new file mode 100644 index 0000000..46aae09 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/LLFieldDocCodec.java @@ -0,0 +1,151 @@ +package it.cavallium.dbengine.lucene; + +import io.net5.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.function.Function; + +public class LLFieldDocCodec implements LMDBCodec { + + private enum FieldType { + FLOAT, + DOUBLE, + INT, + LONG; + + public byte ordinalByte() { + return (byte) ordinal(); + } + } + + @Override + public ByteBuf serialize(Function allocator, LLFieldDoc data) { + int fieldsDataSize = 0; + byte[] fieldTypes = new byte[data.fields().size()]; + int fieldId = 0; + for (Object field : data.fields()) { + assert field != null; + if (field instanceof Float) { + fieldsDataSize += Float.BYTES; + fieldTypes[fieldId] = FieldType.FLOAT.ordinalByte(); + } else if (field instanceof Double) { + fieldsDataSize += Double.BYTES; + fieldTypes[fieldId] = FieldType.DOUBLE.ordinalByte(); + } else if (field instanceof Integer) { + fieldsDataSize += Integer.BYTES; + fieldTypes[fieldId] = FieldType.INT.ordinalByte(); + } else if (field instanceof Long) { + fieldsDataSize += Long.BYTES; + fieldTypes[fieldId] = FieldType.LONG.ordinalByte(); + } else { + throw new UnsupportedOperationException("Unsupported field type " + field.getClass()); + } + fieldId++; + } + int size = Float.BYTES + Integer.BYTES + Integer.BYTES + Character.BYTES + (data.fields().size() + Byte.BYTES) + fieldsDataSize; + var buf = allocator.apply(size); + setScore(buf, data.score()); + setDoc(buf, data.doc()); + setShardIndex(buf, data.shardIndex()); + setFieldsCount(buf, data.fields().size()); + buf.writerIndex(size); + + fieldId = 0; + for (Object field : data.fields()) { + assert field != null; + buf.writeByte(fieldTypes[fieldId]); + if (field instanceof Float val) { + buf.writeFloat(val); + } else if (field instanceof Double val) { + buf.writeDouble(val); + } else if (field instanceof Integer val) { + buf.writeInt(val); + } else if (field instanceof Long val) { + buf.writeLong(val); + } else { + throw new UnsupportedOperationException("Unsupported field type " + field.getClass()); + } + fieldId++; + } + assert buf.writableBytes() == 0; + return buf.asReadOnly(); + } + + @Override + public LLFieldDoc deserialize(ByteBuf buf) { + var fieldsCount = getFieldsCount(buf); + ArrayList fields = new ArrayList<>(fieldsCount); + buf.readerIndex(Float.BYTES + Integer.BYTES + Integer.BYTES + Character.BYTES); + for (char i = 0; i < fieldsCount; i++) { + fields.add(switch (FieldType.values()[buf.readByte()]) { + case FLOAT -> buf.readFloat(); + case DOUBLE -> buf.readDouble(); + case INT -> buf.readInt(); + case LONG -> buf.readLong(); + }); + } + assert buf.readableBytes() == 0; + return new LLFieldDoc(getDoc(buf), getScore(buf), getShardIndex(buf), fields); + } + + @Override + public int compare(LLFieldDoc hitA, LLFieldDoc hitB) { + if (hitA.score() == hitB.score()) { + if (hitA.doc() == hitB.doc()) { + return Integer.compare(hitA.shardIndex(), hitB.shardIndex()); + } else { + return Integer.compare(hitB.doc(), hitA.doc()); + } + } else { + return Float.compare(hitA.score(), hitB.score()); + } + } + + @Override + public int compareDirect(ByteBuf hitA, ByteBuf hitB) { + var scoreA = getScore(hitA); + var scoreB = getScore(hitB); + if (scoreA == scoreB) { + var docA = getDoc(hitA); + var docB = getDoc(hitB); + if (docA == docB) { + return Integer.compare(getShardIndex(hitA), getShardIndex(hitB)); + } else { + return Integer.compare(docB, docA); + } + } else { + return Float.compare(scoreA, scoreB); + } + } + + private static float getScore(ByteBuf hit) { + return hit.getFloat(0); + } + + private static int getDoc(ByteBuf hit) { + return hit.getInt(Float.BYTES); + } + + private static int getShardIndex(ByteBuf hit) { + return hit.getInt(Float.BYTES + Integer.BYTES); + } + + private char getFieldsCount(ByteBuf hit) { + return hit.getChar(Float.BYTES + Integer.BYTES + Integer.BYTES); + } + + private static void setScore(ByteBuf hit, float score) { + hit.setFloat(0, score); + } + + private static void setDoc(ByteBuf hit, int doc) { + hit.setInt(Float.BYTES, doc); + } + + private static void setShardIndex(ByteBuf hit, int shardIndex) { + hit.setInt(Float.BYTES + Integer.BYTES, shardIndex); + } + + private void setFieldsCount(ByteBuf hit, int size) { + hit.setChar(Float.BYTES + Integer.BYTES + Integer.BYTES, (char) size); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/LLScoreDoc.java b/src/main/java/it/cavallium/dbengine/lucene/LLScoreDoc.java new file mode 100644 index 0000000..77a0262 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/LLScoreDoc.java @@ -0,0 +1,10 @@ +package it.cavallium.dbengine.lucene; + +import org.apache.lucene.search.ScoreDoc; + +public record LLScoreDoc(int doc, float score, int shardIndex) implements LLDocElement { + + public ScoreDoc toScoreDoc() { + return new ScoreDoc(doc, score, shardIndex); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/LLScoreDocCodec.java b/src/main/java/it/cavallium/dbengine/lucene/LLScoreDocCodec.java new file mode 100644 index 0000000..795e454 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/LLScoreDocCodec.java @@ -0,0 +1,76 @@ +package it.cavallium.dbengine.lucene; + +import io.net5.buffer.ByteBuf; +import java.util.function.Function; + +public class LLScoreDocCodec implements LMDBCodec { + + @Override + public ByteBuf serialize(Function allocator, LLScoreDoc data) { + var buf = allocator.apply(Float.BYTES + Integer.BYTES + Integer.BYTES); + setScore(buf, data.score()); + setDoc(buf, data.doc()); + setShardIndex(buf, data.shardIndex()); + buf.writerIndex(Float.BYTES + Integer.BYTES + Integer.BYTES); + return buf.asReadOnly(); + } + + @Override + public LLScoreDoc deserialize(ByteBuf buf) { + return new LLScoreDoc(getDoc(buf), getScore(buf), getShardIndex(buf)); + } + + @Override + public int compare(LLScoreDoc hitA, LLScoreDoc hitB) { + if (hitA.score() == hitB.score()) { + if (hitA.doc() == hitB.doc()) { + return Integer.compare(hitA.shardIndex(), hitB.shardIndex()); + } else { + return Integer.compare(hitB.doc(), hitA.doc()); + } + } else { + return Float.compare(hitA.score(), hitB.score()); + } + } + + @Override + public int compareDirect(ByteBuf hitA, ByteBuf hitB) { + var scoreA = getScore(hitA); + var scoreB = getScore(hitB); + if (scoreA == scoreB) { + var docA = getDoc(hitA); + var docB = getDoc(hitB); + if (docA == docB) { + return Integer.compare(getShardIndex(hitA), getShardIndex(hitB)); + } else { + return Integer.compare(docB, docA); + } + } else { + return Float.compare(scoreA, scoreB); + } + } + + private static float getScore(ByteBuf hit) { + return hit.getFloat(0); + } + + private static int getDoc(ByteBuf hit) { + return hit.getInt(Float.BYTES); + } + + private static int getShardIndex(ByteBuf hit) { + return hit.getInt(Float.BYTES + Integer.BYTES); + } + + private static void setScore(ByteBuf hit, float score) { + hit.setFloat(0, score); + } + + private static void setDoc(ByteBuf hit, int doc) { + hit.setInt(Float.BYTES, doc); + } + + private static void setShardIndex(ByteBuf hit, int shardIndex) { + hit.setInt(Float.BYTES + Integer.BYTES, shardIndex); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/LMDBCodec.java b/src/main/java/it/cavallium/dbengine/lucene/LMDBCodec.java new file mode 100644 index 0000000..94b413e --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/LMDBCodec.java @@ -0,0 +1,17 @@ +package it.cavallium.dbengine.lucene; + +import io.net5.buffer.ByteBuf; +import java.util.Comparator; +import java.util.function.Function; + +public interface LMDBCodec { + + ByteBuf serialize(Function allocator, T data); + + T deserialize(ByteBuf b); + + int compare(T o1, T o2); + + int compareDirect(ByteBuf o1, ByteBuf o2); + +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/LMDBPriorityQueue.java b/src/main/java/it/cavallium/dbengine/lucene/LMDBPriorityQueue.java new file mode 100644 index 0000000..a62661a --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/LMDBPriorityQueue.java @@ -0,0 +1,403 @@ +package it.cavallium.dbengine.lucene; + +import static org.lmdbjava.DbiFlags.*; + +import io.net5.buffer.ByteBuf; +import io.net5.buffer.PooledByteBufAllocator; +import io.net5.buffer.Unpooled; +import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; +import java.io.IOException; +import java.util.Iterator; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.jetbrains.annotations.NotNull; +import org.lmdbjava.Cursor; +import org.lmdbjava.CursorIterable; +import org.lmdbjava.CursorIterable.KeyVal; +import org.lmdbjava.Dbi; +import org.lmdbjava.Env; +import org.lmdbjava.PutFlags; +import org.lmdbjava.Txn; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuple3; +import reactor.util.function.Tuples; + +public class LMDBPriorityQueue implements PriorityQueue { + + private static final boolean FORCE_SYNC = false; + private static final boolean FORCE_THREAD_LOCAL = true; + + private static final AtomicLong NEXT_LMDB_QUEUE_ID = new AtomicLong(0); + private static final ByteBuf EMPTY = Unpooled.directBuffer(1, 1).writeByte(1).asReadOnly(); + + private final AtomicBoolean closed = new AtomicBoolean(); + private final Runnable onClose; + private final LMDBCodec codec; + private final Env env; + private final Dbi lmdb; + private final Scheduler scheduler = Schedulers.newBoundedElastic(1, + Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, LMDBThread::new, Integer.MAX_VALUE); + + private boolean writing; + private boolean iterating; + private Txn readTxn; + private Txn rwTxn; + private Cursor cur; + + private boolean topValid = true; + private T top = null; + private long size = 0; + + public LMDBPriorityQueue(LLTempLMDBEnv env, LMDBCodec codec) { + this.onClose = env::decrementRef; + var name = "$queue_" + NEXT_LMDB_QUEUE_ID.getAndIncrement(); + this.codec = codec; + this.env = env.getEnvAndIncrementRef(); + this.lmdb = this.env.openDbi(name, codec::compareDirect, MDB_CREATE); + + this.writing = true; + this.iterating = false; + if (FORCE_THREAD_LOCAL) { + this.rwTxn = null; + } else { + this.rwTxn = this.env.txnWrite(); + } + this.readTxn = null; + this.cur = null; + } + + private ByteBuf allocate(int size) { + return PooledByteBufAllocator.DEFAULT.directBuffer(size, size); + } + + private void switchToMode(boolean write, boolean wantCursor) { + if (iterating) { + throw new IllegalStateException("Tried to " + (write ? "write" : "read") + " while still iterating"); + } + boolean changedMode = false; + if (write) { + if (!writing) { + changedMode = true; + writing = true; + if (cur != null) { + cur.close(); + cur = null; + } + readTxn.close(); + readTxn = null; + assert rwTxn == null; + rwTxn = env.txnWrite(); + } else if (rwTxn == null) { + assert readTxn == null; + rwTxn = env.txnWrite(); + } + } else { + if (writing) { + changedMode = true; + writing = false; + if (cur != null) { + cur.close(); + cur = null; + } + if (rwTxn != null) { + rwTxn.commit(); + rwTxn.close(); + rwTxn = null; + } + if (FORCE_SYNC) { + env.sync(true); + } + assert rwTxn == null; + assert readTxn == null; + readTxn = env.txnRead(); + } + } + + if (cur == null) { + if (wantCursor) { + cur = lmdb.openCursor(Objects.requireNonNull(writing ? rwTxn : readTxn)); + } + } else { + if (changedMode) { + cur.close(); + cur = null; + } + } + } + + private void endMode() { + if (FORCE_THREAD_LOCAL) { + if (cur != null) { + cur.close(); + cur = null; + } + writing = true; + if (readTxn != null) { + readTxn.commit(); + readTxn.close(); + readTxn = null; + } + if (rwTxn != null) { + rwTxn.commit(); + rwTxn.close(); + rwTxn = null; + } + } + assert cur == null; + assert rwTxn == null; + assert readTxn == null; + } + + private static void ensureThread() { + } + + private static void ensureItThread() { + if (!(Thread.currentThread() instanceof LMDBThread)) { + throw new IllegalStateException("Must run in LMDB scheduler"); + } + } + + @Override + public void add(T element) { + ensureThread(); + switchToMode(true, false); + var buf = codec.serialize(this::allocate, element); + try { + if (lmdb.put(rwTxn, buf, EMPTY, PutFlags.MDB_NOOVERWRITE)) { + if (++size == 1) { + topValid = true; + top = element; + } else { + topValid = false; + } + } + } finally { + endMode(); + } + + assert topSingleValid(element); + } + + private boolean topSingleValid(T element) { + if (size == 1) { + var top = databaseTop(); + return codec.compare(top, element) == 0; + } else { + return true; + } + } + + @Override + public T top() { + ensureThread(); + if (topValid) { + return top; + } else { + var top = databaseTop(); + this.top = top; + topValid = true; + return top; + } + } + + private T databaseTop() { + ensureThread(); + switchToMode(false, true); + try { + if (cur.first()) { + return codec.deserialize(cur.key()); + } else { + return null; + } + } finally { + endMode(); + } + } + + @Override + public T pop() { + ensureThread(); + switchToMode(true, true); + try { + if (cur.first()) { + var data = codec.deserialize(cur.key()); + if (--size == 0) { + topValid = true; + top = null; + } else { + topValid = false; + } + cur.delete(); + return data; + } else { + return null; + } + } finally { + endMode(); + } + } + + @Override + public void updateTop() { + // do nothing + } + + @Override + public void updateTop(T newTop) { + ensureThread(); + assert codec.compare(newTop, databaseTop()) == 0; + } + + @Override + public long size() { + ensureThread(); + return size; + } + + @Override + public void clear() { + ensureThread(); + switchToMode(true, false); + try { + lmdb.drop(rwTxn); + topValid = true; + top = null; + size = 0; + } finally { + endMode(); + } + } + + @Override + public boolean remove(@NotNull T element) { + ensureThread(); + Objects.requireNonNull(element); + switchToMode(true, false); + var buf = codec.serialize(this::allocate, element); + try { + var deleted = lmdb.delete(rwTxn, buf); + if (deleted) { + if (topValid && codec.compare(top, element) == 0) { + if (--size == 0) { + top = null; + } + } else { + if (--size == 0) { + topValid = true; + top = null; + } else { + topValid = false; + } + } + } + return deleted; + } finally { + endMode(); + } + } + + @Override + public Flux iterate() { + return Flux + ., Iterator>>>generate(() -> { + ensureItThread(); + switchToMode(false, false); + iterating = true; + if (cur != null) { + cur.close(); + cur = null; + } + CursorIterable cit = lmdb.iterate(readTxn); + var it = cit.iterator(); + return Tuples.of(cit, it); + }, (t, sink) -> { + ensureItThread(); + var it = t.getT2(); + if (it.hasNext()) { + sink.next(codec.deserialize(it.next().key())); + } else { + sink.complete(); + } + return t; + }, t -> { + ensureItThread(); + var cit = t.getT1(); + cit.close(); + iterating = false; + endMode(); + }) + .subscribeOn(scheduler, false); + } + + @Override + public Flux iterate(long skips) { + return Flux + ., Iterator>, Long>>generate(() -> { + ensureItThread(); + switchToMode(false, false); + iterating = true; + if (cur != null) { + cur.close(); + cur = null; + } + CursorIterable cit = lmdb.iterate(readTxn); + var it = cit.iterator(); + return Tuples.of(cit, it, skips); + }, (t, sink) -> { + ensureItThread(); + var it = t.getT2(); + var remainingSkips = t.getT3(); + while (remainingSkips-- > 0 && it.hasNext()) { + it.next(); + } + if (it.hasNext()) { + sink.next(codec.deserialize(it.next().key())); + } else { + sink.complete(); + } + return t.getT3() == 0L ? t : t.mapT3(s -> 0L); + }, t -> { + ensureItThread(); + var cit = t.getT1(); + cit.close(); + iterating = false; + endMode(); + }) + .subscribeOn(scheduler, false); + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + try { + ensureThread(); + if (cur != null) { + cur.close(); + } + if (rwTxn != null) { + rwTxn.close(); + } + if (readTxn != null) { + readTxn.close(); + } + try (var txn = env.txnWrite()) { + lmdb.drop(txn, true); + txn.commit(); + } + lmdb.close(); + } finally { + onClose.run(); + } + } + scheduler.dispose(); + } + + public Scheduler getScheduler() { + return scheduler; + } + +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/LMDBThread.java b/src/main/java/it/cavallium/dbengine/lucene/LMDBThread.java new file mode 100644 index 0000000..c98fd43 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/LMDBThread.java @@ -0,0 +1,8 @@ +package it.cavallium.dbengine.lucene; + +public class LMDBThread extends Thread { + + public LMDBThread(Runnable r) { + super(r); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index ad9f299..50eff75 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -406,7 +406,8 @@ public class LuceneUtils { } } - public static TopDocs mergeTopDocs(Sort sort, + public static TopDocs mergeTopDocs( + @Nullable Sort sort, @Nullable Integer startN, @Nullable Integer topN, TopDocs[] topDocs, diff --git a/src/main/java/it/cavallium/dbengine/lucene/MaxScoreAccumulator.java b/src/main/java/it/cavallium/dbengine/lucene/MaxScoreAccumulator.java new file mode 100644 index 0000000..57bbac5 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/MaxScoreAccumulator.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package it.cavallium.dbengine.lucene; + +import java.util.Objects; +import java.util.concurrent.atomic.LongAccumulator; + +/** Maintains the maximum score and its corresponding document id concurrently */ +public final class MaxScoreAccumulator { + // we use 2^10-1 to check the remainder with a bitwise operation + static final int DEFAULT_INTERVAL = 0x3ff; + + // scores are always positive + final LongAccumulator acc = new LongAccumulator(Long::max, Long.MIN_VALUE); + + // non-final and visible for tests + public long modInterval; + + public MaxScoreAccumulator() { + this.modInterval = DEFAULT_INTERVAL; + } + + public void accumulate(int docID, float score) { + assert docID >= 0 && score >= 0; + long encode = (((long) Float.floatToIntBits(score)) << 32) | docID; + acc.accumulate(encode); + } + + public DocAndScore get() { + long value = acc.get(); + if (value == Long.MIN_VALUE) { + return null; + } + float score = Float.intBitsToFloat((int) (value >> 32)); + int docID = (int) value; + return new DocAndScore(docID, score); + } + + public static class DocAndScore implements Comparable { + public final int docID; + public final float score; + + public DocAndScore(int docID, float score) { + this.docID = docID; + this.score = score; + } + + @Override + public int compareTo(DocAndScore o) { + int cmp = Float.compare(score, o.score); + if (cmp == 0) { + return Integer.compare(docID, o.docID); + } + return cmp; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DocAndScore result = (DocAndScore) o; + return docID == result.docID && Float.compare(result.score, score) == 0; + } + + @Override + public int hashCode() { + return Objects.hash(docID, score); + } + + @Override + public String toString() { + return "DocAndScore{" + "docID=" + docID + ", score=" + score + '}'; + } + } +} \ No newline at end of file diff --git a/src/main/java/it/cavallium/dbengine/lucene/PqFullDocs.java b/src/main/java/it/cavallium/dbengine/lucene/PqFullDocs.java new file mode 100644 index 0000000..11fff6e --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/PqFullDocs.java @@ -0,0 +1,30 @@ +package it.cavallium.dbengine.lucene; + +import org.apache.lucene.search.TotalHits; +import reactor.core.publisher.Flux; + +public class PqFullDocs implements FullDocs { + + private final PriorityQueue pq; + private final TotalHits totalHits; + + public PqFullDocs(PriorityQueue pq, TotalHits totalHits) { + this.pq = pq; + this.totalHits = totalHits; + } + + @Override + public Flux iterate() { + return pq.iterate(); + } + + @Override + public Flux iterate(long skips) { + return pq.iterate(skips); + } + + @Override + public TotalHits totalHits() { + return totalHits; + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/PriorityQueue.java b/src/main/java/it/cavallium/dbengine/lucene/PriorityQueue.java new file mode 100644 index 0000000..328991e --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/PriorityQueue.java @@ -0,0 +1,64 @@ +package it.cavallium.dbengine.lucene; + +import java.io.Closeable; +import java.util.Iterator; + +public interface PriorityQueue extends ResourceIterable, Closeable { + + /** + * Adds an Object to a PriorityQueue in log(size) time. If one tries to add more objects than maxSize from initialize + * an {@link ArrayIndexOutOfBoundsException} is thrown. + */ + void add(T element); + + /** + * Returns the least element of the PriorityQueue in constant time. + */ + T top(); + + /** + * Removes and returns the least element of the PriorityQueue in log(size) time. + */ + T pop(); + + /** + * Should be called when the Object at top changes values. Still log(n) worst case, but it's at least twice as fast + * to + * + *
+	 * pq.top().change();
+	 * pq.updateTop();
+	 * 
+ *

+ * instead of + * + *

+	 * o = pq.pop();
+	 * o.change();
+	 * pq.push(o);
+	 * 
+ */ + void updateTop(); + + /** + * Replace the top of the pq with {@code newTop} and run {@link #updateTop()}. + */ + void updateTop(T newTop); + + /** + * Returns the number of elements currently stored in the PriorityQueue. + */ + long size(); + + /** + * Removes all entries from the PriorityQueue. + */ + void clear(); + + /** + * Removes an existing element currently stored in the PriorityQueue. Cost is linear with the size of the queue. (A + * specialization of PriorityQueue which tracks element positions would provide a constant remove time but the + * trade-off would be extra cost to all additions/insertions) + */ + boolean remove(T element); +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/RandomFieldComparator.java b/src/main/java/it/cavallium/dbengine/lucene/RandomFieldComparator.java index b66b607..8299277 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/RandomFieldComparator.java +++ b/src/main/java/it/cavallium/dbengine/lucene/RandomFieldComparator.java @@ -74,11 +74,7 @@ public class RandomFieldComparator extends FieldComparator implements Lea return scorer.docID(); } }; - if (!(scorer instanceof ScoreCachingWrappingScorer)) { - this.scorer = new ScoreCachingWrappingScorer(randomizedScorer); - } else { - this.scorer = randomizedScorer; - } + this.scorer = ScoreCachingWrappingScorer.wrap(randomizedScorer); } @SuppressWarnings("RedundantCast") diff --git a/src/main/java/it/cavallium/dbengine/lucene/ResourceIterable.java b/src/main/java/it/cavallium/dbengine/lucene/ResourceIterable.java new file mode 100644 index 0000000..31aa941 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/ResourceIterable.java @@ -0,0 +1,24 @@ +package it.cavallium.dbengine.lucene; + +import java.util.Iterator; +import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.Flux; + +public interface ResourceIterable { + + /** + * Iterate this PriorityQueue + */ + Flux iterate(); + + /** + * Iterate this PriorityQueue + */ + default Flux iterate(long skips) { + if (skips == 0) { + return iterate(); + } else { + return iterate().skip(skips); + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/ScoreDocPartialComparator.java b/src/main/java/it/cavallium/dbengine/lucene/ScoreDocPartialComparator.java new file mode 100644 index 0000000..ecaab48 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/ScoreDocPartialComparator.java @@ -0,0 +1,18 @@ +package it.cavallium.dbengine.lucene; + +import java.util.Comparator; +import org.apache.lucene.search.ScoreDoc; + +class ScoreDocPartialComparator implements Comparator { + + public static final Comparator SCORE_DOC_PARTIAL_COMPARATOR = new ScoreDocPartialComparator(); + + @Override + public int compare(ScoreDoc hitA, ScoreDoc hitB) { + if (hitA.score == hitB.score) { + return Integer.compare(hitB.doc, hitA.doc); + } else { + return Float.compare(hitA.score, hitB.score); + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/ScoreDocShardComparator.java b/src/main/java/it/cavallium/dbengine/lucene/ScoreDocShardComparator.java new file mode 100644 index 0000000..84203df --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/ScoreDocShardComparator.java @@ -0,0 +1,21 @@ +package it.cavallium.dbengine.lucene; + +import java.util.Comparator; + +class ScoreDocShardComparator implements Comparator { + + public static final Comparator SCORE_DOC_SHARD_COMPARATOR = new ScoreDocShardComparator(); + + @Override + public int compare(LLScoreDoc hitA, LLScoreDoc hitB) { + if (hitA.score() == hitB.score()) { + if (hitA.doc() == hitB.doc()) { + return Integer.compare(hitA.shardIndex(), hitB.shardIndex()); + } else { + return Integer.compare(hitB.doc(), hitA.doc()); + } + } else { + return Float.compare(hitA.score(), hitB.score()); + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/StringIndicizer.java b/src/main/java/it/cavallium/dbengine/lucene/StringIndicizer.java new file mode 100644 index 0000000..e187538 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/StringIndicizer.java @@ -0,0 +1,68 @@ +package it.cavallium.dbengine.lucene; + +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import it.cavallium.dbengine.client.Indicizer; +import it.cavallium.dbengine.client.IndicizerAnalyzers; +import it.cavallium.dbengine.client.IndicizerSimilarities; +import it.cavallium.dbengine.database.LLDocument; +import it.cavallium.dbengine.database.LLItem; +import it.cavallium.dbengine.database.LLTerm; +import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer; +import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity; +import java.util.LinkedList; +import java.util.Map; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.Field.Store; +import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.Mono; + +public class StringIndicizer extends Indicizer { + + @Override + public @NotNull Mono toDocument(@NotNull String key, @NotNull String value) { + return Mono.fromCallable(() -> { + var fields = new LinkedList(); + fields.add(LLItem.newStringField("uid", key, Field.Store.YES)); + fields.add(LLItem.newTextField("text", value, Store.NO)); + @SuppressWarnings("UnstableApiUsage") + var numInt = Ints.tryParse(value); + if (numInt != null) { + fields.add(LLItem.newIntPoint("intpoint", numInt)); + fields.add(LLItem.newSortedNumericDocValuesField("intsort", numInt)); + } + @SuppressWarnings("UnstableApiUsage") + var numLong = Longs.tryParse(value); + if (numLong != null) { + fields.add(LLItem.newLongPoint("longpoint", numLong)); + fields.add(LLItem.newSortedNumericDocValuesField("longsort", numLong)); + } + return new LLDocument(fields.toArray(LLItem[]::new)); + }); + } + + @Override + public @NotNull LLTerm toIndex(@NotNull String key) { + return new LLTerm("uid", key); + } + + @Override + public @NotNull String getKeyFieldName() { + return "uid"; + } + + @Override + public @NotNull String getKey(String key) { + return key; + } + + @Override + public IndicizerAnalyzers getPerFieldAnalyzer() { + return IndicizerAnalyzers.of(TextFieldsAnalyzer.WordSimple); + } + + @Override + public IndicizerSimilarities getPerFieldSimilarity() { + return IndicizerSimilarities.of(TextFieldsSimilarity.Boolean); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/FullDocsCollector.java b/src/main/java/it/cavallium/dbengine/lucene/collector/FullDocsCollector.java new file mode 100644 index 0000000..e197298 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/FullDocsCollector.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package it.cavallium.dbengine.lucene.collector; + +import it.cavallium.dbengine.lucene.EmptyPriorityQueue; +import it.cavallium.dbengine.lucene.FullDocs; +import it.cavallium.dbengine.lucene.LLDocElement; +import it.cavallium.dbengine.lucene.PqFullDocs; +import it.cavallium.dbengine.lucene.PriorityQueue; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TotalHits; + +/** + * A base class for all collectors that return a {@link TopDocs} output. This collector allows easy + * extension by providing a single constructor which accepts a {@link PriorityQueue} as well as + * protected members for that priority queue and a counter of the number of total hits.
+ * Extending classes can override any of the methods to provide their own implementation, as well as + * avoid the use of the priority queue entirely by passing null to {@link + * #FullDocsCollector(PriorityQueue)}. In that case however, you might want to consider overriding + * all methods, in order to avoid a NullPointerException. + */ +public abstract class FullDocsCollector implements Collector, AutoCloseable { + + /** + * This is used in case topDocs() is called with illegal parameters, or there simply aren't + * (enough) results. + */ + private static final FullDocs EMPTY_FULLDOCS = + new PqFullDocs(new EmptyPriorityQueue<>(), new TotalHits(0, TotalHits.Relation.EQUAL_TO)); + + /** + * The priority queue which holds the top documents. Note that different implementations of + * PriorityQueue give different meaning to 'top documents'. HitQueue for example aggregates the + * top scoring documents, while other PQ implementations may hold documents sorted by other + * criteria. + */ + protected final PriorityQueue pq; + + /** The total number of documents that the collector encountered. */ + protected int totalHits; + + /** Whether {@link #totalHits} is exact or a lower bound. */ + protected TotalHits.Relation totalHitsRelation = TotalHits.Relation.EQUAL_TO; + + protected FullDocsCollector(PriorityQueue pq) { + this.pq = pq; + } + + /** The total number of documents that matched this query. */ + public int getTotalHits() { + return totalHits; + } + + /** Returns the top docs that were collected by this collector. */ + public FullDocs fullDocs() { + return new PqFullDocs<>(this.pq, new TotalHits(totalHits, totalHitsRelation)); + } + + @Override + public void close() throws Exception { + pq.close(); + } +} \ No newline at end of file diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/HitsThresholdChecker.java b/src/main/java/it/cavallium/dbengine/lucene/collector/HitsThresholdChecker.java new file mode 100644 index 0000000..768be13 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/HitsThresholdChecker.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package it.cavallium.dbengine.lucene.collector; + +import java.util.concurrent.atomic.AtomicLong; +import org.apache.lucene.search.ScoreMode; + +/** Used for defining custom algorithms to allow searches to early terminate */ +abstract class HitsThresholdChecker { + /** Implementation of HitsThresholdChecker which allows global hit counting */ + private static class GlobalHitsThresholdChecker extends HitsThresholdChecker { + private final int totalHitsThreshold; + private final AtomicLong globalHitCount; + + public GlobalHitsThresholdChecker(int totalHitsThreshold) { + + if (totalHitsThreshold < 0) { + throw new IllegalArgumentException( + "totalHitsThreshold must be >= 0, got " + totalHitsThreshold); + } + + this.totalHitsThreshold = totalHitsThreshold; + this.globalHitCount = new AtomicLong(); + } + + @Override + public void incrementHitCount() { + globalHitCount.incrementAndGet(); + } + + @Override + public boolean isThresholdReached() { + return globalHitCount.getAcquire() > totalHitsThreshold; + } + + @Override + public ScoreMode scoreMode() { + return totalHitsThreshold == Integer.MAX_VALUE ? ScoreMode.COMPLETE : ScoreMode.TOP_SCORES; + } + + @Override + public int getHitsThreshold() { + return totalHitsThreshold; + } + } + + /** Default implementation of HitsThresholdChecker to be used for single threaded execution */ + private static class LocalHitsThresholdChecker extends HitsThresholdChecker { + private final int totalHitsThreshold; + private int hitCount; + + public LocalHitsThresholdChecker(int totalHitsThreshold) { + + if (totalHitsThreshold < 0) { + throw new IllegalArgumentException( + "totalHitsThreshold must be >= 0, got " + totalHitsThreshold); + } + + this.totalHitsThreshold = totalHitsThreshold; + } + + @Override + public void incrementHitCount() { + ++hitCount; + } + + @Override + public boolean isThresholdReached() { + return hitCount > totalHitsThreshold; + } + + @Override + public ScoreMode scoreMode() { + return totalHitsThreshold == Integer.MAX_VALUE ? ScoreMode.COMPLETE : ScoreMode.TOP_SCORES; + } + + @Override + public int getHitsThreshold() { + return totalHitsThreshold; + } + } + + /* + * Returns a threshold checker that is useful for single threaded searches + */ + public static HitsThresholdChecker create(final int totalHitsThreshold) { + return new LocalHitsThresholdChecker(totalHitsThreshold); + } + + /* + * Returns a threshold checker that is based on a shared counter + */ + public static HitsThresholdChecker createShared(final int totalHitsThreshold) { + return new GlobalHitsThresholdChecker(totalHitsThreshold); + } + + public abstract void incrementHitCount(); + + public abstract ScoreMode scoreMode(); + + public abstract int getHitsThreshold(); + + public abstract boolean isThresholdReached(); +} \ No newline at end of file diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullScoreDocCollector.java b/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullScoreDocCollector.java new file mode 100644 index 0000000..d0d3289 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullScoreDocCollector.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package it.cavallium.dbengine.lucene.collector; + +import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; +import it.cavallium.dbengine.lucene.FullDocs; +import it.cavallium.dbengine.lucene.LLScoreDoc; +import it.cavallium.dbengine.lucene.LLScoreDocCodec; +import it.cavallium.dbengine.lucene.LMDBPriorityQueue; +import it.cavallium.dbengine.lucene.MaxScoreAccumulator; +import java.io.IOException; +import java.util.Collection; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.CollectorManager; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.LeafCollector; +import it.cavallium.dbengine.lucene.MaxScoreAccumulator.DocAndScore; +import org.apache.lucene.search.Scorable; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.TotalHits; + +/** + * A {@link Collector} implementation that collects the top-scoring hits, returning them as a {@link + * FullDocs}. This is used by {@link IndexSearcher} to implement {@link FullDocs}-based search. Hits + * are sorted by score descending and then (when the scores are tied) docID ascending. When you + * create an instance of this collector you should know in advance whether documents are going to be + * collected in doc Id order or not. + * + *

NOTE: The values {@link Float#NaN} and {@link Float#NEGATIVE_INFINITY} are not valid + * scores. This collector will not properly collect hits with such scores. + */ +public abstract class LMDBFullScoreDocCollector extends FullDocsCollector { + + /** Scorable leaf collector */ + public abstract static class ScorerLeafCollector implements LeafCollector { + + protected Scorable scorer; + + @Override + public void setScorer(Scorable scorer) throws IOException { + this.scorer = scorer; + } + } + + private static class SimpleLMDBFullScoreDocCollector extends LMDBFullScoreDocCollector { + + SimpleLMDBFullScoreDocCollector(LLTempLMDBEnv env, + HitsThresholdChecker hitsThresholdChecker, MaxScoreAccumulator minScoreAcc) { + super(env, hitsThresholdChecker, minScoreAcc); + } + + @Override + public LeafCollector getLeafCollector(LeafReaderContext context) { + // reset the minimum competitive score + docBase = context.docBase; + return new ScorerLeafCollector() { + + @Override + public void setScorer(Scorable scorer) throws IOException { + super.setScorer(scorer); + minCompetitiveScore = 0f; + updateMinCompetitiveScore(scorer); + if (minScoreAcc != null) { + updateGlobalMinCompetitiveScore(scorer); + } + } + + @Override + public void collect(int doc) throws IOException { + float score = scorer.score(); + + // This collector relies on the fact that scorers produce positive values: + assert score >= 0; // NOTE: false for NaN + + totalHits++; + hitsThresholdChecker.incrementHitCount(); + + if (minScoreAcc != null && (totalHits & minScoreAcc.modInterval) == 0) { + updateGlobalMinCompetitiveScore(scorer); + } + + var pqTop = pq.top(); + if (pqTop != null) { + if (score <= pqTop.score()) { + if (totalHitsRelation == TotalHits.Relation.EQUAL_TO) { + // we just reached totalHitsThreshold, we can start setting the min + // competitive score now + updateMinCompetitiveScore(scorer); + } + // Since docs are returned in-order (i.e., increasing doc Id), a document + // with equal score to pqTop.score cannot compete since HitQueue favors + // documents with lower doc Ids. Therefore reject those docs too. + return; + } + } + pq.add(new LLScoreDoc(doc + docBase, score, -1)); + pq.updateTop(); + updateMinCompetitiveScore(scorer); + } + }; + } + } + + /** + * Creates a new {@link LMDBFullScoreDocCollector} given the number of hits to collect and the number + * of hits to count accurately. + * + *

NOTE: If the total hit count of the top docs is less than or exactly {@code + * totalHitsThreshold} then this value is accurate. On the other hand, if the {@link + * FullDocs#totalHits} value is greater than {@code totalHitsThreshold} then its value is a lower + * bound of the hit count. A value of {@link Integer#MAX_VALUE} will make the hit count accurate + * but will also likely make query processing slower. + * + *

NOTE: The instances returned by this method pre-allocate a full array of length + * numHits, and fill the array with sentinel objects. + */ + public static LMDBFullScoreDocCollector create(LLTempLMDBEnv env, int totalHitsThreshold) { + return create(env, HitsThresholdChecker.create(totalHitsThreshold), null); + } + + static LMDBFullScoreDocCollector create( + LLTempLMDBEnv env, + HitsThresholdChecker hitsThresholdChecker, + MaxScoreAccumulator minScoreAcc) { + + if (hitsThresholdChecker == null) { + throw new IllegalArgumentException("hitsThresholdChecker must be non null"); + } + + return new SimpleLMDBFullScoreDocCollector(env, hitsThresholdChecker, minScoreAcc); + } + + /** + * Create a CollectorManager which uses a shared hit counter to maintain number of hits and a + * shared {@link MaxScoreAccumulator} to propagate the minimum score accross segments + */ + public static CollectorManager> createSharedManager( + LLTempLMDBEnv env, + int totalHitsThreshold) { + return new CollectorManager<>() { + + private final HitsThresholdChecker hitsThresholdChecker = + HitsThresholdChecker.createShared(totalHitsThreshold); + private final MaxScoreAccumulator minScoreAcc = new MaxScoreAccumulator(); + + @Override + public LMDBFullScoreDocCollector newCollector() { + return LMDBFullScoreDocCollector.create(env, hitsThresholdChecker, minScoreAcc); + } + + @Override + public FullDocs reduce(Collection collectors) { + @SuppressWarnings("unchecked") + final FullDocs[] fullDocs = new FullDocs[collectors.size()]; + int i = 0; + for (LMDBFullScoreDocCollector collector : collectors) { + fullDocs[i++] = collector.fullDocs(); + } + return FullDocs.merge(null, fullDocs); + } + }; + } + + int docBase; + final HitsThresholdChecker hitsThresholdChecker; + final MaxScoreAccumulator minScoreAcc; + float minCompetitiveScore; + + // prevents instantiation + LMDBFullScoreDocCollector(LLTempLMDBEnv env, + HitsThresholdChecker hitsThresholdChecker, MaxScoreAccumulator minScoreAcc) { + super(new LMDBPriorityQueue<>(env, new LLScoreDocCodec())); + assert hitsThresholdChecker != null; + + this.hitsThresholdChecker = hitsThresholdChecker; + this.minScoreAcc = minScoreAcc; + } + + @Override + public ScoreMode scoreMode() { + return hitsThresholdChecker.scoreMode(); + } + + protected void updateGlobalMinCompetitiveScore(Scorable scorer) throws IOException { + assert minScoreAcc != null; + DocAndScore maxMinScore = minScoreAcc.get(); + if (maxMinScore != null) { + // since we tie-break on doc id and collect in doc id order we can require + // the next float if the global minimum score is set on a document id that is + // smaller than the ids in the current leaf + float score = + docBase > maxMinScore.docID ? Math.nextUp(maxMinScore.score) : maxMinScore.score; + if (score > minCompetitiveScore) { + assert hitsThresholdChecker.isThresholdReached(); + scorer.setMinCompetitiveScore(score); + minCompetitiveScore = score; + totalHitsRelation = TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO; + } + } + } + + protected void updateMinCompetitiveScore(Scorable scorer) throws IOException { + var pqTop = pq.top(); + if (hitsThresholdChecker.isThresholdReached() + && pqTop != null + && pqTop.score() != Float.NEGATIVE_INFINITY) { // -Infinity is the score of sentinels + // since we tie-break on doc id and collect in doc id order, we can require + // the next float + float localMinScore = Math.nextUp(pqTop.score()); + if (localMinScore > minCompetitiveScore) { + scorer.setMinCompetitiveScore(localMinScore); + totalHitsRelation = TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO; + minCompetitiveScore = localMinScore; + if (minScoreAcc != null) { + // we don't use the next float but we register the document + // id so that other leaves can require it if they are after + // the current maximum + minScoreAcc.accumulate(pqTop.doc(), pqTop.score()); + } + } + } + } +} \ No newline at end of file diff --git a/src/main/java/it/cavallium/dbengine/lucene/UnscoredCollector.java b/src/main/java/it/cavallium/dbengine/lucene/collector/UnscoredCollector.java similarity index 98% rename from src/main/java/it/cavallium/dbengine/lucene/UnscoredCollector.java rename to src/main/java/it/cavallium/dbengine/lucene/collector/UnscoredCollector.java index d6caaf7..898a8b0 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/UnscoredCollector.java +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/UnscoredCollector.java @@ -1,4 +1,4 @@ -package it.cavallium.dbengine.lucene; +package it.cavallium.dbengine.lucene.collector; import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.ALLOW_UNSCORED_PAGINATION_MODE; diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java index ea3a6ac..db315d0 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java @@ -35,6 +35,12 @@ public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher { true); } } + + @Override + public String getName() { + return "adaptivelocal"; + } + public Mono> transformedCollect(Mono> indexSearcher, LocalQueryParams queryParams, String keyFieldName, diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java index d6b65df..8476a95 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java @@ -4,15 +4,16 @@ import io.net5.buffer.api.Send; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; +import java.io.Closeable; +import java.io.IOException; import reactor.core.publisher.Mono; -public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher { +public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher, Closeable { private static final LuceneMultiSearcher count = new SimpleUnsortedUnscoredLuceneMultiSearcher(new CountLuceneLocalSearcher()); - private static final LuceneMultiSearcher scoredSimple - = new ScoredSimpleLuceneShardSearcher(); + private static final LuceneMultiSearcher scoredSimple = new ScoredSimpleLuceneMultiSearcher(); private static final LuceneMultiSearcher unsortedUnscoredPaged = new SimpleUnsortedUnscoredLuceneMultiSearcher(new SimpleLuceneLocalSearcher()); @@ -20,6 +21,12 @@ public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher { private static final LuceneMultiSearcher unsortedUnscoredContinuous = new UnsortedUnscoredContinuousLuceneMultiSearcher(); + private final UnsortedScoredFullLuceneMultiSearcher scoredFull; + + public AdaptiveLuceneMultiSearcher() throws IOException { + scoredFull = new UnsortedScoredFullLuceneMultiSearcher(); + } + @Override public Mono> collectMulti(Mono> indexSearchersMono, LocalQueryParams queryParams, @@ -47,7 +54,11 @@ public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher { if (queryParams.limit() == 0) { return count.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); } else if (queryParams.isSorted() || queryParams.isScored()) { - return scoredSimple.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); + if (queryParams.isSorted() || realLimit <= (long) queryParams.pageLimits().getPageLimit(0)) { + return scoredSimple.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); + } else { + return scoredFull.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); + } } else if (realLimit <= (long) queryParams.pageLimits().getPageLimit(0)) { // Run single-page searches using the paged multi searcher return unsortedUnscoredPaged.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); @@ -57,4 +68,14 @@ public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher { } }, true); } + + @Override + public void close() throws IOException { + scoredFull.close(); + } + + @Override + public String getName() { + return "adaptivemulti"; + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CalculatedResults.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CalculatedResults.java new file mode 100644 index 0000000..f4cef12 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/CalculatedResults.java @@ -0,0 +1,7 @@ +package it.cavallium.dbengine.lucene.searcher; + +import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; +import it.cavallium.dbengine.database.LLKeyScore; +import reactor.core.publisher.Flux; + +record CalculatedResults(TotalHitsCount totalHitsCount, Flux firstPageHitsFlux) {} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java index 6342eef..ddbd9ba 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountLuceneLocalSearcher.java @@ -42,4 +42,9 @@ public class CountLuceneLocalSearcher implements LuceneLocalSearcher { .map(count -> new LuceneSearchResult(TotalHitsCount.of(count, true), Flux.empty(), null).send()) .doOnDiscard(Send.class, Send::close); } + + @Override + public String getName() { + return "count"; + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneLocalSearcher.java index dfc5bb5..1a57498 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneLocalSearcher.java @@ -16,4 +16,10 @@ public interface LuceneLocalSearcher { LocalQueryParams queryParams, String keyFieldName, LLSearchTransformer transformer); + + /** + * Get the name of this searcher type + * @return searcher type name + */ + String getName(); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneMultiSearcher.java similarity index 91% rename from src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java rename to src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneMultiSearcher.java index 5001683..1e63219 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneMultiSearcher.java @@ -1,34 +1,38 @@ package it.cavallium.dbengine.lucene.searcher; -import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.FIRST_PAGE_LIMIT; +import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.EMPTY_STATUS; import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT; import io.net5.buffer.api.Send; import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLUtils; -import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearchers; -import it.cavallium.dbengine.database.disk.LLLocalGroupedReactiveRocksIterator; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; +import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.Sort; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TotalHits; +import org.apache.lucene.search.TotalHits.Relation; +import org.jetbrains.annotations.Nullable; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; -public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher { +public class ScoredSimpleLuceneMultiSearcher implements LuceneMultiSearcher { - protected static final Logger logger = LoggerFactory.getLogger(ScoredSimpleLuceneShardSearcher.class); + protected static final Logger logger = LoggerFactory.getLogger(ScoredSimpleLuceneMultiSearcher.class); - public ScoredSimpleLuceneShardSearcher() { + public ScoredSimpleLuceneMultiSearcher() { } @Override @@ -64,11 +68,7 @@ public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher { } private Sort getSort(LocalQueryParams queryParams) { - Sort luceneSort = queryParams.sort(); - if (luceneSort == null) { - luceneSort = Sort.RELEVANCE; - } - return luceneSort; + return queryParams.sort(); } /** @@ -175,8 +175,8 @@ public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher { if (resultsOffset < 0) { throw new IndexOutOfBoundsException(resultsOffset); } - if ((s.pageIndex() == 0 || s.last() != null) && s.remainingLimit() > 0) { - var sort = getSort(queryParams); + if (s.pageIndex() == 0 || (s.last() != null && s.remainingLimit() > 0)) { + @Nullable var sort = getSort(queryParams); var pageLimit = pageLimits.getPageLimit(s.pageIndex()); var after = (FieldDoc) s.last(); var totalHitsThreshold = LuceneUtils.totalHitsThreshold(); @@ -211,4 +211,9 @@ public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher { })) ); } + + @Override + public String getName() { + return "scoredsimplemulti"; + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoringShardsCollectorManager.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoringShardsCollectorManager.java index 92011ad..5805e4b 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoringShardsCollectorManager.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoringShardsCollectorManager.java @@ -18,6 +18,7 @@ import reactor.core.scheduler.Schedulers; public class ScoringShardsCollectorManager implements CollectorManager { + @Nullable private final Sort sort; private final int numHits; private final FieldDoc after; @@ -26,7 +27,7 @@ public class ScoringShardsCollectorManager implements CollectorManager sharedCollectorManager; - public ScoringShardsCollectorManager(final Sort sort, + public ScoringShardsCollectorManager(@Nullable final Sort sort, final int numHits, final FieldDoc after, final int totalHitsThreshold, @@ -35,7 +36,7 @@ public class ScoringShardsCollectorManager implements CollectorManager new CurrentPageInfo(null, limit, 0)) - .handle((s, sink) -> this.searchPageSync(queryParams, indexSearchers, pagination, resultsOffset, s, sink)); + .just(currentPageInfo) + .handle((s, sink) -> this.searchPageSync(queryParams, indexSearchers, pagination, resultsOffset, s, sink)) + //defaultIfEmpty(new PageData(new TopDocs(new TotalHits(0, Relation.EQUAL_TO), new ScoreDoc[0]), currentPageInfo)) + .single(); } /** @@ -108,7 +119,7 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { CurrentPageInfo nextPageInfo = firstPageData.nextPageInfo(); return new FirstPageResults(totalHitsCount, firstPageHitsFlux, nextPageInfo); - }); + }).single(); } private Mono> computeOtherResults(Mono firstResultMono, @@ -125,7 +136,7 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { Flux combinedFlux = firstPageHitsFlux.concatWith(nextHitsFlux); return new LuceneSearchResult(totalHitsCount, combinedFlux, onClose).send(); - }); + }).single(); } /** @@ -162,7 +173,18 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { throw new IndexOutOfBoundsException(resultsOffset); } var currentPageLimit = queryParams.pageLimits().getPageLimit(s.pageIndex()); - if ((s.pageIndex() == 0 || s.last() != null) && s.remainingLimit() > 0) { + if (s.pageIndex() == 0 && s.remainingLimit() == 0) { + int count; + try { + count = indexSearchers.get(0).count(queryParams.query()); + } catch (IOException e) { + sink.error(e); + return EMPTY_STATUS; + } + var nextPageInfo = new CurrentPageInfo(null, 0, 1); + sink.next(new PageData(new TopDocs(new TotalHits(count, Relation.EQUAL_TO), new ScoreDoc[0]), nextPageInfo)); + return EMPTY_STATUS; + } else if (s.pageIndex() == 0 || (s.last() != null && s.remainingLimit() > 0)) { TopDocs pageTopDocs; try { TopDocsCollector collector = TopDocsSearcher.getTopDocsCollector(queryParams.sort(), diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleUnsortedUnscoredLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleUnsortedUnscoredLuceneMultiSearcher.java index 6b8bea3..e3e085d 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleUnsortedUnscoredLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleUnsortedUnscoredLuceneMultiSearcher.java @@ -100,4 +100,9 @@ public class SimpleUnsortedUnscoredLuceneMultiSearcher implements LuceneMultiSea queryParams.scoreMode() ); } + + @Override + public String getName() { + return "simpleunsortedunscoredmulti"; + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java index 372d574..60171d8 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java @@ -2,28 +2,13 @@ package it.cavallium.dbengine.lucene.searcher; import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.ALLOW_UNSCORED_PAGINATION_MODE; -import it.cavallium.dbengine.lucene.UnscoredCollector; -import java.io.IOException; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.NumericDocValues; -import org.apache.lucene.misc.search.DiversifiedTopDocsCollector; -import org.apache.lucene.search.BulkScorer; -import org.apache.lucene.search.Collector; +import it.cavallium.dbengine.lucene.collector.UnscoredCollector; import org.apache.lucene.search.FieldDoc; -import org.apache.lucene.search.HitQueue; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.LeafCollector; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.Scorable; import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Sort; -import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopDocsCollector; import org.apache.lucene.search.TopFieldCollector; import org.apache.lucene.search.TopScoreDocCollector; -import org.apache.lucene.search.TotalHits.Relation; -import reactor.core.scheduler.Schedulers; class TopDocsSearcher { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedScoredFullLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedScoredFullLuceneMultiSearcher.java new file mode 100644 index 0000000..8fb6770 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedScoredFullLuceneMultiSearcher.java @@ -0,0 +1,120 @@ +package it.cavallium.dbengine.lucene.searcher; + +import io.net5.buffer.api.Send; +import it.cavallium.dbengine.database.LLKeyScore; +import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; +import it.cavallium.dbengine.lucene.LuceneUtils; +import it.cavallium.dbengine.lucene.FullDocs; +import it.cavallium.dbengine.lucene.LLScoreDoc; +import it.cavallium.dbengine.lucene.collector.LMDBFullScoreDocCollector; +import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; +import java.io.Closeable; +import java.io.IOException; +import java.util.Objects; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Sort; +import org.warp.commonutils.log.Logger; +import org.warp.commonutils.log.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class UnsortedScoredFullLuceneMultiSearcher implements LuceneMultiSearcher, Closeable { + + protected static final Logger logger = LoggerFactory.getLogger(UnsortedScoredFullLuceneMultiSearcher.class); + + private final LLTempLMDBEnv env; + + public UnsortedScoredFullLuceneMultiSearcher() throws IOException { + this.env = new LLTempLMDBEnv(); + } + + @Override + public Mono> collectMulti(Mono> indexSearchersMono, + LocalQueryParams queryParams, + String keyFieldName, + LLSearchTransformer transformer) { + Mono queryParamsMono; + if (transformer == LLSearchTransformer.NO_TRANSFORMATION) { + queryParamsMono = Mono.just(queryParams); + } else { + queryParamsMono = LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> transformer.transform(Mono + .fromSupplier(() -> new TransformerInput(indexSearchers, queryParams))), true); + } + + return queryParamsMono.flatMap(queryParams2 -> { + Objects.requireNonNull(queryParams2.scoreMode(), "ScoreMode must not be null"); + if (queryParams2.sort() != null && queryParams2.sort() != Sort.RELEVANCE) { + throw new IllegalArgumentException(UnsortedScoredFullLuceneMultiSearcher.this.getClass().getSimpleName() + + " doesn't support sorted queries"); + } + + return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> this + // Search results + .search(indexSearchers.shards(), queryParams2) + // Compute the results + .transform(fullDocsMono -> this.computeResults(fullDocsMono, indexSearchers, + keyFieldName, queryParams2)) + // Ensure that one LuceneSearchResult is always returned + .single(), + false); + }); + } + + /** + * Search effectively the raw results + */ + private Mono> search(Iterable indexSearchers, + LocalQueryParams queryParams) { + return Mono + .fromCallable(() -> { + LLUtils.ensureBlocking(); + var totalHitsThreshold = LuceneUtils.totalHitsThreshold(); + return LMDBFullScoreDocCollector.createSharedManager(env, totalHitsThreshold); + }) + .flatMap(sharedManager -> Flux + .fromIterable(indexSearchers) + .flatMap(shard -> Mono.fromCallable(() -> { + LLUtils.ensureBlocking(); + var collector = sharedManager.newCollector(); + shard.search(queryParams.query(), collector); + return collector; + })) + .collectList() + .flatMap(collectors -> Mono.fromCallable(() -> { + LLUtils.ensureBlocking(); + return sharedManager.reduce(collectors); + })) + ); + } + + /** + * Compute the results, extracting useful data + */ + private Mono> computeResults(Mono> dataMono, + LLIndexSearchers indexSearchers, + String keyFieldName, + LocalQueryParams queryParams) { + return dataMono.map(data -> { + var totalHitsCount = LuceneUtils.convertTotalHitsCount(data.totalHits()); + + Flux hitsFlux = LuceneUtils + .convertHits(data.iterate(queryParams.offset()).map(LLScoreDoc::toScoreDoc), + indexSearchers.shards(), keyFieldName, true) + .take(queryParams.limit(), true); + + return new LuceneSearchResult(totalHitsCount, hitsFlux, indexSearchers::close).send(); + }); + } + + @Override + public void close() throws IOException { + env.close(); + } + + @Override + public String getName() { + return "scoredfullmulti"; + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredContinuousLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredContinuousLuceneMultiSearcher.java index 378b7ea..3be653b 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredContinuousLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredContinuousLuceneMultiSearcher.java @@ -114,4 +114,9 @@ public class UnsortedUnscoredContinuousLuceneMultiSearcher implements LuceneMult queryParams.scoreMode() ); } + + @Override + public String getName() { + return "unsortedunscoredcontinuousmulti"; + } } diff --git a/src/main/java/org/lmdbjava/Net5ByteBufProxy.java b/src/main/java/org/lmdbjava/Net5ByteBufProxy.java new file mode 100644 index 0000000..7eea25d --- /dev/null +++ b/src/main/java/org/lmdbjava/Net5ByteBufProxy.java @@ -0,0 +1,154 @@ +/*- + * #%L + * LmdbJava + * %% + * Copyright (C) 2016 - 2021 The LmdbJava Open Source Project + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +package org.lmdbjava; + +import static io.net5.buffer.PooledByteBufAllocator.DEFAULT; +import static java.lang.Class.forName; +import static org.lmdbjava.UnsafeAccess.UNSAFE; + +import io.net5.buffer.ByteBuf; +import java.lang.reflect.Field; + +import io.net5.buffer.ByteBuf; +import io.net5.buffer.PooledByteBufAllocator; +import jnr.ffi.Pointer; + +/** + * A buffer proxy backed by Netty's {@link ByteBuf}. + * + *

+ * This class requires {@link UnsafeAccess} and netty-buffer must be in the + * classpath. + */ +public final class Net5ByteBufProxy extends BufferProxy { + + /** + * A proxy for using Netty {@link ByteBuf}. Guaranteed to never be null, + * although a class initialization exception will occur if an attempt is made + * to access this field when Netty is unavailable. + */ + public static final BufferProxy PROXY_NETTY = new Net5ByteBufProxy(); + + private static final int BUFFER_RETRIES = 10; + private static final String FIELD_NAME_ADDRESS = "memoryAddress"; + private static final String FIELD_NAME_LENGTH = "length"; + private static final String NAME = "io.net5.buffer.PooledUnsafeDirectByteBuf"; + private final long lengthOffset; + private final long addressOffset; + + private final PooledByteBufAllocator nettyAllocator; + + private Net5ByteBufProxy() { + this(DEFAULT); + } + + public Net5ByteBufProxy(final PooledByteBufAllocator allocator) { + this.nettyAllocator = allocator; + + try { + final ByteBuf initBuf = this.allocate(); + initBuf.release(); + final Field address = findField(NAME, FIELD_NAME_ADDRESS); + final Field length = findField(NAME, FIELD_NAME_LENGTH); + addressOffset = UNSAFE.objectFieldOffset(address); + lengthOffset = UNSAFE.objectFieldOffset(length); + } catch (final SecurityException e) { + throw new LmdbException("Field access error", e); + } + } + + static Field findField(final String c, final String name) { + Class clazz; + try { + clazz = forName(c); + } catch (final ClassNotFoundException e) { + throw new LmdbException(c + " class unavailable", e); + } + do { + try { + final Field field = clazz.getDeclaredField(name); + field.setAccessible(true); + return field; + } catch (final NoSuchFieldException e) { + clazz = clazz.getSuperclass(); + } + } while (clazz != null); + throw new LmdbException(name + " not found"); + } + + @Override + protected ByteBuf allocate() { + for (int i = 0; i < BUFFER_RETRIES; i++) { + final ByteBuf bb = nettyAllocator.directBuffer(); + if (NAME.equals(bb.getClass().getName())) { + return bb; + } else { + bb.release(); + } + } + throw new IllegalStateException("Netty buffer must be " + NAME); + } + + @Override + protected int compare(final ByteBuf o1, final ByteBuf o2) { + return o1.compareTo(o2); + } + + @Override + protected void deallocate(final ByteBuf buff) { + buff.release(); + } + + @Override + protected byte[] getBytes(final ByteBuf buffer) { + final byte[] dest = new byte[buffer.capacity()]; + buffer.getBytes(0, dest); + return dest; + } + + @Override + protected void in(final ByteBuf buffer, final Pointer ptr, final long ptrAddr) { + UNSAFE.putLong(ptrAddr + STRUCT_FIELD_OFFSET_SIZE, + buffer.writerIndex() - buffer.readerIndex()); + UNSAFE.putLong(ptrAddr + STRUCT_FIELD_OFFSET_DATA, + buffer.memoryAddress() + buffer.readerIndex()); + } + + @Override + protected void in(final ByteBuf buffer, final int size, final Pointer ptr, + final long ptrAddr) { + UNSAFE.putLong(ptrAddr + STRUCT_FIELD_OFFSET_SIZE, + size); + UNSAFE.putLong(ptrAddr + STRUCT_FIELD_OFFSET_DATA, + buffer.memoryAddress() + buffer.readerIndex()); + } + + @Override + protected ByteBuf out(final ByteBuf buffer, final Pointer ptr, + final long ptrAddr) { + final long addr = UNSAFE.getLong(ptrAddr + STRUCT_FIELD_OFFSET_DATA); + final long size = UNSAFE.getLong(ptrAddr + STRUCT_FIELD_OFFSET_SIZE); + UNSAFE.putLong(buffer, addressOffset, addr); + UNSAFE.putInt(buffer, lengthOffset, (int) size); + buffer.writerIndex((int) size).readerIndex(0); + return buffer; + } +} diff --git a/src/test/java/it/cavallium/dbengine/DbTestUtils.java b/src/test/java/it/cavallium/dbengine/DbTestUtils.java index 1af7174..dd7faad 100644 --- a/src/test/java/it/cavallium/dbengine/DbTestUtils.java +++ b/src/test/java/it/cavallium/dbengine/DbTestUtils.java @@ -9,9 +9,12 @@ import io.net5.buffer.api.pool.MetricUtils; import io.net5.buffer.api.pool.PoolArenaMetric; import io.net5.buffer.api.pool.PooledBufferAllocator; import io.net5.util.internal.PlatformDependent; +import it.cavallium.dbengine.client.LuceneIndex; +import it.cavallium.dbengine.client.LuceneIndexImpl; import it.cavallium.dbengine.database.LLDatabaseConnection; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLKeyValueDatabase; +import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.collections.DatabaseMapDictionary; import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep; @@ -23,6 +26,7 @@ import it.cavallium.dbengine.database.collections.SubStageGetterMap; import it.cavallium.dbengine.database.disk.MemorySegmentUtils; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; +import it.cavallium.dbengine.lucene.StringIndicizer; import java.nio.file.Path; import java.util.Map; import java.util.Objects; @@ -121,6 +125,9 @@ public class DbTestUtils { } public static record TempDb(TestAllocator allocator, LLDatabaseConnection connection, LLKeyValueDatabase db, + LLLuceneIndex luceneSingle, + LLLuceneIndex luceneMulti, + SwappableLuceneSearcher swappableLuceneSearcher, Path path) {} static boolean computeCanUseNettyDirect() { @@ -166,6 +173,10 @@ public class DbTestUtils { return database.getDictionary(name, updateMode); } + public static Mono> tempLuceneIndex(LLLuceneIndex index) { + return Mono.fromCallable(() -> new LuceneIndexImpl<>(index, new StringIndicizer())); + } + public enum MapType { MAP, diff --git a/src/test/java/it/cavallium/dbengine/ExpectedQueryType.java b/src/test/java/it/cavallium/dbengine/ExpectedQueryType.java new file mode 100644 index 0000000..a468571 --- /dev/null +++ b/src/test/java/it/cavallium/dbengine/ExpectedQueryType.java @@ -0,0 +1,3 @@ +package it.cavallium.dbengine; + +record ExpectedQueryType(boolean shard, boolean sorted, boolean scored, boolean unlimited, boolean onlyCount) {} diff --git a/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java b/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java index 8204a0a..3fe96aa 100644 --- a/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java +++ b/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java @@ -5,15 +5,26 @@ import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks; import it.cavallium.dbengine.DbTestUtils.TempDb; import it.cavallium.dbengine.DbTestUtils.TestAllocator; import it.cavallium.dbengine.client.DatabaseOptions; +import it.cavallium.dbengine.client.IndicizerAnalyzers; +import it.cavallium.dbengine.client.IndicizerSimilarities; +import it.cavallium.dbengine.client.LuceneOptions; +import it.cavallium.dbengine.client.NRTCachingOptions; import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; +import it.cavallium.dbengine.database.lucene.LuceneHacks; +import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer; +import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity; +import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneLocalSearcher; +import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneMultiSearcher; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Duration; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicInteger; import reactor.core.publisher.Mono; @@ -23,6 +34,10 @@ public class LocalTemporaryDbGenerator implements TemporaryDbGenerator { private static final AtomicInteger dbId = new AtomicInteger(0); + private static final Optional NRT = Optional.empty(); + private static final LuceneOptions LUCENE_OPTS = new LuceneOptions(Map.of(), Duration.ofSeconds(5), Duration.ofSeconds(5), + false, true, Optional.empty(), true, NRT, -1, true, true); + @Override public Mono openTempDb(TestAllocator allocator) { boolean canUseNettyDirect = DbTestUtils.computeCanUseNettyDirect(); @@ -44,13 +59,33 @@ public class LocalTemporaryDbGenerator implements TemporaryDbGenerator { }) .subscribeOn(Schedulers.boundedElastic()) .then(new LLLocalDatabaseConnection(allocator.allocator(), wrkspcPath).connect()) - .flatMap(conn -> conn - .getDatabase("testdb", - List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")), - new DatabaseOptions(Map.of(), true, false, true, false, true, canUseNettyDirect, canUseNettyDirect, -1) - ) - .map(db -> new TempDb(allocator, conn, db, wrkspcPath)) - ); + .flatMap(conn -> { + SwappableLuceneSearcher searcher = new SwappableLuceneSearcher(); + var luceneHacks = new LuceneHacks(() -> searcher, () -> searcher); + return Mono.zip( + conn.getDatabase("testdb", + List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")), + new DatabaseOptions(Map.of(), true, false, true, false, + true, canUseNettyDirect, canUseNettyDirect, -1) + ), + conn.getLuceneIndex("testluceneindex1", + 1, + IndicizerAnalyzers.of(TextFieldsAnalyzer.WordSimple), + IndicizerSimilarities.of(TextFieldsSimilarity.Boolean), + LUCENE_OPTS, + luceneHacks + ), + conn.getLuceneIndex("testluceneindex16", + 1, + IndicizerAnalyzers.of(TextFieldsAnalyzer.WordSimple), + IndicizerSimilarities.of(TextFieldsSimilarity.Boolean), + LUCENE_OPTS, + luceneHacks + ), + Mono.just(searcher) + ) + .map(tuple -> new TempDb(allocator, conn, tuple.getT1(), tuple.getT2(), tuple.getT3(), tuple.getT4(), wrkspcPath)); + }); }); } diff --git a/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java b/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java index 3001728..86b81ab 100644 --- a/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java +++ b/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java @@ -3,25 +3,59 @@ package it.cavallium.dbengine; import it.cavallium.dbengine.DbTestUtils.TempDb; import it.cavallium.dbengine.DbTestUtils.TestAllocator; import it.cavallium.dbengine.client.DatabaseOptions; +import it.cavallium.dbengine.client.IndicizerAnalyzers; +import it.cavallium.dbengine.client.IndicizerSimilarities; +import it.cavallium.dbengine.client.LuceneOptions; +import it.cavallium.dbengine.client.NRTCachingOptions; import it.cavallium.dbengine.database.Column; +import it.cavallium.dbengine.database.lucene.LuceneHacks; import it.cavallium.dbengine.database.memory.LLMemoryDatabaseConnection; +import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer; +import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity; +import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.Optional; import reactor.core.publisher.Mono; public class MemoryTemporaryDbGenerator implements TemporaryDbGenerator { + private static final Optional NRT = Optional.empty(); + private static final LuceneOptions LUCENE_OPTS = new LuceneOptions(Map.of(), Duration.ofSeconds(5), Duration.ofSeconds(5), + false, true, Optional.empty(), true, NRT, -1, true, true); + @Override public Mono openTempDb(TestAllocator allocator) { boolean canUseNettyDirect = DbTestUtils.computeCanUseNettyDirect(); return Mono .fromCallable(() -> new LLMemoryDatabaseConnection(allocator.allocator())) - .flatMap(conn -> conn - .getDatabase("testdb", - List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")), - new DatabaseOptions(Map.of(), true, false, true, false, true, canUseNettyDirect, canUseNettyDirect, -1) - ) - .map(db -> new TempDb(allocator, conn, db, null))); + .flatMap(conn -> { + SwappableLuceneSearcher searcher = new SwappableLuceneSearcher(); + var luceneHacks = new LuceneHacks(() -> searcher, () -> searcher); + return Mono + .zip( + conn.getDatabase("testdb", + List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")), + new DatabaseOptions(Map.of(), true, false, true, false, true, canUseNettyDirect, canUseNettyDirect, -1) + ), + conn.getLuceneIndex("testluceneindex1", + 1, + IndicizerAnalyzers.of(TextFieldsAnalyzer.WordSimple), + IndicizerSimilarities.of(TextFieldsSimilarity.Boolean), + LUCENE_OPTS, + luceneHacks + ), + conn.getLuceneIndex("testluceneindex16", + 1, + IndicizerAnalyzers.of(TextFieldsAnalyzer.WordSimple), + IndicizerSimilarities.of(TextFieldsSimilarity.Boolean), + LUCENE_OPTS, + luceneHacks + ), + Mono.just(searcher) + ) + .map(tuple -> new TempDb(allocator, conn, tuple.getT1(), tuple.getT2(), tuple.getT3(), tuple.getT4(), null)); + }); } @Override diff --git a/src/test/java/it/cavallium/dbengine/Scored.java b/src/test/java/it/cavallium/dbengine/Scored.java new file mode 100644 index 0000000..745a041 --- /dev/null +++ b/src/test/java/it/cavallium/dbengine/Scored.java @@ -0,0 +1,3 @@ +package it.cavallium.dbengine; + +record Scored(String key, float score) {} diff --git a/src/test/java/it/cavallium/dbengine/TestLuceneIndex.java b/src/test/java/it/cavallium/dbengine/TestLuceneIndex.java new file mode 100644 index 0000000..c3f93ca --- /dev/null +++ b/src/test/java/it/cavallium/dbengine/TestLuceneIndex.java @@ -0,0 +1,385 @@ +package it.cavallium.dbengine; + +import static it.cavallium.dbengine.DbTestUtils.destroyAllocator; +import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks; +import static it.cavallium.dbengine.DbTestUtils.newAllocator; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +import it.cavallium.dbengine.DbTestUtils.TempDb; +import it.cavallium.dbengine.DbTestUtils.TestAllocator; +import it.cavallium.dbengine.client.LuceneIndex; +import it.cavallium.dbengine.client.MultiSort; +import it.cavallium.dbengine.client.SearchResultKey; +import it.cavallium.dbengine.client.SearchResultKeys; +import it.cavallium.dbengine.client.query.ClientQueryParams; +import it.cavallium.dbengine.client.query.ClientQueryParamsBuilder; +import it.cavallium.dbengine.client.query.QueryParser; +import it.cavallium.dbengine.client.query.current.data.MatchAllDocsQuery; +import it.cavallium.dbengine.client.query.current.data.MatchNoDocsQuery; +import it.cavallium.dbengine.client.query.current.data.NoSort; +import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; +import it.cavallium.dbengine.database.LLLuceneIndex; +import it.cavallium.dbengine.database.LLScoreMode; +import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneLocalSearcher; +import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneMultiSearcher; +import it.cavallium.dbengine.lucene.searcher.CountLuceneLocalSearcher; +import it.cavallium.dbengine.lucene.searcher.LuceneLocalSearcher; +import it.cavallium.dbengine.lucene.searcher.LuceneMultiSearcher; +import it.cavallium.dbengine.lucene.searcher.UnsortedScoredFullLuceneMultiSearcher; +import it.cavallium.dbengine.lucene.searcher.ScoredSimpleLuceneMultiSearcher; +import it.cavallium.dbengine.lucene.searcher.SimpleLuceneLocalSearcher; +import it.cavallium.dbengine.lucene.searcher.SimpleUnsortedUnscoredLuceneMultiSearcher; +import it.cavallium.dbengine.lucene.searcher.UnsortedUnscoredContinuousLuceneMultiSearcher; +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.stream.Stream; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.warp.commonutils.log.Logger; +import org.warp.commonutils.log.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink.OverflowStrategy; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.util.function.Tuples; + +public class TestLuceneIndex { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + private TestAllocator allocator; + private TempDb tempDb; + private LLLuceneIndex luceneSingle; + private LLLuceneIndex luceneMulti; + + protected TemporaryDbGenerator getTempDbGenerator() { + return new MemoryTemporaryDbGenerator(); + } + + @BeforeEach + public void beforeEach() { + this.allocator = newAllocator(); + ensureNoLeaks(allocator.allocator(), false, false); + tempDb = Objects.requireNonNull(getTempDbGenerator().openTempDb(allocator).block(), "TempDB"); + luceneSingle = tempDb.luceneSingle(); + luceneMulti = tempDb.luceneMulti(); + } + + public static Stream provideArguments() { + return Stream.of(false, true).map(Arguments::of); + } + + private static final Flux multi = Flux.just(false, true); + private static final Flux scoreModes = Flux.just(LLScoreMode.NO_SCORES, + LLScoreMode.TOP_SCORES, + LLScoreMode.COMPLETE_NO_SCORES, + LLScoreMode.COMPLETE + ); + private static final Flux>> multiSort = Flux.just(MultiSort.topScore(), + MultiSort.randomSortField(), + MultiSort.noSort(), + MultiSort.docSort(), + MultiSort.numericSort("longsort", false), + MultiSort.numericSort("longsort", true) + ); + + private static Flux getSearchers(ExpectedQueryType info) { + return Flux.push(sink -> { + try { + if (info.shard()) { + sink.next(new AdaptiveLuceneMultiSearcher()); + if (info.onlyCount()) { + sink.next(new SimpleUnsortedUnscoredLuceneMultiSearcher(new CountLuceneLocalSearcher())); + } else { + sink.next(new ScoredSimpleLuceneMultiSearcher()); + if (!info.sorted()) { + sink.next(new UnsortedScoredFullLuceneMultiSearcher()); + } + if (!info.scored() && !info.sorted()) { + sink.next(new SimpleUnsortedUnscoredLuceneMultiSearcher(new SimpleLuceneLocalSearcher())); + sink.next(new UnsortedUnscoredContinuousLuceneMultiSearcher()); + } + } + } else { + sink.next(new AdaptiveLuceneLocalSearcher()); + if (info.onlyCount()) { + sink.next(new CountLuceneLocalSearcher()); + } else { + sink.next(new SimpleLuceneLocalSearcher()); + } + } + sink.complete(); + } catch (IOException e) { + sink.error(e); + } + }, OverflowStrategy.BUFFER); + } + + public static Stream provideQueryArgumentsScoreMode() { + return multi + .concatMap(shard -> scoreModes.map(scoreMode -> Tuples.of(shard, scoreMode))) + .map(tuple -> Arguments.of(tuple.toArray())) + .toStream(); + } + + public static Stream provideQueryArgumentsSort() { + return multi + .concatMap(shard -> multiSort.map(multiSort -> Tuples.of(shard, multiSort))) + .map(tuple -> Arguments.of(tuple.toArray())) + .toStream(); + } + + public static Stream provideQueryArgumentsScoreModeAndSort() { + return multi + .concatMap(shard -> scoreModes.map(scoreMode -> Tuples.of(shard, scoreMode))) + .concatMap(tuple -> multiSort.map(multiSort -> Tuples.of(tuple.getT1(), tuple.getT2(), multiSort))) + .map(tuple -> Arguments.of(tuple.toArray())) + .toStream(); + } + + @AfterEach + public void afterEach() { + getTempDbGenerator().closeTempDb(tempDb).block(); + ensureNoLeaks(allocator.allocator(), true, false); + destroyAllocator(allocator); + } + + private LuceneIndex getLuceneIndex(boolean shards, @Nullable LuceneLocalSearcher customSearcher) { + LuceneIndex index = run(DbTestUtils.tempLuceneIndex(shards ? luceneSingle : luceneMulti)); + index.updateDocument("test-key-1", "0123456789").block(); + index.updateDocument("test-key-2", "test 0123456789 test word").block(); + index.updateDocument("test-key-3", "0123456789 test example string").block(); + index.updateDocument("test-key-4", "hello world the quick brown fox jumps over the lazy dog").block(); + index.updateDocument("test-key-5", "hello the quick brown fox jumps over the lazy dog").block(); + index.updateDocument("test-key-6", "hello the quick brown fox jumps over the world dog").block(); + index.updateDocument("test-key-7", "the quick brown fox jumps over the world dog").block(); + index.updateDocument("test-key-8", "the quick brown fox jumps over the lazy dog").block(); + index.updateDocument("test-key-9", "Example1").block(); + index.updateDocument("test-key-10", "Example2").block(); + index.updateDocument("test-key-11", "Example3").block(); + index.updateDocument("test-key-12", "-234").block(); + index.updateDocument("test-key-13", "2111").block(); + index.updateDocument("test-key-14", "2999").block(); + index.updateDocument("test-key-15", "3902").block(); + Flux.range(1, 1000).concatMap(i -> index.updateDocument("test-key-" + (15 + i), "" + i)).blockLast(); + tempDb.swappableLuceneSearcher().setSingle(new CountLuceneLocalSearcher()); + tempDb.swappableLuceneSearcher().setMulti(new SimpleUnsortedUnscoredLuceneMultiSearcher(new CountLuceneLocalSearcher())); + assertCount(index, 1000 + 15); + try { + if (customSearcher != null) { + tempDb.swappableLuceneSearcher().setSingle(customSearcher); + if (shards) { + if (customSearcher instanceof LuceneMultiSearcher multiSearcher) { + tempDb.swappableLuceneSearcher().setMulti(multiSearcher); + } else { + throw new IllegalArgumentException("Expected a LuceneMultiSearcher, got a LuceneLocalSearcher: " + customSearcher.getName()); + } + } + } else { + tempDb.swappableLuceneSearcher().setSingle(new AdaptiveLuceneLocalSearcher()); + tempDb.swappableLuceneSearcher().setMulti(new AdaptiveLuceneMultiSearcher()); + } + } catch (IOException e) { + fail(e); + } + return index; + } + + private void run(Flux publisher) { + publisher.subscribeOn(Schedulers.immediate()).blockLast(); + } + + private void runVoid(Mono publisher) { + publisher.then().subscribeOn(Schedulers.immediate()).block(); + } + + private T run(Mono publisher) { + return publisher.subscribeOn(Schedulers.immediate()).block(); + } + + private T run(boolean shouldFail, Mono publisher) { + return publisher.subscribeOn(Schedulers.immediate()).transform(mono -> { + if (shouldFail) { + return mono.onErrorResume(ex -> Mono.empty()); + } else { + return mono; + } + }).block(); + } + + private void runVoid(boolean shouldFail, Mono publisher) { + publisher.then().subscribeOn(Schedulers.immediate()).transform(mono -> { + if (shouldFail) { + return mono.onErrorResume(ex -> Mono.empty()); + } else { + return mono; + } + }).block(); + } + + private void assertCount(LuceneIndex luceneIndex, long expected) { + Assertions.assertEquals(expected, getCount(luceneIndex)); + } + + private long getCount(LuceneIndex luceneIndex) { + luceneIndex.refresh(true).block(); + var totalHitsCount = run(luceneIndex.count(null, new MatchAllDocsQuery())); + Assertions.assertTrue(totalHitsCount.exact(), "Can't get count because the total hits count is not exact"); + return totalHitsCount.value(); + } + + @Test + public void testNoOp() { + } + + @Test + public void testNoOpAllocation() { + for (int i = 0; i < 10; i++) { + var a = allocator.allocator().allocate(i * 512); + a.send().receive().close(); + } + } + + @ParameterizedTest + @MethodSource("provideArguments") + public void testGetLuceneIndex(boolean shards) { + var luceneIndex = getLuceneIndex(shards, null); + Assertions.assertNotNull(luceneIndex); + } + + @ParameterizedTest + @MethodSource("provideArguments") + public void testDeleteAll(boolean shards) { + var luceneIndex = getLuceneIndex(shards, null); + runVoid(luceneIndex.deleteAll()); + assertCount(luceneIndex, 0); + } + + @ParameterizedTest + @MethodSource("provideArguments") + public void testDelete(boolean shards) { + var luceneIndex = getLuceneIndex(shards, null); + var prevCount = getCount(luceneIndex); + runVoid(luceneIndex.deleteDocument("test-key-1")); + assertCount(luceneIndex, prevCount - 1); + } + + @ParameterizedTest + @MethodSource("provideArguments") + public void testUpdateSameDoc(boolean shards) { + var luceneIndex = getLuceneIndex(shards, null); + var prevCount = getCount(luceneIndex); + runVoid(luceneIndex.updateDocument("test-key-1", "new-value")); + assertCount(luceneIndex, prevCount ); + } + + @ParameterizedTest + @MethodSource("provideArguments") + public void testUpdateNewDoc(boolean shards) { + var luceneIndex = getLuceneIndex(shards, null); + var prevCount = getCount(luceneIndex); + runVoid(luceneIndex.updateDocument("test-key-new", "new-value")); + assertCount(luceneIndex, prevCount + 1); + } + + @ParameterizedTest + @MethodSource("provideQueryArgumentsScoreModeAndSort") + public void testSearchNoDocs(boolean shards, LLScoreMode scoreMode, MultiSort> multiSort) { + var searchers = run(getSearchers(new ExpectedQueryType(shards, isSorted(multiSort), isScored(scoreMode, multiSort), true, false)).collectList()); + for (LuceneLocalSearcher searcher : searchers) { + log.info("Using searcher \"{}\"", searcher.getName()); + + var luceneIndex = getLuceneIndex(shards, searcher); + ClientQueryParamsBuilder> queryBuilder = ClientQueryParams.builder(); + queryBuilder.query(new MatchNoDocsQuery()); + queryBuilder.snapshot(null); + queryBuilder.scoreMode(scoreMode); + queryBuilder.sort(multiSort); + var query = queryBuilder.build(); + try (var results = run(luceneIndex.search(query)).receive()) { + var hits = results.totalHitsCount(); + if (supportsPreciseHitsCount(searcher, query)) { + assertEquals(new TotalHitsCount(0, true), hits); + } + + var keys = getResults(results); + assertEquals(List.of(), keys); + } + } + } + + private boolean supportsPreciseHitsCount(LuceneLocalSearcher searcher, + ClientQueryParams> query) { + if (searcher instanceof UnsortedUnscoredContinuousLuceneMultiSearcher) { + return false; + } + var scored = isScored(query.scoreMode(), Objects.requireNonNullElse(query.sort(), MultiSort.noSort())); + var sorted = isSorted(Objects.requireNonNullElse(query.sort(), MultiSort.noSort())); + if (!sorted && !scored) { + if (searcher instanceof AdaptiveLuceneMultiSearcher || searcher instanceof AdaptiveLuceneLocalSearcher) { + return false; + } + } + return true; + } + + @ParameterizedTest + @MethodSource("provideQueryArgumentsScoreModeAndSort") + public void testSearchAllDocs(boolean shards, LLScoreMode scoreMode, MultiSort> multiSort) { + var searchers = run(getSearchers(new ExpectedQueryType(shards, isSorted(multiSort), isScored(scoreMode, multiSort), true, false)).collectList()); + for (LuceneLocalSearcher searcher : searchers) { + log.info("Using searcher \"{}\"", searcher.getName()); + + var luceneIndex = getLuceneIndex(shards, searcher); + ClientQueryParamsBuilder> queryBuilder = ClientQueryParams.builder(); + queryBuilder.query(new MatchNoDocsQuery()); + queryBuilder.snapshot(null); + queryBuilder.scoreMode(scoreMode); + queryBuilder.sort(multiSort); + var query = queryBuilder.build(); + try (var results = run(luceneIndex.search(query)).receive()) { + var hits = results.totalHitsCount(); + if (supportsPreciseHitsCount(searcher, query)) { + assertEquals(new TotalHitsCount(0, true), hits); + } + + var keys = getResults(results); + assertEquals(List.of(), keys); + } + } + } + + private boolean isSorted(MultiSort> multiSort) { + return !(multiSort.getQuerySort() instanceof NoSort); + } + + private boolean isScored(LLScoreMode scoreMode, MultiSort> multiSort) { + var needsScores = LLUtils.toScoreMode(scoreMode).needsScores(); + var sort =QueryParser.toSort(multiSort.getQuerySort()); + if (sort != null) { + needsScores |= sort.needsScores(); + } + return needsScores; + } + + private List getResults(SearchResultKeys results) { + return run(results + .results() + .flatMapSequential(searchResultKey -> searchResultKey + .key() + .single() + .map(key -> new Scored(key, searchResultKey.score())) + ) + .collectList()); + } + +}