Add waitForMerges, waitForLastMerges, flush, fix #210, fix #209

This commit is contained in:
Andrea Cavalli 2022-06-21 14:35:07 +02:00
parent ea2065302a
commit 8083364ebf
11 changed files with 289 additions and 72 deletions

View File

@ -8,6 +8,8 @@ import reactor.core.publisher.Mono;
public interface CompositeDatabase extends DatabaseProperties {
Mono<Void> preClose();
Mono<Void> close();
/**

View File

@ -70,5 +70,9 @@ public interface LuceneIndex<T, U> extends LLSnapshottable {
Mono<Void> flush();
Mono<Void> waitForMerges();
Mono<Void> waitForLastMerges();
Mono<Void> refresh(boolean force);
}

View File

@ -174,6 +174,16 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
return luceneIndex.flush();
}
@Override
public Mono<Void> waitForMerges() {
return luceneIndex.waitForMerges();
}
@Override
public Mono<Void> waitForLastMerges() {
return luceneIndex.waitForLastMerges();
}
/**
* Refresh index searcher
*/

View File

@ -82,6 +82,14 @@ public interface LLLuceneIndex extends LLSnapshottable {
*/
Mono<Void> flush();
Mono<Void> waitForMerges();
/**
* Wait for the latest pending merge
* This disables future merges until shutdown!
*/
Mono<Void> waitForLastMerges();
/**
* Refresh index searcher
*/

View File

@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
@ -117,7 +118,8 @@ public class LLMultiLuceneIndex implements LLLuceneIndex {
@Override
public Mono<Void> deleteAll() {
return luceneIndicesFlux.flatMap(LLLuceneIndex::deleteAll).then();
Iterable<Mono<Void>> it = () -> luceneIndicesSet.stream().map(LLLuceneIndex::deleteAll).iterator();
return Mono.whenDelayError(it);
}
@Override
@ -192,17 +194,32 @@ public class LLMultiLuceneIndex implements LLLuceneIndex {
@Override
public Mono<Void> close() {
return luceneIndicesFlux.flatMap(LLLuceneIndex::close).then();
Iterable<Mono<Void>> it = () -> luceneIndicesSet.stream().map(LLLuceneIndex::close).iterator();
return Mono.whenDelayError(it);
}
@Override
public Mono<Void> flush() {
return luceneIndicesFlux.flatMap(LLLuceneIndex::flush).then();
Iterable<Mono<Void>> it = () -> luceneIndicesSet.stream().map(LLLuceneIndex::flush).iterator();
return Mono.whenDelayError(it);
}
@Override
public Mono<Void> waitForMerges() {
Iterable<Mono<Void>> it = () -> luceneIndicesSet.stream().map(LLLuceneIndex::waitForMerges).iterator();
return Mono.whenDelayError(it);
}
@Override
public Mono<Void> waitForLastMerges() {
Iterable<Mono<Void>> it = () -> luceneIndicesSet.stream().map(LLLuceneIndex::waitForLastMerges).iterator();
return Mono.whenDelayError(it);
}
@Override
public Mono<Void> refresh(boolean force) {
return luceneIndicesFlux.flatMap(index -> index.refresh(force)).then();
Iterable<Mono<Void>> it = () -> luceneIndicesSet.stream().map(index -> index.refresh(force)).iterator();
return Mono.whenDelayError(it);
}
@Override

View File

@ -451,8 +451,14 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
private LLRange getPatchedRange(@NotNull LLRange range, @Nullable T keyMin, @Nullable T keyMax)
throws SerializationException {
Buffer keyMinBuf = requireNonNullElseGet(serializeSuffixForRange(keyMin), range::getMinCopy);
Buffer keyMaxBuf = requireNonNullElseGet(serializeSuffixForRange(keyMax), range::getMaxCopy);
Buffer keyMinBuf = serializeSuffixForRange(keyMin);
if (keyMinBuf == null) {
keyMinBuf = range.getMinCopy();
}
Buffer keyMaxBuf = serializeSuffixForRange(keyMax);
if (keyMaxBuf == null) {
keyMaxBuf = range.getMaxCopy();
}
return LLRange.ofUnsafe(keyMinBuf, keyMaxBuf);
}

View File

@ -149,37 +149,42 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
return null;
}
activeSearchers.incrementAndGet();
IndexSearcher indexSearcher;
boolean fromSnapshot;
if (snapshot == null) {
indexSearcher = searcherManager.acquire();
fromSnapshot = false;
} else {
indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(SEARCH_EXECUTOR);
fromSnapshot = true;
}
indexSearcher.setSimilarity(similarity);
assert indexSearcher.getIndexReader().getRefCount() > 0;
var closed = new AtomicBoolean();
LLIndexSearcher llIndexSearcher;
if (fromSnapshot) {
llIndexSearcher = new SnapshotIndexSearcher(indexSearcher, closed);
} else {
llIndexSearcher = new MainIndexSearcher(indexSearcher, closed);
}
CLEANER.register(llIndexSearcher, () -> {
if (closed.compareAndSet(false, true)) {
LOG.warn("An index searcher was not closed!");
if (!fromSnapshot) {
try {
searcherManager.release(indexSearcher);
} catch (IOException e) {
LOG.error("Failed to release the index searcher", e);
try {
IndexSearcher indexSearcher;
boolean fromSnapshot;
if (snapshot == null) {
indexSearcher = searcherManager.acquire();
fromSnapshot = false;
} else {
indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(SEARCH_EXECUTOR);
fromSnapshot = true;
}
indexSearcher.setSimilarity(similarity);
assert indexSearcher.getIndexReader().getRefCount() > 0;
var closed = new AtomicBoolean();
LLIndexSearcher llIndexSearcher;
if (fromSnapshot) {
llIndexSearcher = new SnapshotIndexSearcher(indexSearcher, closed);
} else {
llIndexSearcher = new MainIndexSearcher(indexSearcher, closed);
}
CLEANER.register(llIndexSearcher, () -> {
if (closed.compareAndSet(false, true)) {
LOG.warn("An index searcher was not closed!");
if (!fromSnapshot) {
try {
searcherManager.release(indexSearcher);
} catch (IOException e) {
LOG.error("Failed to release the index searcher", e);
}
}
}
}
});
return llIndexSearcher;
});
return llIndexSearcher;
} catch (Throwable ex) {
activeSearchers.decrementAndGet();
throw ex;
}
});
}

View File

@ -63,6 +63,7 @@ import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SerialMergeScheduler;
import org.apache.lucene.index.SimpleMergedSegmentWarmer;
import org.apache.lucene.index.SnapshotDeletionPolicy;
@ -234,32 +235,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
this.commitTime = Timer.builder("index.write.commit.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", clusterName).register(meterRegistry);
this.mergeTime = Timer.builder("index.write.merge.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", clusterName).register(meterRegistry);
this.refreshTime = Timer.builder("index.search.refresh.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", clusterName).register(meterRegistry);
meterRegistry.gauge("index.snapshot.counter", List.of(Tag.of("index.name", clusterName)), snapshotter, SnapshotDeletionPolicy::getSnapshotCount);
meterRegistry.gauge("index.write.flushing.bytes", List.of(Tag.of("index.name", clusterName)), indexWriter, IndexWriter::getFlushingBytes);
meterRegistry.gauge("index.write.sequence.completed.max", List.of(Tag.of("index.name", clusterName)), indexWriter, IndexWriter::getMaxCompletedSequenceNumber);
meterRegistry.gauge("index.write.doc.pending.counter", List.of(Tag.of("index.name", clusterName)), indexWriter, IndexWriter::getPendingNumDocs);
meterRegistry.gauge("index.write.segment.merging.counter", List.of(Tag.of("index.name", clusterName)), indexWriter, iw -> iw.getMergingSegments().size());
meterRegistry.gauge("index.directory.deletion.pending.counter", List.of(Tag.of("index.name", clusterName)), indexWriter, iw -> {
try {
return iw.getDirectory().getPendingDeletions().size();
} catch (IOException | NullPointerException e) {
return 0;
}
});
meterRegistry.gauge("index.doc.counter", List.of(Tag.of("index.name", clusterName)), indexWriter, iw -> {
try {
return iw.getDocStats().numDocs;
} catch (NullPointerException e) {
return 0;
}
});
meterRegistry.gauge("index.doc.max", List.of(Tag.of("index.name", clusterName)), indexWriter, iw -> {
try {
return iw.getDocStats().maxDoc;
} catch (NullPointerException e) {
return 0;
}
});
meterRegistry.gauge("index.snapshot.counter", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getSnapshotsCount);
meterRegistry.gauge("index.write.flushing.bytes", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getIndexWriterFlushingBytes);
meterRegistry.gauge("index.write.sequence.completed.max", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getIndexWriterMaxCompletedSequenceNumber);
meterRegistry.gauge("index.write.doc.pending.counter", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getIndexWriterPendingNumDocs);
meterRegistry.gauge("index.write.segment.merging.counter", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getIndexWriterMergingSegmentsSize);
meterRegistry.gauge("index.directory.deletion.pending.counter", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getDirectoryPendingDeletionsCount);
meterRegistry.gauge("index.doc.counter", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getDocCount);
meterRegistry.gauge("index.doc.max", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getMaxDoc);
meterRegistry.gauge("index.searcher.refreshes.active.count",
List.of(Tag.of("index.name", clusterName)),
searcherManager,
@ -581,6 +564,53 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.transform(this::ensureOpen);
}
@Override
public Mono<Void> waitForMerges() {
return Mono
.<Void>fromCallable(() -> {
if (activeTasks.isTerminated()) return null;
shutdownLock.lock();
try {
if (closeRequested.get()) {
return null;
}
var mergeScheduler = indexWriter.getConfig().getMergeScheduler();
if (mergeScheduler instanceof ConcurrentMergeScheduler concurrentMergeScheduler) {
concurrentMergeScheduler.sync();
}
} finally {
shutdownLock.unlock();
}
return null;
})
.subscribeOn(luceneHeavyTasksScheduler)
.transform(this::ensureOpen);
}
@Override
public Mono<Void> waitForLastMerges() {
return Mono
.<Void>fromCallable(() -> {
if (activeTasks.isTerminated()) return null;
shutdownLock.lock();
try {
if (closeRequested.get()) {
return null;
}
indexWriter.getConfig().setMergePolicy(NoMergePolicy.INSTANCE);
var mergeScheduler = indexWriter.getConfig().getMergeScheduler();
if (mergeScheduler instanceof ConcurrentMergeScheduler concurrentMergeScheduler) {
concurrentMergeScheduler.sync();
}
} finally {
shutdownLock.unlock();
}
return null;
})
.subscribeOn(luceneHeavyTasksScheduler)
.transform(this::ensureOpen);
}
@Override
public Mono<Void> refresh(boolean force) {
return Mono
@ -657,6 +687,114 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return lowMemory;
}
private double getSnapshotsCount() {
shutdownLock.lock();
try {
if (closeRequested.get()) {
return 0d;
}
return snapshotsManager.getSnapshotsCount();
} finally {
shutdownLock.unlock();
}
}
private double getIndexWriterFlushingBytes() {
shutdownLock.lock();
try {
if (closeRequested.get()) {
return 0d;
}
return indexWriter.getFlushingBytes();
} finally {
shutdownLock.unlock();
}
}
private double getIndexWriterMaxCompletedSequenceNumber() {
shutdownLock.lock();
try {
if (closeRequested.get()) {
return 0d;
}
return indexWriter.getMaxCompletedSequenceNumber();
} finally {
shutdownLock.unlock();
}
}
private double getIndexWriterPendingNumDocs() {
shutdownLock.lock();
try {
if (closeRequested.get()) {
return 0d;
}
return indexWriter.getPendingNumDocs();
} finally {
shutdownLock.unlock();
}
}
private double getIndexWriterMergingSegmentsSize() {
shutdownLock.lock();
try {
if (closeRequested.get()) {
return 0d;
}
return indexWriter.getMergingSegments().size();
} finally {
shutdownLock.unlock();
}
}
private double getDirectoryPendingDeletionsCount() {
shutdownLock.lock();
try {
if (closeRequested.get()) {
return 0d;
}
return indexWriter.getDirectory().getPendingDeletions().size();
} catch (IOException e) {
return 0d;
} finally {
shutdownLock.unlock();
}
}
private double getDocCount() {
shutdownLock.lock();
try {
if (closeRequested.get()) {
return 0d;
}
var docStats = indexWriter.getDocStats();
if (docStats != null) {
return docStats.numDocs;
} else {
return 0d;
}
} finally {
shutdownLock.unlock();
}
}
private double getMaxDoc() {
shutdownLock.lock();
try {
if (closeRequested.get()) {
return 0d;
}
var docStats = indexWriter.getDocStats();
if (docStats != null) {
return docStats.maxDoc;
} else {
return 0d;
}
} finally {
shutdownLock.unlock();
}
}
@Override
public boolean equals(Object o) {
if (this == o) {

View File

@ -249,9 +249,8 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
@Override
public Mono<Void> deleteAll() {
return luceneIndicesFlux
.flatMap(LLLocalLuceneIndex::deleteAll)
.then();
Iterable<Mono<Void>> it = () -> luceneIndicesSet.stream().map(LLLuceneIndex::deleteAll).iterator();
return Mono.whenDelayError(it);
}
private LLSnapshot resolveSnapshot(LLSnapshot multiSnapshot, int instanceId) {
@ -316,8 +315,9 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
@Override
public Mono<Void> close() {
return luceneIndicesFlux
.flatMap(LLLocalLuceneIndex::close)
Iterable<Mono<Void>> it = () -> luceneIndicesSet.stream().map(LLLocalLuceneIndex::close).iterator();
var indicesCloseMono = Mono.whenDelayError(it);
return indicesCloseMono
.then(Mono.fromCallable(() -> {
if (multiSearcher instanceof Closeable closeable) {
//noinspection BlockingMethodInNonBlockingContext
@ -331,16 +331,26 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
@Override
public Mono<Void> flush() {
return luceneIndicesFlux
.flatMap(LLLocalLuceneIndex::flush)
.then();
Iterable<Mono<Void>> it = () -> luceneIndicesSet.stream().map(LLLuceneIndex::flush).iterator();
return Mono.whenDelayError(it);
}
@Override
public Mono<Void> waitForMerges() {
Iterable<Mono<Void>> it = () -> luceneIndicesSet.stream().map(LLLuceneIndex::waitForMerges).iterator();
return Mono.whenDelayError(it);
}
@Override
public Mono<Void> waitForLastMerges() {
Iterable<Mono<Void>> it = () -> luceneIndicesSet.stream().map(LLLuceneIndex::waitForLastMerges).iterator();
return Mono.whenDelayError(it);
}
@Override
public Mono<Void> refresh(boolean force) {
return luceneIndicesFlux
.flatMap(index -> index.refresh(force))
.then();
Iterable<Mono<Void>> it = () -> luceneIndicesSet.stream().map(index -> index.refresh(force)).iterator();
return Mono.whenDelayError(it);
}
@Override

View File

@ -98,6 +98,13 @@ public class SnapshotsManager {
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())).publishOn(Schedulers.parallel());
}
/**
* Returns the total number of snapshots currently held.
*/
public int getSnapshotsCount() {
return snapshotter.getSnapshotCount();
}
public void close() {
if (!activeTasks.isTerminated()) {
activeTasks.arriveAndAwaitAdvance();

View File

@ -513,6 +513,16 @@ public class LLQuicConnection implements LLDatabaseConnection {
return null;
}
@Override
public Mono<Void> waitForMerges() {
return null;
}
@Override
public Mono<Void> waitForLastMerges() {
return null;
}
@Override
public Mono<Void> refresh(boolean force) {
return null;