Subscribe on "Parallel"

This commit is contained in:
Andrea Cavalli 2021-01-27 01:09:22 +01:00
parent 40ec712cf0
commit a10158ae92
8 changed files with 38 additions and 37 deletions

View File

@ -99,7 +99,7 @@ public class EventBusFlux {
var responseHandler = MonoUtils.toHandler(itemSink); var responseHandler = MonoUtils.toHandler(itemSink);
eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onNext(item), signalDeliveryOptions, responseHandler); eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onNext(item), signalDeliveryOptions, responseHandler);
})) }))
.publishOn(Schedulers.single()) .publishOn(Schedulers.parallel())
.subscribe(response -> {}, error -> { .subscribe(response -> {}, error -> {
if (error instanceof ReplyException) { if (error instanceof ReplyException) {
var errorMessageCode = ((ReplyException) error).failureCode(); var errorMessageCode = ((ReplyException) error).failureCode();
@ -208,7 +208,7 @@ public class EventBusFlux {
sink.success(); sink.success();
} }
}); });
}).publishOn(Schedulers.single()).share(); }).publishOn(Schedulers.parallel()).share();
return Tuples.of(servedMono, fatalErrorSink.asMono()); return Tuples.of(servedMono, fatalErrorSink.asMono());
} }
@ -313,7 +313,7 @@ public class EventBusFlux {
}))) })))
.publishOn(Schedulers.boundedElastic()) .publishOn(Schedulers.boundedElastic())
.onBackpressureBuffer() .onBackpressureBuffer()
.publishOn(Schedulers.single()) .publishOn(Schedulers.parallel())
.subscribe(v -> {}, emitter::error); .subscribe(v -> {}, emitter::error);
emitter.onDispose(() -> { emitter.onDispose(() -> {

View File

@ -102,12 +102,12 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
result -> logger.warn("Close result: {}", result), result -> logger.warn("Close result: {}", result),
ex -> logger.error("Error when disposing td client", ex) ex -> logger.error("Error when disposing td client", ex)
); );
}).publishOn(Schedulers.single()).subscribe(); }).publishOn(Schedulers.parallel()).subscribe();
}); });
}) })
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.boundedElastic()) .publishOn(Schedulers.parallel())
) )
.subscribeOn(Schedulers.boundedElastic()); .subscribeOn(Schedulers.parallel());
} }
} }

View File

@ -140,7 +140,7 @@ public class AsyncTdEasy {
return true; return true;
}) })
.publishOn(Schedulers.boundedElastic()) .subscribeOn(Schedulers.boundedElastic())
.flatMap(_v -> { .flatMap(_v -> {
this.settings.tryEmitNext(settings); this.settings.tryEmitNext(settings);
return Mono.empty(); return Mono.empty();
@ -563,7 +563,7 @@ public class AsyncTdEasy {
)) ))
.filterWhen(file -> Mono .filterWhen(file -> Mono
.fromCallable(() -> Files.exists(file)) .fromCallable(() -> Files.exists(file))
.publishOn(Schedulers.boundedElastic())) .subscribeOn(Schedulers.boundedElastic()))
.doOnNext(directory -> { .doOnNext(directory -> {
try { try {
if (!Files.walk(directory) if (!Files.walk(directory)

View File

@ -187,7 +187,7 @@ public class TdClusterManager {
return Mono.just(Vertx.vertx(vertxOptions)); return Mono.just(Vertx.vertx(vertxOptions));
} }
}) })
.publishOn(Schedulers.single()) .publishOn(Schedulers.parallel())
.flatMap(vertx -> Mono .flatMap(vertx -> Mono
.fromCallable(() -> new TdClusterManager(mgr, vertxOptions, vertx)) .fromCallable(() -> new TdClusterManager(mgr, vertxOptions, vertx))
.publishOn(Schedulers.boundedElastic()) .publishOn(Schedulers.boundedElastic())

View File

@ -142,7 +142,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
})) }))
.then(setupPing()); .then(setupPing());
}) })
.publishOn(Schedulers.single()); .publishOn(Schedulers.parallel());
} }
private Mono<Void> setupPing() { private Mono<Void> setupPing() {
@ -171,7 +171,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
}) })
.doOnNext(s -> logger.debug("END PING")) .doOnNext(s -> logger.debug("END PING"))
.then(MonoUtils.emitEmpty(this.pingFail)) .then(MonoUtils.emitEmpty(this.pingFail))
.subscribeOn(Schedulers.single()) .subscribeOn(Schedulers.parallel())
.subscribe(); .subscribe();
} }
logger.trace("Ping setup success"); logger.trace("Ping setup success");
@ -206,9 +206,9 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
return Mono return Mono
.fromRunnable(() -> logger.trace("Called receive() from parent")) .fromRunnable(() -> logger.trace("Called receive() from parent"))
.then(updates.asMono().publishOn(Schedulers.single())) .then(updates.asMono().publishOn(Schedulers.parallel()))
.timeout(Duration.ofSeconds(5)) .timeout(Duration.ofSeconds(5))
.publishOn(Schedulers.single()) .publishOn(Schedulers.parallel())
.flatMap(MonoUtils::fromMessageConsumer) .flatMap(MonoUtils::fromMessageConsumer)
.flatMapMany(registration -> Mono .flatMapMany(registration -> Mono
.fromRunnable(() -> logger.trace("Registering updates flux")) .fromRunnable(() -> logger.trace("Registering updates flux"))
@ -240,7 +240,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
return Mono.fromCallable(() -> TdResult.failed(updates.error()).orElseThrow()); return Mono.fromCallable(() -> TdResult.failed(updates.error()).orElseThrow());
} }
}) })
.publishOn(Schedulers.single()) .publishOn(Schedulers.parallel())
.flatMapSequential(this::interceptUpdate) .flatMapSequential(this::interceptUpdate)
// Redirect errors to crash sink // Redirect errors to crash sink
.doOnError(crash::tryEmitError) .doOnError(crash::tryEmitError)
@ -250,7 +250,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
}) })
.doOnTerminate(updatesStreamEnd::tryEmitEmpty) .doOnTerminate(updatesStreamEnd::tryEmitEmpty)
.publishOn(Schedulers.single()); .publishOn(Schedulers.parallel());
} }
private Mono<TdApi.Object> interceptUpdate(TdApi.Object update) { private Mono<TdApi.Object> interceptUpdate(TdApi.Object update) {
@ -262,11 +262,11 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
case TdApi.AuthorizationStateClosed.CONSTRUCTOR: case TdApi.AuthorizationStateClosed.CONSTRUCTOR:
return Mono.fromRunnable(() -> logger.trace("Received AuthorizationStateClosed from tdlib")) return Mono.fromRunnable(() -> logger.trace("Received AuthorizationStateClosed from tdlib"))
.then(cluster.getEventBus().<EndSessionMessage>rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono)) .then(cluster.getEventBus().<EndSessionMessage>rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono))
.flatMap(latestBinlogMsg -> Mono.fromCallable(() -> latestBinlogMsg.body()).subscribeOn(Schedulers.boundedElastic())) .flatMap(latestBinlogMsg -> Mono.fromCallable(() -> latestBinlogMsg.body()).subscribeOn(Schedulers.parallel()))
.doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length()))) .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length())))
.flatMap(latestBinlog -> this.saveBinlog(latestBinlog.binlog())) .flatMap(latestBinlog -> this.saveBinlog(latestBinlog.binlog()))
.doOnSuccess(s -> logger.info("Overwritten binlog from server")) .doOnSuccess(s -> logger.info("Overwritten binlog from server"))
.publishOn(Schedulers.boundedElastic()) .publishOn(Schedulers.parallel())
.thenReturn(update); .thenReturn(update);
} }
break; break;
@ -291,13 +291,13 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
} else { } else {
return resp.body().toTdResult(); return resp.body().toTdResult();
} }
}).subscribeOn(Schedulers.boundedElastic()) }).subscribeOn(Schedulers.parallel())
) )
.doOnSuccess(s -> logger.trace("Executed request")) .doOnSuccess(s -> logger.trace("Executed request"))
) )
.switchIfEmpty(Mono.defer(() -> Mono.fromCallable(() -> { .switchIfEmpty(Mono.defer(() -> Mono.fromCallable(() -> {
throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Client is closed or response is empty")); throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Client is closed or response is empty"));
}))) })))
.publishOn(Schedulers.single()); .publishOn(Schedulers.parallel());
} }
} }

View File

@ -91,8 +91,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd
.doOnNext(s -> logger.trace("Received update from tdlib: {}", s.getClass().getSimpleName())) .doOnNext(s -> logger.trace("Received update from tdlib: {}", s.getClass().getSimpleName()))
.doOnError(ex -> logger.info("TdMiddle verticle error", ex)) .doOnError(ex -> logger.info("TdMiddle verticle error", ex))
.doOnTerminate(() -> logger.debug("TdMiddle verticle stopped")) .doOnTerminate(() -> logger.debug("TdMiddle verticle stopped"))
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(Schedulers.parallel());
.publishOn(Schedulers.single());
} }
@Override @Override
@ -100,6 +99,6 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd
return td return td
.<T>execute(requestFunction, executeDirectly) .<T>execute(requestFunction, executeDirectly)
.onErrorMap(error -> ResponseError.newResponseError(requestFunction, botAlias, error)) .onErrorMap(error -> ResponseError.newResponseError(requestFunction, botAlias, error))
.publishOn(Schedulers.single()); .publishOn(Schedulers.parallel());
} }
} }

View File

@ -107,9 +107,9 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
} }
return onSuccessfulStartRequest(td, botAddress, botAlias, botId, local); return onSuccessfulStartRequest(td, botAddress, botAlias, botId, local);
}) })
.flatMap(voidMono -> voidMono.hide().subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.single())) .flatMap(voidMono -> voidMono.hide().subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.parallel()))
.doOnSuccess(s -> logger.trace("Stated verticle")) .doOnSuccess(s -> logger.trace("Stated verticle"))
.publishOn(Schedulers.single()) .publishOn(Schedulers.parallel())
); );
} }
@ -141,7 +141,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
}) })
.flatMapSequential(msg -> Mono .flatMapSequential(msg -> Mono
.fromCallable(() -> Tuples.of(msg, msg.body())) .fromCallable(() -> Tuples.of(msg, msg.body()))
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(Schedulers.parallel())
) )
.flatMapSequential(tuple -> { .flatMapSequential(tuple -> {
var msg = tuple.getT1(); var msg = tuple.getT1();
@ -169,7 +169,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
} }
}).subscribeOn(Schedulers.boundedElastic())) }).subscribeOn(Schedulers.boundedElastic()))
.then() .then()
.publishOn(Schedulers.single()) .publishOn(Schedulers.parallel())
.subscribe(v -> {}, .subscribe(v -> {},
ex -> logger.error("Error when processing an execute request", ex), ex -> logger.error("Error when processing an execute request", ex),
() -> logger.trace("Finished handling execute requests") () -> logger.trace("Finished handling execute requests")
@ -182,7 +182,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
} }
BinlogUtils BinlogUtils
.readBinlogConsumer(vertx, readBinlogConsumer, botId, local) .readBinlogConsumer(vertx, readBinlogConsumer, botId, local)
.publishOn(Schedulers.single()) .publishOn(Schedulers.parallel())
.subscribe(v -> {}, ex -> logger.error("Error when processing a read-binlog request", ex)); .subscribe(v -> {}, ex -> logger.error("Error when processing a read-binlog request", ex));
MessageConsumer<byte[]> readyToReceiveConsumer = vertx.eventBus().consumer(botAddress + ".ready-to-receive"); MessageConsumer<byte[]> readyToReceiveConsumer = vertx.eventBus().consumer(botAddress + ".ready-to-receive");
@ -255,11 +255,11 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.andThen(pingConsumer.rxCompletionHandler()) .andThen(pingConsumer.rxCompletionHandler())
.as(MonoUtils::toMono) .as(MonoUtils::toMono)
.doOnSuccess(s -> logger.trace("Finished preparing listeners")) .doOnSuccess(s -> logger.trace("Finished preparing listeners"))
.publishOn(Schedulers.single()) .publishOn(Schedulers.parallel())
.subscribe(v -> {}, registrationSink::error, registrationSink::success); .subscribe(v -> {}, registrationSink::error, registrationSink::success);
}) })
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.single()); .publishOn(Schedulers.parallel());
} }
/** /**
@ -298,10 +298,10 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
// Since every consumer of ReadBinLog is identical, this should not pose a problem. // Since every consumer of ReadBinLog is identical, this should not pose a problem.
.delay(Duration.ofMinutes(30)) .delay(Duration.ofMinutes(30))
.then(ec.rxUnregister().as(MonoUtils::toMono)) .then(ec.rxUnregister().as(MonoUtils::toMono))
.publishOn(Schedulers.single()) .publishOn(Schedulers.parallel())
.subscribe(); .subscribe();
return null; return null;
}).subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.single())) }).subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.parallel()))
) )
.then(readyToReceiveConsumer .then(readyToReceiveConsumer
.asMono() .asMono()
@ -314,7 +314,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.doOnError(ex -> logger.error("Undeploy of bot \"" + botAlias + "\": stop failed", ex)) .doOnError(ex -> logger.error("Undeploy of bot \"" + botAlias + "\": stop failed", ex))
.doOnTerminate(() -> logger.info("Undeploy of bot \"" + botAlias + "\": stopped")) .doOnTerminate(() -> logger.info("Undeploy of bot \"" + botAlias + "\": stopped"))
) )
.publishOn(Schedulers.single()) .publishOn(Schedulers.parallel())
); );
} }

View File

@ -343,7 +343,7 @@ public class MonoUtils {
private static Future<Void> toVertxFuture(Mono<Void> toTransform) { private static Future<Void> toVertxFuture(Mono<Void> toTransform) {
var promise = Promise.<Void>promise(); var promise = Promise.<Void>promise();
toTransform.publishOn(Schedulers.single()).subscribe(next -> {}, promise::fail, promise::complete); toTransform.publishOn(Schedulers.parallel()).subscribe(next -> {}, promise::fail, promise::complete);
return promise.future(); return promise.future();
} }
@ -363,15 +363,17 @@ public class MonoUtils {
sink.onDispose(messageConsumer::unregister); sink.onDispose(messageConsumer::unregister);
}) })
//.startWith(MonoUtils.castVoid(messageConsumer.rxCompletionHandler().as(MonoUtils::toMono))) //.startWith(MonoUtils.castVoid(messageConsumer.rxCompletionHandler().as(MonoUtils::toMono)))
.flatMapSequential(msg -> Mono.fromCallable(msg::body).subscribeOn(Schedulers.boundedElastic())) .flatMapSequential(msg -> Mono
.subscribeOn(Schedulers.boundedElastic()); .fromCallable(msg::body)
.subscribeOn(Schedulers.parallel())
);
} }
public static <T> Mono<Tuple2<Mono<Void>, Flux<T>>> fromMessageConsumer(MessageConsumer<T> messageConsumer) { public static <T> Mono<Tuple2<Mono<Void>, Flux<T>>> fromMessageConsumer(MessageConsumer<T> messageConsumer) {
return fromReplyableMessageConsumer(messageConsumer) return fromReplyableMessageConsumer(messageConsumer)
.map(tuple -> tuple.mapT2(msgs -> msgs.flatMapSequential(msg -> Mono .map(tuple -> tuple.mapT2(msgs -> msgs.flatMapSequential(msg -> Mono
.fromCallable(msg::body) .fromCallable(msg::body)
.subscribeOn(Schedulers.boundedElastic()))) .subscribeOn(Schedulers.parallel())))
); );
} }
@ -379,7 +381,7 @@ public class MonoUtils {
return fromReplyableMessageConsumer(messageConsumer) return fromReplyableMessageConsumer(messageConsumer)
.map(tuple -> tuple.mapT2(msgs -> msgs.flatMapSequential(msg -> Mono .map(tuple -> tuple.mapT2(msgs -> msgs.flatMapSequential(msg -> Mono
.fromCallable(() -> Tuples.<Message<?>, T>of(msg, msg.body())) .fromCallable(() -> Tuples.<Message<?>, T>of(msg, msg.body()))
.subscribeOn(Schedulers.boundedElastic()))) .subscribeOn(Schedulers.parallel())))
); );
} }