diff --git a/src/main/java/it/cavallium/TransferClient.java b/src/main/java/it/cavallium/TransferClient.java index 348770a..a40fe10 100644 --- a/src/main/java/it/cavallium/TransferClient.java +++ b/src/main/java/it/cavallium/TransferClient.java @@ -22,6 +22,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; public class TransferClient { @@ -30,12 +31,16 @@ public class TransferClient { private final ConcurrentHashMap supergroupInfos = new ConcurrentHashMap<>(); private final ConcurrentHashMap supergroupFullInfos = new ConcurrentHashMap<>(); private final ConcurrentHashMap chats = new ConcurrentHashMap<>(); + private final Scheduler scheduler; public TransferClient(AsyncTdEasy client) { this.client = client; + this.scheduler = Schedulers.boundedElastic(); + this.client.getIncomingUpdates() - .subscribeOn(Schedulers.newParallel("bot_updates")) + .subscribeOn(scheduler) + .publishOn(scheduler) .flatMap(this::onUpdate) .subscribe(); } @@ -96,7 +101,7 @@ public class TransferClient { * @return The response or {@link TdApi.Error}. */ public Mono> send(TdApi.Function request) { - return client.send(request); + return client.send(request).publishOn(scheduler); } public Set getAdminSupergroups(boolean canRestrictMembers, boolean canInviteUsers) { diff --git a/src/main/java/it/cavallium/TransferServiceImpl.java b/src/main/java/it/cavallium/TransferServiceImpl.java index b77325c..a50042a 100644 --- a/src/main/java/it/cavallium/TransferServiceImpl.java +++ b/src/main/java/it/cavallium/TransferServiceImpl.java @@ -35,17 +35,20 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; public class TransferServiceImpl implements TransferService { private final TdClusterManager clusterManager; private final ConcurrentHashMap clients = new ConcurrentHashMap<>(); + private final Scheduler updatesScheduler; private int apiId; private String apiHash; public TransferServiceImpl(TdClusterManager clusterManager) { this.clusterManager = clusterManager; + this.updatesScheduler = Schedulers.boundedElastic(); } @Override @@ -128,7 +131,7 @@ public class TransferServiceImpl implements TransferService { }) .build()) .then(Mono.defer(() -> { - var clientStateFlux = client.getState().publish().autoConnect(2); + var clientStateFlux = client.getState().publish().autoConnect(3); clientStateFlux .filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR @@ -140,16 +143,19 @@ public class TransferServiceImpl implements TransferService { .materialize() .flatMap(signal -> onUserbotClosed.get().thenReturn(signal)) .dematerialize() + .subscribeOn(updatesScheduler) .subscribe(state -> System.out.println("Userbot closed with state: " + state)); - clientStateFlux.subscribe(state -> System.out.println("state: " + state)); + clientStateFlux.subscribeOn(updatesScheduler).subscribe(state -> System.out.println("state: " + state)); client .getIncomingUpdates() + .subscribeOn(updatesScheduler) .flatMap(update -> onClientUpdate(update)) .subscribe(_v -> {}, e -> System.err.println(e)); return clientStateFlux + .subscribeOn(updatesScheduler) .filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR || state.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR || state.getConstructor() == AuthorizationStateLoggingOut.CONSTRUCTOR