Remove buffering

This commit is contained in:
Andrea Cavalli 2022-10-07 17:50:34 +02:00
parent 3bed3052d0
commit edb8cec873
2 changed files with 3 additions and 15 deletions

View File

@ -160,9 +160,6 @@ public abstract class ReactiveApiPublisher {
.filter(s -> s instanceof TDLibBoundResultingEvent<?>) .filter(s -> s instanceof TDLibBoundResultingEvent<?>)
.map(s -> ((TDLibBoundResultingEvent<?>) s)) .map(s -> ((TDLibBoundResultingEvent<?>) s))
// Buffer requests to avoid halting the event loop
.onBackpressureBuffer()
// Send requests to tdlib // Send requests to tdlib
.flatMap(req -> Mono .flatMap(req -> Mono
.from(rawTelegramClient.send(req.action(), SPECIAL_RAW_TIMEOUT_DURATION)) .from(rawTelegramClient.send(req.action(), SPECIAL_RAW_TIMEOUT_DURATION))
@ -185,9 +182,6 @@ public abstract class ReactiveApiPublisher {
.onErrorResume(ex -> Mono.just(new OnUpdateError(userId, new TdApi.Error(500, ex.getMessage())))) .onErrorResume(ex -> Mono.just(new OnUpdateError(userId, new TdApi.Error(500, ex.getMessage()))))
, Integer.MAX_VALUE) , Integer.MAX_VALUE)
// Buffer requests to avoid halting the event loop
.onBackpressureBuffer()
.doOnError(ex -> LOG.error("Failed to receive resulting events. The instance will be closed", ex)) .doOnError(ex -> LOG.error("Failed to receive resulting events. The instance will be closed", ex))
.onErrorResume(ex -> Mono.just(new OnUpdateError(userId, new TdApi.Error(500, ex.getMessage())))) .onErrorResume(ex -> Mono.just(new OnUpdateError(userId, new TdApi.Error(500, ex.getMessage()))))
@ -204,10 +198,7 @@ public abstract class ReactiveApiPublisher {
// Obtain only client-bound events // Obtain only client-bound events
.filter(s -> s instanceof ClientBoundResultingEvent) .filter(s -> s instanceof ClientBoundResultingEvent)
.cast(ClientBoundResultingEvent.class) .cast(ClientBoundResultingEvent.class)
.map(ClientBoundResultingEvent::event) .map(ClientBoundResultingEvent::event);
// Buffer requests to avoid halting the event loop
.onBackpressureBuffer();
sharedTdlibServers.events(lane, messagesToSend); sharedTdlibServers.events(lane, messagesToSend);
@ -216,9 +207,6 @@ public abstract class ReactiveApiPublisher {
.filter(s -> s instanceof ClusterBoundResultingEvent) .filter(s -> s instanceof ClusterBoundResultingEvent)
.cast(ClusterBoundResultingEvent.class) .cast(ClusterBoundResultingEvent.class)
// Buffer requests to avoid halting the event loop
.onBackpressureBuffer()
// Send events to the cluster // Send events to the cluster
.subscribeOn(Schedulers.parallel()) .subscribeOn(Schedulers.parallel())
.subscribe(clusterBoundEvent -> { .subscribe(clusterBoundEvent -> {

View File

@ -57,9 +57,9 @@ public class TdlibChannelsSharedHost implements Closeable {
.subscribeOn(Schedulers.parallel()) .subscribeOn(Schedulers.parallel())
.subscribe(n -> {}, ex -> LOG.error("Unexpected error when sending responses", ex)); .subscribe(n -> {}, ex -> LOG.error("Unexpected error when sending responses", ex));
events = allLanes.stream().collect(Collectors.toUnmodifiableMap(Function.identity(), lane -> { events = allLanes.stream().collect(Collectors.toUnmodifiableMap(Function.identity(), lane -> {
Many<Flux<ClientBoundEvent>> sink = Sinks.many().multicast().onBackpressureBuffer(65535); Many<Flux<ClientBoundEvent>> sink = Sinks.many().replay().all();
var outputEventsFlux = Flux var outputEventsFlux = Flux
.merge(sink.asFlux().cache().map(flux -> flux.publish().autoConnect().subscribeOn(Schedulers.parallel())), Integer.MAX_VALUE) .merge(sink.asFlux().map(flux -> flux.publish().autoConnect().subscribeOn(Schedulers.parallel())), Integer.MAX_VALUE)
.doFinally(s -> LOG.debug("Output events flux of lane \"{}\" terminated with signal {}", lane, s)); .doFinally(s -> LOG.debug("Output events flux of lane \"{}\" terminated with signal {}", lane, s));
Mono.defer(() -> tdServersChannels Mono.defer(() -> tdServersChannels
.events(lane) .events(lane)