Finalize and test the new implementation

This commit is contained in:
Andrea Cavalli 2023-02-26 21:41:20 +01:00
parent daa7047614
commit 0e21c72e0a
25 changed files with 248 additions and 419 deletions

View File

@ -1,29 +1,28 @@
package it.cavallium.dbengine.client;
import com.google.common.collect.Lists;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.DiscardingCloseable;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.collections.ValueGetter;
import it.cavallium.dbengine.lucene.LuceneCloseable;
import it.cavallium.dbengine.utils.SimpleResource;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class Hits<T> extends SimpleResource implements DiscardingCloseable {
public class Hits<T> {
private static final Logger LOG = LogManager.getLogger(Hits.class);
private static final Hits<?> EMPTY_HITS = new Hits<>(Stream.empty(), TotalHitsCount.of(0, true), false);
private final Stream<T> results;
private static final Hits<?> EMPTY_HITS = new Hits<>(List.of(), TotalHitsCount.of(0, true));
private final List<T> results;
private final TotalHitsCount totalHitsCount;
public Hits(Stream<T> results, TotalHitsCount totalHitsCount) {
this(results, totalHitsCount, true);
}
private Hits(Stream<T> results, TotalHitsCount totalHitsCount, boolean canClose) {
super(canClose);
public Hits(List<T> results, TotalHitsCount totalHitsCount) {
this.results = results;
this.totalHitsCount = totalHitsCount;
}
@ -36,27 +35,18 @@ public class Hits<T> extends SimpleResource implements DiscardingCloseable {
public static <T, U> Function<Hits<HitKey<T>>, Hits<HitEntry<T, U>>> generateMapper(
ValueGetter<T, U> valueGetter) {
return result -> {
var hitsToTransform = result.results()
.map(hit -> new HitEntry<>(hit.key(), valueGetter.get(hit.key()), hit.score()));
return Hits.withResource(hitsToTransform, result.totalHitsCount(), result);
List<HitEntry<T, U>> hitsToTransform = LLUtils.mapList(result.results,
hit -> new HitEntry<>(hit.key(), valueGetter.get(hit.key()), hit.score())
);
return new Hits<>(hitsToTransform, result.totalHitsCount());
};
}
public static <T> Hits<T> withResource(Stream<T> hits, TotalHitsCount count, SafeCloseable resource) {
if (resource instanceof LuceneCloseable luceneCloseable) {
return new LuceneHits<>(hits, count, luceneCloseable);
} else {
return new CloseableHits<>(hits, count, resource);
}
}
public Stream<T> results() {
ensureOpen();
public List<T> results() {
return results;
}
public TotalHitsCount totalHitsCount() {
ensureOpen();
return totalHitsCount;
}
@ -64,48 +54,4 @@ public class Hits<T> extends SimpleResource implements DiscardingCloseable {
public String toString() {
return "Hits[" + "results=" + results + ", " + "totalHitsCount=" + totalHitsCount + ']';
}
@Override
protected void onClose() {
}
public static final class LuceneHits<U> extends Hits<U> implements LuceneCloseable {
private final LuceneCloseable resource;
public LuceneHits(Stream<U> hits, TotalHitsCount count, LuceneCloseable resource) {
super(hits, count);
this.resource = resource;
}
@Override
protected void onClose() {
try {
resource.close();
} catch (Throwable ex) {
LOG.error("Failed to close resource", ex);
}
super.onClose();
}
}
public static final class CloseableHits<U> extends Hits<U> {
private final SafeCloseable resource;
public CloseableHits(Stream<U> hits, TotalHitsCount count, SafeCloseable resource) {
super(hits, count);
this.resource = resource;
}
@Override
protected void onClose() {
try {
resource.close();
} catch (Throwable ex) {
LOG.error("Failed to close resource", ex);
}
super.onClose();
}
}
}

View File

@ -1,28 +1,28 @@
package it.cavallium.dbengine.client;
import static it.cavallium.dbengine.utils.StreamUtils.LUCENE_SCHEDULER;
import static it.cavallium.dbengine.utils.StreamUtils.collect;
import static it.cavallium.dbengine.utils.StreamUtils.collectOn;
import static it.cavallium.dbengine.utils.StreamUtils.fastListing;
import static it.cavallium.dbengine.utils.StreamUtils.toListOn;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
import it.cavallium.dbengine.client.Hits.CloseableHits;
import it.cavallium.dbengine.client.Hits.LuceneHits;
import it.cavallium.dbengine.client.query.ClientQueryParams;
import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSearchResultShard;
import it.cavallium.dbengine.database.LLSearchResultShard.LuceneLLSearchResultShard;
import it.cavallium.dbengine.database.LLSearchResultShard.ResourcesLLSearchResultShard;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.lucene.LuceneCloseable;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.collector.Buckets;
import it.cavallium.dbengine.lucene.searcher.BucketParams;
import it.cavallium.dbengine.utils.StreamUtils;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.time.Duration;
import java.util.List;
@ -39,7 +39,6 @@ import org.jetbrains.annotations.Nullable;
public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
private static final Duration MAX_COUNT_TIME = Duration.ofSeconds(30);
private static final Logger LOG = LogManager.getLogger(LuceneIndex.class);
private final LLLuceneIndex luceneIndex;
private final Indicizer<T,U> indicizer;
@ -120,14 +119,10 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
}
private Hits<HitKey<T>> mapResults(LLSearchResultShard llSearchResult) {
Stream<HitKey<T>> scoresWithKeysFlux = llSearchResult.results()
.map(hit -> new HitKey<>(indicizer.getKey(hit.key()), hit.score()));
if (llSearchResult instanceof LuceneCloseable luceneCloseable) {
return new LuceneHits<>(scoresWithKeysFlux, llSearchResult.totalHitsCount(), luceneCloseable);
} else {
return new CloseableHits<>(scoresWithKeysFlux, llSearchResult.totalHitsCount(), llSearchResult);
}
List<HitKey<T>> scoresWithKeys = LLUtils.mapList(llSearchResult.results(),
hit -> new HitKey<>(indicizer.getKey(hit.key()), hit.score())
);
return new Hits<>(scoresWithKeys, llSearchResult.totalHitsCount());
}
@Override
@ -203,20 +198,14 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
}
TotalHitsCount count = null;
ObjectArrayList<Stream<LLKeyScore>> results = new ObjectArrayList<>(shards.size());
ObjectArrayList resources = new ObjectArrayList(shards.size());
boolean luceneResources = false;
var maxLimit = queryParams.offset() + queryParams.limit();
for (LLSearchResultShard shard : shards) {
if (!luceneResources && shard instanceof LuceneCloseable) {
luceneResources = true;
}
if (count == null) {
count = shard.totalHitsCount();
} else {
count = LuceneUtils.sum(count, shard.totalHitsCount());
}
var maxLimit = queryParams.offset() + queryParams.limit();
results.add(shard.results().limit(maxLimit));
resources.add(shard);
results.add(shard.results().stream().limit(maxLimit));
}
Objects.requireNonNull(count);
Stream<LLKeyScore> resultsFlux;
@ -225,13 +214,9 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
} else if (results.size() == 1) {
resultsFlux = results.get(0);
} else {
resultsFlux = results.stream().flatMap(Function.identity());
}
if (luceneResources) {
return new LuceneLLSearchResultShard(resultsFlux, count, (List<LuceneCloseable>) resources);
} else {
return new ResourcesLLSearchResultShard(resultsFlux, count, (List<SafeCloseable>) resources);
resultsFlux = results.stream().flatMap(Function.identity()).limit(maxLimit);
}
return new LLSearchResultShard(StreamUtils.toList(resultsFlux), count);
}
}

View File

@ -37,6 +37,7 @@ public interface LLLuceneIndex extends LLSnapshottable, IBackuppable, SafeClosea
void deleteAll();
// todo: add a filterer parameter?
/**
* @param queryParams the limit is valid for each lucene instance. If you have 15 instances, the number of elements
* returned can be at most <code>limit * 15</code>.
@ -49,6 +50,7 @@ public interface LLLuceneIndex extends LLSnapshottable, IBackuppable, SafeClosea
@Nullable String keyFieldName,
Multimap<String, String> mltDocumentFields);
// todo: add a filterer parameter?
/**
* @param queryParams the limit is valid for each lucene instance. If you have 15 instances, the number of elements
* returned can be at most <code>limit * 15</code>

View File

@ -9,35 +9,23 @@ import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class LLSearchResultShard extends SimpleResource implements DiscardingCloseable {
public class LLSearchResultShard {
private static final Logger LOG = LogManager.getLogger(LLSearchResultShard.class);
private final Stream<LLKeyScore> results;
private final List<LLKeyScore> results;
private final TotalHitsCount totalHitsCount;
public LLSearchResultShard(Stream<LLKeyScore> results, TotalHitsCount totalHitsCount) {
public LLSearchResultShard(List<LLKeyScore> results, TotalHitsCount totalHitsCount) {
this.results = results;
this.totalHitsCount = totalHitsCount;
}
public static LLSearchResultShard withResource(Stream<LLKeyScore> results,
TotalHitsCount totalHitsCount,
SafeCloseable closeableResource) {
if (closeableResource instanceof LuceneCloseable luceneCloseable) {
return new LuceneLLSearchResultShard(results, totalHitsCount, List.of(luceneCloseable));
} else {
return new ResourcesLLSearchResultShard(results, totalHitsCount, List.of(closeableResource));
}
}
public Stream<LLKeyScore> results() {
ensureOpen();
public List<LLKeyScore> results() {
return results;
}
public TotalHitsCount totalHitsCount() {
ensureOpen();
return totalHitsCount;
}
@ -60,65 +48,4 @@ public class LLSearchResultShard extends SimpleResource implements DiscardingClo
public String toString() {
return "LLSearchResultShard[" + "results=" + results + ", " + "totalHitsCount=" + totalHitsCount + ']';
}
@Override
public void onClose() {
results.close();
}
public static class ResourcesLLSearchResultShard extends LLSearchResultShard {
private final List<SafeCloseable> resources;
public ResourcesLLSearchResultShard(Stream<LLKeyScore> resultsFlux,
TotalHitsCount count,
List<SafeCloseable> resources) {
super(resultsFlux, count);
this.resources = resources;
}
@Override
public void onClose() {
try {
for (SafeCloseable resource : resources) {
try {
resource.close();
} catch (Throwable ex) {
LOG.error("Failed to close resource", ex);
}
}
} catch (Throwable ex) {
LOG.error("Failed to close resources", ex);
}
super.onClose();
}
}
public static class LuceneLLSearchResultShard extends LLSearchResultShard implements LuceneCloseable {
private final List<LuceneCloseable> resources;
public LuceneLLSearchResultShard(Stream<LLKeyScore> resultsFlux,
TotalHitsCount count,
List<LuceneCloseable> resources) {
super(resultsFlux, count);
this.resources = resources;
}
@Override
public void onClose() {
try {
for (LuceneCloseable resource : resources) {
try {
resource.close();
} catch (Throwable ex) {
LOG.error("Failed to close resource", ex);
}
}
} catch (Throwable ex) {
LOG.error("Failed to close resources", ex);
}
super.onClose();
}
}
}

View File

@ -6,6 +6,8 @@ import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import io.netty.util.IllegalReferenceCountException;
import it.cavallium.dbengine.buffers.Buf;
import it.cavallium.dbengine.client.HitEntry;
import it.cavallium.dbengine.client.HitKey;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.lucene.LuceneCloseable;
import it.cavallium.dbengine.lucene.LuceneUtils;
@ -24,6 +26,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -676,6 +679,11 @@ public class LLUtils {
}
}
public static <T, U> List<U> mapList(List<T> input, Function<T, U> mapper) {
//todo: optimize hits mapping
return input.stream().map(mapper).toList();
}
private static class FakeBytesRefBuilder extends BytesRefBuilder {
private final LLTerm term;

View File

@ -13,6 +13,7 @@ import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
import it.unimi.dsi.fastutil.objects.ObjectArraySet;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
@ -188,12 +189,15 @@ public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T
@Override
public Stream<Entry<T, U>> setAllValuesAndGetPrevious(Stream<Entry<T, U>> entries) {
return entries.mapMulti((entry, sink) -> {
List<Entry<T, U>> prevList = entries.map(entry -> {
var prev = this.at(null, entry.getKey()).setAndGetPrevious(entry.getValue());
if (prev != null) {
sink.accept(Map.entry(entry.getKey(), prev));
return Map.entry(entry.getKey(), prev);
} else {
return null;
}
});
}).filter(Objects::nonNull).toList();
return prevList.stream();
}
@Override

View File

@ -7,6 +7,7 @@ import it.cavallium.dbengine.utils.SimpleResource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.lucene.search.IndexSearcher;
public interface LLIndexSearchers extends DiscardingCloseable {
@ -27,11 +28,12 @@ public interface LLIndexSearchers extends DiscardingCloseable {
LLIndexSearcher llShard(int shardIndex);
class UnshardedIndexSearchers extends SimpleResource implements LLIndexSearchers, LuceneCloseable {
class UnshardedIndexSearchers implements LLIndexSearchers, LuceneCloseable {
private final LLIndexSearcher indexSearcher;
public UnshardedIndexSearchers(LLIndexSearcher indexSearcher) {
Objects.requireNonNull(indexSearcher);
this.indexSearcher = indexSearcher;
}
@ -70,12 +72,12 @@ public interface LLIndexSearchers extends DiscardingCloseable {
}
@Override
protected void onClose() {
public void close() {
indexSearcher.close();
}
}
class ShardedIndexSearchers extends SimpleResource implements LLIndexSearchers, LuceneCloseable {
class ShardedIndexSearchers implements LLIndexSearchers, LuceneCloseable {
private final List<LLIndexSearcher> indexSearchers;
private final List<IndexSearcher> indexSearchersVals;
@ -117,7 +119,7 @@ public interface LLIndexSearchers extends DiscardingCloseable {
}
@Override
protected void onClose() {
public void close() {
for (LLIndexSearcher indexSearcher : indexSearchers) {
indexSearcher.close();
}

View File

@ -4,6 +4,7 @@ import static it.cavallium.dbengine.database.LLUtils.MARKER_LUCENE;
import static it.cavallium.dbengine.database.LLUtils.toDocument;
import static it.cavallium.dbengine.database.LLUtils.toFields;
import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRITE;
import static it.cavallium.dbengine.lucene.searcher.LuceneSearchResult.EMPTY_COUNT;
import static it.cavallium.dbengine.utils.StreamUtils.collect;
import static it.cavallium.dbengine.utils.StreamUtils.fastListing;
import static java.util.Objects.requireNonNull;
@ -60,6 +61,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.stream.Stream;
@ -494,33 +496,38 @@ public class LLLocalLuceneIndex extends SimpleResource implements IBackuppable,
var searcher = this.searcherManager.retrieveSearcher(snapshot);
var transformer = new MoreLikeThisTransformer(mltDocumentFieldsFlux, luceneAnalyzer, luceneSimilarity);
var result = localSearcher.collect(searcher, localQueryParams, keyFieldName, transformer);
return Stream.of(LLSearchResultShard.withResource(result.results(), result.totalHitsCount(), result));
var result = localSearcher.collect(searcher, localQueryParams, keyFieldName, transformer, Function.identity());
return Stream.of(new LLSearchResultShard(result.results(), result.totalHitsCount()));
}
@Override
public Stream<LLSearchResultShard> search(@Nullable LLSnapshot snapshot, QueryParams queryParams,
@Nullable String keyFieldName) {
var result = searchInternal(snapshot, queryParams, keyFieldName);
var shard = LLSearchResultShard.withResource(result.results(), result.totalHitsCount(), result);
return Stream.of(shard).onClose(shard::close);
var shard = new LLSearchResultShard(result.results(), result.totalHitsCount());
return Stream.of(shard);
}
public LuceneSearchResult searchInternal(@Nullable LLSnapshot snapshot, QueryParams queryParams,
@Nullable String keyFieldName) {
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer);
try (var searcher = searcherManager.retrieveSearcher(snapshot)) {
return localSearcher.collect(searcher, localQueryParams, keyFieldName, NO_REWRITE);
if (searcher != null) {
return localSearcher.collect(searcher, localQueryParams, keyFieldName, NO_REWRITE, Function.identity());
} else {
return LuceneSearchResult.EMPTY;
}
}
}
@Override
public TotalHitsCount count(@Nullable LLSnapshot snapshot, Query query, @Nullable Duration timeout) {
var params = LuceneUtils.getCountQueryParams(query);
try (var result = this.searchInternal(snapshot, params, null)) {
if (result == null) return TotalHitsCount.of(0, true);
var result = this.searchInternal(snapshot, params, null);
if (result != null) {
return result.totalHitsCount();
} else {
return EMPTY_COUNT;
}
}
@ -534,9 +541,10 @@ public class LLLocalLuceneIndex extends SimpleResource implements IBackuppable,
localQueries.add(QueryParser.toQuery(query, luceneAnalyzer));
}
var localNormalizationQuery = QueryParser.toQuery(normalizationQuery, luceneAnalyzer);
LLIndexSearchers searchers = LLIndexSearchers.unsharded(searcherManager.retrieveSearcher(snapshot));
try (LLIndexSearchers searchers = LLIndexSearchers.unsharded(searcherManager.retrieveSearcher(snapshot))) {
return decimalBucketMultiSearcher.collectMulti(searchers, bucketParams, localQueries, localNormalizationQuery);
return decimalBucketMultiSearcher.collectMulti(searchers, bucketParams, localQueries, localNormalizationQuery);
}
}
public LLIndexSearcher retrieveSearcher(@Nullable LLSnapshot snapshot) {

View File

@ -58,6 +58,7 @@ import java.util.Objects;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
@ -210,14 +211,20 @@ public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneI
String keyFieldName,
Multimap<String, String> mltDocumentFields) {
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer);
var searchers = this.getIndexSearchers(snapshot);
var transformer = new MoreLikeThisTransformer(mltDocumentFields, luceneAnalyzer, luceneSimilarity);
try (var searchers = this.getIndexSearchers(snapshot)) {
var transformer = new MoreLikeThisTransformer(mltDocumentFields, luceneAnalyzer, luceneSimilarity);
// Collect all the shards results into a single global result
LuceneSearchResult result = multiSearcher.collectMulti(searchers, localQueryParams, keyFieldName, transformer);
// Collect all the shards results into a single global result
LuceneSearchResult result = multiSearcher.collectMulti(searchers,
localQueryParams,
keyFieldName,
transformer,
Function.identity()
);
// Transform the result type
return Stream.of(new LLSearchResultShard(result.results(), result.totalHitsCount()));
// Transform the result type
return Stream.of(new LLSearchResultShard(result.results(), result.totalHitsCount()));
}
}
@Override
@ -227,17 +234,23 @@ public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneI
LuceneSearchResult result = searchInternal(snapshot, queryParams, keyFieldName);
// Transform the result type
var shard = new LLSearchResultShard(result.results(), result.totalHitsCount());
return Stream.of(shard).onClose(shard::close);
return Stream.of(shard);
}
private LuceneSearchResult searchInternal(@Nullable LLSnapshot snapshot,
QueryParams queryParams,
@Nullable String keyFieldName) {
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer);
var searchers = getIndexSearchers(snapshot);
try (var searchers = getIndexSearchers(snapshot)) {
// Collect all the shards results into a single global result
return multiSearcher.collectMulti(searchers, localQueryParams, keyFieldName, GlobalQueryRewrite.NO_REWRITE);
// Collect all the shards results into a single global result
return multiSearcher.collectMulti(searchers,
localQueryParams,
keyFieldName,
GlobalQueryRewrite.NO_REWRITE,
Function.identity()
);
}
}
@Override
@ -257,10 +270,11 @@ public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneI
localQueries.add(QueryParser.toQuery(query, luceneAnalyzer));
}
var localNormalizationQuery = QueryParser.toQuery(normalizationQuery, luceneAnalyzer);
var searchers = getIndexSearchers(snapshot);
try (var searchers = getIndexSearchers(snapshot)) {
// Collect all the shards results into a single global result
return decimalBucketMultiSearcher.collectMulti(searchers, bucketParams, localQueries, localNormalizationQuery);
// Collect all the shards results into a single global result
return decimalBucketMultiSearcher.collectMulti(searchers, bucketParams, localQueries, localNormalizationQuery);
}
}
@Override

View File

@ -64,6 +64,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
@ -722,10 +723,11 @@ public class LuceneUtils {
LLIndexSearcher indexSearcher,
LocalQueryParams queryParams,
String keyFieldName,
GlobalQueryRewrite transformer) {
GlobalQueryRewrite transformer,
Function<Stream<LLKeyScore>, Stream<LLKeyScore>> filterer) {
var indexSearchers = LLIndexSearchers.unsharded(indexSearcher);
var queryParams2 = transformer.rewrite(indexSearchers, queryParams);
return localSearcher.collect(indexSearcher, queryParams2, keyFieldName, NO_REWRITE);
return localSearcher.collect(indexSearcher, queryParams2, keyFieldName, NO_REWRITE, filterer);
}
/**
@ -735,9 +737,10 @@ public class LuceneUtils {
LLIndexSearchers indexSearchers,
LocalQueryParams queryParams,
String keyFieldName,
GlobalQueryRewrite transformer) {
GlobalQueryRewrite transformer,
Function<Stream<LLKeyScore>, Stream<LLKeyScore>> filterer) {
var queryParams2 = transformer.rewrite(indexSearchers, queryParams);
return multiSearcher.collectMulti(indexSearchers, queryParams2, keyFieldName, NO_REWRITE);
return multiSearcher.collectMulti(indexSearchers, queryParams2, keyFieldName, NO_REWRITE, filterer);
}
public static void checkLuceneThread() {

View File

@ -2,9 +2,12 @@ package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRITE;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.lucene.LuceneUtils;
import java.io.IOException;
import java.util.function.Function;
import java.util.stream.Stream;
import org.jetbrains.annotations.Nullable;
public class AdaptiveLocalSearcher implements LocalSearcher {
@ -30,11 +33,12 @@ public class AdaptiveLocalSearcher implements LocalSearcher {
public LuceneSearchResult collect(LLIndexSearcher indexSearcher,
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer) {
GlobalQueryRewrite transformer,
Function<Stream<LLKeyScore>, Stream<LLKeyScore>> filterer) {
if (transformer != NO_REWRITE) {
return LuceneUtils.rewrite(this, indexSearcher, queryParams, keyFieldName, transformer);
return LuceneUtils.rewrite(this, indexSearcher, queryParams, keyFieldName, transformer, filterer);
}
return transformedCollect(indexSearcher, queryParams, keyFieldName, transformer);
return transformedCollect(indexSearcher, queryParams, keyFieldName, transformer, filterer);
}
@Override
@ -46,35 +50,36 @@ public class AdaptiveLocalSearcher implements LocalSearcher {
public LuceneSearchResult transformedCollect(LLIndexSearcher indexSearcher,
LocalQueryParams queryParams,
String keyFieldName,
GlobalQueryRewrite transformer) {
GlobalQueryRewrite transformer,
Function<Stream<LLKeyScore>, Stream<LLKeyScore>> filterer) {
// offset + limit
long realLimit = queryParams.offsetLong() + queryParams.limitLong();
long maxAllowedInMemoryLimit
= Math.max(maxInMemoryResultEntries, (long) queryParams.pageLimits().getPageLimit(0));
if (queryParams.limitLong() == 0) {
return countSearcher.collect(indexSearcher, queryParams, keyFieldName, transformer);
return countSearcher.collect(indexSearcher, queryParams, keyFieldName, transformer, filterer);
} else if (realLimit <= maxInMemoryResultEntries) {
return standardSearcher.collect(indexSearcher, queryParams, keyFieldName, transformer);
return standardSearcher.collect(indexSearcher, queryParams, keyFieldName, transformer, filterer);
} else if (queryParams.isSorted()) {
if (realLimit <= maxAllowedInMemoryLimit) {
return scoredPaged.collect(indexSearcher, queryParams, keyFieldName, transformer);
return scoredPaged.collect(indexSearcher, queryParams, keyFieldName, transformer, filterer);
} else {
if (queryParams.isSortedByScore()) {
if (queryParams.limitLong() < maxInMemoryResultEntries) {
throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater");
}
return scoredPaged.collect(indexSearcher, queryParams, keyFieldName, transformer);
return scoredPaged.collect(indexSearcher, queryParams, keyFieldName, transformer, filterer);
} else {
if (queryParams.limitLong() < maxInMemoryResultEntries) {
throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater");
}
return scoredPaged.collect(indexSearcher, queryParams, keyFieldName, transformer);
return scoredPaged.collect(indexSearcher, queryParams, keyFieldName, transformer, filterer);
}
}
} else {
// Run large/unbounded searches using the continuous multi searcher
return unsortedUnscoredContinuous.collect(indexSearcher, queryParams, keyFieldName, transformer);
return unsortedUnscoredContinuous.collect(indexSearcher, queryParams, keyFieldName, transformer, filterer);
}
}
}

View File

@ -2,9 +2,11 @@ package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRITE;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils;
import java.io.IOException;
import java.util.function.Function;
import java.util.stream.Stream;
import org.jetbrains.annotations.Nullable;
public class AdaptiveMultiSearcher implements MultiSearcher {
@ -30,46 +32,48 @@ public class AdaptiveMultiSearcher implements MultiSearcher {
public LuceneSearchResult collectMulti(LLIndexSearchers indexSearchers,
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer) {
GlobalQueryRewrite transformer,
Function<Stream<LLKeyScore>, Stream<LLKeyScore>> filterer) {
if (transformer != NO_REWRITE) {
return LuceneUtils.rewriteMulti(this, indexSearchers, queryParams, keyFieldName, transformer);
return LuceneUtils.rewriteMulti(this, indexSearchers, queryParams, keyFieldName, transformer, filterer);
}
return transformedCollectMulti(indexSearchers, queryParams, keyFieldName, transformer);
return transformedCollectMulti(indexSearchers, queryParams, keyFieldName, transformer, filterer);
}
// Remember to change also AdaptiveLocalSearcher
public LuceneSearchResult transformedCollectMulti(LLIndexSearchers indexSearchers,
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer) {
GlobalQueryRewrite transformer,
Function<Stream<LLKeyScore>, Stream<LLKeyScore>> filterer) {
// offset + limit
long realLimit = queryParams.offsetLong() + queryParams.limitLong();
long maxAllowedInMemoryLimit
= Math.max(maxInMemoryResultEntries, (long) queryParams.pageLimits().getPageLimit(0));
if (queryParams.limitLong() == 0) {
return count.collectMulti(indexSearchers, queryParams, keyFieldName, transformer);
return count.collectMulti(indexSearchers, queryParams, keyFieldName, transformer, filterer);
} else if (realLimit <= maxInMemoryResultEntries) {
return standardSearcher.collectMulti(indexSearchers, queryParams, keyFieldName, transformer);
return standardSearcher.collectMulti(indexSearchers, queryParams, keyFieldName, transformer, filterer);
} else if (queryParams.isSorted()) {
if (realLimit <= maxAllowedInMemoryLimit) {
return scoredPaged.collectMulti(indexSearchers, queryParams, keyFieldName, transformer);
return scoredPaged.collectMulti(indexSearchers, queryParams, keyFieldName, transformer, filterer);
} else {
if (queryParams.isSortedByScore()) {
if (queryParams.limitLong() < maxInMemoryResultEntries) {
throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater");
}
return scoredPaged.collectMulti(indexSearchers, queryParams, keyFieldName, transformer);
return scoredPaged.collectMulti(indexSearchers, queryParams, keyFieldName, transformer, filterer);
} else {
if (queryParams.limitLong() < maxInMemoryResultEntries) {
throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater");
}
return scoredPaged.collectMulti(indexSearchers, queryParams, keyFieldName, transformer);
return scoredPaged.collectMulti(indexSearchers, queryParams, keyFieldName, transformer, filterer);
}
}
} else {
// Run large/unbounded searches using the continuous multi searcher
return unsortedUnscoredContinuous.collectMulti(indexSearchers, queryParams, keyFieldName, transformer);
return unsortedUnscoredContinuous.collectMulti(indexSearchers, queryParams, keyFieldName, transformer, filterer);
}
}

View File

@ -1,12 +1,15 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.utils.DBException;
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -21,10 +24,11 @@ public class CountMultiSearcher implements MultiSearcher {
@Override
public LuceneSearchResult collectMulti(LLIndexSearchers indexSearchers,
LocalQueryParams queryParams,
String keyFieldName,
GlobalQueryRewrite transformer) {
@Nullable String keyFieldName,
GlobalQueryRewrite transformer,
Function<Stream<LLKeyScore>, Stream<LLKeyScore>> filterer) {
if (transformer != GlobalQueryRewrite.NO_REWRITE) {
return LuceneUtils.rewriteMulti(this, indexSearchers, queryParams, keyFieldName, transformer);
return LuceneUtils.rewriteMulti(this, indexSearchers, queryParams, keyFieldName, transformer, filterer);
}
if (queryParams.isSorted() && queryParams.limitLong() > 0) {
throw new UnsupportedOperationException(
@ -38,34 +42,39 @@ public class CountMultiSearcher implements MultiSearcher {
var results = indexSearchers
.llShards()
.stream()
.map(searcher -> this.collect(searcher, queryParams, keyFieldName, transformer))
.map(searcher -> this.collect(searcher,
queryParams,
keyFieldName,
transformer,
f -> filterer.apply(f).limit(0)
))
.toList();
boolean exactTotalHitsCount = true;
long totalHitsCountValue = 0;
for (LuceneSearchResult result : results) {
exactTotalHitsCount &= result.totalHitsCount().exact();
totalHitsCountValue += result.totalHitsCount().value();
result.close();
}
var totalHitsCount = new TotalHitsCount(totalHitsCountValue, exactTotalHitsCount);
return new LuceneSearchResult(totalHitsCount, Stream.empty());
return new LuceneSearchResult(totalHitsCount, List.of());
}
@Override
public LuceneSearchResult collect(LLIndexSearcher indexSearcher,
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer) {
GlobalQueryRewrite transformer,
Function<Stream<LLKeyScore>, Stream<LLKeyScore>> filterer) {
if (transformer != GlobalQueryRewrite.NO_REWRITE) {
return LuceneUtils.rewrite(this, indexSearcher, queryParams, keyFieldName, transformer);
return LuceneUtils.rewrite(this, indexSearcher, queryParams, keyFieldName, transformer, filterer);
}
try {
var is = indexSearcher.getIndexSearcher();
is.setTimeout(new QueryTimeoutImpl(queryParams.timeout().toMillis()));
var count = is.count(queryParams.query());
return new LuceneSearchResult(TotalHitsCount.of(count, true), Stream.empty());
return new LuceneSearchResult(TotalHitsCount.of(count, true), List.of());
} catch (IOException e) {
throw new DBException(e);
}

View File

@ -1,6 +1,9 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import java.util.function.Function;
import java.util.stream.Stream;
import org.jetbrains.annotations.Nullable;
public interface LocalSearcher {
@ -10,11 +13,13 @@ public interface LocalSearcher {
* @param queryParams the query parameters
* @param keyFieldName the name of the key field
* @param transformer the search query transformer
* @param filterer the search result filterer
*/
LuceneSearchResult collect(LLIndexSearcher indexSearcher,
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer);
GlobalQueryRewrite transformer,
Function<Stream<LLKeyScore>, Stream<LLKeyScore>> filterer);
/**
* Get the name of this searcher type

View File

@ -4,19 +4,23 @@ import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.DiscardingCloseable;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.utils.SimpleResource;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class LuceneSearchResult extends SimpleResource implements DiscardingCloseable {
public class LuceneSearchResult {
public static final TotalHitsCount EMPTY_COUNT = new TotalHitsCount(0, true);
public static final LuceneSearchResult EMPTY = new LuceneSearchResult(EMPTY_COUNT, List.of());
private static final Logger logger = LogManager.getLogger(LuceneSearchResult.class);
private final TotalHitsCount totalHitsCount;
private final Stream<LLKeyScore> results;
private final List<LLKeyScore> results;
public LuceneSearchResult(TotalHitsCount totalHitsCount, Stream<LLKeyScore> results) {
public LuceneSearchResult(TotalHitsCount totalHitsCount, List<LLKeyScore> results) {
this.totalHitsCount = totalHitsCount;
this.results = results;
}
@ -25,7 +29,7 @@ public class LuceneSearchResult extends SimpleResource implements DiscardingClos
return totalHitsCount;
}
public Stream<LLKeyScore> results() {
public List<LLKeyScore> results() {
return results;
}
@ -49,8 +53,4 @@ public class LuceneSearchResult extends SimpleResource implements DiscardingClos
return "LuceneSearchResult[" + "totalHitsCount=" + totalHitsCount + ", " + "results=" + results + ']';
}
@Override
protected void onClose() {
results.close();
}
}

View File

@ -1,36 +1,43 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import java.io.IOException;
import java.util.function.Function;
import java.util.stream.Stream;
import org.jetbrains.annotations.Nullable;
public interface MultiSearcher extends LocalSearcher {
/**
* @param indexSearchersMono Lucene index searcher
* @param indexSearchers Lucene index searcher
* @param queryParams the query parameters
* @param keyFieldName the name of the key field
* @param transformer the search query transformer
* @param filterer the search result filterer
*/
LuceneSearchResult collectMulti(LLIndexSearchers indexSearchersMono,
LuceneSearchResult collectMulti(LLIndexSearchers indexSearchers,
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer);
GlobalQueryRewrite transformer,
Function<Stream<LLKeyScore>, Stream<LLKeyScore>> filterer);
/**
* @param indexSearcher Lucene index searcher
* @param queryParams the query parameters
* @param keyFieldName the name of the key field
* @param transformer the search query transformer
* @param filterer the search result filterer
*/
@Override
default LuceneSearchResult collect(LLIndexSearcher indexSearcher,
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer) {
GlobalQueryRewrite transformer,
Function<Stream<LLKeyScore>, Stream<LLKeyScore>> filterer) {
LLIndexSearchers searchers = LLIndexSearchers.unsharded(indexSearcher);
return this.collectMulti(searchers, queryParams, keyFieldName, transformer);
return this.collectMulti(searchers, queryParams, keyFieldName, transformer, filterer);
}
}

View File

@ -2,21 +2,21 @@ package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.EMPTY_STATUS;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT;
import static it.cavallium.dbengine.utils.StreamUtils.fastListing;
import static it.cavallium.dbengine.utils.StreamUtils.streamWhileNonNull;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneCloseable;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.collector.TopDocsCollectorMultiManager;
import it.cavallium.dbengine.utils.DBException;
import it.cavallium.dbengine.utils.StreamUtils;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -35,9 +35,10 @@ public class PagedLocalSearcher implements LocalSearcher {
public LuceneSearchResult collect(LLIndexSearcher indexSearcher,
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer) {
GlobalQueryRewrite transformer,
Function<Stream<LLKeyScore>, Stream<LLKeyScore>> filterer) {
if (transformer != GlobalQueryRewrite.NO_REWRITE) {
return LuceneUtils.rewrite(this, indexSearcher, queryParams, keyFieldName, transformer);
return LuceneUtils.rewrite(this, indexSearcher, queryParams, keyFieldName, transformer, filterer);
}
PaginationInfo paginationInfo = getPaginationInfo(queryParams);
@ -51,12 +52,7 @@ public class PagedLocalSearcher implements LocalSearcher {
keyFieldName,
queryParams
);
return this.computeOtherResults(firstResult,
indexSearchers.shards(),
queryParams,
keyFieldName,
() -> indexSearchers.close()
);
return this.computeOtherResults(firstResult, indexSearchers.shards(), queryParams, keyFieldName, filterer);
}
@Override
@ -113,7 +109,7 @@ public class PagedLocalSearcher implements LocalSearcher {
List<IndexSearcher> indexSearchers,
LocalQueryParams queryParams,
String keyFieldName,
Runnable onClose) {
Function<Stream<LLKeyScore>, Stream<LLKeyScore>> filterer) {
var totalHitsCount = firstResult.totalHitsCount();
var firstPageHitsStream = firstResult.firstPageHitsStream();
var secondPageInfo = firstResult.nextPageInfo();
@ -121,7 +117,7 @@ public class PagedLocalSearcher implements LocalSearcher {
Stream<LLKeyScore> nextHitsFlux = searchOtherPages(indexSearchers, queryParams, keyFieldName, secondPageInfo);
Stream<LLKeyScore> combinedFlux = Stream.concat(firstPageHitsStream, nextHitsFlux);
return new MyLuceneSearchResult(totalHitsCount, combinedFlux, onClose);
return new LuceneSearchResult(totalHitsCount, StreamUtils.collect(filterer.apply(combinedFlux), fastListing()));
}
/**
@ -193,24 +189,4 @@ public class PagedLocalSearcher implements LocalSearcher {
return new PageIterationStepResult(EMPTY_STATUS, null);
}
}
private static class MyLuceneSearchResult extends LuceneSearchResult implements LuceneCloseable {
private final Runnable onClose;
public MyLuceneSearchResult(TotalHitsCount totalHitsCount, Stream<LLKeyScore> combinedStream, Runnable onClose) {
super(totalHitsCount, combinedStream);
this.onClose = onClose;
}
@Override
protected void onClose() {
try {
onClose.run();
} catch (Throwable ex) {
LOG.error("Failed to close the search result", ex);
}
super.onClose();
}
}
}

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT;
import static it.cavallium.dbengine.utils.StreamUtils.LUCENE_SCHEDULER;
import static it.cavallium.dbengine.utils.StreamUtils.fastListing;
import static it.cavallium.dbengine.utils.StreamUtils.streamWhileNonNull;
import static it.cavallium.dbengine.utils.StreamUtils.toListOn;
@ -15,12 +16,14 @@ import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.PageLimits;
import it.cavallium.dbengine.lucene.collector.ScoringShardsCollectorMultiManager;
import it.cavallium.dbengine.utils.DBException;
import it.cavallium.dbengine.utils.StreamUtils;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -42,9 +45,10 @@ public class ScoredPagedMultiSearcher implements MultiSearcher {
public LuceneSearchResult collectMulti(LLIndexSearchers indexSearchers,
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer) {
GlobalQueryRewrite transformer,
Function<Stream<LLKeyScore>, Stream<LLKeyScore>> filterer) {
if (transformer != GlobalQueryRewrite.NO_REWRITE) {
return LuceneUtils.rewriteMulti(this, indexSearchers, queryParams, keyFieldName, transformer);
return LuceneUtils.rewriteMulti(this, indexSearchers, queryParams, keyFieldName, transformer, filterer);
}
PaginationInfo paginationInfo = getPaginationInfo(queryParams);
@ -57,7 +61,7 @@ public class ScoredPagedMultiSearcher implements MultiSearcher {
indexSearchers.shards(),
queryParams,
keyFieldName,
() -> indexSearchers.close()
filterer
);
}
@ -119,7 +123,7 @@ public class ScoredPagedMultiSearcher implements MultiSearcher {
List<IndexSearcher> indexSearchers,
LocalQueryParams queryParams,
String keyFieldName,
Runnable onClose) {
Function<Stream<LLKeyScore>, Stream<LLKeyScore>> filterer) {
var totalHitsCount = firstResult.totalHitsCount();
var firstPageHitsStream = firstResult.firstPageHitsStream();
var secondPageInfo = firstResult.nextPageInfo();
@ -127,7 +131,7 @@ public class ScoredPagedMultiSearcher implements MultiSearcher {
Stream<LLKeyScore> nextHitsFlux = searchOtherPages(indexSearchers, queryParams, keyFieldName, secondPageInfo);
Stream<LLKeyScore> combinedStream = Stream.concat(firstPageHitsStream, nextHitsFlux);
return new MyLuceneSearchResult(totalHitsCount, combinedStream, onClose);
return new LuceneSearchResult(totalHitsCount, StreamUtils.collect(filterer.apply(combinedStream), fastListing()));
}
/**
@ -214,24 +218,4 @@ public class ScoredPagedMultiSearcher implements MultiSearcher {
return "scored paged multi";
}
private static class MyLuceneSearchResult extends LuceneSearchResult implements LuceneCloseable {
private final Runnable onClose;
public MyLuceneSearchResult(TotalHitsCount totalHitsCount, Stream<LLKeyScore> combinedFlux, Runnable onClose) {
super(totalHitsCount, combinedFlux);
this.onClose = onClose;
}
@Override
protected void onClose() {
try {
onClose.run();
} catch (Throwable ex) {
LOG.error("Failed to close the search result", ex);
}
super.onClose();
}
}
}

View File

@ -1,17 +1,19 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.utils.StreamUtils.toList;
import static java.util.Objects.requireNonNull;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneCloseable;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.utils.DBException;
import it.cavallium.dbengine.utils.StreamUtils;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -36,14 +38,15 @@ public class StandardSearcher implements MultiSearcher {
public LuceneSearchResult collectMulti(LLIndexSearchers indexSearchers,
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer) {
GlobalQueryRewrite transformer,
Function<Stream<LLKeyScore>, Stream<LLKeyScore>> filterer) {
if (transformer != GlobalQueryRewrite.NO_REWRITE) {
return LuceneUtils.rewriteMulti(this, indexSearchers, queryParams, keyFieldName, transformer);
return LuceneUtils.rewriteMulti(this, indexSearchers, queryParams, keyFieldName, transformer, filterer);
}
// Search results
var fullDocs = this.search(indexSearchers.shards(), queryParams);
// Compute the results
return this.computeResults(fullDocs, indexSearchers, keyFieldName, queryParams);
return this.computeResults(fullDocs, indexSearchers, keyFieldName, queryParams, filterer);
}
/**
@ -55,10 +58,12 @@ public class StandardSearcher implements MultiSearcher {
CollectorManager<? extends TopDocsCollector<?>, ? extends TopDocs> sharedManager;
if (queryParams.isSorted() && !queryParams.isSortedByScore()) {
sharedManager = TopFieldCollector.createSharedManager(queryParams.sort(),
queryParams.limitInt(), null, totalHitsThreshold);
queryParams.limitInt(), null, totalHitsThreshold
);
} else {
sharedManager = TopScoreDocCollector.createSharedManager(queryParams.limitInt(), null, totalHitsThreshold);
};
}
;
var collectors = indexSearchers.stream().map(shard -> {
try {
TopDocsCollector<?> collector;
@ -112,43 +117,20 @@ public class StandardSearcher implements MultiSearcher {
private LuceneSearchResult computeResults(TopDocs data,
LLIndexSearchers indexSearchers,
String keyFieldName,
LocalQueryParams queryParams) {
LocalQueryParams queryParams,
Function<Stream<LLKeyScore>, Stream<LLKeyScore>> filterer) {
var totalHitsCount = LuceneUtils.convertTotalHitsCount(data.totalHits);
Stream<LLKeyScore> hitsStream = LuceneUtils
.convertHits(Stream.of(data.scoreDocs),
indexSearchers.shards(), keyFieldName
)
.convertHits(Stream.of(data.scoreDocs), indexSearchers.shards(), keyFieldName)
.skip(queryParams.offsetLong())
.limit(queryParams.limitLong());
return new MyLuceneSearchResult(totalHitsCount, hitsStream, indexSearchers);
return new LuceneSearchResult(totalHitsCount, toList(filterer.apply(hitsStream)));
}
@Override
public String getName() {
return "standard";
}
private static class MyLuceneSearchResult extends LuceneSearchResult implements LuceneCloseable {
private final LLIndexSearchers indexSearchers;
public MyLuceneSearchResult(TotalHitsCount totalHitsCount,
Stream<LLKeyScore> hitsStream,
LLIndexSearchers indexSearchers) {
super(totalHitsCount, hitsStream);
this.indexSearchers = indexSearchers;
}
@Override
protected void onClose() {
try {
indexSearchers.close();
} catch (Throwable e) {
LOG.error("Can't close index searchers", e);
}
super.onClose();
}
}
}

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.lucene.searcher;
import static com.google.common.collect.Streams.mapWithIndex;
import static it.cavallium.dbengine.utils.StreamUtils.toList;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore;
@ -26,9 +27,10 @@ public class UnsortedStreamingMultiSearcher implements MultiSearcher {
public LuceneSearchResult collectMulti(LLIndexSearchers indexSearchers,
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer) {
GlobalQueryRewrite transformer,
Function<Stream<LLKeyScore>, Stream<LLKeyScore>> filterer) {
if (transformer != GlobalQueryRewrite.NO_REWRITE) {
return LuceneUtils.rewriteMulti(this, indexSearchers, queryParams, keyFieldName, transformer);
return LuceneUtils.rewriteMulti(this, indexSearchers, queryParams, keyFieldName, transformer, filterer);
}
if (queryParams.isSorted() && queryParams.limitLong() > 0) {
throw new UnsupportedOperationException("Sorted queries are not supported" + " by UnsortedContinuousLuceneMultiSearcher");
@ -44,7 +46,7 @@ public class UnsortedStreamingMultiSearcher implements MultiSearcher {
var totalHitsCount = new TotalHitsCount(0, false);
Stream<LLKeyScore> mergedFluxes = resultsFlux.skip(queryParams.offsetLong()).limit(queryParams.limitLong());
return new MyLuceneSearchResult(totalHitsCount, mergedFluxes, indexSearchers);
return new LuceneSearchResult(totalHitsCount, toList(filterer.apply(mergedFluxes)));
}
private Stream<ScoreDoc> getScoreDocs(LocalQueryParams localQueryParams, List<IndexSearcher> shards) {
@ -68,26 +70,4 @@ public class UnsortedStreamingMultiSearcher implements MultiSearcher {
public String getName() {
return "unsorted streaming multi";
}
private static class MyLuceneSearchResult extends LuceneSearchResult implements LuceneCloseable {
private final LLIndexSearchers indexSearchers;
public MyLuceneSearchResult(TotalHitsCount totalHitsCount,
Stream<LLKeyScore> hitsFlux,
LLIndexSearchers indexSearchers) {
super(totalHitsCount, hitsFlux);
this.indexSearchers = indexSearchers;
}
@Override
protected void onClose() {
try {
indexSearchers.close();
} catch (Throwable e) {
LOG.error("Can't close index searchers", e);
}
super.onClose();
}
}
}

View File

@ -3,6 +3,7 @@ package it.cavallium.dbengine.tests;
import static java.util.Objects.requireNonNull;
import static java.util.Objects.requireNonNullElseGet;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite;
@ -13,6 +14,8 @@ import it.cavallium.dbengine.lucene.searcher.LuceneSearchResult;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Stream;
import org.jetbrains.annotations.Nullable;
public class SwappableLuceneSearcher implements LocalSearcher, MultiSearcher, Closeable {
@ -28,13 +31,14 @@ public class SwappableLuceneSearcher implements LocalSearcher, MultiSearcher, Cl
public LuceneSearchResult collect(LLIndexSearcher indexSearcher,
LocalQueryParams queryParams,
@Nullable String keyFieldName,
GlobalQueryRewrite transformer) {
GlobalQueryRewrite transformer,
Function<Stream<LLKeyScore>, Stream<LLKeyScore>> filterer) {
var single = this.single.get();
if (single == null) {
single = this.multi.get();
}
requireNonNull(single, "LuceneLocalSearcher not set");
return single.collect(indexSearcher, queryParams, keyFieldName, transformer);
return single.collect(indexSearcher, queryParams, keyFieldName, transformer, filterer);
}
@Override
@ -55,10 +59,11 @@ public class SwappableLuceneSearcher implements LocalSearcher, MultiSearcher, Cl
@Override
public LuceneSearchResult collectMulti(LLIndexSearchers indexSearchers,
LocalQueryParams queryParams,
String keyFieldName,
GlobalQueryRewrite transformer) {
@Nullable String keyFieldName,
GlobalQueryRewrite transformer,
Function<Stream<LLKeyScore>, Stream<LLKeyScore>> filterer) {
var multi = requireNonNull(this.multi.get(), "LuceneMultiSearcher not set");
return multi.collectMulti(indexSearchers, queryParams, keyFieldName, transformer);
return multi.collectMulti(indexSearchers, queryParams, keyFieldName, transformer, filterer);
}
public void setSingle(LocalSearcher single) {

View File

@ -8,6 +8,7 @@ import static it.cavallium.dbengine.tests.DbTestUtils.runVoid;
import static it.cavallium.dbengine.tests.DbTestUtils.tempDatabaseMapDictionaryDeepMap;
import static it.cavallium.dbengine.tests.DbTestUtils.tempDb;
import static it.cavallium.dbengine.tests.DbTestUtils.tempDictionary;
import static it.cavallium.dbengine.utils.StreamUtils.toList;
import com.google.common.collect.Streams;
import it.cavallium.dbengine.database.UpdateMode;
@ -805,11 +806,11 @@ public abstract class TestDictionaryMapDeep {
var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6);
map.putMulti(entries.entrySet().stream());
return map.getAllStages(null, false).map(stage -> {
return toList(map.getAllStages(null, false).map(stage -> {
var v = stage.getValue().get(null);
if (v == null) return null;
return Map.entry(stage.getKey(), v);
}).filter(Objects::nonNull).toList();
}).filter(Objects::nonNull));
}));
if (shouldFail) {
this.checkLeaks = false;

View File

@ -7,6 +7,7 @@ import static it.cavallium.dbengine.tests.DbTestUtils.run;
import static it.cavallium.dbengine.tests.DbTestUtils.tempDatabaseMapDictionaryDeepMapHashMap;
import static it.cavallium.dbengine.tests.DbTestUtils.tempDb;
import static it.cavallium.dbengine.tests.DbTestUtils.tempDictionary;
import static it.cavallium.dbengine.utils.StreamUtils.toList;
import com.google.common.collect.Streams;
import it.cavallium.dbengine.database.UpdateMode;
@ -103,12 +104,11 @@ public abstract class TestDictionaryMapDeepHashMap {
var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
var map = tempDatabaseMapDictionaryDeepMapHashMap(tempDictionary(db, updateMode), 5);
map.at(null, key1).putValue(key2, value);
return map
return toList(map
.getAllValues(null, false)
.map(Entry::getValue)
.flatMap(maps -> maps.entrySet().stream())
.map(Entry::getValue)
.toList();
.map(Entry::getValue));
}));
if (shouldFail) {
this.checkLeaks = false;

View File

@ -237,9 +237,9 @@ public class TestLuceneSearches {
ExpectedQueryType expectedQueryType) throws Throwable {
runSearchers(expectedQueryType, searcher -> {
var luceneIndex = getLuceneIndex(expectedQueryType.shard(), searcher);
var query = queryParamsBuilder.build();
try (var results = luceneIndex.search(query)) {
try (var luceneIndex1 = getLuceneIndex(expectedQueryType.shard(), searcher)) {
var query = queryParamsBuilder.build();
var results = luceneIndex1.search(query);
var hits = results.totalHitsCount();
var keys = getResults(results);
if (hits.exact()) {
@ -249,9 +249,9 @@ public class TestLuceneSearches {
}
var standardSearcher = new StandardSearcher();
luceneIndex = getLuceneIndex(expectedQueryType.shard(), standardSearcher);
var officialQuery = queryParamsBuilder.limit(ELEMENTS.size() * 2L).build();
try (var officialResults = luceneIndex.search(officialQuery)) {
try (var luceneIndex2 = getLuceneIndex(expectedQueryType.shard(), standardSearcher)) {
var officialQuery = queryParamsBuilder.limit(ELEMENTS.size() * 2L).build();
var officialResults = luceneIndex2.search(officialQuery);
var officialHits = officialResults.totalHitsCount();
var officialKeys = getResults(officialResults);
if (officialHits.exact()) {
@ -343,10 +343,7 @@ public class TestLuceneSearches {
}
private List<Scored> getResults(Hits<HitKey<String>> results) {
return results
.results()
.map(key -> new Scored(key.key(), key.score()))
.toList();
return results.results().stream().map(key -> new Scored(key.key(), key.score())).toList();
}
}

View File

@ -17,9 +17,11 @@ import it.cavallium.dbengine.utils.SimpleResource;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher {
@ -34,10 +36,11 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher {
@Override
public LuceneSearchResult collectMulti(LLIndexSearchers indexSearchers,
LocalQueryParams queryParams,
String keyFieldName,
GlobalQueryRewrite transformer) {
@Nullable String keyFieldName,
GlobalQueryRewrite transformer,
Function<Stream<LLKeyScore>, Stream<LLKeyScore>> filterer) {
if (transformer != NO_REWRITE) {
return LuceneUtils.rewriteMulti(this, indexSearchers, queryParams, keyFieldName, transformer);
return LuceneUtils.rewriteMulti(this, indexSearchers, queryParams, keyFieldName, transformer, filterer);
}
if (queryParams.isSorted() && queryParams.limitLong() > 0) {
throw new UnsupportedOperationException(
@ -50,15 +53,13 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher {
var localQueryParams = getLocalQueryParams(queryParams);
var results = indexSearchers.llShards().stream()
.map(searcher -> localSearcher.collect(searcher, localQueryParams, keyFieldName, transformer))
.map(searcher -> localSearcher.collect(searcher, localQueryParams, keyFieldName, transformer, filterer))
.toList();
List<LuceneSearchResult> resultsToDrop = new ArrayList<>(results.size());
List<Stream<LLKeyScore>> resultsFluxes = new ArrayList<>(results.size());
boolean exactTotalHitsCount = true;
long totalHitsCountValue = 0;
for (LuceneSearchResult result : results) {
resultsToDrop.add(result);
resultsFluxes.add(result.results());
resultsFluxes.add(result.results().stream());
exactTotalHitsCount &= result.totalHitsCount().exact();
totalHitsCountValue += result.totalHitsCount().value();
}
@ -69,7 +70,7 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher {
.skip(queryParams.offsetLong())
.limit(queryParams.limitLong());
return new MyLuceneSearchResult(totalHitsCount, mergedFluxes, resultsToDrop, indexSearchers);
return new LuceneSearchResult(totalHitsCount, mergedFluxes.toList());
}
private LocalQueryParams getLocalQueryParams(LocalQueryParams queryParams) {
@ -87,30 +88,4 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher {
public String getName() {
return "unsorted unscored simple multi";
}
private static class MyLuceneSearchResult extends LuceneSearchResult implements LuceneCloseable {
private final List<LuceneSearchResult> resultsToDrop;
private final LLIndexSearchers indexSearchers;
public MyLuceneSearchResult(TotalHitsCount totalHitsCount,
Stream<LLKeyScore> mergedFluxes,
List<LuceneSearchResult> resultsToDrop,
LLIndexSearchers indexSearchers) {
super(totalHitsCount, mergedFluxes);
this.resultsToDrop = resultsToDrop;
this.indexSearchers = indexSearchers;
}
@Override
protected void onClose() {
resultsToDrop.forEach(SimpleResource::close);
try {
indexSearchers.close();
} catch (UncheckedIOException e) {
LOG.error("Can't close index searchers", e);
}
super.onClose();
}
}
}