use a reentrantlock to avoid multiple merges at the same time

This commit is contained in:
Andrea Cavalli 2021-12-17 02:18:30 +01:00
parent 1dffb55572
commit 1a35930909

View File

@ -2,6 +2,8 @@ package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
import static it.cavallium.dbengine.database.LLUtils.MARKER_LUCENE;
import static it.cavallium.dbengine.database.LLUtils.toDocument;
import static it.cavallium.dbengine.database.LLUtils.toFields;
import static it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.NO_TRANSFORMATION;
import io.micrometer.core.instrument.MeterRegistry;
@ -50,6 +52,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
@ -91,8 +94,9 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
* There is only a single thread globally to not overwhelm the disk with
* concurrent commits or concurrent refreshes.
*/
private static final ReentrantLock shutdownLock = new ReentrantLock();
private static final Scheduler luceneHeavyTasksScheduler = uninterruptibleScheduler(Schedulers.single(Schedulers.boundedElastic()));
private static final ExecutorService SAFE_EXECUTOR = Executors.newCachedThreadPool(new ShortNamedThreadFactory("lucene-index-impl"));
private final MeterRegistry meterRegistry;
private final String luceneIndexName;
@ -287,13 +291,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.publishOn(Schedulers.parallel());
}
private <V> Mono<V> runSafe(IORunnable runnable) {
return Mono.<V>fromCallable(() -> {
runnable.run();
return null;
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())).publishOn(Schedulers.parallel());
}
@Override
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
return snapshotsManager.releaseSnapshot(snapshot);
@ -301,40 +298,43 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@Override
public Mono<Void> addDocument(LLTerm key, LLUpdateDocument doc) {
return this.<Void>runSafe(() -> indexWriter.addDocument(LLUtils.toDocument(doc))).transform(this::ensureOpen);
return this.<Void>runSafe(() -> {
indexWriter.addDocument(toDocument(doc));
return null;
}).transform(this::ensureOpen);
}
@Override
public Mono<Void> addDocuments(Flux<Entry<LLTerm, LLUpdateDocument>> documents) {
return documents
.collectList()
.flatMap(documentsList -> this.<Void>runSafe(() -> indexWriter.addDocuments(LLUtils
.toDocumentsFromEntries(documentsList))))
.transform(this::ensureOpen);
return documents.collectList().flatMap(documentsList -> this.<Void>runSafe(() -> {
indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList));
return null;
})).transform(this::ensureOpen);
}
@Override
public Mono<Void> deleteDocument(LLTerm id) {
return this.<Void>runSafe(() -> indexWriter.deleteDocuments(LLUtils.toTerm(id))).transform(this::ensureOpen);
return this.<Void>runSafe(() -> {
indexWriter.deleteDocuments(LLUtils.toTerm(id));
return null;
}).transform(this::ensureOpen);
}
@Override
public Mono<Void> update(LLTerm id, LLIndexRequest request) {
return this
.<Void>runSafe(() -> {
if (request instanceof LLUpdateDocument updateDocument) {
indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(updateDocument));
} else if (request instanceof LLSoftUpdateDocument softUpdateDocument) {
indexWriter.softUpdateDocument(LLUtils.toTerm(id),
LLUtils.toDocument(softUpdateDocument.items()),
LLUtils.toFields(softUpdateDocument.softDeleteItems())
);
} else if (request instanceof LLUpdateFields updateFields) {
indexWriter.updateDocValues(LLUtils.toTerm(id), LLUtils.toFields(updateFields.items()));
} else {
throw new UnsupportedOperationException("Unexpected request type: " + request);
switch (request) {
case LLUpdateDocument updateDocument ->
indexWriter.updateDocument(LLUtils.toTerm(id), toDocument(updateDocument));
case LLSoftUpdateDocument softUpdateDocument -> indexWriter.softUpdateDocument(LLUtils.toTerm(id),
toDocument(softUpdateDocument.items()), toFields(softUpdateDocument.softDeleteItems()));
case LLUpdateFields updateFields ->
indexWriter.updateDocValues(LLUtils.toTerm(id), toFields(updateFields.items()));
case null, default -> throw new UnsupportedOperationException("Unexpected request type: " + request);
}
return null;
})
.transform(this::ensureOpen);
}
@ -349,17 +349,24 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
for (Entry<LLTerm, LLUpdateDocument> entry : documentsMap.entrySet()) {
LLTerm key = entry.getKey();
LLUpdateDocument value = entry.getValue();
indexWriter.updateDocument(LLUtils.toTerm(key), LLUtils.toDocument(value));
indexWriter.updateDocument(LLUtils.toTerm(key), toDocument(value));
}
return null;
}).transform(this::ensureOpen);
}
@Override
public Mono<Void> deleteAll() {
return this.<Void>runSafe(() -> {
indexWriter.deleteAll();
indexWriter.forceMergeDeletes(true);
indexWriter.commit();
shutdownLock.lock();
try {
indexWriter.deleteAll();
indexWriter.forceMergeDeletes(true);
indexWriter.commit();
} finally {
shutdownLock.unlock();
}
return null;
}).subscribeOn(luceneHeavyTasksScheduler).transform(this::ensureOpen);
}
@ -431,12 +438,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.subscribeOn(luceneHeavyTasksScheduler)
.then(searcherManager.close())
.then(Mono.<Void>fromCallable(() -> {
logger.info("Closing IndexWriter...");
//noinspection BlockingMethodInNonBlockingContext
indexWriter.close();
//noinspection BlockingMethodInNonBlockingContext
directory.close();
logger.info("IndexWriter closed");
shutdownLock.lock();
try {
logger.info("Closing IndexWriter...");
indexWriter.close();
directory.close();
logger.info("IndexWriter closed");
} finally {
shutdownLock.unlock();
}
return null;
}).subscribeOn(luceneHeavyTasksScheduler))
@ -457,8 +467,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return Mono
.<Void>fromCallable(() -> {
if (activeTasks.isTerminated()) return null;
//noinspection BlockingMethodInNonBlockingContext
indexWriter.flush();
shutdownLock.lock();
try {
indexWriter.flush();
} finally {
shutdownLock.unlock();
}
return null;
})
.subscribeOn(luceneHeavyTasksScheduler)
@ -472,12 +486,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
activeTasks.register();
try {
if (activeTasks.isTerminated()) return null;
if (force) {
//noinspection BlockingMethodInNonBlockingContext
searcherManager.maybeRefreshBlocking();
} else {
//noinspection BlockingMethodInNonBlockingContext
searcherManager.maybeRefresh();
shutdownLock.lock();
try {
if (force) {
searcherManager.maybeRefreshBlocking();
} else {
searcherManager.maybeRefresh();
}
} finally {
shutdownLock.unlock();
}
} finally {
activeTasks.arriveAndDeregister();
@ -488,18 +505,24 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}
private void scheduledCommit() {
shutdownLock.lock();
try {
indexWriter.commit();
} catch (IOException ex) {
logger.error(MARKER_LUCENE, "Failed to execute a scheduled commit", ex);
} finally {
shutdownLock.unlock();
}
}
private void scheduledMerge() {
shutdownLock.lock();
try {
indexWriter.maybeMerge();
} catch (IOException ex) {
logger.error(MARKER_LUCENE, "Failed to execute a scheduled merge", ex);
} finally {
shutdownLock.unlock();
}
}