From 8083364ebfe0bc735fbabb3d32a6a252414eac7d Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 21 Jun 2022 14:35:07 +0200 Subject: [PATCH] Add waitForMerges, waitForLastMerges, flush, fix #210, fix #209 --- .../dbengine/client/CompositeDatabase.java | 2 + .../dbengine/client/LuceneIndex.java | 4 + .../dbengine/client/LuceneIndexImpl.java | 10 + .../dbengine/database/LLLuceneIndex.java | 8 + .../dbengine/database/LLMultiLuceneIndex.java | 25 ++- .../collections/DatabaseMapDictionary.java | 10 +- .../disk/CachedIndexSearcherManager.java | 63 +++--- .../database/disk/LLLocalLuceneIndex.java | 190 +++++++++++++++--- .../disk/LLLocalMultiLuceneIndex.java | 32 ++- .../database/disk/SnapshotsManager.java | 7 + .../database/remote/LLQuicConnection.java | 10 + 11 files changed, 289 insertions(+), 72 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java b/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java index 105d548..c5b3956 100644 --- a/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java +++ b/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java @@ -8,6 +8,8 @@ import reactor.core.publisher.Mono; public interface CompositeDatabase extends DatabaseProperties { + Mono preClose(); + Mono close(); /** diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java index 5c95b93..af3bf24 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java @@ -70,5 +70,9 @@ public interface LuceneIndex extends LLSnapshottable { Mono flush(); + Mono waitForMerges(); + + Mono waitForLastMerges(); + Mono refresh(boolean force); } diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java index 33c4239..bbdc107 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java @@ -174,6 +174,16 @@ public class LuceneIndexImpl implements LuceneIndex { return luceneIndex.flush(); } + @Override + public Mono waitForMerges() { + return luceneIndex.waitForMerges(); + } + + @Override + public Mono waitForLastMerges() { + return luceneIndex.waitForLastMerges(); + } + /** * Refresh index searcher */ diff --git a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java index 7d826e8..955798e 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java @@ -82,6 +82,14 @@ public interface LLLuceneIndex extends LLSnapshottable { */ Mono flush(); + Mono waitForMerges(); + + /** + * Wait for the latest pending merge + * This disables future merges until shutdown! + */ + Mono waitForLastMerges(); + /** * Refresh index searcher */ diff --git a/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java index 25ea525..c6814dc 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.SignalType; @@ -117,7 +118,8 @@ public class LLMultiLuceneIndex implements LLLuceneIndex { @Override public Mono deleteAll() { - return luceneIndicesFlux.flatMap(LLLuceneIndex::deleteAll).then(); + Iterable> it = () -> luceneIndicesSet.stream().map(LLLuceneIndex::deleteAll).iterator(); + return Mono.whenDelayError(it); } @Override @@ -192,17 +194,32 @@ public class LLMultiLuceneIndex implements LLLuceneIndex { @Override public Mono close() { - return luceneIndicesFlux.flatMap(LLLuceneIndex::close).then(); + Iterable> it = () -> luceneIndicesSet.stream().map(LLLuceneIndex::close).iterator(); + return Mono.whenDelayError(it); } @Override public Mono flush() { - return luceneIndicesFlux.flatMap(LLLuceneIndex::flush).then(); + Iterable> it = () -> luceneIndicesSet.stream().map(LLLuceneIndex::flush).iterator(); + return Mono.whenDelayError(it); + } + + @Override + public Mono waitForMerges() { + Iterable> it = () -> luceneIndicesSet.stream().map(LLLuceneIndex::waitForMerges).iterator(); + return Mono.whenDelayError(it); + } + + @Override + public Mono waitForLastMerges() { + Iterable> it = () -> luceneIndicesSet.stream().map(LLLuceneIndex::waitForLastMerges).iterator(); + return Mono.whenDelayError(it); } @Override public Mono refresh(boolean force) { - return luceneIndicesFlux.flatMap(index -> index.refresh(force)).then(); + Iterable> it = () -> luceneIndicesSet.stream().map(index -> index.refresh(force)).iterator(); + return Mono.whenDelayError(it); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index c9fc4ba..e3b20a5 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -451,8 +451,14 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep 0; - var closed = new AtomicBoolean(); - LLIndexSearcher llIndexSearcher; - if (fromSnapshot) { - llIndexSearcher = new SnapshotIndexSearcher(indexSearcher, closed); - } else { - llIndexSearcher = new MainIndexSearcher(indexSearcher, closed); - } - CLEANER.register(llIndexSearcher, () -> { - if (closed.compareAndSet(false, true)) { - LOG.warn("An index searcher was not closed!"); - if (!fromSnapshot) { - try { - searcherManager.release(indexSearcher); - } catch (IOException e) { - LOG.error("Failed to release the index searcher", e); + try { + IndexSearcher indexSearcher; + boolean fromSnapshot; + if (snapshot == null) { + indexSearcher = searcherManager.acquire(); + fromSnapshot = false; + } else { + indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(SEARCH_EXECUTOR); + fromSnapshot = true; + } + indexSearcher.setSimilarity(similarity); + assert indexSearcher.getIndexReader().getRefCount() > 0; + var closed = new AtomicBoolean(); + LLIndexSearcher llIndexSearcher; + if (fromSnapshot) { + llIndexSearcher = new SnapshotIndexSearcher(indexSearcher, closed); + } else { + llIndexSearcher = new MainIndexSearcher(indexSearcher, closed); + } + CLEANER.register(llIndexSearcher, () -> { + if (closed.compareAndSet(false, true)) { + LOG.warn("An index searcher was not closed!"); + if (!fromSnapshot) { + try { + searcherManager.release(indexSearcher); + } catch (IOException e) { + LOG.error("Failed to release the index searcher", e); + } } } - } - }); - return llIndexSearcher; + }); + return llIndexSearcher; + } catch (Throwable ex) { + activeSearchers.decrementAndGet(); + throw ex; + } }); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 28d82fd..676858b 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -63,6 +63,7 @@ import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; import org.apache.lucene.index.MergeScheduler; +import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.SerialMergeScheduler; import org.apache.lucene.index.SimpleMergedSegmentWarmer; import org.apache.lucene.index.SnapshotDeletionPolicy; @@ -234,32 +235,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { this.commitTime = Timer.builder("index.write.commit.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", clusterName).register(meterRegistry); this.mergeTime = Timer.builder("index.write.merge.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", clusterName).register(meterRegistry); this.refreshTime = Timer.builder("index.search.refresh.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", clusterName).register(meterRegistry); - meterRegistry.gauge("index.snapshot.counter", List.of(Tag.of("index.name", clusterName)), snapshotter, SnapshotDeletionPolicy::getSnapshotCount); - meterRegistry.gauge("index.write.flushing.bytes", List.of(Tag.of("index.name", clusterName)), indexWriter, IndexWriter::getFlushingBytes); - meterRegistry.gauge("index.write.sequence.completed.max", List.of(Tag.of("index.name", clusterName)), indexWriter, IndexWriter::getMaxCompletedSequenceNumber); - meterRegistry.gauge("index.write.doc.pending.counter", List.of(Tag.of("index.name", clusterName)), indexWriter, IndexWriter::getPendingNumDocs); - meterRegistry.gauge("index.write.segment.merging.counter", List.of(Tag.of("index.name", clusterName)), indexWriter, iw -> iw.getMergingSegments().size()); - meterRegistry.gauge("index.directory.deletion.pending.counter", List.of(Tag.of("index.name", clusterName)), indexWriter, iw -> { - try { - return iw.getDirectory().getPendingDeletions().size(); - } catch (IOException | NullPointerException e) { - return 0; - } - }); - meterRegistry.gauge("index.doc.counter", List.of(Tag.of("index.name", clusterName)), indexWriter, iw -> { - try { - return iw.getDocStats().numDocs; - } catch (NullPointerException e) { - return 0; - } - }); - meterRegistry.gauge("index.doc.max", List.of(Tag.of("index.name", clusterName)), indexWriter, iw -> { - try { - return iw.getDocStats().maxDoc; - } catch (NullPointerException e) { - return 0; - } - }); + meterRegistry.gauge("index.snapshot.counter", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getSnapshotsCount); + meterRegistry.gauge("index.write.flushing.bytes", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getIndexWriterFlushingBytes); + meterRegistry.gauge("index.write.sequence.completed.max", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getIndexWriterMaxCompletedSequenceNumber); + meterRegistry.gauge("index.write.doc.pending.counter", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getIndexWriterPendingNumDocs); + meterRegistry.gauge("index.write.segment.merging.counter", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getIndexWriterMergingSegmentsSize); + meterRegistry.gauge("index.directory.deletion.pending.counter", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getDirectoryPendingDeletionsCount); + meterRegistry.gauge("index.doc.counter", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getDocCount); + meterRegistry.gauge("index.doc.max", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getMaxDoc); meterRegistry.gauge("index.searcher.refreshes.active.count", List.of(Tag.of("index.name", clusterName)), searcherManager, @@ -581,6 +564,53 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .transform(this::ensureOpen); } + @Override + public Mono waitForMerges() { + return Mono + .fromCallable(() -> { + if (activeTasks.isTerminated()) return null; + shutdownLock.lock(); + try { + if (closeRequested.get()) { + return null; + } + var mergeScheduler = indexWriter.getConfig().getMergeScheduler(); + if (mergeScheduler instanceof ConcurrentMergeScheduler concurrentMergeScheduler) { + concurrentMergeScheduler.sync(); + } + } finally { + shutdownLock.unlock(); + } + return null; + }) + .subscribeOn(luceneHeavyTasksScheduler) + .transform(this::ensureOpen); + } + + @Override + public Mono waitForLastMerges() { + return Mono + .fromCallable(() -> { + if (activeTasks.isTerminated()) return null; + shutdownLock.lock(); + try { + if (closeRequested.get()) { + return null; + } + indexWriter.getConfig().setMergePolicy(NoMergePolicy.INSTANCE); + var mergeScheduler = indexWriter.getConfig().getMergeScheduler(); + if (mergeScheduler instanceof ConcurrentMergeScheduler concurrentMergeScheduler) { + concurrentMergeScheduler.sync(); + } + } finally { + shutdownLock.unlock(); + } + return null; + }) + .subscribeOn(luceneHeavyTasksScheduler) + .transform(this::ensureOpen); + } + @Override public Mono refresh(boolean force) { return Mono @@ -657,6 +687,114 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return lowMemory; } + private double getSnapshotsCount() { + shutdownLock.lock(); + try { + if (closeRequested.get()) { + return 0d; + } + return snapshotsManager.getSnapshotsCount(); + } finally { + shutdownLock.unlock(); + } + } + + private double getIndexWriterFlushingBytes() { + shutdownLock.lock(); + try { + if (closeRequested.get()) { + return 0d; + } + return indexWriter.getFlushingBytes(); + } finally { + shutdownLock.unlock(); + } + } + + private double getIndexWriterMaxCompletedSequenceNumber() { + shutdownLock.lock(); + try { + if (closeRequested.get()) { + return 0d; + } + return indexWriter.getMaxCompletedSequenceNumber(); + } finally { + shutdownLock.unlock(); + } + } + + private double getIndexWriterPendingNumDocs() { + shutdownLock.lock(); + try { + if (closeRequested.get()) { + return 0d; + } + return indexWriter.getPendingNumDocs(); + } finally { + shutdownLock.unlock(); + } + } + + private double getIndexWriterMergingSegmentsSize() { + shutdownLock.lock(); + try { + if (closeRequested.get()) { + return 0d; + } + return indexWriter.getMergingSegments().size(); + } finally { + shutdownLock.unlock(); + } + } + + private double getDirectoryPendingDeletionsCount() { + shutdownLock.lock(); + try { + if (closeRequested.get()) { + return 0d; + } + return indexWriter.getDirectory().getPendingDeletions().size(); + } catch (IOException e) { + return 0d; + } finally { + shutdownLock.unlock(); + } + } + + private double getDocCount() { + shutdownLock.lock(); + try { + if (closeRequested.get()) { + return 0d; + } + var docStats = indexWriter.getDocStats(); + if (docStats != null) { + return docStats.numDocs; + } else { + return 0d; + } + } finally { + shutdownLock.unlock(); + } + } + + private double getMaxDoc() { + shutdownLock.lock(); + try { + if (closeRequested.get()) { + return 0d; + } + var docStats = indexWriter.getDocStats(); + if (docStats != null) { + return docStats.maxDoc; + } else { + return 0d; + } + } finally { + shutdownLock.unlock(); + } + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 844c33f..b2cddf0 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -249,9 +249,8 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { @Override public Mono deleteAll() { - return luceneIndicesFlux - .flatMap(LLLocalLuceneIndex::deleteAll) - .then(); + Iterable> it = () -> luceneIndicesSet.stream().map(LLLuceneIndex::deleteAll).iterator(); + return Mono.whenDelayError(it); } private LLSnapshot resolveSnapshot(LLSnapshot multiSnapshot, int instanceId) { @@ -316,8 +315,9 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { @Override public Mono close() { - return luceneIndicesFlux - .flatMap(LLLocalLuceneIndex::close) + Iterable> it = () -> luceneIndicesSet.stream().map(LLLocalLuceneIndex::close).iterator(); + var indicesCloseMono = Mono.whenDelayError(it); + return indicesCloseMono .then(Mono.fromCallable(() -> { if (multiSearcher instanceof Closeable closeable) { //noinspection BlockingMethodInNonBlockingContext @@ -331,16 +331,26 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { @Override public Mono flush() { - return luceneIndicesFlux - .flatMap(LLLocalLuceneIndex::flush) - .then(); + Iterable> it = () -> luceneIndicesSet.stream().map(LLLuceneIndex::flush).iterator(); + return Mono.whenDelayError(it); + } + + @Override + public Mono waitForMerges() { + Iterable> it = () -> luceneIndicesSet.stream().map(LLLuceneIndex::waitForMerges).iterator(); + return Mono.whenDelayError(it); + } + + @Override + public Mono waitForLastMerges() { + Iterable> it = () -> luceneIndicesSet.stream().map(LLLuceneIndex::waitForLastMerges).iterator(); + return Mono.whenDelayError(it); } @Override public Mono refresh(boolean force) { - return luceneIndicesFlux - .flatMap(index -> index.refresh(force)) - .then(); + Iterable> it = () -> luceneIndicesSet.stream().map(index -> index.refresh(force)).iterator(); + return Mono.whenDelayError(it); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java b/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java index e602b2f..3d28613 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java @@ -98,6 +98,13 @@ public class SnapshotsManager { }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())).publishOn(Schedulers.parallel()); } + /** + * Returns the total number of snapshots currently held. + */ + public int getSnapshotsCount() { + return snapshotter.getSnapshotCount(); + } + public void close() { if (!activeTasks.isTerminated()) { activeTasks.arriveAndAwaitAdvance(); diff --git a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java index 0c2d126..b564ffd 100644 --- a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java @@ -513,6 +513,16 @@ public class LLQuicConnection implements LLDatabaseConnection { return null; } + @Override + public Mono waitForMerges() { + return null; + } + + @Override + public Mono waitForLastMerges() { + return null; + } + @Override public Mono refresh(boolean force) { return null;