Implement numeric buckets collector
This commit is contained in:
parent
e014266b8d
commit
ed00d474d6
@ -8,6 +8,8 @@ import it.cavallium.dbengine.database.Delta;
|
|||||||
import it.cavallium.dbengine.database.LLSnapshottable;
|
import it.cavallium.dbengine.database.LLSnapshottable;
|
||||||
import it.cavallium.dbengine.database.collections.ValueGetter;
|
import it.cavallium.dbengine.database.collections.ValueGetter;
|
||||||
import it.cavallium.dbengine.database.collections.ValueTransformer;
|
import it.cavallium.dbengine.database.collections.ValueTransformer;
|
||||||
|
import it.cavallium.dbengine.lucene.searcher.BucketParams;
|
||||||
|
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.jetbrains.annotations.Nullable;
|
import org.jetbrains.annotations.Nullable;
|
||||||
@ -53,6 +55,8 @@ public interface LuceneIndex<T, U> extends LLSnapshottable {
|
|||||||
|
|
||||||
Mono<Hits<HitKey<T>>> search(ClientQueryParams queryParams);
|
Mono<Hits<HitKey<T>>> search(ClientQueryParams queryParams);
|
||||||
|
|
||||||
|
/**
 * Computes numeric buckets over the documents matched by the given query.
 *
 * @param queryParams  query (and optional snapshot) selecting the documents to aggregate
 * @param bucketParams range, number of buckets and field names driving the aggregation
 * @return a {@link Mono} emitting the accumulated bucket values
 */
Mono<DoubleArrayList> computeBuckets(ClientQueryParams queryParams, BucketParams bucketParams);
|
||||||
|
|
||||||
Mono<TotalHitsCount> count(@Nullable CompositeSnapshot snapshot, Query query);
|
Mono<TotalHitsCount> count(@Nullable CompositeSnapshot snapshot, Query query);
|
||||||
|
|
||||||
boolean isLowMemoryMode();
|
boolean isLowMemoryMode();
|
||||||
|
@ -3,11 +3,14 @@ package it.cavallium.dbengine.client;
|
|||||||
import io.net5.buffer.api.Send;
|
import io.net5.buffer.api.Send;
|
||||||
import it.cavallium.dbengine.client.query.ClientQueryParams;
|
import it.cavallium.dbengine.client.query.ClientQueryParams;
|
||||||
import it.cavallium.dbengine.client.query.current.data.Query;
|
import it.cavallium.dbengine.client.query.current.data.Query;
|
||||||
|
import it.cavallium.dbengine.client.query.current.data.QueryParams;
|
||||||
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
|
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
|
||||||
import it.cavallium.dbengine.database.LLLuceneIndex;
|
import it.cavallium.dbengine.database.LLLuceneIndex;
|
||||||
import it.cavallium.dbengine.database.LLSearchResultShard;
|
import it.cavallium.dbengine.database.LLSearchResultShard;
|
||||||
import it.cavallium.dbengine.database.LLSnapshot;
|
import it.cavallium.dbengine.database.LLSnapshot;
|
||||||
import it.cavallium.dbengine.database.LLTerm;
|
import it.cavallium.dbengine.database.LLTerm;
|
||||||
|
import it.cavallium.dbengine.lucene.searcher.BucketParams;
|
||||||
|
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
@ -109,6 +112,16 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
|
|||||||
.single();
|
.single();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Mono<DoubleArrayList> computeBuckets(ClientQueryParams queryParams, BucketParams bucketParams) {
|
||||||
|
return luceneIndex
|
||||||
|
.computeBuckets(resolveSnapshot(queryParams.snapshot()),
|
||||||
|
queryParams.toQueryParams(),
|
||||||
|
bucketParams
|
||||||
|
)
|
||||||
|
.single();
|
||||||
|
}
|
||||||
|
|
||||||
private Hits<HitKey<T>> mapResults(LLSearchResultShard llSearchResult) {
|
private Hits<HitKey<T>> mapResults(LLSearchResultShard llSearchResult) {
|
||||||
var scoresWithKeysFlux = llSearchResult
|
var scoresWithKeysFlux = llSearchResult
|
||||||
.results()
|
.results()
|
||||||
|
@ -7,6 +7,8 @@ import it.cavallium.dbengine.client.query.current.data.NoSort;
|
|||||||
import it.cavallium.dbengine.client.query.current.data.Query;
|
import it.cavallium.dbengine.client.query.current.data.Query;
|
||||||
import it.cavallium.dbengine.client.query.current.data.QueryParams;
|
import it.cavallium.dbengine.client.query.current.data.QueryParams;
|
||||||
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
|
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
|
||||||
|
import it.cavallium.dbengine.lucene.searcher.BucketParams;
|
||||||
|
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
@ -55,6 +57,11 @@ public interface LLLuceneIndex extends LLSnapshottable {
|
|||||||
*/
|
*/
|
||||||
Mono<LLSearchResultShard> search(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName);
|
Mono<LLSearchResultShard> search(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName);
|
||||||
|
|
||||||
|
/**
 * Computes numeric buckets over the documents matched by the given query.
 *
 * @param snapshot     snapshot to search in, or {@code null}
 * @param queryParams  query selecting the documents to aggregate
 * @param bucketParams range, number of buckets and field names driving the aggregation
 * @return buckets with each value collected into one of the buckets
 */
|
||||||
|
Mono<DoubleArrayList> computeBuckets(@Nullable LLSnapshot snapshot, QueryParams queryParams, BucketParams bucketParams);
|
||||||
|
|
||||||
default Mono<TotalHitsCount> count(@Nullable LLSnapshot snapshot, Query query) {
|
default Mono<TotalHitsCount> count(@Nullable LLSnapshot snapshot, Query query) {
|
||||||
QueryParams params = QueryParams.of(query, 0, 0, Nullablefloat.empty(), NoSort.of(), false);
|
QueryParams params = QueryParams.of(query, 0, 0, Nullablefloat.empty(), NoSort.of(), false);
|
||||||
return Mono.from(this.search(snapshot, params, null)
|
return Mono.from(this.search(snapshot, params, null)
|
||||||
|
@ -6,6 +6,7 @@ import static it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.NO_TRANS
|
|||||||
import io.micrometer.core.instrument.MeterRegistry;
|
import io.micrometer.core.instrument.MeterRegistry;
|
||||||
import io.net5.buffer.api.Resource;
|
import io.net5.buffer.api.Resource;
|
||||||
import io.net5.buffer.api.Send;
|
import io.net5.buffer.api.Send;
|
||||||
|
import io.net5.buffer.api.internal.ResourceSupport;
|
||||||
import it.cavallium.dbengine.client.DirectIOOptions;
|
import it.cavallium.dbengine.client.DirectIOOptions;
|
||||||
import it.cavallium.dbengine.client.IndicizerAnalyzers;
|
import it.cavallium.dbengine.client.IndicizerAnalyzers;
|
||||||
import it.cavallium.dbengine.client.IndicizerSimilarities;
|
import it.cavallium.dbengine.client.IndicizerSimilarities;
|
||||||
@ -26,11 +27,15 @@ import it.cavallium.dbengine.lucene.LuceneHacks;
|
|||||||
import it.cavallium.dbengine.lucene.AlwaysDirectIOFSDirectory;
|
import it.cavallium.dbengine.lucene.AlwaysDirectIOFSDirectory;
|
||||||
import it.cavallium.dbengine.lucene.LuceneUtils;
|
import it.cavallium.dbengine.lucene.LuceneUtils;
|
||||||
import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher;
|
import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher;
|
||||||
|
import it.cavallium.dbengine.lucene.searcher.BucketParams;
|
||||||
|
import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher;
|
||||||
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
|
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
|
||||||
import it.cavallium.dbengine.lucene.searcher.LocalSearcher;
|
import it.cavallium.dbengine.lucene.searcher.LocalSearcher;
|
||||||
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer;
|
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer;
|
||||||
|
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
@ -72,6 +77,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
|||||||
|
|
||||||
protected static final Logger logger = LoggerFactory.getLogger(LLLocalLuceneIndex.class);
|
protected static final Logger logger = LoggerFactory.getLogger(LLLocalLuceneIndex.class);
|
||||||
private final LocalSearcher localSearcher;
|
private final LocalSearcher localSearcher;
|
||||||
|
private final DecimalBucketMultiSearcher decimalBucketMultiSearcher = new DecimalBucketMultiSearcher();
|
||||||
/**
|
/**
|
||||||
* Global lucene index scheduler.
|
* Global lucene index scheduler.
|
||||||
* There is only a single thread globally to not overwhelm the disk with
|
* There is only a single thread globally to not overwhelm the disk with
|
||||||
@ -388,6 +394,21 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
|||||||
.doOnDiscard(Resource.class, Resource::close);
|
.doOnDiscard(Resource.class, Resource::close);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Mono<DoubleArrayList> computeBuckets(@Nullable LLSnapshot snapshot,
|
||||||
|
QueryParams queryParams,
|
||||||
|
BucketParams bucketParams) {
|
||||||
|
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer);
|
||||||
|
var searchers = searcherManager
|
||||||
|
.retrieveSearcher(snapshot)
|
||||||
|
.map(indexSearcher -> LLIndexSearchers.unsharded(indexSearcher).send());
|
||||||
|
|
||||||
|
return decimalBucketMultiSearcher
|
||||||
|
.collectMulti(searchers, bucketParams, localQueryParams, NO_TRANSFORMATION)
|
||||||
|
.doOnDiscard(Send.class, Send::close)
|
||||||
|
.doOnDiscard(Resource.class, Resource::close);
|
||||||
|
}
|
||||||
|
|
||||||
public Mono<Send<LLIndexSearcher>> retrieveSearcher(@Nullable LLSnapshot snapshot) {
|
public Mono<Send<LLIndexSearcher>> retrieveSearcher(@Nullable LLSnapshot snapshot) {
|
||||||
return searcherManager
|
return searcherManager
|
||||||
.retrieveSearcher(snapshot)
|
.retrieveSearcher(snapshot)
|
||||||
|
@ -17,9 +17,12 @@ import it.cavallium.dbengine.database.LLTerm;
|
|||||||
import it.cavallium.dbengine.lucene.LuceneHacks;
|
import it.cavallium.dbengine.lucene.LuceneHacks;
|
||||||
import it.cavallium.dbengine.lucene.LuceneUtils;
|
import it.cavallium.dbengine.lucene.LuceneUtils;
|
||||||
import it.cavallium.dbengine.lucene.searcher.AdaptiveMultiSearcher;
|
import it.cavallium.dbengine.lucene.searcher.AdaptiveMultiSearcher;
|
||||||
|
import it.cavallium.dbengine.lucene.searcher.BucketParams;
|
||||||
|
import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher;
|
||||||
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer;
|
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer;
|
||||||
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
|
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
|
||||||
import it.cavallium.dbengine.lucene.searcher.MultiSearcher;
|
import it.cavallium.dbengine.lucene.searcher.MultiSearcher;
|
||||||
|
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
@ -51,6 +54,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
|||||||
private final PerFieldSimilarityWrapper luceneSimilarity;
|
private final PerFieldSimilarityWrapper luceneSimilarity;
|
||||||
|
|
||||||
private final MultiSearcher multiSearcher;
|
private final MultiSearcher multiSearcher;
|
||||||
|
private final DecimalBucketMultiSearcher decimalBucketMultiSearcher = new DecimalBucketMultiSearcher();
|
||||||
|
|
||||||
public LLLocalMultiLuceneIndex(LLTempLMDBEnv env,
|
public LLLocalMultiLuceneIndex(LLTempLMDBEnv env,
|
||||||
Path lucene,
|
Path lucene,
|
||||||
@ -243,6 +247,19 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
|||||||
.doOnDiscard(Send.class, Send::close).doOnDiscard(Resource.class, Resource::close);
|
.doOnDiscard(Send.class, Send::close).doOnDiscard(Resource.class, Resource::close);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Mono<DoubleArrayList> computeBuckets(@Nullable LLSnapshot snapshot,
|
||||||
|
QueryParams queryParams,
|
||||||
|
BucketParams bucketParams) {
|
||||||
|
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer);
|
||||||
|
var searchers = getIndexSearchers(snapshot);
|
||||||
|
|
||||||
|
// Collect all the shards results into a single global result
|
||||||
|
return decimalBucketMultiSearcher
|
||||||
|
.collectMulti(searchers, bucketParams, localQueryParams, LLSearchTransformer.NO_TRANSFORMATION)
|
||||||
|
.doOnDiscard(Send.class, Send::close).doOnDiscard(Resource.class, Resource::close);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Mono<Void> close() {
|
public Mono<Void> close() {
|
||||||
return Flux
|
return Flux
|
||||||
|
@ -0,0 +1,158 @@
|
|||||||
|
package it.cavallium.dbengine.lucene.collector;
|
||||||
|
|
||||||
|
import it.cavallium.dbengine.lucene.LuceneUtils;
|
||||||
|
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import org.apache.commons.lang3.NotImplementedException;
|
||||||
|
import org.apache.lucene.document.DocumentStoredFieldVisitor;
|
||||||
|
import org.apache.lucene.index.IndexableField;
|
||||||
|
import org.apache.lucene.search.Collector;
|
||||||
|
import org.apache.lucene.search.CollectorManager;
|
||||||
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
|
import org.apache.lucene.search.Query;
|
||||||
|
import org.apache.lucene.search.ScoreMode;
|
||||||
|
import org.apache.lucene.search.SimpleCollector;
|
||||||
|
import org.apache.lucene.search.TopDocs;
|
||||||
|
import org.apache.lucene.search.TopFieldDocs;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
import org.jetbrains.annotations.Nullable;
|
||||||
|
|
||||||
|
public class DecimalBucketCollectorMultiManager implements CollectorMultiManager<DoubleArrayList, DoubleArrayList> {
|
||||||
|
|
||||||
|
|
||||||
|
private final String bucketField;
|
||||||
|
@Nullable
|
||||||
|
private final String valueField;
|
||||||
|
private final Set<String> fieldsToLoad;
|
||||||
|
|
||||||
|
private final double totalLength;
|
||||||
|
private final double bucketLength;
|
||||||
|
private final double minimum;
|
||||||
|
private final double maximum;
|
||||||
|
private final int buckets;
|
||||||
|
|
||||||
|
public DecimalBucketCollectorMultiManager(double minimum,
|
||||||
|
double maximum,
|
||||||
|
double buckets,
|
||||||
|
String bucketField,
|
||||||
|
@Nullable String valueField) {
|
||||||
|
var bucketsInt = (int) Math.ceil(buckets);
|
||||||
|
this.minimum = minimum;
|
||||||
|
this.maximum = maximum;
|
||||||
|
this.buckets = bucketsInt;
|
||||||
|
this.bucketLength = (maximum - minimum) / bucketsInt;
|
||||||
|
this.totalLength = bucketLength * bucketsInt;
|
||||||
|
this.bucketField = bucketField;
|
||||||
|
this.valueField = valueField;
|
||||||
|
if (valueField != null) {
|
||||||
|
this.fieldsToLoad = Set.of(bucketField, valueField);
|
||||||
|
} else {
|
||||||
|
this.fieldsToLoad = Set.of(bucketField);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public double[] newBuckets() {
|
||||||
|
return new double[buckets];
|
||||||
|
}
|
||||||
|
|
||||||
|
public CollectorManager<BucketsCollector, DoubleArrayList> get(IndexSearcher indexSearcher) {
|
||||||
|
|
||||||
|
return new CollectorManager<>() {
|
||||||
|
@Override
|
||||||
|
public BucketsCollector newCollector() {
|
||||||
|
return new BucketsCollector(indexSearcher);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DoubleArrayList reduce(Collection<BucketsCollector> collectors) {
|
||||||
|
double[] reducedBuckets = newBuckets();
|
||||||
|
for (BucketsCollector collector : collectors) {
|
||||||
|
var buckets = collector.getBuckets();
|
||||||
|
assert reducedBuckets.length == buckets.length;
|
||||||
|
for (int i = 0; i < buckets.length; i++) {
|
||||||
|
reducedBuckets[i] += buckets[i];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return DoubleArrayList.wrap(reducedBuckets);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ScoreMode scoreMode() {
|
||||||
|
throw new NotImplementedException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DoubleArrayList reduce(List<DoubleArrayList> reducedBucketsList) {
|
||||||
|
if (reducedBucketsList.size() == 1) {
|
||||||
|
return reducedBucketsList.get(0);
|
||||||
|
}
|
||||||
|
double[] reducedBuckets = newBuckets();
|
||||||
|
for (DoubleArrayList buckets : reducedBucketsList) {
|
||||||
|
for (int i = 0; i < buckets.size(); i++) {
|
||||||
|
reducedBuckets[i] += buckets.getDouble(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return DoubleArrayList.wrap(reducedBuckets);
|
||||||
|
}
|
||||||
|
|
||||||
|
private class BucketsCollector extends SimpleCollector {
|
||||||
|
|
||||||
|
private final IndexSearcher indexSearcher;
|
||||||
|
private final DocumentStoredFieldVisitor documentStoredFieldVisitor;
|
||||||
|
private final double[] buckets;
|
||||||
|
|
||||||
|
public BucketsCollector(IndexSearcher indexSearcher) {
|
||||||
|
super();
|
||||||
|
this.indexSearcher = indexSearcher;
|
||||||
|
this.documentStoredFieldVisitor = new DocumentStoredFieldVisitor(fieldsToLoad);
|
||||||
|
this.buckets = newBuckets();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ScoreMode scoreMode() {
|
||||||
|
return ScoreMode.COMPLETE_NO_SCORES;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void collect(int doc) throws IOException {
|
||||||
|
indexSearcher.doc(doc, documentStoredFieldVisitor);
|
||||||
|
var document = documentStoredFieldVisitor.getDocument();
|
||||||
|
var bucketField = document.getField(DecimalBucketCollectorMultiManager.this.bucketField);
|
||||||
|
IndexableField valueField;
|
||||||
|
if (DecimalBucketCollectorMultiManager.this.valueField != null) {
|
||||||
|
valueField = document.getField(DecimalBucketCollectorMultiManager.this.valueField);
|
||||||
|
} else {
|
||||||
|
valueField = null;
|
||||||
|
}
|
||||||
|
var bucketValue = bucketField.numericValue().doubleValue();
|
||||||
|
if (bucketValue >= minimum && bucketValue <= maximum) {
|
||||||
|
double value;
|
||||||
|
if (valueField != null) {
|
||||||
|
value = valueField.numericValue().doubleValue();
|
||||||
|
} else {
|
||||||
|
value = 1.0d;
|
||||||
|
}
|
||||||
|
double bucketIndex = (bucketValue - minimum) / bucketLength;
|
||||||
|
int bucketIndexLow = (int) Math.floor(bucketIndex);
|
||||||
|
double ratio = (bucketIndex - bucketIndexLow);
|
||||||
|
assert ratio >= 0 && ratio <= 1;
|
||||||
|
double loValue = value * (1d - ratio);
|
||||||
|
double hiValue = value * ratio;
|
||||||
|
buckets[bucketIndexLow] += loValue;
|
||||||
|
if (hiValue > 0d) {
|
||||||
|
buckets[bucketIndexLow + 1] += hiValue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public double[] getBuckets() {
|
||||||
|
return buckets;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,6 @@
|
|||||||
|
package it.cavallium.dbengine.lucene.searcher;
|
||||||
|
|
||||||
|
import org.jetbrains.annotations.Nullable;
|
||||||
|
|
||||||
|
public record BucketParams(double min, double max, int buckets, String bucketFieldName,
|
||||||
|
@Nullable String valueFieldName) {}
|
@ -0,0 +1,74 @@
|
|||||||
|
package it.cavallium.dbengine.lucene.searcher;
|
||||||
|
|
||||||
|
import io.net5.buffer.api.Send;
|
||||||
|
import it.cavallium.dbengine.database.LLKeyScore;
|
||||||
|
import it.cavallium.dbengine.database.LLUtils;
|
||||||
|
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
|
||||||
|
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
|
||||||
|
import it.cavallium.dbengine.lucene.FullDocs;
|
||||||
|
import it.cavallium.dbengine.lucene.LLFieldDoc;
|
||||||
|
import it.cavallium.dbengine.lucene.LuceneUtils;
|
||||||
|
import it.cavallium.dbengine.lucene.collector.DecimalBucketCollectorMultiManager;
|
||||||
|
import it.cavallium.dbengine.lucene.collector.LMDBFullFieldDocCollector;
|
||||||
|
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
|
||||||
|
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
|
||||||
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
|
import org.warp.commonutils.log.Logger;
|
||||||
|
import org.warp.commonutils.log.LoggerFactory;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
|
public class DecimalBucketMultiSearcher {
|
||||||
|
|
||||||
|
protected static final Logger logger = LoggerFactory.getLogger(DecimalBucketMultiSearcher.class);
|
||||||
|
|
||||||
|
public Mono<DoubleArrayList> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
|
||||||
|
BucketParams bucketParams,
|
||||||
|
LocalQueryParams queryParams,
|
||||||
|
LLSearchTransformer transformer) {
|
||||||
|
Mono<LocalQueryParams> queryParamsMono;
|
||||||
|
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
|
||||||
|
queryParamsMono = Mono.just(queryParams);
|
||||||
|
} else {
|
||||||
|
queryParamsMono = LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> transformer.transform(Mono
|
||||||
|
.fromSupplier(() -> new TransformerInput(indexSearchers, queryParams))), true);
|
||||||
|
}
|
||||||
|
|
||||||
|
return queryParamsMono.flatMap(queryParams2 -> LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> this
|
||||||
|
// Search results
|
||||||
|
.search(indexSearchers.shards(), bucketParams, queryParams2)
|
||||||
|
// Ensure that one result is always returned
|
||||||
|
.single(),
|
||||||
|
true));
|
||||||
|
}
|
||||||
|
|
||||||
|
private Mono<DoubleArrayList> search(Iterable<IndexSearcher> indexSearchers,
|
||||||
|
BucketParams bucketParams,
|
||||||
|
LocalQueryParams queryParams) {
|
||||||
|
return Mono
|
||||||
|
.fromCallable(() -> {
|
||||||
|
LLUtils.ensureBlocking();
|
||||||
|
return new DecimalBucketCollectorMultiManager(bucketParams.min(),
|
||||||
|
bucketParams.max(),
|
||||||
|
bucketParams.buckets(),
|
||||||
|
bucketParams.bucketFieldName(),
|
||||||
|
bucketParams.valueFieldName()
|
||||||
|
);
|
||||||
|
})
|
||||||
|
.flatMap(cmm -> Flux
|
||||||
|
.fromIterable(indexSearchers)
|
||||||
|
.flatMap(shard -> Mono.fromCallable(() -> {
|
||||||
|
LLUtils.ensureBlocking();
|
||||||
|
|
||||||
|
var collector = cmm.get(shard);
|
||||||
|
|
||||||
|
return shard.search(queryParams.query(), collector);
|
||||||
|
}))
|
||||||
|
.collectList()
|
||||||
|
.flatMap(results -> Mono.fromCallable(() -> {
|
||||||
|
LLUtils.ensureBlocking();
|
||||||
|
return cmm.reduce(results);
|
||||||
|
}))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user