Update BlockingFluxIterable

This commit is contained in:
Andrea Cavalli 2021-02-26 19:14:33 +01:00
parent e854b4252c
commit a4340cdd2b

View File

@ -18,7 +18,7 @@ public abstract class BlockingFluxIterable<T> {
public Flux<T> generate() {
return Flux
.<T>create(sink -> {
boolean alreadyInitialized = false;
AtomicBoolean alreadyInitialized = new AtomicBoolean(false);
AtomicLong requests = new AtomicLong(0);
Semaphore availableRequests = new Semaphore(0);
AtomicBoolean cancelled = new AtomicBoolean(false);
@ -31,39 +31,41 @@ public abstract class BlockingFluxIterable<T> {
availableRequests.release();
});
try {
scheduler.schedule(() -> {
try {
loop:
while (true) {
availableRequests.acquireUninterruptibly();
var remainingRequests = requests.getAndSet(0);
if (remainingRequests == 0 || cancelled.get()) {
break;
try {
loop:
while (true) {
availableRequests.acquireUninterruptibly();
var remainingRequests = requests.getAndSet(0);
if (remainingRequests == 0 || cancelled.get()) {
break;
}
while (remainingRequests-- > 0) {
if (alreadyInitialized.compareAndSet(false, true)) {
this.onStartup();
}
T next = onNext();
if (next == null) {
break loop;
}
sink.next(next);
}
}
while (remainingRequests-- > 0) {
if (!alreadyInitialized) {
alreadyInitialized = true;
this.onStartup();
}
T next = onNext();
if (next == null) {
break loop;
}
sink.next(next);
} catch (InterruptedException ex) {
sink.error(ex);
} finally {
if (alreadyInitialized.get()) {
onTerminate();
}
}
} finally {
if (alreadyInitialized) {
onTerminate();
}
sink.complete();
}
} finally {
sink.complete();
}
})
.subscribeOn(scheduler);
});
});
}
public abstract void onStartup();
@ -71,5 +73,5 @@ public abstract class BlockingFluxIterable<T> {
public abstract void onTerminate();
@Nullable
public abstract T onNext();
public abstract T onNext() throws InterruptedException;
}