diff --git a/pom.xml b/pom.xml
index 4f51bc8..a181dd2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,274 +71,6 @@
scm:git:https://git.ignuranza.net/andreacavalli/CavalliumDBEngine.git
HEAD
-
-
- com.google.guava
- guava
-
-
- org.warp
- common-utils
-
-
- io.netty
- netty5-buffer
-
-
- org.yaml
- snakeyaml
-
-
- javax.annotation
- javax.annotation-api
-
-
- it.unimi.dsi
- fastutil
-
-
- org.junit.jupiter
- junit-jupiter-api
- test
-
-
- org.hamcrest
- hamcrest-core
-
-
-
-
- org.junit.jupiter
- junit-jupiter-engine
- test
-
-
- org.junit.jupiter
- junit-jupiter-params
- test
-
-
- org.assertj
- assertj-core
- 3.22.0
- test
-
-
-
- org.hamcrest
- hamcrest-library
- test
-
-
- org.apache.logging.log4j
- log4j-slf4j-impl
- 2.17.1
- test
-
-
- org.slf4j
- slf4j-api
-
-
- org.apache.logging.log4j
- log4j-api
-
-
-
-
- org.slf4j
- slf4j-api
- 1.7.36
-
-
- org.apache.logging.log4j
- log4j-api
- 2.17.1
-
-
- com.lmax
- disruptor
- 3.4.4
-
-
- org.rocksdb
- rocksdbjni
-
-
- org.apache.lucene
- lucene-core
-
-
- org.apache.lucene
- lucene-join
-
-
- org.apache.lucene
- lucene-analysis-common
-
-
- org.apache.lucene
- lucene-analysis-icu
-
-
- org.apache.lucene
- lucene-codecs
-
-
- org.apache.lucene
- lucene-backward-codecs
-
-
- org.apache.lucene
- lucene-queries
-
-
- org.apache.lucene
- lucene-queryparser
-
-
- org.apache.lucene
- lucene-misc
-
-
- org.apache.lucene
- lucene-facet
-
-
- org.apache.lucene
- lucene-test-framework
- test
-
-
- org.jetbrains
- annotations
-
-
- io.projectreactor
- reactor-core
-
-
- io.projectreactor
- reactor-tools
-
-
- io.projectreactor
- reactor-test
-
-
- io.projectreactor.netty
- reactor-netty-core
-
-
- io.netty.incubator
- netty-incubator-codec-native-quic
- 0.0.25.Final
- linux-x86_64
-
-
- io.netty
- netty-common
-
-
- io.netty
- netty-codec
-
-
- io.netty
- netty-handler
-
-
- io.netty
- netty-transport
-
-
- io.netty
- netty-buffer
-
-
-
-
- io.projectreactor.netty.incubator
- reactor-netty-incubator-quic
- 0.0.4
-
-
- io.netty
- reactor-netty-core
-
-
- io.netty
- netty-common
-
-
- io.netty
- netty-codec
-
-
- io.netty
- netty-handler
-
-
- io.netty
- netty-transport
-
-
- io.netty
- netty-buffer
-
-
-
-
- org.bouncycastle
- bcpkix-jdk15on
- 1.69
-
-
- org.novasearch
- lucene-relevance
-
-
- io.soabase.record-builder
- record-builder-core
- provided
-
-
- it.cavallium
- data-generator-runtime
-
-
- org.jetbrains
- annotations
-
-
-
-
- io.micrometer
- micrometer-core
-
-
- org.slf4j
- slf4j-api
-
-
-
-
- io.micrometer
- micrometer-registry-jmx
- true
-
-
- org.slf4j
- slf4j-api
-
-
-
-
- org.lmdbjava
- lmdbjava
- 0.8.2
-
-
@@ -511,6 +243,282 @@
+
+
+ com.google.guava
+ guava
+
+
+ org.warp
+ common-utils
+
+
+ io.netty
+ netty5-buffer
+
+
+ org.yaml
+ snakeyaml
+
+
+ javax.annotation
+ javax.annotation-api
+
+
+ it.unimi.dsi
+ fastutil
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ test
+
+
+ org.hamcrest
+ hamcrest-core
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-params
+ test
+
+
+ org.assertj
+ assertj-core
+ 3.22.0
+ test
+
+
+
+ org.hamcrest
+ hamcrest-library
+ test
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+ 2.17.1
+ test
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.apache.logging.log4j
+ log4j-api
+
+
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.36
+
+
+ org.apache.logging.log4j
+ log4j-api
+ 2.17.1
+
+
+ com.lmax
+ disruptor
+ 3.4.4
+
+
+ org.rocksdb
+ rocksdbjni
+
+
+ org.apache.lucene
+ lucene-core
+
+
+ org.apache.lucene
+ lucene-join
+
+
+ org.apache.lucene
+ lucene-analysis-common
+
+
+ org.apache.lucene
+ lucene-analysis-icu
+
+
+ org.apache.lucene
+ lucene-codecs
+
+
+ org.apache.lucene
+ lucene-backward-codecs
+
+
+ org.apache.lucene
+ lucene-queries
+
+
+ org.apache.lucene
+ lucene-queryparser
+
+
+ org.apache.lucene
+ lucene-misc
+
+
+ org.apache.lucene
+ lucene-facet
+
+
+ org.apache.lucene
+ lucene-test-framework
+ test
+
+
+ org.jetbrains
+ annotations
+
+
+ io.projectreactor
+ reactor-core
+
+
+ io.projectreactor.addons
+ reactor-extra
+
+
+ io.projectreactor
+ reactor-tools
+
+
+ io.projectreactor
+ reactor-test
+
+
+ io.projectreactor.netty
+ reactor-netty-core
+
+
+ io.netty.incubator
+ netty-incubator-codec-native-quic
+ 0.0.26.Final
+ linux-x86_64
+
+
+ io.netty
+ netty-common
+
+
+ io.netty
+ netty-codec
+
+
+ io.netty
+ netty-handler
+
+
+ io.netty
+ netty-transport
+
+
+ io.netty
+ netty-buffer
+
+
+
+
+ io.projectreactor.netty.incubator
+ reactor-netty-incubator-quic
+ 0.0.5
+
+
+ io.netty.incubator
+ netty-incubator-codec-native-quic
+
+
+ io.netty
+ reactor-netty-core
+
+
+ io.netty
+ netty-common
+
+
+ io.netty
+ netty-codec
+
+
+ io.netty
+ netty-handler
+
+
+ io.netty
+ netty-transport
+
+
+ io.netty
+ netty-buffer
+
+
+
+
+ org.bouncycastle
+ bcpkix-jdk15on
+ 1.69
+
+
+ org.novasearch
+ lucene-relevance
+
+
+ io.soabase.record-builder
+ record-builder-core
+ provided
+
+
+ it.cavallium
+ data-generator-runtime
+
+
+ org.jetbrains
+ annotations
+
+
+
+
+ io.micrometer
+ micrometer-core
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
+ io.micrometer
+ micrometer-registry-jmx
+ true
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
+ org.lmdbjava
+ lmdbjava
+ 0.8.2
+
+
src/test/java
diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java
index 8ce1b43..5c95b93 100644
--- a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java
+++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java
@@ -22,13 +22,13 @@ public interface LuceneIndex extends LLSnapshottable {
Mono addDocument(T key, U value);
- Mono addDocuments(boolean atomic, Flux> entries);
+ Mono addDocuments(boolean atomic, Flux> entries);
Mono deleteDocument(T key);
Mono updateDocument(T key, @NotNull U value);
- Mono updateDocuments(Flux> entries);
+ Mono updateDocuments(Flux> entries);
default Mono updateOrDeleteDocument(T key, @Nullable U value) {
if (value == null) {
diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java
index dcaf316..d3c26ee 100644
--- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java
+++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java
@@ -9,6 +9,7 @@ import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSearchResultShard;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLTerm;
+import it.cavallium.dbengine.database.LLUpdateDocument;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.collector.Buckets;
import it.cavallium.dbengine.lucene.searcher.BucketParams;
@@ -19,10 +20,12 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.logging.Level;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.publisher.SignalType;
public class LuceneIndexImpl implements LuceneIndex {
@@ -50,7 +53,7 @@ public class LuceneIndexImpl implements LuceneIndex {
}
@Override
- public Mono addDocuments(boolean atomic, Flux> entries) {
+ public Mono addDocuments(boolean atomic, Flux> entries) {
return luceneIndex.addDocuments(atomic, entries.flatMap(entry -> indicizer
.toDocument(entry.getKey(), entry.getValue())
.map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc))));
@@ -70,12 +73,17 @@ public class LuceneIndexImpl implements LuceneIndex {
}
@Override
- public Mono updateDocuments(Flux> entries) {
- return luceneIndex.updateDocuments(entries.flatMap(entry -> Mono.zip(
- Mono.just(indicizer.toIndex(entry.getKey())),
- indicizer.toDocument(entry.getKey(), entry.getValue()).single(),
- Map::entry
- )));
+ public Mono updateDocuments(Flux> entries) {
+ Flux> mappedEntries = entries
+ .flatMap(entry -> Mono
+ .zip(Mono.just(indicizer.toIndex(entry.getKey())),
+ indicizer.toDocument(entry.getKey(), entry.getValue()).single(),
+ Map::entry
+ )
+ .single()
+ )
+ .log("impl-update-documents", Level.FINEST, false, SignalType.ON_NEXT, SignalType.ON_COMPLETE);
+ return luceneIndex.updateDocuments(mappedEntries);
}
@Override
diff --git a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java
index adab64f..7d826e8 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java
@@ -21,13 +21,13 @@ public interface LLLuceneIndex extends LLSnapshottable {
Mono addDocument(LLTerm id, LLUpdateDocument doc);
- Mono addDocuments(boolean atomic, Flux> documents);
+ Mono addDocuments(boolean atomic, Flux> documents);
Mono deleteDocument(LLTerm id);
Mono update(LLTerm id, LLIndexRequest request);
- Mono updateDocuments(Flux> documents);
+ Mono updateDocuments(Flux> documents);
Mono deleteAll();
diff --git a/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java
index e635170..25ea525 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java
@@ -22,10 +22,12 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.publisher.SignalType;
public class LLMultiLuceneIndex implements LLLuceneIndex {
@@ -84,14 +86,14 @@ public class LLMultiLuceneIndex implements LLLuceneIndex {
}
@Override
- public Mono addDocuments(boolean atomic, Flux> documents) {
+ public Mono addDocuments(boolean atomic, Flux> documents) {
return documents
.groupBy(term -> LuceneUtils.getLuceneIndexId(term.getKey(), totalShards))
.flatMap(group -> {
var index = luceneIndicesById[group.key()];
return index.addDocuments(atomic, group);
})
- .then();
+ .reduce(0L, Long::sum);
}
@Override
@@ -105,14 +107,12 @@ public class LLMultiLuceneIndex implements LLLuceneIndex {
}
@Override
- public Mono updateDocuments(Flux> documents) {
+ public Mono updateDocuments(Flux> documents) {
return documents
- .groupBy(term -> LuceneUtils.getLuceneIndexId(term.getKey(), totalShards))
- .flatMap(group -> {
- var index = luceneIndicesById[group.key()];
- return index.updateDocuments(group);
- })
- .then();
+ .log("multi-update-documents", Level.FINEST, false, SignalType.ON_NEXT, SignalType.ON_COMPLETE)
+ .groupBy(term -> getLuceneIndex(term.getKey()))
+ .flatMap(groupFlux -> groupFlux.key().updateDocuments(groupFlux))
+ .reduce(0L, Long::sum);
}
@Override
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
index 80efce5..556500a 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
@@ -1725,6 +1725,6 @@ public class LLLocalDictionary implements LLDictionary {
seekTo = () -> {};
rocksIterator.seekToFirst();
}
- return new RocksIteratorTuple(rocksIterator, sliceMin, sliceMax, seekTo);
+ return new RocksIteratorTuple(List.of(readOptions), rocksIterator, sliceMin, sliceMax, seekTo);
}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java
index 5f3c320..56a7f00 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java
@@ -44,6 +44,7 @@ import org.jetbrains.annotations.Nullable;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Cache;
+import org.rocksdb.ChecksumType;
import org.rocksdb.ClockCache;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
@@ -194,6 +195,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
tableOptions.setOptimizeFiltersForMemory(true);
}
tableOptions
+ .setChecksumType(ChecksumType.kxxHash64)
.setBlockCacheCompressed(optionsWithCache.compressedCache())
.setBlockCache(optionsWithCache.standardCache())
.setBlockSize(16 * 1024); // 16KiB
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
index f1d9e62..e9347fd 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
@@ -44,11 +44,13 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Level;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -70,6 +72,7 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@@ -135,12 +138,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
logger.debug("Lucene MMap is supported");
}
this.lowMemory = luceneOptions.lowMemory();
+ this.shardName = LuceneUtils.getStandardName(clusterName, shardIndex);
this.directory = LuceneUtils.createLuceneDirectory(luceneOptions.directoryOptions(),
- LuceneUtils.getStandardName(clusterName, shardIndex),
+ shardName,
rocksDBManager);
boolean isFilesystemCompressed = LuceneUtils.getIsFilesystemCompressed(luceneOptions.directoryOptions());
- this.shardName = clusterName;
var snapshotter = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
this.luceneAnalyzer = LuceneUtils.toPerFieldAnalyzerWrapper(indicizerAnalyzers);
this.luceneSimilarity = LuceneUtils.toPerFieldSimilarityWrapper(indicizerSimilarities);
@@ -264,19 +267,23 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@Override
public Mono addDocument(LLTerm key, LLUpdateDocument doc) {
- return this.runSafe(() -> docIndexingTime.recordCallable(() -> {
- startedDocIndexings.increment();
- try {
- indexWriter.addDocument(toDocument(doc));
- } finally {
- endeddDocIndexings.increment();
- }
+ return this.runSafe(() -> {
+ docIndexingTime.recordCallable(() -> {
+ startedDocIndexings.increment();
+ try {
+ indexWriter.addDocument(toDocument(doc));
+ } finally {
+ endeddDocIndexings.increment();
+ }
+ return null;
+ });
+ logger.trace(MARKER_LUCENE, "Added document {}: {}", key, doc);
return null;
- })).transform(this::ensureOpen);
+ }).transform(this::ensureOpen);
}
@Override
- public Mono addDocuments(boolean atomic, Flux> documents) {
+ public Mono addDocuments(boolean atomic, Flux> documents) {
if (!atomic) {
return documents
.publishOn(bulkScheduler)
@@ -294,15 +301,16 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
} finally {
endeddDocIndexings.increment();
}
- sink.complete();
+ logger.trace(MARKER_LUCENE, "Added document: {}", document);
+ sink.next(true);
})
- .then()
+ .count()
.transform(this::ensureOpen);
} else {
return documents
.collectList()
.publishOn(bulkScheduler)
- .handle((documentsList, sink) -> {
+ .handle((documentsList, sink) -> {
var count = documentsList.size();
StopWatch stopWatch = StopWatch.createStarted();
try {
@@ -320,9 +328,8 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
TimeUnit.MILLISECONDS
);
}
- sink.complete();
+ sink.next((long) documentsList.size());
})
- .then()
.transform(this::ensureOpen);
}
}
@@ -343,30 +350,36 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@Override
public Mono update(LLTerm id, LLIndexRequest request) {
- return this.runSafe(() -> docIndexingTime.recordCallable(() -> {
- startedDocIndexings.increment();
- try {
- if (request instanceof LLUpdateDocument updateDocument) {
- indexWriter.updateDocument(LLUtils.toTerm(id), toDocument(updateDocument));
- } else if (request instanceof LLSoftUpdateDocument softUpdateDocument) {
- indexWriter.softUpdateDocument(LLUtils.toTerm(id),
- toDocument(softUpdateDocument.items()),
- toFields(softUpdateDocument.softDeleteItems()));
- } else if (request instanceof LLUpdateFields updateFields) {
- indexWriter.updateDocValues(LLUtils.toTerm(id), toFields(updateFields.items()));
- } else {
- throw new UnsupportedOperationException("Unexpected request type: " + request);
+ return this.runSafe(() -> {
+ docIndexingTime.recordCallable(() -> {
+ startedDocIndexings.increment();
+ try {
+ if (request instanceof LLUpdateDocument updateDocument) {
+ indexWriter.updateDocument(LLUtils.toTerm(id), toDocument(updateDocument));
+ } else if (request instanceof LLSoftUpdateDocument softUpdateDocument) {
+ indexWriter.softUpdateDocument(LLUtils.toTerm(id),
+ toDocument(softUpdateDocument.items()),
+ toFields(softUpdateDocument.softDeleteItems())
+ );
+ } else if (request instanceof LLUpdateFields updateFields) {
+ indexWriter.updateDocValues(LLUtils.toTerm(id), toFields(updateFields.items()));
+ } else {
+ throw new UnsupportedOperationException("Unexpected request type: " + request);
+ }
+ } finally {
+ endeddDocIndexings.increment();
}
- } finally {
- endeddDocIndexings.increment();
- }
+ return null;
+ });
+ logger.trace(MARKER_LUCENE, "Updated document {}: {}", id, request);
return null;
- })).transform(this::ensureOpen);
+ }).transform(this::ensureOpen);
}
@Override
- public Mono updateDocuments(Flux> documents) {
+ public Mono updateDocuments(Flux> documents) {
return documents
+ .log("local-update-documents", Level.FINEST, false, SignalType.ON_NEXT, SignalType.ON_COMPLETE)
.publishOn(bulkScheduler)
.handle((document, sink) -> {
LLTerm key = document.getKey();
@@ -384,9 +397,9 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
} finally {
endeddDocIndexings.increment();
}
- sink.complete();
+ sink.next(true);
})
- .then()
+ .count()
.transform(this::ensureOpen);
}
@@ -589,4 +602,22 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return lowMemory;
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ LLLocalLuceneIndex that = (LLLocalLuceneIndex) o;
+
+ return Objects.equals(shardName, that.shardName);
+ }
+
+ @Override
+ public int hashCode() {
+ return shardName.hashCode();
+ }
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java
index e452fbf..ab522af 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java
@@ -42,6 +42,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -51,7 +52,9 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Schedulers;
+import reactor.math.MathFlux;
public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
@@ -163,13 +166,13 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
}
@Override
- public Mono addDocuments(boolean atomic, Flux> documents) {
+ public Mono addDocuments(boolean atomic, Flux> documents) {
if (BYPASS_GROUPBY_BUG) {
return documents
.buffer(8192)
.flatMap(inputEntries -> {
List>[] sortedEntries = new List[totalShards];
- Mono[] results = new Mono[totalShards];
+ Mono[] results = new Mono[totalShards];
// Sort entries
for(var inputEntry : inputEntries) {
@@ -192,14 +195,14 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
luceneIndexId++;
}
- return Mono.when(results);
+ return Flux.merge(results).reduce(0L, Long::sum);
})
- .then();
+ .reduce(0L, Long::sum);
} else {
return documents
.groupBy(term -> getLuceneIndex(term.getKey()))
.flatMap(group -> group.key().addDocuments(atomic, group))
- .then();
+ .reduce(0L, Long::sum);
}
}
@@ -214,7 +217,9 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
}
@Override
- public Mono updateDocuments(Flux> documents) {
+ public Mono updateDocuments(Flux> documents) {
+ documents = documents
+ .log("local-multi-update-documents", Level.FINEST, false, SignalType.ON_NEXT, SignalType.ON_COMPLETE);
if (BYPASS_GROUPBY_BUG) {
int bufferSize = 8192;
return documents
@@ -222,20 +227,19 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
.flatMap(bufferFlux -> bufferFlux
.collect(Collectors.groupingBy(inputEntry -> LuceneUtils.getLuceneIndexId(inputEntry.getKey(), totalShards),
Collectors.collectingAndThen(Collectors.toList(), docs -> {
- int luceneIndexId = LuceneUtils.getLuceneIndexId(docs.get(0).getKey(), totalShards);
- LLLocalLuceneIndex luceneIndex = Objects.requireNonNull(luceneIndicesById[luceneIndexId]);
+ var luceneIndex = getLuceneIndex(docs.get(0).getKey());
return luceneIndex.updateDocuments(Flux.fromIterable(docs));
}))
)
.map(Map::values)
- .flatMap(Mono::whenDelayError)
+ .flatMap(parts -> Flux.merge(parts).reduce(0L, Long::sum))
)
- .then();
+ .reduce(0L, Long::sum);
} else {
return documents
.groupBy(term -> getLuceneIndex(term.getKey()))
.flatMap(group -> group.key().updateDocuments(group))
- .then();
+ .reduce(0L, Long::sum);
}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java
index d5fe867..6937c3e 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java
@@ -155,7 +155,7 @@ public abstract class LLLocalReactiveRocksIterator extends
@Override
protected Owned> prepareSend() {
var range = this.rangeShared.send();
- var readOptions = new ReadOptions(this.readOptions);
+ var readOptions = this.readOptions;
return drop -> new LLLocalReactiveRocksIterator<>(db, range, allowNettyDirect, readOptions, readValues) {
@Override
public T getEntry(@Nullable Send key, @Nullable Send value) {
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksIteratorTuple.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksIteratorTuple.java
index c5b4fe8..8ba26fa 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/RocksIteratorTuple.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksIteratorTuple.java
@@ -1,15 +1,21 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.SafeCloseable;
+import java.util.List;
import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractImmutableNativeReference;
+import org.rocksdb.ReadOptions;
import org.rocksdb.RocksIterator;
+import org.rocksdb.RocksObject;
-public record RocksIteratorTuple(@NotNull RocksIterator iterator, @NotNull ReleasableSlice sliceMin,
- @NotNull ReleasableSlice sliceMax, @NotNull SafeCloseable seekTo) implements
+public record RocksIteratorTuple(List refs, @NotNull RocksIterator iterator,
+ @NotNull ReleasableSlice sliceMin, @NotNull ReleasableSlice sliceMax,
+ @NotNull SafeCloseable seekTo) implements
SafeCloseable {
@Override
public void close() {
+ refs.forEach(AbstractImmutableNativeReference::close);
iterator.close();
sliceMin.close();
sliceMax.close();
diff --git a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java
index 77f80cf..d55c95a 100644
--- a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java
+++ b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java
@@ -375,7 +375,7 @@ public class LLQuicConnection implements LLDatabaseConnection {
}
@Override
- public Mono addDocuments(boolean atomic, Flux> documents) {
+ public Mono addDocuments(boolean atomic, Flux> documents) {
return null;
}
@@ -390,7 +390,7 @@ public class LLQuicConnection implements LLDatabaseConnection {
}
@Override
- public Mono updateDocuments(Flux> documents) {
+ public Mono updateDocuments(Flux> documents) {
return null;
}