Use buffertimeoutpublisher

This commit is contained in:
Andrea Cavalli 2021-07-27 01:31:18 +02:00
parent 7abcdf05f9
commit b816e1f9e5
3 changed files with 161 additions and 2 deletions

View File

@ -0,0 +1,159 @@
package it.cavallium.dbengine.database.disk;
/** Based on:
* https://gist.github.com/glandais-sparklane/e38834aa9df0c56f23e2d8d2e6899c78
*/
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@SuppressWarnings("ReactiveStreamsPublisherImplementation")
class BufferTimeOutPublisher<T> implements Publisher<List<T>> {
private static final ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor();
private final Publisher<T> source;
private final int size;
private final long duration;
public BufferTimeOutPublisher(Publisher<T> source, int size, Duration duration) {
this.source = source;
this.size = size;
this.duration = duration.toMillis();
}
@Override
public void subscribe(Subscriber<? super List<T>> subscriber) {
subscriber.onSubscribe(new BufferTimeOutSubscription<T>(source, subscriber, size, duration));
}
protected static class BufferTimeOutSubscription<T> implements Subscription, Subscriber<T> {
private final Subscriber<? super List<T>> subscriber;
private final int size;
private final long duration;
private Subscription subscription;
private final ReentrantLock lock = new ReentrantLock();
private List<T> buffer;
private ScheduledFuture<?> scheduledFuture;
private long downstreamRequests = 0;
private long downstreamTransmit = 0;
private long upstreamRequests = 0;
private long upstreamTransmit = 0;
private boolean upstreamCompleted = false;
public BufferTimeOutSubscription(Publisher<T> source,
Subscriber<? super List<T>> subscriber,
int size,
long duration) {
this.subscriber = subscriber;
this.size = size;
this.duration = duration;
this.buffer = new ArrayList<>(size);
source.subscribe(this);
}
// downstream
@Override
public void request(long n) {
lock.lock();
downstreamRequests = downstreamRequests + n;
checkSend();
long downstreamMax = (downstreamRequests - downstreamTransmit) * size;
long upstreamRequested = upstreamRequests - upstreamTransmit;
long toRequest = downstreamMax - upstreamRequested;
if (toRequest > 0) {
subscription.request(toRequest);
upstreamRequests = upstreamRequests + toRequest;
}
lock.unlock();
}
@Override
public void cancel() {
subscription.cancel();
}
// upstream
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
scheduledFuture = EXECUTOR.scheduleAtFixedRate(this::timeout, 0, this.duration, TimeUnit.MILLISECONDS);
}
private void timeout() {
checkSend();
}
private void checkSend() {
lock.lock();
if (!this.buffer.isEmpty() && downstreamRequests > downstreamTransmit) {
List<T> output = prepareOutput();
subscriber.onNext(output);
downstreamTransmit++;
if (!this.buffer.isEmpty()) {
checkSend();
}
}
if (upstreamCompleted && downstreamRequests > downstreamTransmit) {
scheduledFuture.cancel(false);
subscriber.onComplete();
}
lock.unlock();
}
private List<T> prepareOutput() {
if (this.buffer.size() > size) {
List<T> output = new ArrayList<>(this.buffer.subList(0, size));
this.buffer = new ArrayList<>(this.buffer.subList(size, this.buffer.size()));
return output;
} else {
List<T> output = this.buffer;
this.buffer = new ArrayList<>(size);
return output;
}
}
@Override
public void onNext(T t) {
lock.lock();
this.buffer.add(t);
upstreamTransmit++;
if (this.buffer.size() == size) {
checkSend();
}
lock.unlock();
}
@Override
public void onError(Throwable t) {
scheduledFuture.cancel(false);
subscriber.onError(t);
}
@Override
public void onComplete() {
lock.lock();
upstreamCompleted = true;
checkSend();
lock.unlock();
}
}
}

View File

@ -962,7 +962,7 @@ public class LLLocalDictionary implements LLDictionary {
Flux<Tuple2<K, ByteBuf>> keys,
boolean existsAlmostCertainly) {
return keys
.bufferTimeout(MULTI_GET_WINDOW, MULTI_GET_WINDOW_TIMEOUT)
.transform(normal -> new BufferTimeOutPublisher<>(normal, MULTI_GET_WINDOW, MULTI_GET_WINDOW_TIMEOUT))
.doOnDiscard(Tuple2.class, discardedEntry -> {
//noinspection unchecked
var entry = (Tuple2<K, ByteBuf>) discardedEntry;

View File

@ -115,7 +115,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
@Override
public Mono<Void> addDocuments(Flux<Entry<LLTerm, LLDocument>> documents) {
return documents
.bufferTimeout(512, Duration.ofSeconds(2))
.transform(normal -> new BufferTimeOutPublisher<>(normal, 512, Duration.ofSeconds(2)))
.flatMap(inputEntries -> {
List<Entry<LLTerm, LLDocument>>[] sortedEntries = new List[luceneIndices.length];
Mono<Void>[] results = new Mono[luceneIndices.length];