Add nonblocking method to flux iterable

This commit is contained in:
Andrea Cavalli 2021-02-26 23:30:11 +01:00
parent 9140c7f3d6
commit 2d565c8d17
2 changed files with 64 additions and 10 deletions

View File

@ -7,22 +7,23 @@ import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory; import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
public abstract class BlockingFluxIterable<T> { public abstract class BlockingFluxIterable<T> {
private static final Logger logger = LoggerFactory.getLogger(BlockingFluxIterable.class); private static final Logger logger = LoggerFactory.getLogger(BlockingFluxIterable.class);
private final String name; private final String name;
private AtomicBoolean alreadyInitialized = new AtomicBoolean(false);
private AtomicLong requests = new AtomicLong(0);
private Semaphore availableRequests = new Semaphore(0);
private AtomicBoolean cancelled = new AtomicBoolean(false); private AtomicBoolean cancelled = new AtomicBoolean(false);
public BlockingFluxIterable(String name) { public BlockingFluxIterable(String name) {
this.name = name; this.name = name;
} }
public Flux<T> generate() { public Flux<T> generate(Scheduler scheduler) {
logger.trace("Generating iterable flux {}", this.name); logger.trace("Generating iterable flux {}", this.name);
AtomicBoolean alreadyInitialized = new AtomicBoolean(false);
AtomicLong requests = new AtomicLong(0);
Semaphore availableRequests = new Semaphore(0);
return Flux return Flux
.<T>create(sink -> { .<T>create(sink -> {
sink.onRequest(n -> { sink.onRequest(n -> {
@ -34,7 +35,7 @@ public abstract class BlockingFluxIterable<T> {
availableRequests.release(); availableRequests.release();
}); });
new Thread(() -> { scheduler.schedule(() -> {
logger.trace("Starting iterable flux {}", this.name); logger.trace("Starting iterable flux {}", this.name);
try { try {
try { try {
@ -68,8 +69,61 @@ public abstract class BlockingFluxIterable<T> {
} finally { } finally {
sink.complete(); sink.complete();
} }
}, "blocking-flux-iterable").start();
}); });
});
}
public Flux<T> generateNonblocking(Scheduler scheduler, int rateLimit) {
logger.trace("Generating nonblocking iterable flux {}", this.name);
AtomicBoolean alreadyInitialized = new AtomicBoolean(false);
final Object lock = new Object();
return Flux
.<T>create(sink -> {
sink.onRequest(n -> {
if (n > rateLimit || n < 1) {
sink.error(new IndexOutOfBoundsException("Requests must be <= " + rateLimit));
} else {
scheduler.schedule(() -> {
synchronized (lock) {
if (cancelled.get()) {
return;
}
int remaining = (int) n;
try {
T next;
do {
if (alreadyInitialized.compareAndSet(false, true)) {
this.onStartup();
}
next = this.onNext();
if (next != null) {
sink.next(next);
} else {
cancelled.set(true);
sink.complete();
}
} while (next != null && --remaining > 0 && !cancelled.get());
} catch (InterruptedException e) {
sink.error(e);
}
}
});
}
});
sink.onCancel(() -> {
});
sink.onDispose(() -> {
cancelled.set(true);
scheduler.schedule(() -> {
synchronized (lock) {
if (alreadyInitialized.get()) {
this.onTerminate();
}
}
});
});
})
.limitRate(rateLimit);
} }
public abstract void onStartup(); public abstract void onStartup();

View File

@ -568,7 +568,7 @@ public class LLLocalDictionary implements LLDictionary {
protected Entry<byte[], byte[]> transformEntry(byte[] key) { protected Entry<byte[], byte[]> transformEntry(byte[] key) {
return Map.entry(key, this.getValue()); return Map.entry(key, this.getValue());
} }
}.generate(); }.generateNonblocking(dbScheduler, 128);
} }
private Flux<List<Entry<byte[],byte[]>>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) { private Flux<List<Entry<byte[],byte[]>>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) {
@ -583,7 +583,7 @@ public class LLLocalDictionary implements LLDictionary {
protected Entry<byte[], byte[]> transformEntry(byte[] key) { protected Entry<byte[], byte[]> transformEntry(byte[] key) {
return Map.entry(key, this.getValue()); return Map.entry(key, this.getValue());
} }
}.generate(); }.generateNonblocking(dbScheduler, 128);
} }
@Override @Override
@ -610,7 +610,7 @@ public class LLLocalDictionary implements LLDictionary {
protected byte[] transformEntry(byte[] key) { protected byte[] transformEntry(byte[] key) {
return key; return key;
} }
}.generate(); }.generateNonblocking(dbScheduler, 128);
} }
private Flux<byte[]> getRangeKeysSingle(LLSnapshot snapshot, byte[] key) { private Flux<byte[]> getRangeKeysSingle(LLSnapshot snapshot, byte[] key) {
@ -633,7 +633,7 @@ public class LLLocalDictionary implements LLDictionary {
protected byte[] transformEntry(byte[] key) { protected byte[] transformEntry(byte[] key) {
return key; return key;
} }
}.generate(); }.generateNonblocking(dbScheduler, 128);
} }
//todo: replace implementation with a simple Flux.push //todo: replace implementation with a simple Flux.push