From 29d9aad8bf29035b067cfa2e0e5f7988c7707299 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 19 Nov 2021 19:03:31 +0100 Subject: [PATCH] Update buckets --- pom.xml | 9 + .../dbengine/client/LuceneIndex.java | 7 +- .../dbengine/client/LuceneIndexImpl.java | 12 +- .../cavallium/dbengine/database/LLItem.java | 4 + .../dbengine/database/LLLuceneIndex.java | 8 +- .../cavallium/dbengine/database/LLType.java | 1 + .../cavallium/dbengine/database/LLUtils.java | 2 + .../database/disk/LLLocalLuceneIndex.java | 18 +- .../disk/LLLocalMultiLuceneIndex.java | 20 +- .../lucene/collector/BucketValueSource.java | 4 + .../dbengine/lucene/collector/Buckets.java | 29 +++ .../collector/CollectorMultiManager.java | 3 +- .../lucene/collector/ConstantValueSource.java | 3 + .../DecimalBucketCollectorMultiManager.java | 158 ------------- .../DecimalBucketMultiCollectorManager.java | 223 ++++++++++++++++++ .../collector/DoubleBucketValueSource.java | 5 + .../collector/LongBucketValueSource.java | 5 + .../lucene/collector/NullValueSource.java | 3 + .../lucene/searcher/BucketParams.java | 5 +- .../searcher/DecimalBucketMultiSearcher.java | 50 ++-- 20 files changed, 364 insertions(+), 205 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/lucene/collector/BucketValueSource.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/collector/Buckets.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/collector/ConstantValueSource.java delete mode 100644 src/main/java/it/cavallium/dbengine/lucene/collector/DecimalBucketCollectorMultiManager.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/collector/DecimalBucketMultiCollectorManager.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/collector/DoubleBucketValueSource.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/collector/LongBucketValueSource.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/collector/NullValueSource.java diff --git a/pom.xml b/pom.xml index fc7a79b..6f1a563 100644 --- a/pom.xml +++ b/pom.xml @@ -222,6 +222,10 @@ org.apache.lucene lucene-misc + + org.apache.lucene + lucene-facet + org.jetbrains annotations @@ -420,6 +424,11 @@ lucene-misc 9.0.0-SNAPSHOT + + org.apache.lucene + lucene-facet + 9.0.0-SNAPSHOT + org.jetbrains annotations diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java index 83cd77c..7ba72b2 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java @@ -8,8 +8,10 @@ import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLSnapshottable; import it.cavallium.dbengine.database.collections.ValueGetter; import it.cavallium.dbengine.database.collections.ValueTransformer; +import it.cavallium.dbengine.lucene.collector.Buckets; import it.cavallium.dbengine.lucene.searcher.BucketParams; import it.unimi.dsi.fastutil.doubles.DoubleArrayList; +import java.util.List; import java.util.Map.Entry; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -55,7 +57,10 @@ public interface LuceneIndex extends LLSnapshottable { Mono>> search(ClientQueryParams queryParams); - Mono computeBuckets(ClientQueryParams queryParams, BucketParams bucketParams); + Mono computeBuckets(@Nullable CompositeSnapshot snapshot, + @NotNull List queries, + @Nullable Query normalizaitonQuery, + BucketParams bucketParams); Mono count(@Nullable CompositeSnapshot snapshot, Query query); diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java index 6361ace..3c34bfc 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java @@ -9,8 +9,10 @@ import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLSearchResultShard; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLTerm; +import it.cavallium.dbengine.lucene.collector.Buckets; import it.cavallium.dbengine.lucene.searcher.BucketParams; import it.unimi.dsi.fastutil.doubles.DoubleArrayList; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -113,10 +115,14 @@ public class LuceneIndexImpl implements LuceneIndex { } @Override - public Mono computeBuckets(ClientQueryParams queryParams, BucketParams bucketParams) { + public Mono computeBuckets(@Nullable CompositeSnapshot snapshot, + @NotNull List query, + @Nullable Query normalizaitonQuery, + BucketParams bucketParams) { return luceneIndex - .computeBuckets(resolveSnapshot(queryParams.snapshot()), - queryParams.toQueryParams(), + .computeBuckets(resolveSnapshot(snapshot), + query, + normalizaitonQuery, bucketParams ) .single(); diff --git a/src/main/java/it/cavallium/dbengine/database/LLItem.java b/src/main/java/it/cavallium/dbengine/database/LLItem.java index 26c4cca..ae57de0 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLItem.java +++ b/src/main/java/it/cavallium/dbengine/database/LLItem.java @@ -53,6 +53,10 @@ public class LLItem { return new LLItem(LLType.LongPoint, name, data); } + public static LLItem newLongStoredField(String name, long data) { + return new LLItem(LLType.LongStoredField, name, data); + } + public static LLItem newFloatPoint(String name, float data) { return new LLItem(LLType.FloatPoint, name, data); } diff --git a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java index df3e587..0293e11 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java @@ -7,11 +7,14 @@ import it.cavallium.dbengine.client.query.current.data.NoSort; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; +import it.cavallium.dbengine.lucene.collector.Buckets; import it.cavallium.dbengine.lucene.searcher.BucketParams; import it.unimi.dsi.fastutil.doubles.DoubleArrayList; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -60,7 +63,10 @@ public interface LLLuceneIndex extends LLSnapshottable { /** * @return buckets with each value collected into one of the buckets */ - Mono computeBuckets(@Nullable LLSnapshot snapshot, QueryParams queryParams, BucketParams bucketParams); + Mono computeBuckets(@Nullable LLSnapshot snapshot, + @NotNull List queries, + @Nullable Query normalizationQuery, + BucketParams bucketParams); default Mono count(@Nullable LLSnapshot snapshot, Query query) { QueryParams params = QueryParams.of(query, 0, 0, Nullablefloat.empty(), NoSort.of(), false); diff --git a/src/main/java/it/cavallium/dbengine/database/LLType.java b/src/main/java/it/cavallium/dbengine/database/LLType.java index 67b248a..2e174b1 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLType.java +++ b/src/main/java/it/cavallium/dbengine/database/LLType.java @@ -5,6 +5,7 @@ public enum LLType { StringFieldStored, IntPoint, LongPoint, + LongStoredField, FloatPoint, SortedNumericDocValuesField, TextField, diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index a7b26af..9bab5cf 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -35,6 +35,7 @@ import org.apache.lucene.document.FloatPoint; import org.apache.lucene.document.IntPoint; import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.document.StoredField; import org.apache.lucene.document.StringField; import org.apache.lucene.document.TextField; import org.apache.lucene.index.Term; @@ -174,6 +175,7 @@ public class LLUtils { return switch (item.getType()) { case IntPoint -> new IntPoint(item.getName(), Ints.fromByteArray(item.getData())); case LongPoint -> new LongPoint(item.getName(), Longs.fromByteArray(item.getData())); + case LongStoredField -> new StoredField(item.getName(), Longs.fromByteArray(item.getData())); case FloatPoint -> new FloatPoint(item.getName(), ByteBuffer.wrap(item.getData()).getFloat()); case TextField -> new TextField(item.getName(), item.stringValue(), Field.Store.NO); case TextFieldStored -> new TextField(item.getName(), item.stringValue(), Field.Store.YES); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index d37bed5..84fe7cc 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -12,6 +12,8 @@ import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; import it.cavallium.dbengine.client.LuceneOptions; import it.cavallium.dbengine.client.NRTCachingOptions; +import it.cavallium.dbengine.client.query.QueryParser; +import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.database.LLIndexRequest; import it.cavallium.dbengine.database.LLSoftUpdateDocument; @@ -26,6 +28,7 @@ import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.lucene.LuceneHacks; import it.cavallium.dbengine.lucene.AlwaysDirectIOFSDirectory; import it.cavallium.dbengine.lucene.LuceneUtils; +import it.cavallium.dbengine.lucene.collector.Buckets; import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher; import it.cavallium.dbengine.lucene.searcher.BucketParams; import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher; @@ -35,6 +38,7 @@ import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer; import it.unimi.dsi.fastutil.doubles.DoubleArrayList; import java.io.IOException; import java.nio.file.Path; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -62,6 +66,7 @@ import org.apache.lucene.store.MMapDirectory; import org.apache.lucene.store.NIOFSDirectory; import org.apache.lucene.store.NRTCachingDirectory; import org.apache.lucene.util.Constants; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.warp.commonutils.functional.IORunnable; import org.warp.commonutils.log.Logger; @@ -395,16 +400,21 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } @Override - public Mono computeBuckets(@Nullable LLSnapshot snapshot, - QueryParams queryParams, + public Mono computeBuckets(@Nullable LLSnapshot snapshot, + @NotNull List queries, + @Nullable Query normalizationQuery, BucketParams bucketParams) { - LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer); + List localQueries = new ArrayList<>(queries.size()); + for (Query query : queries) { + localQueries.add(QueryParser.toQuery(query, luceneAnalyzer)); + } + var localNormalizationQuery = QueryParser.toQuery(normalizationQuery, luceneAnalyzer); var searchers = searcherManager .retrieveSearcher(snapshot) .map(indexSearcher -> LLIndexSearchers.unsharded(indexSearcher).send()); return decimalBucketMultiSearcher - .collectMulti(searchers, bucketParams, localQueryParams, NO_TRANSFORMATION) + .collectMulti(searchers, bucketParams, localQueries, localNormalizationQuery) .doOnDiscard(Send.class, Send::close) .doOnDiscard(Resource.class, Resource::close); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index c2040da..fb25801 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -6,6 +6,8 @@ import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; import it.cavallium.dbengine.client.LuceneOptions; +import it.cavallium.dbengine.client.query.QueryParser; +import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.database.LLIndexRequest; import it.cavallium.dbengine.database.LLUpdateDocument; @@ -16,6 +18,7 @@ import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.lucene.LuceneHacks; import it.cavallium.dbengine.lucene.LuceneUtils; +import it.cavallium.dbengine.lucene.collector.Buckets; import it.cavallium.dbengine.lucene.searcher.AdaptiveMultiSearcher; import it.cavallium.dbengine.lucene.searcher.BucketParams; import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher; @@ -38,6 +41,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper; import org.apache.lucene.search.similarities.PerFieldSimilarityWrapper; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -248,16 +252,22 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { } @Override - public Mono computeBuckets(@Nullable LLSnapshot snapshot, - QueryParams queryParams, + public Mono computeBuckets(@Nullable LLSnapshot snapshot, + @NotNull List queries, + @Nullable Query normalizationQuery, BucketParams bucketParams) { - LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer); + List localQueries = new ArrayList<>(queries.size()); + for (Query query : queries) { + localQueries.add(QueryParser.toQuery(query, luceneAnalyzer)); + } + var localNormalizationQuery = QueryParser.toQuery(normalizationQuery, luceneAnalyzer); var searchers = getIndexSearchers(snapshot); // Collect all the shards results into a single global result return decimalBucketMultiSearcher - .collectMulti(searchers, bucketParams, localQueryParams, LLSearchTransformer.NO_TRANSFORMATION) - .doOnDiscard(Send.class, Send::close).doOnDiscard(Resource.class, Resource::close); + .collectMulti(searchers, bucketParams, localQueries, localNormalizationQuery) + .doOnDiscard(Send.class, Send::close) + .doOnDiscard(Resource.class, Resource::close); } @Override diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/BucketValueSource.java b/src/main/java/it/cavallium/dbengine/lucene/collector/BucketValueSource.java new file mode 100644 index 0000000..6a99623 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/BucketValueSource.java @@ -0,0 +1,4 @@ +package it.cavallium.dbengine.lucene.collector; + +public sealed interface BucketValueSource permits DoubleBucketValueSource, LongBucketValueSource, ConstantValueSource, + NullValueSource {} diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/Buckets.java b/src/main/java/it/cavallium/dbengine/lucene/collector/Buckets.java new file mode 100644 index 0000000..8672f8d --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/Buckets.java @@ -0,0 +1,29 @@ +package it.cavallium.dbengine.lucene.collector; + +import it.unimi.dsi.fastutil.doubles.DoubleArrayList; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public record Buckets(List seriesValues, DoubleArrayList totals) { + + public Buckets { + for (DoubleArrayList values : seriesValues) { + if (values.size() != totals.size()) { + throw new IllegalArgumentException("Buckets size mismatch"); + } + } + } + + public List normalized() { + var normalizedSeries = new ArrayList(seriesValues.size()); + for (DoubleArrayList values : seriesValues) { + DoubleArrayList normalized = new DoubleArrayList(values.size()); + for (int i = 0; i < values.size(); i++) { + normalized.add(values.getDouble(i) / totals.getDouble(i)); + } + normalizedSeries.add(normalized); + } + return normalizedSeries; + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/CollectorMultiManager.java b/src/main/java/it/cavallium/dbengine/lucene/collector/CollectorMultiManager.java index 5107b1a..64eee04 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/collector/CollectorMultiManager.java +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/CollectorMultiManager.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.lucene.collector; +import java.io.IOException; import java.util.List; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.TopDocs; @@ -8,5 +9,5 @@ public interface CollectorMultiManager { ScoreMode scoreMode(); - U reduce(List results); + U reduce(List results) throws IOException; } diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/ConstantValueSource.java b/src/main/java/it/cavallium/dbengine/lucene/collector/ConstantValueSource.java new file mode 100644 index 0000000..0da7059 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/ConstantValueSource.java @@ -0,0 +1,3 @@ +package it.cavallium.dbengine.lucene.collector; + +public record ConstantValueSource(Number constant) implements BucketValueSource {} diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/DecimalBucketCollectorMultiManager.java b/src/main/java/it/cavallium/dbengine/lucene/collector/DecimalBucketCollectorMultiManager.java deleted file mode 100644 index e39d752..0000000 --- a/src/main/java/it/cavallium/dbengine/lucene/collector/DecimalBucketCollectorMultiManager.java +++ /dev/null @@ -1,158 +0,0 @@ -package it.cavallium.dbengine.lucene.collector; - -import it.cavallium.dbengine.lucene.LuceneUtils; -import it.unimi.dsi.fastutil.doubles.DoubleArrayList; -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Set; -import org.apache.commons.lang3.NotImplementedException; -import org.apache.lucene.document.DocumentStoredFieldVisitor; -import org.apache.lucene.index.IndexableField; -import org.apache.lucene.search.Collector; -import org.apache.lucene.search.CollectorManager; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.ScoreMode; -import org.apache.lucene.search.SimpleCollector; -import org.apache.lucene.search.TopDocs; -import org.apache.lucene.search.TopFieldDocs; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; - -public class DecimalBucketCollectorMultiManager implements CollectorMultiManager { - - - private final String bucketField; - @Nullable - private final String valueField; - private final Set fieldsToLoad; - - private final double totalLength; - private final double bucketLength; - private final double minimum; - private final double maximum; - private final int buckets; - - public DecimalBucketCollectorMultiManager(double minimum, - double maximum, - double buckets, - String bucketField, - @Nullable String valueField) { - var bucketsInt = (int) Math.ceil(buckets); - this.minimum = minimum; - this.maximum = maximum; - this.buckets = bucketsInt; - this.bucketLength = (maximum - minimum) / bucketsInt; - this.totalLength = bucketLength * bucketsInt; - this.bucketField = bucketField; - this.valueField = valueField; - if (valueField != null) { - this.fieldsToLoad = Set.of(bucketField, valueField); - } else { - this.fieldsToLoad = Set.of(bucketField); - } - } - - public double[] newBuckets() { - return new double[buckets]; - } - - public CollectorManager get(IndexSearcher indexSearcher) { - - return new CollectorManager<>() { - @Override - public BucketsCollector newCollector() { - return new BucketsCollector(indexSearcher); - } - - @Override - public DoubleArrayList reduce(Collection collectors) { - double[] reducedBuckets = newBuckets(); - for (BucketsCollector collector : collectors) { - var buckets = collector.getBuckets(); - assert reducedBuckets.length == buckets.length; - for (int i = 0; i < buckets.length; i++) { - reducedBuckets[i] += buckets[i]; - } - } - return DoubleArrayList.wrap(reducedBuckets); - } - }; - } - - @Override - public ScoreMode scoreMode() { - throw new NotImplementedException(); - } - - @Override - public DoubleArrayList reduce(List reducedBucketsList) { - if (reducedBucketsList.size() == 1) { - return reducedBucketsList.get(0); - } - double[] reducedBuckets = newBuckets(); - for (DoubleArrayList buckets : reducedBucketsList) { - for (int i = 0; i < buckets.size(); i++) { - reducedBuckets[i] += buckets.getDouble(i); - } - } - return DoubleArrayList.wrap(reducedBuckets); - } - - private class BucketsCollector extends SimpleCollector { - - private final IndexSearcher indexSearcher; - private final DocumentStoredFieldVisitor documentStoredFieldVisitor; - private final double[] buckets; - - public BucketsCollector(IndexSearcher indexSearcher) { - super(); - this.indexSearcher = indexSearcher; - this.documentStoredFieldVisitor = new DocumentStoredFieldVisitor(fieldsToLoad); - this.buckets = newBuckets(); - } - - - @Override - public ScoreMode scoreMode() { - return ScoreMode.COMPLETE_NO_SCORES; - } - - @Override - public void collect(int doc) throws IOException { - indexSearcher.doc(doc, documentStoredFieldVisitor); - var document = documentStoredFieldVisitor.getDocument(); - var bucketField = document.getField(DecimalBucketCollectorMultiManager.this.bucketField); - IndexableField valueField; - if (DecimalBucketCollectorMultiManager.this.valueField != null) { - valueField = document.getField(DecimalBucketCollectorMultiManager.this.valueField); - } else { - valueField = null; - } - var bucketValue = bucketField.numericValue().doubleValue(); - if (bucketValue >= minimum && bucketValue <= maximum) { - double value; - if (valueField != null) { - value = valueField.numericValue().doubleValue(); - } else { - value = 1.0d; - } - double bucketIndex = (bucketValue - minimum) / bucketLength; - int bucketIndexLow = (int) Math.floor(bucketIndex); - double ratio = (bucketIndex - bucketIndexLow); - assert ratio >= 0 && ratio <= 1; - double loValue = value * (1d - ratio); - double hiValue = value * ratio; - buckets[bucketIndexLow] += loValue; - if (hiValue > 0d) { - buckets[bucketIndexLow + 1] += hiValue; - } - } - } - - public double[] getBuckets() { - return buckets; - } - } -} diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/DecimalBucketMultiCollectorManager.java b/src/main/java/it/cavallium/dbengine/lucene/collector/DecimalBucketMultiCollectorManager.java new file mode 100644 index 0000000..8a765d0 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/DecimalBucketMultiCollectorManager.java @@ -0,0 +1,223 @@ +package it.cavallium.dbengine.lucene.collector; + +import it.unimi.dsi.fastutil.doubles.DoubleArrayList; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.lucene.document.DocumentStoredFieldVisitor; +import org.apache.lucene.facet.FacetResult; +import org.apache.lucene.facet.Facets; +import org.apache.lucene.facet.FacetsCollector; +import org.apache.lucene.facet.FacetsCollectorManager; +import org.apache.lucene.facet.LabelAndValue; +import org.apache.lucene.facet.range.DoubleRange; +import org.apache.lucene.facet.range.DoubleRangeFacetCounts; +import org.apache.lucene.facet.range.LongRange; +import org.apache.lucene.facet.range.LongRangeFacetCounts; +import org.apache.lucene.facet.range.Range; +import org.apache.lucene.search.CollectorManager; +import org.apache.lucene.search.DoubleValuesSource; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.LongValuesSource; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.Scorable; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.SimpleCollector; +import org.jetbrains.annotations.Nullable; + +public class DecimalBucketMultiCollectorManager implements CollectorMultiManager { + + private final List queries; + private final Query normalizationQuery; + + private final String bucketField; + private final BucketValueSource bucketValueSource; + + private final double totalLength; + private final double bucketLength; + private final double minimum; + private final double maximum; + private final int buckets; + + private final Range[] bucketRanges; + + // todo: replace with an argument + private static final boolean USE_LONGS = true; + + public DecimalBucketMultiCollectorManager(double minimum, + double maximum, + double buckets, + String bucketField, + BucketValueSource bucketValueSource, + List queries, + Query normalizationQuery) { + this.queries = queries; + this.normalizationQuery = normalizationQuery; + var bucketsInt = (int) Math.ceil(buckets); + this.minimum = minimum; + this.maximum = maximum; + this.buckets = bucketsInt; + this.bucketLength = (maximum - minimum) / bucketsInt; + this.totalLength = bucketLength * bucketsInt; + this.bucketField = bucketField; + this.bucketValueSource = bucketValueSource; + + if (USE_LONGS) { + this.bucketRanges = new LongRange[bucketsInt]; + } else { + this.bucketRanges = new DoubleRange[bucketsInt]; + } + for (int i = 0; i < bucketsInt; i++) { + double offsetMin = minimum + (bucketLength * i); + double offsetMax = minimum + (bucketLength * (i + 1)); + if (USE_LONGS) { + this.bucketRanges[i] = new LongRange(Integer.toString(i), + (long) offsetMin, + true, + (long) offsetMax, + i == bucketsInt - 1 + ); + } else { + this.bucketRanges[i] = new DoubleRange(Integer.toString(i), + offsetMin, + true, + offsetMax, + i == bucketsInt - 1 + ); + } + } + } + + public double[] newBuckets() { + return new double[buckets]; + } + + public Buckets search(IndexSearcher indexSearcher) throws IOException { + var facetsCollectorManager = new FacetsCollectorManager(); + var facetsCollector = indexSearcher.search(normalizationQuery, facetsCollectorManager); + double[] reducedNormalizationBuckets = newBuckets(); + List seriesReducedBuckets = new ArrayList<>(queries.size()); + for (int i = 0; i < queries.size(); i++) { + var buckets = newBuckets(); + seriesReducedBuckets.add(DoubleArrayList.wrap(buckets)); + } + int serieIndex = 0; + for (Query query : queries) { + var reducedBuckets = seriesReducedBuckets.get(serieIndex); + Facets facets; + if (USE_LONGS) { + LongValuesSource valuesSource; + if (bucketValueSource instanceof NullValueSource) { + valuesSource = null; + } else if (bucketValueSource instanceof ConstantValueSource constantValueSource) { + valuesSource = LongValuesSource.constant(constantValueSource.constant().longValue()); + } else if (bucketValueSource instanceof LongBucketValueSource longBucketValueSource) { + valuesSource = longBucketValueSource.source(); + } else { + throw new IllegalArgumentException("Wrong value source type: " + bucketValueSource); + } + facets = new LongRangeFacetCounts(bucketField, + valuesSource, + facetsCollector, + query, + (LongRange[]) bucketRanges + ); + } else { + DoubleValuesSource valuesSource; + if (bucketValueSource instanceof NullValueSource) { + valuesSource = null; + } else if (bucketValueSource instanceof ConstantValueSource constantValueSource) { + valuesSource = DoubleValuesSource.constant(constantValueSource.constant().longValue()); + } else if (bucketValueSource instanceof DoubleBucketValueSource doubleBucketValueSource) { + valuesSource = doubleBucketValueSource.source(); + } else { + throw new IllegalArgumentException("Wrong value source type: " + bucketValueSource); + } + facets = new DoubleRangeFacetCounts(bucketField, + valuesSource, + facetsCollector, + query, + (DoubleRange[]) bucketRanges + ); + } + FacetResult children = facets.getTopChildren(100, bucketField); + for (LabelAndValue labelAndValue : children.labelValues) { + var index = Integer.parseInt(labelAndValue.label); + reducedBuckets.set(index, reducedBuckets.getDouble(index) + labelAndValue.value.doubleValue()); + } + serieIndex++; + } + + Facets normalizationFacets; + if (USE_LONGS) { + LongValuesSource valuesSource; + if (bucketValueSource instanceof NullValueSource) { + valuesSource = null; + } else if (bucketValueSource instanceof ConstantValueSource constantValueSource) { + valuesSource = LongValuesSource.constant(constantValueSource.constant().longValue()); + } else if (bucketValueSource instanceof LongBucketValueSource longBucketValueSource) { + valuesSource = longBucketValueSource.source(); + } else { + throw new IllegalArgumentException("Wrong value source type: " + bucketValueSource); + } + normalizationFacets = new LongRangeFacetCounts(bucketField, + valuesSource, + facetsCollector, + null, + (LongRange[]) bucketRanges + ); + } else { + DoubleValuesSource valuesSource; + if (bucketValueSource instanceof NullValueSource) { + valuesSource = null; + } else if (bucketValueSource instanceof ConstantValueSource constantValueSource) { + valuesSource = DoubleValuesSource.constant(constantValueSource.constant().longValue()); + } else if (bucketValueSource instanceof DoubleBucketValueSource doubleBucketValueSource) { + valuesSource = doubleBucketValueSource.source(); + } else { + throw new IllegalArgumentException("Wrong value source type: " + bucketValueSource); + } + normalizationFacets = new DoubleRangeFacetCounts(bucketField, + valuesSource, + facetsCollector, + null, + (DoubleRange[]) bucketRanges + ); + } + var normalizationChildren = normalizationFacets.getTopChildren(0, bucketField); + for (LabelAndValue labelAndValue : normalizationChildren.labelValues) { + var index = Integer.parseInt(labelAndValue.label); + reducedNormalizationBuckets[index] += labelAndValue.value.doubleValue(); + } + return new Buckets(seriesReducedBuckets, DoubleArrayList.wrap(reducedNormalizationBuckets)); + } + + @Override + public ScoreMode scoreMode() { + throw new NotImplementedException(); + } + + @Override + public Buckets reduce(List reducedBucketsList) throws IOException { + List seriesReducedValues = new ArrayList<>(); + double[] reducedTotals = newBuckets(); + for (var seriesBuckets : reducedBucketsList) { + for (DoubleArrayList values : seriesBuckets.seriesValues()) { + double[] reducedValues = newBuckets(); + for (int i = 0; i < values.size(); i++) { + reducedValues[i] += values.getDouble(i); + } + seriesReducedValues.add(DoubleArrayList.wrap(reducedValues)); + } + var totals = seriesBuckets.totals(); + for (int i = 0; i < totals.size(); i++) { + reducedTotals[i] += totals.getDouble(i); + } + } + return new Buckets(seriesReducedValues, DoubleArrayList.wrap(reducedTotals)); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/DoubleBucketValueSource.java b/src/main/java/it/cavallium/dbengine/lucene/collector/DoubleBucketValueSource.java new file mode 100644 index 0000000..f85329f --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/DoubleBucketValueSource.java @@ -0,0 +1,5 @@ +package it.cavallium.dbengine.lucene.collector; + +import org.apache.lucene.search.DoubleValuesSource; + +public record DoubleBucketValueSource(DoubleValuesSource source) implements BucketValueSource {} diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/LongBucketValueSource.java b/src/main/java/it/cavallium/dbengine/lucene/collector/LongBucketValueSource.java new file mode 100644 index 0000000..7434e86 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/LongBucketValueSource.java @@ -0,0 +1,5 @@ +package it.cavallium.dbengine.lucene.collector; + +import org.apache.lucene.search.LongValuesSource; + +public record LongBucketValueSource(LongValuesSource source) implements BucketValueSource {} diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/NullValueSource.java b/src/main/java/it/cavallium/dbengine/lucene/collector/NullValueSource.java new file mode 100644 index 0000000..1766e8e --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/NullValueSource.java @@ -0,0 +1,3 @@ +package it.cavallium.dbengine.lucene.collector; + +public record NullValueSource() implements BucketValueSource {} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/BucketParams.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/BucketParams.java index 54aaba0..e501000 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/BucketParams.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/BucketParams.java @@ -1,6 +1,7 @@ package it.cavallium.dbengine.lucene.searcher; -import org.jetbrains.annotations.Nullable; +import it.cavallium.dbengine.lucene.collector.BucketValueSource; +import org.jetbrains.annotations.NotNull; public record BucketParams(double min, double max, int buckets, String bucketFieldName, - @Nullable String valueFieldName) {} + @NotNull BucketValueSource valueSource) {} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java index a7e0171..7131245 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java @@ -1,18 +1,15 @@ package it.cavallium.dbengine.lucene.searcher; import io.net5.buffer.api.Send; -import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearchers; -import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; -import it.cavallium.dbengine.lucene.FullDocs; -import it.cavallium.dbengine.lucene.LLFieldDoc; -import it.cavallium.dbengine.lucene.LuceneUtils; -import it.cavallium.dbengine.lucene.collector.DecimalBucketCollectorMultiManager; -import it.cavallium.dbengine.lucene.collector.LMDBFullFieldDocCollector; -import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; -import it.unimi.dsi.fastutil.doubles.DoubleArrayList; +import it.cavallium.dbengine.lucene.collector.Buckets; +import it.cavallium.dbengine.lucene.collector.DecimalBucketMultiCollectorManager; +import java.util.List; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; @@ -22,47 +19,40 @@ public class DecimalBucketMultiSearcher { protected static final Logger logger = LoggerFactory.getLogger(DecimalBucketMultiSearcher.class); - public Mono collectMulti(Mono> indexSearchersMono, + public Mono collectMulti(Mono> indexSearchersMono, BucketParams bucketParams, - LocalQueryParams queryParams, - LLSearchTransformer transformer) { - Mono queryParamsMono; - if (transformer == LLSearchTransformer.NO_TRANSFORMATION) { - queryParamsMono = Mono.just(queryParams); - } else { - queryParamsMono = LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> transformer.transform(Mono - .fromSupplier(() -> new TransformerInput(indexSearchers, queryParams))), true); - } + @NotNull List queries, + @Nullable Query normalizationQuery) { - return queryParamsMono.flatMap(queryParams2 -> LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> this + return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> this // Search results - .search(indexSearchers.shards(), bucketParams, queryParams2) + .search(indexSearchers.shards(), bucketParams, queries, normalizationQuery) // Ensure that one result is always returned .single(), - true)); + true); } - private Mono search(Iterable indexSearchers, + private Mono search(Iterable indexSearchers, BucketParams bucketParams, - LocalQueryParams queryParams) { + @NotNull List queries, + @Nullable Query normalizationQuery) { return Mono .fromCallable(() -> { LLUtils.ensureBlocking(); - return new DecimalBucketCollectorMultiManager(bucketParams.min(), + return new DecimalBucketMultiCollectorManager(bucketParams.min(), bucketParams.max(), bucketParams.buckets(), bucketParams.bucketFieldName(), - bucketParams.valueFieldName() + bucketParams.valueSource(), + queries, + normalizationQuery ); }) .flatMap(cmm -> Flux .fromIterable(indexSearchers) .flatMap(shard -> Mono.fromCallable(() -> { LLUtils.ensureBlocking(); - - var collector = cmm.get(shard); - - return shard.search(queryParams.query(), collector); + return cmm.search(shard); })) .collectList() .flatMap(results -> Mono.fromCallable(() -> {