From 799fd4149ca148fdd40c1c609c739797112663cd Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 12 Jan 2022 21:36:41 +0100 Subject: [PATCH] Add backpressure --- .../it/tdlight/reactiveapi/ReactiveApiPublisher.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index cc0aba1..e2c7a2c 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -141,8 +141,8 @@ public abstract class ReactiveApiPublisher { .filter(s -> s instanceof TDLibBoundResultingEvent) .map(s -> ((TDLibBoundResultingEvent) s).action()) - // Buffer up to 64 requests to avoid halting the event loop, throw an error if too many requests are buffered - .onBackpressureBuffer(64, BufferOverflowStrategy.ERROR) + // Buffer requests to avoid halting the event loop + .onBackpressureBuffer() // Send requests to tdlib .flatMap(function -> Mono @@ -176,8 +176,11 @@ public abstract class ReactiveApiPublisher { .cast(ClientBoundResultingEvent.class) .map(ClientBoundResultingEvent::event) + // Buffer requests to avoid halting the event loop + .onBackpressureBuffer() + .limitRate(1) - .bufferTimeout(64, Duration.ofMillis(10)) + .bufferTimeout(512, Duration.ofMillis(100)) // Send events to the client .subscribeOn(Schedulers.parallel()) @@ -190,6 +193,9 @@ public abstract class ReactiveApiPublisher { .filter(s -> s instanceof ClusterBoundResultingEvent) .cast(ClusterBoundResultingEvent.class) + // Buffer requests to avoid halting the event loop + .onBackpressureBuffer() + // Send events to the cluster .subscribeOn(Schedulers.parallel()) .subscribe(clusterBoundEvent -> {