Optional atomicity of addDocuments

This commit is contained in:
Andrea Cavalli 2022-03-12 00:22:41 +01:00
parent 4a2d143135
commit 9b5071c45e
9 changed files with 80 additions and 51 deletions

View File

@ -22,7 +22,7 @@ public interface LuceneIndex<T, U> extends LLSnapshottable {
Mono<Void> addDocument(T key, U value);
Mono<Void> addDocuments(Flux<Entry<T, U>> entries);
Mono<Void> addDocuments(boolean atomic, Flux<Entry<T, U>> entries);
Mono<Void> deleteDocument(T key);

View File

@ -50,8 +50,8 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
}
@Override
public Mono<Void> addDocuments(Flux<Entry<T, U>> entries) {
return luceneIndex.addDocuments(entries.flatMap(entry -> indicizer
public Mono<Void> addDocuments(boolean atomic, Flux<Entry<T, U>> entries) {
return luceneIndex.addDocuments(atomic, entries.flatMap(entry -> indicizer
.toDocument(entry.getKey(), entry.getValue())
.map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc))));
}

View File

@ -21,12 +21,7 @@ public interface LLLuceneIndex extends LLSnapshottable {
Mono<Void> addDocument(LLTerm id, LLUpdateDocument doc);
/**
* WARNING! This operation is atomic!
* Please don't send infinite or huge document fluxes, because they will
* be kept in RAM all at once.
*/
Mono<Void> addDocuments(Flux<Entry<LLTerm, LLUpdateDocument>> documents);
Mono<Void> addDocuments(boolean atomic, Flux<Entry<LLTerm, LLUpdateDocument>> documents);
Mono<Void> deleteDocument(LLTerm id);

View File

@ -84,12 +84,12 @@ public class LLMultiLuceneIndex implements LLLuceneIndex {
}
@Override
public Mono<Void> addDocuments(Flux<Entry<LLTerm, LLUpdateDocument>> documents) {
public Mono<Void> addDocuments(boolean atomic, Flux<Entry<LLTerm, LLUpdateDocument>> documents) {
return documents
.groupBy(term -> LuceneUtils.getLuceneIndexId(term.getKey(), totalShards))
.flatMap(group -> {
var index = luceneIndicesById[group.key()];
return index.addDocuments(group);
return index.addDocuments(atomic, group);
})
.then();
}

View File

@ -272,22 +272,55 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}
@Override
public Mono<Void> addDocuments(Flux<Entry<LLTerm, LLUpdateDocument>> documents) {
return documents.collectList().flatMap(documentsList -> this.<Void>runSafe(() -> {
var count = documentsList.size();
StopWatch stopWatch = StopWatch.createStarted();
try {
startedDocIndexings.increment(count);
try {
indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList));
} finally {
endeddDocIndexings.increment(count);
}
} finally {
docIndexingTime.record(stopWatch.getTime(TimeUnit.MILLISECONDS) / Math.max(count, 1), TimeUnit.MILLISECONDS);
}
return null;
})).transform(this::ensureOpen);
/**
 * Adds the given documents to the Lucene index.
 *
 * @param atomic when {@code true}, the whole flux is first collected into a list and written with a
 *               single {@code indexWriter.addDocuments(...)} call, so the entire batch is kept in RAM
 *               at once; when {@code false}, documents are written one by one as they arrive.
 * @param documents flux of (term, document) entries to index
 * @return a {@link Mono} completing when all documents have been written
 */
public Mono<Void> addDocuments(boolean atomic, Flux<Entry<LLTerm, LLUpdateDocument>> documents) {
if (!atomic) {
// Non-atomic path: stream documents and index each one individually,
// recording per-document timing and start/end counters.
return documents
.publishOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.handle((document, sink) -> {
LLUpdateDocument value = document.getValue();
startedDocIndexings.increment();
try {
// recordCallable times the single addDocument call.
docIndexingTime.recordCallable(() -> {
indexWriter.addDocument(toDocument(value));
return null;
});
} catch (Exception ex) {
sink.error(ex);
return;
} finally {
// Incremented even on failure, to pair with startedDocIndexings.
endeddDocIndexings.increment();
}
sink.complete();
})
.then()
.transform(this::ensureOpen);
} else {
// Atomic path: buffer the full flux in memory and hand the whole batch
// to IndexWriter in one call.
return documents
.collectList()
.publishOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.handle((documentsList, sink) -> {
var count = documentsList.size();
StopWatch stopWatch = StopWatch.createStarted();
try {
startedDocIndexings.increment(count);
try {
indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList));
} finally {
endeddDocIndexings.increment(count);
}
} catch (IOException ex) {
sink.error(ex);
return;
} finally {
// Record the average per-document time; Math.max guards against
// division by zero when the batch is empty.
docIndexingTime.record(stopWatch.getTime(TimeUnit.MILLISECONDS) / Math.max(count, 1),
TimeUnit.MILLISECONDS
);
}
sink.complete();
})
.then()
.transform(this::ensureOpen);
}
}
@ -330,28 +363,28 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@Override
public Mono<Void> updateDocuments(Flux<Entry<LLTerm, LLUpdateDocument>> documents) {
return documents
.collectMap(Entry::getKey, Entry::getValue)
.flatMap(this::updateDocuments).then();
.publishOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.handle((document, sink) -> {
LLTerm key = document.getKey();
LLUpdateDocument value = document.getValue();
startedDocIndexings.increment();
try {
docIndexingTime.recordCallable(() -> {
indexWriter.updateDocument(LLUtils.toTerm(key), toDocument(value));
return null;
});
} catch (Exception ex) {
sink.error(ex);
return;
} finally {
endeddDocIndexings.increment();
}
sink.complete();
})
.then()
.transform(this::ensureOpen);
}
/**
 * Updates every (term, document) pair of the given map sequentially on the
 * index writer, timing each update and tracking start/end counters.
 *
 * @param documentsMap terms mapped to their replacement documents
 * @return a {@link Mono} completing when every entry has been updated
 */
private Mono<Void> updateDocuments(Map<LLTerm, LLUpdateDocument> documentsMap) {
return this.<Void>runSafe(() -> {
for (Entry<LLTerm, LLUpdateDocument> entry : documentsMap.entrySet()) {
LLTerm key = entry.getKey();
LLUpdateDocument value = entry.getValue();
startedDocIndexings.increment();
try {
// recordCallable times the single updateDocument call.
docIndexingTime.recordCallable(() -> {
indexWriter.updateDocument(LLUtils.toTerm(key), toDocument(value));
return null;
});
} finally {
// Incremented even if the update throws, to pair with startedDocIndexings.
endeddDocIndexings.increment();
}
}
return null;
}).transform(this::ensureOpen);
}
@Override
public Mono<Void> deleteAll() {

View File

@ -155,10 +155,10 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
}
@Override
public Mono<Void> addDocuments(Flux<Entry<LLTerm, LLUpdateDocument>> documents) {
public Mono<Void> addDocuments(boolean atomic, Flux<Entry<LLTerm, LLUpdateDocument>> documents) {
return documents
.groupBy(term -> getLuceneIndex(term.getKey()))
.flatMap(group -> group.key().addDocuments(group))
.flatMap(group -> group.key().addDocuments(atomic, group))
.then();
}

View File

@ -81,6 +81,7 @@ public abstract class LLLocalReactiveRocksIterator<T> extends
if (!rangeShared.hasMin() || !rangeShared.hasMax()) {
readOptions.setReadaheadSize(32 * 1024); // 32KiB
readOptions.setFillCache(false);
readOptions.setVerifyChecksums(false);
}
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(rangeShared));

View File

@ -375,7 +375,7 @@ public class LLQuicConnection implements LLDatabaseConnection {
}
@Override
public Mono<Void> addDocuments(Flux<Entry<LLTerm, LLUpdateDocument>> documents) {
// NOTE(review): unimplemented stub — returning null from a Mono-returning
// method breaks the Reactor contract (callers chaining on the result will
// NPE); presumably this remote implementation is filled in later. TODO confirm.
public Mono<Void> addDocuments(boolean atomic, Flux<Entry<LLTerm, LLUpdateDocument>> documents) {
return null;
}

View File

@ -600,7 +600,7 @@ public class LuceneUtils {
} else if (directoryOptions instanceof MemoryMappedFSDirectory memoryMappedFSDirectory) {
return FSDirectory.open(memoryMappedFSDirectory.managedPath().resolve(directoryName + ".lucene.db"));
} else if (directoryOptions instanceof NIOFSDirectory niofsDirectory) {
return org.apache.lucene.store.NIOFSDirectory.open(niofsDirectory
return new org.apache.lucene.store.NIOFSDirectory(niofsDirectory
.managedPath()
.resolve(directoryName + ".lucene.db"));
} else if (directoryOptions instanceof NRTCachingDirectory nrtCachingDirectory) {