diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 8140892..5d1249b 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -983,9 +983,7 @@ public class LLUtils { } private static void discardStage(DatabaseStage stage) { - if (stage != null && stage.isAccessible()) { - stage.close(); - } + // do nothing for now, to avoid double-free problems } public static boolean isDirect(Buffer key) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/BufferTimeOutPublisher.java b/src/main/java/it/cavallium/dbengine/database/disk/BufferTimeOutPublisher.java deleted file mode 100644 index 22f1244..0000000 --- a/src/main/java/it/cavallium/dbengine/database/disk/BufferTimeOutPublisher.java +++ /dev/null @@ -1,159 +0,0 @@ -package it.cavallium.dbengine.database.disk; - -/** Based on: - * https://gist.github.com/glandais-sparklane/e38834aa9df0c56f23e2d8d2e6899c78 - */ - -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -@SuppressWarnings("ReactiveStreamsPublisherImplementation") -class BufferTimeOutPublisher implements Publisher> { - - private static final ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor(); - - private final Publisher source; - private final int size; - private final long duration; - - public BufferTimeOutPublisher(Publisher source, int size, Duration duration) { - this.source = source; - this.size = size; - this.duration = duration.toMillis(); - } - - @Override - public void subscribe(Subscriber> subscriber) { - subscriber.onSubscribe(new BufferTimeOutSubscription(source, subscriber, size, duration)); - } - - protected static class BufferTimeOutSubscription implements Subscription, Subscriber { - - private final Subscriber> subscriber; - private final int size; - private final long duration; - private Subscription subscription; - - private final ReentrantLock lock = new ReentrantLock(); - - private List buffer; - private ScheduledFuture scheduledFuture; - - private long downstreamRequests = 0; - private long downstreamTransmit = 0; - - private long upstreamRequests = 0; - private long upstreamTransmit = 0; - private boolean upstreamCompleted = false; - - public BufferTimeOutSubscription(Publisher source, - Subscriber> subscriber, - int size, - long duration) { - this.subscriber = subscriber; - this.size = size; - this.duration = duration; - this.buffer = new ArrayList<>(size); - source.subscribe(this); - } - - // downstream - @Override - public void request(long n) { - lock.lock(); - downstreamRequests = downstreamRequests + n; - - checkSend(); - - long downstreamMax = (downstreamRequests - downstreamTransmit) * size; - long upstreamRequested = upstreamRequests - upstreamTransmit; - long toRequest = downstreamMax - upstreamRequested; - - if (toRequest > 0) { - subscription.request(toRequest); - upstreamRequests = upstreamRequests + toRequest; - } - lock.unlock(); - } - - @Override - public void cancel() { - subscription.cancel(); - } - - // upstream - @Override - public void onSubscribe(Subscription s) { - this.subscription = s; - scheduledFuture = EXECUTOR.scheduleAtFixedRate(this::timeout, 0, this.duration, TimeUnit.MILLISECONDS); - } - - private void timeout() { - checkSend(); - } - - private void checkSend() { - lock.lock(); - if (!this.buffer.isEmpty() && downstreamRequests > downstreamTransmit) { - List output = prepareOutput(); - subscriber.onNext(output); - downstreamTransmit++; - if (!this.buffer.isEmpty()) { - checkSend(); - } - } - if (upstreamCompleted && downstreamRequests > downstreamTransmit) { - scheduledFuture.cancel(false); - subscriber.onComplete(); - } - lock.unlock(); - } - - private List prepareOutput() { - if (this.buffer.size() > size) { - List output = new ArrayList<>(this.buffer.subList(0, size)); - this.buffer = new ArrayList<>(this.buffer.subList(size, this.buffer.size())); - return output; - } else { - List output = this.buffer; - this.buffer = new ArrayList<>(size); - return output; - } - } - - @Override - public void onNext(T t) { - lock.lock(); - this.buffer.add(t); - upstreamTransmit++; - if (this.buffer.size() == size) { - checkSend(); - } - lock.unlock(); - } - - @Override - public void onError(Throwable t) { - scheduledFuture.cancel(false); - subscriber.onError(t); - } - - @Override - public void onComplete() { - lock.lock(); - upstreamCompleted = true; - checkSend(); - lock.unlock(); - } - - } -} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 104f00c..cf082a6 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -1081,7 +1081,7 @@ public class LLLocalDictionary implements LLDictionary { Flux>> keys, boolean existsAlmostCertainly) { return keys - .transform(normal -> new BufferTimeOutPublisher<>(normal, MULTI_GET_WINDOW, MULTI_GET_WINDOW_TIMEOUT)) + .buffer(MULTI_GET_WINDOW) .doOnDiscard(Tuple2.class, discardedEntry -> { //noinspection unchecked var entry = (Tuple2) discardedEntry; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 7b3814a..a38b5d3 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -111,7 +111,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { @Override public Mono addDocuments(Flux> documents) { return documents - .transform(normal -> new BufferTimeOutPublisher<>(normal, 512, Duration.ofSeconds(2))) + .buffer(512) .flatMap(inputEntries -> { List>[] sortedEntries = new List[luceneIndices.length]; Mono[] results = new Mono[luceneIndices.length];