Refactor results
This commit is contained in:
parent
d861e26a57
commit
37de47fd6c
2
.github/workflows/maven-publish.yml
vendored
2
.github/workflows/maven-publish.yml
vendored
@ -43,7 +43,7 @@ jobs:
|
||||
shell: bash
|
||||
run: |
|
||||
cd lucene
|
||||
./gradlew assemble -x test -x errorprone publishToMavenLocal
|
||||
./gradlew assemble -x test -x errorprone publishToMavenLocal -x signJarsPublication
|
||||
mvn deploy:deploy-file -DgroupId=org.lucene -DartifactId=lucene-core -Dversion=9.0.0-SNAPSHOT -DgeneratePom=true -Dpackaging=jar -DrepositoryId=mchv-release-distribution -Durl=https://mvn.mchv.eu/repository/mchv-snapshot -Dfile=lucene/core/build/libs/lucene-core-9.0.0-SNAPSHOT.jar -Djavadoc=lucene/core/build/libs/lucene-core-9.0.0-SNAPSHOT-javadoc.jar -Dsources=lucene/core/build/libs/lucene-core-9.0.0-SNAPSHOT-sources.jar
|
||||
mvn deploy:deploy-file -DgroupId=org.lucene -DartifactId=lucene-join -Dversion=9.0.0-SNAPSHOT -DgeneratePom=true -Dpackaging=jar -DrepositoryId=mchv-release-distribution -Durl=https://mvn.mchv.eu/repository/mchv-snapshot -Dfile=lucene/join/build/libs/lucene-join-9.0.0-SNAPSHOT.jar -Djavadoc=lucene/join/build/libs/lucene-join-9.0.0-SNAPSHOT-javadoc.jar -Dsources=lucene/join/build/libs/lucene-join-9.0.0-SNAPSHOT-sources.jar
|
||||
mvn deploy:deploy-file -DgroupId=org.lucene -DartifactId=lucene-analysis-common -Dversion=9.0.0-SNAPSHOT -DgeneratePom=true -Dpackaging=jar -DrepositoryId=mchv-release-distribution -Durl=https://mvn.mchv.eu/repository/mchv-snapshot -Dfile=lucene/analysis/common/build/libs/lucene-analysis-common-9.0.0-SNAPSHOT.jar -Djavadoc=lucene/analysis/common/build/libs/lucene-analysis-common-9.0.0-SNAPSHOT-javadoc.jar -Dsources=lucene/analysis/common/build/libs/lucene-analysis-common-9.0.0-SNAPSHOT-sources.jar
|
||||
|
2
pom.xml
2
pom.xml
@ -522,7 +522,7 @@
|
||||
<plugin>
|
||||
<groupId>it.cavallium</groupId>
|
||||
<artifactId>data-generator</artifactId>
|
||||
<version>0.9.69</version>
|
||||
<version>0.9.71</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>generate-lucene-query-sources</id>
|
||||
|
13
src/main/java/it/cavallium/dbengine/client/HitEntry.java
Normal file
13
src/main/java/it/cavallium/dbengine/client/HitEntry.java
Normal file
@ -0,0 +1,13 @@
|
||||
package it.cavallium.dbengine.client;
|
||||
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public record HitEntry<T, U>(T key, U value, float score)
|
||||
implements Comparable<HitEntry<T, U>> {
|
||||
|
||||
@Override
|
||||
public int compareTo(@NotNull HitEntry<T, U> o) {
|
||||
return Float.compare(o.score, this.score);
|
||||
}
|
||||
}
|
18
src/main/java/it/cavallium/dbengine/client/HitKey.java
Normal file
18
src/main/java/it/cavallium/dbengine/client/HitKey.java
Normal file
@ -0,0 +1,18 @@
|
||||
package it.cavallium.dbengine.client;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.function.Function;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public record HitKey<T>(T key, float score) implements Comparable<HitKey<T>> {
|
||||
|
||||
public <U> Mono<HitEntry<T, U>> withValue(Function<T, Mono<U>> valueGetter) {
|
||||
return valueGetter.apply(key).map(value -> new HitEntry<>(key, value, score));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(@NotNull HitKey<T> o) {
|
||||
return Float.compare(o.score, this.score);
|
||||
}
|
||||
}
|
126
src/main/java/it/cavallium/dbengine/client/Hits.java
Normal file
126
src/main/java/it/cavallium/dbengine/client/Hits.java
Normal file
@ -0,0 +1,126 @@
|
||||
package it.cavallium.dbengine.client;
|
||||
|
||||
import io.net5.buffer.api.Drop;
|
||||
import io.net5.buffer.api.Owned;
|
||||
import io.net5.buffer.api.Send;
|
||||
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
|
||||
import io.net5.buffer.api.internal.ResourceSupport;
|
||||
import it.cavallium.dbengine.database.collections.ValueGetter;
|
||||
import it.cavallium.dbengine.database.collections.ValueTransformer;
|
||||
import java.util.function.Function;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.function.Tuples;
|
||||
|
||||
public final class Hits<T> extends ResourceSupport<Hits<T>, Hits<T>> {
|
||||
|
||||
private static final Drop<Hits<?>> DROP = new Drop<>() {
|
||||
@Override
|
||||
public void drop(Hits<?> obj) {
|
||||
if (obj.onClose != null) {
|
||||
obj.onClose.run();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Drop<Hits<?>> fork() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void attach(Hits<?> obj) {
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
private Flux<T> results;
|
||||
private TotalHitsCount totalHitsCount;
|
||||
private Runnable onClose;
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
public Hits(Flux<T> results, TotalHitsCount totalHitsCount, Runnable onClose) {
|
||||
super((Drop<Hits<T>>) (Drop) DROP);
|
||||
this.results = results;
|
||||
this.totalHitsCount = totalHitsCount;
|
||||
this.onClose = onClose;
|
||||
}
|
||||
|
||||
public static <T> Hits<T> empty() {
|
||||
return new Hits<>(Flux.empty(), TotalHitsCount.of(0, true), null);
|
||||
}
|
||||
|
||||
public static <K, V> Hits<LazyHitEntry<K, V>> withValuesLazy(Hits<LazyHitKey<K>> hits,
|
||||
ValueGetter<K, V> valuesGetter) {
|
||||
var hitsEntry = hits.results().map(hitKey -> hitKey.withValue(valuesGetter::get));
|
||||
|
||||
return new Hits<>(hitsEntry, hits.totalHitsCount, hits::close);
|
||||
}
|
||||
|
||||
public static <K, V> Hits<HitEntry<K, V>> withValues(Hits<HitKey<K>> hits, ValueGetter<K, V> valuesGetter) {
|
||||
var hitsEntry = hits.results().flatMap(hitKey -> hitKey.withValue(valuesGetter::get));
|
||||
|
||||
return new Hits<>(hitsEntry, hits.totalHitsCount, hits::close);
|
||||
}
|
||||
|
||||
public static <T, U> Function<Send<Hits<HitKey<T>>>, Send<Hits<LazyHitEntry<T, U>>>> generateMapper(
|
||||
ValueGetter<T, U> valueGetter) {
|
||||
return resultToReceive -> {
|
||||
var result = resultToReceive.receive();
|
||||
var hitsToTransform = result.results()
|
||||
.map(hit -> new LazyHitEntry<>(Mono.just(hit.key()), valueGetter.get(hit.key()), hit.score()));
|
||||
return new Hits<>(hitsToTransform, result.totalHitsCount(), result::close).send();
|
||||
};
|
||||
}
|
||||
|
||||
public static <T, U> Function<Send<Hits<HitKey<T>>>, Send<Hits<LazyHitEntry<T, U>>>> generateMapper(
|
||||
ValueTransformer<T, U> valueTransformer) {
|
||||
return resultToReceive -> {
|
||||
var result = resultToReceive.receive();
|
||||
var hitsToTransform = result.results().map(hit -> Tuples.of(hit.score(), hit.key()));
|
||||
var transformed = valueTransformer
|
||||
.transform(hitsToTransform)
|
||||
.filter(tuple3 -> tuple3.getT3().isPresent())
|
||||
.map(tuple3 -> new LazyHitEntry<>(Mono.just(tuple3.getT2()),
|
||||
Mono.just(tuple3.getT3().orElseThrow()),
|
||||
tuple3.getT1()
|
||||
));
|
||||
return new Hits<>(transformed, result.totalHitsCount(), result::close).send();
|
||||
};
|
||||
}
|
||||
|
||||
public Flux<T> results() {
|
||||
return results;
|
||||
}
|
||||
|
||||
public TotalHitsCount totalHitsCount() {
|
||||
return totalHitsCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Hits[" + "results=" + results + ", " + "totalHitsCount=" + totalHitsCount + ']';
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RuntimeException createResourceClosedException() {
|
||||
return new IllegalStateException("Closed");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Owned<Hits<T>> prepareSend() {
|
||||
var results = this.results;
|
||||
var totalHitsCount = this.totalHitsCount;
|
||||
var onClose = this.onClose;
|
||||
return drop -> {
|
||||
var instance = new Hits<>(results, totalHitsCount, onClose);
|
||||
drop.attach(instance);
|
||||
return instance;
|
||||
};
|
||||
}
|
||||
|
||||
protected void makeInaccessible() {
|
||||
this.results = null;
|
||||
this.totalHitsCount = null;
|
||||
this.onClose = null;
|
||||
}
|
||||
}
|
11
src/main/java/it/cavallium/dbengine/client/LazyHitEntry.java
Normal file
11
src/main/java/it/cavallium/dbengine/client/LazyHitEntry.java
Normal file
@ -0,0 +1,11 @@
|
||||
package it.cavallium.dbengine.client;
|
||||
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public record LazyHitEntry<T, U>(Mono<T> key, Mono<U> value, float score) {
|
||||
|
||||
public Mono<HitEntry<T, U>> resolve() {
|
||||
return Mono.zip(key, value, (k, v) -> new HitEntry<>(k, v, score));
|
||||
}
|
||||
}
|
19
src/main/java/it/cavallium/dbengine/client/LazyHitKey.java
Normal file
19
src/main/java/it/cavallium/dbengine/client/LazyHitKey.java
Normal file
@ -0,0 +1,19 @@
|
||||
package it.cavallium.dbengine.client;
|
||||
|
||||
import java.util.function.Function;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public record LazyHitKey<T>(Mono<T> key, float score) {
|
||||
|
||||
public <U> LazyHitEntry<T, U> withValue(Function<T, Mono<U>> valueGetter) {
|
||||
return new LazyHitEntry<>(key, key.flatMap(valueGetter), score);
|
||||
}
|
||||
|
||||
public Mono<HitKey<T>> resolve() {
|
||||
return key.map(k -> new HitKey<>(k, score));
|
||||
}
|
||||
|
||||
public <U> Mono<HitEntry<T, U>> resolveWithValue(Function<T, Mono<U>> valueGetter) {
|
||||
return resolve().flatMap(key -> key.withValue(valueGetter));
|
||||
}
|
||||
}
|
@ -9,16 +9,11 @@ import it.cavallium.dbengine.database.LLSnapshottable;
|
||||
import it.cavallium.dbengine.database.collections.ValueGetter;
|
||||
import it.cavallium.dbengine.database.collections.ValueTransformer;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Optional;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.function.Tuple2;
|
||||
import reactor.util.function.Tuple3;
|
||||
import reactor.util.function.Tuples;
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public interface LuceneIndex<T, U> extends LLSnapshottable {
|
||||
|
||||
Mono<Void> addDocument(T key, U value);
|
||||
@ -53,32 +48,10 @@ public interface LuceneIndex<T, U> extends LLSnapshottable {
|
||||
|
||||
Mono<Void> deleteAll();
|
||||
|
||||
Mono<Send<SearchResultKeys<T>>> moreLikeThis(ClientQueryParams<SearchResultKey<T>> queryParams, T key, U mltDocumentValue);
|
||||
Mono<Send<Hits<HitKey<T>>>> moreLikeThis(ClientQueryParams queryParams, T key,
|
||||
U mltDocumentValue);
|
||||
|
||||
default Mono<Send<SearchResult<T, U>>> moreLikeThisWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
|
||||
T key,
|
||||
U mltDocumentValue,
|
||||
ValueGetter<T, U> valueGetter) {
|
||||
return this.moreLikeThisWithTransformer(queryParams,
|
||||
key,
|
||||
mltDocumentValue,
|
||||
getValueGetterTransformer(valueGetter));
|
||||
}
|
||||
|
||||
Mono<Send<SearchResult<T, U>>> moreLikeThisWithTransformer(ClientQueryParams<SearchResultItem<T, U>> queryParams,
|
||||
T key,
|
||||
U mltDocumentValue,
|
||||
ValueTransformer<T, U> valueTransformer);
|
||||
|
||||
Mono<Send<SearchResultKeys<T>>> search(ClientQueryParams<SearchResultKey<T>> queryParams);
|
||||
|
||||
default Mono<Send<SearchResult<T, U>>> searchWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
|
||||
ValueGetter<T, U> valueGetter) {
|
||||
return this.searchWithTransformer(queryParams, getValueGetterTransformer(valueGetter));
|
||||
}
|
||||
|
||||
Mono<Send<SearchResult<T, U>>> searchWithTransformer(ClientQueryParams<SearchResultItem<T, U>> queryParams,
|
||||
ValueTransformer<T, U> valueTransformer);
|
||||
Mono<Send<Hits<HitKey<T>>>> search(ClientQueryParams queryParams);
|
||||
|
||||
Mono<TotalHitsCount> count(@Nullable CompositeSnapshot snapshot, Query query);
|
||||
|
||||
@ -89,17 +62,4 @@ public interface LuceneIndex<T, U> extends LLSnapshottable {
|
||||
Mono<Void> flush();
|
||||
|
||||
Mono<Void> refresh(boolean force);
|
||||
|
||||
private static <T, U> ValueTransformer<T, U> getValueGetterTransformer(ValueGetter<T, U> valueGetter) {
|
||||
return new ValueTransformer<T, U>() {
|
||||
@Override
|
||||
public <X> Flux<Tuple3<X, T, Optional<U>>> transform(Flux<Tuple2<X, T>> keys) {
|
||||
return keys.flatMapSequential(key -> valueGetter
|
||||
.get(key.getT2())
|
||||
.map(result -> Tuples.of(key.getT1(), key.getT2(), Optional.of(result)))
|
||||
.switchIfEmpty(Mono.fromSupplier(() -> Tuples.of(key.getT1(), key.getT2(), Optional.empty())))
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -19,7 +19,6 @@ import it.cavallium.dbengine.database.LLLuceneIndex;
|
||||
import it.cavallium.dbengine.database.LLSearchResultShard;
|
||||
import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import it.cavallium.dbengine.database.LLTerm;
|
||||
import it.cavallium.dbengine.database.collections.ValueGetter;
|
||||
import it.cavallium.dbengine.database.collections.ValueTransformer;
|
||||
import java.lang.ref.Cleaner;
|
||||
import java.util.Map;
|
||||
@ -36,11 +35,9 @@ import reactor.core.publisher.Sinks;
|
||||
import reactor.core.publisher.Sinks.EmitResult;
|
||||
import reactor.core.publisher.Sinks.Empty;
|
||||
import reactor.core.publisher.Sinks.Many;
|
||||
import reactor.core.publisher.Sinks.One;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
import reactor.util.concurrent.Queues;
|
||||
import reactor.util.function.Tuple2;
|
||||
import reactor.util.function.Tuples;
|
||||
|
||||
public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
|
||||
|
||||
@ -202,140 +199,47 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
|
||||
return Mono.create(sink -> emitActionOptimistically(new IndexAction.DeleteAll(sink)));
|
||||
}
|
||||
|
||||
private Mono<Send<SearchResultKeys<T>>> transformLuceneResultWithTransformer(
|
||||
Mono<Send<LLSearchResultShard>> llSearchResultMono) {
|
||||
return llSearchResultMono.map(llSearchResultToReceive -> {
|
||||
var llSearchResult = llSearchResultToReceive.receive();
|
||||
return new SearchResultKeys<>(llSearchResult.results()
|
||||
.map(signal -> new SearchResultKey<>(Mono
|
||||
.fromCallable(signal::key)
|
||||
.map(indicizer::getKey), signal.score())),
|
||||
llSearchResult.totalHitsCount(),
|
||||
llSearchResult::close
|
||||
).send();
|
||||
});
|
||||
}
|
||||
|
||||
private Mono<Send<SearchResult<T, U>>> transformLuceneResultWithValues(
|
||||
Mono<Send<LLSearchResultShard>> llSearchResultMono,
|
||||
ValueGetter<T, U> valueGetter) {
|
||||
return llSearchResultMono.map(llSearchResultToReceive -> {
|
||||
var llSearchResult = llSearchResultToReceive.receive();
|
||||
return new SearchResult<>(llSearchResult.results().map(signal -> {
|
||||
var key = Mono.fromCallable(signal::key).map(indicizer::getKey);
|
||||
return new SearchResultItem<>(key, key.flatMap(valueGetter::get), signal.score());
|
||||
}), llSearchResult.totalHitsCount(), llSearchResult::close).send();
|
||||
});
|
||||
}
|
||||
|
||||
private Mono<Send<SearchResult<T, U>>> transformLuceneResultWithTransformer(
|
||||
Mono<Send<LLSearchResultShard>> llSearchResultMono,
|
||||
ValueTransformer<T, U> valueTransformer) {
|
||||
return llSearchResultMono
|
||||
.map(llSearchResultToReceive -> {
|
||||
var llSearchResult = llSearchResultToReceive.receive();
|
||||
var scoresWithKeysFlux = llSearchResult
|
||||
.results()
|
||||
.flatMapSequential(signal -> Mono
|
||||
.fromCallable(signal::key)
|
||||
.map(indicizer::getKey)
|
||||
.map(key -> Tuples.of(signal.score(), key))
|
||||
);
|
||||
var resultItemsFlux = valueTransformer
|
||||
.transform(scoresWithKeysFlux)
|
||||
.filter(tuple3 -> tuple3.getT3().isPresent())
|
||||
.map(tuple3 -> new SearchResultItem<>(Mono.just(tuple3.getT2()),
|
||||
Mono.just(tuple3.getT3().orElseThrow()),
|
||||
tuple3.getT1()
|
||||
));
|
||||
return new SearchResult<>(resultItemsFlux, llSearchResult.totalHitsCount(), llSearchResult::close).send();
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Send<SearchResultKeys<T>>> moreLikeThis(ClientQueryParams<SearchResultKey<T>> queryParams,
|
||||
public Mono<Send<Hits<HitKey<T>>>> moreLikeThis(ClientQueryParams queryParams,
|
||||
T key,
|
||||
U mltDocumentValue) {
|
||||
Flux<Tuple2<String, Set<String>>> mltDocumentFields
|
||||
= indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue);
|
||||
return luceneIndex
|
||||
.moreLikeThis(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName(), mltDocumentFields)
|
||||
.transform(this::transformLuceneResultWithTransformer);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Send<SearchResult<T, U>>> moreLikeThisWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
|
||||
T key,
|
||||
U mltDocumentValue,
|
||||
ValueGetter<T, U> valueGetter) {
|
||||
Flux<Tuple2<String, Set<String>>> mltDocumentFields
|
||||
= indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue);
|
||||
return luceneIndex
|
||||
.moreLikeThis(resolveSnapshot(queryParams.snapshot()),
|
||||
queryParams.toQueryParams(),
|
||||
indicizer.getKeyFieldName(),
|
||||
mltDocumentFields
|
||||
)
|
||||
.transform(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult,
|
||||
valueGetter));
|
||||
.map(this::mapResults)
|
||||
.single();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Send<SearchResult<T, U>>> moreLikeThisWithTransformer(ClientQueryParams<SearchResultItem<T, U>> queryParams,
|
||||
T key,
|
||||
U mltDocumentValue,
|
||||
ValueTransformer<T, U> valueTransformer) {
|
||||
Flux<Tuple2<String, Set<String>>> mltDocumentFields
|
||||
= indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue);
|
||||
return luceneIndex
|
||||
.moreLikeThis(resolveSnapshot(queryParams.snapshot()),
|
||||
queryParams.toQueryParams(),
|
||||
indicizer.getKeyFieldName(),
|
||||
mltDocumentFields
|
||||
)
|
||||
.transform(llSearchResult -> this.transformLuceneResultWithTransformer(llSearchResult,
|
||||
valueTransformer));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Send<SearchResultKeys<T>>> search(ClientQueryParams<SearchResultKey<T>> queryParams) {
|
||||
public Mono<Send<Hits<HitKey<T>>>> search(ClientQueryParams queryParams) {
|
||||
return luceneIndex
|
||||
.search(resolveSnapshot(queryParams.snapshot()),
|
||||
queryParams.toQueryParams(),
|
||||
indicizer.getKeyFieldName()
|
||||
)
|
||||
.single()
|
||||
.transform(this::transformLuceneResultWithTransformer)
|
||||
.map(this::mapResults)
|
||||
.single();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Send<SearchResult<T, U>>> searchWithValues(
|
||||
ClientQueryParams<SearchResultItem<T, U>> queryParams,
|
||||
ValueGetter<T, U> valueGetter) {
|
||||
return luceneIndex
|
||||
.search(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(),
|
||||
indicizer.getKeyFieldName())
|
||||
.transform(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult,
|
||||
valueGetter));
|
||||
}
|
||||
private Send<Hits<HitKey<T>>> mapResults(Send<LLSearchResultShard> llSearchResultToReceive) {
|
||||
var llSearchResult = llSearchResultToReceive.receive();
|
||||
var scoresWithKeysFlux = llSearchResult
|
||||
.results()
|
||||
.map(hit -> new HitKey<>(indicizer.getKey(hit.key()), hit.score()));
|
||||
|
||||
@Override
|
||||
public Mono<Send<SearchResult<T, U>>> searchWithTransformer(
|
||||
ClientQueryParams<SearchResultItem<T, U>> queryParams,
|
||||
ValueTransformer<T, U> valueTransformer) {
|
||||
return luceneIndex
|
||||
.search(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(),
|
||||
indicizer.getKeyFieldName())
|
||||
.transform(llSearchResult -> this.transformLuceneResultWithTransformer(llSearchResult,
|
||||
valueTransformer));
|
||||
return new Hits<>(scoresWithKeysFlux, llSearchResult.totalHitsCount(), llSearchResult::close).send();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<TotalHitsCount> count(@Nullable CompositeSnapshot snapshot, Query query) {
|
||||
return this
|
||||
.search(ClientQueryParams.<SearchResultKey<T>>builder().snapshot(snapshot).query(query).limit(0).build())
|
||||
.search(ClientQueryParams.<LazyHitKey<T>>builder().snapshot(snapshot).query(query).limit(0).build())
|
||||
.single()
|
||||
.map(searchResultKeysSend -> {
|
||||
try (var searchResultKeys = searchResultKeysSend.receive()) {
|
||||
|
@ -1,81 +0,0 @@
|
||||
package it.cavallium.dbengine.client;
|
||||
|
||||
import io.net5.buffer.api.Drop;
|
||||
import io.net5.buffer.api.Owned;
|
||||
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
|
||||
import io.net5.buffer.api.internal.ResourceSupport;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
public final class SearchResult<T, U> extends ResourceSupport<SearchResult<T, U>, SearchResult<T, U>> {
|
||||
|
||||
private static final Drop<SearchResult<?, ?>> DROP = new Drop<>() {
|
||||
@Override
|
||||
public void drop(SearchResult<?, ?> obj) {
|
||||
if (obj.onClose != null) {
|
||||
obj.onClose.run();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Drop<SearchResult<?, ?>> fork() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void attach(SearchResult<?, ?> obj) {
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
private Flux<SearchResultItem<T, U>> results;
|
||||
private TotalHitsCount totalHitsCount;
|
||||
private Runnable onClose;
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
public SearchResult(Flux<SearchResultItem<T, U>> results, TotalHitsCount totalHitsCount, Runnable onClose) {
|
||||
super((Drop<SearchResult<T,U>>) (Drop) DROP);
|
||||
this.results = results;
|
||||
this.totalHitsCount = totalHitsCount;
|
||||
this.onClose = onClose;
|
||||
}
|
||||
|
||||
public static <T, U> SearchResult<T, U> empty() {
|
||||
return new SearchResult<T, U>(Flux.empty(), TotalHitsCount.of(0, true), null);
|
||||
}
|
||||
|
||||
public Flux<SearchResultItem<T, U>> results() {
|
||||
return results;
|
||||
}
|
||||
|
||||
public TotalHitsCount totalHitsCount() {
|
||||
return totalHitsCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "SearchResult[" + "results=" + results + ", " + "totalHitsCount=" + totalHitsCount + ']';
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RuntimeException createResourceClosedException() {
|
||||
return new IllegalStateException("Closed");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Owned<SearchResult<T, U>> prepareSend() {
|
||||
var results = this.results;
|
||||
var totalHitsCount = this.totalHitsCount;
|
||||
var onClose = this.onClose;
|
||||
return drop -> {
|
||||
var instance = new SearchResult<>(results, totalHitsCount, onClose);
|
||||
drop.attach(instance);
|
||||
return instance;
|
||||
};
|
||||
}
|
||||
|
||||
protected void makeInaccessible() {
|
||||
this.results = null;
|
||||
this.totalHitsCount = null;
|
||||
this.onClose = null;
|
||||
}
|
||||
}
|
@ -1,13 +0,0 @@
|
||||
package it.cavallium.dbengine.client;
|
||||
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public record SearchResultItem<T, U>(Mono<T> key, Mono<U> value, float score)
|
||||
implements Comparable<SearchResultItem<T, U>> {
|
||||
|
||||
@Override
|
||||
public int compareTo(@NotNull SearchResultItem<T, U> o) {
|
||||
return Float.compare(o.score, this.score);
|
||||
}
|
||||
}
|
@ -1,7 +0,0 @@
|
||||
package it.cavallium.dbengine.client;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.StringJoiner;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public record SearchResultKey<T>(Mono<T> key, float score) {}
|
@ -1,96 +0,0 @@
|
||||
package it.cavallium.dbengine.client;
|
||||
|
||||
import io.net5.buffer.api.Drop;
|
||||
import io.net5.buffer.api.Owned;
|
||||
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
|
||||
import io.net5.buffer.api.internal.ResourceSupport;
|
||||
import it.cavallium.dbengine.database.collections.ValueGetter;
|
||||
import org.warp.commonutils.log.Logger;
|
||||
import org.warp.commonutils.log.LoggerFactory;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public final class SearchResultKeys<T> extends ResourceSupport<SearchResultKeys<T>, SearchResultKeys<T>> {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(SearchResultKeys.class);
|
||||
|
||||
private static final Drop<SearchResultKeys<?>> DROP = new Drop<>() {
|
||||
@Override
|
||||
public void drop(SearchResultKeys<?> obj) {
|
||||
if (obj.onClose != null) {
|
||||
obj.onClose.run();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Drop<SearchResultKeys<?>> fork() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void attach(SearchResultKeys<?> obj) {
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
private Flux<SearchResultKey<T>> results;
|
||||
private TotalHitsCount totalHitsCount;
|
||||
private Runnable onClose;
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
public SearchResultKeys(Flux<SearchResultKey<T>> results, TotalHitsCount totalHitsCount, Runnable onClose) {
|
||||
super((Drop<SearchResultKeys<T>>) (Drop) DROP);
|
||||
this.results = results;
|
||||
this.totalHitsCount = totalHitsCount;
|
||||
this.onClose = onClose;
|
||||
}
|
||||
|
||||
public static <T> SearchResultKeys<T> empty() {
|
||||
return new SearchResultKeys<>(Flux.empty(), TotalHitsCount.of(0, true), null);
|
||||
}
|
||||
|
||||
public <U> SearchResult<T, U> withValues(ValueGetter<T, U> valuesGetter) {
|
||||
return new SearchResult<>(results.map(item -> new SearchResultItem<>(item.key(),
|
||||
item.key().flatMap(valuesGetter::get),
|
||||
item.score()
|
||||
)), totalHitsCount, this::close);
|
||||
}
|
||||
|
||||
public Flux<SearchResultKey<T>> results() {
|
||||
return results;
|
||||
}
|
||||
|
||||
public TotalHitsCount totalHitsCount() {
|
||||
return totalHitsCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "SearchResultKeys[" + "results=" + results + ", " + "totalHitsCount=" + totalHitsCount + ']';
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RuntimeException createResourceClosedException() {
|
||||
return new IllegalStateException("Closed");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Owned<SearchResultKeys<T>> prepareSend() {
|
||||
var results = this.results;
|
||||
var totalHitsCount = this.totalHitsCount;
|
||||
var onClose = this.onClose;
|
||||
makeInaccessible();
|
||||
return drop -> {
|
||||
var instance = new SearchResultKeys<>(results, totalHitsCount, onClose);
|
||||
drop.attach(instance);
|
||||
return instance;
|
||||
};
|
||||
}
|
||||
|
||||
protected void makeInaccessible() {
|
||||
this.results = null;
|
||||
this.totalHitsCount = null;
|
||||
this.onClose = null;
|
||||
}
|
||||
|
||||
}
|
@ -12,7 +12,7 @@ import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
@RecordBuilder
|
||||
public final record ClientQueryParams<T>(@Nullable CompositeSnapshot snapshot,
|
||||
public final record ClientQueryParams(@Nullable CompositeSnapshot snapshot,
|
||||
@NotNull Query query,
|
||||
long offset,
|
||||
long limit,
|
||||
@ -20,9 +20,9 @@ public final record ClientQueryParams<T>(@Nullable CompositeSnapshot snapshot,
|
||||
@Nullable Sort sort,
|
||||
boolean complete) {
|
||||
|
||||
public static <T> ClientQueryParamsBuilder<T> builder() {
|
||||
public static ClientQueryParamsBuilder builder() {
|
||||
return ClientQueryParamsBuilder
|
||||
.<T>builder()
|
||||
.builder()
|
||||
.snapshot(null)
|
||||
.offset(0)
|
||||
.limit(Long.MAX_VALUE)
|
||||
|
@ -7,7 +7,6 @@ import io.net5.buffer.api.Send;
|
||||
import io.net5.buffer.api.internal.ResourceSupport;
|
||||
import it.cavallium.dbengine.client.BadBlock;
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.client.SearchResultKeys;
|
||||
import it.cavallium.dbengine.database.Delta;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.LLDictionaryResultType;
|
||||
|
@ -4,22 +4,14 @@ import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
|
||||
import static it.cavallium.dbengine.database.disk.LLLocalDictionary.getRocksIterator;
|
||||
|
||||
import io.net5.buffer.api.Buffer;
|
||||
import io.net5.buffer.api.BufferAllocator;
|
||||
import io.net5.buffer.api.Drop;
|
||||
import io.net5.buffer.api.Owned;
|
||||
import io.net5.buffer.api.Resource;
|
||||
import io.net5.buffer.api.Send;
|
||||
import io.net5.buffer.api.internal.ResourceSupport;
|
||||
import io.net5.util.IllegalReferenceCountException;
|
||||
import it.cavallium.dbengine.client.SearchResult;
|
||||
import it.cavallium.dbengine.database.LLRange;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.netty.NullableBuffer;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.rocksdb.ColumnFamilyHandle;
|
||||
import org.rocksdb.ReadOptions;
|
||||
import org.rocksdb.RocksDB;
|
||||
import org.rocksdb.RocksDBException;
|
||||
import org.warp.commonutils.log.Logger;
|
||||
import org.warp.commonutils.log.LoggerFactory;
|
||||
|
@ -10,10 +10,11 @@ import static org.junit.jupiter.api.Assertions.fail;
|
||||
import io.net5.buffer.PooledByteBufAllocator;
|
||||
import it.cavallium.dbengine.DbTestUtils.TempDb;
|
||||
import it.cavallium.dbengine.DbTestUtils.TestAllocator;
|
||||
import it.cavallium.dbengine.client.HitKey;
|
||||
import it.cavallium.dbengine.client.Hits;
|
||||
import it.cavallium.dbengine.client.LuceneIndex;
|
||||
import it.cavallium.dbengine.client.Sort;
|
||||
import it.cavallium.dbengine.client.SearchResultKey;
|
||||
import it.cavallium.dbengine.client.SearchResultKeys;
|
||||
import it.cavallium.dbengine.client.LazyHitKey;
|
||||
import it.cavallium.dbengine.client.query.ClientQueryParams;
|
||||
import it.cavallium.dbengine.client.query.ClientQueryParamsBuilder;
|
||||
import it.cavallium.dbengine.client.query.current.data.BooleanQuery;
|
||||
@ -237,7 +238,7 @@ public class TestLuceneSearches {
|
||||
}
|
||||
|
||||
private boolean supportsPreciseHitsCount(LocalSearcher searcher,
|
||||
ClientQueryParams<SearchResultKey<String>> query) {
|
||||
ClientQueryParams query) {
|
||||
var sorted = query.isSorted();
|
||||
if (searcher instanceof UnsortedUnscoredStreamingMultiSearcher) {
|
||||
return false;
|
||||
@ -248,7 +249,7 @@ public class TestLuceneSearches {
|
||||
}
|
||||
}
|
||||
|
||||
public void testSearch(ClientQueryParamsBuilder<SearchResultKey<String>> queryParamsBuilder,
|
||||
public void testSearch(ClientQueryParamsBuilder queryParamsBuilder,
|
||||
ExpectedQueryType expectedQueryType) throws Throwable {
|
||||
|
||||
runSearchers(expectedQueryType, searcher -> {
|
||||
@ -291,7 +292,7 @@ public class TestLuceneSearches {
|
||||
@MethodSource("provideQueryArgumentsScoreModeAndSort")
|
||||
public void testSearchNoDocs(boolean shards, Sort multiSort) throws Throwable {
|
||||
var queryBuilder = ClientQueryParams
|
||||
.<SearchResultKey<String>>builder()
|
||||
.<LazyHitKey<String>>builder()
|
||||
.query(new MatchNoDocsQuery())
|
||||
.snapshot(null)
|
||||
.complete(true)
|
||||
@ -305,7 +306,7 @@ public class TestLuceneSearches {
|
||||
@MethodSource("provideQueryArgumentsScoreModeAndSort")
|
||||
public void testSearchAllDocs(boolean shards, Sort multiSort) throws Throwable {
|
||||
var queryBuilder = ClientQueryParams
|
||||
.<SearchResultKey<String>>builder()
|
||||
.<LazyHitKey<String>>builder()
|
||||
.query(new MatchAllDocsQuery())
|
||||
.snapshot(null)
|
||||
.complete(true)
|
||||
@ -319,7 +320,7 @@ public class TestLuceneSearches {
|
||||
@MethodSource("provideQueryArgumentsScoreModeAndSort")
|
||||
public void testSearchAdvancedText(boolean shards, Sort multiSort) throws Throwable {
|
||||
var queryBuilder = ClientQueryParams
|
||||
.<SearchResultKey<String>>builder()
|
||||
.<LazyHitKey<String>>builder()
|
||||
.query(new BooleanQuery(List.of(
|
||||
new BooleanQueryPart(new BoostQuery(new TermQuery(new Term("text", "hello")), 3), new OccurShould()),
|
||||
new BooleanQueryPart(new TermQuery(new Term("text", "world")), new OccurShould()),
|
||||
@ -357,14 +358,10 @@ public class TestLuceneSearches {
|
||||
assertEquals(new TotalHitsCount(expectedCount, true), hits);
|
||||
}
|
||||
|
||||
private List<Scored> getResults(SearchResultKeys<String> results) {
|
||||
private List<Scored> getResults(Hits<HitKey<String>> results) {
|
||||
return run(results
|
||||
.results()
|
||||
.flatMapSequential(searchResultKey -> searchResultKey
|
||||
.key()
|
||||
.single()
|
||||
.map(key -> new Scored(key, searchResultKey.score()))
|
||||
)
|
||||
.map(key -> new Scored(key.key(), key.score()))
|
||||
.collectList());
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user