From 3cd57bf61f4b790ca80b750158b1d5173c42e2fa Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 11 Jan 2022 19:59:27 +0100 Subject: [PATCH] Fix scheduling --- pom.xml | 25 ---------------- .../reactiveapi/AtomixReactiveApi.java | 2 +- .../AtomixReactiveApiMultiClient.java | 4 ++- .../DynamicAtomixReactiveApiClient.java | 3 +- .../LiveAtomixReactiveApiClient.java | 30 +++++++++++-------- .../reactiveapi/ReactiveApiPublisher.java | 4 ++- 6 files changed, 26 insertions(+), 42 deletions(-) diff --git a/pom.xml b/pom.xml index a13accc..eaca16c 100644 --- a/pom.xml +++ b/pom.xml @@ -275,29 +275,4 @@ - - - standalone - - true - - - - org.slf4j - slf4j-api - 1.7.32 - - - org.apache.logging.log4j - log4j-slf4j-impl - 2.17.1 - - - org.apache.logging.log4j - log4j-slf4j18-impl - 2.17.1 - - - - diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java index cf95b61..16fad0e 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java @@ -490,7 +490,7 @@ public class AtomixReactiveApi implements ReactiveApi { @Override public Mono close() { - return Mono.fromCompletionStage(this.atomix::stop); + return Mono.fromCompletionStage(this.atomix::stop).timeout(Duration.ofSeconds(8), Mono.empty()); } private record DiskSessionAndId(DiskSession diskSession, long id) {} diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java index 813563b..a2e36eb 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java @@ -14,6 +14,7 @@ import reactor.core.publisher.BufferOverflowStrategy; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, AutoCloseable { @@ -38,6 +39,7 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut ); sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close)); }, OverflowStrategy.ERROR) + .subscribeOn(Schedulers.boundedElastic()) .onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR) .flatMapIterable(list -> list) .takeUntil(s -> closed) @@ -61,7 +63,7 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut LiveAtomixReactiveApiClient::deserializeResponse, Duration.between(Instant.now(), timeout) ); - }).handle((item, sink) -> { + }).subscribeOn(Schedulers.boundedElastic()).handle((item, sink) -> { if (item instanceof TdApi.Error error) { sink.error(new TdError(error.code, error.message)); } else { diff --git a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java index 029034c..e2fb2e7 100644 --- a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java @@ -50,6 +50,7 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl ); sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close)); }, OverflowStrategy.ERROR) + .subscribeOn(Schedulers.boundedElastic()) .onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR) .flatMapIterable(list -> list) .filter(e -> e.userId() == userId) @@ -78,7 +79,7 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl LiveAtomixReactiveApiClient::serializeRequest, LiveAtomixReactiveApiClient::deserializeResponse, Duration.between(Instant.now(), timeout) - )).onErrorMap(ex -> { + )).subscribeOn(Schedulers.boundedElastic()).onErrorMap(ex -> { if (ex instanceof MessagingException.NoRemoteHandler) { return new TdError(404, "Bot #IDU" + this.userId + " (liveId: " + liveId + ") is not found on the cluster"); } else { diff --git a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java index cb30f8c..a93ca95 100644 --- a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java @@ -27,6 +27,7 @@ import reactor.core.publisher.BufferOverflowStrategy; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class LiveAtomixReactiveApiClient implements ReactiveApiClient { @@ -65,19 +66,22 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient { @Override public Mono request(TdApi.Function request, Instant timeout) { - return Mono.fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests", - new Request<>(liveId, request, timeout), - LiveAtomixReactiveApiClient::serializeRequest, - LiveAtomixReactiveApiClient::deserializeResponse, - Duration.between(Instant.now(), timeout) - )).handle((item, sink) -> { - if (item instanceof TdApi.Error error) { - sink.error(new TdError(error.code, error.message)); - } else { - //noinspection unchecked - sink.next((T) item); - } - }); + return Mono + .fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests", + new Request<>(liveId, request, timeout), + LiveAtomixReactiveApiClient::serializeRequest, + LiveAtomixReactiveApiClient::deserializeResponse, + Duration.between(Instant.now(), timeout) + )) + .subscribeOn(Schedulers.boundedElastic()) + .handle((item, sink) -> { + if (item instanceof TdApi.Error error) { + sink.error(new TdError(error.code, error.message)); + } else { + //noinspection unchecked + sink.next((T) item); + } + }); } @Override diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index 3d353b6..cc0aba1 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -97,7 +97,7 @@ public abstract class ReactiveApiPublisher { subscription.close(); rawTelegramClient.dispose(); }); - })).share(); + })).publishOn(Schedulers.parallel()).share(); } public static ReactiveApiPublisher fromToken(Atomix atomix, @@ -181,6 +181,7 @@ public abstract class ReactiveApiPublisher { // Send events to the client .subscribeOn(Schedulers.parallel()) + .publishOn(Schedulers.boundedElastic()) .subscribe(clientBoundEvent -> eventService.broadcast("session-client-bound-events", clientBoundEvent, ReactiveApiPublisher::serializeEvents)); @@ -466,6 +467,7 @@ public abstract class ReactiveApiPublisher { } }) .map(responseObj -> new Response(liveId, responseObj)) + .publishOn(Schedulers.boundedElastic()) .toFuture(); }