diff --git a/pom.xml b/pom.xml
index a13accc..eaca16c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -275,29 +275,4 @@
-
-
- standalone
-
- true
-
-
-
- org.slf4j
- slf4j-api
- 1.7.32
-
-
- org.apache.logging.log4j
- log4j-slf4j-impl
- 2.17.1
-
-
- org.apache.logging.log4j
- log4j-slf4j18-impl
- 2.17.1
-
-
-
-
diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java
index cf95b61..16fad0e 100644
--- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java
+++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java
@@ -490,7 +490,7 @@ public class AtomixReactiveApi implements ReactiveApi {
@Override
public Mono close() {
- return Mono.fromCompletionStage(this.atomix::stop);
+ return Mono.fromCompletionStage(this.atomix::stop).timeout(Duration.ofSeconds(8), Mono.empty());
}
private record DiskSessionAndId(DiskSession diskSession, long id) {}
diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java
index 813563b..a2e36eb 100644
--- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java
+++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java
@@ -14,6 +14,7 @@ import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, AutoCloseable {
@@ -38,6 +39,7 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut
);
sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close));
}, OverflowStrategy.ERROR)
+ .subscribeOn(Schedulers.boundedElastic())
.onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR)
.flatMapIterable(list -> list)
.takeUntil(s -> closed)
@@ -61,7 +63,7 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut
LiveAtomixReactiveApiClient::deserializeResponse,
Duration.between(Instant.now(), timeout)
);
- }).handle((item, sink) -> {
+ }).subscribeOn(Schedulers.boundedElastic()).handle((item, sink) -> {
if (item instanceof TdApi.Error error) {
sink.error(new TdError(error.code, error.message));
} else {
diff --git a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java
index 029034c..e2fb2e7 100644
--- a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java
+++ b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java
@@ -50,6 +50,7 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl
);
sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close));
}, OverflowStrategy.ERROR)
+ .subscribeOn(Schedulers.boundedElastic())
.onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR)
.flatMapIterable(list -> list)
.filter(e -> e.userId() == userId)
@@ -78,7 +79,7 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl
LiveAtomixReactiveApiClient::serializeRequest,
LiveAtomixReactiveApiClient::deserializeResponse,
Duration.between(Instant.now(), timeout)
- )).onErrorMap(ex -> {
+ )).subscribeOn(Schedulers.boundedElastic()).onErrorMap(ex -> {
if (ex instanceof MessagingException.NoRemoteHandler) {
return new TdError(404, "Bot #IDU" + this.userId + " (liveId: " + liveId + ") is not found on the cluster");
} else {
diff --git a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java
index cb30f8c..a93ca95 100644
--- a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java
+++ b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java
@@ -27,6 +27,7 @@ import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
public class LiveAtomixReactiveApiClient implements ReactiveApiClient {
@@ -65,19 +66,22 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient {
@Override
public Mono request(TdApi.Function request, Instant timeout) {
- return Mono.fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests",
- new Request<>(liveId, request, timeout),
- LiveAtomixReactiveApiClient::serializeRequest,
- LiveAtomixReactiveApiClient::deserializeResponse,
- Duration.between(Instant.now(), timeout)
- )).handle((item, sink) -> {
- if (item instanceof TdApi.Error error) {
- sink.error(new TdError(error.code, error.message));
- } else {
- //noinspection unchecked
- sink.next((T) item);
- }
- });
+ return Mono
+ .fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests",
+ new Request<>(liveId, request, timeout),
+ LiveAtomixReactiveApiClient::serializeRequest,
+ LiveAtomixReactiveApiClient::deserializeResponse,
+ Duration.between(Instant.now(), timeout)
+ ))
+ .subscribeOn(Schedulers.boundedElastic())
+ .handle((item, sink) -> {
+ if (item instanceof TdApi.Error error) {
+ sink.error(new TdError(error.code, error.message));
+ } else {
+ //noinspection unchecked
+ sink.next((T) item);
+ }
+ });
}
@Override
diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
index 3d353b6..cc0aba1 100644
--- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
+++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
@@ -97,7 +97,7 @@ public abstract class ReactiveApiPublisher {
subscription.close();
rawTelegramClient.dispose();
});
- })).share();
+ })).publishOn(Schedulers.parallel()).share();
}
public static ReactiveApiPublisher fromToken(Atomix atomix,
@@ -181,6 +181,7 @@ public abstract class ReactiveApiPublisher {
// Send events to the client
.subscribeOn(Schedulers.parallel())
+ .publishOn(Schedulers.boundedElastic())
.subscribe(clientBoundEvent -> eventService.broadcast("session-client-bound-events",
clientBoundEvent, ReactiveApiPublisher::serializeEvents));
@@ -466,6 +467,7 @@ public abstract class ReactiveApiPublisher {
}
})
.map(responseObj -> new Response(liveId, responseObj))
+ .publishOn(Schedulers.boundedElastic())
.toFuture();
}