Finish refactoring SimpleLuceneLocalSearcher

This commit is contained in:
Andrea Cavalli 2021-09-19 12:01:11 +02:00
parent 8bc0284f27
commit d1963a1d65
3 changed files with 194 additions and 110 deletions

View File

@ -5,6 +5,7 @@ import com.google.common.primitives.Longs;
import io.net5.buffer.api.Buffer; import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.CompositeBuffer; import io.net5.buffer.api.CompositeBuffer;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import io.net5.util.IllegalReferenceCountException; import io.net5.util.IllegalReferenceCountException;
import io.net5.util.internal.PlatformDependent; import io.net5.util.internal.PlatformDependent;
@ -24,6 +25,7 @@ import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.ToIntFunction; import java.util.function.ToIntFunction;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field; import org.apache.lucene.document.Field;
@ -344,6 +346,47 @@ public class LLUtils {
} }
} }
/**
* cleanup resource
* @param cleanupOnSuccess if true the resource will be cleaned up if the function is successful
*/
public static <U, T extends Resource<T>> Mono<U> usingSend(Mono<Send<T>> resourceSupplier,
Function<Send<T>, Mono<U>> resourceClosure,
boolean cleanupOnSuccess) {
return Mono.usingWhen(resourceSupplier, resourceClosure,
r -> {
if (cleanupOnSuccess) {
return Mono.fromRunnable(r::close);
} else {
return Mono.empty();
}
},
(r, ex) -> Mono.fromRunnable(r::close),
r -> Mono.fromRunnable(r::close))
.doOnDiscard(Send.class, Send::close);
}
/**
* cleanup resource
* @param cleanupOnSuccess if true the resource will be cleaned up if the function is successful
*/
public static <U, T extends Resource<T>, V extends T> Mono<U> usingResource(Mono<V> resourceSupplier,
Function<V, Mono<U>> resourceClosure,
boolean cleanupOnSuccess) {
return Mono.usingWhen(resourceSupplier, resourceClosure,
r -> {
if (cleanupOnSuccess) {
return Mono.fromRunnable(r::close);
} else {
return Mono.empty();
}
},
(r, ex) -> Mono.fromRunnable(r::close),
r -> Mono.fromRunnable(r::close))
.doOnDiscard(Resource.class, Resource::close)
.doOnDiscard(Send.class, Send::close);
}
public static record DirectBuffer(@NotNull Send<Buffer> buffer, @NotNull ByteBuffer byteBuffer) {} public static record DirectBuffer(@NotNull Send<Buffer> buffer, @NotNull ByteBuffer byteBuffer) {}
@NotNull @NotNull

View File

@ -19,7 +19,7 @@ public interface IndexSearchers extends Resource<IndexSearchers> {
return new ShardedIndexSearchers(indexSearchers, d -> {}); return new ShardedIndexSearchers(indexSearchers, d -> {});
} }
static IndexSearchers unsharded(LLIndexSearcher indexSearcher) { static UnshardedIndexSearchers unsharded(Send<LLIndexSearcher> indexSearcher) {
return new UnshardedIndexSearchers(indexSearcher, d -> {}); return new UnshardedIndexSearchers(indexSearcher, d -> {});
} }
@ -30,9 +30,9 @@ public interface IndexSearchers extends Resource<IndexSearchers> {
private LLIndexSearcher indexSearcher; private LLIndexSearcher indexSearcher;
public UnshardedIndexSearchers(LLIndexSearcher indexSearcher, Drop<UnshardedIndexSearchers> drop) { public UnshardedIndexSearchers(Send<LLIndexSearcher> indexSearcher, Drop<UnshardedIndexSearchers> drop) {
super(new CloseOnDrop(drop)); super(new CloseOnDrop(drop));
this.indexSearcher = indexSearcher; this.indexSearcher = indexSearcher.receive();
} }
@Override @Override
@ -46,6 +46,10 @@ public interface IndexSearchers extends Resource<IndexSearchers> {
return indexSearcher; return indexSearcher;
} }
public LLIndexSearcher shard() {
return this.shard(0);
}
@Override @Override
protected RuntimeException createResourceClosedException() { protected RuntimeException createResourceClosedException() {
return new IllegalStateException("Closed"); return new IllegalStateException("Closed");
@ -53,7 +57,7 @@ public interface IndexSearchers extends Resource<IndexSearchers> {
@Override @Override
protected Owned<UnshardedIndexSearchers> prepareSend() { protected Owned<UnshardedIndexSearchers> prepareSend() {
LLIndexSearcher indexSearcher = this.indexSearcher; Send<LLIndexSearcher> indexSearcher = this.indexSearcher.send();
this.makeInaccessible(); this.makeInaccessible();
return drop -> new UnshardedIndexSearchers(indexSearcher, drop); return drop -> new UnshardedIndexSearchers(indexSearcher, drop);
} }

View File

@ -4,31 +4,23 @@ import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.EMPTY_STATUS
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.FIRST_PAGE_LIMIT; import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.FIRST_PAGE_LIMIT;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT; import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.searcher.IndexSearchers.UnshardedIndexSearchers;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopDocsCollector; import org.apache.lucene.search.TopDocsCollector;
import org.apache.lucene.search.TotalHits;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType; import reactor.core.publisher.SynchronousSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
@ -38,107 +30,152 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
String keyFieldName) { String keyFieldName) {
Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null"); Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null");
PaginationInfo paginationInfo; PaginationInfo paginationInfo = getPaginationInfo(queryParams);
if (queryParams.limit() <= MAX_SINGLE_SEARCH_LIMIT) {
paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), queryParams.limit(), true); var indexSearchersMono = indexSearcherMono.map(IndexSearchers::unsharded);
} else {
paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, false); return LLUtils.usingResource(indexSearchersMono, indexSearchers -> this
// Search first page results
.searchFirstPage(indexSearchers, queryParams, paginationInfo)
// Compute the results of the first page
.transform(firstPageTopDocsMono -> this.computeFirstPageResults(firstPageTopDocsMono, indexSearchers,
keyFieldName, queryParams))
// Compute other results
.transform(firstResult -> this.computeOtherResults(firstResult, indexSearchers, queryParams, keyFieldName))
// Ensure that one LuceneSearchResult is always returned
.single(),
false);
} }
return indexSearcherMono /**
.flatMap(indexSearcherToReceive -> { * Get the pagination info
var indexSearcher = indexSearcherToReceive.receive(); */
var indexSearchers = IndexSearchers.unsharded(indexSearcher); private PaginationInfo getPaginationInfo(LocalQueryParams queryParams) {
return Mono if (queryParams.limit() <= MAX_SINGLE_SEARCH_LIMIT) {
.fromCallable(() -> { return new PaginationInfo(queryParams.limit(), queryParams.offset(), queryParams.limit(), true);
LLUtils.ensureBlocking(); } else {
TopDocsCollector<ScoreDoc> firstPageCollector = TopDocsSearcher.getTopDocsCollector( return new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, false);
queryParams.sort(), }
LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()), }
null,
LuceneUtils.totalHitsThreshold(),
!paginationInfo.forceSinglePage(),
queryParams.isScored());
indexSearcher.getIndexSearcher().search(queryParams.query(), firstPageCollector); /**
return firstPageCollector.topDocs(LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset()), * Search effectively the raw results of the first page
LuceneUtils.safeLongToInt(paginationInfo.firstPageLimit()) */
); private Mono<PageData> searchFirstPage(UnshardedIndexSearchers indexSearchers,
}) LocalQueryParams queryParams,
.map(firstPageTopDocs -> { PaginationInfo paginationInfo) {
Flux<LLKeyScore> firstPageHitsFlux = LuceneUtils.convertHits(Flux.fromArray(firstPageTopDocs.scoreDocs), var limit = LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit());
var pagination = !paginationInfo.forceSinglePage();
var resultsOffset = LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset());
return Mono
.fromSupplier(() -> new CurrentPageInfo(null, limit, 0))
.handle((s, sink) -> this.searchPageSync(queryParams, indexSearchers, pagination, resultsOffset, s, sink));
}
/**
* Search effectively the merged raw results of the next pages
*/
private Flux<LLKeyScore> searchOtherPages(UnshardedIndexSearchers indexSearchers,
LocalQueryParams queryParams, String keyFieldName, CurrentPageInfo secondPageInfo) {
return Flux
.<PageData, CurrentPageInfo>generate(
() -> secondPageInfo,
(s, sink) -> searchPageSync(queryParams, indexSearchers, true, 0, s, sink),
s -> {}
)
.subscribeOn(Schedulers.boundedElastic())
.map(PageData::topDocs)
.flatMapIterable(topDocs -> Arrays.asList(topDocs.scoreDocs))
.transform(topFieldDocFlux -> LuceneUtils.convertHits(topFieldDocFlux, indexSearchers,
keyFieldName, true));
}
private static record FirstPageResults(TotalHitsCount totalHitsCount, Flux<LLKeyScore> firstPageHitsFlux,
CurrentPageInfo nextPageInfo) {}
/**
* Compute the results of the first page, extracting useful data
*/
private Mono<FirstPageResults> computeFirstPageResults(Mono<PageData> firstPageDataMono,
IndexSearchers indexSearchers,
String keyFieldName,
LocalQueryParams queryParams) {
return firstPageDataMono.map(firstPageData -> {
var totalHitsCount = LuceneUtils.convertTotalHitsCount(firstPageData.topDocs().totalHits);
Flux<LLKeyScore> firstPageHitsFlux = LuceneUtils.convertHits(Flux.fromArray(firstPageData.topDocs().scoreDocs),
indexSearchers, keyFieldName, true) indexSearchers, keyFieldName, true)
.take(queryParams.limit(), true); .take(queryParams.limit(), true);
return Tuples.of(Optional.ofNullable(LuceneUtils.getLastScoreDoc(firstPageTopDocs.scoreDocs)), CurrentPageInfo nextPageInfo = firstPageData.nextPageInfo();
LuceneUtils.convertTotalHitsCount(firstPageTopDocs.totalHits), firstPageHitsFlux);
})
.map(firstResult -> {
var firstPageLastScoreDoc = firstResult.getT1();
var totalHitsCount = firstResult.getT2();
var firstPageFlux = firstResult.getT3();
return new FirstPageResults(totalHitsCount, firstPageHitsFlux, nextPageInfo);
});
}
Flux<LLKeyScore> nextHits; private Mono<Send<LuceneSearchResult>> computeOtherResults(Mono<FirstPageResults> firstResultMono,
if (paginationInfo.forceSinglePage() || paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) { UnshardedIndexSearchers indexSearchers,
nextHits = null; LocalQueryParams queryParams,
} else { String keyFieldName) {
nextHits = Flux.defer(() -> Flux return firstResultMono.map(firstResult -> {
.<TopDocs, CurrentPageInfo>generate( var totalHitsCount = firstResult.totalHitsCount();
() -> new CurrentPageInfo(firstPageLastScoreDoc.orElse(null), paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1), var firstPageHitsFlux = firstResult.firstPageHitsFlux();
(s, sink) -> { var secondPageInfo = firstResult.nextPageInfo();
Flux<LLKeyScore> nextHitsFlux = searchOtherPages(indexSearchers, queryParams, keyFieldName, secondPageInfo);
Flux<LLKeyScore> combinedFlux = firstPageHitsFlux.concatWith(nextHitsFlux);
return new LuceneSearchResult(totalHitsCount, combinedFlux, d -> indexSearchers.close()).send();
});
}
private static record PageData(TopDocs topDocs, CurrentPageInfo nextPageInfo) {}
/**
*
* @param resultsOffset offset of the resulting topDocs. Useful if you want to
* skip the first n results in the first page
*/
private CurrentPageInfo searchPageSync(LocalQueryParams queryParams,
UnshardedIndexSearchers indexSearchers,
boolean allowPagination,
int resultsOffset,
CurrentPageInfo s,
SynchronousSink<PageData> sink) {
LLUtils.ensureBlocking(); LLUtils.ensureBlocking();
if (s.last() != null && s.remainingLimit() > 0) { if (resultsOffset < 0) {
throw new IndexOutOfBoundsException(resultsOffset);
}
if ((s.pageIndex() == 0 || s.last() != null) && s.remainingLimit() > 0) {
TopDocs pageTopDocs; TopDocs pageTopDocs;
try { try {
TopDocsCollector<ScoreDoc> collector = TopDocsSearcher.getTopDocsCollector(queryParams.sort(), TopDocsCollector<ScoreDoc> collector = TopDocsSearcher.getTopDocsCollector(queryParams.sort(),
s.currentPageLimit(), s.last(), LuceneUtils.totalHitsThreshold(), true, s.currentPageLimit(), s.last(), LuceneUtils.totalHitsThreshold(), allowPagination,
queryParams.isScored()); queryParams.isScored());
indexSearcher.getIndexSearcher().search(queryParams.query(), collector); indexSearchers.shard().getIndexSearcher().search(queryParams.query(), collector);
if (resultsOffset > 0) {
pageTopDocs = collector.topDocs(resultsOffset, s.currentPageLimit());
} else {
pageTopDocs = collector.topDocs(); pageTopDocs = collector.topDocs();
}
} catch (IOException e) { } catch (IOException e) {
sink.error(e); sink.error(e);
return EMPTY_STATUS; return EMPTY_STATUS;
} }
var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs); var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs);
sink.next(pageTopDocs); long nextRemainingLimit;
return new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(), s.pageIndex() + 1); if (allowPagination) {
nextRemainingLimit = s.remainingLimit() - s.currentPageLimit();
} else {
nextRemainingLimit = 0L;
}
var nextPageIndex = s.pageIndex() + 1;
var nextPageInfo = new CurrentPageInfo(pageLastDoc, nextRemainingLimit, nextPageIndex);
sink.next(new PageData(pageTopDocs, nextPageInfo));
return nextPageInfo;
} else { } else {
sink.complete(); sink.complete();
return EMPTY_STATUS; return EMPTY_STATUS;
} }
},
s -> {}
)
.subscribeOn(Schedulers.boundedElastic())
.flatMapIterable(topDocs -> Arrays.asList(topDocs.scoreDocs))
.transform(topFieldDocFlux -> LuceneUtils.convertHits(topFieldDocFlux, indexSearchers,
keyFieldName, true))
);
}
Flux<LLKeyScore> combinedFlux;
if (nextHits != null) {
combinedFlux = firstPageFlux
.concatWith(nextHits);
} else {
combinedFlux = firstPageFlux;
}
return new LuceneSearchResult(totalHitsCount, combinedFlux, d -> {
indexSearcher.close();
indexSearchers.close();
}).send();
})
.doFinally(s -> {
// Close searchers if the search result has not been returned
if (s != SignalType.ON_COMPLETE) {
indexSearcher.close();
indexSearchers.close();
}
});
}
)
.doOnDiscard(Send.class, Send::close);
} }
} }