From f1c6fcf1a0275a9f7160e50fb7024b53f8d8d920 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 30 Sep 2021 19:18:25 +0200 Subject: [PATCH] Improve client --- .../client/AsyncTdMiddleEventBusClient.java | 70 +++++++++++-------- 1 file changed, 42 insertions(+), 28 deletions(-) diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 504355c..697b7e9 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -26,6 +26,7 @@ import it.tdlight.utils.MonoUtils; import java.net.ConnectException; import java.nio.file.Path; import java.time.Duration; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; import org.warp.commonutils.locks.LockUtils; import org.warp.commonutils.log.Logger; @@ -56,6 +57,8 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { private final Empty updatesStreamEnd = Sinks.one(); // This will only result in a crash, never completes in other ways private final Empty crash = Sinks.one(); + // Crash exception + private final AtomicReference crashException = new AtomicReference<>(null); // This will only result in a successful completion, never completes in other ways private final Empty pingFail = Sinks.one(); // This will only result in a successful completion, never completes in other ways. @@ -277,6 +280,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { ex.setStackTrace(new StackTraceElement[0]); throw ex; })).onErrorResume(ex -> MonoUtils.fromBlockingSingle(() -> { + crashException.set(ex); EmitResult result; while ((result = this.crash.tryEmitError(ex)) == EmitResult.FAIL_NON_SERIALIZED) { // 10ms @@ -325,10 +329,18 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { authStateClosing.tryEmitEmpty(); break; case TdApi.AuthorizationStateClosed.CONSTRUCTOR: - return Mono.fromRunnable(() -> logger.info("Received AuthorizationStateClosed from tdlib")) - .then(cluster.getEventBus().rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono)) - .flatMap(latestBinlogMsg -> Mono.fromCallable(latestBinlogMsg::body).subscribeOn(Schedulers.parallel())) - .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length()))) + return Mono + .fromRunnable(() -> logger.info("Received AuthorizationStateClosed from tdlib")) + .then(cluster.getEventBus() + .rxRequest(this.botAddress + ".read-binlog", EMPTY) + .as(MonoUtils::toMono) + ) + .flatMap(latestBinlogMsg -> Mono + .fromCallable(latestBinlogMsg::body) + .subscribeOn(Schedulers.parallel()) + ) + .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length()))) .flatMap(latestBinlog -> this.saveBinlog(latestBinlog.binlog())) .doOnSuccess(s -> logger.info("Overwritten binlog from server")) .thenReturn(update); @@ -340,30 +352,32 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { @Override public Mono> execute(Function request, boolean executeDirectly) { var req = new ExecuteObject(executeDirectly, request); + + var crashMono = crash.asMono() + .doOnSuccess(s -> logger.debug("Failed request {} because the TDLib session was already crashed", request)) + .then(Mono.>empty()); + + var executionMono = Mono + .fromRunnable(() -> logger.trace("Executing request {}", request)) + .then(cluster.getEventBus() + .rxRequest(botAddress + ".execute", req, deliveryOptions) + .as(MonoUtils::toMono) + ) + .onErrorMap(ex -> ResponseError.newResponseError(request, botAlias, ex)) + .>handle((resp, sink) -> { + if (resp.body() == null) { + var tdError = new TdError(500, "Response is empty"); + sink.error(ResponseError.newResponseError(request, botAlias, tdError)); + } else { + sink.next(resp.body().toTdResult()); + } + }) + .doOnSuccess(s -> logger.trace("Executed request {}", request)) + .doOnError(ex -> logger.debug("Failed request {}: {}", req, ex)); + return Mono - .firstWithSignal( - MonoUtils - .castVoid(crash - .asMono() - .doOnSuccess(s -> logger - .debug("Failed request {} because the TDLib session was already crashed", request)) - ), - Mono - .fromRunnable(() -> logger.trace("Executing request {}", request)) - .then(cluster.getEventBus().rxRequest(botAddress + ".execute", req, deliveryOptions).as(MonoUtils::toMono)) - .onErrorMap(ex -> ResponseError.newResponseError(request, botAlias, ex)) - .>handle((resp, sink) -> { - if (resp.body() == null) { - sink.error(ResponseError.newResponseError(request, botAlias, new TdError(500, "Response is empty"))); - } else { - sink.next(resp.body().toTdResult()); - } - }) - .doOnSuccess(s -> logger.trace("Executed request {}", request)) - .doOnError(ex -> logger.debug("Failed request {}: {}", req, ex)) - ) - .switchIfEmpty(Mono.defer(() -> Mono.fromCallable(() -> { - throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Client is closed or response is empty")); - }))); + .firstWithSignal(crashMono, executionMono) + .switchIfEmpty(Mono.error(() -> ResponseError.newResponseError(request, botAlias, + new TdError(500, "Client is closed or response is empty")))); } }