Allow lucene partial document updates

This commit is contained in:
Andrea Cavalli 2021-11-07 17:46:40 +01:00
parent d8801b3471
commit b34b620082
12 changed files with 78 additions and 133 deletions

View File

@ -11,24 +11,20 @@ import it.cavallium.dbengine.client.IndexAction.ReleaseSnapshot;
import it.cavallium.dbengine.client.IndexAction.Flush;
import it.cavallium.dbengine.client.IndexAction.Refresh;
import it.cavallium.dbengine.client.IndexAction.Close;
import it.cavallium.dbengine.database.LLDocument;
import it.cavallium.dbengine.database.LLUpdateDocument;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLTerm;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import reactor.core.publisher.Flux;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks.Empty;
import reactor.core.publisher.Sinks.One;
sealed interface IndexAction permits Add, AddMulti, Update, UpdateMulti, Delete, DeleteAll, TakeSnapshot,
ReleaseSnapshot, Flush, Refresh, Close {
IndexActionType getType();
final record Add(LLTerm key, LLDocument doc, MonoSink<Void> addedFuture) implements IndexAction {
final record Add(LLTerm key, LLUpdateDocument doc, MonoSink<Void> addedFuture) implements IndexAction {
@Override
public IndexActionType getType() {
@ -36,7 +32,7 @@ sealed interface IndexAction permits Add, AddMulti, Update, UpdateMulti, Delete,
}
}
final record AddMulti(Flux<Entry<LLTerm, LLDocument>> docsFlux, MonoSink<Void> addedMultiFuture) implements IndexAction {
final record AddMulti(Flux<Entry<LLTerm, LLUpdateDocument>> docsFlux, MonoSink<Void> addedMultiFuture) implements IndexAction {
@Override
public IndexActionType getType() {
@ -44,7 +40,7 @@ sealed interface IndexAction permits Add, AddMulti, Update, UpdateMulti, Delete,
}
}
final record Update(LLTerm key, LLDocument doc, MonoSink<Void> updatedFuture) implements IndexAction {
final record Update(LLTerm key, LLUpdateDocument doc, MonoSink<Void> updatedFuture) implements IndexAction {
@Override
public IndexActionType getType() {
@ -52,7 +48,7 @@ sealed interface IndexAction permits Add, AddMulti, Update, UpdateMulti, Delete,
}
}
final record UpdateMulti(Map<LLTerm, LLDocument> docs, MonoSink<Void> updatedMultiFuture) implements IndexAction {
final record UpdateMulti(Map<LLTerm, LLUpdateDocument> docs, MonoSink<Void> updatedMultiFuture) implements IndexAction {
@Override
public IndexActionType getType() {

View File

@ -1,11 +1,9 @@
package it.cavallium.dbengine.client;
import it.cavallium.dbengine.database.LLDocument;
import it.cavallium.dbengine.database.LLIndexRequest;
import it.cavallium.dbengine.database.LLUpdateDocument;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import java.util.Map;
import java.util.Set;
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -13,7 +11,10 @@ import reactor.util.function.Tuple2;
public abstract class Indicizer<T, U> {
public abstract @NotNull Mono<LLDocument> toDocument(@NotNull T key, @NotNull U value);
/**
* Transform a value to an IndexRequest.
*/
public abstract @NotNull Mono<? extends LLIndexRequest> toIndexRequest(@NotNull T key, @NotNull U value);
public abstract @NotNull LLTerm toIndex(@NotNull T key);

View File

@ -1,18 +1,6 @@
package it.cavallium.dbengine.client;
import com.google.common.util.concurrent.Uninterruptibles;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.IndexAction.Add;
import it.cavallium.dbengine.client.IndexAction.AddMulti;
import it.cavallium.dbengine.client.IndexAction.Close;
import it.cavallium.dbengine.client.IndexAction.Delete;
import it.cavallium.dbengine.client.IndexAction.DeleteAll;
import it.cavallium.dbengine.client.IndexAction.Flush;
import it.cavallium.dbengine.client.IndexAction.Refresh;
import it.cavallium.dbengine.client.IndexAction.ReleaseSnapshot;
import it.cavallium.dbengine.client.IndexAction.TakeSnapshot;
import it.cavallium.dbengine.client.IndexAction.Update;
import it.cavallium.dbengine.client.IndexAction.UpdateMulti;
import it.cavallium.dbengine.client.query.ClientQueryParams;
import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
@ -20,28 +8,15 @@ import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSearchResultShard;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.collections.ValueTransformer;
import java.lang.ref.Cleaner;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.LockSupport;
import java.util.logging.Level;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import org.warp.commonutils.type.ShortNamedThreadFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.publisher.Sinks.Empty;
import reactor.core.publisher.Sinks.Many;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;
import reactor.util.function.Tuple2;
public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
@ -66,7 +41,7 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
@Override
public Mono<Void> addDocument(T key, U value) {
return indicizer
.toDocument(key, value)
.toIndexRequest(key, value)
.flatMap(doc -> luceneIndex.addDocument(indicizer.toIndex(key), doc));
}
@ -75,7 +50,7 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
return luceneIndex
.addDocuments(entries
.flatMap(entry -> indicizer
.toDocument(entry.getKey(), entry.getValue())
.toIndexRequest(entry.getKey(), entry.getValue())
.map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc)))
);
}
@ -89,8 +64,8 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
@Override
public Mono<Void> updateDocument(T key, @NotNull U value) {
return indicizer
.toDocument(key, value)
.flatMap(doc -> luceneIndex.updateDocument(indicizer.toIndex(key), doc));
.toIndexRequest(key, value)
.flatMap(doc -> luceneIndex.update(indicizer.toIndex(key), doc));
}
@Override
@ -98,7 +73,7 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
return luceneIndex
.updateDocuments(entries
.flatMap(entry -> indicizer
.toDocument(entry.getKey(), entry.getValue())
.toIndexRequest(entry.getKey(), entry.getValue())
.map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc)))
.collectMap(Entry::getKey, Entry::getValue)
);

View File

@ -1,49 +0,0 @@
package it.cavallium.dbengine.database;
import java.util.Arrays;
import org.jetbrains.annotations.Nullable;
/**
 * Wrapper around the array of {@link LLItem}s that make up one document.
 *
 * <p>The array passed to the constructor is stored and handed back as-is (no copy is made).
 * Equality, hashing and the string form are all computed from the array contents.
 */
public class LLDocument {

  private final LLItem[] items;

  /**
   * @param items the items of this document; the reference is kept, not copied
   */
  public LLDocument(LLItem[] items) {
    this.items = items;
  }

  /**
   * @return the same array instance that was given to the constructor
   */
  public LLItem[] getItems() {
    return items;
  }

  /**
   * Looks up the first item whose name equals {@code uid}.
   *
   * @param uid the item name to search for
   * @return the first matching item, or {@code null} when no item has that name
   */
  @Nullable
  public LLItem getField(String uid) {
    for (int i = 0; i < items.length; i++) {
      LLItem candidate = items[i];
      if (candidate.getName().equals(uid)) {
        return candidate;
      }
    }
    return null;
  }

  @Override
  public boolean equals(Object o) {
    if (o == this) {
      return true;
    }
    // Exact class match is required, so subclasses never compare equal to a plain LLDocument.
    if (o != null && o.getClass() == getClass()) {
      LLDocument other = (LLDocument) o;
      return Arrays.equals(items, other.items);
    }
    return false;
  }

  @Override
  public int hashCode() {
    return Arrays.hashCode(items);
  }

  @Override
  public String toString() {
    return Arrays.toString(items);
  }
}

View File

@ -0,0 +1,3 @@
package it.cavallium.dbengine.database;
/**
 * Closed set of request types that can be applied to an index entry.
 *
 * <p>The only permitted implementations are {@link LLUpdateDocument} and {@link LLUpdateFields}.
 */
public sealed interface LLIndexRequest permits LLUpdateDocument, LLUpdateFields {}

View File

@ -6,14 +6,11 @@ import it.cavallium.dbengine.client.query.current.data.NoSort;
import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.lucene.LuceneUtils;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
@ -21,15 +18,15 @@ public interface LLLuceneIndex extends LLSnapshottable {
String getLuceneIndexName();
Mono<Void> addDocument(LLTerm id, LLDocument doc);
Mono<Void> addDocument(LLTerm id, LLUpdateDocument doc);
Mono<Void> addDocuments(Flux<Entry<LLTerm, LLDocument>> documents);
Mono<Void> addDocuments(Flux<Entry<LLTerm, LLUpdateDocument>> documents);
Mono<Void> deleteDocument(LLTerm id);
Mono<Void> updateDocument(LLTerm id, LLDocument document);
Mono<Void> update(LLTerm id, LLIndexRequest request);
Mono<Void> updateDocuments(Mono<Map<LLTerm, LLDocument>> documents);
Mono<Void> updateDocuments(Mono<Map<LLTerm, LLUpdateDocument>> documents);
Mono<Void> deleteAll();

View File

@ -0,0 +1,3 @@
package it.cavallium.dbengine.database;
/**
 * Index request that carries the items of a whole document.
 *
 * <p>The compiler-generated {@code equals}, {@code hashCode} and {@code toString} of a record treat
 * an array component by reference, so two requests wrapping equal item arrays would not be equal
 * and the string form would only show the array identity. They are overridden here so that all
 * three work on the array contents.
 *
 * @param items the items of the document; the array is stored as-is, not copied
 */
public record LLUpdateDocument(LLItem[] items) implements LLIndexRequest {

  @Override
  public boolean equals(Object o) {
    return this == o
        || (o instanceof LLUpdateDocument other && java.util.Arrays.equals(items, other.items));
  }

  @Override
  public int hashCode() {
    return java.util.Arrays.hashCode(items);
  }

  @Override
  public String toString() {
    return "LLUpdateDocument[items=" + java.util.Arrays.toString(items) + "]";
  }
}

View File

@ -0,0 +1,3 @@
package it.cavallium.dbengine.database;
/**
 * Index request that carries only the items to be updated on an already indexed document.
 *
 * <p>The compiler-generated {@code equals}, {@code hashCode} and {@code toString} of a record treat
 * an array component by reference, so they are overridden here to work on the array contents.
 *
 * @param items the items to update; the array is stored as-is, not copied
 */
public record LLUpdateFields(LLItem[] items) implements LLIndexRequest {

  @Override
  public boolean equals(Object o) {
    return this == o
        || (o instanceof LLUpdateFields other && java.util.Arrays.equals(items, other.items));
  }

  @Override
  public int hashCode() {
    return java.util.Arrays.hashCode(items);
  }

  @Override
  public String toString() {
    return "LLUpdateFields[items=" + java.util.Arrays.toString(items) + "]";
  }
}

View File

@ -18,8 +18,6 @@ import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.lucene.RandomSortField;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -39,7 +37,6 @@ import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
@ -129,25 +126,33 @@ public class LLUtils {
return new Term(term.getKey(), term.getValue());
}
public static Document toDocument(LLDocument document) {
public static Document toDocument(LLUpdateDocument document) {
Document d = new Document();
for (LLItem item : document.getItems()) {
for (LLItem item : document.items()) {
d.add(LLUtils.toField(item));
}
return d;
}
public static Collection<Document> toDocuments(Collection<LLDocument> document) {
/**
 * Converts each given item to a Lucene {@link Field}, preserving the input order.
 *
 * @param fields the items to convert
 * @return a new array of the same length holding the converted fields
 */
public static Field[] toFields(LLItem... fields) {
  final int count = fields.length;
  Field[] converted = new Field[count];
  int next = 0;
  for (LLItem item : fields) {
    converted[next++] = LLUtils.toField(item);
  }
  return converted;
}
public static Collection<Document> toDocuments(Collection<LLUpdateDocument> document) {
List<Document> d = new ArrayList<>(document.size());
for (LLDocument doc : document) {
for (LLUpdateDocument doc : document) {
d.add(LLUtils.toDocument(doc));
}
return d;
}
public static Collection<Document> toDocumentsFromEntries(Collection<Entry<LLTerm, LLDocument>> documentsList) {
public static Collection<Document> toDocumentsFromEntries(Collection<Entry<LLTerm, LLUpdateDocument>> documentsList) {
ArrayList<Document> results = new ArrayList<>(documentsList.size());
for (Entry<LLTerm, LLDocument> entry : documentsList) {
for (Entry<LLTerm, LLUpdateDocument> entry : documentsList) {
results.add(LLUtils.toDocument(entry.getValue()));
}
return results;
@ -161,7 +166,7 @@ public class LLUtils {
return d;
}
private static IndexableField toField(LLItem item) {
private static Field toField(LLItem item) {
return switch (item.getType()) {
case IntPoint -> new IntPoint(item.getName(), Ints.fromByteArray(item.getData()));
case LongPoint -> new LongPoint(item.getName(), Longs.fromByteArray(item.getData()));

View File

@ -3,7 +3,6 @@ package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.database.LLUtils.MARKER_LUCENE;
import static it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.NO_TRANSFORMATION;
import com.google.common.util.concurrent.Uninterruptibles;
import io.micrometer.core.instrument.MeterRegistry;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.DirectIOOptions;
@ -12,11 +11,14 @@ import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.LuceneOptions;
import it.cavallium.dbengine.client.NRTCachingOptions;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.database.LLDocument;
import it.cavallium.dbengine.database.LLIndexRequest;
import it.cavallium.dbengine.database.LLUpdateDocument;
import it.cavallium.dbengine.database.LLItem;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSearchResultShard;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.LLUpdateFields;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.lucene.AlwaysDirectIOFSDirectory;
@ -292,12 +294,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}
@Override
public Mono<Void> addDocument(LLTerm key, LLDocument doc) {
public Mono<Void> addDocument(LLTerm key, LLUpdateDocument doc) {
return this.<Void>runSafe(() -> indexWriter.addDocument(LLUtils.toDocument(doc))).transform(this::ensureOpen);
}
@Override
public Mono<Void> addDocuments(Flux<Entry<LLTerm, LLDocument>> documents) {
public Mono<Void> addDocuments(Flux<Entry<LLTerm, LLUpdateDocument>> documents) {
return documents
.collectList()
.flatMap(documentsList -> this.<Void>runSafe(() -> indexWriter.addDocuments(LLUtils
@ -312,22 +314,30 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}
/**
 * Applies an index request to the entries matching {@code id}.
 *
 * <p>An {@link LLUpdateDocument} is converted to a Lucene document and passed to
 * {@code indexWriter.updateDocument}; an {@link LLUpdateFields} is converted to Lucene fields and
 * passed to {@code indexWriter.updateDocValues}. Any other request type is rejected with an
 * {@link UnsupportedOperationException}. The work runs inside {@code runSafe} and the resulting
 * Mono is transformed by {@code ensureOpen}.
 *
 * @param id the term identifying the entries to update
 * @param request the update to apply
 * @return a Mono that completes when the writer call has finished
 */
@Override
public Mono<Void> updateDocument(LLTerm id, LLDocument document) {
public Mono<Void> update(LLTerm id, LLIndexRequest request) {
return this
.<Void>runSafe(() -> indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document)))
.<Void>runSafe(() -> {
if (request instanceof LLUpdateDocument updateDocument) {
// Full replacement of the document stored under this term.
indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(updateDocument));
} else if (request instanceof LLUpdateFields updateFields) {
// NOTE(review): Lucene documents IndexWriter#updateDocValues as accepting only numeric or
// binary doc-values fields, while LLUtils.toField can also build point fields — confirm
// that callers only put doc-values items into LLUpdateFields.
indexWriter.updateDocValues(LLUtils.toTerm(id), LLUtils.toFields(updateFields.items()));
} else {
// LLIndexRequest is sealed, so this branch only triggers if a new permitted type is added.
throw new UnsupportedOperationException("Unexpected request type: " + request);
}
})
.transform(this::ensureOpen);
}
@Override
public Mono<Void> updateDocuments(Mono<Map<LLTerm, LLDocument>> documents) {
public Mono<Void> updateDocuments(Mono<Map<LLTerm, LLUpdateDocument>> documents) {
return documents.flatMap(this::updateDocuments).then();
}
private Mono<Void> updateDocuments(Map<LLTerm, LLDocument> documentsMap) {
private Mono<Void> updateDocuments(Map<LLTerm, LLUpdateDocument> documentsMap) {
return this.<Void>runSafe(() -> {
for (Entry<LLTerm, LLDocument> entry : documentsMap.entrySet()) {
for (Entry<LLTerm, LLUpdateDocument> entry : documentsMap.entrySet()) {
LLTerm key = entry.getKey();
LLDocument value = entry.getValue();
LLUpdateDocument value = entry.getValue();
indexWriter.updateDocument(LLUtils.toTerm(key), LLUtils.toDocument(value));
}
}).transform(this::ensureOpen);

View File

@ -6,7 +6,9 @@ import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.LuceneOptions;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.database.LLDocument;
import it.cavallium.dbengine.database.LLIndexRequest;
import it.cavallium.dbengine.database.LLUpdateDocument;
import it.cavallium.dbengine.database.LLItem;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSearchResultShard;
import it.cavallium.dbengine.database.LLSnapshot;
@ -119,17 +121,17 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
}
@Override
public Mono<Void> addDocument(LLTerm id, LLDocument doc) {
public Mono<Void> addDocument(LLTerm id, LLUpdateDocument doc) {
return getLuceneIndex(id).addDocument(id, doc);
}
@SuppressWarnings({"unchecked"})
@Override
public Mono<Void> addDocuments(Flux<Entry<LLTerm, LLDocument>> documents) {
public Mono<Void> addDocuments(Flux<Entry<LLTerm, LLUpdateDocument>> documents) {
return documents
.buffer(512)
.flatMap(inputEntries -> {
List<Entry<LLTerm, LLDocument>>[] sortedEntries = new List[luceneIndices.length];
List<Entry<LLTerm, LLUpdateDocument>>[] sortedEntries = new List[luceneIndices.length];
Mono<Void>[] results = new Mono[luceneIndices.length];
// Sort entries
@ -143,7 +145,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
// Add documents
int luceneIndexId = 0;
for (List<Entry<LLTerm, LLDocument>> docs : sortedEntries) {
for (List<Entry<LLTerm, LLUpdateDocument>> docs : sortedEntries) {
if (docs != null && !docs.isEmpty()) {
LLLocalLuceneIndex luceneIndex = luceneIndices[luceneIndexId];
results[luceneIndexId] = luceneIndex.addDocuments(Flux.fromIterable(docs));
@ -164,15 +166,15 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
}
@Override
public Mono<Void> updateDocument(LLTerm id, LLDocument document) {
return getLuceneIndex(id).updateDocument(id, document);
public Mono<Void> update(LLTerm id, LLIndexRequest request) {
return getLuceneIndex(id).update(id, request);
}
@Override
public Mono<Void> updateDocuments(Mono<Map<LLTerm, LLDocument>> documents) {
public Mono<Void> updateDocuments(Mono<Map<LLTerm, LLUpdateDocument>> documents) {
return documents
.flatMapMany(map -> {
var sortedMap = new HashMap<LLLocalLuceneIndex, Map<LLTerm, LLDocument>>();
var sortedMap = new HashMap<LLLocalLuceneIndex, Map<LLTerm, LLUpdateDocument>>();
map.forEach((key, value) -> sortedMap
.computeIfAbsent(getLuceneIndex(key), _unused -> new HashMap<>())
.put(key, value)

View File

@ -5,13 +5,12 @@ import com.google.common.primitives.Longs;
import it.cavallium.dbengine.client.Indicizer;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.database.LLDocument;
import it.cavallium.dbengine.database.LLUpdateDocument;
import it.cavallium.dbengine.database.LLItem;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import java.util.LinkedList;
import java.util.Map;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Store;
import org.jetbrains.annotations.NotNull;
@ -20,7 +19,7 @@ import reactor.core.publisher.Mono;
public class StringIndicizer extends Indicizer<String, String> {
@Override
public @NotNull Mono<LLDocument> toDocument(@NotNull String key, @NotNull String value) {
public @NotNull Mono<LLUpdateDocument> toIndexRequest(@NotNull String key, @NotNull String value) {
return Mono.fromCallable(() -> {
var fields = new LinkedList<LLItem>();
fields.add(LLItem.newStringField("uid", key, Field.Store.YES));
@ -37,7 +36,7 @@ public class StringIndicizer extends Indicizer<String, String> {
fields.add(LLItem.newLongPoint("longpoint", numLong));
fields.add(LLItem.newSortedNumericDocValuesField("longsort", numLong));
}
return new LLDocument(fields.toArray(LLItem[]::new));
return new LLUpdateDocument(fields.toArray(LLItem[]::new));
});
}