Add ensureOpen method to avoid writing to the index when closing

This commit is contained in:
Andrea Cavalli 2021-10-07 00:53:38 +02:00
parent c4dc83a883
commit 33e4cb2f14

View File

@ -30,6 +30,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
@ -48,6 +49,7 @@ import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.util.Constants;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
@ -77,6 +79,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private final boolean lowMemory;
private final Phaser activeTasks = new Phaser(1);
private final AtomicBoolean closeRequested = new AtomicBoolean();
public LLLocalLuceneIndex(@Nullable Path luceneBasePath,
String name,
@ -215,7 +218,17 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@Override
public Mono<LLSnapshot> takeSnapshot() {
return snapshotsManager.takeSnapshot().subscribeOn(luceneHeavyTasksScheduler);
return snapshotsManager.takeSnapshot().subscribeOn(luceneHeavyTasksScheduler).transform(this::ensureOpen);
}
private <V> Mono<V> ensureOpen(Mono<V> mono) {
return Mono.defer(() -> {
if (closeRequested.get()) {
return Mono.error(new IllegalStateException("Lucene index is closed"));
} else {
return mono;
}
}).doFirst(activeTasks::register).doFinally(s -> activeTasks.arriveAndDeregister());
}
@Override
@ -226,14 +239,9 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@Override
public Mono<Void> addDocument(LLTerm key, LLDocument doc) {
return Mono.<Void>fromCallable(() -> {
activeTasks.register();
try {
indexWriter.addDocument(LLUtils.toDocument(doc));
return null;
} finally {
activeTasks.arriveAndDeregister();
}
}).subscribeOn(Schedulers.boundedElastic());
}).subscribeOn(Schedulers.boundedElastic()).transform(this::ensureOpen);
}
@Override
@ -242,42 +250,28 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.collectList()
.flatMap(documentsList -> Mono
.<Void>fromCallable(() -> {
activeTasks.register();
try {
indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList));
return null;
} finally {
activeTasks.arriveAndDeregister();
}
indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList));
return null;
}).subscribeOn(Schedulers.boundedElastic())
);
)
.transform(this::ensureOpen);
}
@Override
public Mono<Void> deleteDocument(LLTerm id) {
return Mono.<Void>fromCallable(() -> {
activeTasks.register();
try {
indexWriter.deleteDocuments(LLUtils.toTerm(id));
return null;
} finally {
activeTasks.arriveAndDeregister();
}
}).subscribeOn(Schedulers.boundedElastic());
indexWriter.deleteDocuments(LLUtils.toTerm(id));
return null;
}).subscribeOn(Schedulers.boundedElastic()).transform(this::ensureOpen);
}
@Override
public Mono<Void> updateDocument(LLTerm id, LLDocument document) {
return Mono.<Void>fromCallable(() -> {
activeTasks.register();
try {
indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document));
} finally {
activeTasks.arriveAndDeregister();
}
indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document));
return null;
}).subscribeOn(Schedulers.boundedElastic());
}).subscribeOn(Schedulers.boundedElastic()).transform(this::ensureOpen);
}
@Override
@ -288,37 +282,28 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private Mono<Void> updateDocuments(Map<LLTerm, LLDocument> documentsMap) {
return Mono
.<Void>fromCallable(() -> {
activeTasks.register();
try {
for (Entry<LLTerm, LLDocument> entry : documentsMap.entrySet()) {
LLTerm key = entry.getKey();
LLDocument value = entry.getValue();
indexWriter.updateDocument(LLUtils.toTerm(key), LLUtils.toDocument(value));
}
return null;
} finally {
activeTasks.arriveAndDeregister();
for (Entry<LLTerm, LLDocument> entry : documentsMap.entrySet()) {
LLTerm key = entry.getKey();
LLDocument value = entry.getValue();
indexWriter.updateDocument(LLUtils.toTerm(key), LLUtils.toDocument(value));
}
return null;
})
.subscribeOn(Schedulers.boundedElastic());
.subscribeOn(Schedulers.boundedElastic())
.transform(this::ensureOpen);
}
@Override
public Mono<Void> deleteAll() {
return Mono.<Void>fromCallable(() -> {
activeTasks.register();
try {
//noinspection BlockingMethodInNonBlockingContext
indexWriter.deleteAll();
//noinspection BlockingMethodInNonBlockingContext
indexWriter.forceMergeDeletes(true);
//noinspection BlockingMethodInNonBlockingContext
indexWriter.commit();
return null;
} finally {
activeTasks.arriveAndDeregister();
}
}).subscribeOn(luceneHeavyTasksScheduler);
//noinspection BlockingMethodInNonBlockingContext
indexWriter.deleteAll();
//noinspection BlockingMethodInNonBlockingContext
indexWriter.forceMergeDeletes(true);
//noinspection BlockingMethodInNonBlockingContext
indexWriter.commit();
return null;
}).subscribeOn(luceneHeavyTasksScheduler).transform(this::ensureOpen);
}
@Override
@ -373,24 +358,31 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
directory.close();
logger.info("IndexWriter closed");
return null;
}).subscribeOn(luceneHeavyTasksScheduler));
}).subscribeOn(luceneHeavyTasksScheduler))
// Avoid closing multiple times
.transformDeferred(mono -> {
if (this.closeRequested.compareAndSet(false, true)) {
logger.trace("Set closeRequested to true. Further update/write calls will result in an error");
return mono;
} else {
logger.debug("Tried to close more than once");
return Mono.empty();
}
});
}
@Override
public Mono<Void> flush() {
return Mono
.<Void>fromCallable(() -> {
activeTasks.register();
try {
if (activeTasks.isTerminated()) return null;
//noinspection BlockingMethodInNonBlockingContext
indexWriter.commit();
} finally {
activeTasks.arriveAndDeregister();
}
if (activeTasks.isTerminated()) return null;
//noinspection BlockingMethodInNonBlockingContext
indexWriter.commit();
return null;
})
.subscribeOn(luceneHeavyTasksScheduler);
.subscribeOn(luceneHeavyTasksScheduler)
.transform(this::ensureOpen);
}
@Override