diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveCollector.java b/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveCollector.java new file mode 100644 index 0000000..eb3a92a --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveCollector.java @@ -0,0 +1,32 @@ +package it.cavallium.dbengine.lucene.collector; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.LeafCollector; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.ScoreMode; +import reactor.core.publisher.Sinks.Many; + +public class ReactiveCollector implements Collector { + + private final Many scoreDocsSink; + private int shardIndex; + + public ReactiveCollector(Many scoreDocsSink) { + this.scoreDocsSink = scoreDocsSink; + } + + @Override + public LeafCollector getLeafCollector(LeafReaderContext leafReaderContext) { + return new ReactiveLeafCollector(leafReaderContext, scoreDocsSink, shardIndex); + } + + @Override + public ScoreMode scoreMode() { + return ScoreMode.COMPLETE_NO_SCORES; + } + + public void setShardIndex(int shardIndex) { + this.shardIndex = shardIndex; + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveCollectorManager.java b/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveCollectorManager.java new file mode 100644 index 0000000..c569607 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveCollectorManager.java @@ -0,0 +1,26 @@ +package it.cavallium.dbengine.lucene.collector; + +import java.util.Collection; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.CollectorManager; +import org.apache.lucene.search.ScoreDoc; +import reactor.core.publisher.Sinks.Many; + +public class ReactiveCollectorManager implements CollectorManager { + + private final Many scoreDocsSink; + + public ReactiveCollectorManager(Many scoreDocsSink) { + this.scoreDocsSink = scoreDocsSink; + } + + @Override + public ReactiveCollector newCollector() { + return new ReactiveCollector(scoreDocsSink); + } + + @Override + public Void reduce(Collection collection) { + throw new UnsupportedOperationException(); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveLeafCollector.java b/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveLeafCollector.java new file mode 100644 index 0000000..0e8e65c --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveLeafCollector.java @@ -0,0 +1,43 @@ +package it.cavallium.dbengine.lucene.collector; + +import java.util.concurrent.locks.LockSupport; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.LeafCollector; +import org.apache.lucene.search.Scorable; +import org.apache.lucene.search.ScoreDoc; +import reactor.core.publisher.Sinks.EmitResult; +import reactor.core.publisher.Sinks.Many; + +public class ReactiveLeafCollector implements LeafCollector { + + private final LeafReaderContext leafReaderContext; + private final Many scoreDocsSink; + private final int shardIndex; + + public ReactiveLeafCollector(LeafReaderContext leafReaderContext, Many scoreDocsSink, int shardIndex) { + this.leafReaderContext = leafReaderContext; + this.scoreDocsSink = scoreDocsSink; + this.shardIndex = shardIndex; + } + + @Override + public void setScorer(Scorable scorable) { + + } + + @Override + public void collect(int i) { + var scoreDoc = new ScoreDoc(leafReaderContext.docBase + i, 0, shardIndex); + boolean shouldRetry; + do { + var currentError = scoreDocsSink.tryEmitNext(scoreDoc); + shouldRetry = currentError == EmitResult.FAIL_NON_SERIALIZED || currentError == EmitResult.FAIL_OVERFLOW + || currentError == EmitResult.FAIL_ZERO_SUBSCRIBER; + if (shouldRetry) { + LockSupport.parkNanos(10); + } + currentError.orThrow(); + + } while (shouldRetry); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneMultiSearcher.java index 642ca4e..fb81170 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneMultiSearcher.java @@ -3,7 +3,6 @@ package it.cavallium.dbengine.lucene.searcher; import io.net5.buffer.api.Send; import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearchers; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public interface LuceneMultiSearcher extends LuceneLocalSearcher { @@ -33,4 +32,5 @@ public interface LuceneMultiSearcher extends LuceneLocalSearcher { var searchers = indexSearcherMono.map(a -> LLIndexSearchers.unsharded(a).send()); return this.collectMulti(searchers, queryParams, keyFieldName, transformer); } + } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredContinuousLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredContinuousLuceneMultiSearcher.java index 08403c9..5881c7e 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredContinuousLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredContinuousLuceneMultiSearcher.java @@ -1,31 +1,18 @@ package it.cavallium.dbengine.lucene.searcher; -import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLUtils; -import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.lucene.LuceneUtils; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; +import it.cavallium.dbengine.lucene.collector.ReactiveCollectorManager; import java.util.Queue; -import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; import java.util.function.Supplier; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.search.Collector; -import org.apache.lucene.search.CollectorManager; import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.LeafCollector; -import org.apache.lucene.search.Scorable; import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.ScoreMode; -import org.apache.lucene.search.SimpleCollector; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; @@ -69,63 +56,7 @@ public class UnsortedUnscoredContinuousLuceneMultiSearcher implements LuceneMult Many scoreDocsSink = Sinks.many().unicast().onBackpressureBuffer(QUEUE_SUPPLIER.get()); - var cm = new CollectorManager() { - - class IterableCollector implements Collector { - - private int shardIndex; - - @Override - public LeafCollector getLeafCollector(LeafReaderContext leafReaderContext) throws IOException { - return new LeafCollector() { - @Override - public void setScorer(Scorable scorable) throws IOException { - - } - - @Override - public void collect(int i) throws IOException { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called collect in a nonblocking thread"); - } - var scoreDoc = new ScoreDoc(leafReaderContext.docBase + i, 0, shardIndex); - boolean shouldRetry; - do { - var currentError = scoreDocsSink.tryEmitNext(scoreDoc); - shouldRetry = currentError == EmitResult.FAIL_NON_SERIALIZED - || currentError == EmitResult.FAIL_OVERFLOW - || currentError == EmitResult.FAIL_ZERO_SUBSCRIBER; - if (shouldRetry) { - LockSupport.parkNanos(10); - } - if (!shouldRetry && currentError.isFailure()) { - currentError.orThrow(); - } - } while (shouldRetry); - } - }; - } - - @Override - public ScoreMode scoreMode() { - return ScoreMode.COMPLETE_NO_SCORES; - } - - public void setShardIndex(int shardIndex) { - this.shardIndex = shardIndex; - } - } - - @Override - public IterableCollector newCollector() { - return new IterableCollector(); - } - - @Override - public Void reduce(Collection collection) { - throw new UnsupportedOperationException(); - } - }; + var cm = new ReactiveCollectorManager(scoreDocsSink); AtomicInteger runningTasks = new AtomicInteger(0); var shards = indexSearchers.shards();