diff --git a/pom.xml b/pom.xml
index fc7a79b..6f1a563 100644
--- a/pom.xml
+++ b/pom.xml
@@ -222,6 +222,10 @@
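 			<groupId>org.apache.lucene</groupId>
 			<artifactId>lucene-misc</artifactId>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.lucene</groupId>
+			<artifactId>lucene-facet</artifactId>
+		</dependency>
 		<dependency>
 			<groupId>org.jetbrains</groupId>
 			<artifactId>annotations</artifactId>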
@@ -420,6 +424,11 @@
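 				<artifactId>lucene-misc</artifactId>
 				<version>9.0.0-SNAPSHOT</version>
 			</dependency>
+			<dependency>
+				<groupId>org.apache.lucene</groupId>
+				<artifactId>lucene-facet</artifactId>
+				<version>9.0.0-SNAPSHOT</version>
+			</dependency>
 			<dependency>
 				<groupId>org.jetbrains</groupId>
 				<artifactId>annotations</artifactId>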
diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java
index 83cd77c..7ba72b2 100644
--- a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java
+++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java
@@ -8,8 +8,10 @@ import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLSnapshottable;
import it.cavallium.dbengine.database.collections.ValueGetter;
import it.cavallium.dbengine.database.collections.ValueTransformer;
+import it.cavallium.dbengine.lucene.collector.Buckets;
import it.cavallium.dbengine.lucene.searcher.BucketParams;
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import java.util.List;
import java.util.Map.Entry;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -55,7 +57,10 @@ public interface LuceneIndex extends LLSnapshottable {
Mono>> search(ClientQueryParams queryParams);
- Mono computeBuckets(ClientQueryParams queryParams, BucketParams bucketParams);
+ /**
+ * Computes one series of bucket values per query, plus the per-bucket normalization totals.
+ *
+ * @param snapshot snapshot to read from, or null (presumably the live index; confirm against resolveSnapshot)
+ * @param queries queries to evaluate; each query yields one series in the result
+ * @param normalizationQuery query whose matches define the per-bucket totals, may be null
+ * @param bucketParams bucket range, bucket count, bucket field and value source
+ * @return the series values and the totals used to normalize them
+ */
+ Mono<Buckets> computeBuckets(@Nullable CompositeSnapshot snapshot,
+ @NotNull List<Query> queries,
+ @Nullable Query normalizationQuery,
+ BucketParams bucketParams);
Mono count(@Nullable CompositeSnapshot snapshot, Query query);
diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java
index 6361ace..3c34bfc 100644
--- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java
+++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java
@@ -9,8 +9,10 @@ import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSearchResultShard;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLTerm;
+import it.cavallium.dbengine.lucene.collector.Buckets;
import it.cavallium.dbengine.lucene.searcher.BucketParams;
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -113,10 +115,14 @@ public class LuceneIndexImpl implements LuceneIndex {
}
@Override
- public Mono computeBuckets(ClientQueryParams queryParams, BucketParams bucketParams) {
+ public Mono computeBuckets(@Nullable CompositeSnapshot snapshot,
+ @NotNull List query,
+ @Nullable Query normalizaitonQuery,
+ BucketParams bucketParams) {
return luceneIndex
- .computeBuckets(resolveSnapshot(queryParams.snapshot()),
- queryParams.toQueryParams(),
+ .computeBuckets(resolveSnapshot(snapshot),
+ query,
+ normalizaitonQuery,
bucketParams
)
.single();
diff --git a/src/main/java/it/cavallium/dbengine/database/LLItem.java b/src/main/java/it/cavallium/dbengine/database/LLItem.java
index 26c4cca..ae57de0 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLItem.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLItem.java
@@ -53,6 +53,10 @@ public class LLItem {
return new LLItem(LLType.LongPoint, name, data);
}
+ /**
+ * Creates an item of type {@link LLType#LongStoredField} carrying the given long value.
+ *
+ * @param name field name
+ * @param data long value of the field
+ * @return the new item
+ */
+ public static LLItem newLongStoredField(String name, long data) {
+ return new LLItem(LLType.LongStoredField, name, data);
+ }
+
public static LLItem newFloatPoint(String name, float data) {
return new LLItem(LLType.FloatPoint, name, data);
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java
index df3e587..0293e11 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java
@@ -7,11 +7,14 @@ import it.cavallium.dbengine.client.query.current.data.NoSort;
import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
+import it.cavallium.dbengine.lucene.collector.Buckets;
import it.cavallium.dbengine.lucene.searcher.BucketParams;
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -60,7 +63,10 @@ public interface LLLuceneIndex extends LLSnapshottable {
/**
+ * Computes one series of bucket values per query, plus the per-bucket normalization totals.
+ *
+ * @param snapshot snapshot to read from, or null
+ * @param queries queries to evaluate; each query yields one series in the result
+ * @param normalizationQuery query whose matches define the per-bucket totals, may be null
+ * @param bucketParams bucket range, bucket count, bucket field and value source
* @return buckets with each value collected into one of the buckets
*/
- Mono computeBuckets(@Nullable LLSnapshot snapshot, QueryParams queryParams, BucketParams bucketParams);
+ Mono<Buckets> computeBuckets(@Nullable LLSnapshot snapshot,
+ @NotNull List<Query> queries,
+ @Nullable Query normalizationQuery,
+ BucketParams bucketParams);
default Mono count(@Nullable LLSnapshot snapshot, Query query) {
QueryParams params = QueryParams.of(query, 0, 0, Nullablefloat.empty(), NoSort.of(), false);
diff --git a/src/main/java/it/cavallium/dbengine/database/LLType.java b/src/main/java/it/cavallium/dbengine/database/LLType.java
index 67b248a..2e174b1 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLType.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLType.java
@@ -5,6 +5,7 @@ public enum LLType {
StringFieldStored,
IntPoint,
LongPoint,
+ LongStoredField,
FloatPoint,
SortedNumericDocValuesField,
TextField,
diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
index a7b26af..9bab5cf 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
@@ -35,6 +35,7 @@ import org.apache.lucene.document.FloatPoint;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.SortedNumericDocValuesField;
+import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.Term;
@@ -174,6 +175,7 @@ public class LLUtils {
return switch (item.getType()) {
case IntPoint -> new IntPoint(item.getName(), Ints.fromByteArray(item.getData()));
case LongPoint -> new LongPoint(item.getName(), Longs.fromByteArray(item.getData()));
+ case LongStoredField -> new StoredField(item.getName(), Longs.fromByteArray(item.getData()));
case FloatPoint -> new FloatPoint(item.getName(), ByteBuffer.wrap(item.getData()).getFloat());
case TextField -> new TextField(item.getName(), item.stringValue(), Field.Store.NO);
case TextFieldStored -> new TextField(item.getName(), item.stringValue(), Field.Store.YES);
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
index d37bed5..84fe7cc 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
@@ -12,6 +12,8 @@ import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.LuceneOptions;
import it.cavallium.dbengine.client.NRTCachingOptions;
+import it.cavallium.dbengine.client.query.QueryParser;
+import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.database.LLIndexRequest;
import it.cavallium.dbengine.database.LLSoftUpdateDocument;
@@ -26,6 +28,7 @@ import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.lucene.AlwaysDirectIOFSDirectory;
import it.cavallium.dbengine.lucene.LuceneUtils;
+import it.cavallium.dbengine.lucene.collector.Buckets;
import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.BucketParams;
import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher;
@@ -35,6 +38,7 @@ import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer;
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import java.io.IOException;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -62,6 +66,7 @@ import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.util.Constants;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.functional.IORunnable;
import org.warp.commonutils.log.Logger;
@@ -395,16 +400,21 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}
+ /**
+ * Converts the client queries to Lucene queries and computes the buckets on the single
+ * searcher retrieved for the given snapshot.
+ *
+ * @param snapshot snapshot to search, or null
+ * @param queries client queries, one result series each
+ * @param normalizationQuery client query used for the totals, may be null
+ * @param bucketParams bucket configuration
+ * @return the computed buckets
+ */
@Override
- public Mono computeBuckets(@Nullable LLSnapshot snapshot,
- QueryParams queryParams,
+ public Mono<Buckets> computeBuckets(@Nullable LLSnapshot snapshot,
+ @NotNull List<Query> queries,
+ @Nullable Query normalizationQuery,
BucketParams bucketParams) {
- LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer);
+ // Fully qualified: the simple name Query is taken by the client query type in this file
+ List<org.apache.lucene.search.Query> localQueries = new ArrayList<>(queries.size());
+ for (Query query : queries) {
+ localQueries.add(QueryParser.toQuery(query, luceneAnalyzer));
+ }
+ // normalizationQuery is @Nullable: do not rely on the parser accepting null
+ // NOTE(review): confirm how QueryParser.toQuery treats a null query
+ var localNormalizationQuery = normalizationQuery == null
+ ? null
+ : QueryParser.toQuery(normalizationQuery, luceneAnalyzer);
var searchers = searcherManager
.retrieveSearcher(snapshot)
.map(indexSearcher -> LLIndexSearchers.unsharded(indexSearcher).send());
return decimalBucketMultiSearcher
- .collectMulti(searchers, bucketParams, localQueryParams, NO_TRANSFORMATION)
+ .collectMulti(searchers, bucketParams, localQueries, localNormalizationQuery)
.doOnDiscard(Send.class, Send::close)
.doOnDiscard(Resource.class, Resource::close);
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java
index c2040da..fb25801 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java
@@ -6,6 +6,8 @@ import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.LuceneOptions;
+import it.cavallium.dbengine.client.query.QueryParser;
+import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.database.LLIndexRequest;
import it.cavallium.dbengine.database.LLUpdateDocument;
@@ -16,6 +18,7 @@ import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.lucene.LuceneUtils;
+import it.cavallium.dbengine.lucene.collector.Buckets;
import it.cavallium.dbengine.lucene.searcher.AdaptiveMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.BucketParams;
import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher;
@@ -38,6 +41,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
import org.apache.lucene.search.similarities.PerFieldSimilarityWrapper;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -248,16 +252,22 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
}
+ /**
+ * Converts the client queries to Lucene queries and computes the buckets over all the
+ * searchers returned by getIndexSearchers, collected into a single result.
+ *
+ * @param snapshot snapshot to search, or null
+ * @param queries client queries, one result series each
+ * @param normalizationQuery client query used for the totals, may be null
+ * @param bucketParams bucket configuration
+ * @return the computed buckets
+ */
@Override
- public Mono computeBuckets(@Nullable LLSnapshot snapshot,
- QueryParams queryParams,
+ public Mono<Buckets> computeBuckets(@Nullable LLSnapshot snapshot,
+ @NotNull List<Query> queries,
+ @Nullable Query normalizationQuery,
BucketParams bucketParams) {
- LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer);
+ // Fully qualified: the simple name Query is taken by the client query type in this file
+ List<org.apache.lucene.search.Query> localQueries = new ArrayList<>(queries.size());
+ for (Query query : queries) {
+ localQueries.add(QueryParser.toQuery(query, luceneAnalyzer));
+ }
+ // normalizationQuery is @Nullable: do not rely on the parser accepting null
+ // NOTE(review): confirm how QueryParser.toQuery treats a null query
+ var localNormalizationQuery = normalizationQuery == null
+ ? null
+ : QueryParser.toQuery(normalizationQuery, luceneAnalyzer);
var searchers = getIndexSearchers(snapshot);
// Collect all the shards results into a single global result
return decimalBucketMultiSearcher
- .collectMulti(searchers, bucketParams, localQueryParams, LLSearchTransformer.NO_TRANSFORMATION)
- .doOnDiscard(Send.class, Send::close).doOnDiscard(Resource.class, Resource::close);
+ .collectMulti(searchers, bucketParams, localQueries, localNormalizationQuery)
+ .doOnDiscard(Send.class, Send::close)
+ .doOnDiscard(Resource.class, Resource::close);
}
@Override
diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/BucketValueSource.java b/src/main/java/it/cavallium/dbengine/lucene/collector/BucketValueSource.java
new file mode 100644
index 0000000..6a99623
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/lucene/collector/BucketValueSource.java
@@ -0,0 +1,4 @@
+package it.cavallium.dbengine.lucene.collector;
+
+/**
+ * Selects the values source used when bucket facets are computed.
+ * The hierarchy is closed: only the four permitted implementations exist.
+ */
+public sealed interface BucketValueSource permits DoubleBucketValueSource, LongBucketValueSource, ConstantValueSource,
+ NullValueSource {}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/Buckets.java b/src/main/java/it/cavallium/dbengine/lucene/collector/Buckets.java
new file mode 100644
index 0000000..8672f8d
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/lucene/collector/Buckets.java
@@ -0,0 +1,29 @@
+package it.cavallium.dbengine.lucene.collector;
+
+import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Result of a bucket computation: one list of per-bucket values for each series, plus the
+ * per-bucket totals used to normalize them.
+ *
+ * @param seriesValues per-bucket values, one entry per series; every entry must have the same
+ *                     size as {@code totals}
+ * @param totals       per-bucket totals
+ */
+public record Buckets(List<DoubleArrayList> seriesValues, DoubleArrayList totals) {
+
+	/**
+	 * Checks that every series has exactly one value per total.
+	 *
+	 * @throws IllegalArgumentException if a series size differs from the size of {@code totals}
+	 */
+	public Buckets {
+		for (DoubleArrayList values : seriesValues) {
+			if (values.size() != totals.size()) {
+				throw new IllegalArgumentException("Buckets size mismatch");
+			}
+		}
+	}
+
+	/**
+	 * Divides every series value by the total of the same bucket.
+	 *
+	 * <p>This is a plain floating-point division: a bucket whose total is zero yields
+	 * {@code NaN} (for a zero value) or an infinity (for a non-zero value).
+	 *
+	 * @return a new list with one normalized series per input series; this record is not modified
+	 */
+	public List<DoubleArrayList> normalized() {
+		var normalizedSeries = new ArrayList<DoubleArrayList>(seriesValues.size());
+		for (DoubleArrayList values : seriesValues) {
+			DoubleArrayList normalized = new DoubleArrayList(values.size());
+			for (int i = 0; i < values.size(); i++) {
+				normalized.add(values.getDouble(i) / totals.getDouble(i));
+			}
+			normalizedSeries.add(normalized);
+		}
+		return normalizedSeries;
+	}
+}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/CollectorMultiManager.java b/src/main/java/it/cavallium/dbengine/lucene/collector/CollectorMultiManager.java
index 5107b1a..64eee04 100644
--- a/src/main/java/it/cavallium/dbengine/lucene/collector/CollectorMultiManager.java
+++ b/src/main/java/it/cavallium/dbengine/lucene/collector/CollectorMultiManager.java
@@ -1,5 +1,6 @@
package it.cavallium.dbengine.lucene.collector;
+import java.io.IOException;
import java.util.List;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.TopDocs;
@@ -8,5 +9,5 @@ public interface CollectorMultiManager {
ScoreMode scoreMode();
- U reduce(List results);
+ U reduce(List results) throws IOException;
}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/ConstantValueSource.java b/src/main/java/it/cavallium/dbengine/lucene/collector/ConstantValueSource.java
new file mode 100644
index 0000000..0da7059
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/lucene/collector/ConstantValueSource.java
@@ -0,0 +1,3 @@
+package it.cavallium.dbengine.lucene.collector;
+
+/**
+ * Value source that always provides the same number.
+ *
+ * @param constant the constant value
+ */
+public record ConstantValueSource(Number constant) implements BucketValueSource {}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/DecimalBucketCollectorMultiManager.java b/src/main/java/it/cavallium/dbengine/lucene/collector/DecimalBucketCollectorMultiManager.java
deleted file mode 100644
index e39d752..0000000
--- a/src/main/java/it/cavallium/dbengine/lucene/collector/DecimalBucketCollectorMultiManager.java
+++ /dev/null
@@ -1,158 +0,0 @@
-package it.cavallium.dbengine.lucene.collector;
-
-import it.cavallium.dbengine.lucene.LuceneUtils;
-import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import org.apache.commons.lang3.NotImplementedException;
-import org.apache.lucene.document.DocumentStoredFieldVisitor;
-import org.apache.lucene.index.IndexableField;
-import org.apache.lucene.search.Collector;
-import org.apache.lucene.search.CollectorManager;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.ScoreMode;
-import org.apache.lucene.search.SimpleCollector;
-import org.apache.lucene.search.TopDocs;
-import org.apache.lucene.search.TopFieldDocs;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-public class DecimalBucketCollectorMultiManager implements CollectorMultiManager {
-
-
- private final String bucketField;
- @Nullable
- private final String valueField;
- private final Set fieldsToLoad;
-
- private final double totalLength;
- private final double bucketLength;
- private final double minimum;
- private final double maximum;
- private final int buckets;
-
- public DecimalBucketCollectorMultiManager(double minimum,
- double maximum,
- double buckets,
- String bucketField,
- @Nullable String valueField) {
- var bucketsInt = (int) Math.ceil(buckets);
- this.minimum = minimum;
- this.maximum = maximum;
- this.buckets = bucketsInt;
- this.bucketLength = (maximum - minimum) / bucketsInt;
- this.totalLength = bucketLength * bucketsInt;
- this.bucketField = bucketField;
- this.valueField = valueField;
- if (valueField != null) {
- this.fieldsToLoad = Set.of(bucketField, valueField);
- } else {
- this.fieldsToLoad = Set.of(bucketField);
- }
- }
-
- public double[] newBuckets() {
- return new double[buckets];
- }
-
- public CollectorManager get(IndexSearcher indexSearcher) {
-
- return new CollectorManager<>() {
- @Override
- public BucketsCollector newCollector() {
- return new BucketsCollector(indexSearcher);
- }
-
- @Override
- public DoubleArrayList reduce(Collection collectors) {
- double[] reducedBuckets = newBuckets();
- for (BucketsCollector collector : collectors) {
- var buckets = collector.getBuckets();
- assert reducedBuckets.length == buckets.length;
- for (int i = 0; i < buckets.length; i++) {
- reducedBuckets[i] += buckets[i];
- }
- }
- return DoubleArrayList.wrap(reducedBuckets);
- }
- };
- }
-
- @Override
- public ScoreMode scoreMode() {
- throw new NotImplementedException();
- }
-
- @Override
- public DoubleArrayList reduce(List reducedBucketsList) {
- if (reducedBucketsList.size() == 1) {
- return reducedBucketsList.get(0);
- }
- double[] reducedBuckets = newBuckets();
- for (DoubleArrayList buckets : reducedBucketsList) {
- for (int i = 0; i < buckets.size(); i++) {
- reducedBuckets[i] += buckets.getDouble(i);
- }
- }
- return DoubleArrayList.wrap(reducedBuckets);
- }
-
- private class BucketsCollector extends SimpleCollector {
-
- private final IndexSearcher indexSearcher;
- private final DocumentStoredFieldVisitor documentStoredFieldVisitor;
- private final double[] buckets;
-
- public BucketsCollector(IndexSearcher indexSearcher) {
- super();
- this.indexSearcher = indexSearcher;
- this.documentStoredFieldVisitor = new DocumentStoredFieldVisitor(fieldsToLoad);
- this.buckets = newBuckets();
- }
-
-
- @Override
- public ScoreMode scoreMode() {
- return ScoreMode.COMPLETE_NO_SCORES;
- }
-
- @Override
- public void collect(int doc) throws IOException {
- indexSearcher.doc(doc, documentStoredFieldVisitor);
- var document = documentStoredFieldVisitor.getDocument();
- var bucketField = document.getField(DecimalBucketCollectorMultiManager.this.bucketField);
- IndexableField valueField;
- if (DecimalBucketCollectorMultiManager.this.valueField != null) {
- valueField = document.getField(DecimalBucketCollectorMultiManager.this.valueField);
- } else {
- valueField = null;
- }
- var bucketValue = bucketField.numericValue().doubleValue();
- if (bucketValue >= minimum && bucketValue <= maximum) {
- double value;
- if (valueField != null) {
- value = valueField.numericValue().doubleValue();
- } else {
- value = 1.0d;
- }
- double bucketIndex = (bucketValue - minimum) / bucketLength;
- int bucketIndexLow = (int) Math.floor(bucketIndex);
- double ratio = (bucketIndex - bucketIndexLow);
- assert ratio >= 0 && ratio <= 1;
- double loValue = value * (1d - ratio);
- double hiValue = value * ratio;
- buckets[bucketIndexLow] += loValue;
- if (hiValue > 0d) {
- buckets[bucketIndexLow + 1] += hiValue;
- }
- }
- }
-
- public double[] getBuckets() {
- return buckets;
- }
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/DecimalBucketMultiCollectorManager.java b/src/main/java/it/cavallium/dbengine/lucene/collector/DecimalBucketMultiCollectorManager.java
new file mode 100644
index 0000000..8a765d0
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/lucene/collector/DecimalBucketMultiCollectorManager.java
@@ -0,0 +1,223 @@
+package it.cavallium.dbengine.lucene.collector;
+
+import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.lucene.document.DocumentStoredFieldVisitor;
+import org.apache.lucene.facet.FacetResult;
+import org.apache.lucene.facet.Facets;
+import org.apache.lucene.facet.FacetsCollector;
+import org.apache.lucene.facet.FacetsCollectorManager;
+import org.apache.lucene.facet.LabelAndValue;
+import org.apache.lucene.facet.range.DoubleRange;
+import org.apache.lucene.facet.range.DoubleRangeFacetCounts;
+import org.apache.lucene.facet.range.LongRange;
+import org.apache.lucene.facet.range.LongRangeFacetCounts;
+import org.apache.lucene.facet.range.Range;
+import org.apache.lucene.search.CollectorManager;
+import org.apache.lucene.search.DoubleValuesSource;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.LongValuesSource;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Scorable;
+import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.search.SimpleCollector;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Computes range-facet bucket values for a list of queries, together with normalization
+ * totals, one index shard at a time ({@link #search(IndexSearcher)}), and merges the
+ * per-shard results ({@link #reduce(List)}).
+ *
+ * <p>The interval between {@code minimum} and {@code maximum} is split into
+ * {@code ceil(buckets)} ranges of equal length. Every range includes its lower bound; only the
+ * last range includes its upper bound.
+ */
+// NOTE(review): the type arguments are inferred from reduce(List) and its return type;
+// confirm them against the CollectorMultiManager declaration.
+public class DecimalBucketMultiCollectorManager implements CollectorMultiManager<Buckets, Buckets> {
+
+	private final List<Query> queries;
+	private final Query normalizationQuery;
+
+	private final String bucketField;
+	private final BucketValueSource bucketValueSource;
+
+	private final double totalLength;
+	private final double bucketLength;
+	private final double minimum;
+	private final double maximum;
+	private final int buckets;
+
+	private final Range[] bucketRanges;
+
+	// todo: replace with an argument
+	private static final boolean USE_LONGS = true;
+
+	/**
+	 * @param minimum            lower bound of the first bucket (inclusive)
+	 * @param maximum            upper bound of the last bucket (inclusive)
+	 * @param buckets            number of buckets, rounded up to the next integer
+	 * @param bucketField        field used as the facet dimension
+	 * @param bucketValueSource  values source handed to the range facet counts
+	 * @param queries            queries to evaluate, one result series each
+	 * @param normalizationQuery query that selects the documents to facet on; when null every
+	 *                           document is selected
+	 */
+	public DecimalBucketMultiCollectorManager(double minimum,
+			double maximum,
+			double buckets,
+			String bucketField,
+			BucketValueSource bucketValueSource,
+			List<Query> queries,
+			@Nullable Query normalizationQuery) {
+		this.queries = queries;
+		// Callers declare this argument @Nullable, but IndexSearcher.search needs a real query.
+		// NOTE(review): falling back to "match all documents" is an assumption about the intent.
+		this.normalizationQuery = normalizationQuery != null ? normalizationQuery : new MatchAllDocsQuery();
+		var bucketsInt = (int) Math.ceil(buckets);
+		this.minimum = minimum;
+		this.maximum = maximum;
+		this.buckets = bucketsInt;
+		this.bucketLength = (maximum - minimum) / bucketsInt;
+		this.totalLength = bucketLength * bucketsInt;
+		this.bucketField = bucketField;
+		this.bucketValueSource = bucketValueSource;
+
+		// The array type must match the cast done when the facet counts are created
+		if (USE_LONGS) {
+			this.bucketRanges = new LongRange[bucketsInt];
+		} else {
+			this.bucketRanges = new DoubleRange[bucketsInt];
+		}
+		for (int i = 0; i < bucketsInt; i++) {
+			double offsetMin = minimum + (bucketLength * i);
+			double offsetMax = minimum + (bucketLength * (i + 1));
+			// The label is the bucket index: it is parsed back when the facet results are read
+			String label = Integer.toString(i);
+			boolean lastBucket = i == bucketsInt - 1;
+			if (USE_LONGS) {
+				// Bounds are truncated to long
+				this.bucketRanges[i] = new LongRange(label, (long) offsetMin, true, (long) offsetMax, lastBucket);
+			} else {
+				this.bucketRanges[i] = new DoubleRange(label, offsetMin, true, offsetMax, lastBucket);
+			}
+		}
+	}
+
+	/**
+	 * @return a new zero-filled array with one slot per bucket
+	 */
+	public double[] newBuckets() {
+		return new double[buckets];
+	}
+
+	/**
+	 * Runs the normalization query on the given searcher, then computes the bucket values of
+	 * every query and the normalization totals from the collected hits.
+	 *
+	 * @param indexSearcher searcher of the shard to analyze
+	 * @return one series per query, in query order, plus the normalization totals
+	 * @throws IOException if the search or the facet computation fails
+	 */
+	public Buckets search(IndexSearcher indexSearcher) throws IOException {
+		FacetsCollector hits = indexSearcher.search(normalizationQuery, new FacetsCollectorManager());
+		List<DoubleArrayList> seriesReducedBuckets = new ArrayList<>(queries.size());
+		for (Query query : queries) {
+			// Each query is passed as the fast-match query of the facet counts
+			seriesReducedBuckets.add(DoubleArrayList.wrap(readBuckets(createFacets(hits, query))));
+		}
+		double[] reducedNormalizationBuckets = readBuckets(createFacets(hits, null));
+		return new Buckets(seriesReducedBuckets, DoubleArrayList.wrap(reducedNormalizationBuckets));
+	}
+
+	/** Creates the long or double range facet counts over the collected hits. */
+	private Facets createFacets(FacetsCollector hits, @Nullable Query fastMatchQuery) throws IOException {
+		if (USE_LONGS) {
+			return new LongRangeFacetCounts(bucketField,
+					longValuesSource(),
+					hits,
+					fastMatchQuery,
+					(LongRange[]) bucketRanges
+			);
+		} else {
+			return new DoubleRangeFacetCounts(bucketField,
+					doubleValuesSource(),
+					hits,
+					fastMatchQuery,
+					(DoubleRange[]) bucketRanges
+			);
+		}
+	}
+
+	/** Maps the configured value source to a long values source, or null for {@link NullValueSource}. */
+	@Nullable
+	private LongValuesSource longValuesSource() {
+		if (bucketValueSource instanceof NullValueSource) {
+			return null;
+		} else if (bucketValueSource instanceof ConstantValueSource constantValueSource) {
+			return LongValuesSource.constant(constantValueSource.constant().longValue());
+		} else if (bucketValueSource instanceof LongBucketValueSource longBucketValueSource) {
+			return longBucketValueSource.source();
+		} else {
+			throw new IllegalArgumentException("Wrong value source type: " + bucketValueSource);
+		}
+	}
+
+	/** Maps the configured value source to a double values source, or null for {@link NullValueSource}. */
+	@Nullable
+	private DoubleValuesSource doubleValuesSource() {
+		if (bucketValueSource instanceof NullValueSource) {
+			return null;
+		} else if (bucketValueSource instanceof ConstantValueSource constantValueSource) {
+			// doubleValue(), not longValue(): the fractional part of the constant must survive
+			return DoubleValuesSource.constant(constantValueSource.constant().doubleValue());
+		} else if (bucketValueSource instanceof DoubleBucketValueSource doubleBucketValueSource) {
+			return doubleBucketValueSource.source();
+		} else {
+			throw new IllegalArgumentException("Wrong value source type: " + bucketValueSource);
+		}
+	}
+
+	/** Reads the facet values into a bucket array, using each label as the bucket index. */
+	private double[] readBuckets(Facets facets) throws IOException {
+		double[] result = newBuckets();
+		// Ask for every bucket, and never for a non-positive amount
+		FacetResult children = facets.getTopChildren(Math.max(1, buckets), bucketField);
+		for (LabelAndValue labelAndValue : children.labelValues) {
+			var index = Integer.parseInt(labelAndValue.label);
+			result[index] += labelAndValue.value.doubleValue();
+		}
+		return result;
+	}
+
+	/**
+	 * Not supported by this manager.
+	 *
+	 * @throws NotImplementedException always
+	 */
+	@Override
+	public ScoreMode scoreMode() {
+		throw new NotImplementedException();
+	}
+
+	/**
+	 * Merges the results of several shards: the values of the series at the same position are
+	 * summed bucket by bucket, and so are the totals.
+	 *
+	 * @param reducedBucketsList one result per shard
+	 * @return a result with one series per series position found in the input
+	 */
+	@Override
+	public Buckets reduce(List<Buckets> reducedBucketsList) throws IOException {
+		// One accumulator per series position, shared by all the shards
+		List<double[]> seriesSums = new ArrayList<>(queries.size());
+		double[] reducedTotals = newBuckets();
+		for (Buckets shardBuckets : reducedBucketsList) {
+			List<DoubleArrayList> shardSeries = shardBuckets.seriesValues();
+			for (int series = 0; series < shardSeries.size(); series++) {
+				if (series == seriesSums.size()) {
+					seriesSums.add(newBuckets());
+				}
+				addInto(seriesSums.get(series), shardSeries.get(series));
+			}
+			addInto(reducedTotals, shardBuckets.totals());
+		}
+		List<DoubleArrayList> seriesReducedValues = new ArrayList<>(seriesSums.size());
+		for (double[] sums : seriesSums) {
+			seriesReducedValues.add(DoubleArrayList.wrap(sums));
+		}
+		return new Buckets(seriesReducedValues, DoubleArrayList.wrap(reducedTotals));
+	}
+
+	/** Adds every value to the slot with the same index in the target array. */
+	private static void addInto(double[] target, DoubleArrayList values) {
+		for (int i = 0; i < values.size(); i++) {
+			target[i] += values.getDouble(i);
+		}
+	}
+}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/DoubleBucketValueSource.java b/src/main/java/it/cavallium/dbengine/lucene/collector/DoubleBucketValueSource.java
new file mode 100644
index 0000000..f85329f
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/lucene/collector/DoubleBucketValueSource.java
@@ -0,0 +1,5 @@
+package it.cavallium.dbengine.lucene.collector;
+
+import org.apache.lucene.search.DoubleValuesSource;
+
+/**
+ * Value source backed by a Lucene {@link DoubleValuesSource}.
+ *
+ * @param source the wrapped values source
+ */
+public record DoubleBucketValueSource(DoubleValuesSource source) implements BucketValueSource {}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/LongBucketValueSource.java b/src/main/java/it/cavallium/dbengine/lucene/collector/LongBucketValueSource.java
new file mode 100644
index 0000000..7434e86
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/lucene/collector/LongBucketValueSource.java
@@ -0,0 +1,5 @@
+package it.cavallium.dbengine.lucene.collector;
+
+import org.apache.lucene.search.LongValuesSource;
+
+/**
+ * Value source backed by a Lucene {@link LongValuesSource}.
+ *
+ * @param source the wrapped values source
+ */
+public record LongBucketValueSource(LongValuesSource source) implements BucketValueSource {}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/NullValueSource.java b/src/main/java/it/cavallium/dbengine/lucene/collector/NullValueSource.java
new file mode 100644
index 0000000..1766e8e
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/lucene/collector/NullValueSource.java
@@ -0,0 +1,3 @@
+package it.cavallium.dbengine.lucene.collector;
+
+/**
+ * Marker meaning that no explicit values source is provided.
+ */
+public record NullValueSource() implements BucketValueSource {}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/BucketParams.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/BucketParams.java
index 54aaba0..e501000 100644
--- a/src/main/java/it/cavallium/dbengine/lucene/searcher/BucketParams.java
+++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/BucketParams.java
@@ -1,6 +1,7 @@
package it.cavallium.dbengine.lucene.searcher;
-import org.jetbrains.annotations.Nullable;
+import it.cavallium.dbengine.lucene.collector.BucketValueSource;
+import org.jetbrains.annotations.NotNull;
+/**
+ * Parameters of a bucket computation.
+ *
+ * @param min lower bound of the bucketed interval
+ * @param max upper bound of the bucketed interval
+ * @param buckets number of buckets
+ * @param bucketFieldName name of the field used to compute the buckets
+ * @param valueSource values source used by the computation, never null
+ */
public record BucketParams(double min, double max, int buckets, String bucketFieldName,
- @Nullable String valueFieldName) {}
+ @NotNull BucketValueSource valueSource) {}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java
index a7e0171..7131245 100644
--- a/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java
+++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java
@@ -1,18 +1,15 @@
package it.cavallium.dbengine.lucene.searcher;
import io.net5.buffer.api.Send;
-import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
-import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
-import it.cavallium.dbengine.lucene.FullDocs;
-import it.cavallium.dbengine.lucene.LLFieldDoc;
-import it.cavallium.dbengine.lucene.LuceneUtils;
-import it.cavallium.dbengine.lucene.collector.DecimalBucketCollectorMultiManager;
-import it.cavallium.dbengine.lucene.collector.LMDBFullFieldDocCollector;
-import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
-import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import it.cavallium.dbengine.lucene.collector.Buckets;
+import it.cavallium.dbengine.lucene.collector.DecimalBucketMultiCollectorManager;
+import java.util.List;
import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
@@ -22,47 +19,40 @@ public class DecimalBucketMultiSearcher {
protected static final Logger logger = LoggerFactory.getLogger(DecimalBucketMultiSearcher.class);
+ /**
+ * Computes the buckets over all the shards of the given index searchers.
+ *
+ * @param indexSearchersMono source of the index searchers to search on
+ * @param bucketParams bucket configuration
+ * @param queries queries to evaluate
+ * @param normalizationQuery query used for the normalization totals, may be null
+ * @return a mono that emits exactly one result
+ */
- public Mono collectMulti(Mono> indexSearchersMono,
+ public Mono collectMulti(Mono> indexSearchersMono,
BucketParams bucketParams,
- LocalQueryParams queryParams,
- LLSearchTransformer transformer) {
- Mono queryParamsMono;
- if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
- queryParamsMono = Mono.just(queryParams);
- } else {
- queryParamsMono = LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> transformer.transform(Mono
- .fromSupplier(() -> new TransformerInput(indexSearchers, queryParams))), true);
- }
+ @NotNull List queries,
+ @Nullable Query normalizationQuery) {
- return queryParamsMono.flatMap(queryParams2 -> LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> this
+ return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> this
// Search results
- .search(indexSearchers.shards(), bucketParams, queryParams2)
+ .search(indexSearchers.shards(), bucketParams, queries, normalizationQuery)
// Ensure that one result is always returned
.single(),
- true));
+ true);
}
- private Mono search(Iterable indexSearchers,
+ private Mono search(Iterable indexSearchers,
BucketParams bucketParams,
- LocalQueryParams queryParams) {
+ @NotNull List queries,
+ @Nullable Query normalizationQuery) {
return Mono
.fromCallable(() -> {
LLUtils.ensureBlocking();
- return new DecimalBucketCollectorMultiManager(bucketParams.min(),
+ return new DecimalBucketMultiCollectorManager(bucketParams.min(),
bucketParams.max(),
bucketParams.buckets(),
bucketParams.bucketFieldName(),
- bucketParams.valueFieldName()
+ bucketParams.valueSource(),
+ queries,
+ normalizationQuery
);
})
.flatMap(cmm -> Flux
.fromIterable(indexSearchers)
.flatMap(shard -> Mono.fromCallable(() -> {
LLUtils.ensureBlocking();
-
- var collector = cmm.get(shard);
-
- return shard.search(queryParams.query(), collector);
+ return cmm.search(shard);
}))
.collectList()
.flatMap(results -> Mono.fromCallable(() -> {