Optimistic emission of results

This commit is contained in:
Andrea Cavalli 2021-09-25 13:26:59 +02:00
parent 8e15020f5b
commit 5443e330bb
1 changed files with 11 additions and 3 deletions

View File

@ -78,11 +78,19 @@ public class UnsortedUnscoredContinuousLuceneMultiSearcher implements LuceneMult
throw new UnsupportedOperationException("Called collect in a nonblocking thread");
}
var scoreDoc = new ScoreDoc(context.docBase + i, 0, shardIndex);
synchronized (scoreDocsSink) {
while (scoreDocsSink.tryEmitNext(scoreDoc) == EmitResult.FAIL_OVERFLOW) {
boolean shouldRetry;
do {
var currentError = scoreDocsSink.tryEmitNext(scoreDoc);
shouldRetry = currentError == EmitResult.FAIL_NON_SERIALIZED
|| currentError == EmitResult.FAIL_OVERFLOW
|| currentError == EmitResult.FAIL_ZERO_SUBSCRIBER;
if (shouldRetry) {
LockSupport.parkNanos(10);
}
}
if (!shouldRetry && currentError.isFailure()) {
currentError.orThrow();
}
} while (shouldRetry);
}
@Override