diff --git a/src/main/java/it/cavallium/dbengine/client/IndexAction.java b/src/main/java/it/cavallium/dbengine/client/IndexAction.java index e0ba3e2..ec5d680 100644 --- a/src/main/java/it/cavallium/dbengine/client/IndexAction.java +++ b/src/main/java/it/cavallium/dbengine/client/IndexAction.java @@ -11,24 +11,20 @@ import it.cavallium.dbengine.client.IndexAction.ReleaseSnapshot; import it.cavallium.dbengine.client.IndexAction.Flush; import it.cavallium.dbengine.client.IndexAction.Refresh; import it.cavallium.dbengine.client.IndexAction.Close; -import it.cavallium.dbengine.database.LLDocument; +import it.cavallium.dbengine.database.LLUpdateDocument; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLTerm; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; import reactor.core.publisher.Flux; import reactor.core.publisher.MonoSink; -import reactor.core.publisher.Sinks.Empty; -import reactor.core.publisher.Sinks.One; sealed interface IndexAction permits Add, AddMulti, Update, UpdateMulti, Delete, DeleteAll, TakeSnapshot, ReleaseSnapshot, Flush, Refresh, Close { IndexActionType getType(); - final record Add(LLTerm key, LLDocument doc, MonoSink addedFuture) implements IndexAction { + final record Add(LLTerm key, LLUpdateDocument doc, MonoSink addedFuture) implements IndexAction { @Override public IndexActionType getType() { @@ -36,7 +32,7 @@ sealed interface IndexAction permits Add, AddMulti, Update, UpdateMulti, Delete, } } - final record AddMulti(Flux> docsFlux, MonoSink addedMultiFuture) implements IndexAction { + final record AddMulti(Flux> docsFlux, MonoSink addedMultiFuture) implements IndexAction { @Override public IndexActionType getType() { @@ -44,7 +40,7 @@ sealed interface IndexAction permits Add, AddMulti, Update, UpdateMulti, Delete, } } - final record Update(LLTerm key, LLDocument doc, MonoSink updatedFuture) implements IndexAction { + final record Update(LLTerm key, LLUpdateDocument doc, MonoSink updatedFuture) implements IndexAction { @Override public IndexActionType getType() { @@ -52,7 +48,7 @@ sealed interface IndexAction permits Add, AddMulti, Update, UpdateMulti, Delete, } } - final record UpdateMulti(Map docs, MonoSink updatedMultiFuture) implements IndexAction { + final record UpdateMulti(Map docs, MonoSink updatedMultiFuture) implements IndexAction { @Override public IndexActionType getType() { diff --git a/src/main/java/it/cavallium/dbengine/client/Indicizer.java b/src/main/java/it/cavallium/dbengine/client/Indicizer.java index b751bc2..f7661f5 100644 --- a/src/main/java/it/cavallium/dbengine/client/Indicizer.java +++ b/src/main/java/it/cavallium/dbengine/client/Indicizer.java @@ -1,11 +1,9 @@ package it.cavallium.dbengine.client; -import it.cavallium.dbengine.database.LLDocument; +import it.cavallium.dbengine.database.LLIndexRequest; +import it.cavallium.dbengine.database.LLUpdateDocument; import it.cavallium.dbengine.database.LLTerm; -import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer; -import java.util.Map; import java.util.Set; -import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper; import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -13,7 +11,10 @@ import reactor.util.function.Tuple2; public abstract class Indicizer { - public abstract @NotNull Mono toDocument(@NotNull T key, @NotNull U value); + /** + * Transform a value to an IndexRequest. + */ + public abstract @NotNull Mono toIndexRequest(@NotNull T key, @NotNull U value); public abstract @NotNull LLTerm toIndex(@NotNull T key); diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java index 6d15cf4..47adb33 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java @@ -1,18 +1,6 @@ package it.cavallium.dbengine.client; -import com.google.common.util.concurrent.Uninterruptibles; import io.net5.buffer.api.Send; -import it.cavallium.dbengine.client.IndexAction.Add; -import it.cavallium.dbengine.client.IndexAction.AddMulti; -import it.cavallium.dbengine.client.IndexAction.Close; -import it.cavallium.dbengine.client.IndexAction.Delete; -import it.cavallium.dbengine.client.IndexAction.DeleteAll; -import it.cavallium.dbengine.client.IndexAction.Flush; -import it.cavallium.dbengine.client.IndexAction.Refresh; -import it.cavallium.dbengine.client.IndexAction.ReleaseSnapshot; -import it.cavallium.dbengine.client.IndexAction.TakeSnapshot; -import it.cavallium.dbengine.client.IndexAction.Update; -import it.cavallium.dbengine.client.IndexAction.UpdateMulti; import it.cavallium.dbengine.client.query.ClientQueryParams; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; @@ -20,28 +8,15 @@ import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLSearchResultShard; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLTerm; -import it.cavallium.dbengine.database.collections.ValueTransformer; -import java.lang.ref.Cleaner; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.locks.LockSupport; -import java.util.logging.Level; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; -import org.warp.commonutils.type.ShortNamedThreadFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; -import reactor.core.publisher.Sinks.EmitResult; -import reactor.core.publisher.Sinks.Empty; -import reactor.core.publisher.Sinks.Many; -import reactor.core.scheduler.Schedulers; -import reactor.util.concurrent.Queues; import reactor.util.function.Tuple2; public class LuceneIndexImpl implements LuceneIndex { @@ -66,7 +41,7 @@ public class LuceneIndexImpl implements LuceneIndex { @Override public Mono addDocument(T key, U value) { return indicizer - .toDocument(key, value) + .toIndexRequest(key, value) .flatMap(doc -> luceneIndex.addDocument(indicizer.toIndex(key), doc)); } @@ -75,7 +50,7 @@ public class LuceneIndexImpl implements LuceneIndex { return luceneIndex .addDocuments(entries .flatMap(entry -> indicizer - .toDocument(entry.getKey(), entry.getValue()) + .toIndexRequest(entry.getKey(), entry.getValue()) .map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc))) ); } @@ -89,8 +64,8 @@ public class LuceneIndexImpl implements LuceneIndex { @Override public Mono updateDocument(T key, @NotNull U value) { return indicizer - .toDocument(key, value) - .flatMap(doc -> luceneIndex.updateDocument(indicizer.toIndex(key), doc)); + .toIndexRequest(key, value) + .flatMap(doc -> luceneIndex.update(indicizer.toIndex(key), doc)); } @Override @@ -98,7 +73,7 @@ public class LuceneIndexImpl implements LuceneIndex { return luceneIndex .updateDocuments(entries .flatMap(entry -> indicizer - .toDocument(entry.getKey(), entry.getValue()) + .toIndexRequest(entry.getKey(), entry.getValue()) .map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc))) .collectMap(Entry::getKey, Entry::getValue) ); diff --git a/src/main/java/it/cavallium/dbengine/database/LLDocument.java b/src/main/java/it/cavallium/dbengine/database/LLDocument.java deleted file mode 100644 index 3b9ce9a..0000000 --- a/src/main/java/it/cavallium/dbengine/database/LLDocument.java +++ /dev/null @@ -1,49 +0,0 @@ -package it.cavallium.dbengine.database; - -import java.util.Arrays; -import org.jetbrains.annotations.Nullable; - -public class LLDocument { - - private final LLItem[] items; - - public LLDocument(LLItem[] items) { - this.items = items; - } - - public LLItem[] getItems() { - return items; - } - - @Override - public String toString() { - return Arrays.toString(items); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - LLDocument that = (LLDocument) o; - return Arrays.equals(items, that.items); - } - - @Override - public int hashCode() { - return Arrays.hashCode(items); - } - - @Nullable - public LLItem getField(String uid) { - for (LLItem item : items) { - if (item.getName().equals(uid)) { - return item; - } - } - return null; - } -} diff --git a/src/main/java/it/cavallium/dbengine/database/LLIndexRequest.java b/src/main/java/it/cavallium/dbengine/database/LLIndexRequest.java new file mode 100644 index 0000000..6876567 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/LLIndexRequest.java @@ -0,0 +1,3 @@ +package it.cavallium.dbengine.database; + +public sealed interface LLIndexRequest permits LLUpdateDocument, LLUpdateFields {} diff --git a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java index a0b5eb0..e2dc91c 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java @@ -6,14 +6,11 @@ import it.cavallium.dbengine.client.query.current.data.NoSort; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; -import it.cavallium.dbengine.lucene.LuceneUtils; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; -import reactor.core.publisher.GroupedFlux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; @@ -21,15 +18,15 @@ public interface LLLuceneIndex extends LLSnapshottable { String getLuceneIndexName(); - Mono addDocument(LLTerm id, LLDocument doc); + Mono addDocument(LLTerm id, LLUpdateDocument doc); - Mono addDocuments(Flux> documents); + Mono addDocuments(Flux> documents); Mono deleteDocument(LLTerm id); - Mono updateDocument(LLTerm id, LLDocument document); + Mono update(LLTerm id, LLIndexRequest request); - Mono updateDocuments(Mono> documents); + Mono updateDocuments(Mono> documents); Mono deleteAll(); diff --git a/src/main/java/it/cavallium/dbengine/database/LLUpdateDocument.java b/src/main/java/it/cavallium/dbengine/database/LLUpdateDocument.java new file mode 100644 index 0000000..29da299 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/LLUpdateDocument.java @@ -0,0 +1,3 @@ +package it.cavallium.dbengine.database; + +public record LLUpdateDocument(LLItem[] items) implements LLIndexRequest {} diff --git a/src/main/java/it/cavallium/dbengine/database/LLUpdateFields.java b/src/main/java/it/cavallium/dbengine/database/LLUpdateFields.java new file mode 100644 index 0000000..b38c542 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/LLUpdateFields.java @@ -0,0 +1,3 @@ +package it.cavallium.dbengine.database; + +public record LLUpdateFields(LLItem[] items) implements LLIndexRequest {} diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index fa7a82e..86d396b 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -18,8 +18,6 @@ import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.lucene.RandomSortField; import java.nio.ByteBuffer; import java.nio.charset.Charset; -import java.nio.charset.CharsetEncoder; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -39,7 +37,6 @@ import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.document.StringField; import org.apache.lucene.document.TextField; -import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.Term; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.ScoreMode; @@ -129,25 +126,33 @@ public class LLUtils { return new Term(term.getKey(), term.getValue()); } - public static Document toDocument(LLDocument document) { + public static Document toDocument(LLUpdateDocument document) { Document d = new Document(); - for (LLItem item : document.getItems()) { + for (LLItem item : document.items()) { d.add(LLUtils.toField(item)); } return d; } - public static Collection toDocuments(Collection document) { + public static Field[] toFields(LLItem... fields) { + Field[] d = new Field[fields.length]; + for (int i = 0; i < fields.length; i++) { + d[i] = LLUtils.toField(fields[i]); + } + return d; + } + + public static Collection toDocuments(Collection document) { List d = new ArrayList<>(document.size()); - for (LLDocument doc : document) { + for (LLUpdateDocument doc : document) { d.add(LLUtils.toDocument(doc)); } return d; } - public static Collection toDocumentsFromEntries(Collection> documentsList) { + public static Collection toDocumentsFromEntries(Collection> documentsList) { ArrayList results = new ArrayList<>(documentsList.size()); - for (Entry entry : documentsList) { + for (Entry entry : documentsList) { results.add(LLUtils.toDocument(entry.getValue())); } return results; @@ -161,7 +166,7 @@ public class LLUtils { return d; } - private static IndexableField toField(LLItem item) { + private static Field toField(LLItem item) { return switch (item.getType()) { case IntPoint -> new IntPoint(item.getName(), Ints.fromByteArray(item.getData())); case LongPoint -> new LongPoint(item.getName(), Longs.fromByteArray(item.getData())); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index a246c9a..0d6bec5 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -3,7 +3,6 @@ package it.cavallium.dbengine.database.disk; import static it.cavallium.dbengine.database.LLUtils.MARKER_LUCENE; import static it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.NO_TRANSFORMATION; -import com.google.common.util.concurrent.Uninterruptibles; import io.micrometer.core.instrument.MeterRegistry; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.DirectIOOptions; @@ -12,11 +11,14 @@ import it.cavallium.dbengine.client.IndicizerSimilarities; import it.cavallium.dbengine.client.LuceneOptions; import it.cavallium.dbengine.client.NRTCachingOptions; import it.cavallium.dbengine.client.query.current.data.QueryParams; -import it.cavallium.dbengine.database.LLDocument; +import it.cavallium.dbengine.database.LLIndexRequest; +import it.cavallium.dbengine.database.LLUpdateDocument; +import it.cavallium.dbengine.database.LLItem; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLSearchResultShard; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLTerm; +import it.cavallium.dbengine.database.LLUpdateFields; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.lucene.LuceneHacks; import it.cavallium.dbengine.lucene.AlwaysDirectIOFSDirectory; @@ -292,12 +294,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } @Override - public Mono addDocument(LLTerm key, LLDocument doc) { + public Mono addDocument(LLTerm key, LLUpdateDocument doc) { return this.runSafe(() -> indexWriter.addDocument(LLUtils.toDocument(doc))).transform(this::ensureOpen); } @Override - public Mono addDocuments(Flux> documents) { + public Mono addDocuments(Flux> documents) { return documents .collectList() .flatMap(documentsList -> this.runSafe(() -> indexWriter.addDocuments(LLUtils @@ -312,22 +314,30 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } @Override - public Mono updateDocument(LLTerm id, LLDocument document) { + public Mono update(LLTerm id, LLIndexRequest request) { return this - .runSafe(() -> indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document))) + .runSafe(() -> { + if (request instanceof LLUpdateDocument updateDocument) { + indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(updateDocument)); + } else if (request instanceof LLUpdateFields updateFields) { + indexWriter.updateDocValues(LLUtils.toTerm(id), LLUtils.toFields(updateFields.items())); + } else { + throw new UnsupportedOperationException("Unexpected request type: " + request); + } + }) .transform(this::ensureOpen); } @Override - public Mono updateDocuments(Mono> documents) { + public Mono updateDocuments(Mono> documents) { return documents.flatMap(this::updateDocuments).then(); } - private Mono updateDocuments(Map documentsMap) { + private Mono updateDocuments(Map documentsMap) { return this.runSafe(() -> { - for (Entry entry : documentsMap.entrySet()) { + for (Entry entry : documentsMap.entrySet()) { LLTerm key = entry.getKey(); - LLDocument value = entry.getValue(); + LLUpdateDocument value = entry.getValue(); indexWriter.updateDocument(LLUtils.toTerm(key), LLUtils.toDocument(value)); } }).transform(this::ensureOpen); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index a6c6304..0587b9f 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -6,7 +6,9 @@ import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; import it.cavallium.dbengine.client.LuceneOptions; import it.cavallium.dbengine.client.query.current.data.QueryParams; -import it.cavallium.dbengine.database.LLDocument; +import it.cavallium.dbengine.database.LLIndexRequest; +import it.cavallium.dbengine.database.LLUpdateDocument; +import it.cavallium.dbengine.database.LLItem; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLSearchResultShard; import it.cavallium.dbengine.database.LLSnapshot; @@ -119,17 +121,17 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { } @Override - public Mono addDocument(LLTerm id, LLDocument doc) { + public Mono addDocument(LLTerm id, LLUpdateDocument doc) { return getLuceneIndex(id).addDocument(id, doc); } @SuppressWarnings({"unchecked"}) @Override - public Mono addDocuments(Flux> documents) { + public Mono addDocuments(Flux> documents) { return documents .buffer(512) .flatMap(inputEntries -> { - List>[] sortedEntries = new List[luceneIndices.length]; + List>[] sortedEntries = new List[luceneIndices.length]; Mono[] results = new Mono[luceneIndices.length]; // Sort entries @@ -143,7 +145,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { // Add documents int luceneIndexId = 0; - for (List> docs : sortedEntries) { + for (List> docs : sortedEntries) { if (docs != null && !docs.isEmpty()) { LLLocalLuceneIndex luceneIndex = luceneIndices[luceneIndexId]; results[luceneIndexId] = luceneIndex.addDocuments(Flux.fromIterable(docs)); @@ -164,15 +166,15 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { } @Override - public Mono updateDocument(LLTerm id, LLDocument document) { - return getLuceneIndex(id).updateDocument(id, document); + public Mono update(LLTerm id, LLIndexRequest request) { + return getLuceneIndex(id).update(id, request); } @Override - public Mono updateDocuments(Mono> documents) { + public Mono updateDocuments(Mono> documents) { return documents .flatMapMany(map -> { - var sortedMap = new HashMap>(); + var sortedMap = new HashMap>(); map.forEach((key, value) -> sortedMap .computeIfAbsent(getLuceneIndex(key), _unused -> new HashMap<>()) .put(key, value) diff --git a/src/test/java/it/cavallium/dbengine/StringIndicizer.java b/src/test/java/it/cavallium/dbengine/StringIndicizer.java index ac7f467..b6a084b 100644 --- a/src/test/java/it/cavallium/dbengine/StringIndicizer.java +++ b/src/test/java/it/cavallium/dbengine/StringIndicizer.java @@ -5,13 +5,12 @@ import com.google.common.primitives.Longs; import it.cavallium.dbengine.client.Indicizer; import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; -import it.cavallium.dbengine.database.LLDocument; +import it.cavallium.dbengine.database.LLUpdateDocument; import it.cavallium.dbengine.database.LLItem; import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer; import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity; import java.util.LinkedList; -import java.util.Map; import org.apache.lucene.document.Field; import org.apache.lucene.document.Field.Store; import org.jetbrains.annotations.NotNull; @@ -20,7 +19,7 @@ import reactor.core.publisher.Mono; public class StringIndicizer extends Indicizer { @Override - public @NotNull Mono toDocument(@NotNull String key, @NotNull String value) { + public @NotNull Mono toIndexRequest(@NotNull String key, @NotNull String value) { return Mono.fromCallable(() -> { var fields = new LinkedList(); fields.add(LLItem.newStringField("uid", key, Field.Store.YES)); @@ -37,7 +36,7 @@ public class StringIndicizer extends Indicizer { fields.add(LLItem.newLongPoint("longpoint", numLong)); fields.add(LLItem.newSortedNumericDocValuesField("longsort", numLong)); } - return new LLDocument(fields.toArray(LLItem[]::new)); + return new LLUpdateDocument(fields.toArray(LLItem[]::new)); }); }