Move reactive collector

This commit is contained in:
Andrea Cavalli 2021-09-25 18:31:41 +02:00
parent f03f7296d4
commit 5d7b403e55
5 changed files with 104 additions and 72 deletions

View File

@ -0,0 +1,32 @@
package it.cavallium.dbengine.lucene.collector;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import reactor.core.publisher.Sinks.Many;
public class ReactiveCollector implements Collector {
private final Many<ScoreDoc> scoreDocsSink;
private int shardIndex;
public ReactiveCollector(Many<ScoreDoc> scoreDocsSink) {
this.scoreDocsSink = scoreDocsSink;
}
@Override
public LeafCollector getLeafCollector(LeafReaderContext leafReaderContext) {
return new ReactiveLeafCollector(leafReaderContext, scoreDocsSink, shardIndex);
}
@Override
public ScoreMode scoreMode() {
return ScoreMode.COMPLETE_NO_SCORES;
}
public void setShardIndex(int shardIndex) {
this.shardIndex = shardIndex;
}
}

View File

@ -0,0 +1,26 @@
package it.cavallium.dbengine.lucene.collector;
import java.util.Collection;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.ScoreDoc;
import reactor.core.publisher.Sinks.Many;
public class ReactiveCollectorManager implements CollectorManager<Collector, Void> {
private final Many<ScoreDoc> scoreDocsSink;
public ReactiveCollectorManager(Many<ScoreDoc> scoreDocsSink) {
this.scoreDocsSink = scoreDocsSink;
}
@Override
public ReactiveCollector newCollector() {
return new ReactiveCollector(scoreDocsSink);
}
@Override
public Void reduce(Collection<Collector> collection) {
throw new UnsupportedOperationException();
}
}

View File

@ -0,0 +1,43 @@
package it.cavallium.dbengine.lucene.collector;
import java.util.concurrent.locks.LockSupport;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreDoc;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.publisher.Sinks.Many;
public class ReactiveLeafCollector implements LeafCollector {
private final LeafReaderContext leafReaderContext;
private final Many<ScoreDoc> scoreDocsSink;
private final int shardIndex;
public ReactiveLeafCollector(LeafReaderContext leafReaderContext, Many<ScoreDoc> scoreDocsSink, int shardIndex) {
this.leafReaderContext = leafReaderContext;
this.scoreDocsSink = scoreDocsSink;
this.shardIndex = shardIndex;
}
@Override
public void setScorer(Scorable scorable) {
}
@Override
public void collect(int i) {
var scoreDoc = new ScoreDoc(leafReaderContext.docBase + i, 0, shardIndex);
boolean shouldRetry;
do {
var currentError = scoreDocsSink.tryEmitNext(scoreDoc);
shouldRetry = currentError == EmitResult.FAIL_NON_SERIALIZED || currentError == EmitResult.FAIL_OVERFLOW
|| currentError == EmitResult.FAIL_ZERO_SUBSCRIBER;
if (shouldRetry) {
LockSupport.parkNanos(10);
}
currentError.orThrow();
} while (shouldRetry);
}
}

View File

@ -3,7 +3,6 @@ package it.cavallium.dbengine.lucene.searcher;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
public interface LuceneMultiSearcher extends LuceneLocalSearcher { public interface LuceneMultiSearcher extends LuceneLocalSearcher {
@ -33,4 +32,5 @@ public interface LuceneMultiSearcher extends LuceneLocalSearcher {
var searchers = indexSearcherMono.map(a -> LLIndexSearchers.unsharded(a).send()); var searchers = indexSearcherMono.map(a -> LLIndexSearchers.unsharded(a).send());
return this.collectMulti(searchers, queryParams, keyFieldName, transformer); return this.collectMulti(searchers, queryParams, keyFieldName, transformer);
} }
} }

View File

@ -1,31 +1,18 @@
package it.cavallium.dbengine.lucene.searcher; package it.cavallium.dbengine.lucene.searcher;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.LuceneUtils;
import java.io.IOException; import it.cavallium.dbengine.lucene.collector.ReactiveCollectorManager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.SimpleCollector;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks;
@ -69,63 +56,7 @@ public class UnsortedUnscoredContinuousLuceneMultiSearcher implements LuceneMult
Many<ScoreDoc> scoreDocsSink = Sinks.many().unicast().onBackpressureBuffer(QUEUE_SUPPLIER.get()); Many<ScoreDoc> scoreDocsSink = Sinks.many().unicast().onBackpressureBuffer(QUEUE_SUPPLIER.get());
var cm = new CollectorManager<Collector, Void>() { var cm = new ReactiveCollectorManager(scoreDocsSink);
class IterableCollector implements Collector {
private int shardIndex;
@Override
public LeafCollector getLeafCollector(LeafReaderContext leafReaderContext) throws IOException {
return new LeafCollector() {
@Override
public void setScorer(Scorable scorable) throws IOException {
}
@Override
public void collect(int i) throws IOException {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called collect in a nonblocking thread");
}
var scoreDoc = new ScoreDoc(leafReaderContext.docBase + i, 0, shardIndex);
boolean shouldRetry;
do {
var currentError = scoreDocsSink.tryEmitNext(scoreDoc);
shouldRetry = currentError == EmitResult.FAIL_NON_SERIALIZED
|| currentError == EmitResult.FAIL_OVERFLOW
|| currentError == EmitResult.FAIL_ZERO_SUBSCRIBER;
if (shouldRetry) {
LockSupport.parkNanos(10);
}
if (!shouldRetry && currentError.isFailure()) {
currentError.orThrow();
}
} while (shouldRetry);
}
};
}
@Override
public ScoreMode scoreMode() {
return ScoreMode.COMPLETE_NO_SCORES;
}
public void setShardIndex(int shardIndex) {
this.shardIndex = shardIndex;
}
}
@Override
public IterableCollector newCollector() {
return new IterableCollector();
}
@Override
public Void reduce(Collection<Collector> collection) {
throw new UnsupportedOperationException();
}
};
AtomicInteger runningTasks = new AtomicInteger(0); AtomicInteger runningTasks = new AtomicInteger(0);
var shards = indexSearchers.shards(); var shards = indexSearchers.shards();