2020-12-07 22:15:18 +01:00
|
|
|
package it.cavallium.dbengine.database.disk;
|
|
|
|
|
2021-02-04 22:42:57 +01:00
|
|
|
import com.google.common.cache.Cache;
|
|
|
|
import com.google.common.cache.CacheBuilder;
|
2021-01-24 03:15:05 +01:00
|
|
|
import it.cavallium.dbengine.database.LLDocument;
|
|
|
|
import it.cavallium.dbengine.database.LLLuceneIndex;
|
2021-01-29 17:19:01 +01:00
|
|
|
import it.cavallium.dbengine.database.LLScoreMode;
|
|
|
|
import it.cavallium.dbengine.database.LLSearchResult;
|
2021-01-24 03:15:05 +01:00
|
|
|
import it.cavallium.dbengine.database.LLSnapshot;
|
|
|
|
import it.cavallium.dbengine.database.LLSort;
|
|
|
|
import it.cavallium.dbengine.database.LLTerm;
|
2021-02-04 22:42:57 +01:00
|
|
|
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
|
|
|
|
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
|
2021-02-03 13:48:30 +01:00
|
|
|
import it.cavallium.dbengine.lucene.serializer.Query;
|
2020-12-07 22:15:18 +01:00
|
|
|
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
|
|
|
|
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
|
|
|
|
import java.io.IOException;
|
|
|
|
import java.nio.file.Path;
|
|
|
|
import java.time.Duration;
|
2021-02-03 13:48:30 +01:00
|
|
|
import java.util.Optional;
|
2020-12-07 22:15:18 +01:00
|
|
|
import java.util.Set;
|
2021-02-04 22:42:57 +01:00
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
2020-12-07 22:15:18 +01:00
|
|
|
import java.util.concurrent.CopyOnWriteArrayList;
|
2021-02-04 22:42:57 +01:00
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
2020-12-07 22:15:18 +01:00
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
2021-02-04 22:42:57 +01:00
|
|
|
import org.apache.lucene.search.CollectionStatistics;
|
|
|
|
import org.apache.lucene.search.IndexSearcher;
|
2020-12-07 22:15:18 +01:00
|
|
|
import org.jetbrains.annotations.Nullable;
|
|
|
|
import org.warp.commonutils.batch.ParallelUtils;
|
|
|
|
import org.warp.commonutils.functional.IOBiConsumer;
|
|
|
|
import reactor.core.publisher.Flux;
|
2021-01-30 20:01:22 +01:00
|
|
|
import reactor.core.publisher.GroupedFlux;
|
2020-12-07 22:15:18 +01:00
|
|
|
import reactor.core.publisher.Mono;
|
2021-01-29 17:19:01 +01:00
|
|
|
import reactor.core.scheduler.Schedulers;
|
2021-01-30 20:01:22 +01:00
|
|
|
import reactor.util.function.Tuple2;
|
2020-12-07 22:15:18 +01:00
|
|
|
import reactor.util.function.Tuples;
|
|
|
|
|
|
|
|
public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
|
|
|
|
|
|
|
private final Long2ObjectMap<LLSnapshot[]> registeredSnapshots = new Long2ObjectOpenHashMap<>();
|
|
|
|
private final AtomicLong nextSnapshotNumber = new AtomicLong(1);
|
|
|
|
private final LLLocalLuceneIndex[] luceneIndices;
|
|
|
|
|
2021-02-04 22:42:57 +01:00
|
|
|
private final AtomicLong nextActionId = new AtomicLong(0);
|
|
|
|
private final ConcurrentHashMap<Long, Cache<String, CollectionStatistics>[]> statistics = new ConcurrentHashMap<>();
|
|
|
|
private final ConcurrentHashMap<Long, AtomicInteger> completedStreams = new ConcurrentHashMap<>();
|
|
|
|
|
2020-12-07 22:15:18 +01:00
|
|
|
private final int maxQueueSize = 1000;
|
|
|
|
|
|
|
|
public LLLocalMultiLuceneIndex(Path lucene,
|
|
|
|
String name,
|
|
|
|
int instancesCount,
|
|
|
|
TextFieldsAnalyzer textFieldsAnalyzer,
|
2021-02-04 22:42:57 +01:00
|
|
|
TextFieldsSimilarity textFieldsSimilarity,
|
2020-12-07 22:15:18 +01:00
|
|
|
Duration queryRefreshDebounceTime,
|
|
|
|
Duration commitDebounceTime,
|
|
|
|
boolean lowMemory) throws IOException {
|
|
|
|
|
|
|
|
if (instancesCount <= 1 || instancesCount > 100) {
|
|
|
|
throw new IOException("Unsupported instances count: " + instancesCount);
|
|
|
|
}
|
|
|
|
|
|
|
|
LLLocalLuceneIndex[] luceneIndices = new LLLocalLuceneIndex[instancesCount];
|
|
|
|
for (int i = 0; i < instancesCount; i++) {
|
2021-02-04 22:42:57 +01:00
|
|
|
int finalI = i;
|
2020-12-07 22:15:18 +01:00
|
|
|
String instanceName;
|
|
|
|
if (i == 0) {
|
|
|
|
instanceName = name;
|
|
|
|
} else {
|
|
|
|
instanceName = name + "_" + String.format("%03d", i);
|
|
|
|
}
|
|
|
|
luceneIndices[i] = new LLLocalLuceneIndex(lucene,
|
|
|
|
instanceName,
|
|
|
|
textFieldsAnalyzer,
|
2021-02-04 22:42:57 +01:00
|
|
|
textFieldsSimilarity,
|
2020-12-07 22:15:18 +01:00
|
|
|
queryRefreshDebounceTime,
|
|
|
|
commitDebounceTime,
|
2021-02-04 22:42:57 +01:00
|
|
|
lowMemory,
|
|
|
|
(indexSearcher, field, distributedPre, actionId) -> distributedCustomCollectionStatistics(finalI,
|
|
|
|
indexSearcher,
|
|
|
|
field,
|
|
|
|
distributedPre,
|
|
|
|
actionId
|
|
|
|
)
|
2020-12-07 22:15:18 +01:00
|
|
|
);
|
|
|
|
}
|
|
|
|
this.luceneIndices = luceneIndices;
|
|
|
|
}
|
|
|
|
|
2021-02-04 22:42:57 +01:00
|
|
|
private long newAction() {
|
|
|
|
var statistics = new Cache[luceneIndices.length];
|
|
|
|
for (int i = 0; i < luceneIndices.length; i++) {
|
|
|
|
statistics[i] = CacheBuilder.newBuilder().build();
|
|
|
|
}
|
|
|
|
long actionId = nextActionId.getAndIncrement();
|
|
|
|
//noinspection unchecked
|
|
|
|
this.statistics.put(actionId, statistics);
|
|
|
|
this.completedStreams.put(actionId, new AtomicInteger(0));
|
|
|
|
return actionId;
|
|
|
|
}
|
|
|
|
|
|
|
|
private void completedAction(long actionId) {
|
|
|
|
var completedStreamsCount = completedStreams.get(actionId);
|
|
|
|
if (completedStreamsCount != null) {
|
|
|
|
if (completedStreamsCount.incrementAndGet() >= luceneIndices.length) {
|
|
|
|
this.statistics.remove(actionId);
|
|
|
|
this.completedStreams.remove(actionId);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private CollectionStatistics distributedCustomCollectionStatistics(int luceneIndex,
|
|
|
|
IndexSearcher indexSearcher, String field, boolean distributedPre, long actionId) throws IOException {
|
|
|
|
if (distributedPre) {
|
|
|
|
try {
|
|
|
|
return statistics.get(actionId)[luceneIndex].get(field, () -> indexSearcher.collectionStatistics(field));
|
|
|
|
} catch (ExecutionException e) {
|
|
|
|
throw new IOException();
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
long maxDoc = 0;
|
|
|
|
long docCount = 0;
|
|
|
|
long sumTotalTermFreq = 0;
|
|
|
|
long sumDocFreq = 0;
|
|
|
|
for (int i = 0; i < luceneIndices.length; i++) {
|
|
|
|
CollectionStatistics iCollStats = statistics.get(actionId)[i].getIfPresent(field);
|
|
|
|
if (iCollStats != null) {
|
|
|
|
maxDoc += iCollStats.maxDoc();
|
|
|
|
docCount += iCollStats.docCount();
|
|
|
|
sumTotalTermFreq += iCollStats.sumTotalTermFreq();
|
|
|
|
sumDocFreq += iCollStats.sumDocFreq();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return new CollectionStatistics(field,
|
|
|
|
(int) Math.max(1, Math.min(maxDoc, Integer.MAX_VALUE)),
|
|
|
|
Math.max(1, docCount),
|
|
|
|
Math.max(1, sumTotalTermFreq),
|
|
|
|
Math.max(1, sumDocFreq)
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-12-07 22:15:18 +01:00
|
|
|
private LLLocalLuceneIndex getLuceneIndex(LLTerm id) {
|
|
|
|
return luceneIndices[getLuceneIndexId(id)];
|
|
|
|
}
|
|
|
|
|
|
|
|
private int getLuceneIndexId(LLTerm id) {
|
|
|
|
return Math.abs(id.getValue().hashCode()) % luceneIndices.length;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public String getLuceneIndexName() {
|
2021-01-30 01:41:04 +01:00
|
|
|
return luceneIndices[0].getLuceneIndexName();
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-30 01:41:04 +01:00
|
|
|
public Mono<Void> addDocument(LLTerm id, LLDocument doc) {
|
|
|
|
return getLuceneIndex(id).addDocument(id, doc);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-30 20:01:22 +01:00
|
|
|
public Mono<Void> addDocuments(Flux<GroupedFlux<LLTerm, LLDocument>> documents) {
|
|
|
|
return documents.flatMap(docs -> getLuceneIndex(docs.key()).addDocuments(documents)).then();
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-30 01:41:04 +01:00
|
|
|
public Mono<Void> deleteDocument(LLTerm id) {
|
|
|
|
return getLuceneIndex(id).deleteDocument(id);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-30 01:41:04 +01:00
|
|
|
public Mono<Void> updateDocument(LLTerm id, LLDocument document) {
|
|
|
|
return getLuceneIndex(id).updateDocument(id, document);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-30 20:01:22 +01:00
|
|
|
public Mono<Void> updateDocuments(Flux<GroupedFlux<LLTerm, LLDocument>> documents) {
|
|
|
|
return documents.flatMap(docs -> getLuceneIndex(docs.key()).updateDocuments(documents)).then();
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-30 01:41:04 +01:00
|
|
|
public Mono<Void> deleteAll() {
|
|
|
|
return Flux
|
|
|
|
.fromArray(luceneIndices)
|
|
|
|
.flatMap(LLLocalLuceneIndex::deleteAll)
|
|
|
|
.then();
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
private LLSnapshot resolveSnapshot(LLSnapshot multiSnapshot, int instanceId) {
|
|
|
|
if (multiSnapshot != null) {
|
|
|
|
return registeredSnapshots.get(multiSnapshot.getSequenceNumber())[instanceId];
|
|
|
|
} else {
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-02-03 13:48:30 +01:00
|
|
|
private Optional<LLSnapshot> resolveSnapshotOptional(LLSnapshot multiSnapshot, int instanceId) {
|
|
|
|
return Optional.ofNullable(resolveSnapshot(multiSnapshot, instanceId));
|
|
|
|
}
|
|
|
|
|
2020-12-07 22:15:18 +01:00
|
|
|
@Override
|
2021-01-29 17:19:01 +01:00
|
|
|
public Mono<LLSearchResult> moreLikeThis(@Nullable LLSnapshot snapshot,
|
2021-01-30 20:01:22 +01:00
|
|
|
Flux<Tuple2<String, Set<String>>> mltDocumentFields,
|
2021-02-27 17:32:57 +01:00
|
|
|
@Nullable it.cavallium.dbengine.lucene.serializer.Query additionalQuery,
|
2021-02-22 01:37:17 +01:00
|
|
|
long limit,
|
2021-02-14 13:46:11 +01:00
|
|
|
@Nullable Float minCompetitiveScore,
|
2021-02-28 16:11:50 +01:00
|
|
|
boolean enableScoring,
|
2021-02-28 16:50:59 +01:00
|
|
|
boolean sortByScore,
|
2021-01-29 17:19:01 +01:00
|
|
|
String keyFieldName) {
|
2021-02-04 22:42:57 +01:00
|
|
|
long actionId;
|
|
|
|
int scoreDivisor;
|
|
|
|
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsShared;
|
|
|
|
Mono<Void> distributedPre;
|
|
|
|
if (luceneIndices.length > 1) {
|
|
|
|
actionId = newAction();
|
|
|
|
scoreDivisor = 20;
|
|
|
|
mltDocumentFieldsShared = mltDocumentFields.publish().refCount();
|
|
|
|
distributedPre = Flux
|
|
|
|
.fromArray(luceneIndices)
|
|
|
|
.index()
|
|
|
|
.flatMap(tuple -> Mono
|
|
|
|
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
|
|
|
|
.map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot))
|
|
|
|
)
|
|
|
|
.flatMap(tuple -> tuple
|
|
|
|
.getT1()
|
2021-02-14 13:46:11 +01:00
|
|
|
.distributedPreMoreLikeThis(tuple.getT2().orElse(null),
|
|
|
|
mltDocumentFieldsShared,
|
2021-02-27 17:32:57 +01:00
|
|
|
additionalQuery,
|
2021-02-14 13:46:11 +01:00
|
|
|
minCompetitiveScore,
|
2021-02-28 16:11:50 +01:00
|
|
|
enableScoring,
|
2021-02-28 16:50:59 +01:00
|
|
|
sortByScore,
|
2021-02-14 13:46:11 +01:00
|
|
|
keyFieldName,
|
|
|
|
actionId
|
|
|
|
)
|
2021-02-04 22:42:57 +01:00
|
|
|
)
|
|
|
|
.then();
|
|
|
|
} else {
|
|
|
|
actionId = -1;
|
|
|
|
scoreDivisor = 1;
|
|
|
|
mltDocumentFieldsShared = mltDocumentFields;
|
|
|
|
distributedPre = Mono.empty();
|
|
|
|
}
|
|
|
|
|
|
|
|
return distributedPre.then(Flux
|
|
|
|
.fromArray(luceneIndices)
|
|
|
|
.index()
|
|
|
|
.flatMap(tuple -> Mono
|
|
|
|
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
|
|
|
|
.map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot)))
|
|
|
|
.flatMap(tuple -> tuple
|
|
|
|
.getT1()
|
|
|
|
.distributedMoreLikeThis(tuple.getT2().orElse(null),
|
|
|
|
mltDocumentFieldsShared,
|
2021-02-27 17:32:57 +01:00
|
|
|
additionalQuery,
|
2021-02-04 22:42:57 +01:00
|
|
|
limit,
|
2021-02-14 13:46:11 +01:00
|
|
|
minCompetitiveScore,
|
2021-02-28 16:11:50 +01:00
|
|
|
enableScoring,
|
2021-02-28 16:50:59 +01:00
|
|
|
sortByScore,
|
2021-02-04 22:42:57 +01:00
|
|
|
keyFieldName,
|
|
|
|
actionId,
|
|
|
|
scoreDivisor
|
|
|
|
)
|
|
|
|
)
|
|
|
|
.reduce(LLSearchResult.accumulator())
|
|
|
|
.map(result -> {
|
|
|
|
if (actionId != -1) {
|
|
|
|
var resultsWithTermination = result
|
|
|
|
.results()
|
|
|
|
.map(flux -> flux.doOnTerminate(() -> completedAction(actionId)));
|
|
|
|
return new LLSearchResult(result.totalHitsCount(), resultsWithTermination);
|
|
|
|
} else {
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
})
|
|
|
|
);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-29 17:19:01 +01:00
|
|
|
public Mono<LLSearchResult> search(@Nullable LLSnapshot snapshot,
|
2021-02-03 13:48:30 +01:00
|
|
|
Query query,
|
2021-02-22 01:37:17 +01:00
|
|
|
long limit,
|
2020-12-07 22:15:18 +01:00
|
|
|
@Nullable LLSort sort,
|
2021-01-29 17:19:01 +01:00
|
|
|
LLScoreMode scoreMode,
|
2021-02-14 13:46:11 +01:00
|
|
|
@Nullable Float minCompetitiveScore,
|
2020-12-07 22:15:18 +01:00
|
|
|
String keyFieldName) {
|
2021-02-04 22:42:57 +01:00
|
|
|
long actionId;
|
|
|
|
int scoreDivisor;
|
|
|
|
Mono<Void> distributedPre;
|
|
|
|
if (luceneIndices.length <= 1 || scoreMode == LLScoreMode.COMPLETE_NO_SCORES) {
|
|
|
|
actionId = -1;
|
|
|
|
scoreDivisor = 1;
|
|
|
|
distributedPre = Mono.empty();
|
|
|
|
} else {
|
|
|
|
actionId = newAction();
|
|
|
|
scoreDivisor = 20;
|
|
|
|
distributedPre = Flux
|
|
|
|
.fromArray(luceneIndices)
|
|
|
|
.index()
|
|
|
|
.flatMap(tuple -> Mono
|
|
|
|
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
|
|
|
|
.map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot))
|
|
|
|
)
|
|
|
|
.flatMap(tuple -> tuple
|
|
|
|
.getT1()
|
2021-02-14 13:46:11 +01:00
|
|
|
.distributedPreSearch(tuple.getT2().orElse(null),
|
|
|
|
query,
|
|
|
|
sort,
|
|
|
|
scoreMode,
|
|
|
|
minCompetitiveScore,
|
|
|
|
keyFieldName,
|
|
|
|
actionId
|
|
|
|
)
|
2021-02-04 22:42:57 +01:00
|
|
|
)
|
|
|
|
.then();
|
|
|
|
}
|
|
|
|
return distributedPre
|
|
|
|
.then(Flux
|
|
|
|
.fromArray(luceneIndices)
|
|
|
|
.index()
|
|
|
|
.flatMap(tuple -> Mono
|
|
|
|
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
|
|
|
|
.map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot))
|
|
|
|
)
|
|
|
|
.flatMap(tuple -> tuple
|
|
|
|
.getT1()
|
|
|
|
.distributedSearch(tuple.getT2().orElse(null),
|
|
|
|
query,
|
2021-02-24 23:16:56 +01:00
|
|
|
limit,
|
2021-02-04 22:42:57 +01:00
|
|
|
sort,
|
|
|
|
scoreMode,
|
2021-02-14 13:46:11 +01:00
|
|
|
minCompetitiveScore,
|
2021-02-04 22:42:57 +01:00
|
|
|
keyFieldName,
|
|
|
|
actionId,
|
|
|
|
scoreDivisor
|
|
|
|
))
|
|
|
|
.reduce(LLSearchResult.accumulator())
|
|
|
|
.map(result -> {
|
|
|
|
if (actionId != -1) {
|
|
|
|
var resultsWithTermination = result
|
|
|
|
.results()
|
|
|
|
.map(flux -> flux.doOnTerminate(() -> completedAction(actionId)));
|
|
|
|
return new LLSearchResult(result.totalHitsCount(), resultsWithTermination);
|
|
|
|
} else {
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
})
|
|
|
|
);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-30 01:41:04 +01:00
|
|
|
public Mono<Void> close() {
|
|
|
|
return Flux
|
|
|
|
.fromArray(luceneIndices)
|
|
|
|
.flatMap(LLLocalLuceneIndex::close)
|
|
|
|
.then();
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
2021-02-03 13:48:30 +01:00
|
|
|
@Override
|
|
|
|
public Mono<Void> flush() {
|
|
|
|
return Flux
|
|
|
|
.fromArray(luceneIndices)
|
|
|
|
.flatMap(LLLocalLuceneIndex::flush)
|
|
|
|
.then();
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Mono<Void> refresh() {
|
|
|
|
return Flux
|
|
|
|
.fromArray(luceneIndices)
|
|
|
|
.flatMap(LLLocalLuceneIndex::refresh)
|
|
|
|
.then();
|
|
|
|
}
|
|
|
|
|
2020-12-07 22:15:18 +01:00
|
|
|
@Override
|
2021-01-30 01:41:04 +01:00
|
|
|
public Mono<LLSnapshot> takeSnapshot() {
|
|
|
|
return Mono
|
|
|
|
.fromCallable(() -> {
|
2021-02-01 11:00:27 +01:00
|
|
|
CopyOnWriteArrayList<LLSnapshot> instancesSnapshots
|
|
|
|
= new CopyOnWriteArrayList<>(new LLSnapshot[luceneIndices.length]);
|
2021-01-30 01:41:04 +01:00
|
|
|
var snapIndex = nextSnapshotNumber.getAndIncrement();
|
2020-12-07 22:15:18 +01:00
|
|
|
|
2021-01-30 01:41:04 +01:00
|
|
|
ParallelUtils.parallelizeIO((IOBiConsumer<LLLuceneIndex, Integer> s) -> {
|
|
|
|
for (int i = 0; i < luceneIndices.length; i++) {
|
|
|
|
s.consume(luceneIndices[i], i);
|
|
|
|
}
|
|
|
|
}, maxQueueSize, luceneIndices.length, 1, (instance, i) -> {
|
|
|
|
var instanceSnapshot = instance.takeSnapshot();
|
|
|
|
//todo: reimplement better (don't block and take them parallel)
|
|
|
|
instancesSnapshots.set(i, instanceSnapshot.block());
|
|
|
|
});
|
2020-12-07 22:15:18 +01:00
|
|
|
|
2021-01-30 01:41:04 +01:00
|
|
|
LLSnapshot[] instancesSnapshotsArray = instancesSnapshots.toArray(LLSnapshot[]::new);
|
|
|
|
registeredSnapshots.put(snapIndex, instancesSnapshotsArray);
|
2020-12-07 22:15:18 +01:00
|
|
|
|
2021-01-30 01:41:04 +01:00
|
|
|
return new LLSnapshot(snapIndex);
|
|
|
|
})
|
|
|
|
.subscribeOn(Schedulers.boundedElastic());
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-30 01:41:04 +01:00
|
|
|
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
|
|
|
|
return Mono
|
|
|
|
.<Void>fromCallable(() -> {
|
|
|
|
LLSnapshot[] instancesSnapshots = registeredSnapshots.remove(snapshot.getSequenceNumber());
|
|
|
|
for (int i = 0; i < luceneIndices.length; i++) {
|
|
|
|
LLLocalLuceneIndex luceneIndex = luceneIndices[i];
|
|
|
|
//todo: reimplement better (don't block and take them parallel)
|
|
|
|
luceneIndex.releaseSnapshot(instancesSnapshots[i]).block();
|
|
|
|
}
|
|
|
|
return null;
|
|
|
|
})
|
|
|
|
.subscribeOn(Schedulers.boundedElastic());
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public boolean isLowMemoryMode() {
|
|
|
|
return luceneIndices[0].isLowMemoryMode();
|
|
|
|
}
|
|
|
|
}
|