This commit is contained in:
Andrea Cavalli 2021-01-13 23:43:09 +01:00
parent cebf75d87d
commit 606cd23deb
2 changed files with 8 additions and 4 deletions

View File

@ -87,9 +87,9 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
// Send close if the stream is disposed before tdlib is closed // Send close if the stream is disposed before tdlib is closed
emitter.onDispose(() -> { emitter.onDispose(() -> {
closedFromTd.asMono().take(Duration.ofMillis(10)).switchIfEmpty(Mono.fromRunnable(() -> client.send(new Close(), closedFromTd.asMono().take(Duration.ofMillis(10)).switchIfEmpty(Mono.fromRunnable(() -> client.send(new Close(),
result -> logger.trace("Close result: {}", result), result -> logger.warn("Close result: {}", result),
ex -> logger.trace("Error when disposing td client", ex) ex -> logger.error("Error when disposing td client", ex)
))).subscribe(); ))).subscribeOn(tdScheduler).subscribe();
}); });
}).subscribeOn(tdScheduler); }).subscribeOn(tdScheduler);
} }

View File

@ -130,7 +130,7 @@ public class AsyncTdEasy {
} }
private Flux<TdApi.Update> getIncomingUpdates(boolean includePreAuthUpdates) { private Flux<TdApi.Update> getIncomingUpdates(boolean includePreAuthUpdates) {
return Flux.from(incomingUpdatesCo).subscribeOn(scheduler).doOnComplete(() -> requestedDefinitiveExit.onNext(true)); return Flux.from(incomingUpdatesCo).doFinally(s -> requestedDefinitiveExit.onNext(true)).subscribeOn(scheduler);
} }
/** /**
@ -294,6 +294,10 @@ public class AsyncTdEasy {
.then(Mono.from(requestedDefinitiveExit).single()) .then(Mono.from(requestedDefinitiveExit).single())
.filter(closeRequested -> !closeRequested) .filter(closeRequested -> !closeRequested)
.doOnSuccess(v -> requestedDefinitiveExit.onNext(true)) .doOnSuccess(v -> requestedDefinitiveExit.onNext(true))
.then(td.execute(new TdApi.Close(), false))
.doOnNext(ok -> {
logger.debug("Received Ok after TdApi.Close");
})
.then(authState .then(authState
.filter(authorizationState -> authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) .filter(authorizationState -> authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR)
.take(1) .take(1)