diff --git a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java
index 4f0e281..cbc0ff1 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java
@@ -2,13 +2,10 @@ package it.cavallium.dbengine.database;
import java.io.Closeable;
import java.io.IOException;
-import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.jetbrains.annotations.Nullable;
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
public interface LLLuceneIndex extends Closeable, LLSnapshottable {
@@ -31,20 +28,12 @@ public interface LLLuceneIndex extends Closeable, LLSnapshottable {
* @param limit the limit is valid for each lucene instance.
* If you have 15 instances, the number of elements returned
* can be at most limit * 15
+ * @return the collection has one or more flux
*/
- Collection search(@Nullable LLSnapshot snapshot, String query, int limit, @Nullable LLSort sort, String keyFieldName)
- throws IOException;
-
- /**
- *
- * @param limit the limit is valid for each lucene instance.
- * If you have 15 instances, the number of elements returned
- * can be at most limit * 15
- */
- Collection moreLikeThis(@Nullable LLSnapshot snapshot,
+ Mono moreLikeThis(@Nullable LLSnapshot snapshot,
Map> mltDocumentFields,
int limit,
- String keyFieldName) throws IOException;
+ String keyFieldName);
/**
*
@@ -53,10 +42,11 @@ public interface LLLuceneIndex extends Closeable, LLSnapshottable {
* can be at most limit * 15
* @return the collection has one or more flux
*/
- Tuple2, Collection>> searchStream(@Nullable LLSnapshot snapshot,
+ Mono search(@Nullable LLSnapshot snapshot,
String query,
int limit,
@Nullable LLSort sort,
+ LLScoreMode scoreMode,
String keyFieldName);
long count(@Nullable LLSnapshot snapshot, String query) throws IOException;
diff --git a/src/main/java/it/cavallium/dbengine/database/LLScoreMode.java b/src/main/java/it/cavallium/dbengine/database/LLScoreMode.java
new file mode 100644
index 0000000..906f51c
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLScoreMode.java
@@ -0,0 +1,7 @@
+package it.cavallium.dbengine.database;
+
+public enum LLScoreMode {
+ COMPLETE,
+ TOP_SCORES,
+ COMPLETE_NO_SCORES
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLSearchResult.java b/src/main/java/it/cavallium/dbengine/database/LLSearchResult.java
new file mode 100644
index 0000000..12a6f33
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLSearchResult.java
@@ -0,0 +1,71 @@
+package it.cavallium.dbengine.database;
+
+import java.util.Objects;
+import java.util.function.BiFunction;
+import org.jetbrains.annotations.NotNull;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class LLSearchResult {
+
+ private final Mono totalHitsCount;
+ private final Flux> results;
+
+ public LLSearchResult(Mono totalHitsCount, Flux> results) {
+ this.totalHitsCount = totalHitsCount;
+ this.results = results;
+ }
+
+ public static LLSearchResult empty() {
+ return new LLSearchResult(Mono.just(0L), Flux.just(Flux.empty()));
+ }
+
+ @NotNull
+ public static BiFunction accumulator() {
+ return (a, b) -> {
+ var mergedTotals = a.totalHitsCount.flatMap(aL -> b.totalHitsCount.map(bL -> aL + bL));
+ var mergedResults = Flux.merge(a.results, b.results);
+ return new LLSearchResult(mergedTotals, mergedResults);
+ };
+ }
+
+ public Mono totalHitsCount() {
+ return this.totalHitsCount;
+ }
+
+ public Flux> results() {
+ return this.results;
+ }
+
+ public boolean equals(final Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (!(o instanceof LLSearchResult)) {
+ return false;
+ }
+ final LLSearchResult other = (LLSearchResult) o;
+ final Object this$totalHitsCount = this.totalHitsCount();
+ final Object other$totalHitsCount = other.totalHitsCount();
+ if (!Objects.equals(this$totalHitsCount, other$totalHitsCount)) {
+ return false;
+ }
+ final Object this$results = this.results();
+ final Object other$results = other.results();
+ return Objects.equals(this$results, other$results);
+ }
+
+ public int hashCode() {
+ final int PRIME = 59;
+ int result = 1;
+ final Object $totalHitsCount = this.totalHitsCount();
+ result = result * PRIME + ($totalHitsCount == null ? 43 : $totalHitsCount.hashCode());
+ final Object $results = this.results();
+ result = result * PRIME + ($results == null ? 43 : $results.hashCode());
+ return result;
+ }
+
+ public String toString() {
+ return "LLSearchResult(totalHitsCount=" + this.totalHitsCount() + ", results=" + this.results() + ")";
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
index 787b519..1b9582c 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
@@ -4,6 +4,8 @@ import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.google.protobuf.ByteString;
import it.cavallium.dbengine.database.utils.RandomSortField;
+import it.cavallium.dbengine.proto.LLKeyScore;
+import it.cavallium.dbengine.proto.LLType;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
@@ -19,13 +21,13 @@ import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
+import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.jetbrains.annotations.Nullable;
-import it.cavallium.dbengine.proto.LLKeyScore;
-import it.cavallium.dbengine.proto.LLType;
+@SuppressWarnings("unused")
public class LLUtils {
private static final byte[] RESPONSE_TRUE = new byte[]{1};
@@ -52,6 +54,15 @@ public class LLUtils {
return null;
}
+ public static ScoreMode toScoreMode(LLScoreMode scoreMode) {
+ switch (scoreMode) {
+ case COMPLETE: return ScoreMode.COMPLETE;
+ case TOP_SCORES: return ScoreMode.TOP_SCORES;
+ case COMPLETE_NO_SCORES: return ScoreMode.COMPLETE_NO_SCORES;
+ default: throw new IllegalStateException("Unexpected value: " + scoreMode);
+ }
+ }
+
public static Term toTerm(LLTerm term) {
return new Term(term.getKey(), term.getValue());
}
@@ -176,6 +187,7 @@ public class LLUtils {
return termItemsList.stream().map(LLUtils::toLocal).collect(Collectors.toList());
}
+ @SuppressWarnings("ConstantConditions")
private static LLItem toLocal(it.cavallium.dbengine.proto.LLItem item) {
var data2 = item.getData2() != null ? item.getData2().toByteArray() : null;
return new LLItem(it.cavallium.dbengine.database.LLType.valueOf(item.getType().toString()),
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
index d592746..256089c 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
@@ -3,58 +3,57 @@ package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLDocument;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLLuceneIndex;
+import it.cavallium.dbengine.database.LLScoreMode;
+import it.cavallium.dbengine.database.LLSearchResult;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLSort;
import it.cavallium.dbengine.database.LLTerm;
-import it.cavallium.dbengine.database.LLTopKeys;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.LuceneUtils;
import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.database.luceneutil.AdaptiveStreamSearcher;
import it.cavallium.dbengine.database.luceneutil.LuceneStreamSearcher;
+import it.cavallium.dbengine.database.luceneutil.PagedStreamSearcher;
import it.cavallium.luceneserializer.luceneserializer.ParseException;
import it.cavallium.luceneserializer.luceneserializer.QueryParser;
-import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.queries.mlt.MoreLikeThis;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
-import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.Sort;
-import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.concurrency.executor.ScheduledTaskLifecycle;
import org.warp.commonutils.functional.IOFunction;
import org.warp.commonutils.type.ShortNamedThreadFactory;
-import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+import reactor.core.publisher.Sinks.EmissionException;
+import reactor.core.publisher.Sinks.EmitResult;
+import reactor.core.publisher.Sinks.Many;
+import reactor.core.publisher.Sinks.One;
import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
public class LLLocalLuceneIndex implements LLLuceneIndex {
@@ -221,147 +220,134 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
indexWriter.commit();
}
- @Override
- public Collection search(@Nullable LLSnapshot snapshot, String queryString, int limit, @Nullable LLSort sort,
- String keyFieldName)
- throws IOException {
- try {
- var luceneIndexSnapshot = resolveSnapshot(snapshot);
-
- Query query = QueryParser.parse(queryString);
- Sort luceneSort = LLUtils.toSort(sort);
-
- return Collections.singleton(runSearch(luceneIndexSnapshot, (indexSearcher) -> {
- return blockingSearch(indexSearcher, limit, query, luceneSort, keyFieldName);
- }));
- } catch (ParseException e) {
- throw new IOException("Error during query count!", e);
- }
- }
-
- @Override
- public Collection moreLikeThis(@Nullable LLSnapshot snapshot, Map> mltDocumentFields, int limit,
- String keyFieldName)
- throws IOException {
- var luceneIndexSnapshot = resolveSnapshot(snapshot);
-
- if (mltDocumentFields.isEmpty()) {
- return Collections.singleton(new LLTopKeys(0, new LLKeyScore[0]));
- }
-
- return Collections.singleton(runSearch(luceneIndexSnapshot, (indexSearcher) -> {
-
- var mlt = new MoreLikeThis(indexSearcher.getIndexReader());
- mlt.setAnalyzer(indexWriter.getAnalyzer());
- mlt.setFieldNames(mltDocumentFields.keySet().toArray(String[]::new));
- mlt.setMinTermFreq(1);
- //mlt.setMinDocFreq(1);
- mlt.setBoost(true);
-
- // Get the reference doc and apply it to MoreLikeThis, to generate the query
- @SuppressWarnings({"unchecked", "rawtypes"})
- Query query = mlt.like((Map) mltDocumentFields);
-
- // Search
- return blockingSearch(indexSearcher, limit, query, null, keyFieldName);
- }));
- }
-
- private static LLTopKeys blockingSearch(IndexSearcher indexSearcher,
- int limit,
- Query query,
- Sort luceneSort,
- String keyFieldName) throws IOException {
- TopDocs results;
- List keyScores;
-
- results = luceneSort != null ? indexSearcher.search(query, limit, luceneSort)
- : indexSearcher.search(query, limit);
- var hits = ObjectArrayList.wrap(results.scoreDocs);
- keyScores = new LinkedList<>();
- for (ScoreDoc hit : hits) {
- int docId = hit.doc;
- float score = hit.score;
- Document d = indexSearcher.doc(docId, Set.of(keyFieldName));
- if (d.getFields().isEmpty()) {
- System.err.println("The document docId:" + docId + ",score:" + score + " is empty.");
- var realFields = indexSearcher.doc(docId).getFields();
- if (!realFields.isEmpty()) {
- System.err.println("Present fields:");
- for (IndexableField field : realFields) {
- System.err.println(" - " + field.name());
- }
- }
+ private Mono acquireSearcherWrapper(LLSnapshot snapshot) {
+ return Mono.fromCallable(() -> {
+ if (snapshot == null) {
+ return searcherManager.acquire();
} else {
- var field = d.getField(keyFieldName);
- if (field == null) {
- System.err.println("Can't get key of document docId:" + docId + ",score:" + score);
- } else {
- keyScores.add(new LLKeyScore(field.stringValue(), score));
+ return resolveSnapshot(snapshot).getIndexSearcher();
+ }
+ }).subscribeOn(Schedulers.boundedElastic());
+ }
+
+ private Mono releaseSearcherWrapper(LLSnapshot snapshot, IndexSearcher indexSearcher) {
+ return Mono.fromRunnable(() -> {
+ if (snapshot == null) {
+ try {
+ searcherManager.release(indexSearcher);
+ } catch (IOException e) {
+ e.printStackTrace();
}
}
- }
- return new LLTopKeys(results.totalHits.value, keyScores.toArray(new LLKeyScore[0]));
+ }).subscribeOn(Schedulers.boundedElastic());
}
- @SuppressWarnings("UnnecessaryLocalVariable")
+ @SuppressWarnings({"Convert2MethodRef", "unchecked", "rawtypes"})
@Override
- public Tuple2, Collection>> searchStream(@Nullable LLSnapshot snapshot, String queryString, int limit,
- @Nullable LLSort sort, String keyFieldName) {
- try {
- Query query = QueryParser.parse(queryString);
- Sort luceneSort = LLUtils.toSort(sort);
-
- var acquireSearcherWrappedBlocking = Mono
- .fromCallable(() -> {
- if (snapshot == null) {
- return searcherManager.acquire();
- } else {
- return resolveSnapshot(snapshot).getIndexSearcher();
- }
- })
- .subscribeOn(Schedulers.boundedElastic());
-
- EmitterProcessor countProcessor = EmitterProcessor.create();
- EmitterProcessor resultsProcessor = EmitterProcessor.create();
-
- var publisher = acquireSearcherWrappedBlocking.flatMapMany(indexSearcher -> {
- return Flux.