From a4a8926e02dd0f4dedbe8c646aa7fd504c3d2f53 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 23 Jul 2022 02:42:48 +0200 Subject: [PATCH] Close lucene objects in the correct thread --- .../cavallium/dbengine/database/LLUtils.java | 9 +- .../disk/CachedIndexSearcherManager.java | 12 +- .../database/disk/LLIndexSearchers.java | 5 +- .../disk/LLLocalDatabaseConnection.java | 5 +- .../database/disk/LLLocalLuceneIndex.java | 17 ++- .../disk/LLLocalMultiLuceneIndex.java | 10 +- .../database/disk/LuceneIndexSnapshot.java | 3 +- .../database/disk/LuceneThreadFactory.java | 27 ++++ .../disk/SimpleIndexSearcherManager.java | 14 ++- .../database/disk/SnapshotsManager.java | 7 +- .../memory/LLMemoryDatabaseConnection.java | 5 +- .../dbengine/lucene/CheckIndexInput.java | 111 ++++++++++++++++ .../dbengine/lucene/CheckIndexOutput.java | 60 +++++++++ .../dbengine/lucene/CheckOutputDirectory.java | 89 +++++++++++++ .../dbengine/lucene/LuceneCloseable.java | 8 ++ .../LuceneConcurrentMergeScheduler.java | 33 +++++ .../dbengine/lucene/LuceneThread.java | 10 ++ .../dbengine/lucene/LuceneUtils.java | 85 ++++++++++--- .../lucene/searcher/CountMultiSearcher.java | 7 +- .../searcher/DecimalBucketMultiSearcher.java | 10 +- .../lucene/searcher/LuceneGenerator.java | 6 +- .../lucene/searcher/LuceneSearchResult.java | 13 +- .../lucene/searcher/PagedLocalSearcher.java | 45 ++++--- .../searcher/ScoredPagedMultiSearcher.java | 52 +++++--- .../SortedByScoreFullMultiSearcher.java | 55 +++++--- .../SortedScoredFullMultiSearcher.java | 48 +++++-- .../lucene/searcher/StandardSearcher.java | 119 ++++++++++-------- .../UnsortedStreamingMultiSearcher.java | 31 +++-- .../utils/ShortNamedThreadFactory.java | 14 +-- .../UnsortedUnscoredSimpleMultiSearcher.java | 36 ++++-- 30 files changed, 734 insertions(+), 212 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/LuceneThreadFactory.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/CheckIndexInput.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/CheckIndexOutput.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/CheckOutputDirectory.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/LuceneCloseable.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/LuceneConcurrentMergeScheduler.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/LuceneThread.java diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 4c664a0..43fd953 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -1,6 +1,7 @@ package it.cavallium.dbengine.database; import static io.netty5.buffer.api.StandardAllocationTypes.OFF_HEAP; +import static it.cavallium.dbengine.lucene.LuceneUtils.luceneScheduler; import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY; import com.google.common.primitives.Ints; @@ -21,6 +22,7 @@ import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; +import it.cavallium.dbengine.lucene.LuceneCloseable; import it.cavallium.dbengine.lucene.RandomSortField; import it.cavallium.dbengine.utils.SimpleResource; import java.io.Closeable; @@ -648,7 +650,12 @@ public class LLUtils { } public static Mono finalizeResource(SafeCloseable resource) { - return Mono.fromRunnable(resource::close); + Mono runnable = Mono.fromRunnable(resource::close); + if (resource instanceof LuceneCloseable) { + return runnable.subscribeOn(luceneScheduler()); + } else { + return runnable; + } } public static void finalizeResourceNow(Resource resource) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java index cde59a5..494242c 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java @@ -1,12 +1,15 @@ package it.cavallium.dbengine.database.disk; import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; +import static it.cavallium.dbengine.lucene.LuceneUtils.luceneScheduler; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.lucene.LuceneCloseable; +import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.utils.SimpleResource; import java.io.IOException; import java.io.UncheckedIOException; @@ -34,12 +37,12 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; // todo: deduplicate code between Cached and Simple searcher managers -public class CachedIndexSearcherManager extends SimpleResource implements IndexSearcherManager { +public class CachedIndexSearcherManager extends SimpleResource implements IndexSearcherManager, LuceneCloseable { private static final Logger LOG = LogManager.getLogger(SimpleIndexSearcherManager.class); private static final ExecutorService SEARCH_EXECUTOR = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors(), - new ShortNamedThreadFactory("lucene-search") + new LuceneThreadFactory("lucene-search") .setDaemon(true).withGroup(new ThreadGroup("lucene-search")) ); private static final SearcherFactory SEARCHER_FACTORY = new ExecutorSearcherFactory(SEARCH_EXECUTOR); @@ -123,8 +126,7 @@ public class CachedIndexSearcherManager extends SimpleResource implements IndexS throw ex; } }) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())) - .publishOn(Schedulers.parallel()); + .transform(LuceneUtils::scheduleLucene); } private void dropCachedIndexSearcher() { @@ -204,7 +206,7 @@ public class CachedIndexSearcherManager extends SimpleResource implements IndexS return activeRefreshes.get(); } - private class MainIndexSearcher extends LLIndexSearcherImpl { + private class MainIndexSearcher extends LLIndexSearcherImpl implements LuceneCloseable { public MainIndexSearcher(IndexSearcher indexSearcher, SearcherManager searcherManager) { super(indexSearcher, () -> releaseOnCleanup(searcherManager, indexSearcher)); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java index 4d0fedc..f1da746 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java @@ -1,6 +1,7 @@ package it.cavallium.dbengine.database.disk; import it.cavallium.dbengine.database.DiscardingCloseable; +import it.cavallium.dbengine.lucene.LuceneCloseable; import it.cavallium.dbengine.lucene.searcher.ShardIndexSearcher; import it.cavallium.dbengine.utils.SimpleResource; import java.util.ArrayList; @@ -26,7 +27,7 @@ public interface LLIndexSearchers extends DiscardingCloseable { LLIndexSearcher llShard(int shardIndex); - class UnshardedIndexSearchers extends SimpleResource implements LLIndexSearchers { + class UnshardedIndexSearchers extends SimpleResource implements LLIndexSearchers, LuceneCloseable { private final LLIndexSearcher indexSearcher; @@ -74,7 +75,7 @@ public interface LLIndexSearchers extends DiscardingCloseable { } } - class ShardedIndexSearchers extends SimpleResource implements LLIndexSearchers { + class ShardedIndexSearchers extends SimpleResource implements LLIndexSearchers, LuceneCloseable { private final List indexSearchers; private final List indexSearchersVals; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java index f550bc8..eb5bb53 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java @@ -1,11 +1,14 @@ package it.cavallium.dbengine.database.disk; +import static it.cavallium.dbengine.lucene.LuceneUtils.luceneScheduler; + import io.micrometer.core.instrument.MeterRegistry; import io.netty5.buffer.api.BufferAllocator; import it.cavallium.dbengine.database.LLDatabaseConnection; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.lucene.LuceneHacks; import it.cavallium.dbengine.lucene.LuceneRocksDBManager; +import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.rpc.current.data.Column; import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers; @@ -130,7 +133,7 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection { ); } }) - .subscribeOn(Schedulers.boundedElastic()); + .transform(LuceneUtils::scheduleLucene); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 203eb91..6b23566 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -27,6 +27,8 @@ import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.database.LLUpdateDocument; import it.cavallium.dbengine.database.LLUpdateFields; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.lucene.LuceneCloseable; +import it.cavallium.dbengine.lucene.LuceneConcurrentMergeScheduler; import it.cavallium.dbengine.lucene.LuceneHacks; import it.cavallium.dbengine.lucene.LuceneRocksDBManager; import it.cavallium.dbengine.lucene.LuceneUtils; @@ -80,7 +82,7 @@ import reactor.core.publisher.SignalType; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; -public class LLLocalLuceneIndex extends SimpleResource implements LLLuceneIndex { +public class LLLocalLuceneIndex extends SimpleResource implements LLLuceneIndex, LuceneCloseable { protected static final Logger logger = LogManager.getLogger(LLLocalLuceneIndex.class); @@ -93,13 +95,13 @@ public class LLLocalLuceneIndex extends SimpleResource implements LLLuceneIndex private static final Scheduler luceneHeavyTasksScheduler = uninterruptibleScheduler(Schedulers.newBoundedElastic( DEFAULT_BOUNDED_ELASTIC_SIZE, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, - new ShortNamedThreadFactory("heavy-tasks").setDaemon(true).withGroup(new ThreadGroup("lucene-heavy-tasks")), + new LuceneThreadFactory("heavy-tasks").setDaemon(true).withGroup(new ThreadGroup("lucene-heavy-tasks")), Math.toIntExact(Duration.ofHours(1).toSeconds()) )); private static final Scheduler luceneWriteScheduler = uninterruptibleScheduler(Schedulers.newBoundedElastic( DEFAULT_BOUNDED_ELASTIC_SIZE, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, - new ShortNamedThreadFactory("lucene-write").setDaemon(true).withGroup(new ThreadGroup("lucene-write")), + new LuceneThreadFactory("lucene-write").setDaemon(true).withGroup(new ThreadGroup("lucene-write")), Math.toIntExact(Duration.ofHours(1).toSeconds()) )); private static final Scheduler bulkScheduler = luceneWriteScheduler; @@ -191,13 +193,8 @@ public class LLLocalLuceneIndex extends SimpleResource implements LLLuceneIndex mergeScheduler = new SerialMergeScheduler(); writerSchedulerMaxThreadCount = 1; } else { - ConcurrentMergeScheduler concurrentMergeScheduler; - if (indexWriterConfig.getMergeScheduler() instanceof ConcurrentMergeScheduler defaultScheduler) { - concurrentMergeScheduler = defaultScheduler; - } else { - //noinspection resource - concurrentMergeScheduler = new ConcurrentMergeScheduler(); - } + //noinspection resource + ConcurrentMergeScheduler concurrentMergeScheduler = new LuceneConcurrentMergeScheduler(); // false means SSD, true means HDD boolean spins = false; concurrentMergeScheduler.setDefaultMaxMergesAndThreads(spins); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 8309553..3b8e1ba 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -1,6 +1,7 @@ package it.cavallium.dbengine.database.disk; import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; +import static it.cavallium.dbengine.lucene.LuceneUtils.luceneScheduler; import com.google.common.collect.Multimap; import io.micrometer.core.instrument.MeterRegistry; @@ -17,6 +18,7 @@ import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.database.LLUpdateDocument; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.lucene.LuceneCloseable; import it.cavallium.dbengine.lucene.LuceneHacks; import it.cavallium.dbengine.lucene.LuceneRocksDBManager; import it.cavallium.dbengine.lucene.LuceneUtils; @@ -60,7 +62,7 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.SignalType; import reactor.core.scheduler.Schedulers; -public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneIndex { +public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneIndex, LuceneCloseable { private static final Logger LOG = LogManager.getLogger(LLLuceneIndex.class); private static final boolean BYPASS_GROUPBY_BUG = Boolean.parseBoolean(System.getProperty( @@ -341,8 +343,7 @@ public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneI .stream() .map(part -> Mono .fromRunnable(part::close) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())) - .publishOn(Schedulers.parallel()) + .transform(LuceneUtils::scheduleLucene) ) .iterator(); var indicesCloseMono = Mono.whenDelayError(it); @@ -353,8 +354,7 @@ public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneI closeable.close(); } return null; - }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))) - .publishOn(Schedulers.parallel()) + }).transform(LuceneUtils::scheduleLucene)) .then() .transform(LLUtils::handleDiscard) .block(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LuceneIndexSnapshot.java b/src/main/java/it/cavallium/dbengine/database/disk/LuceneIndexSnapshot.java index 4b88319..ca0909c 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LuceneIndexSnapshot.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LuceneIndexSnapshot.java @@ -1,6 +1,7 @@ package it.cavallium.dbengine.database.disk; import it.cavallium.dbengine.database.DiscardingCloseable; +import it.cavallium.dbengine.lucene.LuceneCloseable; import it.cavallium.dbengine.utils.SimpleResource; import java.io.Closeable; import java.io.IOException; @@ -12,7 +13,7 @@ import org.apache.lucene.index.IndexCommit; import org.apache.lucene.search.IndexSearcher; import org.jetbrains.annotations.Nullable; -public class LuceneIndexSnapshot extends SimpleResource implements DiscardingCloseable { +public class LuceneIndexSnapshot extends SimpleResource implements DiscardingCloseable, LuceneCloseable { private final IndexCommit snapshot; private boolean initialized; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LuceneThreadFactory.java b/src/main/java/it/cavallium/dbengine/database/disk/LuceneThreadFactory.java new file mode 100644 index 0000000..7ad0f3c --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/LuceneThreadFactory.java @@ -0,0 +1,27 @@ +package it.cavallium.dbengine.database.disk; + +import it.cavallium.dbengine.lucene.LuceneThread; +import it.cavallium.dbengine.utils.ShortNamedThreadFactory; +import java.util.Locale; +import org.jetbrains.annotations.NotNull; + +public class LuceneThreadFactory extends ShortNamedThreadFactory { + + /** + * Creates a new {@link ShortNamedThreadFactory} instance + * + * @param threadNamePrefix the name prefix assigned to each thread created. + */ + public LuceneThreadFactory(String threadNamePrefix) { + super(threadNamePrefix); + } + + @Override + public Thread newThread(@NotNull Runnable r) { + final Thread t = new LuceneThread(group, r, String.format(Locale.ROOT, "%s-%d", + this.threadNamePrefix, threadNumber.getAndIncrement()), 0); + t.setDaemon(daemon); + t.setPriority(Thread.NORM_PRIORITY); + return t; + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/SimpleIndexSearcherManager.java b/src/main/java/it/cavallium/dbengine/database/disk/SimpleIndexSearcherManager.java index a37c972..f64e7de 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/SimpleIndexSearcherManager.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/SimpleIndexSearcherManager.java @@ -1,12 +1,15 @@ package it.cavallium.dbengine.database.disk; import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; +import static it.cavallium.dbengine.lucene.LuceneUtils.luceneScheduler; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.lucene.LuceneCloseable; +import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.utils.SimpleResource; import java.io.IOException; import java.io.UncheckedIOException; @@ -37,12 +40,12 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; // todo: deduplicate code between Cached and Simple searcher managers -public class SimpleIndexSearcherManager extends SimpleResource implements IndexSearcherManager { +public class SimpleIndexSearcherManager extends SimpleResource implements IndexSearcherManager, LuceneCloseable { private static final Logger LOG = LogManager.getLogger(SimpleIndexSearcherManager.class); private static final ExecutorService SEARCH_EXECUTOR = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors(), - new ShortNamedThreadFactory("lucene-search") + new LuceneThreadFactory("lucene-search") .setDaemon(true).withGroup(new ThreadGroup("lucene-search")) ); private static final SearcherFactory SEARCHER_FACTORY = new ExecutorSearcherFactory(SEARCH_EXECUTOR); @@ -145,8 +148,7 @@ public class SimpleIndexSearcherManager extends SimpleResource implements IndexS throw ex; } }) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())) - .publishOn(Schedulers.parallel()); + .transform(LuceneUtils::scheduleLucene); } @Override @@ -186,7 +188,7 @@ public class SimpleIndexSearcherManager extends SimpleResource implements IndexS return activeRefreshes.get(); } - private class MainIndexSearcher extends LLIndexSearcherImpl { + private class MainIndexSearcher extends LLIndexSearcherImpl implements LuceneCloseable { public MainIndexSearcher(IndexSearcher indexSearcher) { super(indexSearcher, () -> releaseOnCleanup(searcherManager, indexSearcher)); @@ -224,7 +226,7 @@ public class SimpleIndexSearcherManager extends SimpleResource implements IndexS } } - private class OnDemandIndexSearcher extends LLIndexSearcher { + private class OnDemandIndexSearcher extends LLIndexSearcher implements LuceneCloseable { private final SearcherManager searcherManager; private final Similarity similarity; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java b/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java index 23954e8..ae159bf 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java @@ -1,8 +1,10 @@ package it.cavallium.dbengine.database.disk; import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; +import static it.cavallium.dbengine.lucene.LuceneUtils.luceneScheduler; import it.cavallium.dbengine.database.LLSnapshot; +import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.utils.SimpleResource; import java.io.IOException; import java.io.UncheckedIOException; @@ -50,8 +52,7 @@ public class SnapshotsManager extends SimpleResource { public Mono takeSnapshot() { return Mono .fromCallable(() -> takeLuceneSnapshot()) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())) - .publishOn(Schedulers.parallel()); + .transform(LuceneUtils::scheduleLucene); } /** @@ -97,7 +98,7 @@ public class SnapshotsManager extends SimpleResource { } finally { activeTasks.arriveAndDeregister(); } - }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())).publishOn(Schedulers.parallel()); + }).transform(LuceneUtils::scheduleLucene); } /** diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java index 29e9ee4..d7c8e26 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java @@ -1,5 +1,7 @@ package it.cavallium.dbengine.database.memory; +import static it.cavallium.dbengine.lucene.LuceneUtils.luceneScheduler; + import io.micrometer.core.instrument.MeterRegistry; import io.netty5.buffer.api.BufferAllocator; import it.cavallium.dbengine.database.LLDatabaseConnection; @@ -8,6 +10,7 @@ import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.disk.LLLocalLuceneIndex; import it.cavallium.dbengine.database.disk.LLTempHugePqEnv; import it.cavallium.dbengine.lucene.LuceneHacks; +import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.rpc.current.data.ByteBuffersDirectory; import it.cavallium.dbengine.rpc.current.data.Column; import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; @@ -100,7 +103,7 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection { null ); }) - .subscribeOn(Schedulers.boundedElastic()); + .transform(LuceneUtils::scheduleLucene); } @Override diff --git a/src/main/java/it/cavallium/dbengine/lucene/CheckIndexInput.java b/src/main/java/it/cavallium/dbengine/lucene/CheckIndexInput.java new file mode 100644 index 0000000..fa4a282 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/CheckIndexInput.java @@ -0,0 +1,111 @@ +package it.cavallium.dbengine.lucene; + +import static it.cavallium.dbengine.lucene.LuceneUtils.warnLuceneThread; + +import java.io.IOException; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.RandomAccessInput; + +public class CheckIndexInput extends IndexInput { + + private final IndexInput input; + + public CheckIndexInput(IndexInput input) { + super(input.toString()); + this.input = input; + } + + private void checkThread() { + LuceneUtils.checkLuceneThread(); + } + + @Override + public void close() throws IOException { + warnLuceneThread(); + input.close(); + } + + @Override + public long getFilePointer() { + checkThread(); + return input.getFilePointer(); + } + + @Override + public void seek(long pos) throws IOException { + checkThread(); + input.seek(pos); + } + + @Override + public long length() { + checkThread(); + return input.length(); + } + + @Override + public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + checkThread(); + return input.slice(sliceDescription, offset, length); + } + + @Override + public byte readByte() throws IOException { + checkThread(); + return input.readByte(); + } + + @Override + public void readBytes(byte[] b, int offset, int len) throws IOException { + checkThread(); + input.readBytes(b, offset, len); + } + + @Override + public void skipBytes(long numBytes) throws IOException { + checkThread(); + input.skipBytes(numBytes); + } + + @Override + public IndexInput clone() { + return new CheckIndexInput(input.clone()); + } + + @Override + public String toString() { + checkThread(); + return input.toString(); + } + + @Override + public RandomAccessInput randomAccessSlice(long offset, long length) throws IOException { + var ras = input.randomAccessSlice(offset, length); + return new RandomAccessInput() { + @Override + public byte readByte(long pos) throws IOException { + checkThread(); + return ras.readByte(pos); + } + + @Override + public short readShort(long pos) throws IOException { + checkThread(); + return ras.readShort(pos); + } + + @Override + public int readInt(long pos) throws IOException { + checkThread(); + return ras.readInt(pos); + } + + @Override + public long readLong(long pos) throws IOException { + checkThread(); + return ras.readLong(pos); + } + }; + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/CheckIndexOutput.java b/src/main/java/it/cavallium/dbengine/lucene/CheckIndexOutput.java new file mode 100644 index 0000000..2a4173f --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/CheckIndexOutput.java @@ -0,0 +1,60 @@ +package it.cavallium.dbengine.lucene; + +import static it.cavallium.dbengine.lucene.LuceneUtils.warnLuceneThread; + +import java.io.IOException; +import org.apache.lucene.store.IndexOutput; + +public class CheckIndexOutput extends IndexOutput { + + private final IndexOutput output; + + public CheckIndexOutput(IndexOutput output) { + super(output.toString(), output.getName()); + this.output = output; + } + + private void checkThread() { + LuceneUtils.checkLuceneThread(); + } + + @Override + public void close() throws IOException { + warnLuceneThread(); + output.close(); + } + + @Override + public long getFilePointer() { + checkThread(); + return output.getFilePointer(); + } + + @Override + public long getChecksum() throws IOException { + checkThread(); + return output.getChecksum(); + } + + @Override + public void writeByte(byte b) throws IOException { + checkThread(); + output.writeByte(b); + } + + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + checkThread(); + output.writeBytes(b, offset, length); + } + + @Override + public String getName() { + return output.getName(); + } + + @Override + public String toString() { + return output.toString(); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/CheckOutputDirectory.java b/src/main/java/it/cavallium/dbengine/lucene/CheckOutputDirectory.java new file mode 100644 index 0000000..b835ef8 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/CheckOutputDirectory.java @@ -0,0 +1,89 @@ +package it.cavallium.dbengine.lucene; + +import static it.cavallium.dbengine.lucene.LuceneUtils.warnLuceneThread; + +import java.io.IOException; +import java.util.Collection; +import java.util.Set; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.Lock; + +public class CheckOutputDirectory extends Directory { + + private final Directory directory; + + public CheckOutputDirectory(Directory directory) { + this.directory = directory; + } + + @Override + public String[] listAll() throws IOException { + return directory.listAll(); + } + + @Override + public void deleteFile(String name) throws IOException { + directory.deleteFile(name); + } + + @Override + public long fileLength(String name) throws IOException { + return directory.fileLength(name); + } + + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + LuceneUtils.checkLuceneThread(); + return new CheckIndexOutput(directory.createOutput(name, context)); + } + + @Override + public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException { + LuceneUtils.checkLuceneThread(); + return new CheckIndexOutput(directory.createTempOutput(prefix, suffix, context)); + } + + @Override + public void sync(Collection names) throws IOException { + LuceneUtils.checkLuceneThread(); + directory.sync(names); + } + + @Override + public void syncMetaData() throws IOException { + LuceneUtils.checkLuceneThread(); + directory.syncMetaData(); + } + + @Override + public void rename(String source, String dest) throws IOException { + LuceneUtils.checkLuceneThread(); + directory.rename(source, dest); + } + + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + LuceneUtils.checkLuceneThread(); + return new CheckIndexInput(directory.openInput(name, context)); + } + + @Override + public Lock obtainLock(String name) throws IOException { + LuceneUtils.checkLuceneThread(); + return directory.obtainLock(name); + } + + @Override + public void close() throws IOException { + warnLuceneThread(); + directory.close(); + } + + @Override + public Set getPendingDeletions() throws IOException { + return directory.getPendingDeletions(); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneCloseable.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneCloseable.java new file mode 100644 index 0000000..b4c968d --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneCloseable.java @@ -0,0 +1,8 @@ +package it.cavallium.dbengine.lucene; + +import it.cavallium.dbengine.database.SafeCloseable; + +/** + * This closeable should be run on a lucene thread + */ +public interface LuceneCloseable extends SafeCloseable {} diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneConcurrentMergeScheduler.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneConcurrentMergeScheduler.java new file mode 100644 index 0000000..3054f5a --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneConcurrentMergeScheduler.java @@ -0,0 +1,33 @@ +package it.cavallium.dbengine.lucene; + +import java.io.IOException; +import org.apache.lucene.index.ConcurrentMergeScheduler; +import org.apache.lucene.index.MergePolicy.OneMerge; + +public class LuceneConcurrentMergeScheduler extends ConcurrentMergeScheduler { + + public LuceneConcurrentMergeScheduler() { + super(); + } + + @Override + protected synchronized MergeThread getMergeThread(MergeSource mergeSource, OneMerge merge) throws IOException { + final MergeThread thread = new LuceneMergeThread(mergeSource, merge); + thread.setDaemon(true); + thread.setName("lucene-merge-" + mergeThreadCount++); + return thread; + } + + public class LuceneMergeThread extends MergeThread { + + /** + * Sole constructor. + * + * @param mergeSource + * @param merge + */ + public LuceneMergeThread(MergeSource mergeSource, OneMerge merge) { + super(mergeSource, merge); + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneThread.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneThread.java new file mode 100644 index 0000000..323450b --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneThread.java @@ -0,0 +1,10 @@ +package it.cavallium.dbengine.lucene; + +import org.jetbrains.annotations.NotNull; + +public class LuceneThread extends Thread { + + public LuceneThread(ThreadGroup group, @NotNull Runnable runnable, String name, int stackSize) { + super(group, runnable, name, stackSize); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index e1095b1..fd42f49 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -1,8 +1,9 @@ package it.cavallium.dbengine.lucene; import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; -import static it.cavallium.dbengine.database.LLUtils.singleOrClose; import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRITE; +import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE; +import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; @@ -24,6 +25,8 @@ import it.cavallium.dbengine.database.collections.ValueGetter; import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.database.disk.LLIndexSearchers.UnshardedIndexSearchers; +import it.cavallium.dbengine.database.disk.LuceneThreadFactory; +import it.cavallium.dbengine.lucene.LuceneConcurrentMergeScheduler.LuceneMergeThread; import it.cavallium.dbengine.lucene.analyzer.LegacyWordAnalyzer; import it.cavallium.dbengine.lucene.analyzer.NCharGramAnalyzer; import it.cavallium.dbengine.lucene.analyzer.NCharGramEdgeAnalyzer; @@ -33,7 +36,6 @@ import it.cavallium.dbengine.lucene.analyzer.WordAnalyzer; import it.cavallium.dbengine.lucene.directory.RocksdbDirectory; import it.cavallium.dbengine.lucene.mlt.BigCompositeReader; import it.cavallium.dbengine.lucene.mlt.MultiMoreLikeThis; -import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher; import it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite; import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; import it.cavallium.dbengine.lucene.searcher.LocalSearcher; @@ -119,6 +121,7 @@ import org.novasearch.lucene.search.similarities.LtcSimilarity; import org.novasearch.lucene.search.similarities.RobertsonSimilarity; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.util.concurrent.Queues; @@ -170,6 +173,13 @@ public class LuceneUtils { Nullabledouble.empty() ); + private static final Scheduler LUCENE_COMMON_SCHEDULER = uninterruptibleScheduler(Schedulers.newBoundedElastic( + DEFAULT_BOUNDED_ELASTIC_SIZE, + DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, + new LuceneThreadFactory("lucene-common").setDaemon(true).withGroup(new ThreadGroup("lucene-common")), + Math.toIntExact(Duration.ofHours(1).toSeconds()) + )); + static { var cas = new CharArraySet( EnglishAnalyzer.ENGLISH_STOP_WORDS_SET.size() + ItalianAnalyzer.getDefaultStopSet().size(), true); @@ -400,7 +410,7 @@ public class LuceneUtils { boolean preserveOrder) { if (preserveOrder) { return hitsFlux - .publishOn(uninterruptibleScheduler(Schedulers.boundedElastic())) + .publishOn(LuceneUtils.luceneScheduler()) .mapNotNull(hit -> mapHitBlocking(hit, indexSearchers, keyFieldName)) .publishOn(Schedulers.parallel()); } else { @@ -423,7 +433,7 @@ public class LuceneUtils { //noinspection unchecked return (List) (List) shardHits; } - }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))) + }).subscribeOn(luceneScheduler())) .flatMapIterable(a -> a) .publishOn(Schedulers.parallel()); } @@ -608,8 +618,9 @@ public class LuceneUtils { String directoryName, LuceneRocksDBManager rocksDBManager) throws IOException { + Directory directory; if (directoryOptions instanceof ByteBuffersDirectory) { - return new org.apache.lucene.store.ByteBuffersDirectory(); + directory = new org.apache.lucene.store.ByteBuffersDirectory(); } else if (directoryOptions instanceof DirectIOFSDirectory directIOFSDirectory) { FSDirectory delegateDirectory = (FSDirectory) createLuceneDirectory(directIOFSDirectory.delegate(), directoryName, @@ -619,32 +630,32 @@ public class LuceneUtils { try { int mergeBufferSize = directIOFSDirectory.mergeBufferSize().orElse(DirectIODirectory.DEFAULT_MERGE_BUFFER_SIZE); long minBytesDirect = directIOFSDirectory.minBytesDirect().orElse(DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT); - return new DirectIODirectory(delegateDirectory, mergeBufferSize, minBytesDirect); + directory = new DirectIODirectory(delegateDirectory, mergeBufferSize, minBytesDirect); } catch (UnsupportedOperationException ex) { logger.warn("Failed to open FSDirectory with DIRECT flag", ex); - return delegateDirectory; + directory = delegateDirectory; } } else { logger.warn("Failed to open FSDirectory with DIRECT flag because the operating system is Windows"); - return delegateDirectory; + directory = delegateDirectory; } } else if (directoryOptions instanceof MemoryMappedFSDirectory memoryMappedFSDirectory) { - return new MMapDirectory(memoryMappedFSDirectory.managedPath().resolve(directoryName + ".lucene.db")); + directory = new MMapDirectory(memoryMappedFSDirectory.managedPath().resolve(directoryName + ".lucene.db")); } else if (directoryOptions instanceof NIOFSDirectory niofsDirectory) { - return new org.apache.lucene.store.NIOFSDirectory(niofsDirectory + directory = new org.apache.lucene.store.NIOFSDirectory(niofsDirectory .managedPath() .resolve(directoryName + ".lucene.db")); } else if (directoryOptions instanceof RAFFSDirectory rafFsDirectory) { - return new RAFDirectory(rafFsDirectory.managedPath().resolve(directoryName + ".lucene.db")); + directory = new RAFDirectory(rafFsDirectory.managedPath().resolve(directoryName + ".lucene.db")); } else if (directoryOptions instanceof NRTCachingDirectory nrtCachingDirectory) { var delegateDirectory = createLuceneDirectory(nrtCachingDirectory.delegate(), directoryName, rocksDBManager); - return new org.apache.lucene.store.NRTCachingDirectory(delegateDirectory, + directory = new org.apache.lucene.store.NRTCachingDirectory(delegateDirectory, toMB(nrtCachingDirectory.maxMergeSizeBytes()), toMB(nrtCachingDirectory.maxCachedBytes()) ); } else if (directoryOptions instanceof RocksDBSharedDirectory rocksDBSharedDirectory) { var dbInstance = rocksDBManager.getOrCreate(rocksDBSharedDirectory.managedPath()); - return new RocksdbDirectory(rocksDBManager.getAllocator(), + directory = new RocksdbDirectory(rocksDBManager.getAllocator(), dbInstance.db(), dbInstance.handles(), directoryName, @@ -652,15 +663,16 @@ public class LuceneUtils { ); } else if (directoryOptions instanceof RocksDBStandaloneDirectory rocksDBStandaloneDirectory) { var dbInstance = rocksDBManager.getOrCreate(rocksDBStandaloneDirectory.managedPath()); - return new RocksdbDirectory(rocksDBManager.getAllocator(), + directory = new RocksdbDirectory(rocksDBManager.getAllocator(), dbInstance.db(), dbInstance.handles(), directoryName, rocksDBStandaloneDirectory.blockSize() ); - }else { + } else { throw new UnsupportedOperationException("Unsupported directory: " + directoryName + ", " + directoryOptions); } + return new CheckOutputDirectory(directory); } public static Optional getManagedPath(LuceneDirectoryOptions directoryOptions) { @@ -778,7 +790,7 @@ public class LuceneUtils { try (UnshardedIndexSearchers indexSearchers = LLIndexSearchers.unsharded(indexSearcher)) { return Mono .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())) + .transform(LuceneUtils::scheduleLucene) .flatMap(queryParams2 -> localSearcher.collect(indexSearcherMono, queryParams2, keyFieldName, NO_REWRITE)); } @@ -796,10 +808,49 @@ public class LuceneUtils { return Mono.usingWhen(indexSearchersMono, indexSearchers -> Mono .fromCallable(() -> transformer.rewrite(indexSearchers, queryParams)) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())) + .transform(LuceneUtils::scheduleLucene) .flatMap(queryParams2 -> multiSearcher.collectMulti(indexSearchersMono, queryParams2, keyFieldName, NO_REWRITE)), LLUtils::finalizeResource ); } + + public static void checkLuceneThread() { + var thread = Thread.currentThread(); + if (!isLuceneThread()) { + throw printLuceneThreadWarning(thread); + } + } + + @SuppressWarnings("ThrowableNotThrown") + public static void warnLuceneThread() { + var thread = Thread.currentThread(); + if (!isLuceneThread()) { + printLuceneThreadWarning(thread); + } + } + + private static IllegalStateException printLuceneThreadWarning(Thread thread) { + var error = new IllegalStateException("Current thread is not a lucene thread: " + thread.getId() + " " + thread + + ". Schedule it using LuceneUtils.luceneScheduler()"); + logger.warn("Current thread is not a lucene thread: {} {}", thread.getId(), thread, error); + return error; + } + + public static boolean isLuceneThread() { + var thread = Thread.currentThread(); + return thread instanceof LuceneThread || thread instanceof LuceneMergeThread; + } + + public static Scheduler luceneScheduler() { + return LUCENE_COMMON_SCHEDULER; + } + + public static Mono scheduleLucene(Mono prev) { + return prev.subscribeOn(LUCENE_COMMON_SCHEDULER).publishOn(Schedulers.parallel()); + } + + public static Flux scheduleLucene(Flux prev) { + return prev.subscribeOn(LUCENE_COMMON_SCHEDULER).publishOn(Schedulers.parallel()); + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java index dfcf607..ea32944 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine.lucene.searcher; import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; import static it.cavallium.dbengine.database.LLUtils.singleOrClose; +import static it.cavallium.dbengine.lucene.LuceneUtils.luceneScheduler; import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRITE; import io.netty5.util.Send; @@ -61,7 +62,7 @@ public class CountMultiSearcher implements MultiSearcher { var totalHitsCount = new TotalHitsCount(totalHitsCountValue, exactTotalHitsCount); - return new LuceneSearchResult(totalHitsCount, Flux.empty(), null); + return new LuceneSearchResult(totalHitsCount, Flux.empty()); }) .doOnDiscard(LuceneSearchResult.class, luceneSearchResult -> luceneSearchResult.close()), LLUtils::finalizeResource); @@ -79,10 +80,10 @@ public class CountMultiSearcher implements MultiSearcher { return Mono.usingWhen(indexSearcherMono, indexSearcher -> Mono.fromCallable(() -> { LLUtils.ensureBlocking(); return (long) indexSearcher.getIndexSearcher().count(queryParams.query()); - }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())), LLUtils::finalizeResource) + }).subscribeOn(luceneScheduler()), LLUtils::finalizeResource) .publishOn(Schedulers.parallel()) .transform(TimeoutUtil.timeoutMono(queryParams.timeout())) - .map(count -> new LuceneSearchResult(TotalHitsCount.of(count, true), Flux.empty(), null)); + .map(count -> new LuceneSearchResult(TotalHitsCount.of(count, true), Flux.empty())); } @Override diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java index 60749d6..6e40b61 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java @@ -1,10 +1,12 @@ package it.cavallium.dbengine.lucene.searcher; import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; +import static it.cavallium.dbengine.lucene.LuceneUtils.luceneScheduler; import io.netty5.util.Send; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.collector.Buckets; import it.cavallium.dbengine.lucene.collector.DecimalBucketMultiCollectorManager; import java.util.List; @@ -32,10 +34,9 @@ public class DecimalBucketMultiSearcher { .search(indexSearchers.shards(), bucketParams, queries, normalizationQuery) // Ensure that one result is always returned .single(), indexSearchers -> Mono.fromCallable(() -> { - //noinspection BlockingMethodInNonBlockingContext indexSearchers.close(); return null; - }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))).publishOn(Schedulers.parallel()); + }).transform(LuceneUtils::scheduleLucene)); } private Mono search(Iterable indexSearchers, @@ -58,10 +59,9 @@ public class DecimalBucketMultiSearcher { .flatMap(shard -> Mono.fromCallable(() -> { LLUtils.ensureBlocking(); return cmm.search(shard); - })) + }).subscribeOn(luceneScheduler())) .collectList() - .flatMap(results -> Mono.fromSupplier(() -> cmm.reduce(results))) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())) + .flatMap(results -> Mono.fromSupplier(() -> cmm.reduce(results)).subscribeOn(luceneScheduler())) .publishOn(Schedulers.parallel()); }); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneGenerator.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneGenerator.java index 6fa27a8..beed8af 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneGenerator.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneGenerator.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine.lucene.searcher; import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; +import it.cavallium.dbengine.lucene.LuceneUtils; import java.io.IOException; import java.io.UncheckedIOException; import java.util.Iterator; @@ -24,7 +25,7 @@ import reactor.core.scheduler.Schedulers; public class LuceneGenerator implements Supplier { - private static final Scheduler SCHED = uninterruptibleScheduler(Schedulers.boundedElastic()); + private static final Scheduler SCHED = LuceneUtils.luceneScheduler(); private final IndexSearcher shard; private final int shardIndex; private final Query query; @@ -66,7 +67,8 @@ public class LuceneGenerator implements Supplier { return s; } ) - .subscribeOn(SCHED); + .subscribeOn(SCHED) + .publishOn(Schedulers.parallel()); } @Override diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneSearchResult.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneSearchResult.java index 3dd8cc7..a8e91d4 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneSearchResult.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneSearchResult.java @@ -12,18 +12,16 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import reactor.core.publisher.Flux; -public final class LuceneSearchResult extends SimpleResource implements DiscardingCloseable { +public class LuceneSearchResult extends SimpleResource implements DiscardingCloseable { private static final Logger logger = LogManager.getLogger(LuceneSearchResult.class); private final TotalHitsCount totalHitsCount; private final Flux results; - private final Runnable onClose; - public LuceneSearchResult(TotalHitsCount totalHitsCount, Flux results, Runnable onClose) { + public LuceneSearchResult(TotalHitsCount totalHitsCount, Flux results) { this.totalHitsCount = totalHitsCount; this.results = results; - this.onClose = onClose; } public TotalHitsCount totalHitsCount() { @@ -58,12 +56,5 @@ public final class LuceneSearchResult extends SimpleResource implements Discardi @Override protected void onClose() { - try { - if (onClose != null) { - onClose.run(); - } - } catch (Throwable ex) { - logger.error("Failed to close onClose", ex); - } } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java index 9f533f0..0271daa 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLocalSearcher.java @@ -2,15 +2,18 @@ package it.cavallium.dbengine.lucene.searcher; import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; import static it.cavallium.dbengine.database.LLUtils.singleOrClose; +import static it.cavallium.dbengine.lucene.LuceneUtils.luceneScheduler; import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.EMPTY_STATUS; import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT; import io.netty5.util.Send; import io.netty5.buffer.api.internal.ResourceSupport; +import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.lucene.LuceneCloseable; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.collector.TopDocsCollectorMultiManager; import java.io.IOException; @@ -60,13 +63,7 @@ public class PagedLocalSearcher implements LocalSearcher { indexSearchers.shards(), queryParams, keyFieldName, - () -> { - try { - indexSearchers.close(); - } catch (UncheckedIOException e) { - LOG.error(e); - } - } + () -> indexSearchers.close() )) // Ensure that one LuceneSearchResult is always returned .single()); @@ -103,8 +100,7 @@ public class PagedLocalSearcher implements LocalSearcher { .handle((s, sink) -> this.searchPageSync(queryParams, indexSearchers, pagination, resultsOffset, s, sink)) //defaultIfEmpty(new PageData(new TopDocs(new TotalHits(0, Relation.EQUAL_TO), new ScoreDoc[0]), currentPageInfo)) .single() - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())) - .publishOn(Schedulers.parallel()); + .transform(LuceneUtils::scheduleLucene); } /** @@ -134,7 +130,7 @@ public class PagedLocalSearcher implements LocalSearcher { LocalQueryParams queryParams, String keyFieldName, Runnable onClose) { - return firstResultMono.map(firstResult -> { + return firstResultMono.map(firstResult -> { var totalHitsCount = firstResult.totalHitsCount(); var firstPageHitsFlux = firstResult.firstPageHitsFlux(); var secondPageInfo = firstResult.nextPageInfo(); @@ -142,7 +138,7 @@ public class PagedLocalSearcher implements LocalSearcher { Flux nextHitsFlux = searchOtherPages(indexSearchers, queryParams, keyFieldName, secondPageInfo); Flux combinedFlux = firstPageHitsFlux.concatWith(nextHitsFlux); - return new LuceneSearchResult(totalHitsCount, combinedFlux, onClose); + return new MyLuceneSearchResult(totalHitsCount, combinedFlux, onClose); }).single(); } @@ -157,12 +153,11 @@ public class PagedLocalSearcher implements LocalSearcher { (s, sink) -> searchPageSync(queryParams, indexSearchers, true, 0, s, sink), s -> {} ) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())) - .publishOn(Schedulers.parallel()) + .subscribeOn(luceneScheduler()) .map(pageData -> pageData.topDocs()) .flatMapIterable(topDocs -> Arrays.asList(topDocs.scoreDocs)) - .transform(topFieldDocFlux -> LuceneUtils.convertHits(topFieldDocFlux, indexSearchers, - keyFieldName, true)); + .transform(topFieldDocFlux -> LuceneUtils.convertHits(topFieldDocFlux, indexSearchers, keyFieldName, true)) + .publishOn(Schedulers.parallel()); } /** @@ -222,4 +217,24 @@ public class PagedLocalSearcher implements LocalSearcher { return EMPTY_STATUS; } } + + private static class MyLuceneSearchResult extends LuceneSearchResult implements LuceneCloseable { + + private final Runnable onClose; + + public MyLuceneSearchResult(TotalHitsCount totalHitsCount, Flux combinedFlux, Runnable onClose) { + super(totalHitsCount, combinedFlux); + this.onClose = onClose; + } + + @Override + protected void onClose() { + try { + onClose.run(); + } catch (Throwable ex) { + LOG.error("Failed to close the search result", ex); + } + super.onClose(); + } + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java index e478f6c..3c9f786 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java @@ -2,13 +2,16 @@ package it.cavallium.dbengine.lucene.searcher; import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; import static it.cavallium.dbengine.database.LLUtils.singleOrClose; +import static it.cavallium.dbengine.lucene.LuceneUtils.luceneScheduler; import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRITE; import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT; import io.netty5.util.Send; +import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.lucene.LuceneCloseable; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.PageLimits; import it.cavallium.dbengine.lucene.collector.ScoringShardsCollectorMultiManager; @@ -53,14 +56,11 @@ public class ScoredPagedMultiSearcher implements MultiSearcher { this.computeFirstPageResults(firstPageTopDocsMono, indexSearchers, keyFieldName, queryParams )) // Compute other results - .map(firstResult -> this.computeOtherResults(firstResult, indexSearchers.shards(), queryParams, keyFieldName, - () -> { - try { - indexSearchers.close(); - } catch (UncheckedIOException e) { - LOG.error("Can't close index searchers", e); - } - } + .map(firstResult -> this.computeOtherResults(firstResult, + indexSearchers.shards(), + queryParams, + keyFieldName, + () -> indexSearchers.close() )) // Ensure that one LuceneSearchResult is always returned .single()); @@ -130,7 +130,7 @@ public class ScoredPagedMultiSearcher implements MultiSearcher { Flux nextHitsFlux = searchOtherPages(indexSearchers, queryParams, keyFieldName, secondPageInfo); Flux combinedFlux = firstPageHitsFlux.concatWith(nextHitsFlux); - return new LuceneSearchResult(totalHitsCount, combinedFlux, onClose); + return new MyLuceneSearchResult(totalHitsCount, combinedFlux, onClose); } /** @@ -150,8 +150,7 @@ public class ScoredPagedMultiSearcher implements MultiSearcher { .doOnNext(s -> currentPageInfoRef.set(s.nextPageInfo())) .repeatWhen(s -> s.takeWhile(n -> n > 0)); }) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())) - .publishOn(Schedulers.parallel()) + .transform(LuceneUtils::scheduleLucene) .map(pageData -> pageData.topDocs()) .flatMapIterable(topDocs -> Arrays.asList(topDocs.scoreDocs)) .transform(topFieldDocFlux -> LuceneUtils.convertHits(topFieldDocFlux, indexSearchers, @@ -187,7 +186,7 @@ public class ScoredPagedMultiSearcher implements MultiSearcher { return null; } }) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(luceneScheduler()) .flatMap(cmm -> Flux .fromIterable(indexSearchers) .index() @@ -200,8 +199,7 @@ public class ScoredPagedMultiSearcher implements MultiSearcher { var cm = cmm.get(shard, index); return shard.search(queryParams.query(), cm); - }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))) - .publishOn(Schedulers.parallel()) + }).subscribeOn(luceneScheduler())) .collectList() .flatMap(results -> Mono.fromCallable(() -> { LLUtils.ensureBlocking(); @@ -217,13 +215,33 @@ public class ScoredPagedMultiSearcher implements MultiSearcher { var nextPageIndex = s.pageIndex() + 1; var nextPageInfo = new CurrentPageInfo(pageLastDoc, nextRemainingLimit, nextPageIndex); return new PageData(pageTopDocs, nextPageInfo); - }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))) - ) - .publishOn(Schedulers.parallel()); + }).subscribeOn(luceneScheduler())) + ).publishOn(Schedulers.parallel()); } @Override public String getName() { return "scored paged multi"; } + + + private static class MyLuceneSearchResult extends LuceneSearchResult implements LuceneCloseable { + + private final Runnable onClose; + + public MyLuceneSearchResult(TotalHitsCount totalHitsCount, Flux combinedFlux, Runnable onClose) { + super(totalHitsCount, combinedFlux); + this.onClose = onClose; + } + + @Override + protected void onClose() { + try { + onClose.run(); + } catch (Throwable ex) { + LOG.error("Failed to close the search result", ex); + } + super.onClose(); + } + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedByScoreFullMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedByScoreFullMultiSearcher.java index 9f86e18..d35e866 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedByScoreFullMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedByScoreFullMultiSearcher.java @@ -1,18 +1,18 @@ package it.cavallium.dbengine.lucene.searcher; -import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; import static it.cavallium.dbengine.database.LLUtils.singleOrClose; +import static it.cavallium.dbengine.lucene.LuceneUtils.luceneScheduler; -import io.netty5.util.Send; +import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.database.disk.LLTempHugePqEnv; +import it.cavallium.dbengine.lucene.LuceneCloseable; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.FullDocs; import it.cavallium.dbengine.lucene.LLScoreDoc; import it.cavallium.dbengine.lucene.hugepq.search.HugePqFullScoreDocCollector; -import java.io.IOException; import java.io.UncheckedIOException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -79,7 +79,7 @@ public class SortedByScoreFullMultiSearcher implements MultiSearcher { collector.close(); throw ex; } - }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))) + }).subscribeOn(luceneScheduler())) .collectList() .flatMap(collectors -> Mono.fromCallable(() -> { try { @@ -91,9 +91,8 @@ public class SortedByScoreFullMultiSearcher implements MultiSearcher { } throw ex; } - })) - ) - .publishOn(Schedulers.parallel()); + }).subscribeOn(luceneScheduler())) + ).publishOn(Schedulers.parallel()); } /** @@ -111,18 +110,7 @@ public class SortedByScoreFullMultiSearcher implements MultiSearcher { indexSearchers.shards(), keyFieldName, true) .take(queryParams.limitLong(), true); - return new LuceneSearchResult(totalHitsCount, hitsFlux, () -> { - try { - indexSearchers.close(); - } catch (UncheckedIOException e) { - LOG.error("Can't close index searchers", e); - } - try { - data.close(); - } catch (Exception e) { - LOG.error("Failed to discard data", e); - } - }); + return new MyLuceneSearchResult(totalHitsCount, hitsFlux, indexSearchers, data); }); } @@ -130,4 +118,33 @@ public class SortedByScoreFullMultiSearcher implements MultiSearcher { public String getName() { return "sorted by score full multi"; } + + private static class MyLuceneSearchResult extends LuceneSearchResult implements LuceneCloseable { + + private final LLIndexSearchers indexSearchers; + private final FullDocs data; + + public MyLuceneSearchResult(TotalHitsCount totalHitsCount, Flux hitsFlux, + LLIndexSearchers indexSearchers, + FullDocs data) { + super(totalHitsCount, hitsFlux); + this.indexSearchers = indexSearchers; + this.data = data; + } + + @Override + protected void onClose() { + try { + indexSearchers.close(); + } catch (Throwable e) { + LOG.error("Can't close index searchers", e); + } + try { + data.close(); + } catch (Throwable e) { + LOG.error("Failed to discard data", e); + } + super.onClose(); + } + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java index 85310ff..d2736a9 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedScoredFullMultiSearcher.java @@ -2,14 +2,17 @@ package it.cavallium.dbengine.lucene.searcher; import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; import static it.cavallium.dbengine.database.LLUtils.singleOrClose; +import static it.cavallium.dbengine.lucene.LuceneUtils.luceneScheduler; import io.netty5.util.Send; +import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.database.disk.LLTempHugePqEnv; import it.cavallium.dbengine.lucene.FullDocs; import it.cavallium.dbengine.lucene.LLFieldDoc; +import it.cavallium.dbengine.lucene.LuceneCloseable; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.hugepq.search.HugePqFullFieldDocCollector; import java.io.IOException; @@ -61,6 +64,7 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher { return HugePqFullFieldDocCollector.createSharedManager(env, queryParams.sort(), queryParams.limitInt(), totalHitsThreshold); }) + .subscribeOn(luceneScheduler()) .>flatMap(sharedManager -> Flux .fromIterable(indexSearchers) .flatMap(shard -> Mono.fromCallable(() -> { @@ -77,7 +81,7 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher { collector.close(); throw ex; } - })) + }).subscribeOn(luceneScheduler())) .collectList() .flatMap(collectors -> Mono.fromCallable(() -> { try { @@ -89,9 +93,8 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher { } throw ex; } - })) + }).subscribeOn(luceneScheduler())) ) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())) .publishOn(Schedulers.parallel()); } @@ -110,14 +113,7 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher { indexSearchers.shards(), keyFieldName, true) .take(queryParams.limitLong(), true); - return new LuceneSearchResult(totalHitsCount, hitsFlux, () -> { - try { - indexSearchers.close(); - } catch (UncheckedIOException e) { - LOG.error("Can't close index searchers", e); - } - data.close(); - }); + return new MyLuceneSearchResult(totalHitsCount, hitsFlux, indexSearchers, data); }); } @@ -125,4 +121,34 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher { public String getName() { return "sorted scored full multi"; } + + private static class MyLuceneSearchResult extends LuceneSearchResult implements LuceneCloseable { + + private final LLIndexSearchers indexSearchers; + private final FullDocs data; + + public MyLuceneSearchResult(TotalHitsCount totalHitsCount, + Flux hitsFlux, + LLIndexSearchers indexSearchers, + FullDocs data) { + super(totalHitsCount, hitsFlux); + this.indexSearchers = indexSearchers; + this.data = data; + } + + @Override + protected void onClose() { + try { + indexSearchers.close(); + } catch (Throwable e) { + LOG.error("Can't close index searchers", e); + } + try { + data.close(); + } catch (Throwable e) { + LOG.error("Failed to discard data", e); + } + super.onClose(); + } + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/StandardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/StandardSearcher.java index cbc72b2..c087b4f 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/StandardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/StandardSearcher.java @@ -2,11 +2,15 @@ package it.cavallium.dbengine.lucene.searcher; import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; import static it.cavallium.dbengine.database.LLUtils.singleOrClose; +import static it.cavallium.dbengine.lucene.LuceneUtils.luceneScheduler; +import static it.cavallium.dbengine.lucene.LuceneUtils.sum; import static java.util.Objects.requireNonNull; +import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.lucene.LuceneCloseable; import it.cavallium.dbengine.lucene.LuceneUtils; import java.io.IOException; import java.io.UncheckedIOException; @@ -60,61 +64,50 @@ public class StandardSearcher implements MultiSearcher { LLUtils.ensureBlocking(); var totalHitsThreshold = queryParams.getTotalHitsThresholdInt(); if (queryParams.isSorted() && !queryParams.isSortedByScore()) { - return TopFieldCollector.createSharedManager(queryParams.sort(), queryParams.limitInt(), null, - totalHitsThreshold); + return TopFieldCollector.createSharedManager(queryParams.sort(), + queryParams.limitInt(), null, totalHitsThreshold); } else { return TopScoreDocCollector.createSharedManager(queryParams.limitInt(), null, totalHitsThreshold); } }) - .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())) - .flatMap(sharedManager -> Flux - .fromIterable(indexSearchers) - .>handle((shard, sink) -> { - LLUtils.ensureBlocking(); - try { - var collector = sharedManager.newCollector(); - assert queryParams.computePreciseHitsCount() == null || (queryParams.computePreciseHitsCount() - == collector.scoreMode().isExhaustive()); + .transform(LuceneUtils::scheduleLucene) + .flatMap(sharedManager -> Flux.fromIterable(indexSearchers).flatMapSequential(shard -> Mono.fromCallable(() -> { + LLUtils.ensureBlocking(); + var collector = sharedManager.newCollector(); + assert queryParams.computePreciseHitsCount() == null || (queryParams.computePreciseHitsCount() == collector + .scoreMode().isExhaustive()); - shard.search(queryParams.query(), LuceneUtils.withTimeout(collector, queryParams.timeout())); - sink.next(collector); - } catch (IOException e) { - sink.error(e); + shard.search(queryParams.query(), LuceneUtils.withTimeout(collector, queryParams.timeout())); + return collector; + }).subscribeOn(luceneScheduler())).collectList().flatMap(collectors -> Mono.fromCallable(() -> { + LLUtils.ensureBlocking(); + if (collectors.size() <= 1) { + return sharedManager.reduce((List) collectors); + } else if (queryParams.isSorted() && !queryParams.isSortedByScore()) { + final TopFieldDocs[] topDocs = new TopFieldDocs[collectors.size()]; + int i = 0; + for (var collector : collectors) { + var topFieldDocs = ((TopFieldCollector) collector).topDocs(); + for (ScoreDoc scoreDoc : topFieldDocs.scoreDocs) { + scoreDoc.shardIndex = i; } - }) - .collectList() - .handle((collectors, sink) -> { - LLUtils.ensureBlocking(); - try { - if (collectors.size() <= 1) { - sink.next(sharedManager.reduce((List) collectors)); - } else if (queryParams.isSorted() && !queryParams.isSortedByScore()) { - final TopFieldDocs[] topDocs = new TopFieldDocs[collectors.size()]; - int i = 0; - for (var collector : collectors) { - var topFieldDocs = ((TopFieldCollector) collector).topDocs(); - for (ScoreDoc scoreDoc : topFieldDocs.scoreDocs) { - scoreDoc.shardIndex = i; - } - topDocs[i++] = topFieldDocs; - } - sink.next(TopDocs.merge(requireNonNull(queryParams.sort()), 0, queryParams.limitInt(), topDocs)); - } else { - final TopDocs[] topDocs = new TopDocs[collectors.size()]; - int i = 0; - for (var collector : collectors) { - var topScoreDocs = collector.topDocs(); - for (ScoreDoc scoreDoc : topScoreDocs.scoreDocs) { - scoreDoc.shardIndex = i; - } - topDocs[i++] = topScoreDocs; - } - sink.next(TopDocs.merge(0, queryParams.limitInt(), topDocs)); - } - } catch (IOException ex) { - sink.error(ex); + topDocs[i++] = topFieldDocs; + } + return TopDocs.merge(requireNonNull(queryParams.sort()), 0, queryParams.limitInt(), topDocs); + } else { + final TopDocs[] topDocs = new TopDocs[collectors.size()]; + int i = 0; + for (var collector : collectors) { + var topScoreDocs = collector.topDocs(); + for (ScoreDoc scoreDoc : topScoreDocs.scoreDocs) { + scoreDoc.shardIndex = i; } - })); + topDocs[i++] = topScoreDocs; + } + return TopDocs.merge(0, queryParams.limitInt(), topDocs); + } + }).subscribeOn(luceneScheduler()))) + .publishOn(Schedulers.parallel()); } /** @@ -133,13 +126,7 @@ public class StandardSearcher implements MultiSearcher { .skip(queryParams.offsetLong()) .take(queryParams.limitLong(), true); - return new LuceneSearchResult(totalHitsCount, hitsFlux, () -> { - try { - indexSearchers.close(); - } catch (UncheckedIOException e) { - LOG.error("Can't close index searchers", e); - } - }); + return new MyLuceneSearchResult(totalHitsCount, hitsFlux, indexSearchers); }); } @@ -147,4 +134,26 @@ public class StandardSearcher implements MultiSearcher { public String getName() { return "standard"; } + + private static class MyLuceneSearchResult extends LuceneSearchResult implements LuceneCloseable { + + private final LLIndexSearchers indexSearchers; + + public MyLuceneSearchResult(TotalHitsCount totalHitsCount, + Flux hitsFlux, + LLIndexSearchers indexSearchers) { + super(totalHitsCount, hitsFlux); + this.indexSearchers = indexSearchers; + } + + @Override + protected void onClose() { + try { + indexSearchers.close(); + } catch (Throwable e) { + LOG.error("Can't close index searchers", e); + } + super.onClose(); + } + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedStreamingMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedStreamingMultiSearcher.java index c8709eb..b250537 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedStreamingMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedStreamingMultiSearcher.java @@ -8,6 +8,7 @@ import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.lucene.LuceneCloseable; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.MaxScoreAccumulator; import java.io.IOException; @@ -50,13 +51,7 @@ public class UnsortedStreamingMultiSearcher implements MultiSearcher { var totalHitsCount = new TotalHitsCount(0, false); Flux mergedFluxes = resultsFlux.skip(queryParams.offsetLong()).take(queryParams.limitLong(), true); - return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> { - try { - indexSearchers.close(); - } catch (UncheckedIOException e) { - LOG.error("Can't close index searchers", e); - } - }); + return new MyLuceneSearchResult(totalHitsCount, mergedFluxes, indexSearchers); })); } @@ -87,4 +82,26 @@ public class UnsortedStreamingMultiSearcher implements MultiSearcher { public String getName() { return "unsorted streaming multi"; } + + private static class MyLuceneSearchResult extends LuceneSearchResult implements LuceneCloseable { + + private final LLIndexSearchers indexSearchers; + + public MyLuceneSearchResult(TotalHitsCount totalHitsCount, + Flux hitsFlux, + LLIndexSearchers indexSearchers) { + super(totalHitsCount, hitsFlux); + this.indexSearchers = indexSearchers; + } + + @Override + protected void onClose() { + try { + indexSearchers.close(); + } catch (Throwable e) { + LOG.error("Can't close index searchers", e); + } + super.onClose(); + } + } } diff --git a/src/main/java/it/cavallium/dbengine/utils/ShortNamedThreadFactory.java b/src/main/java/it/cavallium/dbengine/utils/ShortNamedThreadFactory.java index 722da2c..af89ef1 100644 --- a/src/main/java/it/cavallium/dbengine/utils/ShortNamedThreadFactory.java +++ b/src/main/java/it/cavallium/dbengine/utils/ShortNamedThreadFactory.java @@ -31,18 +31,18 @@ import org.jetbrains.annotations.NotNull; */ public class ShortNamedThreadFactory implements ThreadFactory { - private static int POOL_NUMBERS_COUNT = 50; - private static final AtomicInteger[] threadPoolNumber = new AtomicInteger[POOL_NUMBERS_COUNT]; + protected static int POOL_NUMBERS_COUNT = 50; + protected static final AtomicInteger[] threadPoolNumber = new AtomicInteger[POOL_NUMBERS_COUNT]; static { for (int i = 0; i < threadPoolNumber.length; i++) { threadPoolNumber[i] = new AtomicInteger(1); } } - private ThreadGroup group; - private boolean daemon; - private final AtomicInteger threadNumber = new AtomicInteger(1); - private static final String NAME_PATTERN = "%s-%d"; - private final String threadNamePrefix; + protected ThreadGroup group; + protected boolean daemon; + protected final AtomicInteger threadNumber = new AtomicInteger(1); + protected static final String NAME_PATTERN = "%s-%d"; + protected final String threadNamePrefix; /** * Creates a new {@link ShortNamedThreadFactory} instance diff --git a/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java b/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java index 6fb2d0e..6283c12 100644 --- a/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java +++ b/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java @@ -10,6 +10,7 @@ import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.lucene.LuceneCloseable; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite; import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; @@ -79,14 +80,7 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher { .skip(queryParams.offsetLong()) .take(queryParams.limitLong(), true); - return new LuceneSearchResult(totalHitsCount, mergedFluxes, () -> { - resultsToDrop.forEach(SimpleResource::close); - try { - indexSearchers.close(); - } catch (UncheckedIOException e) { - LOG.error("Can't close index searchers", e); - } - }); + return new MyLuceneSearchResult(totalHitsCount, mergedFluxes, resultsToDrop, indexSearchers); }); }); } @@ -106,4 +100,30 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher { public String getName() { return "unsorted unscored simple multi"; } + + private static class MyLuceneSearchResult extends LuceneSearchResult implements LuceneCloseable { + + private final List resultsToDrop; + private final LLIndexSearchers indexSearchers; + + public MyLuceneSearchResult(TotalHitsCount totalHitsCount, + Flux mergedFluxes, + List resultsToDrop, + LLIndexSearchers indexSearchers) { + super(totalHitsCount, mergedFluxes); + this.resultsToDrop = resultsToDrop; + this.indexSearchers = indexSearchers; + } + + @Override + protected void onClose() { + resultsToDrop.forEach(SimpleResource::close); + try { + indexSearchers.close(); + } catch (UncheckedIOException e) { + LOG.error("Can't close index searchers", e); + } + super.onClose(); + } + } }