From ef15bf7e8cdbfe0f15e63ffd545d5f4be90033f9 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 8 Nov 2021 18:52:52 +0100 Subject: [PATCH] Bugfixes --- .../it/cavallium/dbengine/client/Hits.java | 5 ++++- .../collections/DatabaseStageMap.java | 2 +- .../collector/ReactiveLeafCollector.java | 4 ++++ ...nsortedUnscoredStreamingMultiSearcher.java | 19 +++++++++++-------- 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/client/Hits.java b/src/main/java/it/cavallium/dbengine/client/Hits.java index 120be5a..4b0b92e 100644 --- a/src/main/java/it/cavallium/dbengine/client/Hits.java +++ b/src/main/java/it/cavallium/dbengine/client/Hits.java @@ -7,6 +7,8 @@ import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.database.collections.ValueGetter; import it.cavallium.dbengine.database.collections.ValueTransformer; +import java.util.Map.Entry; +import java.util.Optional; import java.util.function.Function; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -85,7 +87,8 @@ public final class Hits extends ResourceSupport, Hits> { //noinspection unchecked var keyMono = Mono.just((T) data[0]); //noinspection unchecked - var valMono = Mono.just((U) data[1]); + var val = (Entry>) data[1]; + var valMono = Mono.justOrEmpty(val.getValue()); var score = (Float) data[2]; return new LazyHitEntry<>(keyMono, valMono, score); }, keysFlux, valuesFlux, scoresFlux); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index cc4c8e0..089cf69 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -281,7 +281,7 @@ public interface DatabaseStageMap> extends Dat default ValueTransformer getAsyncDbValueTransformer(@Nullable CompositeSnapshot snapshot) { return keys -> { var sharedKeys = keys.publish().refCount(2); - var values = getMulti(snapshot, sharedKeys); + var values = DatabaseStageMap.this.getMulti(snapshot, sharedKeys); return Flux.zip(sharedKeys, values, Map::entry); }; } diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveLeafCollector.java b/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveLeafCollector.java index 9b4be4a..b0f6dd4 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveLeafCollector.java +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveLeafCollector.java @@ -1,6 +1,7 @@ package it.cavallium.dbengine.lucene.collector; import it.cavallium.dbengine.database.LLUtils; +import java.util.concurrent.CancellationException; import java.util.concurrent.locks.LockSupport; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.LeafCollector; @@ -36,5 +37,8 @@ public class ReactiveLeafCollector implements LeafCollector { LockSupport.parkNanos(100L * 1000000L); } scoreDocsSink.next(scoreDoc); + if (scoreDocsSink.isCancelled()) { + throw new CancellationException("Cancelled"); + } } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java index a815ad1..987937e 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java @@ -9,11 +9,16 @@ import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.collector.ReactiveCollectorManager; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; import java.util.Queue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; import java.util.function.Supplier; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.ScoreDoc; +import org.warp.commonutils.type.ShortNamedThreadFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.Mono; @@ -26,12 +31,8 @@ import reactor.util.concurrent.Queues; public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher { - private static final Scheduler UNSCORED_UNSORTED_EXECUTOR = Schedulers.newBoundedElastic( - Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, - Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, - "UnscoredUnsortedExecutor" - ); - private static final Supplier> QUEUE_SUPPLIER = Queues.get(1024); + private static final ExecutorService EXECUTOR_SERVICE + = Executors.newCachedThreadPool(new ShortNamedThreadFactory("StreamingExecutor")); @Override public Mono collectMulti(Mono> indexSearchersMono, @@ -73,7 +74,7 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher { int mutableShardIndex = 0; for (IndexSearcher shard : shards) { int shardIndex = mutableShardIndex++; - UNSCORED_UNSORTED_EXECUTOR.schedule(() -> { + EXECUTOR_SERVICE.execute(() -> { try { var collector = cm.newCollector(); assert queryParams.complete() == collector.scoreMode().isExhaustive(); @@ -86,7 +87,9 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher { shard.search(localQueryParams.query(), collector); } catch (Throwable e) { - scoreDocsSink.error(e); + if (!(e instanceof CancellationException)) { + scoreDocsSink.error(e); + } } finally { if (runningTasks.decrementAndGet() <= 0) { scoreDocsSink.complete();