From 1a35930909c8138b26d37b0666acc6d01e67cbac Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 17 Dec 2021 02:18:30 +0100 Subject: [PATCH] use a reentrantlock to avoid multiple merges at the same time --- .../database/disk/LLLocalLuceneIndex.java | 111 +++++++++++------- 1 file changed, 67 insertions(+), 44 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index fccb7a6..42a170a 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -2,6 +2,8 @@ package it.cavallium.dbengine.database.disk; import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; import static it.cavallium.dbengine.database.LLUtils.MARKER_LUCENE; +import static it.cavallium.dbengine.database.LLUtils.toDocument; +import static it.cavallium.dbengine.database.LLUtils.toFields; import static it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.NO_TRANSFORMATION; import io.micrometer.core.instrument.MeterRegistry; @@ -50,6 +52,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper; @@ -91,8 +94,9 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { * There is only a single thread globally to not overwhelm the disk with * concurrent commits or concurrent refreshes. */ + private static final ReentrantLock shutdownLock = new ReentrantLock(); private static final Scheduler luceneHeavyTasksScheduler = uninterruptibleScheduler(Schedulers.single(Schedulers.boundedElastic())); - private static final ExecutorService SAFE_EXECUTOR = Executors.newCachedThreadPool(new ShortNamedThreadFactory("lucene-index-impl")); + private final MeterRegistry meterRegistry; private final String luceneIndexName; @@ -287,13 +291,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .publishOn(Schedulers.parallel()); } - private Mono runSafe(IORunnable runnable) { - return Mono.fromCallable(() -> { - runnable.run(); - return null; - }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())).publishOn(Schedulers.parallel()); - } - @Override public Mono releaseSnapshot(LLSnapshot snapshot) { return snapshotsManager.releaseSnapshot(snapshot); @@ -301,40 +298,43 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { @Override public Mono addDocument(LLTerm key, LLUpdateDocument doc) { - return this.runSafe(() -> indexWriter.addDocument(LLUtils.toDocument(doc))).transform(this::ensureOpen); + return this.runSafe(() -> { + indexWriter.addDocument(toDocument(doc)); + return null; + }).transform(this::ensureOpen); } @Override public Mono addDocuments(Flux> documents) { - return documents - .collectList() - .flatMap(documentsList -> this.runSafe(() -> indexWriter.addDocuments(LLUtils - .toDocumentsFromEntries(documentsList)))) - .transform(this::ensureOpen); + return documents.collectList().flatMap(documentsList -> this.runSafe(() -> { + indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList)); + return null; + })).transform(this::ensureOpen); } @Override public Mono deleteDocument(LLTerm id) { - return this.runSafe(() -> indexWriter.deleteDocuments(LLUtils.toTerm(id))).transform(this::ensureOpen); + return this.runSafe(() -> { + indexWriter.deleteDocuments(LLUtils.toTerm(id)); + return null; + }).transform(this::ensureOpen); } @Override public Mono update(LLTerm id, LLIndexRequest request) { return this .runSafe(() -> { - if (request instanceof LLUpdateDocument updateDocument) { - indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(updateDocument)); - } else if (request instanceof LLSoftUpdateDocument softUpdateDocument) { - indexWriter.softUpdateDocument(LLUtils.toTerm(id), - LLUtils.toDocument(softUpdateDocument.items()), - LLUtils.toFields(softUpdateDocument.softDeleteItems()) - ); - } else if (request instanceof LLUpdateFields updateFields) { - indexWriter.updateDocValues(LLUtils.toTerm(id), LLUtils.toFields(updateFields.items())); - } else { - throw new UnsupportedOperationException("Unexpected request type: " + request); + switch (request) { + case LLUpdateDocument updateDocument -> + indexWriter.updateDocument(LLUtils.toTerm(id), toDocument(updateDocument)); + case LLSoftUpdateDocument softUpdateDocument -> indexWriter.softUpdateDocument(LLUtils.toTerm(id), + toDocument(softUpdateDocument.items()), toFields(softUpdateDocument.softDeleteItems())); + case LLUpdateFields updateFields -> + indexWriter.updateDocValues(LLUtils.toTerm(id), toFields(updateFields.items())); + case null, default -> throw new UnsupportedOperationException("Unexpected request type: " + request); } + return null; }) .transform(this::ensureOpen); } @@ -349,17 +349,24 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { for (Entry entry : documentsMap.entrySet()) { LLTerm key = entry.getKey(); LLUpdateDocument value = entry.getValue(); - indexWriter.updateDocument(LLUtils.toTerm(key), LLUtils.toDocument(value)); + indexWriter.updateDocument(LLUtils.toTerm(key), toDocument(value)); } + return null; }).transform(this::ensureOpen); } @Override public Mono deleteAll() { return this.runSafe(() -> { - indexWriter.deleteAll(); - indexWriter.forceMergeDeletes(true); - indexWriter.commit(); + shutdownLock.lock(); + try { + indexWriter.deleteAll(); + indexWriter.forceMergeDeletes(true); + indexWriter.commit(); + } finally { + shutdownLock.unlock(); + } + return null; }).subscribeOn(luceneHeavyTasksScheduler).transform(this::ensureOpen); } @@ -431,12 +438,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .subscribeOn(luceneHeavyTasksScheduler) .then(searcherManager.close()) .then(Mono.fromCallable(() -> { - logger.info("Closing IndexWriter..."); - //noinspection BlockingMethodInNonBlockingContext - indexWriter.close(); - //noinspection BlockingMethodInNonBlockingContext - directory.close(); - logger.info("IndexWriter closed"); + shutdownLock.lock(); + try { + logger.info("Closing IndexWriter..."); + indexWriter.close(); + directory.close(); + logger.info("IndexWriter closed"); + } finally { + shutdownLock.unlock(); + } return null; }).subscribeOn(luceneHeavyTasksScheduler)) @@ -457,8 +467,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return Mono .fromCallable(() -> { if (activeTasks.isTerminated()) return null; - //noinspection BlockingMethodInNonBlockingContext - indexWriter.flush(); + shutdownLock.lock(); + try { + indexWriter.flush(); + } finally { + shutdownLock.unlock(); + } return null; }) .subscribeOn(luceneHeavyTasksScheduler) @@ -472,12 +486,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { activeTasks.register(); try { if (activeTasks.isTerminated()) return null; - if (force) { - //noinspection BlockingMethodInNonBlockingContext - searcherManager.maybeRefreshBlocking(); - } else { - //noinspection BlockingMethodInNonBlockingContext - searcherManager.maybeRefresh(); + shutdownLock.lock(); + try { + if (force) { + searcherManager.maybeRefreshBlocking(); + } else { + searcherManager.maybeRefresh(); + } + } finally { + shutdownLock.unlock(); } } finally { activeTasks.arriveAndDeregister(); @@ -488,18 +505,24 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } private void scheduledCommit() { + shutdownLock.lock(); try { indexWriter.commit(); } catch (IOException ex) { logger.error(MARKER_LUCENE, "Failed to execute a scheduled commit", ex); + } finally { + shutdownLock.unlock(); } } private void scheduledMerge() { + shutdownLock.lock(); try { indexWriter.maybeMerge(); } catch (IOException ex) { logger.error(MARKER_LUCENE, "Failed to execute a scheduled merge", ex); + } finally { + shutdownLock.unlock(); } }