diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java index 154c293..df20520 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java @@ -22,7 +22,7 @@ public interface LuceneIndex extends LLSnapshottable { Mono addDocument(T key, U value); - Mono addDocuments(Flux> entries); + Mono addDocuments(boolean atomic, Flux> entries); Mono deleteDocument(T key); diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java index 6dfb182..b3927b5 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java @@ -50,8 +50,8 @@ public class LuceneIndexImpl implements LuceneIndex { } @Override - public Mono addDocuments(Flux> entries) { - return luceneIndex.addDocuments(entries.flatMap(entry -> indicizer + public Mono addDocuments(boolean atomic, Flux> entries) { + return luceneIndex.addDocuments(atomic, entries.flatMap(entry -> indicizer .toDocument(entry.getKey(), entry.getValue()) .map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc)))); } diff --git a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java index f1585e7..adab64f 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java @@ -21,12 +21,7 @@ public interface LLLuceneIndex extends LLSnapshottable { Mono addDocument(LLTerm id, LLUpdateDocument doc); - /** - * WARNING! This operation is atomic! - * Please don't send infinite or huge documents fluxes, because they will - * be kept in ram all at once. - */ - Mono addDocuments(Flux> documents); + Mono addDocuments(boolean atomic, Flux> documents); Mono deleteDocument(LLTerm id); diff --git a/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java index b4d6a9a..f1a9019 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java @@ -84,12 +84,12 @@ public class LLMultiLuceneIndex implements LLLuceneIndex { } @Override - public Mono addDocuments(Flux> documents) { + public Mono addDocuments(boolean atomic, Flux> documents) { return documents .groupBy(term -> LuceneUtils.getLuceneIndexId(term.getKey(), totalShards)) .flatMap(group -> { var index = luceneIndicesById[group.key()]; - return index.addDocuments(group); + return index.addDocuments(atomic, group); }) .then(); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 4c212b7..221552f 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -272,22 +272,55 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } @Override - public Mono addDocuments(Flux> documents) { - return documents.collectList().flatMap(documentsList -> this.runSafe(() -> { - var count = documentsList.size(); - StopWatch stopWatch = StopWatch.createStarted(); - try { - startedDocIndexings.increment(count); - try { - indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList)); - } finally { - endeddDocIndexings.increment(count); - } - } finally { - docIndexingTime.record(stopWatch.getTime(TimeUnit.MILLISECONDS) / Math.max(count, 1), TimeUnit.MILLISECONDS); - } - return null; - })).transform(this::ensureOpen); + public Mono addDocuments(boolean atomic, Flux> documents) { + if (!atomic) { + return documents + .publishOn(uninterruptibleScheduler(Schedulers.boundedElastic())) + .handle((document, sink) -> { + LLUpdateDocument value = document.getValue(); + startedDocIndexings.increment(); + try { + docIndexingTime.recordCallable(() -> { + indexWriter.addDocument(toDocument(value)); + return null; + }); + } catch (Exception ex) { + sink.error(ex); + return; + } finally { + endeddDocIndexings.increment(); + } + sink.complete(); + }) + .then() + .transform(this::ensureOpen); + } else { + return documents + .collectList() + .publishOn(uninterruptibleScheduler(Schedulers.boundedElastic())) + .handle((documentsList, sink) -> { + var count = documentsList.size(); + StopWatch stopWatch = StopWatch.createStarted(); + try { + startedDocIndexings.increment(count); + try { + indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList)); + } finally { + endeddDocIndexings.increment(count); + } + } catch (IOException ex) { + sink.error(ex); + return; + } finally { + docIndexingTime.record(stopWatch.getTime(TimeUnit.MILLISECONDS) / Math.max(count, 1), + TimeUnit.MILLISECONDS + ); + } + sink.complete(); + }) + .then() + .transform(this::ensureOpen); + } } @@ -330,28 +363,28 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { @Override public Mono updateDocuments(Flux> documents) { return documents - .collectMap(Entry::getKey, Entry::getValue) - .flatMap(this::updateDocuments).then(); + .publishOn(uninterruptibleScheduler(Schedulers.boundedElastic())) + .handle((document, sink) -> { + LLTerm key = document.getKey(); + LLUpdateDocument value = document.getValue(); + startedDocIndexings.increment(); + try { + docIndexingTime.recordCallable(() -> { + indexWriter.updateDocument(LLUtils.toTerm(key), toDocument(value)); + return null; + }); + } catch (Exception ex) { + sink.error(ex); + return; + } finally { + endeddDocIndexings.increment(); + } + sink.complete(); + }) + .then() + .transform(this::ensureOpen); } - private Mono updateDocuments(Map documentsMap) { - return this.runSafe(() -> { - for (Entry entry : documentsMap.entrySet()) { - LLTerm key = entry.getKey(); - LLUpdateDocument value = entry.getValue(); - startedDocIndexings.increment(); - try { - docIndexingTime.recordCallable(() -> { - indexWriter.updateDocument(LLUtils.toTerm(key), toDocument(value)); - return null; - }); - } finally { - endeddDocIndexings.increment(); - } - } - return null; - }).transform(this::ensureOpen); - } @Override public Mono deleteAll() { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index c3b817b..39bf8ff 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -155,10 +155,10 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { } @Override - public Mono addDocuments(Flux> documents) { + public Mono addDocuments(boolean atomic, Flux> documents) { return documents .groupBy(term -> getLuceneIndex(term.getKey())) - .flatMap(group -> group.key().addDocuments(group)) + .flatMap(group -> group.key().addDocuments(atomic, group)) .then(); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java index a7c5ac2..4622343 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java @@ -81,6 +81,7 @@ public abstract class LLLocalReactiveRocksIterator extends if (!rangeShared.hasMin() || !rangeShared.hasMax()) { readOptions.setReadaheadSize(32 * 1024); // 32KiB readOptions.setFillCache(false); + readOptions.setVerifyChecksums(false); } if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(rangeShared)); diff --git a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java index b0dc4bf..51fae28 100644 --- a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java @@ -375,7 +375,7 @@ public class LLQuicConnection implements LLDatabaseConnection { } @Override - public Mono addDocuments(Flux> documents) { + public Mono addDocuments(boolean atomic, Flux> documents) { return null; } diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index 0a1cdae..2634f27 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -600,7 +600,7 @@ public class LuceneUtils { } else if (directoryOptions instanceof MemoryMappedFSDirectory memoryMappedFSDirectory) { return FSDirectory.open(memoryMappedFSDirectory.managedPath().resolve(directoryName + ".lucene.db")); } else if (directoryOptions instanceof NIOFSDirectory niofsDirectory) { - return org.apache.lucene.store.NIOFSDirectory.open(niofsDirectory + return new org.apache.lucene.store.NIOFSDirectory(niofsDirectory .managedPath() .resolve(directoryName + ".lucene.db")); } else if (directoryOptions instanceof NRTCachingDirectory nrtCachingDirectory) {