2021-02-26 14:06:16 +01:00
|
|
|
package it.cavallium.dbengine.database;
|
|
|
|
|
|
|
|
import java.util.concurrent.Semaphore;
|
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
import org.jetbrains.annotations.Nullable;
|
2021-02-26 21:21:02 +01:00
|
|
|
import org.warp.commonutils.log.Logger;
|
|
|
|
import org.warp.commonutils.log.LoggerFactory;
|
2021-02-26 14:06:16 +01:00
|
|
|
import reactor.core.publisher.Flux;
|
2021-02-26 23:30:11 +01:00
|
|
|
import reactor.core.scheduler.Scheduler;
|
2021-02-26 14:06:16 +01:00
|
|
|
|
|
|
|
public abstract class BlockingFluxIterable<T> {
|
|
|
|
|
2021-02-26 21:21:02 +01:00
|
|
|
private static final Logger logger = LoggerFactory.getLogger(BlockingFluxIterable.class);
|
|
|
|
private final String name;
|
|
|
|
private AtomicBoolean cancelled = new AtomicBoolean(false);
|
2021-02-26 14:06:16 +01:00
|
|
|
|
2021-02-26 21:21:02 +01:00
|
|
|
public BlockingFluxIterable(String name) {
|
|
|
|
this.name = name;
|
2021-02-26 14:06:16 +01:00
|
|
|
}
|
|
|
|
|
2021-02-26 23:30:11 +01:00
|
|
|
public Flux<T> generate(Scheduler scheduler) {
|
2021-02-26 21:21:02 +01:00
|
|
|
logger.trace("Generating iterable flux {}", this.name);
|
2021-02-26 23:30:11 +01:00
|
|
|
AtomicBoolean alreadyInitialized = new AtomicBoolean(false);
|
|
|
|
AtomicLong requests = new AtomicLong(0);
|
|
|
|
Semaphore availableRequests = new Semaphore(0);
|
2021-02-26 14:06:16 +01:00
|
|
|
return Flux
|
|
|
|
.<T>create(sink -> {
|
|
|
|
sink.onRequest(n -> {
|
|
|
|
requests.addAndGet(n);
|
|
|
|
availableRequests.release();
|
|
|
|
});
|
|
|
|
sink.onDispose(() -> {
|
|
|
|
cancelled.set(true);
|
|
|
|
availableRequests.release();
|
|
|
|
});
|
|
|
|
|
2021-02-26 23:30:11 +01:00
|
|
|
scheduler.schedule(() -> {
|
2021-02-26 21:21:02 +01:00
|
|
|
logger.trace("Starting iterable flux {}", this.name);
|
2021-02-26 14:06:16 +01:00
|
|
|
try {
|
2021-02-26 19:14:33 +01:00
|
|
|
try {
|
|
|
|
loop:
|
|
|
|
while (true) {
|
|
|
|
availableRequests.acquireUninterruptibly();
|
|
|
|
var remainingRequests = requests.getAndSet(0);
|
2021-02-26 21:21:02 +01:00
|
|
|
if (cancelled.get()) {
|
2021-02-26 19:14:33 +01:00
|
|
|
break;
|
2021-02-26 14:06:16 +01:00
|
|
|
}
|
|
|
|
|
2021-02-26 19:14:33 +01:00
|
|
|
while (remainingRequests-- > 0) {
|
|
|
|
if (alreadyInitialized.compareAndSet(false, true)) {
|
|
|
|
this.onStartup();
|
|
|
|
}
|
|
|
|
|
|
|
|
T next = onNext();
|
|
|
|
if (next == null) {
|
|
|
|
break loop;
|
|
|
|
}
|
|
|
|
sink.next(next);
|
2021-02-26 14:06:16 +01:00
|
|
|
}
|
2021-02-26 19:14:33 +01:00
|
|
|
}
|
|
|
|
} catch (InterruptedException ex) {
|
|
|
|
sink.error(ex);
|
|
|
|
} finally {
|
|
|
|
if (alreadyInitialized.get()) {
|
|
|
|
onTerminate();
|
2021-02-26 14:06:16 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
} finally {
|
2021-02-26 19:14:33 +01:00
|
|
|
sink.complete();
|
2021-02-26 14:06:16 +01:00
|
|
|
}
|
2021-02-26 23:30:11 +01:00
|
|
|
});
|
2021-02-26 19:14:33 +01:00
|
|
|
});
|
2021-02-26 14:06:16 +01:00
|
|
|
}
|
|
|
|
|
2021-02-26 23:30:11 +01:00
|
|
|
public Flux<T> generateNonblocking(Scheduler scheduler, int rateLimit) {
|
|
|
|
logger.trace("Generating nonblocking iterable flux {}", this.name);
|
|
|
|
AtomicBoolean alreadyInitialized = new AtomicBoolean(false);
|
|
|
|
final Object lock = new Object();
|
|
|
|
return Flux
|
|
|
|
.<T>create(sink -> {
|
|
|
|
sink.onRequest(n -> {
|
|
|
|
if (n > rateLimit || n < 1) {
|
|
|
|
sink.error(new IndexOutOfBoundsException("Requests must be <= " + rateLimit));
|
|
|
|
} else {
|
|
|
|
scheduler.schedule(() -> {
|
|
|
|
synchronized (lock) {
|
|
|
|
if (cancelled.get()) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
int remaining = (int) n;
|
|
|
|
try {
|
|
|
|
T next;
|
|
|
|
do {
|
|
|
|
if (alreadyInitialized.compareAndSet(false, true)) {
|
|
|
|
this.onStartup();
|
|
|
|
}
|
|
|
|
next = this.onNext();
|
|
|
|
if (next != null) {
|
|
|
|
sink.next(next);
|
|
|
|
} else {
|
|
|
|
cancelled.set(true);
|
|
|
|
sink.complete();
|
|
|
|
}
|
|
|
|
} while (next != null && --remaining > 0 && !cancelled.get());
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
sink.error(e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
});
|
|
|
|
sink.onCancel(() -> {
|
|
|
|
});
|
|
|
|
sink.onDispose(() -> {
|
|
|
|
cancelled.set(true);
|
|
|
|
scheduler.schedule(() -> {
|
|
|
|
synchronized (lock) {
|
|
|
|
if (alreadyInitialized.get()) {
|
|
|
|
this.onTerminate();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
});
|
|
|
|
})
|
|
|
|
.limitRate(rateLimit);
|
|
|
|
}
|
|
|
|
|
2021-02-26 14:06:16 +01:00
|
|
|
public abstract void onStartup();
|
|
|
|
|
|
|
|
public abstract void onTerminate();
|
|
|
|
|
|
|
|
@Nullable
|
2021-02-26 19:14:33 +01:00
|
|
|
public abstract T onNext() throws InterruptedException;
|
2021-02-26 21:21:02 +01:00
|
|
|
|
|
|
|
protected boolean isCancelled() {
|
|
|
|
return cancelled.get();
|
|
|
|
}
|
2021-02-26 14:06:16 +01:00
|
|
|
}
|