Avoid using BufferTimeoutPublisher (it kills lucene Async IO)
This commit is contained in:
parent
2fb96eaf03
commit
74d20204ab
@ -983,9 +983,7 @@ public class LLUtils {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static void discardStage(DatabaseStage<?> stage) {
|
private static void discardStage(DatabaseStage<?> stage) {
|
||||||
if (stage != null && stage.isAccessible()) {
|
// do nothing for now, to avoid double-free problems
|
||||||
stage.close();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static boolean isDirect(Buffer key) {
|
public static boolean isDirect(Buffer key) {
|
||||||
|
@ -1,159 +0,0 @@
|
|||||||
package it.cavallium.dbengine.database.disk;
|
|
||||||
|
|
||||||
/** Based on:
|
|
||||||
* https://gist.github.com/glandais-sparklane/e38834aa9df0c56f23e2d8d2e6899c78
|
|
||||||
*/
|
|
||||||
|
|
||||||
import java.time.Duration;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import java.util.concurrent.ScheduledFuture;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
|
||||||
import org.reactivestreams.Publisher;
|
|
||||||
import org.reactivestreams.Subscriber;
|
|
||||||
import org.reactivestreams.Subscription;
|
|
||||||
|
|
||||||
@SuppressWarnings("ReactiveStreamsPublisherImplementation")
|
|
||||||
class BufferTimeOutPublisher<T> implements Publisher<List<T>> {
|
|
||||||
|
|
||||||
private static final ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor();
|
|
||||||
|
|
||||||
private final Publisher<T> source;
|
|
||||||
private final int size;
|
|
||||||
private final long duration;
|
|
||||||
|
|
||||||
public BufferTimeOutPublisher(Publisher<T> source, int size, Duration duration) {
|
|
||||||
this.source = source;
|
|
||||||
this.size = size;
|
|
||||||
this.duration = duration.toMillis();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void subscribe(Subscriber<? super List<T>> subscriber) {
|
|
||||||
subscriber.onSubscribe(new BufferTimeOutSubscription<T>(source, subscriber, size, duration));
|
|
||||||
}
|
|
||||||
|
|
||||||
protected static class BufferTimeOutSubscription<T> implements Subscription, Subscriber<T> {
|
|
||||||
|
|
||||||
private final Subscriber<? super List<T>> subscriber;
|
|
||||||
private final int size;
|
|
||||||
private final long duration;
|
|
||||||
private Subscription subscription;
|
|
||||||
|
|
||||||
private final ReentrantLock lock = new ReentrantLock();
|
|
||||||
|
|
||||||
private List<T> buffer;
|
|
||||||
private ScheduledFuture<?> scheduledFuture;
|
|
||||||
|
|
||||||
private long downstreamRequests = 0;
|
|
||||||
private long downstreamTransmit = 0;
|
|
||||||
|
|
||||||
private long upstreamRequests = 0;
|
|
||||||
private long upstreamTransmit = 0;
|
|
||||||
private boolean upstreamCompleted = false;
|
|
||||||
|
|
||||||
public BufferTimeOutSubscription(Publisher<T> source,
|
|
||||||
Subscriber<? super List<T>> subscriber,
|
|
||||||
int size,
|
|
||||||
long duration) {
|
|
||||||
this.subscriber = subscriber;
|
|
||||||
this.size = size;
|
|
||||||
this.duration = duration;
|
|
||||||
this.buffer = new ArrayList<>(size);
|
|
||||||
source.subscribe(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
// downstream
|
|
||||||
@Override
|
|
||||||
public void request(long n) {
|
|
||||||
lock.lock();
|
|
||||||
downstreamRequests = downstreamRequests + n;
|
|
||||||
|
|
||||||
checkSend();
|
|
||||||
|
|
||||||
long downstreamMax = (downstreamRequests - downstreamTransmit) * size;
|
|
||||||
long upstreamRequested = upstreamRequests - upstreamTransmit;
|
|
||||||
long toRequest = downstreamMax - upstreamRequested;
|
|
||||||
|
|
||||||
if (toRequest > 0) {
|
|
||||||
subscription.request(toRequest);
|
|
||||||
upstreamRequests = upstreamRequests + toRequest;
|
|
||||||
}
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void cancel() {
|
|
||||||
subscription.cancel();
|
|
||||||
}
|
|
||||||
|
|
||||||
// upstream
|
|
||||||
@Override
|
|
||||||
public void onSubscribe(Subscription s) {
|
|
||||||
this.subscription = s;
|
|
||||||
scheduledFuture = EXECUTOR.scheduleAtFixedRate(this::timeout, 0, this.duration, TimeUnit.MILLISECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void timeout() {
|
|
||||||
checkSend();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void checkSend() {
|
|
||||||
lock.lock();
|
|
||||||
if (!this.buffer.isEmpty() && downstreamRequests > downstreamTransmit) {
|
|
||||||
List<T> output = prepareOutput();
|
|
||||||
subscriber.onNext(output);
|
|
||||||
downstreamTransmit++;
|
|
||||||
if (!this.buffer.isEmpty()) {
|
|
||||||
checkSend();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (upstreamCompleted && downstreamRequests > downstreamTransmit) {
|
|
||||||
scheduledFuture.cancel(false);
|
|
||||||
subscriber.onComplete();
|
|
||||||
}
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<T> prepareOutput() {
|
|
||||||
if (this.buffer.size() > size) {
|
|
||||||
List<T> output = new ArrayList<>(this.buffer.subList(0, size));
|
|
||||||
this.buffer = new ArrayList<>(this.buffer.subList(size, this.buffer.size()));
|
|
||||||
return output;
|
|
||||||
} else {
|
|
||||||
List<T> output = this.buffer;
|
|
||||||
this.buffer = new ArrayList<>(size);
|
|
||||||
return output;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onNext(T t) {
|
|
||||||
lock.lock();
|
|
||||||
this.buffer.add(t);
|
|
||||||
upstreamTransmit++;
|
|
||||||
if (this.buffer.size() == size) {
|
|
||||||
checkSend();
|
|
||||||
}
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onError(Throwable t) {
|
|
||||||
scheduledFuture.cancel(false);
|
|
||||||
subscriber.onError(t);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onComplete() {
|
|
||||||
lock.lock();
|
|
||||||
upstreamCompleted = true;
|
|
||||||
checkSend();
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
@ -1081,7 +1081,7 @@ public class LLLocalDictionary implements LLDictionary {
|
|||||||
Flux<Tuple2<K, Send<Buffer>>> keys,
|
Flux<Tuple2<K, Send<Buffer>>> keys,
|
||||||
boolean existsAlmostCertainly) {
|
boolean existsAlmostCertainly) {
|
||||||
return keys
|
return keys
|
||||||
.transform(normal -> new BufferTimeOutPublisher<>(normal, MULTI_GET_WINDOW, MULTI_GET_WINDOW_TIMEOUT))
|
.buffer(MULTI_GET_WINDOW)
|
||||||
.doOnDiscard(Tuple2.class, discardedEntry -> {
|
.doOnDiscard(Tuple2.class, discardedEntry -> {
|
||||||
//noinspection unchecked
|
//noinspection unchecked
|
||||||
var entry = (Tuple2<K, Buffer>) discardedEntry;
|
var entry = (Tuple2<K, Buffer>) discardedEntry;
|
||||||
|
@ -111,7 +111,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
|||||||
@Override
|
@Override
|
||||||
public Mono<Void> addDocuments(Flux<Entry<LLTerm, LLDocument>> documents) {
|
public Mono<Void> addDocuments(Flux<Entry<LLTerm, LLDocument>> documents) {
|
||||||
return documents
|
return documents
|
||||||
.transform(normal -> new BufferTimeOutPublisher<>(normal, 512, Duration.ofSeconds(2)))
|
.buffer(512)
|
||||||
.flatMap(inputEntries -> {
|
.flatMap(inputEntries -> {
|
||||||
List<Entry<LLTerm, LLDocument>>[] sortedEntries = new List[luceneIndices.length];
|
List<Entry<LLTerm, LLDocument>>[] sortedEntries = new List[luceneIndices.length];
|
||||||
Mono<Void>[] results = new Mono[luceneIndices.length];
|
Mono<Void>[] results = new Mono[luceneIndices.length];
|
||||||
|
Loading…
Reference in New Issue
Block a user