From 0e9c8c089e733965fa27d069b4ebb75386db0b85 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 17 Dec 2021 16:24:18 +0100 Subject: [PATCH] Bugfixes --- .../cavallium/dbengine/database/LLUtils.java | 104 +++++++++++++++--- .../TotalHitCountCollectorManager.java | 64 +++++++++++ .../lucene/searcher/CountMultiSearcher.java | 14 ++- 3 files changed, 167 insertions(+), 15 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/lucene/collector/TotalHitCountCollectorManager.java diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 139e0ee..a0f11ac 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -507,12 +507,28 @@ public class LLUtils { boolean cleanupOnSuccess) { return Mono.usingWhen(resourceSupplier, resourceClosure, r -> { if (cleanupOnSuccess) { - return Mono.fromRunnable(() -> r.close()); + return Mono.fromRunnable(() -> { + if (r.isAccessible()) { + r.close(); + } + }); } else { return Mono.empty(); } - }, (r, ex) -> Mono.fromRunnable(() -> r.close()), r -> Mono.fromRunnable(() -> r.close())) - .doOnDiscard(Resource.class, resource -> resource.close()) + }, (r, ex) -> Mono.fromRunnable(() -> { + if (r.isAccessible()) { + r.close(); + } + }), r -> Mono.fromRunnable(() -> { + if (r.isAccessible()) { + r.close(); + } + })) + .doOnDiscard(Resource.class, resource -> { + if (resource.isAccessible()) { + resource.close(); + } + }) .doOnDiscard(Send.class, send -> send.close()); } @@ -526,12 +542,28 @@ public class LLUtils { boolean cleanupOnSuccess) { return Flux.usingWhen(resourceSupplier, resourceClosure, r -> { if (cleanupOnSuccess) { - return Mono.fromRunnable(() -> r.close()); + return Mono.fromRunnable(() -> { + if (r.isAccessible()) { + r.close(); + } + }); } else { return Mono.empty(); } - }, (r, ex) -> Mono.fromRunnable(() -> r.close()), r -> Mono.fromRunnable(() -> r.close())) - .doOnDiscard(Resource.class, resource -> resource.close()) + }, (r, ex) -> Mono.fromRunnable(() -> { + if (r.isAccessible()) { + r.close(); + } + }), r -> Mono.fromRunnable(() -> { + if (r.isAccessible()) { + r.close(); + } + })) + .doOnDiscard(Resource.class, resource -> { + if (resource.isAccessible()) { + resource.close(); + } + }) .doOnDiscard(Send.class, send -> send.close()); } @@ -546,12 +578,28 @@ public class LLUtils { return resourceSupplier .concatMap(resource -> Mono.usingWhen(Mono.just(resource), resourceClosure, r -> { if (cleanupOnSuccess) { - return Mono.fromRunnable(() -> r.close()); + return Mono.fromRunnable(() -> { + if (r.isAccessible()) { + r.close(); + } + }); } else { return Mono.empty(); } - }, (r, ex) -> Mono.fromRunnable(() -> r.close()), r -> Mono.fromRunnable(() -> r.close()))) - .doOnDiscard(Resource.class, resource -> resource.close()) + }, (r, ex) -> Mono.fromRunnable(() -> { + if (r.isAccessible()) { + r.close(); + } + }), r -> Mono.fromRunnable(() -> { + if (r.isAccessible()) { + r.close(); + } + }))) + .doOnDiscard(Resource.class, resource -> { + if (resource.isAccessible()) { + resource.close(); + } + }) .doOnDiscard(Send.class, send -> send.close()); } @@ -569,8 +617,20 @@ public class LLUtils { } else { return Mono.empty(); } - }, (r, ex) -> Mono.fromRunnable(() -> r.close()), r -> Mono.fromRunnable(() -> r.close())) - .doOnDiscard(Resource.class, resource -> resource.close()) + }, (r, ex) -> Mono.fromRunnable(() -> { + if (r.isAccessible()) { + r.close(); + } + }), r -> Mono.fromRunnable(() -> { + if (r.isAccessible()) { + r.close(); + } + })) + .doOnDiscard(Resource.class, resource -> { + if (resource.isAccessible()) { + resource.close(); + } + }) .doOnDiscard(Send.class, send -> send.close()); } @@ -584,12 +644,28 @@ public class LLUtils { boolean cleanupOnSuccess) { return Flux.usingWhen(resourceSupplier.map(Send::receive), resourceClosure, r -> { if (cleanupOnSuccess) { - return Mono.fromRunnable(() -> r.close()); + return Mono.fromRunnable(() -> { + if (r.isAccessible()) { + r.close(); + } + }); } else { return Mono.empty(); } - }, (r, ex) -> Mono.fromRunnable(() -> r.close()), r -> Mono.fromRunnable(() -> r.close())) - .doOnDiscard(Resource.class, resource -> resource.close()) + }, (r, ex) -> Mono.fromRunnable(() -> { + if (r.isAccessible()) { + r.close(); + } + }), r -> Mono.fromRunnable(() -> { + if (r.isAccessible()) { + r.close(); + } + })) + .doOnDiscard(Resource.class, resource -> { + if (resource.isAccessible()) { + resource.close(); + } + }) .doOnDiscard(Send.class, send -> send.close()); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/TotalHitCountCollectorManager.java b/src/main/java/it/cavallium/dbengine/lucene/collector/TotalHitCountCollectorManager.java new file mode 100644 index 0000000..aaab68d --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/TotalHitCountCollectorManager.java @@ -0,0 +1,64 @@ +package it.cavallium.dbengine.lucene.collector; + +import it.cavallium.dbengine.lucene.LuceneUtils; +import it.cavallium.dbengine.lucene.collector.TotalHitCountCollectorManager.TimeLimitingTotalHitCountCollector; +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.CollectorManager; +import org.apache.lucene.search.LeafCollector; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.TotalHitCountCollector; + +public class TotalHitCountCollectorManager implements CollectorManager { + + private final Duration timeout; + + public TotalHitCountCollectorManager(Duration timeout) { + this.timeout = timeout; + } + + @Override + public TimeLimitingTotalHitCountCollector newCollector() { + var totalHitCountCollector = new TotalHitCountCollector(); + var timeLimitingCollector = LuceneUtils.withTimeout(totalHitCountCollector, timeout); + return new TimeLimitingTotalHitCountCollector(totalHitCountCollector, timeLimitingCollector); + } + + @Override + public Long reduce(Collection collectors) throws IOException { + long totalHits = 0; + for (var collector : collectors) { + totalHits += collector.getTotalHits(); + } + return totalHits; + } + + public static final class TimeLimitingTotalHitCountCollector implements Collector { + + private final TotalHitCountCollector totalHitCountCollector; + private final Collector timeLimitingCollector; + + private TimeLimitingTotalHitCountCollector(TotalHitCountCollector totalHitCountCollector, + Collector timeLimitingCollector) { + this.totalHitCountCollector = totalHitCountCollector; + this.timeLimitingCollector = timeLimitingCollector; + } + + @Override + public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException { + return timeLimitingCollector.getLeafCollector(context); + } + + @Override + public ScoreMode scoreMode() { + return timeLimitingCollector.scoreMode(); + } + + public long getTotalHits() { + return totalHitCountCollector.getTotalHits(); + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java index d5bafff..edb8dbe 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java @@ -9,9 +9,14 @@ import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.lucene.LuceneUtils; +import it.cavallium.dbengine.lucene.collector.TotalHitCountCollectorManager; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; +import java.time.Duration; import java.util.ArrayList; import java.util.List; +import org.apache.lucene.search.TimeLimitingCollector; +import org.apache.lucene.search.TotalHitCountCollector; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -120,7 +125,14 @@ public class CountMultiSearcher implements MultiSearcher { .flatMap(queryParams2 -> Mono.fromCallable(() -> { try (var is = indexSearcher.receive()) { LLUtils.ensureBlocking(); - return is.getIndexSearcher().count(queryParams2.query()); + + // If the timeout is very big, use the default count without timeout, because it's faster + if (queryParams2.timeout().compareTo(Duration.ofHours(1)) >= 0) { + return (long) is.getIndexSearcher().count(queryParams2.query()); + } else { + var totalHitsCountCollectorManager = new TotalHitCountCollectorManager(queryParams2.timeout()); + return is.getIndexSearcher().search(queryParams2.query(), totalHitsCountCollectorManager); + } } }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))) .publishOn(Schedulers.parallel())