From edb8cec87356bb6c2a2711e66882a76df3aac9c7 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 7 Oct 2022 17:50:34 +0200 Subject: [PATCH] Remove buffering --- .../tdlight/reactiveapi/ReactiveApiPublisher.java | 14 +------------- .../reactiveapi/TdlibChannelsSharedHost.java | 4 ++-- 2 files changed, 3 insertions(+), 15 deletions(-) diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index 13959f1..e71bf22 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -160,9 +160,6 @@ public abstract class ReactiveApiPublisher { .filter(s -> s instanceof TDLibBoundResultingEvent) .map(s -> ((TDLibBoundResultingEvent) s)) - // Buffer requests to avoid halting the event loop - .onBackpressureBuffer() - // Send requests to tdlib .flatMap(req -> Mono .from(rawTelegramClient.send(req.action(), SPECIAL_RAW_TIMEOUT_DURATION)) @@ -185,9 +182,6 @@ public abstract class ReactiveApiPublisher { .onErrorResume(ex -> Mono.just(new OnUpdateError(userId, new TdApi.Error(500, ex.getMessage())))) , Integer.MAX_VALUE) - // Buffer requests to avoid halting the event loop - .onBackpressureBuffer() - .doOnError(ex -> LOG.error("Failed to receive resulting events. The instance will be closed", ex)) .onErrorResume(ex -> Mono.just(new OnUpdateError(userId, new TdApi.Error(500, ex.getMessage())))) @@ -204,10 +198,7 @@ public abstract class ReactiveApiPublisher { // Obtain only client-bound events .filter(s -> s instanceof ClientBoundResultingEvent) .cast(ClientBoundResultingEvent.class) - .map(ClientBoundResultingEvent::event) - - // Buffer requests to avoid halting the event loop - .onBackpressureBuffer(); + .map(ClientBoundResultingEvent::event); sharedTdlibServers.events(lane, messagesToSend); @@ -216,9 +207,6 @@ public abstract class ReactiveApiPublisher { .filter(s -> s instanceof ClusterBoundResultingEvent) .cast(ClusterBoundResultingEvent.class) - // Buffer requests to avoid halting the event loop - .onBackpressureBuffer() - // Send events to the cluster .subscribeOn(Schedulers.parallel()) .subscribe(clusterBoundEvent -> { diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java index c48f4f0..5e7fa5e 100644 --- a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java +++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java @@ -57,9 +57,9 @@ public class TdlibChannelsSharedHost implements Closeable { .subscribeOn(Schedulers.parallel()) .subscribe(n -> {}, ex -> LOG.error("Unexpected error when sending responses", ex)); events = allLanes.stream().collect(Collectors.toUnmodifiableMap(Function.identity(), lane -> { - Many> sink = Sinks.many().multicast().onBackpressureBuffer(65535); + Many> sink = Sinks.many().replay().all(); var outputEventsFlux = Flux - .merge(sink.asFlux().cache().map(flux -> flux.publish().autoConnect().subscribeOn(Schedulers.parallel())), Integer.MAX_VALUE) + .merge(sink.asFlux().map(flux -> flux.publish().autoConnect().subscribeOn(Schedulers.parallel())), Integer.MAX_VALUE) .doFinally(s -> LOG.debug("Output events flux of lane \"{}\" terminated with signal {}", lane, s)); Mono.defer(() -> tdServersChannels .events(lane)