Fix concurrency errors

This commit is contained in:
Andrea Cavalli 2021-02-25 23:37:41 +01:00
parent 6a52cab8de
commit 0bc2a61674
3 changed files with 24 additions and 12 deletions

View File

@ -41,6 +41,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
private final TdClusterManager cluster; private final TdClusterManager cluster;
private final DeliveryOptions deliveryOptions; private final DeliveryOptions deliveryOptions;
private final DeliveryOptions deliveryOptionsWithTimeout; private final DeliveryOptions deliveryOptionsWithTimeout;
private final DeliveryOptions pingDeliveryOptions;
private final One<BinlogAsyncFile> binlog = Sinks.one(); private final One<BinlogAsyncFile> binlog = Sinks.one();
@ -62,6 +63,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
this.cluster = clusterManager; this.cluster = clusterManager;
this.deliveryOptions = cluster.newDeliveryOpts().setLocalOnly(local); this.deliveryOptions = cluster.newDeliveryOpts().setLocalOnly(local);
this.deliveryOptionsWithTimeout = cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(30000); this.deliveryOptionsWithTimeout = cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(30000);
this.pingDeliveryOptions = cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(60000);
} }
private Mono<AsyncTdMiddleEventBusClient> initializeEb() { private Mono<AsyncTdMiddleEventBusClient> initializeEb() {
@ -70,6 +72,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
@Override @Override
public Mono<Void> initialize() { public Mono<Void> initialize() {
// Do nothing here.
return Mono.empty(); return Mono.empty();
} }
@ -158,7 +161,8 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
.defer(() -> { .defer(() -> {
logger.trace("Requesting ping..."); logger.trace("Requesting ping...");
return cluster.getEventBus() return cluster.getEventBus()
.<byte[]>rxRequest(botAddress + ".ping", EMPTY, deliveryOptionsWithTimeout).as(MonoUtils::toMono); .<byte[]>rxRequest(botAddress + ".ping", EMPTY, pingDeliveryOptions)
.as(MonoUtils::toMono);
}) })
.flatMap(msg -> Mono.fromCallable(() -> msg.body()).subscribeOn(Schedulers.boundedElastic())) .flatMap(msg -> Mono.fromCallable(() -> msg.body()).subscribeOn(Schedulers.boundedElastic()))
.repeatWhen(l -> l.delayElements(Duration.ofSeconds(10)).takeWhile(x -> true)) .repeatWhen(l -> l.delayElements(Duration.ofSeconds(10)).takeWhile(x -> true))
@ -170,7 +174,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
.doOnNext(s -> logger.trace("PING")) .doOnNext(s -> logger.trace("PING"))
.then() .then()
.onErrorResume(ex -> { .onErrorResume(ex -> {
logger.trace("Ping failed", ex); logger.warn("Ping failed", ex);
return Mono.empty(); return Mono.empty();
}) })
.doOnNext(s -> logger.debug("END PING")) .doOnNext(s -> logger.debug("END PING"))
@ -204,6 +208,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
.then(); .then();
} }
@SuppressWarnings("Convert2MethodRef")
@Override @Override
public Flux<TdApi.Object> receive() { public Flux<TdApi.Object> receive() {
// Here the updates will be received // Here the updates will be received
@ -212,14 +217,17 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
.fromRunnable(() -> logger.trace("Called receive() from parent")) .fromRunnable(() -> logger.trace("Called receive() from parent"))
.then(updates.asMono()) .then(updates.asMono())
.publishOn(Schedulers.parallel()) .publishOn(Schedulers.parallel())
.timeout(Duration.ofSeconds(5)) .timeout(Duration.ofSeconds(30))
.flatMap(MonoUtils::fromMessageConsumer) .flatMap(MonoUtils::fromMessageConsumer)
.flatMapMany(registration -> Mono .flatMapMany(registration -> Mono
.fromRunnable(() -> logger.trace("Registering updates flux")) .fromRunnable(() -> logger.trace("Registering updates flux"))
.then(registration.getT1()) .then(registration.getT1())
.doOnSuccess(s -> logger.trace("Registered updates flux")) .doOnSuccess(s -> logger.trace("Registered updates flux"))
.doOnSuccess(s -> logger.trace("Sending ready-to-receive")) .doOnSuccess(s -> logger.trace("Sending ready-to-receive"))
.then(cluster.getEventBus().<byte[]>rxRequest(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout).as(MonoUtils::toMono)) .then(cluster.getEventBus().<byte[]>rxRequest(botAddress + ".ready-to-receive",
EMPTY,
deliveryOptionsWithTimeout
).as(MonoUtils::toMono))
.doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply")) .doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply"))
.doOnSuccess(s -> logger.trace("About to read updates flux")) .doOnSuccess(s -> logger.trace("About to read updates flux"))
.thenMany(registration.getT2()) .thenMany(registration.getT2())
@ -227,7 +235,10 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
.takeUntilOther(Flux .takeUntilOther(Flux
.merge( .merge(
crash.asMono() crash.asMono()
.onErrorResume(ex -> Mono.empty()), .onErrorResume(ex -> {
logger.error("TDLib crashed", ex);
return Mono.empty();
}),
pingFail.asMono() pingFail.asMono()
.then(Mono.fromCallable(() -> { .then(Mono.fromCallable(() -> {
var ex = new ConnectException("Server did not respond to ping"); var ex = new ConnectException("Server did not respond to ping");
@ -246,7 +257,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
}) })
.flatMapSequential(this::interceptUpdate) .flatMapSequential(this::interceptUpdate)
// Redirect errors to crash sink // Redirect errors to crash sink
.doOnError(crash::tryEmitError) .doOnError(error -> crash.tryEmitError(error))
.onErrorResume(ex -> { .onErrorResume(ex -> {
logger.trace("Absorbing the error, the error has been published using the crash sink", ex); logger.trace("Absorbing the error, the error has been published using the crash sink", ex);
return Mono.empty(); return Mono.empty();
@ -262,7 +273,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update;
switch (updateAuthorizationState.authorizationState.getConstructor()) { switch (updateAuthorizationState.authorizationState.getConstructor()) {
case TdApi.AuthorizationStateClosed.CONSTRUCTOR: case TdApi.AuthorizationStateClosed.CONSTRUCTOR:
return Mono.fromRunnable(() -> logger.trace("Received AuthorizationStateClosed from tdlib")) return Mono.fromRunnable(() -> logger.info("Received AuthorizationStateClosed from tdlib"))
.then(cluster.getEventBus().<EndSessionMessage>rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono)) .then(cluster.getEventBus().<EndSessionMessage>rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono))
.flatMap(latestBinlogMsg -> Mono.fromCallable(() -> latestBinlogMsg.body()).subscribeOn(Schedulers.parallel())) .flatMap(latestBinlogMsg -> Mono.fromCallable(() -> latestBinlogMsg.body()).subscribeOn(Schedulers.parallel()))
.doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length()))) .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length())))

View File

@ -316,7 +316,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
private Mono<Void> pipe(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) { private Mono<Void> pipe(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) {
logger.trace("Preparing to pipe requests"); logger.trace("Preparing to pipe requests");
Flux<TdResultList> updatesFlux = td Flux<TdResultList> updatesFlux = td
.receive(tdOptions) .initialize()
.thenMany(td.receive(tdOptions))
.takeUntil(item -> { .takeUntil(item -> {
if (item instanceof Update) { if (item instanceof Update) {
var tdUpdate = (Update) item; var tdUpdate = (Update) item;
@ -371,13 +372,13 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
var tdUpdateAuthorizationState = (UpdateAuthorizationState) tdUpdate; var tdUpdateAuthorizationState = (UpdateAuthorizationState) tdUpdate;
if (tdUpdateAuthorizationState.authorizationState.getConstructor() if (tdUpdateAuthorizationState.authorizationState.getConstructor()
== AuthorizationStateClosed.CONSTRUCTOR) { == AuthorizationStateClosed.CONSTRUCTOR) {
logger.debug("Undeploying after receiving AuthorizationStateClosed"); logger.info("Undeploying after receiving AuthorizationStateClosed");
return rxStop().as(MonoUtils::toMono).thenReturn(item); return rxStop().as(MonoUtils::toMono).thenReturn(item);
} }
} }
} else if (item instanceof Error) { } else if (item instanceof Error) {
// An error in updates means that a fatal error occurred // An error in updates means that a fatal error occurred
logger.debug("Undeploying after receiving a fatal error"); logger.info("Undeploying after receiving a fatal error");
return rxStop().as(MonoUtils::toMono).thenReturn(item); return rxStop().as(MonoUtils::toMono).thenReturn(item);
} }
return Mono.just(item); return Mono.just(item);

View File

@ -7,7 +7,7 @@
<http://logging.apache.org/log4j/2.x/manual/appenders.html>. <http://logging.apache.org/log4j/2.x/manual/appenders.html>.
--> -->
<Configuration status="INFO"> <Configuration status="DEBUG">
<Appenders> <Appenders>
<!-- DEFAULT APPENDERS --> <!-- DEFAULT APPENDERS -->
@ -27,7 +27,7 @@
<Loggers> <Loggers>
<Logger name="com.hazelcast.internal.diagnostics.HealthMonitor" level="WARN" /> <Logger name="com.hazelcast.internal.diagnostics.HealthMonitor" level="WARN" />
<Logger name="com.hazelcast" level="INFO" /> <Logger name="com.hazelcast" level="INFO" />
<Root level="INFO"> <Root level="DEBUG">
<filters> <filters>
<MarkerFilter marker="NETWORK_PACKETS" onMatch="DENY" <MarkerFilter marker="NETWORK_PACKETS" onMatch="DENY"
onMismatch="NEUTRAL" /> onMismatch="NEUTRAL" />