Working indicization
This commit is contained in:
parent
df3c12f776
commit
5c98465637
@ -0,0 +1,96 @@
|
||||
package it.cavallium.dbengine.client;
|
||||
|
||||
import it.cavallium.dbengine.database.LLDocument;
|
||||
import it.cavallium.dbengine.database.LLItem;
|
||||
import it.cavallium.dbengine.database.LLLuceneIndex;
|
||||
import it.cavallium.dbengine.database.LLScoreMode;
|
||||
import it.cavallium.dbengine.database.LLTerm;
|
||||
import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
|
||||
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
|
||||
import it.cavallium.dbengine.lucene.serializer.TermQuery;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.util.Comparator;
|
||||
import java.util.StringJoiner;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import org.apache.lucene.document.Field.Store;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
|
||||
public class IndicizationExample {
|
||||
|
||||
public static void main(String[] args) {
|
||||
tempIndex(true)
|
||||
.flatMap(index -> index
|
||||
.addDocument(new LLTerm("id", "123"),
|
||||
new LLDocument(new LLItem[]{
|
||||
LLItem.newStringField("id", "123", Store.YES),
|
||||
LLItem.newStringField("name", "Mario", Store.NO),
|
||||
LLItem.newStringField("surname", "Rossi", Store.NO)
|
||||
})
|
||||
)
|
||||
.then(index.refresh())
|
||||
.then(index.search(null, new TermQuery("name", "Mario"), 1, null, LLScoreMode.COMPLETE_NO_SCORES, "id"))
|
||||
.flatMap(results -> results
|
||||
.results()
|
||||
.flatMap(r -> r)
|
||||
.doOnNext(value -> System.out.println("Value: " + value))
|
||||
.then(results.totalHitsCount())
|
||||
)
|
||||
.doOnNext(count -> System.out.println("Total hits: " + count))
|
||||
.doOnTerminate(() -> System.out.println("Completed"))
|
||||
.then(index.close())
|
||||
)
|
||||
.subscribeOn(Schedulers.parallel())
|
||||
.block();
|
||||
}
|
||||
|
||||
public static final class CurrentCustomType {
|
||||
|
||||
private final int number;
|
||||
|
||||
public CurrentCustomType(int number) {
|
||||
this.number = number;
|
||||
}
|
||||
|
||||
public int getNumber() {
|
||||
return number;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new StringJoiner(", ", CurrentCustomType.class.getSimpleName() + "[", "]")
|
||||
.add("number=" + number)
|
||||
.toString();
|
||||
}
|
||||
}
|
||||
|
||||
private static <U> Mono<? extends LLLuceneIndex> tempIndex(boolean delete) {
|
||||
var wrkspcPath = Path.of("/tmp/tempdb/");
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
if (delete && Files.exists(wrkspcPath)) {
|
||||
Files.walk(wrkspcPath).sorted(Comparator.reverseOrder()).forEach(file -> {
|
||||
try {
|
||||
Files.delete(file);
|
||||
} catch (IOException ex) {
|
||||
throw new CompletionException(ex);
|
||||
}
|
||||
});
|
||||
}
|
||||
Files.createDirectories(wrkspcPath);
|
||||
return null;
|
||||
})
|
||||
.subscribeOn(Schedulers.boundedElastic())
|
||||
.then(new LLLocalDatabaseConnection(wrkspcPath, true).connect())
|
||||
.flatMap(conn -> conn.getLuceneIndex("testindices",
|
||||
3,
|
||||
TextFieldsAnalyzer.PartialWords,
|
||||
Duration.ofSeconds(5),
|
||||
Duration.ofSeconds(5),
|
||||
false
|
||||
));
|
||||
}
|
||||
}
|
@ -1,5 +1,6 @@
|
||||
package it.cavallium.dbengine.database;
|
||||
|
||||
import it.cavallium.dbengine.lucene.serializer.Query;
|
||||
import java.util.Set;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
@ -43,14 +44,14 @@ public interface LLLuceneIndex extends LLSnapshottable {
|
||||
* @return the collection has one or more flux
|
||||
*/
|
||||
Mono<LLSearchResult> search(@Nullable LLSnapshot snapshot,
|
||||
String query,
|
||||
Query query,
|
||||
int limit,
|
||||
@Nullable LLSort sort,
|
||||
LLScoreMode scoreMode,
|
||||
String keyFieldName);
|
||||
|
||||
default Mono<Long> count(@Nullable LLSnapshot snapshot, String queryString) {
|
||||
return this.search(snapshot, queryString, 0, null, null, null)
|
||||
default Mono<Long> count(@Nullable LLSnapshot snapshot, Query query) {
|
||||
return this.search(snapshot, query, 0, null, null, null)
|
||||
.flatMap(LLSearchResult::totalHitsCount)
|
||||
.single();
|
||||
}
|
||||
@ -58,4 +59,14 @@ public interface LLLuceneIndex extends LLSnapshottable {
|
||||
boolean isLowMemoryMode();
|
||||
|
||||
Mono<Void> close();
|
||||
|
||||
/**
|
||||
* Flush writes to disk
|
||||
*/
|
||||
Mono<Void> flush();
|
||||
|
||||
/**
|
||||
* Refresh index searcher
|
||||
*/
|
||||
Mono<Void> refresh();
|
||||
}
|
||||
|
@ -11,6 +11,7 @@ import it.cavallium.dbengine.database.LLTerm;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.LuceneUtils;
|
||||
import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
|
||||
import it.cavallium.dbengine.lucene.ScheduledTaskLifecycle;
|
||||
import it.cavallium.dbengine.lucene.searcher.AdaptiveStreamSearcher;
|
||||
import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher;
|
||||
import it.cavallium.dbengine.lucene.searcher.PagedStreamSearcher;
|
||||
@ -25,8 +26,6 @@ import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
@ -43,8 +42,6 @@ import org.apache.lucene.search.Sort;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.FSDirectory;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.warp.commonutils.concurrency.executor.ScheduledTaskLifecycle;
|
||||
import org.warp.commonutils.type.ShortNamedThreadFactory;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.GroupedFlux;
|
||||
import reactor.core.publisher.Mono;
|
||||
@ -66,9 +63,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
* There is only a single thread globally to not overwhelm the disk with
|
||||
* parallel commits or parallel refreshes.
|
||||
*/
|
||||
private static final ScheduledExecutorService scheduler
|
||||
= Executors.newSingleThreadScheduledExecutor(new ShortNamedThreadFactory("Lucene"));
|
||||
private static final Scheduler luceneScheduler = Schedulers.fromExecutorService(scheduler);
|
||||
private static final Scheduler luceneScheduler = Schedulers.newBoundedElastic(1,
|
||||
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
|
||||
"Lucene",
|
||||
120,
|
||||
true
|
||||
);
|
||||
|
||||
private final String luceneIndexName;
|
||||
private final SnapshotDeletionPolicy snapshotter;
|
||||
@ -124,7 +124,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
}
|
||||
|
||||
private void registerScheduledFixedTask(Runnable task, Duration duration) {
|
||||
scheduledTasksLifecycle.registerScheduledTask(scheduler.scheduleAtFixedRate(() -> {
|
||||
scheduledTasksLifecycle.registerScheduledTask(luceneScheduler.schedulePeriodically(() -> {
|
||||
scheduledTasksLifecycle.startScheduledTask();
|
||||
try {
|
||||
task.run();
|
||||
@ -358,21 +358,21 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
|
||||
@SuppressWarnings("Convert2MethodRef")
|
||||
@Override
|
||||
public Mono<LLSearchResult> search(@Nullable LLSnapshot snapshot, String queryString, int limit,
|
||||
public Mono<LLSearchResult> search(@Nullable LLSnapshot snapshot, it.cavallium.dbengine.lucene.serializer.Query query, int limit,
|
||||
@Nullable LLSort sort, LLScoreMode scoreMode, String keyFieldName) {
|
||||
|
||||
return acquireSearcherWrapper(snapshot)
|
||||
.flatMap(indexSearcher -> Mono
|
||||
.fromCallable(() -> {
|
||||
Query query = QueryParser.parse(queryString);
|
||||
Query luceneQuery = QueryParser.parse(query);
|
||||
Sort luceneSort = LLUtils.toSort(sort);
|
||||
org.apache.lucene.search.ScoreMode luceneScoreMode = LLUtils.toScoreMode(scoreMode);
|
||||
return Tuples.of(query, Optional.ofNullable(luceneSort), luceneScoreMode);
|
||||
return Tuples.of(luceneQuery, Optional.ofNullable(luceneSort), luceneScoreMode);
|
||||
})
|
||||
.subscribeOn(luceneScheduler)
|
||||
.flatMap(tuple -> Mono
|
||||
.fromCallable(() -> {
|
||||
Query query = tuple.getT1();
|
||||
Query luceneQuery = tuple.getT1();
|
||||
Sort luceneSort = tuple.getT2().orElse(null);
|
||||
ScoreMode luceneScoreMode = tuple.getT3();
|
||||
|
||||
@ -385,7 +385,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
luceneScheduler.schedule(() -> {
|
||||
try {
|
||||
streamSearcher.search(indexSearcher,
|
||||
query,
|
||||
luceneQuery,
|
||||
limit,
|
||||
luceneSort,
|
||||
luceneScoreMode,
|
||||
@ -430,6 +430,37 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
.subscribeOn(luceneScheduler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> flush() {
|
||||
return Mono
|
||||
.<Void>fromCallable(() -> {
|
||||
scheduledTasksLifecycle.startScheduledTask();
|
||||
try {
|
||||
indexWriter.commit();
|
||||
indexWriter.flush();
|
||||
} finally {
|
||||
scheduledTasksLifecycle.endScheduledTask();
|
||||
}
|
||||
return null;
|
||||
})
|
||||
.subscribeOn(luceneScheduler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> refresh() {
|
||||
return Mono
|
||||
.<Void>fromCallable(() -> {
|
||||
scheduledTasksLifecycle.startScheduledTask();
|
||||
try {
|
||||
searcherManager.maybeRefreshBlocking();
|
||||
} finally {
|
||||
scheduledTasksLifecycle.endScheduledTask();
|
||||
}
|
||||
return null;
|
||||
})
|
||||
.subscribeOn(luceneScheduler);
|
||||
}
|
||||
|
||||
private void scheduledCommit() {
|
||||
try {
|
||||
if (indexWriter.hasUncommittedChanges()) {
|
||||
|
@ -8,6 +8,7 @@ import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import it.cavallium.dbengine.database.LLSort;
|
||||
import it.cavallium.dbengine.database.LLTerm;
|
||||
import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
|
||||
import it.cavallium.dbengine.lucene.serializer.Query;
|
||||
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
|
||||
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
|
||||
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
|
||||
@ -17,6 +18,7 @@ import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
@ -152,6 +154,10 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
||||
}
|
||||
}
|
||||
|
||||
private Optional<LLSnapshot> resolveSnapshotOptional(LLSnapshot multiSnapshot, int instanceId) {
|
||||
return Optional.ofNullable(resolveSnapshot(multiSnapshot, instanceId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<LLSearchResult> moreLikeThis(@Nullable LLSnapshot snapshot,
|
||||
Flux<Tuple2<String, Set<String>>> mltDocumentFields,
|
||||
@ -161,15 +167,15 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
||||
.fromArray(luceneIndices)
|
||||
.index()
|
||||
.flatMap(tuple -> Mono
|
||||
.fromCallable(() -> resolveSnapshot(snapshot, (int) (long) tuple.getT1()))
|
||||
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
|
||||
.map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot)))
|
||||
.flatMap(tuple -> tuple.getT1().moreLikeThis(tuple.getT2(), mltDocumentFields, limit, keyFieldName))
|
||||
.flatMap(tuple -> tuple.getT1().moreLikeThis(tuple.getT2().orElse(null), mltDocumentFields, limit, keyFieldName))
|
||||
.reduce(LLSearchResult.accumulator());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<LLSearchResult> search(@Nullable LLSnapshot snapshot,
|
||||
String query,
|
||||
Query query,
|
||||
int limit,
|
||||
@Nullable LLSort sort,
|
||||
LLScoreMode scoreMode,
|
||||
@ -178,9 +184,9 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
||||
.fromArray(luceneIndices)
|
||||
.index()
|
||||
.flatMap(tuple -> Mono
|
||||
.fromCallable(() -> resolveSnapshot(snapshot, (int) (long) tuple.getT1()))
|
||||
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
|
||||
.map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot)))
|
||||
.flatMap(tuple -> tuple.getT1().search(tuple.getT2(), query, limit, sort, scoreMode, keyFieldName))
|
||||
.flatMap(tuple -> tuple.getT1().search(tuple.getT2().orElse(null), query, limit, sort, scoreMode, keyFieldName))
|
||||
.reduce(LLSearchResult.accumulator());
|
||||
}
|
||||
|
||||
@ -192,6 +198,22 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
||||
.then();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> flush() {
|
||||
return Flux
|
||||
.fromArray(luceneIndices)
|
||||
.flatMap(LLLocalLuceneIndex::flush)
|
||||
.then();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> refresh() {
|
||||
return Flux
|
||||
.fromArray(luceneIndices)
|
||||
.flatMap(LLLocalLuceneIndex::refresh)
|
||||
.then();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<LLSnapshot> takeSnapshot() {
|
||||
return Mono
|
||||
|
@ -6,6 +6,7 @@ import it.cavallium.dbengine.database.LLSearchResult;
|
||||
import it.cavallium.dbengine.database.LLSort;
|
||||
import it.cavallium.dbengine.database.collections.Joiner;
|
||||
import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
|
||||
import it.cavallium.dbengine.lucene.serializer.Query;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import reactor.core.publisher.Mono;
|
||||
@ -57,7 +58,7 @@ public class JoinedIndicizerWriter<KEY, DBTYPE, JOINEDTYPE> implements LuceneInd
|
||||
|
||||
@Override
|
||||
public Mono<LLSearchResult> search(@Nullable CompositeSnapshot snapshot,
|
||||
String query,
|
||||
Query query,
|
||||
int limit,
|
||||
@Nullable LLSort sort,
|
||||
LLScoreMode scoreMode) {
|
||||
@ -66,7 +67,7 @@ public class JoinedIndicizerWriter<KEY, DBTYPE, JOINEDTYPE> implements LuceneInd
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Long> count(@Nullable CompositeSnapshot snapshot, String query) {
|
||||
public Mono<Long> count(@Nullable CompositeSnapshot snapshot, Query query) {
|
||||
return this.indicizerWriter.count(snapshot, query);
|
||||
}
|
||||
|
||||
|
@ -5,6 +5,7 @@ import it.cavallium.dbengine.database.DatabaseMemoryMode;
|
||||
import it.cavallium.dbengine.database.LLScoreMode;
|
||||
import it.cavallium.dbengine.database.LLSearchResult;
|
||||
import it.cavallium.dbengine.database.LLSort;
|
||||
import it.cavallium.dbengine.lucene.serializer.Query;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
@ -34,12 +35,12 @@ public interface LuceneIndicizerWriter<T, U> {
|
||||
int limit);
|
||||
|
||||
Mono<LLSearchResult> search(@Nullable CompositeSnapshot snapshot,
|
||||
String query,
|
||||
Query query,
|
||||
int limit,
|
||||
@Nullable LLSort sort,
|
||||
LLScoreMode scoreMode);
|
||||
|
||||
Mono<Long> count(@Nullable CompositeSnapshot snapshot, String query);
|
||||
Mono<Long> count(@Nullable CompositeSnapshot snapshot, Query query);
|
||||
|
||||
Mono<Void> close();
|
||||
|
||||
|
@ -7,6 +7,7 @@ import it.cavallium.dbengine.database.LLSearchResult;
|
||||
import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import it.cavallium.dbengine.database.LLSort;
|
||||
import it.cavallium.dbengine.database.LLTerm;
|
||||
import it.cavallium.dbengine.lucene.serializer.Query;
|
||||
import java.util.Set;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
@ -64,7 +65,7 @@ public class StandardLuceneIndicizerWriter<T, U> implements LuceneIndicizerWrite
|
||||
|
||||
@Override
|
||||
public Mono<LLSearchResult> search(@Nullable CompositeSnapshot snapshot,
|
||||
String query,
|
||||
Query query,
|
||||
int limit,
|
||||
@Nullable LLSort sort,
|
||||
LLScoreMode scoreMode) {
|
||||
@ -72,7 +73,7 @@ public class StandardLuceneIndicizerWriter<T, U> implements LuceneIndicizerWrite
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Long> count(@Nullable CompositeSnapshot snapshot, String query) {
|
||||
public Mono<Long> count(@Nullable CompositeSnapshot snapshot, Query query) {
|
||||
return luceneIndex.count(resolveSnapshot(snapshot), query);
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,61 @@
|
||||
package it.cavallium.dbengine.lucene;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.locks.StampedLock;
|
||||
import org.warp.commonutils.concurrency.atomicity.Atomic;
|
||||
import reactor.core.Disposable;
|
||||
|
||||
@Atomic
|
||||
public class ScheduledTaskLifecycle {
|
||||
|
||||
private final StampedLock lock;
|
||||
private final ConcurrentHashMap<Disposable, Object> tasks = new ConcurrentHashMap<>();
|
||||
|
||||
public ScheduledTaskLifecycle() {
|
||||
this.lock = new StampedLock();
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a scheduled task
|
||||
*/
|
||||
public void registerScheduledTask(Disposable task) {
|
||||
this.tasks.put(task, new Object());
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark this task as running.
|
||||
* After calling this method, please call {@method endScheduledTask} inside a finally block!
|
||||
*/
|
||||
public void startScheduledTask() {
|
||||
this.lock.readLock();
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark this task as ended. Must be called after {@method startScheduledTask}
|
||||
*/
|
||||
public void endScheduledTask() {
|
||||
this.lock.tryUnlockRead();
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel all scheduled tasks and wait all running methods to finish
|
||||
*/
|
||||
public void cancelAndWait() {
|
||||
for (var task : tasks.keySet()) {
|
||||
task.dispose();
|
||||
}
|
||||
for (var task : tasks.keySet()) {
|
||||
while (!task.isDisposed()) {
|
||||
try {
|
||||
//noinspection BusyWait
|
||||
Thread.sleep(500);
|
||||
} catch (InterruptedException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Acquire a write lock to wait all tasks to end
|
||||
lock.unlockWrite(lock.writeLock());
|
||||
}
|
||||
}
|
@ -9,9 +9,13 @@ import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.ScoreMode;
|
||||
import org.apache.lucene.search.Sort;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public interface LuceneStreamSearcher {
|
||||
|
||||
Logger logger = LoggerFactory.getLogger(LuceneStreamSearcher.class);
|
||||
|
||||
/**
|
||||
* Do a lucene query, receiving the single results using a consumer
|
||||
* @param indexSearcher the index searcher, which contains all the lucene data
|
||||
|
@ -79,18 +79,18 @@ public class PagedStreamSearcher implements LuceneStreamSearcher {
|
||||
if (currentAllowedResults.var-- > 0) {
|
||||
Document d = indexSearcher.doc(docId, Set.of(keyFieldName));
|
||||
if (d.getFields().isEmpty()) {
|
||||
System.err.println("The document docId:" + docId + ",score:" + score + " is empty.");
|
||||
logger.error("The document docId: {}, score: {} is empty.", docId, score);
|
||||
var realFields = indexSearcher.doc(docId).getFields();
|
||||
if (!realFields.isEmpty()) {
|
||||
System.err.println("Present fields:");
|
||||
logger.error("Present fields:");
|
||||
for (IndexableField field : realFields) {
|
||||
System.err.println(" - " + field.name());
|
||||
logger.error(" - {}", field.name());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
var field = d.getField(keyFieldName);
|
||||
if (field == null) {
|
||||
System.err.println("Can't get key of document docId:" + docId + ",score:" + score);
|
||||
logger.error("Can't get key of document docId: {}, score: {}", docId, score);
|
||||
} else {
|
||||
resultsConsumer.accept(new LLKeyScore(field.stringValue(), score));
|
||||
}
|
||||
|
@ -43,18 +43,18 @@ public class ParallelCollectorStreamSearcher implements LuceneStreamSearcher {
|
||||
try {
|
||||
Document d = indexSearcher.doc(docId, Set.of(keyFieldName));
|
||||
if (d.getFields().isEmpty()) {
|
||||
System.err.println("The document docId:" + docId + " is empty.");
|
||||
logger.error("The document docId: {} is empty.", docId);
|
||||
var realFields = indexSearcher.doc(docId).getFields();
|
||||
if (!realFields.isEmpty()) {
|
||||
System.err.println("Present fields:");
|
||||
logger.error("Present fields:");
|
||||
for (IndexableField field : realFields) {
|
||||
System.err.println(" - " + field.name());
|
||||
logger.error(" - " + field.name());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
var field = d.getField(keyFieldName);
|
||||
if (field == null) {
|
||||
System.err.println("Can't get key of document docId:" + docId);
|
||||
logger.error("Can't get key of document docId:" + docId);
|
||||
} else {
|
||||
resultsConsumer.accept(new LLKeyScore(field.stringValue(), score));
|
||||
}
|
||||
|
@ -38,7 +38,7 @@ public class SimpleStreamSearcher implements LuceneStreamSearcher {
|
||||
float score = hit.score;
|
||||
Document d = indexSearcher.doc(docId, Set.of(keyFieldName));
|
||||
if (d.getFields().isEmpty()) {
|
||||
System.err.println("The document docId:" + docId + ",score:" + score + " is empty.");
|
||||
logger.error("The document docId: {}, score: {} is empty.", docId, score);
|
||||
var realFields = indexSearcher.doc(docId).getFields();
|
||||
if (!realFields.isEmpty()) {
|
||||
System.err.println("Present fields:");
|
||||
@ -49,7 +49,7 @@ public class SimpleStreamSearcher implements LuceneStreamSearcher {
|
||||
} else {
|
||||
var field = d.getField(keyFieldName);
|
||||
if (field == null) {
|
||||
System.err.println("Can't get key of document docId:" + docId + ",score:" + score);
|
||||
logger.error("Can't get key of document docId: {}, score: {}", docId, score);
|
||||
} else {
|
||||
resultsConsumer.accept(new LLKeyScore(field.stringValue(), score));
|
||||
}
|
||||
|
@ -1,5 +1,77 @@
|
||||
package it.cavallium.dbengine.lucene.serializer;
|
||||
|
||||
import static it.cavallium.dbengine.lucene.serializer.QueryParser.USE_PHRASE_QUERY;
|
||||
|
||||
import it.cavallium.dbengine.database.LuceneUtils;
|
||||
import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
|
||||
import java.io.IOException;
|
||||
import java.io.StringReader;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import org.apache.lucene.analysis.Analyzer;
|
||||
import org.apache.lucene.analysis.TokenStream;
|
||||
import org.apache.lucene.analysis.tokenattributes.PositionIncrementAttribute;
|
||||
import org.apache.lucene.analysis.tokenattributes.TermToBytesRefAttribute;
|
||||
import org.apache.lucene.index.Term;
|
||||
|
||||
public interface Query extends SerializedQueryObject {
|
||||
|
||||
static Query approximativeSearch(TextFieldsAnalyzer yotsubaAnalyzer, String field, String text) {
|
||||
try {
|
||||
var terms = getTerms(yotsubaAnalyzer, field, text);
|
||||
|
||||
List<BooleanQueryPart> booleanQueryParts = new LinkedList<>();
|
||||
for (TermPosition term : terms) {
|
||||
booleanQueryParts.add(new BooleanQueryPart(new TermQuery(term.getTerm()), Occur.MUST));
|
||||
booleanQueryParts.add(new BooleanQueryPart(new PhraseQuery(terms.toArray(TermPosition[]::new)), Occur.SHOULD));
|
||||
}
|
||||
return new BooleanQuery(booleanQueryParts);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return exactSearch(yotsubaAnalyzer, field, text);
|
||||
}
|
||||
}
|
||||
|
||||
static Query exactSearch(TextFieldsAnalyzer yotsubaAnalyzer, String field, String text) {
|
||||
try {
|
||||
var terms = getTerms(yotsubaAnalyzer, field, text);
|
||||
|
||||
if (USE_PHRASE_QUERY) {
|
||||
return new PhraseQuery(terms.toArray(TermPosition[]::new));
|
||||
} else {
|
||||
List<BooleanQueryPart> booleanQueryParts = new LinkedList<>();
|
||||
for (TermPosition term : terms) {
|
||||
booleanQueryParts.add(new BooleanQueryPart(new TermQuery(term.getTerm()), Occur.MUST));
|
||||
}
|
||||
booleanQueryParts.add(new BooleanQueryPart(new PhraseQuery(terms.toArray(TermPosition[]::new)), Occur.FILTER));
|
||||
return new BooleanQuery(booleanQueryParts);
|
||||
}
|
||||
} catch (IOException exception) {
|
||||
throw new RuntimeException(exception);
|
||||
}
|
||||
}
|
||||
|
||||
private static List<TermPosition> getTerms(TextFieldsAnalyzer yotsubaAnalyzer, String field, String text) throws IOException {
|
||||
Analyzer analyzer = LuceneUtils.getAnalyzer(yotsubaAnalyzer);
|
||||
TokenStream ts = analyzer.tokenStream(field, new StringReader(text));
|
||||
return getTerms(ts, field);
|
||||
}
|
||||
|
||||
private static List<TermPosition> getTerms(TokenStream ts, String field) throws IOException {
|
||||
TermToBytesRefAttribute charTermAttr = ts.addAttribute(TermToBytesRefAttribute.class);
|
||||
PositionIncrementAttribute positionIncrementTermAttr = ts.addAttribute(PositionIncrementAttribute.class);
|
||||
List<TermPosition> terms = new LinkedList<>();
|
||||
try (ts) {
|
||||
ts.reset(); // Resets this stream to the beginning. (Required)
|
||||
int termPosition = -1;
|
||||
while (ts.incrementToken()) {
|
||||
var tokenPositionIncrement = positionIncrementTermAttr.getPositionIncrement();
|
||||
termPosition += tokenPositionIncrement;
|
||||
terms.add(new TermPosition(new Term(field, charTermAttr.getBytesRef()), termPosition));
|
||||
}
|
||||
ts.end(); // Perform end-of-stream operations, e.g. set the final offset.
|
||||
}
|
||||
// Release resources associated with this stream.
|
||||
return terms;
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,8 @@ import org.apache.lucene.search.TermQuery;
|
||||
|
||||
public class QueryParser {
|
||||
|
||||
static final boolean USE_PHRASE_QUERY = true;
|
||||
|
||||
public static Query parse(String text) throws ParseException {
|
||||
try {
|
||||
var builtQuery = (Query) parse(text, new AtomicInteger(0));
|
||||
@ -28,6 +30,12 @@ public class QueryParser {
|
||||
}
|
||||
}
|
||||
|
||||
public static Query parse(it.cavallium.dbengine.lucene.serializer.Query query) throws ParseException {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
query.stringify(sb);
|
||||
return parse(sb.toString());
|
||||
}
|
||||
|
||||
private static Object parse(String completeText, AtomicInteger position) {
|
||||
String text = completeText.substring(position.get());
|
||||
if (text.length() <= 2) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user