This commit is contained in:
Andrea Cavalli 2021-12-17 16:24:18 +01:00
parent 6d92ba8a68
commit 0e9c8c089e
3 changed files with 167 additions and 15 deletions

View File

@ -507,12 +507,28 @@ public class LLUtils {
boolean cleanupOnSuccess) { boolean cleanupOnSuccess) {
return Mono.usingWhen(resourceSupplier, resourceClosure, r -> { return Mono.usingWhen(resourceSupplier, resourceClosure, r -> {
if (cleanupOnSuccess) { if (cleanupOnSuccess) {
return Mono.fromRunnable(() -> r.close()); return Mono.fromRunnable(() -> {
if (r.isAccessible()) {
r.close();
}
});
} else { } else {
return Mono.empty(); return Mono.empty();
} }
}, (r, ex) -> Mono.fromRunnable(() -> r.close()), r -> Mono.fromRunnable(() -> r.close())) }, (r, ex) -> Mono.fromRunnable(() -> {
.doOnDiscard(Resource.class, resource -> resource.close()) if (r.isAccessible()) {
r.close();
}
}), r -> Mono.fromRunnable(() -> {
if (r.isAccessible()) {
r.close();
}
}))
.doOnDiscard(Resource.class, resource -> {
if (resource.isAccessible()) {
resource.close();
}
})
.doOnDiscard(Send.class, send -> send.close()); .doOnDiscard(Send.class, send -> send.close());
} }
@ -526,12 +542,28 @@ public class LLUtils {
boolean cleanupOnSuccess) { boolean cleanupOnSuccess) {
return Flux.usingWhen(resourceSupplier, resourceClosure, r -> { return Flux.usingWhen(resourceSupplier, resourceClosure, r -> {
if (cleanupOnSuccess) { if (cleanupOnSuccess) {
return Mono.fromRunnable(() -> r.close()); return Mono.fromRunnable(() -> {
if (r.isAccessible()) {
r.close();
}
});
} else { } else {
return Mono.empty(); return Mono.empty();
} }
}, (r, ex) -> Mono.fromRunnable(() -> r.close()), r -> Mono.fromRunnable(() -> r.close())) }, (r, ex) -> Mono.fromRunnable(() -> {
.doOnDiscard(Resource.class, resource -> resource.close()) if (r.isAccessible()) {
r.close();
}
}), r -> Mono.fromRunnable(() -> {
if (r.isAccessible()) {
r.close();
}
}))
.doOnDiscard(Resource.class, resource -> {
if (resource.isAccessible()) {
resource.close();
}
})
.doOnDiscard(Send.class, send -> send.close()); .doOnDiscard(Send.class, send -> send.close());
} }
@ -546,12 +578,28 @@ public class LLUtils {
return resourceSupplier return resourceSupplier
.concatMap(resource -> Mono.usingWhen(Mono.just(resource), resourceClosure, r -> { .concatMap(resource -> Mono.usingWhen(Mono.just(resource), resourceClosure, r -> {
if (cleanupOnSuccess) { if (cleanupOnSuccess) {
return Mono.fromRunnable(() -> r.close()); return Mono.fromRunnable(() -> {
if (r.isAccessible()) {
r.close();
}
});
} else { } else {
return Mono.empty(); return Mono.empty();
} }
}, (r, ex) -> Mono.fromRunnable(() -> r.close()), r -> Mono.fromRunnable(() -> r.close()))) }, (r, ex) -> Mono.fromRunnable(() -> {
.doOnDiscard(Resource.class, resource -> resource.close()) if (r.isAccessible()) {
r.close();
}
}), r -> Mono.fromRunnable(() -> {
if (r.isAccessible()) {
r.close();
}
})))
.doOnDiscard(Resource.class, resource -> {
if (resource.isAccessible()) {
resource.close();
}
})
.doOnDiscard(Send.class, send -> send.close()); .doOnDiscard(Send.class, send -> send.close());
} }
@ -569,8 +617,20 @@ public class LLUtils {
} else { } else {
return Mono.empty(); return Mono.empty();
} }
}, (r, ex) -> Mono.fromRunnable(() -> r.close()), r -> Mono.fromRunnable(() -> r.close())) }, (r, ex) -> Mono.fromRunnable(() -> {
.doOnDiscard(Resource.class, resource -> resource.close()) if (r.isAccessible()) {
r.close();
}
}), r -> Mono.fromRunnable(() -> {
if (r.isAccessible()) {
r.close();
}
}))
.doOnDiscard(Resource.class, resource -> {
if (resource.isAccessible()) {
resource.close();
}
})
.doOnDiscard(Send.class, send -> send.close()); .doOnDiscard(Send.class, send -> send.close());
} }
@ -584,12 +644,28 @@ public class LLUtils {
boolean cleanupOnSuccess) { boolean cleanupOnSuccess) {
return Flux.usingWhen(resourceSupplier.map(Send::receive), resourceClosure, r -> { return Flux.usingWhen(resourceSupplier.map(Send::receive), resourceClosure, r -> {
if (cleanupOnSuccess) { if (cleanupOnSuccess) {
return Mono.fromRunnable(() -> r.close()); return Mono.fromRunnable(() -> {
if (r.isAccessible()) {
r.close();
}
});
} else { } else {
return Mono.empty(); return Mono.empty();
} }
}, (r, ex) -> Mono.fromRunnable(() -> r.close()), r -> Mono.fromRunnable(() -> r.close())) }, (r, ex) -> Mono.fromRunnable(() -> {
.doOnDiscard(Resource.class, resource -> resource.close()) if (r.isAccessible()) {
r.close();
}
}), r -> Mono.fromRunnable(() -> {
if (r.isAccessible()) {
r.close();
}
}))
.doOnDiscard(Resource.class, resource -> {
if (resource.isAccessible()) {
resource.close();
}
})
.doOnDiscard(Send.class, send -> send.close()); .doOnDiscard(Send.class, send -> send.close());
} }

View File

@ -0,0 +1,64 @@
package it.cavallium.dbengine.lucene.collector;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.collector.TotalHitCountCollectorManager.TimeLimitingTotalHitCountCollector;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.TotalHitCountCollector;
public class TotalHitCountCollectorManager implements CollectorManager<TimeLimitingTotalHitCountCollector, Long> {
private final Duration timeout;
public TotalHitCountCollectorManager(Duration timeout) {
this.timeout = timeout;
}
@Override
public TimeLimitingTotalHitCountCollector newCollector() {
var totalHitCountCollector = new TotalHitCountCollector();
var timeLimitingCollector = LuceneUtils.withTimeout(totalHitCountCollector, timeout);
return new TimeLimitingTotalHitCountCollector(totalHitCountCollector, timeLimitingCollector);
}
@Override
public Long reduce(Collection<TimeLimitingTotalHitCountCollector> collectors) throws IOException {
long totalHits = 0;
for (var collector : collectors) {
totalHits += collector.getTotalHits();
}
return totalHits;
}
public static final class TimeLimitingTotalHitCountCollector implements Collector {
private final TotalHitCountCollector totalHitCountCollector;
private final Collector timeLimitingCollector;
private TimeLimitingTotalHitCountCollector(TotalHitCountCollector totalHitCountCollector,
Collector timeLimitingCollector) {
this.totalHitCountCollector = totalHitCountCollector;
this.timeLimitingCollector = timeLimitingCollector;
}
@Override
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
return timeLimitingCollector.getLeafCollector(context);
}
@Override
public ScoreMode scoreMode() {
return timeLimitingCollector.scoreMode();
}
public long getTotalHits() {
return totalHitCountCollector.getTotalHits();
}
}
}

View File

@ -9,9 +9,14 @@ import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.collector.TotalHitCountCollectorManager;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.lucene.search.TimeLimitingCollector;
import org.apache.lucene.search.TotalHitCountCollector;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
@ -120,7 +125,14 @@ public class CountMultiSearcher implements MultiSearcher {
.flatMap(queryParams2 -> Mono.fromCallable(() -> { .flatMap(queryParams2 -> Mono.fromCallable(() -> {
try (var is = indexSearcher.receive()) { try (var is = indexSearcher.receive()) {
LLUtils.ensureBlocking(); LLUtils.ensureBlocking();
return is.getIndexSearcher().count(queryParams2.query());
// If the timeout is very big, use the default count without timeout, because it's faster
if (queryParams2.timeout().compareTo(Duration.ofHours(1)) >= 0) {
return (long) is.getIndexSearcher().count(queryParams2.query());
} else {
var totalHitsCountCollectorManager = new TotalHitCountCollectorManager(queryParams2.timeout());
return is.getIndexSearcher().search(queryParams2.query(), totalHitsCountCollectorManager);
}
} }
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))) }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())))
.publishOn(Schedulers.parallel()) .publishOn(Schedulers.parallel())