Code cleanup

This commit is contained in:
Andrea Cavalli 2022-06-28 13:52:21 +02:00
parent 4a0710ed9a
commit aee08f3e48
4 changed files with 44 additions and 43 deletions

View File

@ -156,6 +156,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
indexSearcher = searcherManager.acquire();
fromSnapshot = false;
} else {
//noinspection resource
indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(SEARCH_EXECUTOR);
fromSnapshot = true;
}
@ -256,12 +257,13 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
private class SnapshotIndexSearcher extends LLIndexSearcher {
public SnapshotIndexSearcher(IndexSearcher indexSearcher, AtomicBoolean closed) {
public SnapshotIndexSearcher(IndexSearcher indexSearcher,
AtomicBoolean closed) {
super(indexSearcher, closed);
}
@Override
public void onClose() {
public void onClose() throws IOException {
dropCachedIndexSearcher();
}
}

View File

@ -14,7 +14,6 @@ import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.client.query.QueryParser;
import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
@ -27,7 +26,6 @@ import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.LLUpdateDocument;
import it.cavallium.dbengine.database.LLUpdateFields;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearchers.UnshardedIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.lucene.LuceneRocksDBManager;
import it.cavallium.dbengine.lucene.LuceneUtils;
@ -59,19 +57,16 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SerialMergeScheduler;
import org.apache.lucene.index.SimpleMergedSegmentWarmer;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.util.InfoStream;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import it.cavallium.dbengine.utils.ShortNamedThreadFactory;
@ -172,8 +167,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}
var indexWriterConfig = new IndexWriterConfig(luceneAnalyzer);
var snapshotter = new SnapshotDeletionPolicy(requireNonNull(indexWriterConfig.getIndexDeletionPolicy()));
indexWriterConfig.setIndexDeletionPolicy(snapshotter);
IndexDeletionPolicy deletionPolicy;
deletionPolicy = requireNonNull(indexWriterConfig.getIndexDeletionPolicy());
deletionPolicy = new SnapshotDeletionPolicy(deletionPolicy);
indexWriterConfig.setIndexDeletionPolicy(deletionPolicy);
indexWriterConfig.setCommitOnClose(true);
int writerSchedulerMaxThreadCount;
MergeScheduler mergeScheduler;
@ -216,7 +213,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}
indexWriterConfig.setSimilarity(getLuceneSimilarity());
this.indexWriter = new IndexWriter(directory, indexWriterConfig);
this.snapshotsManager = new SnapshotsManager(indexWriter, snapshotter);
this.snapshotsManager = new SnapshotsManager(indexWriter, (SnapshotDeletionPolicy) deletionPolicy);
var searcherManager = new CachedIndexSearcherManager(indexWriter,
snapshotsManager,
luceneHeavyTasksScheduler,

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine.database.disk;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
@ -8,14 +9,14 @@ import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.search.IndexSearcher;
import org.jetbrains.annotations.Nullable;
@SuppressWarnings("unused")
public class LuceneIndexSnapshot {
public class LuceneIndexSnapshot implements Closeable {
private final IndexCommit snapshot;
private boolean initialized;
private boolean failed;
private boolean closed;
private DirectoryReader indexReader;
private IndexSearcher indexSearcher;
public LuceneIndexSnapshot(IndexCommit snapshot) {
@ -45,6 +46,7 @@ public class LuceneIndexSnapshot {
if (!initialized) {
try {
var indexReader = DirectoryReader.open(snapshot);
this.indexReader = indexReader;
indexSearcher = new IndexSearcher(indexReader, searchExecutor);
initialized = true;
@ -59,7 +61,7 @@ public class LuceneIndexSnapshot {
closed = true;
if (initialized && !failed) {
indexSearcher.getIndexReader().close();
indexReader.close();
indexSearcher = null;
}
}

View File

@ -46,51 +46,51 @@ public class SnapshotsManager {
}
public Mono<LLSnapshot> takeSnapshot() {
return takeLuceneSnapshot().map(snapshot -> {
var snapshotSeqNo = lastSnapshotSeqNo.incrementAndGet();
this.snapshots.put(snapshotSeqNo, new LuceneIndexSnapshot(snapshot));
return new LLSnapshot(snapshotSeqNo);
});
return Mono
.fromCallable(this::takeLuceneSnapshot)
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.publishOn(Schedulers.parallel());
}
/**
* Use internally. This method commits before taking the snapshot if there are no commits in a new database,
* avoiding the exception.
*/
private Mono<IndexCommit> takeLuceneSnapshot() {
return Mono
.fromCallable(snapshotter::snapshot)
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.onErrorResume(ex -> {
if (ex instanceof IllegalStateException && "No index commit to snapshot".equals(ex.getMessage())) {
return Mono.fromCallable(() -> {
activeTasks.register();
try {
indexWriter.commit();
return snapshotter.snapshot();
} finally {
activeTasks.arriveAndDeregister();
}
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()));
} else {
return Mono.error(ex);
}
})
.publishOn(Schedulers.parallel());
private LLSnapshot takeLuceneSnapshot() throws IOException {
activeTasks.register();
try {
if (snapshotter.getSnapshots().isEmpty()) {
indexWriter.commit();
}
var snapshotSeqNo = lastSnapshotSeqNo.incrementAndGet();
IndexCommit snapshot = snapshotter.snapshot();
var prevSnapshot = this.snapshots.put(snapshotSeqNo, new LuceneIndexSnapshot(snapshot));
// Unexpectedly found a snapshot
if (prevSnapshot != null) {
try {
prevSnapshot.close();
} catch (IOException e) {
throw new IllegalStateException("Can't close snapshot", e);
}
}
return new LLSnapshot(snapshotSeqNo);
} finally {
activeTasks.arriveAndDeregister();
}
}
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
return Mono.<Void>fromCallable(() -> {
activeTasks.register();
try {
var indexSnapshot = this.snapshots.remove(snapshot.getSequenceNumber());
try (var indexSnapshot = this.snapshots.remove(snapshot.getSequenceNumber())) {
if (indexSnapshot == null) {
throw new IOException("LLSnapshot " + snapshot.getSequenceNumber() + " not found!");
}
var luceneIndexSnapshot = indexSnapshot.getSnapshot();
snapshotter.release(luceneIndexSnapshot);
indexSnapshot.close();
return null;
} finally {
activeTasks.arriveAndDeregister();
@ -102,7 +102,7 @@ public class SnapshotsManager {
* Returns the total number of snapshots currently held.
*/
public int getSnapshotsCount() {
return snapshotter.getSnapshotCount();
return Math.max(snapshots.size(), snapshotter.getSnapshotCount());
}
public void close() {