Add backpressure

This commit is contained in:
Andrea Cavalli 2022-01-12 21:36:41 +01:00
parent be89c549ef
commit 799fd4149c

View File

@ -141,8 +141,8 @@ public abstract class ReactiveApiPublisher {
.filter(s -> s instanceof TDLibBoundResultingEvent<?>) .filter(s -> s instanceof TDLibBoundResultingEvent<?>)
.map(s -> ((TDLibBoundResultingEvent<?>) s).action()) .map(s -> ((TDLibBoundResultingEvent<?>) s).action())
// Buffer up to 64 requests to avoid halting the event loop, throw an error if too many requests are buffered // Buffer requests to avoid halting the event loop
.onBackpressureBuffer(64, BufferOverflowStrategy.ERROR) .onBackpressureBuffer()
// Send requests to tdlib // Send requests to tdlib
.flatMap(function -> Mono .flatMap(function -> Mono
@ -176,8 +176,11 @@ public abstract class ReactiveApiPublisher {
.cast(ClientBoundResultingEvent.class) .cast(ClientBoundResultingEvent.class)
.map(ClientBoundResultingEvent::event) .map(ClientBoundResultingEvent::event)
// Buffer requests to avoid halting the event loop
.onBackpressureBuffer()
.limitRate(1) .limitRate(1)
.bufferTimeout(64, Duration.ofMillis(10)) .bufferTimeout(512, Duration.ofMillis(100))
// Send events to the client // Send events to the client
.subscribeOn(Schedulers.parallel()) .subscribeOn(Schedulers.parallel())
@ -190,6 +193,9 @@ public abstract class ReactiveApiPublisher {
.filter(s -> s instanceof ClusterBoundResultingEvent) .filter(s -> s instanceof ClusterBoundResultingEvent)
.cast(ClusterBoundResultingEvent.class) .cast(ClusterBoundResultingEvent.class)
// Buffer requests to avoid halting the event loop
.onBackpressureBuffer()
// Send events to the cluster // Send events to the cluster
.subscribeOn(Schedulers.parallel()) .subscribeOn(Schedulers.parallel())
.subscribe(clusterBoundEvent -> { .subscribe(clusterBoundEvent -> {