diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 697b7e9..b6d4798 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -4,6 +4,7 @@ import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.json.JsonObject; import io.vertx.reactivex.core.Vertx; import io.vertx.reactivex.core.buffer.Buffer; +import io.vertx.reactivex.core.eventbus.Message; import io.vertx.reactivex.core.eventbus.MessageConsumer; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.AuthorizationStateClosed; @@ -57,8 +58,6 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { private final Empty updatesStreamEnd = Sinks.one(); // This will only result in a crash, never completes in other ways private final Empty crash = Sinks.one(); - // Crash exception - private final AtomicReference crashException = new AtomicReference<>(null); // This will only result in a successful completion, never completes in other ways private final Empty pingFail = Sinks.one(); // This will only result in a successful completion, never completes in other ways. @@ -280,7 +279,6 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { ex.setStackTrace(new StackTraceElement[0]); throw ex; })).onErrorResume(ex -> MonoUtils.fromBlockingSingle(() -> { - crashException.set(ex); EmitResult result; while ((result = this.crash.tryEmitError(ex)) == EmitResult.FAIL_NON_SERIALIZED) { // 10ms @@ -329,18 +327,13 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { authStateClosing.tryEmitEmpty(); break; case TdApi.AuthorizationStateClosed.CONSTRUCTOR: - return Mono - .fromRunnable(() -> logger.info("Received AuthorizationStateClosed from tdlib")) - .then(cluster.getEventBus() - .rxRequest(this.botAddress + ".read-binlog", EMPTY) - .as(MonoUtils::toMono) - ) - .flatMap(latestBinlogMsg -> Mono - .fromCallable(latestBinlogMsg::body) - .subscribeOn(Schedulers.parallel()) - ) - .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " - + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length()))) + logger.info("Received AuthorizationStateClosed from tdlib"); + return cluster.getEventBus() + .rxRequest(this.botAddress + ".read-binlog", EMPTY) + .as(MonoUtils::toMono) + .mapNotNull(Message::body) + .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: {}", + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length()))) .flatMap(latestBinlog -> this.saveBinlog(latestBinlog.binlog())) .doOnSuccess(s -> logger.info("Overwritten binlog from server")) .thenReturn(update); @@ -357,12 +350,9 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { .doOnSuccess(s -> logger.debug("Failed request {} because the TDLib session was already crashed", request)) .then(Mono.>empty()); - var executionMono = Mono - .fromRunnable(() -> logger.trace("Executing request {}", request)) - .then(cluster.getEventBus() - .rxRequest(botAddress + ".execute", req, deliveryOptions) - .as(MonoUtils::toMono) - ) + var executionMono = cluster.getEventBus() + .rxRequest(botAddress + ".execute", req, deliveryOptions) + .as(MonoUtils::toMono) .onErrorMap(ex -> ResponseError.newResponseError(request, botAlias, ex)) .>handle((resp, sink) -> { if (resp.body() == null) { @@ -372,6 +362,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { sink.next(resp.body().toTdResult()); } }) + .doFirst(() -> logger.trace("Executing request {}", request)) .doOnSuccess(s -> logger.trace("Executed request {}", request)) .doOnError(ex -> logger.debug("Failed request {}: {}", req, ex));