From ed00d474d66df20723096c20b00afdc077363598 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 18 Nov 2021 17:13:53 +0100 Subject: [PATCH] Implement numeric buckets collector --- .../dbengine/client/LuceneIndex.java | 4 + .../dbengine/client/LuceneIndexImpl.java | 13 ++ .../dbengine/database/LLLuceneIndex.java | 7 + .../database/disk/LLLocalLuceneIndex.java | 21 +++ .../disk/LLLocalMultiLuceneIndex.java | 17 ++ .../DecimalBucketCollectorMultiManager.java | 158 ++++++++++++++++++ .../lucene/searcher/BucketParams.java | 6 + .../searcher/DecimalBucketMultiSearcher.java | 74 ++++++++ 8 files changed, 300 insertions(+) create mode 100644 src/main/java/it/cavallium/dbengine/lucene/collector/DecimalBucketCollectorMultiManager.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/searcher/BucketParams.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java index 3f6c43e..83cd77c 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java @@ -8,6 +8,8 @@ import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLSnapshottable; import it.cavallium.dbengine.database.collections.ValueGetter; import it.cavallium.dbengine.database.collections.ValueTransformer; +import it.cavallium.dbengine.lucene.searcher.BucketParams; +import it.unimi.dsi.fastutil.doubles.DoubleArrayList; import java.util.Map.Entry; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -53,6 +55,8 @@ public interface LuceneIndex extends LLSnapshottable { Mono>> search(ClientQueryParams queryParams); + Mono computeBuckets(ClientQueryParams queryParams, BucketParams bucketParams); + Mono count(@Nullable CompositeSnapshot snapshot, Query query); boolean isLowMemoryMode(); diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java index 70e723e..6361ace 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java @@ -3,11 +3,14 @@ package it.cavallium.dbengine.client; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.query.ClientQueryParams; import it.cavallium.dbengine.client.query.current.data.Query; +import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLSearchResultShard; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLTerm; +import it.cavallium.dbengine.lucene.searcher.BucketParams; +import it.unimi.dsi.fastutil.doubles.DoubleArrayList; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -109,6 +112,16 @@ public class LuceneIndexImpl implements LuceneIndex { .single(); } + @Override + public Mono computeBuckets(ClientQueryParams queryParams, BucketParams bucketParams) { + return luceneIndex + .computeBuckets(resolveSnapshot(queryParams.snapshot()), + queryParams.toQueryParams(), + bucketParams + ) + .single(); + } + private Hits> mapResults(LLSearchResultShard llSearchResult) { var scoresWithKeysFlux = llSearchResult .results() diff --git a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java index b1103d3..df3e587 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java @@ -7,6 +7,8 @@ import it.cavallium.dbengine.client.query.current.data.NoSort; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; +import it.cavallium.dbengine.lucene.searcher.BucketParams; +import it.unimi.dsi.fastutil.doubles.DoubleArrayList; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -55,6 +57,11 @@ public interface LLLuceneIndex extends LLSnapshottable { */ Mono search(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName); + /** + * @return buckets with each value collected into one of the buckets + */ + Mono computeBuckets(@Nullable LLSnapshot snapshot, QueryParams queryParams, BucketParams bucketParams); + default Mono count(@Nullable LLSnapshot snapshot, Query query) { QueryParams params = QueryParams.of(query, 0, 0, Nullablefloat.empty(), NoSort.of(), false); return Mono.from(this.search(snapshot, params, null) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index d81548e..d37bed5 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -6,6 +6,7 @@ import static it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.NO_TRANS import io.micrometer.core.instrument.MeterRegistry; import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; +import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.DirectIOOptions; import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; @@ -26,11 +27,15 @@ import it.cavallium.dbengine.lucene.LuceneHacks; import it.cavallium.dbengine.lucene.AlwaysDirectIOFSDirectory; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher; +import it.cavallium.dbengine.lucene.searcher.BucketParams; +import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher; import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; import it.cavallium.dbengine.lucene.searcher.LocalSearcher; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer; +import it.unimi.dsi.fastutil.doubles.DoubleArrayList; import java.io.IOException; import java.nio.file.Path; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -72,6 +77,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { protected static final Logger logger = LoggerFactory.getLogger(LLLocalLuceneIndex.class); private final LocalSearcher localSearcher; + private final DecimalBucketMultiSearcher decimalBucketMultiSearcher = new DecimalBucketMultiSearcher(); /** * Global lucene index scheduler. * There is only a single thread globally to not overwhelm the disk with @@ -388,6 +394,21 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .doOnDiscard(Resource.class, Resource::close); } + @Override + public Mono computeBuckets(@Nullable LLSnapshot snapshot, + QueryParams queryParams, + BucketParams bucketParams) { + LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer); + var searchers = searcherManager + .retrieveSearcher(snapshot) + .map(indexSearcher -> LLIndexSearchers.unsharded(indexSearcher).send()); + + return decimalBucketMultiSearcher + .collectMulti(searchers, bucketParams, localQueryParams, NO_TRANSFORMATION) + .doOnDiscard(Send.class, Send::close) + .doOnDiscard(Resource.class, Resource::close); + } + public Mono> retrieveSearcher(@Nullable LLSnapshot snapshot) { return searcherManager .retrieveSearcher(snapshot) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 4de4892..c2040da 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -17,9 +17,12 @@ import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.lucene.LuceneHacks; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.searcher.AdaptiveMultiSearcher; +import it.cavallium.dbengine.lucene.searcher.BucketParams; +import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer; import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; import it.cavallium.dbengine.lucene.searcher.MultiSearcher; +import it.unimi.dsi.fastutil.doubles.DoubleArrayList; import java.io.Closeable; import java.io.IOException; import java.nio.file.Path; @@ -51,6 +54,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { private final PerFieldSimilarityWrapper luceneSimilarity; private final MultiSearcher multiSearcher; + private final DecimalBucketMultiSearcher decimalBucketMultiSearcher = new DecimalBucketMultiSearcher(); public LLLocalMultiLuceneIndex(LLTempLMDBEnv env, Path lucene, @@ -243,6 +247,19 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { .doOnDiscard(Send.class, Send::close).doOnDiscard(Resource.class, Resource::close); } + @Override + public Mono computeBuckets(@Nullable LLSnapshot snapshot, + QueryParams queryParams, + BucketParams bucketParams) { + LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer); + var searchers = getIndexSearchers(snapshot); + + // Collect all the shards results into a single global result + return decimalBucketMultiSearcher + .collectMulti(searchers, bucketParams, localQueryParams, LLSearchTransformer.NO_TRANSFORMATION) + .doOnDiscard(Send.class, Send::close).doOnDiscard(Resource.class, Resource::close); + } + @Override public Mono close() { return Flux diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/DecimalBucketCollectorMultiManager.java b/src/main/java/it/cavallium/dbengine/lucene/collector/DecimalBucketCollectorMultiManager.java new file mode 100644 index 0000000..e39d752 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/DecimalBucketCollectorMultiManager.java @@ -0,0 +1,158 @@ +package it.cavallium.dbengine.lucene.collector; + +import it.cavallium.dbengine.lucene.LuceneUtils; +import it.unimi.dsi.fastutil.doubles.DoubleArrayList; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.lucene.document.DocumentStoredFieldVisitor; +import org.apache.lucene.index.IndexableField; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.CollectorManager; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.SimpleCollector; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TopFieldDocs; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +public class DecimalBucketCollectorMultiManager implements CollectorMultiManager { + + + private final String bucketField; + @Nullable + private final String valueField; + private final Set fieldsToLoad; + + private final double totalLength; + private final double bucketLength; + private final double minimum; + private final double maximum; + private final int buckets; + + public DecimalBucketCollectorMultiManager(double minimum, + double maximum, + double buckets, + String bucketField, + @Nullable String valueField) { + var bucketsInt = (int) Math.ceil(buckets); + this.minimum = minimum; + this.maximum = maximum; + this.buckets = bucketsInt; + this.bucketLength = (maximum - minimum) / bucketsInt; + this.totalLength = bucketLength * bucketsInt; + this.bucketField = bucketField; + this.valueField = valueField; + if (valueField != null) { + this.fieldsToLoad = Set.of(bucketField, valueField); + } else { + this.fieldsToLoad = Set.of(bucketField); + } + } + + public double[] newBuckets() { + return new double[buckets]; + } + + public CollectorManager get(IndexSearcher indexSearcher) { + + return new CollectorManager<>() { + @Override + public BucketsCollector newCollector() { + return new BucketsCollector(indexSearcher); + } + + @Override + public DoubleArrayList reduce(Collection collectors) { + double[] reducedBuckets = newBuckets(); + for (BucketsCollector collector : collectors) { + var buckets = collector.getBuckets(); + assert reducedBuckets.length == buckets.length; + for (int i = 0; i < buckets.length; i++) { + reducedBuckets[i] += buckets[i]; + } + } + return DoubleArrayList.wrap(reducedBuckets); + } + }; + } + + @Override + public ScoreMode scoreMode() { + throw new NotImplementedException(); + } + + @Override + public DoubleArrayList reduce(List reducedBucketsList) { + if (reducedBucketsList.size() == 1) { + return reducedBucketsList.get(0); + } + double[] reducedBuckets = newBuckets(); + for (DoubleArrayList buckets : reducedBucketsList) { + for (int i = 0; i < buckets.size(); i++) { + reducedBuckets[i] += buckets.getDouble(i); + } + } + return DoubleArrayList.wrap(reducedBuckets); + } + + private class BucketsCollector extends SimpleCollector { + + private final IndexSearcher indexSearcher; + private final DocumentStoredFieldVisitor documentStoredFieldVisitor; + private final double[] buckets; + + public BucketsCollector(IndexSearcher indexSearcher) { + super(); + this.indexSearcher = indexSearcher; + this.documentStoredFieldVisitor = new DocumentStoredFieldVisitor(fieldsToLoad); + this.buckets = newBuckets(); + } + + + @Override + public ScoreMode scoreMode() { + return ScoreMode.COMPLETE_NO_SCORES; + } + + @Override + public void collect(int doc) throws IOException { + indexSearcher.doc(doc, documentStoredFieldVisitor); + var document = documentStoredFieldVisitor.getDocument(); + var bucketField = document.getField(DecimalBucketCollectorMultiManager.this.bucketField); + IndexableField valueField; + if (DecimalBucketCollectorMultiManager.this.valueField != null) { + valueField = document.getField(DecimalBucketCollectorMultiManager.this.valueField); + } else { + valueField = null; + } + var bucketValue = bucketField.numericValue().doubleValue(); + if (bucketValue >= minimum && bucketValue <= maximum) { + double value; + if (valueField != null) { + value = valueField.numericValue().doubleValue(); + } else { + value = 1.0d; + } + double bucketIndex = (bucketValue - minimum) / bucketLength; + int bucketIndexLow = (int) Math.floor(bucketIndex); + double ratio = (bucketIndex - bucketIndexLow); + assert ratio >= 0 && ratio <= 1; + double loValue = value * (1d - ratio); + double hiValue = value * ratio; + buckets[bucketIndexLow] += loValue; + if (hiValue > 0d) { + buckets[bucketIndexLow + 1] += hiValue; + } + } + } + + public double[] getBuckets() { + return buckets; + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/BucketParams.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/BucketParams.java new file mode 100644 index 0000000..54aaba0 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/BucketParams.java @@ -0,0 +1,6 @@ +package it.cavallium.dbengine.lucene.searcher; + +import org.jetbrains.annotations.Nullable; + +public record BucketParams(double min, double max, int buckets, String bucketFieldName, + @Nullable String valueFieldName) {} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java new file mode 100644 index 0000000..a7e0171 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java @@ -0,0 +1,74 @@ +package it.cavallium.dbengine.lucene.searcher; + +import io.net5.buffer.api.Send; +import it.cavallium.dbengine.database.LLKeyScore; +import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; +import it.cavallium.dbengine.lucene.FullDocs; +import it.cavallium.dbengine.lucene.LLFieldDoc; +import it.cavallium.dbengine.lucene.LuceneUtils; +import it.cavallium.dbengine.lucene.collector.DecimalBucketCollectorMultiManager; +import it.cavallium.dbengine.lucene.collector.LMDBFullFieldDocCollector; +import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; +import it.unimi.dsi.fastutil.doubles.DoubleArrayList; +import org.apache.lucene.search.IndexSearcher; +import org.warp.commonutils.log.Logger; +import org.warp.commonutils.log.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class DecimalBucketMultiSearcher { + + protected static final Logger logger = LoggerFactory.getLogger(DecimalBucketMultiSearcher.class); + + public Mono collectMulti(Mono> indexSearchersMono, + BucketParams bucketParams, + LocalQueryParams queryParams, + LLSearchTransformer transformer) { + Mono queryParamsMono; + if (transformer == LLSearchTransformer.NO_TRANSFORMATION) { + queryParamsMono = Mono.just(queryParams); + } else { + queryParamsMono = LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> transformer.transform(Mono + .fromSupplier(() -> new TransformerInput(indexSearchers, queryParams))), true); + } + + return queryParamsMono.flatMap(queryParams2 -> LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> this + // Search results + .search(indexSearchers.shards(), bucketParams, queryParams2) + // Ensure that one result is always returned + .single(), + true)); + } + + private Mono search(Iterable indexSearchers, + BucketParams bucketParams, + LocalQueryParams queryParams) { + return Mono + .fromCallable(() -> { + LLUtils.ensureBlocking(); + return new DecimalBucketCollectorMultiManager(bucketParams.min(), + bucketParams.max(), + bucketParams.buckets(), + bucketParams.bucketFieldName(), + bucketParams.valueFieldName() + ); + }) + .flatMap(cmm -> Flux + .fromIterable(indexSearchers) + .flatMap(shard -> Mono.fromCallable(() -> { + LLUtils.ensureBlocking(); + + var collector = cmm.get(shard); + + return shard.search(queryParams.query(), collector); + })) + .collectList() + .flatMap(results -> Mono.fromCallable(() -> { + LLUtils.ensureBlocking(); + return cmm.reduce(results); + })) + ); + } +}