Schedule commits and merges

This commit is contained in:
Andrea Cavalli 2021-12-15 16:47:59 +01:00
parent 8ad622db0a
commit b7ca57a215
2 changed files with 40 additions and 22 deletions

View File

@ -250,6 +250,9 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
var commitMillis = luceneOptions.commitDebounceTime().toMillis(); var commitMillis = luceneOptions.commitDebounceTime().toMillis();
luceneHeavyTasksScheduler.schedulePeriodically(this::scheduledCommit, commitMillis, commitMillis, luceneHeavyTasksScheduler.schedulePeriodically(this::scheduledCommit, commitMillis, commitMillis,
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);
// Maybe merge every 5 commits
luceneHeavyTasksScheduler.schedulePeriodically(this::scheduledMerge, commitMillis * 5, commitMillis * 5,
TimeUnit.MILLISECONDS);
} }
private Similarity getLuceneSimilarity() { private Similarity getLuceneSimilarity() {
@ -263,7 +266,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@Override @Override
public Mono<LLSnapshot> takeSnapshot() { public Mono<LLSnapshot> takeSnapshot() {
return snapshotsManager.takeSnapshot().subscribeOn(luceneHeavyTasksScheduler).transform(this::ensureOpen); return snapshotsManager.takeSnapshot().transform(this::ensureOpen);
} }
private <V> Mono<V> ensureOpen(Mono<V> mono) { private <V> Mono<V> ensureOpen(Mono<V> mono) {
@ -503,6 +506,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
} }
} }
private void scheduledMerge() {
try {
indexWriter.maybeMerge();
} catch (IOException ex) {
logger.error(MARKER_LUCENE, "Failed to execute a scheduled merge", ex);
}
}
@Override @Override
public boolean isLowMemoryMode() { public boolean isLowMemoryMode() {
return lowMemory; return lowMemory;

View File

@ -57,29 +57,34 @@ public class SnapshotsManager {
*/ */
private Mono<IndexCommit> takeLuceneSnapshot() { private Mono<IndexCommit> takeLuceneSnapshot() {
return Mono return Mono
.fromCallable(snapshotter::snapshot) .<IndexCommit>create(sink -> Schedulers.boundedElastic().schedule(() -> {
.subscribeOn(Schedulers.boundedElastic()) try {
.onErrorResume(ex -> Mono sink.success(snapshotter.snapshot());
.defer(() -> { } catch (Throwable ex) {
if (ex instanceof IllegalStateException && "No index commit to snapshot".equals(ex.getMessage())) { sink.error(ex);
return Mono.fromCallable(() -> { }
activeTasks.register(); }))
try { .onErrorResume(ex -> {
indexWriter.commit(); if (ex instanceof IllegalStateException && "No index commit to snapshot".equals(ex.getMessage())) {
return snapshotter.snapshot(); return Mono.create(sink -> Schedulers.boundedElastic().schedule(() -> {
} finally { activeTasks.register();
activeTasks.arriveAndDeregister(); try {
} indexWriter.commit();
}); sink.success(snapshotter.snapshot());
} else { } catch (Throwable e) {
return Mono.error(ex); sink.error(e);
} finally {
activeTasks.arriveAndDeregister();
} }
}) }));
); } else {
return Mono.error(ex);
}
});
} }
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) { public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
return Mono.<Void>fromCallable(() -> { return Mono.create(sink -> Schedulers.boundedElastic().schedule(() -> {
activeTasks.register(); activeTasks.register();
try { try {
var indexSnapshot = this.snapshots.remove(snapshot.getSequenceNumber()); var indexSnapshot = this.snapshots.remove(snapshot.getSequenceNumber());
@ -93,11 +98,13 @@ public class SnapshotsManager {
snapshotter.release(luceneIndexSnapshot); snapshotter.release(luceneIndexSnapshot);
// Delete unused files after releasing the snapshot // Delete unused files after releasing the snapshot
indexWriter.deleteUnusedFiles(); indexWriter.deleteUnusedFiles();
return null; sink.success();
} catch (Throwable ex) {
sink.error(ex);
} finally { } finally {
activeTasks.arriveAndDeregister(); activeTasks.arriveAndDeregister();
} }
}).subscribeOn(Schedulers.boundedElastic()); }));
} }
public void close() { public void close() {