Remove stream executor

This commit is contained in:
Andrea Cavalli 2021-11-09 02:14:21 +01:00
parent 7fb8183c63
commit d4dae4667d
3 changed files with 19 additions and 25 deletions

View File

@ -9,6 +9,7 @@ import it.cavallium.dbengine.database.LLSnapshot;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
@ -24,6 +25,7 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import org.warp.commonutils.type.ShortNamedThreadFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Empty;
@ -32,8 +34,9 @@ import reactor.core.scheduler.Schedulers;
public class CachedIndexSearcherManager implements IndexSearcherManager {
private static final Logger logger = LoggerFactory.getLogger(CachedIndexSearcherManager.class);
private static final Executor SEARCH_EXECUTOR = ForkJoinPool.commonPool();
private static final SearcherFactory SEARCHER_FACTORY = new ExecutorSearcherFactory(SEARCH_EXECUTOR);
private final Executor SEARCH_EXECUTOR = Executors
.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ShortNamedThreadFactory("lucene-search"));
private final SearcherFactory SEARCHER_FACTORY = new ExecutorSearcherFactory(SEARCH_EXECUTOR);
private final SnapshotsManager snapshotsManager;
private final Similarity similarity;

View File

@ -1,7 +1,6 @@
package it.cavallium.dbengine.lucene.collector;
import it.cavallium.dbengine.database.LLUtils;
import java.util.concurrent.CancellationException;
import java.util.concurrent.locks.LockSupport;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;

View File

@ -8,7 +8,6 @@ import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.collector.ReactiveCollectorMultiManager;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
@ -21,9 +20,6 @@ import reactor.core.publisher.Mono;
public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher {
private static final ExecutorService EXECUTOR_SERVICE
= Executors.newCachedThreadPool(new ShortNamedThreadFactory("StreamingExecutor"));
@Override
public Mono<LuceneSearchResult> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
LocalQueryParams queryParams,
@ -64,26 +60,22 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher {
int mutableShardIndex = 0;
for (IndexSearcher shard : shards) {
int shardIndex = mutableShardIndex++;
EXECUTOR_SERVICE.execute(() -> {
try {
var collector = cmm.get(shardIndex);
assert queryParams.complete() == cmm.scoreMode().isExhaustive();
assert queryParams
.getScoreModeOptional()
.map(scoreMode -> scoreMode == cmm.scoreMode())
.orElse(true);
try {
var collector = cmm.get(shardIndex);
assert queryParams.complete() == cmm.scoreMode().isExhaustive();
assert queryParams
.getScoreModeOptional()
.map(scoreMode -> scoreMode == cmm.scoreMode())
.orElse(true);
shard.search(localQueryParams.query(), collector);
} catch (Throwable e) {
if (!(e instanceof CancellationException)) {
scoreDocsSink.error(e);
}
} finally {
if (runningTasks.decrementAndGet() <= 0) {
scoreDocsSink.complete();
}
shard.search(localQueryParams.query(), collector);
} catch (Throwable e) {
scoreDocsSink.error(e);
} finally {
if (runningTasks.decrementAndGet() <= 0) {
scoreDocsSink.complete();
}
});
}
}
}, OverflowStrategy.BUFFER);