Update TransferClient.java and TransferServiceImpl.java

This commit is contained in:
Andrea Cavalli 2020-10-19 17:47:45 +02:00
parent 1045dd0ef3
commit bd4132e6a7
2 changed files with 15 additions and 4 deletions

View File

@ -22,6 +22,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
public class TransferClient { public class TransferClient {
@ -30,12 +31,16 @@ public class TransferClient {
private final ConcurrentHashMap<Integer, Supergroup> supergroupInfos = new ConcurrentHashMap<>(); private final ConcurrentHashMap<Integer, Supergroup> supergroupInfos = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, SupergroupFullInfo> supergroupFullInfos = new ConcurrentHashMap<>(); private final ConcurrentHashMap<Integer, SupergroupFullInfo> supergroupFullInfos = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Long, Chat> chats = new ConcurrentHashMap<>(); private final ConcurrentHashMap<Long, Chat> chats = new ConcurrentHashMap<>();
private final Scheduler scheduler;
public TransferClient(AsyncTdEasy client) { public TransferClient(AsyncTdEasy client) {
this.client = client; this.client = client;
this.scheduler = Schedulers.boundedElastic();
this.client.getIncomingUpdates() this.client.getIncomingUpdates()
.subscribeOn(Schedulers.newParallel("bot_updates")) .subscribeOn(scheduler)
.publishOn(scheduler)
.flatMap(this::onUpdate) .flatMap(this::onUpdate)
.subscribe(); .subscribe();
} }
@ -96,7 +101,7 @@ public class TransferClient {
* @return The response or {@link TdApi.Error}. * @return The response or {@link TdApi.Error}.
*/ */
public <T extends Object> Mono<TdResult<T>> send(TdApi.Function request) { public <T extends Object> Mono<TdResult<T>> send(TdApi.Function request) {
return client.send(request); return client.<T>send(request).publishOn(scheduler);
} }
public Set<BaseChatInfo> getAdminSupergroups(boolean canRestrictMembers, boolean canInviteUsers) { public Set<BaseChatInfo> getAdminSupergroups(boolean canRestrictMembers, boolean canInviteUsers) {

View File

@ -35,17 +35,20 @@ import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
public class TransferServiceImpl implements TransferService { public class TransferServiceImpl implements TransferService {
private final TdClusterManager clusterManager; private final TdClusterManager clusterManager;
private final ConcurrentHashMap<Long, TransferClient> clients = new ConcurrentHashMap<>(); private final ConcurrentHashMap<Long, TransferClient> clients = new ConcurrentHashMap<>();
private final Scheduler updatesScheduler;
private int apiId; private int apiId;
private String apiHash; private String apiHash;
public TransferServiceImpl(TdClusterManager clusterManager) { public TransferServiceImpl(TdClusterManager clusterManager) {
this.clusterManager = clusterManager; this.clusterManager = clusterManager;
this.updatesScheduler = Schedulers.boundedElastic();
} }
@Override @Override
@ -128,7 +131,7 @@ public class TransferServiceImpl implements TransferService {
}) })
.build()) .build())
.then(Mono.defer(() -> { .then(Mono.defer(() -> {
var clientStateFlux = client.getState().publish().autoConnect(2); var clientStateFlux = client.getState().publish().autoConnect(3);
clientStateFlux clientStateFlux
.filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR .filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR
@ -140,16 +143,19 @@ public class TransferServiceImpl implements TransferService {
.materialize() .materialize()
.flatMap(signal -> onUserbotClosed.get().thenReturn(signal)) .flatMap(signal -> onUserbotClosed.get().thenReturn(signal))
.dematerialize() .dematerialize()
.subscribeOn(updatesScheduler)
.subscribe(state -> System.out.println("Userbot closed with state: " + state)); .subscribe(state -> System.out.println("Userbot closed with state: " + state));
clientStateFlux.subscribe(state -> System.out.println("state: " + state)); clientStateFlux.subscribeOn(updatesScheduler).subscribe(state -> System.out.println("state: " + state));
client client
.getIncomingUpdates() .getIncomingUpdates()
.subscribeOn(updatesScheduler)
.flatMap(update -> onClientUpdate(update)) .flatMap(update -> onClientUpdate(update))
.subscribe(_v -> {}, e -> System.err.println(e)); .subscribe(_v -> {}, e -> System.err.println(e));
return clientStateFlux return clientStateFlux
.subscribeOn(updatesScheduler)
.filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR .filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR
|| state.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR || state.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR
|| state.getConstructor() == AuthorizationStateLoggingOut.CONSTRUCTOR || state.getConstructor() == AuthorizationStateLoggingOut.CONSTRUCTOR