Update AsyncTdEasy.java

This commit is contained in:
Andrea Cavalli 2021-01-13 23:06:09 +01:00
parent 9502b30c29
commit cebf75d87d

View File

@ -58,7 +58,7 @@ public class AsyncTdEasy {
private static final Logger logger = LoggerFactory.getLogger(AsyncTdEasy.class); private static final Logger logger = LoggerFactory.getLogger(AsyncTdEasy.class);
private final Scheduler scheduler = Schedulers.single(); private final Scheduler scheduler = Schedulers.newSingle("AsyncTdEasy", false);
private final ReplayProcessor<AuthorizationState> authState = ReplayProcessor.create(1); private final ReplayProcessor<AuthorizationState> authState = ReplayProcessor.create(1);
private final ReplayProcessor<Boolean> requestedDefinitiveExit = ReplayProcessor.cacheLastOrDefault(false); private final ReplayProcessor<Boolean> requestedDefinitiveExit = ReplayProcessor.cacheLastOrDefault(false);
private final ReplayProcessor<TdEasySettings> settings = ReplayProcessor.cacheLast(); private final ReplayProcessor<TdEasySettings> settings = ReplayProcessor.cacheLast();
@ -130,7 +130,7 @@ public class AsyncTdEasy {
} }
private Flux<TdApi.Update> getIncomingUpdates(boolean includePreAuthUpdates) { private Flux<TdApi.Update> getIncomingUpdates(boolean includePreAuthUpdates) {
return Flux.from(incomingUpdatesCo).subscribeOn(scheduler); return Flux.from(incomingUpdatesCo).subscribeOn(scheduler).doOnComplete(() -> requestedDefinitiveExit.onNext(true));
} }
/** /**
@ -294,10 +294,6 @@ public class AsyncTdEasy {
.then(Mono.from(requestedDefinitiveExit).single()) .then(Mono.from(requestedDefinitiveExit).single())
.filter(closeRequested -> !closeRequested) .filter(closeRequested -> !closeRequested)
.doOnSuccess(v -> requestedDefinitiveExit.onNext(true)) .doOnSuccess(v -> requestedDefinitiveExit.onNext(true))
.then(td.execute(new TdApi.Close(), false))
.doOnNext(ok -> {
logger.debug("Received Ok after TdApi.Close");
})
.then(authState .then(authState
.filter(authorizationState -> authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) .filter(authorizationState -> authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR)
.take(1) .take(1)