From 0bc2a616740ffec1cb023266eb2e21b66100bc77 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 25 Feb 2021 23:37:41 +0100 Subject: [PATCH] Fix concurrency errors --- .../client/AsyncTdMiddleEventBusClient.java | 25 +++++++++++++------ .../server/AsyncTdMiddleEventBusServer.java | 7 +++--- .../tdlib-session-container-log4j2.xml | 4 +-- 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 7b17b16..ee413f1 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -41,6 +41,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { private final TdClusterManager cluster; private final DeliveryOptions deliveryOptions; private final DeliveryOptions deliveryOptionsWithTimeout; + private final DeliveryOptions pingDeliveryOptions; private final One binlog = Sinks.one(); @@ -62,6 +63,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { this.cluster = clusterManager; this.deliveryOptions = cluster.newDeliveryOpts().setLocalOnly(local); this.deliveryOptionsWithTimeout = cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(30000); + this.pingDeliveryOptions = cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(60000); } private Mono initializeEb() { @@ -70,6 +72,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { @Override public Mono initialize() { + // Do nothing here. return Mono.empty(); } @@ -158,7 +161,8 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { .defer(() -> { logger.trace("Requesting ping..."); return cluster.getEventBus() - .rxRequest(botAddress + ".ping", EMPTY, deliveryOptionsWithTimeout).as(MonoUtils::toMono); + .rxRequest(botAddress + ".ping", EMPTY, pingDeliveryOptions) + .as(MonoUtils::toMono); }) .flatMap(msg -> Mono.fromCallable(() -> msg.body()).subscribeOn(Schedulers.boundedElastic())) .repeatWhen(l -> l.delayElements(Duration.ofSeconds(10)).takeWhile(x -> true)) @@ -170,7 +174,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { .doOnNext(s -> logger.trace("PING")) .then() .onErrorResume(ex -> { - logger.trace("Ping failed", ex); + logger.warn("Ping failed", ex); return Mono.empty(); }) .doOnNext(s -> logger.debug("END PING")) @@ -204,6 +208,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { .then(); } + @SuppressWarnings("Convert2MethodRef") @Override public Flux receive() { // Here the updates will be received @@ -212,14 +217,17 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { .fromRunnable(() -> logger.trace("Called receive() from parent")) .then(updates.asMono()) .publishOn(Schedulers.parallel()) - .timeout(Duration.ofSeconds(5)) + .timeout(Duration.ofSeconds(30)) .flatMap(MonoUtils::fromMessageConsumer) .flatMapMany(registration -> Mono .fromRunnable(() -> logger.trace("Registering updates flux")) .then(registration.getT1()) .doOnSuccess(s -> logger.trace("Registered updates flux")) .doOnSuccess(s -> logger.trace("Sending ready-to-receive")) - .then(cluster.getEventBus().rxRequest(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout).as(MonoUtils::toMono)) + .then(cluster.getEventBus().rxRequest(botAddress + ".ready-to-receive", + EMPTY, + deliveryOptionsWithTimeout + ).as(MonoUtils::toMono)) .doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply")) .doOnSuccess(s -> logger.trace("About to read updates flux")) .thenMany(registration.getT2()) @@ -227,7 +235,10 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { .takeUntilOther(Flux .merge( crash.asMono() - .onErrorResume(ex -> Mono.empty()), + .onErrorResume(ex -> { + logger.error("TDLib crashed", ex); + return Mono.empty(); + }), pingFail.asMono() .then(Mono.fromCallable(() -> { var ex = new ConnectException("Server did not respond to ping"); @@ -246,7 +257,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { }) .flatMapSequential(this::interceptUpdate) // Redirect errors to crash sink - .doOnError(crash::tryEmitError) + .doOnError(error -> crash.tryEmitError(error)) .onErrorResume(ex -> { logger.trace("Absorbing the error, the error has been published using the crash sink", ex); return Mono.empty(); @@ -262,7 +273,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; switch (updateAuthorizationState.authorizationState.getConstructor()) { case TdApi.AuthorizationStateClosed.CONSTRUCTOR: - return Mono.fromRunnable(() -> logger.trace("Received AuthorizationStateClosed from tdlib")) + return Mono.fromRunnable(() -> logger.info("Received AuthorizationStateClosed from tdlib")) .then(cluster.getEventBus().rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono)) .flatMap(latestBinlogMsg -> Mono.fromCallable(() -> latestBinlogMsg.body()).subscribeOn(Schedulers.parallel())) .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length()))) diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index 9cce607..989c358 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -316,7 +316,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { private Mono pipe(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) { logger.trace("Preparing to pipe requests"); Flux updatesFlux = td - .receive(tdOptions) + .initialize() + .thenMany(td.receive(tdOptions)) .takeUntil(item -> { if (item instanceof Update) { var tdUpdate = (Update) item; @@ -371,13 +372,13 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { var tdUpdateAuthorizationState = (UpdateAuthorizationState) tdUpdate; if (tdUpdateAuthorizationState.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { - logger.debug("Undeploying after receiving AuthorizationStateClosed"); + logger.info("Undeploying after receiving AuthorizationStateClosed"); return rxStop().as(MonoUtils::toMono).thenReturn(item); } } } else if (item instanceof Error) { // An error in updates means that a fatal error occurred - logger.debug("Undeploying after receiving a fatal error"); + logger.info("Undeploying after receiving a fatal error"); return rxStop().as(MonoUtils::toMono).thenReturn(item); } return Mono.just(item); diff --git a/src/main/resources/tdlib-session-container-log4j2.xml b/src/main/resources/tdlib-session-container-log4j2.xml index 244e2d5..01d584e 100644 --- a/src/main/resources/tdlib-session-container-log4j2.xml +++ b/src/main/resources/tdlib-session-container-log4j2.xml @@ -7,7 +7,7 @@ . --> - + @@ -27,7 +27,7 @@ - +