diff --git a/src/main/java/it/cavallium/dbengine/database/BlockingFluxIterable.java b/src/main/java/it/cavallium/dbengine/database/BlockingFluxIterable.java index af1abe4..9c4a592 100644 --- a/src/main/java/it/cavallium/dbengine/database/BlockingFluxIterable.java +++ b/src/main/java/it/cavallium/dbengine/database/BlockingFluxIterable.java @@ -7,22 +7,23 @@ import org.jetbrains.annotations.Nullable; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; +import reactor.core.scheduler.Scheduler; public abstract class BlockingFluxIterable { private static final Logger logger = LoggerFactory.getLogger(BlockingFluxIterable.class); private final String name; - private AtomicBoolean alreadyInitialized = new AtomicBoolean(false); - private AtomicLong requests = new AtomicLong(0); - private Semaphore availableRequests = new Semaphore(0); private AtomicBoolean cancelled = new AtomicBoolean(false); public BlockingFluxIterable(String name) { this.name = name; } - public Flux generate() { + public Flux generate(Scheduler scheduler) { logger.trace("Generating iterable flux {}", this.name); + AtomicBoolean alreadyInitialized = new AtomicBoolean(false); + AtomicLong requests = new AtomicLong(0); + Semaphore availableRequests = new Semaphore(0); return Flux .create(sink -> { sink.onRequest(n -> { @@ -34,7 +35,7 @@ public abstract class BlockingFluxIterable { availableRequests.release(); }); - new Thread(() -> { + scheduler.schedule(() -> { logger.trace("Starting iterable flux {}", this.name); try { try { @@ -68,10 +69,63 @@ public abstract class BlockingFluxIterable { } finally { sink.complete(); } - }, "blocking-flux-iterable").start(); + }); }); } + public Flux generateNonblocking(Scheduler scheduler, int rateLimit) { + logger.trace("Generating nonblocking iterable flux {}", this.name); + AtomicBoolean alreadyInitialized = new AtomicBoolean(false); + final Object lock = new Object(); + return Flux + .create(sink -> { + sink.onRequest(n -> { + if (n > rateLimit || n < 1) { + sink.error(new IndexOutOfBoundsException("Requests must be <= " + rateLimit)); + } else { + scheduler.schedule(() -> { + synchronized (lock) { + if (cancelled.get()) { + return; + } + int remaining = (int) n; + try { + T next; + do { + if (alreadyInitialized.compareAndSet(false, true)) { + this.onStartup(); + } + next = this.onNext(); + if (next != null) { + sink.next(next); + } else { + cancelled.set(true); + sink.complete(); + } + } while (next != null && --remaining > 0 && !cancelled.get()); + } catch (InterruptedException e) { + sink.error(e); + } + } + }); + } + }); + sink.onCancel(() -> { + }); + sink.onDispose(() -> { + cancelled.set(true); + scheduler.schedule(() -> { + synchronized (lock) { + if (alreadyInitialized.get()) { + this.onTerminate(); + } + } + }); + }); + }) + .limitRate(rateLimit); + } + public abstract void onStartup(); public abstract void onTerminate(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 3ba371f..6f92473 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -568,7 +568,7 @@ public class LLLocalDictionary implements LLDictionary { protected Entry transformEntry(byte[] key) { return Map.entry(key, this.getValue()); } - }.generate(); + }.generateNonblocking(dbScheduler, 128); } private Flux>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) { @@ -583,7 +583,7 @@ public class LLLocalDictionary implements LLDictionary { protected Entry transformEntry(byte[] key) { return Map.entry(key, this.getValue()); } - }.generate(); + }.generateNonblocking(dbScheduler, 128); } @Override @@ -610,7 +610,7 @@ public class LLLocalDictionary implements LLDictionary { protected byte[] transformEntry(byte[] key) { return key; } - }.generate(); + }.generateNonblocking(dbScheduler, 128); } private Flux getRangeKeysSingle(LLSnapshot snapshot, byte[] key) { @@ -633,7 +633,7 @@ public class LLLocalDictionary implements LLDictionary { protected byte[] transformEntry(byte[] key) { return key; } - }.generate(); + }.generateNonblocking(dbScheduler, 128); } //todo: replace implementation with a simple Flux.push