CavalliumDBEngine/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java

472 lines
15 KiB
Java
Raw Normal View History

2020-12-07 22:15:18 +01:00
package it.cavallium.dbengine.database.disk;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
2021-03-18 13:12:40 +01:00
import com.google.common.cache.CacheLoader.InvalidCacheLoadException;
2021-03-02 01:53:36 +01:00
import it.cavallium.dbengine.client.query.current.data.QueryParams;
2021-01-24 03:15:05 +01:00
import it.cavallium.dbengine.database.LLDocument;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSearchResult;
import it.cavallium.dbengine.database.LLSearchResultShard;
2021-01-24 03:15:05 +01:00
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
2020-12-07 22:15:18 +01:00
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
2021-03-18 13:12:40 +01:00
import java.util.Objects;
2021-02-03 13:48:30 +01:00
import java.util.Optional;
2020-12-07 22:15:18 +01:00
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
2020-12-07 22:15:18 +01:00
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
2020-12-07 22:15:18 +01:00
import java.util.concurrent.atomic.AtomicLong;
2021-04-14 02:37:03 +02:00
import lombok.Value;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.IndexSearcher;
2020-12-07 22:15:18 +01:00
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.batch.ParallelUtils;
import org.warp.commonutils.functional.IOBiConsumer;
import reactor.core.publisher.Flux;
2021-01-30 20:01:22 +01:00
import reactor.core.publisher.GroupedFlux;
2020-12-07 22:15:18 +01:00
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
2021-01-30 20:01:22 +01:00
import reactor.util.function.Tuple2;
2020-12-07 22:15:18 +01:00
import reactor.util.function.Tuples;
2021-03-18 13:12:40 +01:00
@SuppressWarnings("UnstableApiUsage")
2020-12-07 22:15:18 +01:00
public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
// Maps the sequence number of a multi-index snapshot to the per-shard snapshots it is made of
// (array position == shard position in luceneIndices).
// NOTE(review): this is a plain fastutil map that is read and written from callables in
// takeSnapshot/releaseSnapshot/resolveSnapshot — confirm that concurrent access cannot happen.
private final Long2ObjectMap<LLSnapshot[]> registeredSnapshots = new Long2ObjectOpenHashMap<>();
// Source of sequence numbers for multi-index snapshots; the first snapshot gets number 1.
private final AtomicLong nextSnapshotNumber = new AtomicLong(1);
// The shards this index delegates to; a document is routed to one of them by getLuceneIndexId.
private final LLLocalLuceneIndex[] luceneIndices;
// Source of ids for distributed search actions; the first action gets id 0.
private final AtomicLong nextActionId = new AtomicLong(0);
2021-03-18 13:12:40 +01:00
// Per action id: one cache per shard, mapping a field name to the collection statistics that shard
// computed for it (an empty Optional when the shard returned null for the field).
private final ConcurrentHashMap<Long, Cache<String, Optional<CollectionStatistics>>[]> statistics = new ConcurrentHashMap<>();
// Per action id: how many times completedAction has been invoked for that action so far.
private final ConcurrentHashMap<Long, AtomicInteger> completedStreams = new ConcurrentHashMap<>();
2020-12-07 22:15:18 +01:00
// Passed as the second argument of ParallelUtils.parallelizeIO in takeSnapshot.
private final int maxQueueSize = 1000;
public LLLocalMultiLuceneIndex(Path lucene,
String name,
int instancesCount,
TextFieldsAnalyzer textFieldsAnalyzer,
TextFieldsSimilarity textFieldsSimilarity,
2020-12-07 22:15:18 +01:00
Duration queryRefreshDebounceTime,
Duration commitDebounceTime,
boolean lowMemory, boolean inMemory) throws IOException {
2020-12-07 22:15:18 +01:00
if (instancesCount <= 1 || instancesCount > 100) {
throw new IOException("Unsupported instances count: " + instancesCount);
}
LLLocalLuceneIndex[] luceneIndices = new LLLocalLuceneIndex[instancesCount];
for (int i = 0; i < instancesCount; i++) {
int finalI = i;
2020-12-07 22:15:18 +01:00
String instanceName;
if (i == 0) {
instanceName = name;
} else {
instanceName = name + "_" + String.format("%03d", i);
}
luceneIndices[i] = new LLLocalLuceneIndex(lucene,
instanceName,
textFieldsAnalyzer,
textFieldsSimilarity,
2020-12-07 22:15:18 +01:00
queryRefreshDebounceTime,
commitDebounceTime,
lowMemory, inMemory, (indexSearcher, field, distributedPre, actionId) -> distributedCustomCollectionStatistics(finalI,
indexSearcher,
field,
distributedPre,
actionId
)
2020-12-07 22:15:18 +01:00
);
}
this.luceneIndices = luceneIndices;
}
/**
 * Registers a new distributed action: allocates its id, an empty statistics cache for every shard and a
 * completion counter starting at zero.
 *
 * @return the id of the new action
 */
private long newAction() {
	int shardCount = luceneIndices.length;
	@SuppressWarnings("unchecked")
	Cache<String, Optional<CollectionStatistics>>[] perShardCaches = new Cache[shardCount];
	int shard = 0;
	while (shard < shardCount) {
		perShardCaches[shard] = CacheBuilder.newBuilder().build();
		shard++;
	}

	long id = nextActionId.getAndIncrement();
	statistics.put(id, perShardCaches);
	completedStreams.put(id, new AtomicInteger(0));
	return id;
}
/**
 * Records one completion for the given action. Once the number of recorded completions reaches the number
 * of shards, the statistics and the counter of the action are dropped. Unknown actions are ignored.
 *
 * @param actionId id returned by {@link #newAction()}
 */
private void completedAction(long actionId) {
	AtomicInteger counter = completedStreams.get(actionId);
	if (counter == null) {
		return;
	}
	int completed = counter.incrementAndGet();
	if (completed < luceneIndices.length) {
		return;
	}
	this.statistics.remove(actionId);
	this.completedStreams.remove(actionId);
}
private CollectionStatistics distributedCustomCollectionStatistics(int luceneIndex,
IndexSearcher indexSearcher, String field, boolean distributedPre, long actionId) throws IOException {
if (distributedPre) {
try {
2021-03-18 13:12:40 +01:00
var optional = statistics.get(actionId)[luceneIndex].get(field,
() -> Optional.ofNullable(indexSearcher.collectionStatistics(field))
);
return optional.orElse(null);
} catch ( InvalidCacheLoadException | ExecutionException e) {
throw new IOException(e);
}
} else {
long maxDoc = 0;
long docCount = 0;
long sumTotalTermFreq = 0;
long sumDocFreq = 0;
for (int i = 0; i < luceneIndices.length; i++) {
2021-03-18 13:12:40 +01:00
CollectionStatistics iCollStats = Objects
.requireNonNull(statistics.get(actionId)[i].getIfPresent(field))
.orElse(null);
if (iCollStats != null) {
maxDoc += iCollStats.maxDoc();
docCount += iCollStats.docCount();
sumTotalTermFreq += iCollStats.sumTotalTermFreq();
sumDocFreq += iCollStats.sumDocFreq();
}
}
return new CollectionStatistics(field,
(int) Math.max(1, Math.min(maxDoc, Integer.MAX_VALUE)),
Math.max(1, docCount),
Math.max(1, sumTotalTermFreq),
Math.max(1, sumDocFreq)
);
}
}
2020-12-07 22:15:18 +01:00
/**
 * Returns the shard that owns the document with the given id.
 */
private LLLocalLuceneIndex getLuceneIndex(LLTerm id) {
	int shardPosition = getLuceneIndexId(id);
	return luceneIndices[shardPosition];
}
/**
 * Maps a document id to the position of the shard that owns it, using the hash code of the id value.
 *
 * @param id document id
 * @return a position between 0 (inclusive) and the number of shards (exclusive)
 */
private int getLuceneIndexId(LLTerm id) {
	// The remainder is taken BEFORE the absolute value: Math.abs(Integer.MIN_VALUE) is still negative, so
	// "Math.abs(hash) % length" could yield a negative array index. For every other hash code this
	// expression gives exactly the same shard as before, so existing documents keep their shard.
	return Math.abs(id.getValue().hashCode() % luceneIndices.length);
}
@Override
public String getLuceneIndexName() {
return luceneIndices[0].getLuceneIndexName();
2020-12-07 22:15:18 +01:00
}
@Override
public Mono<Void> addDocument(LLTerm id, LLDocument doc) {
return getLuceneIndex(id).addDocument(id, doc);
2020-12-07 22:15:18 +01:00
}
@Override
public Mono<Void> addDocuments(Mono<Map<LLTerm, LLDocument>> documents) {
return documents
.flatMapMany(map -> {
var sortedMap = new HashMap<LLLocalLuceneIndex, Map<LLTerm, LLDocument>>();
map.forEach((key, value) -> sortedMap
.computeIfAbsent(getLuceneIndex(key), _unused -> new HashMap<>())
.put(key, value)
);
return Flux.fromIterable(sortedMap.entrySet());
})
.flatMap(luceneIndexWithNewDocuments -> {
var luceneIndex = luceneIndexWithNewDocuments.getKey();
var docs = luceneIndexWithNewDocuments.getValue();
return luceneIndex.addDocuments(Mono.just(docs));
})
.then();
2020-12-07 22:15:18 +01:00
}
@Override
public Mono<Void> deleteDocument(LLTerm id) {
return getLuceneIndex(id).deleteDocument(id);
2020-12-07 22:15:18 +01:00
}
@Override
public Mono<Void> updateDocument(LLTerm id, LLDocument document) {
return getLuceneIndex(id).updateDocument(id, document);
2020-12-07 22:15:18 +01:00
}
@Override
public Mono<Void> updateDocuments(Mono<Map<LLTerm, LLDocument>> documents) {
return documents
.flatMapMany(map -> {
var sortedMap = new HashMap<LLLocalLuceneIndex, Map<LLTerm, LLDocument>>();
map.forEach((key, value) -> sortedMap
.computeIfAbsent(getLuceneIndex(key), _unused -> new HashMap<>())
.put(key, value)
);
return Flux.fromIterable(sortedMap.entrySet());
})
.flatMap(luceneIndexWithNewDocuments -> {
var luceneIndex = luceneIndexWithNewDocuments.getKey();
var docs = luceneIndexWithNewDocuments.getValue();
return luceneIndex.updateDocuments(Mono.just(docs));
})
.then();
2020-12-07 22:15:18 +01:00
}
@Override
public Mono<Void> deleteAll() {
return Flux
.fromArray(luceneIndices)
.flatMap(LLLocalLuceneIndex::deleteAll)
.then();
2020-12-07 22:15:18 +01:00
}
/**
 * Translates a multi-index snapshot into the snapshot of a single shard.
 *
 * @param multiSnapshot snapshot returned by {@link #takeSnapshot()}, or null for "no snapshot"
 * @param instanceId    position of the shard
 * @return the snapshot of that shard, or null if {@code multiSnapshot} is null
 * @throws NullPointerException if the snapshot is not registered (never taken here, or already released)
 */
private LLSnapshot resolveSnapshot(LLSnapshot multiSnapshot, int instanceId) {
	if (multiSnapshot == null) {
		return null;
	}
	// Same exception type as the bare array dereference used to raise, but with a message that
	// tells which snapshot was requested.
	LLSnapshot[] instancesSnapshots = Objects.requireNonNull(
			registeredSnapshots.get(multiSnapshot.getSequenceNumber()),
			"Snapshot " + multiSnapshot.getSequenceNumber() + " is not registered"
	);
	return instancesSnapshots[instanceId];
}
2021-02-03 13:48:30 +01:00
/**
 * Same as {@link #resolveSnapshot(LLSnapshot, int)}, with the nullable result wrapped into an Optional.
 */
private Optional<LLSnapshot> resolveSnapshotOptional(LLSnapshot multiSnapshot, int instanceId) {
	LLSnapshot shardSnapshot = resolveSnapshot(multiSnapshot, instanceId);
	return Optional.ofNullable(shardSnapshot);
}
2020-12-07 22:15:18 +01:00
@Override
public Mono<LLSearchResult> moreLikeThis(@Nullable LLSnapshot snapshot,
2021-03-02 01:53:36 +01:00
QueryParams queryParams,
String keyFieldName,
Flux<Tuple2<String, Set<String>>> mltDocumentFields) {
2021-04-01 19:48:25 +02:00
if (queryParams.getOffset() != 0) {
return Mono.error(new IllegalArgumentException("MultiLuceneIndex requires an offset equal to 0"));
}
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsShared;
2021-04-14 02:37:03 +02:00
Mono<DistributedSearch> distributedPre;
if (luceneIndices.length > 1) {
2021-04-14 02:37:03 +02:00
long actionId = newAction();
mltDocumentFieldsShared = mltDocumentFields.publish().refCount();
distributedPre = Flux
.fromArray(luceneIndices)
.index()
.flatMap(tuple -> Mono
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
.map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot))
)
.flatMap(tuple -> tuple
.getT1()
2021-02-14 13:46:11 +01:00
.distributedPreMoreLikeThis(tuple.getT2().orElse(null),
2021-03-02 01:53:36 +01:00
queryParams,
2021-02-14 13:46:11 +01:00
keyFieldName,
2021-03-02 01:53:36 +01:00
mltDocumentFieldsShared,
2021-02-14 13:46:11 +01:00
actionId
)
)
2021-04-14 02:37:03 +02:00
.then(Mono.just(new DistributedSearch(actionId, 20)));
} else {
mltDocumentFieldsShared = mltDocumentFields;
2021-04-14 02:37:03 +02:00
distributedPre = Mono.just(new DistributedSearch(-1, 1));
}
2021-04-03 19:09:06 +02:00
//noinspection DuplicatedCode
2021-04-14 02:37:03 +02:00
return distributedPre
.flatMap(distributedSearch -> Flux
.fromArray(luceneIndices)
.index()
.flatMap(tuple -> Mono
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
.map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot)))
.flatMap(tuple -> tuple
.getT1()
.distributedMoreLikeThis(tuple.getT2().orElse(null),
2021-03-02 01:53:36 +01:00
queryParams,
keyFieldName,
2021-03-02 01:53:36 +01:00
mltDocumentFieldsShared,
2021-04-14 02:37:03 +02:00
distributedSearch.getActionId(),
distributedSearch.getScoreDivisor()
)
)
.reduce(LLSearchResult.accumulator())
.map(result -> {
2021-04-14 02:37:03 +02:00
if (distributedSearch.getActionId() != -1) {
Flux<LLSearchResultShard> resultsWithTermination = result
.getResults()
2021-04-14 02:37:03 +02:00
.map(flux -> new LLSearchResultShard(Flux
.using(
distributedSearch::getActionId,
actionId -> flux.getResults(),
this::completedAction
), flux.getTotalHitsCount())
);
2021-03-03 15:03:25 +01:00
return new LLSearchResult(resultsWithTermination);
} else {
return result;
}
})
2021-04-14 02:37:03 +02:00
.doOnError(ex -> {
if (distributedSearch.getActionId() != -1) {
completedAction(distributedSearch.getActionId());
}
})
);
2020-12-07 22:15:18 +01:00
}
2021-04-14 02:37:03 +02:00
/**
 * Outcome of the preliminary phase of a distributed query, forwarded to the main phase.
 */
@Value
private static class DistributedSearch {
// Id of the action created for the preliminary phase, or -1 when no preliminary phase was run.
long actionId;
// Forwarded to the shards in the main phase: 20 after a preliminary phase, 1 otherwise.
int scoreDivisor;
}
2020-12-07 22:15:18 +01:00
@Override
public Mono<LLSearchResult> search(@Nullable LLSnapshot snapshot,
2021-03-02 01:53:36 +01:00
QueryParams queryParams,
2020-12-07 22:15:18 +01:00
String keyFieldName) {
2021-04-01 19:48:25 +02:00
if (queryParams.getOffset() != 0) {
return Mono.error(new IllegalArgumentException("MultiLuceneIndex requires an offset equal to 0"));
}
2021-04-14 02:37:03 +02:00
Mono<DistributedSearch> distributedSearchMono;
2021-03-02 01:53:36 +01:00
if (luceneIndices.length <= 1 || !queryParams.getScoreMode().getComputeScores()) {
2021-04-14 02:37:03 +02:00
distributedSearchMono = Mono.just(new DistributedSearch(-1, 1));
} else {
2021-04-14 02:37:03 +02:00
var actionId = newAction();
distributedSearchMono = Flux
.fromArray(luceneIndices)
.index()
.flatMap(tuple -> Mono
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
.map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot))
)
.flatMap(tuple -> tuple
.getT1()
2021-02-14 13:46:11 +01:00
.distributedPreSearch(tuple.getT2().orElse(null),
2021-03-02 01:53:36 +01:00
queryParams,
2021-02-14 13:46:11 +01:00
keyFieldName,
actionId
)
)
2021-04-14 02:37:03 +02:00
.then(Mono.just(new DistributedSearch(actionId, 20)));
}
2021-04-14 02:37:03 +02:00
return distributedSearchMono
.flatMap(distributedSearch -> Flux
.fromArray(luceneIndices)
.index()
.flatMap(tuple -> Mono
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
.map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot))
)
.flatMap(tuple -> tuple
.getT1()
.distributedSearch(tuple.getT2().orElse(null),
2021-03-02 01:53:36 +01:00
queryParams,
keyFieldName,
2021-04-14 02:37:03 +02:00
distributedSearch.getActionId(),
distributedSearch.getScoreDivisor()
))
.reduce(LLSearchResult.accumulator())
.map(result -> {
2021-04-14 02:37:03 +02:00
if (distributedSearch.getActionId() != -1) {
Flux<LLSearchResultShard> resultsWithTermination = result
.getResults()
2021-04-14 02:37:03 +02:00
.map(flux -> new LLSearchResultShard(Flux
.using(
distributedSearch::getActionId,
actionId -> flux.getResults(),
this::completedAction
), flux.getTotalHitsCount())
);
2021-03-03 15:03:25 +01:00
return new LLSearchResult(resultsWithTermination);
} else {
return result;
}
})
2021-04-14 02:37:03 +02:00
.doOnError(ex -> {
if (distributedSearch.getActionId() != -1) {
completedAction(distributedSearch.getActionId());
}
})
);
2020-12-07 22:15:18 +01:00
}
@Override
public Mono<Void> close() {
return Flux
.fromArray(luceneIndices)
.flatMap(LLLocalLuceneIndex::close)
.then();
2020-12-07 22:15:18 +01:00
}
2021-02-03 13:48:30 +01:00
/**
 * Invokes {@code flush} on every shard and completes when all of them have completed.
 */
@Override
public Mono<Void> flush() {
	Flux<LLLocalLuceneIndex> shards = Flux.fromArray(luceneIndices);
	return shards.flatMap(shard -> shard.flush()).then();
}
/**
 * Invokes {@code refresh} on every shard and completes when all of them have completed.
 */
@Override
public Mono<Void> refresh() {
	Flux<LLLocalLuceneIndex> shards = Flux.fromArray(luceneIndices);
	return shards.flatMap(shard -> shard.refresh()).then();
}
2020-12-07 22:15:18 +01:00
@Override
public Mono<LLSnapshot> takeSnapshot() {
return Mono
.fromCallable(() -> {
CopyOnWriteArrayList<LLSnapshot> instancesSnapshots
= new CopyOnWriteArrayList<>(new LLSnapshot[luceneIndices.length]);
var snapIndex = nextSnapshotNumber.getAndIncrement();
2020-12-07 22:15:18 +01:00
ParallelUtils.parallelizeIO((IOBiConsumer<LLLuceneIndex, Integer> s) -> {
for (int i = 0; i < luceneIndices.length; i++) {
s.consume(luceneIndices[i], i);
}
}, maxQueueSize, luceneIndices.length, 1, (instance, i) -> {
var instanceSnapshot = instance.takeSnapshot();
//todo: reimplement better (don't block and take them parallel)
instancesSnapshots.set(i, instanceSnapshot.block());
});
2020-12-07 22:15:18 +01:00
LLSnapshot[] instancesSnapshotsArray = instancesSnapshots.toArray(LLSnapshot[]::new);
registeredSnapshots.put(snapIndex, instancesSnapshotsArray);
2020-12-07 22:15:18 +01:00
return new LLSnapshot(snapIndex);
})
.subscribeOn(Schedulers.boundedElastic());
2020-12-07 22:15:18 +01:00
}
@Override
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
return Mono
.<Void>fromCallable(() -> {
LLSnapshot[] instancesSnapshots = registeredSnapshots.remove(snapshot.getSequenceNumber());
for (int i = 0; i < luceneIndices.length; i++) {
LLLocalLuceneIndex luceneIndex = luceneIndices[i];
//todo: reimplement better (don't block and take them parallel)
luceneIndex.releaseSnapshot(instancesSnapshots[i]).block();
}
return null;
})
.subscribeOn(Schedulers.boundedElastic());
2020-12-07 22:15:18 +01:00
}
/**
 * Returns the low-memory flag reported by the first shard.
 */
@Override
public boolean isLowMemoryMode() {
	LLLocalLuceneIndex firstShard = luceneIndices[0];
	return firstShard.isLowMemoryMode();
}
2021-04-01 19:48:25 +02:00
/**
 * Always returns false: {@code search} and {@code moreLikeThis} of this class reject any query whose
 * offset is not 0.
 */
@Override
public boolean supportsOffset() {
return false;
}
2020-12-07 22:15:18 +01:00
}