2021-10-13 00:23:56 +02:00
|
|
|
package it.cavallium.dbengine.lucene;
|
|
|
|
|
|
|
|
import static it.cavallium.dbengine.lucene.LLDocElementScoreComparator.SCORE_DOC_SCORE_ELEM_COMPARATOR;
|
|
|
|
import static org.apache.lucene.search.TotalHits.Relation.*;
|
|
|
|
|
2021-10-15 22:03:53 +02:00
|
|
|
import it.cavallium.dbengine.lucene.collector.FullFieldDocs;
|
2022-06-30 13:54:55 +02:00
|
|
|
import it.cavallium.dbengine.utils.SimpleResource;
|
2021-12-18 21:01:14 +01:00
|
|
|
import java.io.IOException;
|
2021-10-13 00:23:56 +02:00
|
|
|
import java.util.Comparator;
|
|
|
|
import org.apache.lucene.search.FieldComparator;
|
|
|
|
import org.apache.lucene.search.Sort;
|
|
|
|
import org.apache.lucene.search.SortField;
|
|
|
|
import org.apache.lucene.search.TotalHits;
|
|
|
|
import org.apache.lucene.search.TotalHits.Relation;
|
|
|
|
import org.jetbrains.annotations.Nullable;
|
|
|
|
import reactor.core.publisher.Flux;
|
|
|
|
|
2021-10-15 00:03:41 +02:00
|
|
|
public interface FullDocs<T extends LLDoc> extends ResourceIterable<T> {
|
2021-10-13 00:23:56 +02:00
|
|
|
|
2021-10-15 00:03:41 +02:00
|
|
|
Comparator<LLDoc> SHARD_INDEX_TIE_BREAKER = Comparator.comparingInt(LLDoc::shardIndex);
|
|
|
|
Comparator<LLDoc> DOC_ID_TIE_BREAKER = Comparator.comparingInt(LLDoc::doc);
|
|
|
|
Comparator<LLDoc> DEFAULT_TIE_BREAKER = SHARD_INDEX_TIE_BREAKER.thenComparing(DOC_ID_TIE_BREAKER);
|
2021-10-13 00:23:56 +02:00
|
|
|
|
|
|
|
@Override
|
|
|
|
Flux<T> iterate();
|
|
|
|
|
|
|
|
@Override
|
|
|
|
Flux<T> iterate(long skips);
|
|
|
|
|
|
|
|
TotalHits totalHits();
|
|
|
|
|
2021-10-15 00:03:41 +02:00
|
|
|
static <T extends LLDoc> FullDocs<T> merge(@Nullable Sort sort, FullDocs<T>[] fullDocs) {
|
2021-10-13 00:23:56 +02:00
|
|
|
ResourceIterable<T> mergedIterable = mergeResourceIterable(sort, fullDocs);
|
|
|
|
TotalHits mergedTotalHits = mergeTotalHits(fullDocs);
|
2022-06-30 13:54:55 +02:00
|
|
|
FullDocs<T> docs = new MergedFullDocs<>(mergedIterable, mergedTotalHits);
|
2021-10-15 22:03:53 +02:00
|
|
|
if (sort != null) {
|
|
|
|
return new FullFieldDocs<>(docs, sort.getSort());
|
|
|
|
} else {
|
|
|
|
return docs;
|
|
|
|
}
|
2021-10-13 00:23:56 +02:00
|
|
|
}
|
|
|
|
|
2021-10-15 00:03:41 +02:00
|
|
|
static <T extends LLDoc> int tieBreakCompare(
|
2021-10-13 00:23:56 +02:00
|
|
|
T firstDoc,
|
|
|
|
T secondDoc,
|
|
|
|
Comparator<T> tieBreaker) {
|
|
|
|
assert tieBreaker != null;
|
|
|
|
|
|
|
|
int value = tieBreaker.compare(firstDoc, secondDoc);
|
|
|
|
if (value == 0) {
|
|
|
|
throw new IllegalStateException();
|
|
|
|
} else {
|
|
|
|
return value;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-15 00:03:41 +02:00
|
|
|
static <T extends LLDoc> ResourceIterable<T> mergeResourceIterable(
|
2021-10-13 00:23:56 +02:00
|
|
|
@Nullable Sort sort,
|
|
|
|
FullDocs<T>[] fullDocs) {
|
2022-06-30 13:54:55 +02:00
|
|
|
return new MergedResourceIterable<>(fullDocs, sort);
|
|
|
|
}
|
2021-10-13 00:23:56 +02:00
|
|
|
|
2022-06-30 13:54:55 +02:00
|
|
|
static <T extends LLDoc> TotalHits mergeTotalHits(FullDocs<T>[] fullDocs) {
|
|
|
|
long totalCount = 0;
|
|
|
|
Relation totalRelation = EQUAL_TO;
|
|
|
|
for (FullDocs<T> fullDoc : fullDocs) {
|
|
|
|
var totalHits = fullDoc.totalHits();
|
|
|
|
totalCount += totalHits.value;
|
|
|
|
totalRelation = switch (totalHits.relation) {
|
|
|
|
case EQUAL_TO -> totalRelation;
|
|
|
|
case GREATER_THAN_OR_EQUAL_TO -> totalRelation == EQUAL_TO ? GREATER_THAN_OR_EQUAL_TO : totalRelation;
|
|
|
|
};
|
|
|
|
}
|
|
|
|
return new TotalHits(totalCount, totalRelation);
|
|
|
|
}
|
2021-10-13 00:23:56 +02:00
|
|
|
|
2022-06-30 13:54:55 +02:00
|
|
|
class MergedResourceIterable<T extends LLDoc> extends SimpleResource implements ResourceIterable<T> {
|
2021-10-13 00:23:56 +02:00
|
|
|
|
2022-06-30 13:54:55 +02:00
|
|
|
private final FullDocs<T>[] fullDocs;
|
|
|
|
private final @Nullable Sort sort;
|
2021-10-13 00:23:56 +02:00
|
|
|
|
2022-06-30 13:54:55 +02:00
|
|
|
public MergedResourceIterable(FullDocs<T>[] fullDocs, @Nullable Sort sort) {
|
|
|
|
this.fullDocs = fullDocs;
|
|
|
|
this.sort = sort;
|
|
|
|
}
|
2021-10-13 00:23:56 +02:00
|
|
|
|
2022-06-30 13:54:55 +02:00
|
|
|
@Override
|
|
|
|
protected void onClose() {
|
|
|
|
for (FullDocs<T> fullDoc : fullDocs) {
|
|
|
|
fullDoc.close();
|
|
|
|
}
|
|
|
|
}
|
2021-10-13 00:23:56 +02:00
|
|
|
|
2022-06-30 13:54:55 +02:00
|
|
|
@Override
|
|
|
|
public Flux<T> iterate() {
|
|
|
|
@SuppressWarnings("unchecked") Flux<T>[] iterables = new Flux[fullDocs.length];
|
2021-10-13 00:23:56 +02:00
|
|
|
|
2022-06-30 13:54:55 +02:00
|
|
|
for (int i = 0; i < fullDocs.length; i++) {
|
|
|
|
var singleFullDocs = fullDocs[i].iterate();
|
|
|
|
iterables[i] = singleFullDocs;
|
|
|
|
}
|
|
|
|
|
|
|
|
Comparator<LLDoc> comp;
|
|
|
|
if (sort == null) {
|
|
|
|
// Merge maintaining sorting order (Algorithm taken from TopDocs.ScoreMergeSortQueue)
|
2021-10-13 00:23:56 +02:00
|
|
|
|
2022-06-30 13:54:55 +02:00
|
|
|
comp = SCORE_DOC_SCORE_ELEM_COMPARATOR.thenComparing(DEFAULT_TIE_BREAKER);
|
|
|
|
} else {
|
|
|
|
// Merge maintaining sorting order (Algorithm taken from TopDocs.MergeSortQueue)
|
|
|
|
|
|
|
|
SortField[] sortFields = sort.getSort();
|
|
|
|
var comparators = new FieldComparator[sortFields.length];
|
|
|
|
var reverseMul = new int[sortFields.length];
|
|
|
|
|
|
|
|
for (int compIDX = 0; compIDX < sortFields.length; ++compIDX) {
|
|
|
|
SortField sortField = sortFields[compIDX];
|
|
|
|
comparators[compIDX] = sortField.getComparator(1, compIDX == 0);
|
|
|
|
reverseMul[compIDX] = sortField.getReverse() ? -1 : 1;
|
2021-12-18 21:01:14 +01:00
|
|
|
}
|
2021-10-13 00:23:56 +02:00
|
|
|
|
2022-06-30 13:54:55 +02:00
|
|
|
comp = (first, second) -> {
|
|
|
|
assert first != second;
|
|
|
|
|
|
|
|
LLFieldDoc firstFD = (LLFieldDoc) first;
|
|
|
|
LLFieldDoc secondFD = (LLFieldDoc) second;
|
|
|
|
|
|
|
|
for (int compIDX = 0; compIDX < comparators.length; ++compIDX) {
|
|
|
|
//noinspection rawtypes
|
|
|
|
FieldComparator fieldComp = comparators[compIDX];
|
|
|
|
//noinspection unchecked
|
|
|
|
int cmp = reverseMul[compIDX] * fieldComp.compareValues(firstFD.fields().get(compIDX),
|
|
|
|
secondFD.fields().get(compIDX)
|
|
|
|
);
|
|
|
|
if (cmp != 0) {
|
|
|
|
return cmp;
|
2022-02-25 15:46:32 +01:00
|
|
|
}
|
2021-10-13 00:23:56 +02:00
|
|
|
}
|
|
|
|
|
2022-06-30 13:54:55 +02:00
|
|
|
return tieBreakCompare(first, second, DEFAULT_TIE_BREAKER);
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
@SuppressWarnings("unchecked") Flux<T>[] fluxes = new Flux[fullDocs.length];
|
|
|
|
for (int i = 0; i < iterables.length; i++) {
|
|
|
|
var shardIndex = i;
|
|
|
|
fluxes[i] = iterables[i].map(shard -> {
|
|
|
|
if (shard instanceof LLScoreDoc scoreDoc) {
|
|
|
|
//noinspection unchecked
|
|
|
|
return (T) new LLScoreDoc(scoreDoc.doc(), scoreDoc.score(), shardIndex);
|
|
|
|
} else if (shard instanceof LLFieldDoc fieldDoc) {
|
|
|
|
//noinspection unchecked
|
|
|
|
return (T) new LLFieldDoc(fieldDoc.doc(), fieldDoc.score(), shardIndex, fieldDoc.fields());
|
|
|
|
} else if (shard instanceof LLSlotDoc slotDoc) {
|
|
|
|
//noinspection unchecked
|
|
|
|
return (T) new LLSlotDoc(slotDoc.doc(), slotDoc.score(), shardIndex, slotDoc.slot());
|
|
|
|
} else {
|
|
|
|
throw new UnsupportedOperationException("Unsupported type " + (shard == null ? null : shard.getClass()));
|
|
|
|
}
|
|
|
|
});
|
|
|
|
if (fullDocs[i].totalHits().relation == EQUAL_TO) {
|
|
|
|
fluxes[i] = fluxes[i].take(fullDocs[i].totalHits().value, true);
|
|
|
|
}
|
2021-12-18 21:01:14 +01:00
|
|
|
}
|
2022-06-30 13:54:55 +02:00
|
|
|
|
|
|
|
return Flux.mergeComparing(comp, fluxes);
|
|
|
|
}
|
2021-10-13 00:23:56 +02:00
|
|
|
}
|
|
|
|
|
2022-06-30 13:54:55 +02:00
|
|
|
class MergedFullDocs<T extends LLDoc> extends SimpleResource implements FullDocs<T> {
|
|
|
|
|
|
|
|
private final ResourceIterable<T> mergedIterable;
|
|
|
|
private final TotalHits mergedTotalHits;
|
|
|
|
|
|
|
|
public MergedFullDocs(ResourceIterable<T> mergedIterable, TotalHits mergedTotalHits) {
|
|
|
|
this.mergedIterable = mergedIterable;
|
|
|
|
this.mergedTotalHits = mergedTotalHits;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void onClose() {
|
|
|
|
mergedIterable.close();
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Flux<T> iterate() {
|
|
|
|
return mergedIterable.iterate();
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Flux<T> iterate(long skips) {
|
|
|
|
return mergedIterable.iterate(skips);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public TotalHits totalHits() {
|
|
|
|
return mergedTotalHits;
|
2021-10-13 00:23:56 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|