This commit is contained in:
Andrea Cavalli 2021-11-08 18:52:52 +01:00
parent 81b7df8702
commit ef15bf7e8c
4 changed files with 20 additions and 10 deletions

View File

@ -7,6 +7,8 @@ import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.collections.ValueGetter;
import it.cavallium.dbengine.database.collections.ValueTransformer;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -85,7 +87,8 @@ public final class Hits<T> extends ResourceSupport<Hits<T>, Hits<T>> {
//noinspection unchecked
var keyMono = Mono.just((T) data[0]);
//noinspection unchecked
var valMono = Mono.just((U) data[1]);
var val = (Entry<T, Optional<U>>) data[1];
var valMono = Mono.justOrEmpty(val.getValue());
var score = (Float) data[2];
return new LazyHitEntry<>(keyMono, valMono, score);
}, keysFlux, valuesFlux, scoresFlux);

View File

@ -281,7 +281,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
default ValueTransformer<T, U> getAsyncDbValueTransformer(@Nullable CompositeSnapshot snapshot) {
return keys -> {
var sharedKeys = keys.publish().refCount(2);
var values = getMulti(snapshot, sharedKeys);
var values = DatabaseStageMap.this.getMulti(snapshot, sharedKeys);
return Flux.zip(sharedKeys, values, Map::entry);
};
}

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.lucene.collector;
import it.cavallium.dbengine.database.LLUtils;
import java.util.concurrent.CancellationException;
import java.util.concurrent.locks.LockSupport;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.LeafCollector;
@ -36,5 +37,8 @@ public class ReactiveLeafCollector implements LeafCollector {
LockSupport.parkNanos(100L * 1000000L);
}
scoreDocsSink.next(scoreDoc);
if (scoreDocsSink.isCancelled()) {
throw new CancellationException("Cancelled");
}
}
}

View File

@ -9,11 +9,16 @@ import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.collector.ReactiveCollectorManager;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.warp.commonutils.type.ShortNamedThreadFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono;
@ -26,12 +31,8 @@ import reactor.util.concurrent.Queues;
public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher {
private static final Scheduler UNSCORED_UNSORTED_EXECUTOR = Schedulers.newBoundedElastic(
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
"UnscoredUnsortedExecutor"
);
private static final Supplier<Queue<ScoreDoc>> QUEUE_SUPPLIER = Queues.get(1024);
private static final ExecutorService EXECUTOR_SERVICE
= Executors.newCachedThreadPool(new ShortNamedThreadFactory("StreamingExecutor"));
@Override
public Mono<LuceneSearchResult> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
@ -73,7 +74,7 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher {
int mutableShardIndex = 0;
for (IndexSearcher shard : shards) {
int shardIndex = mutableShardIndex++;
UNSCORED_UNSORTED_EXECUTOR.schedule(() -> {
EXECUTOR_SERVICE.execute(() -> {
try {
var collector = cm.newCollector();
assert queryParams.complete() == collector.scoreMode().isExhaustive();
@ -86,7 +87,9 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher {
shard.search(localQueryParams.query(), collector);
} catch (Throwable e) {
scoreDocsSink.error(e);
if (!(e instanceof CancellationException)) {
scoreDocsSink.error(e);
}
} finally {
if (runningTasks.decrementAndGet() <= 0) {
scoreDocsSink.complete();