Fix scheduling

This commit is contained in:
Andrea Cavalli 2021-01-25 01:20:33 +01:00
parent c18530b7be
commit 57918a3ab9
4 changed files with 21 additions and 9 deletions

View File

@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class TdClusterManager {
@ -180,8 +181,10 @@ public class TdClusterManager {
} else {
sink.success(Vertx.vertx(vertxOptions));
}
})
.map(vertx -> new TdClusterManager(mgr, vertxOptions, vertx));
}).flatMap(vertx -> Mono
.fromCallable(() -> new TdClusterManager(mgr, vertxOptions, vertx))
.subscribeOn(Schedulers.boundedElastic())
);
}
public Vertx getVertx() {

View File

@ -127,7 +127,8 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
.then(Mono.defer(() -> local ? Mono.empty()
: cluster.getEventBus().<byte[]>rxRequest("bots.start-bot", msg).as(MonoUtils::toMono)))
.then();
});
})
.publishOn(Schedulers.single());
}
@SuppressWarnings("CallingSubscribeInNonBlockingScope")
@ -158,9 +159,10 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
.doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply"))
.doOnSuccess(s -> logger.trace("About to read updates flux"))
.thenMany(updates.readAsFlux())
// Cast to fix bug of reactivex
.cast(io.vertx.core.eventbus.Message.class)
.timeout(Duration.ofSeconds(20), Mono.fromCallable(() -> {
var ex = new ConnectException("Server did not respond to 4 pings after 20 seconds (5 seconds per ping)");
.timeout(Duration.ofMinutes(1), Mono.fromCallable(() -> {
var ex = new ConnectException("Server did not respond to 12 pings after 1 minute (5 seconds per ping)");
ex.setStackTrace(new StackTraceElement[0]);
throw ex;
}))
@ -175,7 +177,8 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
})
.flatMap(this::interceptUpdate)
.doOnError(crash::tryEmitError)
.doOnTerminate(updatesStreamEnd::tryEmitEmpty);
.doOnTerminate(updatesStreamEnd::tryEmitEmpty)
.publishOn(Schedulers.single());
}
private Mono<TdApi.Object> interceptUpdate(TdApi.Object update) {
@ -219,6 +222,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
)
.switchIfEmpty(Mono.defer(() -> Mono.fromCallable(() -> {
throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Client is closed or response is empty"));
})));
})))
.publishOn(Schedulers.single());
}
}

View File

@ -98,6 +98,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd
public <T extends Object> Mono<TdResult<T>> execute(Function requestFunction, boolean executeDirectly) {
return td
.<T>execute(requestFunction, executeDirectly)
.onErrorMap(error -> ResponseError.newResponseError(requestFunction, botAlias, error));
.onErrorMap(error -> ResponseError.newResponseError(requestFunction, botAlias, error))
.publishOn(Schedulers.single());
}
}

View File

@ -111,6 +111,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
})
.flatMap(Mono::hide)
.doOnSuccess(s -> logger.trace("Stated verticle"))
.publishOn(Schedulers.single())
);
}
@ -299,7 +300,10 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.timeout(Duration.ofSeconds(5), Mono.empty())
.flatMap(ec -> ec.rxUnregister().as(MonoUtils::toMono)))
.doOnError(ex -> logger.error("Undeploy of bot \"" + botAlias + "\": stop failed", ex))
.doOnTerminate(() -> logger.info("Undeploy of bot \"" + botAlias + "\": stopped"))));
.doOnTerminate(() -> logger.info("Undeploy of bot \"" + botAlias + "\": stopped"))
)
.publishOn(Schedulers.single())
);
}
private Mono<Void> pipe(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) {