From a87149f626f943b4b7d648f489ffc957aad2bbc3 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 5 Aug 2021 17:33:21 +0200 Subject: [PATCH] Fix backpressure problem with reactor --- .../td/WrappedReactorTelegramClient.java | 29 ++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java b/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java index e2319fd..5e86a2a 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java @@ -1,9 +1,15 @@ package it.tdlight.tdlibsession.td; +import it.tdlight.common.ReactiveItem; import it.tdlight.common.ReactiveTelegramClient; import it.tdlight.jni.TdApi; import it.tdlight.utils.MonoUtils; +import org.jetbrains.annotations.NotNull; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.Mono; public class WrappedReactorTelegramClient implements ReactorTelegramClient { @@ -23,7 +29,28 @@ public class WrappedReactorTelegramClient implements ReactorTelegramClient { @Override public Flux receive() { return Flux - .from(reactiveTelegramClient) + .create(sink -> reactiveTelegramClient.subscribe(new CoreSubscriber<>() { + @Override + public void onSubscribe(@NotNull Subscription s) { + sink.onCancel(s::cancel); + sink.onRequest(s::request); + } + + @Override + public void onNext(ReactiveItem reactiveItem) { + sink.next(reactiveItem); + } + + @Override + public void onError(Throwable t) { + sink.error(t); + } + + @Override + public void onComplete() { + sink.complete(); + } + }), OverflowStrategy.BUFFER) .handle((item, sink) -> { if (item.isUpdate()) { sink.next(item.getUpdate());