Compare commits
1 Commit
master
...
lmdb-paral
Author | SHA1 | Date | |
---|---|---|---|
|
a9c462940f |
@ -18,7 +18,7 @@ import static org.lmdbjava.EnvFlags.*;
|
||||
public class LLTempLMDBEnv implements Closeable {
|
||||
|
||||
private static final long TWENTY_GIBIBYTES = 20L * 1024L * 1024L * 1024L;
|
||||
private static final int MAX_DATABASES = 1024;
|
||||
private static final int MAX_DATABASES = 16384;
|
||||
|
||||
private final Path tempDirectory;
|
||||
private final Env<ByteBuf> env;
|
||||
|
@ -6,6 +6,7 @@ import static org.apache.lucene.search.TotalHits.Relation.*;
|
||||
import it.cavallium.dbengine.lucene.collector.FullFieldDocs;
|
||||
import java.util.Comparator;
|
||||
import org.apache.lucene.search.FieldComparator;
|
||||
import org.apache.lucene.search.ScoreDoc;
|
||||
import org.apache.lucene.search.Sort;
|
||||
import org.apache.lucene.search.SortField;
|
||||
import org.apache.lucene.search.TotalHits;
|
||||
@ -15,9 +16,17 @@ import reactor.core.publisher.Flux;
|
||||
|
||||
public interface FullDocs<T extends LLDoc> extends ResourceIterable<T> {
|
||||
|
||||
Comparator<LLDoc> SHARD_INDEX_TIE_BREAKER = Comparator.comparingInt(LLDoc::shardIndex);
|
||||
Comparator<LLDoc> DOC_ID_TIE_BREAKER = Comparator.comparingInt(LLDoc::doc);
|
||||
Comparator<LLDoc> DEFAULT_TIE_BREAKER = SHARD_INDEX_TIE_BREAKER.thenComparing(DOC_ID_TIE_BREAKER);
|
||||
/** Internal comparator with shardIndex */
|
||||
Comparator<LLDoc> SHARD_INDEX_TIE_BREAKER =
|
||||
Comparator.comparingInt(LLDoc::shardIndex);
|
||||
|
||||
/** Internal comparator with docID */
|
||||
Comparator<LLDoc> DOC_ID_TIE_BREAKER =
|
||||
Comparator.comparingInt(LLDoc::doc);
|
||||
|
||||
/** Default comparator */
|
||||
Comparator<LLDoc> DEFAULT_TIE_BREAKER =
|
||||
SHARD_INDEX_TIE_BREAKER.thenComparing(DOC_ID_TIE_BREAKER);
|
||||
|
||||
@Override
|
||||
Flux<T> iterate();
|
||||
|
@ -0,0 +1,76 @@
|
||||
package it.cavallium.dbengine.lucene.collector;
|
||||
|
||||
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
|
||||
import it.cavallium.dbengine.lucene.FullDocs;
|
||||
import it.cavallium.dbengine.lucene.LLDoc;
|
||||
import it.cavallium.dbengine.lucene.LLFieldDoc;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
import org.apache.lucene.search.CollectorManager;
|
||||
import org.apache.lucene.search.ScoreMode;
|
||||
import org.apache.lucene.search.Sort;
|
||||
import org.apache.lucene.search.TotalHits;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
public class LMDBFullFieldDocCollectorMultiManager implements
|
||||
CollectorMultiManager<FullFieldDocs<LLFieldDoc>, FullFieldDocs<LLFieldDoc>> {
|
||||
|
||||
|
||||
private final Sort sort;
|
||||
private final CollectorManager<LMDBFullFieldDocCollector, FullFieldDocs<LLFieldDoc>> sharedCollector;
|
||||
|
||||
public LMDBFullFieldDocCollectorMultiManager(LLTempLMDBEnv env, Sort sort, int limit, long totalHitsThreshold) {
|
||||
this.sort = sort;
|
||||
this.sharedCollector = LMDBFullFieldDocCollector.createSharedManager(env, sort, limit, totalHitsThreshold);
|
||||
}
|
||||
|
||||
public CollectorManager<LMDBFullFieldDocCollector, FullFieldDocs<LLFieldDoc>> get(int shardIndex) {
|
||||
return new CollectorManager<>() {
|
||||
@Override
|
||||
public LMDBFullFieldDocCollector newCollector() throws IOException {
|
||||
return sharedCollector.newCollector();
|
||||
}
|
||||
|
||||
@Override
|
||||
public FullFieldDocs<LLFieldDoc> reduce(Collection<LMDBFullFieldDocCollector> collectors) throws IOException {
|
||||
@SuppressWarnings("unchecked")
|
||||
final FullDocs<LLFieldDoc>[] fullDocs = new FullDocs[collectors.size()];
|
||||
int i = 0;
|
||||
for (var collector : collectors) {
|
||||
fullDocs[i++] = collector.fullDocs();
|
||||
}
|
||||
var result = (FullFieldDocs<LLFieldDoc>) FullDocs.merge(sort, fullDocs);
|
||||
return new FullFieldDocs<>(new FullDocs<>() {
|
||||
@Override
|
||||
public Flux<LLFieldDoc> iterate() {
|
||||
return result.iterate().map(doc -> new LLFieldDoc(doc.doc(), doc.score(), shardIndex, doc.fields()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<LLFieldDoc> iterate(long skips) {
|
||||
return result.iterate(skips).map(doc -> new LLFieldDoc(doc.doc(), doc.score(), shardIndex, doc.fields()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TotalHits totalHits() {
|
||||
return result.totalHits();
|
||||
}
|
||||
}, result.fields());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScoreMode scoreMode() {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public FullFieldDocs<LLFieldDoc> reduce(List<FullFieldDocs<LLFieldDoc>> results) {
|
||||
//noinspection unchecked
|
||||
return (FullFieldDocs<LLFieldDoc>) FullDocs
|
||||
.merge(sort, (FullDocs<LLFieldDoc>[]) results.toArray(FullDocs<?>[]::new));
|
||||
}
|
||||
}
|
@ -0,0 +1,67 @@
|
||||
package it.cavallium.dbengine.lucene.collector;
|
||||
|
||||
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
|
||||
import it.cavallium.dbengine.lucene.FullDocs;
|
||||
import it.cavallium.dbengine.lucene.LLDoc;
|
||||
import it.cavallium.dbengine.lucene.LLFieldDoc;
|
||||
import it.cavallium.dbengine.lucene.LLScoreDoc;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
import org.apache.lucene.search.CollectorManager;
|
||||
import org.apache.lucene.search.ScoreMode;
|
||||
import org.apache.lucene.search.Sort;
|
||||
import org.apache.lucene.search.TotalHits;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
public class LMDBFullScoreDocCollectorMultiManager implements
|
||||
CollectorMultiManager<FullDocs<LLScoreDoc>, FullDocs<LLScoreDoc>> {
|
||||
|
||||
private final CollectorManager<LMDBFullScoreDocCollector, FullDocs<LLScoreDoc>> sharedCollector;
|
||||
|
||||
public LMDBFullScoreDocCollectorMultiManager(LLTempLMDBEnv env, long limit, long totalHitsThreshold) {
|
||||
this.sharedCollector = LMDBFullScoreDocCollector.createSharedManager(env, limit, totalHitsThreshold);
|
||||
}
|
||||
|
||||
public CollectorManager<LMDBFullScoreDocCollector, FullDocs<LLScoreDoc>> get(int shardIndex) {
|
||||
return new CollectorManager<>() {
|
||||
@Override
|
||||
public LMDBFullScoreDocCollector newCollector() throws IOException {
|
||||
return sharedCollector.newCollector();
|
||||
}
|
||||
|
||||
@Override
|
||||
public FullDocs<LLScoreDoc> reduce(Collection<LMDBFullScoreDocCollector> collectors) throws IOException {
|
||||
var result = sharedCollector.reduce(collectors);
|
||||
return new FullDocs<>() {
|
||||
@Override
|
||||
public Flux<LLScoreDoc> iterate() {
|
||||
return result.iterate().map(doc -> new LLScoreDoc(doc.doc(), doc.score(), shardIndex));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<LLScoreDoc> iterate(long skips) {
|
||||
return result.iterate(skips).map(doc -> new LLScoreDoc(doc.doc(), doc.score(), shardIndex));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TotalHits totalHits() {
|
||||
return result.totalHits();
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScoreMode scoreMode() {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public FullDocs<LLScoreDoc> reduce(List<FullDocs<LLScoreDoc>> results) {
|
||||
//noinspection unchecked
|
||||
return FullDocs.merge(Sort.RELEVANCE, (FullDocs<LLScoreDoc>[]) results.toArray(FullDocs<?>[]::new));
|
||||
}
|
||||
}
|
@ -9,12 +9,15 @@ import it.cavallium.dbengine.lucene.FullDocs;
|
||||
import it.cavallium.dbengine.lucene.LLFieldDoc;
|
||||
import it.cavallium.dbengine.lucene.LLScoreDoc;
|
||||
import it.cavallium.dbengine.lucene.LuceneUtils;
|
||||
import it.cavallium.dbengine.lucene.collector.FullFieldDocs;
|
||||
import it.cavallium.dbengine.lucene.collector.LMDBFullFieldDocCollector;
|
||||
import it.cavallium.dbengine.lucene.collector.LMDBFullFieldDocCollectorMultiManager;
|
||||
import it.cavallium.dbengine.lucene.collector.LMDBFullScoreDocCollector;
|
||||
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ServiceLoader;
|
||||
import org.apache.lucene.search.CollectorManager;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.TopFieldCollector;
|
||||
import org.warp.commonutils.log.Logger;
|
||||
@ -65,28 +68,26 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher {
|
||||
.fromCallable(() -> {
|
||||
LLUtils.ensureBlocking();
|
||||
var totalHitsThreshold = queryParams.getTotalHitsThresholdLong();
|
||||
return LMDBFullFieldDocCollector.createSharedManager(env, queryParams.sort(), queryParams.limitInt(),
|
||||
return new LMDBFullFieldDocCollectorMultiManager(env, queryParams.sort(), queryParams.limitInt(),
|
||||
totalHitsThreshold);
|
||||
})
|
||||
.flatMap(sharedManager -> Flux
|
||||
.flatMap(cmm -> Flux
|
||||
.fromIterable(indexSearchers)
|
||||
.flatMap(shard -> Mono.fromCallable(() -> {
|
||||
.index()
|
||||
.flatMap(shardWithIndex -> Mono.fromCallable(() -> {
|
||||
LLUtils.ensureBlocking();
|
||||
|
||||
var collector = sharedManager.newCollector();
|
||||
assert queryParams.complete() == collector.scoreMode().isExhaustive();
|
||||
assert queryParams
|
||||
.getScoreModeOptional()
|
||||
.map(scoreMode -> scoreMode == collector.scoreMode())
|
||||
.orElse(true);
|
||||
var index = (int) (long) shardWithIndex.getT1();
|
||||
var shard = shardWithIndex.getT2();
|
||||
|
||||
shard.search(queryParams.query(), collector);
|
||||
return collector;
|
||||
var cm = cmm.get(index);
|
||||
|
||||
return shard.search(queryParams.query(), cm);
|
||||
}))
|
||||
.collectList()
|
||||
.flatMap(collectors -> Mono.fromCallable(() -> {
|
||||
.flatMap(results -> Mono.fromCallable(() -> {
|
||||
LLUtils.ensureBlocking();
|
||||
return sharedManager.reduce(collectors);
|
||||
return cmm.reduce(results);
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ import it.cavallium.dbengine.lucene.LuceneUtils;
|
||||
import it.cavallium.dbengine.lucene.FullDocs;
|
||||
import it.cavallium.dbengine.lucene.LLScoreDoc;
|
||||
import it.cavallium.dbengine.lucene.collector.LMDBFullScoreDocCollector;
|
||||
import it.cavallium.dbengine.lucene.collector.LMDBFullScoreDocCollectorMultiManager;
|
||||
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
@ -70,27 +71,25 @@ public class UnsortedScoredFullMultiSearcher implements MultiSearcher {
|
||||
.fromCallable(() -> {
|
||||
LLUtils.ensureBlocking();
|
||||
var totalHitsThreshold = queryParams.getTotalHitsThresholdLong();
|
||||
return LMDBFullScoreDocCollector.createSharedManager(env, queryParams.limitLong(), totalHitsThreshold);
|
||||
return new LMDBFullScoreDocCollectorMultiManager(env, queryParams.limitLong(), totalHitsThreshold);
|
||||
})
|
||||
.flatMap(sharedManager -> Flux
|
||||
.flatMap(cmm -> Flux
|
||||
.fromIterable(indexSearchers)
|
||||
.flatMap(shard -> Mono.fromCallable(() -> {
|
||||
.index()
|
||||
.flatMap(shardWithIndex -> Mono.fromCallable(() -> {
|
||||
LLUtils.ensureBlocking();
|
||||
|
||||
var collector = sharedManager.newCollector();
|
||||
assert queryParams.complete() == collector.scoreMode().isExhaustive();
|
||||
assert queryParams
|
||||
.getScoreModeOptional()
|
||||
.map(scoreMode -> scoreMode == collector.scoreMode())
|
||||
.orElse(true);
|
||||
var index = (int) (long) shardWithIndex.getT1();
|
||||
var shard = shardWithIndex.getT2();
|
||||
|
||||
shard.search(queryParams.query(), collector);
|
||||
return collector;
|
||||
var cm = cmm.get(index);
|
||||
|
||||
return shard.search(queryParams.query(), cm);
|
||||
}))
|
||||
.collectList()
|
||||
.flatMap(collectors -> Mono.fromCallable(() -> {
|
||||
.flatMap(results -> Mono.fromCallable(() -> {
|
||||
LLUtils.ensureBlocking();
|
||||
return sharedManager.reduce(collectors);
|
||||
return cmm.reduce(results);
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user