From a9c462940fe4b1f56ae63e77c307a7b39e52a530 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 9 Nov 2021 01:55:09 +0100 Subject: [PATCH] Parallel lmdb --- .../dbengine/database/disk/LLTempLMDBEnv.java | 2 +- .../cavallium/dbengine/lucene/FullDocs.java | 15 +++- ...LMDBFullFieldDocCollectorMultiManager.java | 76 +++++++++++++++++++ ...LMDBFullScoreDocCollectorMultiManager.java | 67 ++++++++++++++++ .../SortedScoredFullMultiSearcher.java | 27 +++---- .../UnsortedScoredFullMultiSearcher.java | 25 +++--- 6 files changed, 182 insertions(+), 30 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullFieldDocCollectorMultiManager.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullScoreDocCollectorMultiManager.java diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLTempLMDBEnv.java b/src/main/java/it/cavallium/dbengine/database/disk/LLTempLMDBEnv.java index 20afc36..5c9a7ad 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLTempLMDBEnv.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLTempLMDBEnv.java @@ -18,7 +18,7 @@ import static org.lmdbjava.EnvFlags.*; public class LLTempLMDBEnv implements Closeable { private static final long TWENTY_GIBIBYTES = 20L * 1024L * 1024L * 1024L; - private static final int MAX_DATABASES = 1024; + private static final int MAX_DATABASES = 16384; private final Path tempDirectory; private final Env env; diff --git a/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java b/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java index 68523db..e477654 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java +++ b/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java @@ -6,6 +6,7 @@ import static org.apache.lucene.search.TotalHits.Relation.*; import it.cavallium.dbengine.lucene.collector.FullFieldDocs; import java.util.Comparator; import org.apache.lucene.search.FieldComparator; +import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; import org.apache.lucene.search.TotalHits; @@ -15,9 +16,17 @@ import reactor.core.publisher.Flux; public interface FullDocs extends ResourceIterable { - Comparator SHARD_INDEX_TIE_BREAKER = Comparator.comparingInt(LLDoc::shardIndex); - Comparator DOC_ID_TIE_BREAKER = Comparator.comparingInt(LLDoc::doc); - Comparator DEFAULT_TIE_BREAKER = SHARD_INDEX_TIE_BREAKER.thenComparing(DOC_ID_TIE_BREAKER); + /** Internal comparator with shardIndex */ + Comparator SHARD_INDEX_TIE_BREAKER = + Comparator.comparingInt(LLDoc::shardIndex); + + /** Internal comparator with docID */ + Comparator DOC_ID_TIE_BREAKER = + Comparator.comparingInt(LLDoc::doc); + + /** Default comparator */ + Comparator DEFAULT_TIE_BREAKER = + SHARD_INDEX_TIE_BREAKER.thenComparing(DOC_ID_TIE_BREAKER); @Override Flux iterate(); diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullFieldDocCollectorMultiManager.java b/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullFieldDocCollectorMultiManager.java new file mode 100644 index 0000000..05dd92e --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullFieldDocCollectorMultiManager.java @@ -0,0 +1,76 @@ +package it.cavallium.dbengine.lucene.collector; + +import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; +import it.cavallium.dbengine.lucene.FullDocs; +import it.cavallium.dbengine.lucene.LLDoc; +import it.cavallium.dbengine.lucene.LLFieldDoc; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.lucene.search.CollectorManager; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.TotalHits; +import reactor.core.publisher.Flux; + +public class LMDBFullFieldDocCollectorMultiManager implements + CollectorMultiManager, FullFieldDocs> { + + + private final Sort sort; + private final CollectorManager> sharedCollector; + + public LMDBFullFieldDocCollectorMultiManager(LLTempLMDBEnv env, Sort sort, int limit, long totalHitsThreshold) { + this.sort = sort; + this.sharedCollector = LMDBFullFieldDocCollector.createSharedManager(env, sort, limit, totalHitsThreshold); + } + + public CollectorManager> get(int shardIndex) { + return new CollectorManager<>() { + @Override + public LMDBFullFieldDocCollector newCollector() throws IOException { + return sharedCollector.newCollector(); + } + + @Override + public FullFieldDocs reduce(Collection collectors) throws IOException { + @SuppressWarnings("unchecked") + final FullDocs[] fullDocs = new FullDocs[collectors.size()]; + int i = 0; + for (var collector : collectors) { + fullDocs[i++] = collector.fullDocs(); + } + var result = (FullFieldDocs) FullDocs.merge(sort, fullDocs); + return new FullFieldDocs<>(new FullDocs<>() { + @Override + public Flux iterate() { + return result.iterate().map(doc -> new LLFieldDoc(doc.doc(), doc.score(), shardIndex, doc.fields())); + } + + @Override + public Flux iterate(long skips) { + return result.iterate(skips).map(doc -> new LLFieldDoc(doc.doc(), doc.score(), shardIndex, doc.fields())); + } + + @Override + public TotalHits totalHits() { + return result.totalHits(); + } + }, result.fields()); + } + }; + } + + @Override + public ScoreMode scoreMode() { + throw new NotImplementedException(); + } + + @Override + public FullFieldDocs reduce(List> results) { + //noinspection unchecked + return (FullFieldDocs) FullDocs + .merge(sort, (FullDocs[]) results.toArray(FullDocs[]::new)); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullScoreDocCollectorMultiManager.java b/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullScoreDocCollectorMultiManager.java new file mode 100644 index 0000000..21c667b --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullScoreDocCollectorMultiManager.java @@ -0,0 +1,67 @@ +package it.cavallium.dbengine.lucene.collector; + +import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; +import it.cavallium.dbengine.lucene.FullDocs; +import it.cavallium.dbengine.lucene.LLDoc; +import it.cavallium.dbengine.lucene.LLFieldDoc; +import it.cavallium.dbengine.lucene.LLScoreDoc; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.lucene.search.CollectorManager; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.TotalHits; +import reactor.core.publisher.Flux; + +public class LMDBFullScoreDocCollectorMultiManager implements + CollectorMultiManager, FullDocs> { + + private final CollectorManager> sharedCollector; + + public LMDBFullScoreDocCollectorMultiManager(LLTempLMDBEnv env, long limit, long totalHitsThreshold) { + this.sharedCollector = LMDBFullScoreDocCollector.createSharedManager(env, limit, totalHitsThreshold); + } + + public CollectorManager> get(int shardIndex) { + return new CollectorManager<>() { + @Override + public LMDBFullScoreDocCollector newCollector() throws IOException { + return sharedCollector.newCollector(); + } + + @Override + public FullDocs reduce(Collection collectors) throws IOException { + var result = sharedCollector.reduce(collectors); + return new FullDocs<>() { + @Override + public Flux iterate() { + return result.iterate().map(doc -> new LLScoreDoc(doc.doc(), doc.score(), shardIndex)); + } + + @Override + public Flux iterate(long skips) { + return result.iterate(skips).map(doc -> new LLScoreDoc(doc.doc(), doc.score(), shardIndex)); + } + + @Override + public TotalHits totalHits() { + return result.totalHits(); + } + }; + } + }; + } + + @Override + public ScoreMode scoreMode() { + throw new NotImplementedException(); + } + + @Override + public FullDocs reduce(List> results) { + //noinspection unchecked + return FullDocs.merge(Sort.RELEVANCE, (FullDocs[]) results.toArray(FullDocs[]::new)); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java index 6c490d5..fb23c4d 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java @@ -9,12 +9,15 @@ import it.cavallium.dbengine.lucene.FullDocs; import it.cavallium.dbengine.lucene.LLFieldDoc; import it.cavallium.dbengine.lucene.LLScoreDoc; import it.cavallium.dbengine.lucene.LuceneUtils; +import it.cavallium.dbengine.lucene.collector.FullFieldDocs; import it.cavallium.dbengine.lucene.collector.LMDBFullFieldDocCollector; +import it.cavallium.dbengine.lucene.collector.LMDBFullFieldDocCollectorMultiManager; import it.cavallium.dbengine.lucene.collector.LMDBFullScoreDocCollector; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; import java.io.Closeable; import java.io.IOException; import java.util.ServiceLoader; +import org.apache.lucene.search.CollectorManager; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.TopFieldCollector; import org.warp.commonutils.log.Logger; @@ -65,28 +68,26 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher { .fromCallable(() -> { LLUtils.ensureBlocking(); var totalHitsThreshold = queryParams.getTotalHitsThresholdLong(); - return LMDBFullFieldDocCollector.createSharedManager(env, queryParams.sort(), queryParams.limitInt(), + return new LMDBFullFieldDocCollectorMultiManager(env, queryParams.sort(), queryParams.limitInt(), totalHitsThreshold); }) - .flatMap(sharedManager -> Flux + .flatMap(cmm -> Flux .fromIterable(indexSearchers) - .flatMap(shard -> Mono.fromCallable(() -> { + .index() + .flatMap(shardWithIndex -> Mono.fromCallable(() -> { LLUtils.ensureBlocking(); - var collector = sharedManager.newCollector(); - assert queryParams.complete() == collector.scoreMode().isExhaustive(); - assert queryParams - .getScoreModeOptional() - .map(scoreMode -> scoreMode == collector.scoreMode()) - .orElse(true); + var index = (int) (long) shardWithIndex.getT1(); + var shard = shardWithIndex.getT2(); - shard.search(queryParams.query(), collector); - return collector; + var cm = cmm.get(index); + + return shard.search(queryParams.query(), cm); })) .collectList() - .flatMap(collectors -> Mono.fromCallable(() -> { + .flatMap(results -> Mono.fromCallable(() -> { LLUtils.ensureBlocking(); - return sharedManager.reduce(collectors); + return cmm.reduce(results); })) ); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedScoredFullMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedScoredFullMultiSearcher.java index 17e7b4d..fe9e237 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedScoredFullMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedScoredFullMultiSearcher.java @@ -9,6 +9,7 @@ import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.FullDocs; import it.cavallium.dbengine.lucene.LLScoreDoc; import it.cavallium.dbengine.lucene.collector.LMDBFullScoreDocCollector; +import it.cavallium.dbengine.lucene.collector.LMDBFullScoreDocCollectorMultiManager; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; import java.io.Closeable; import java.io.IOException; @@ -70,27 +71,25 @@ public class UnsortedScoredFullMultiSearcher implements MultiSearcher { .fromCallable(() -> { LLUtils.ensureBlocking(); var totalHitsThreshold = queryParams.getTotalHitsThresholdLong(); - return LMDBFullScoreDocCollector.createSharedManager(env, queryParams.limitLong(), totalHitsThreshold); + return new LMDBFullScoreDocCollectorMultiManager(env, queryParams.limitLong(), totalHitsThreshold); }) - .flatMap(sharedManager -> Flux + .flatMap(cmm -> Flux .fromIterable(indexSearchers) - .flatMap(shard -> Mono.fromCallable(() -> { + .index() + .flatMap(shardWithIndex -> Mono.fromCallable(() -> { LLUtils.ensureBlocking(); - var collector = sharedManager.newCollector(); - assert queryParams.complete() == collector.scoreMode().isExhaustive(); - assert queryParams - .getScoreModeOptional() - .map(scoreMode -> scoreMode == collector.scoreMode()) - .orElse(true); + var index = (int) (long) shardWithIndex.getT1(); + var shard = shardWithIndex.getT2(); - shard.search(queryParams.query(), collector); - return collector; + var cm = cmm.get(index); + + return shard.search(queryParams.query(), cm); })) .collectList() - .flatMap(collectors -> Mono.fromCallable(() -> { + .flatMap(results -> Mono.fromCallable(() -> { LLUtils.ensureBlocking(); - return sharedManager.reduce(collectors); + return cmm.reduce(results); })) ); }