From 620914b3cfff9f99ed91b6559b39687b70054652 Mon Sep 17 00:00:00 2001
From: Andrea Cavalli
Date: Mon, 25 Jan 2021 02:31:17 +0100
Subject: [PATCH] Try to avoid insidious unexpected blocking code from
 hazelcast, netty, vert.x

---
 .../td/middle/TdClusterManager.java           |   4 +-
 .../client/AsyncTdMiddleEventBusClient.java   |  94 ++++++++++------
 .../server/AsyncTdMiddleEventBusServer.java   |   8 +-
 .../java/it/tdlight/utils/BinlogUtils.java    |  13 ++-
 src/main/java/it/tdlight/utils/MonoUtils.java | 101 +++++++++---------
 5 files changed, 126 insertions(+), 94 deletions(-)

diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java
index 216c5f5..f56b754 100644
--- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java
+++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java
@@ -181,7 +181,9 @@ public class TdClusterManager {
 			} else {
 				sink.success(Vertx.vertx(vertxOptions));
 			}
-		}).flatMap(vertx -> Mono
+		})
+				.publishOn(Schedulers.boundedElastic())
+				.flatMap(vertx -> Mono
 				.fromCallable(() -> new TdClusterManager(mgr, vertxOptions, vertx))
 				.subscribeOn(Schedulers.boundedElastic())
 		);
diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java
index 7743fde..5d9e11a 100644
--- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java
+++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java
@@ -24,6 +24,7 @@ import it.tdlight.utils.MonoUtils.SinkRWStream;
 import java.net.ConnectException;
 import java.nio.file.Path;
 import java.time.Duration;
+import java.util.concurrent.Callable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Flux;
@@ -45,7 +46,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
 
 	private final One<BinlogAsyncFile> binlog = Sinks.one();
 
-	SinkRWStream<Message<TdResultList>> updates = MonoUtils.unicastBackpressureSinkStreak();
+	private final One<SinkRWStream<Message<TdResultList>>> updates = Sinks.one();
 	// This will only result in a successful completion, never completes in other ways
 	private final Empty<Void> updatesStreamEnd = Sinks.one();
 	// This will only result in a crash, never completes in other ways
@@ -63,25 +64,32 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
 		this.deliveryOptionsWithTimeout = cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(30000);
 	}
 
+	private Mono<AsyncTdMiddleEventBusClient> initialize() {
+		return MonoUtils.<Message<TdResultList>>unicastBackpressureSinkStream()
+				.flatMap(updates -> MonoUtils.emitValue(this.updates, updates))
+				.thenReturn(this);
+	}
+
 	public static Mono<AsyncTdMiddleEventBusClient> getAndDeployInstance(TdClusterManager clusterManager,
 			int botId,
 			String botAlias,
 			boolean local,
 			JsonObject implementationDetails,
 			Path binlogsArchiveDirectory) {
-		var instance = new AsyncTdMiddleEventBusClient(clusterManager);
-		return retrieveBinlog(clusterManager.getVertx(), binlogsArchiveDirectory, botId)
-				.flatMap(binlog -> binlog
-						.getLastModifiedTime()
-						.filter(modTime -> modTime == 0)
-						.doOnNext(v -> LoggerFactory
-								.getLogger(AsyncTdMiddleEventBusClient.class)
-								.error("Can't retrieve binlog of bot " + botId + " " + botAlias + ". Creating a new one..."))
-						.thenReturn(binlog)
-				)
-				.flatMap(binlog -> instance
-						.start(botId, botAlias, local, implementationDetails, binlog)
-						.thenReturn(instance)
+		return new AsyncTdMiddleEventBusClient(clusterManager)
+				.initialize()
+				.flatMap(instance -> retrieveBinlog(clusterManager.getVertx(), binlogsArchiveDirectory, botId)
+						.flatMap(binlog -> binlog
+								.getLastModifiedTime()
+								.filter(modTime -> modTime == 0)
+								.doOnNext(v -> LoggerFactory
+										.getLogger(AsyncTdMiddleEventBusClient.class)
+										.error("Can't retrieve binlog of bot " + botId + " " + botAlias + ". Creating a new one..."))
+								.thenReturn(binlog)).flatMap(binlog -> instance
+								.start(botId, botAlias, local, implementationDetails, binlog)
+								.thenReturn(instance)
+						)
+						.single()
 				)
 				.single();
 	}
@@ -124,29 +132,42 @@
 					implementationDetails
 			);
 			return setupUpdatesListener()
-					.then(Mono.defer(() -> local ? Mono.empty()
-							: cluster.getEventBus().rxRequest("bots.start-bot", msg).as(MonoUtils::toMono)))
+					.then(Mono.defer(() -> {
+						if (local) {
+							return Mono.empty();
+						}
+						return cluster.getEventBus()
+								.rxRequest("bots.start-bot", msg).as(MonoUtils::toMono)
+								.publishOn(Schedulers.boundedElastic());
+					}))
 					.then();
 		})
 				.publishOn(Schedulers.single());
 	}
 
-	@SuppressWarnings("CallingSubscribeInNonBlockingScope")
 	private Mono<Void> setupUpdatesListener() {
-		MessageConsumer<TdResultList> updateConsumer = MessageConsumer.newInstance(cluster.getEventBus().consumer(
-				botAddress + ".updates").setMaxBufferedMessages(5000).getDelegate());
-		updateConsumer.endHandler(h -> {
-			logger.error("<<<<<<<<<<<<<<<>>>>>>>>>>>>");
-		});
+		return MonoUtils
+				.fromBlockingMaybe(() -> {
+					MessageConsumer<TdResultList> updateConsumer = MessageConsumer.newInstance(cluster.getEventBus().consumer(
+							botAddress + ".updates").setMaxBufferedMessages(5000).getDelegate());
+					updateConsumer.endHandler(h -> {
+						logger.error("<<<<<<<<<<<<<<<>>>>>>>>>>>>");
+					});
 
-		// Here the updates will be piped from the server to the client
-		updateConsumer
-				.rxPipeTo(updates.writeAsStream()).as(MonoUtils::toMono)
-				.subscribeOn(Schedulers.single())
-				.subscribe();
+					// Here the updates will be piped from the server to the client
+					updates
+							.asMono()
+							.timeout(Duration.ofSeconds(5))
+							.flatMap(updates -> updateConsumer
+									.rxPipeTo(updates.writeAsStream()).as(MonoUtils::toMono)
+							)
+							.publishOn(Schedulers.newSingle("td-client-updates-pipe"))
+							.subscribe();
 
-		// Return when the registration of all the consumers has been done across the cluster
-		return updateConsumer.rxCompletionHandler().as(MonoUtils::toMono);
+					return updateConsumer;
+				})
+				// Return when the registration of all the consumers has been done across the cluster
+				.flatMap(updateConsumer -> updateConsumer.rxCompletionHandler().as(MonoUtils::toMono));
 	}
 
 	@Override
@@ -158,7 +179,8 @@
 				.then()
 				.doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply"))
 				.doOnSuccess(s -> logger.trace("About to read updates flux"))
-				.thenMany(updates.readAsFlux())
+				.then(updates.asMono().timeout(Duration.ofSeconds(5)))
+				.flatMapMany(SinkRWStream::readAsFlux)
 				// Cast to fix bug of reactivex
 				.cast(io.vertx.core.eventbus.Message.class)
 				.timeout(Duration.ofMinutes(1), Mono.fromCallable(() -> {
 					var ex = new ConnectException();
 					ex.setStackTrace(new StackTraceElement[0]);
 					throw ex;
 				}))
-				.doOnSubscribe(s -> cluster.getEventBus().send(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout))
+				.doOnSubscribe(s -> Schedulers.boundedElastic().schedule(() -> {
+					cluster.getEventBus().send(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout);
+				}))
+				.flatMap(updates -> Mono.fromCallable((Callable) updates::body).publishOn(Schedulers.boundedElastic()))
 				.flatMap(updates -> {
-					var result = (TdResultList) updates.body();
+					var result = (TdResultList) updates;
 					if (result.succeeded()) {
 						return Flux.fromIterable(result.value());
 					} else {
@@ -192,6 +217,7 @@
 								.doOnNext(l -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(l.body().binlog().length)))
 								.flatMap(latestBinlog -> this.saveBinlog(latestBinlog.body().binlog()))
 								.doOnSuccess(s -> logger.info("Overwritten binlog from server"))
+								.publishOn(Schedulers.boundedElastic())
 								.thenReturn(update);
 					}
 					break;
@@ -210,13 +236,13 @@
 				.then(cluster.getEventBus().rxRequest(botAddress + ".execute", req, deliveryOptions).as(MonoUtils::toMono))
 				.onErrorMap(ex -> ResponseError.newResponseError(request, botAlias, ex))
 				.<TdResult<T>>flatMap(resp -> Mono
-						.fromCallable(() -> {
+						.<TdResult<T>>fromCallable(() -> {
 							if (resp.body() == null) {
 								throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Response is empty"));
 							} else {
 								return resp.body().toTdResult();
 							}
-						})
+						}).publishOn(Schedulers.boundedElastic())
 				)
 				.doOnSuccess(s -> logger.trace("Executed request"))
 			)
diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java
index c1d852f..884b110 100644
--- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java
+++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java
@@ -128,7 +128,7 @@
 	}
 
 	private Mono<Void> listen(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) {
-		return Mono.create(registrationSink -> {
+		return Mono.create(registrationSink -> Schedulers.boundedElastic().schedule(() -> {
 			logger.trace("Preparing listeners");
 
 			MessageConsumer executeConsumer = vertx.eventBus().consumer(botAddress + ".execute");
@@ -249,7 +249,7 @@
 					.subscribeOn(io.reactivex.schedulers.Schedulers.single())
 					.doOnComplete(() -> logger.trace("Finished preparing listeners"))
 					.subscribe(registrationSink::success, registrationSink::error);
-		});
+		}));
 	}
 
 	/**
@@ -282,13 +282,13 @@
 				.then(readBinlogConsumer
 						.asMono()
 						.timeout(Duration.ofSeconds(10), Mono.empty())
-						.doOnNext(ec -> Mono
+						.doOnNext(ec -> Schedulers.boundedElastic().schedule(() -> Mono
 								// ReadBinLog will live for another 30 minutes.
 								// Since every consumer of ReadBinLog is identical, this should not pose a problem.
 								.delay(Duration.ofMinutes(30))
 								.then(ec.rxUnregister().as(MonoUtils::toMono))
 								.subscribeOn(Schedulers.single())
-								.subscribe()
+								.subscribe())
 						)
 				)
 				.then(readyToReceiveConsumer
diff --git a/src/main/java/it/tdlight/utils/BinlogUtils.java b/src/main/java/it/tdlight/utils/BinlogUtils.java
index 29a15b2..24d3472 100644
--- a/src/main/java/it/tdlight/utils/BinlogUtils.java
+++ b/src/main/java/it/tdlight/utils/BinlogUtils.java
@@ -19,6 +19,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;
 
@@ -39,7 +40,8 @@
 				// Open file
 				.flatMap(x -> vertxFilesystem.rxOpen(path, openOptions).as(MonoUtils::toMono))
 				.map(file -> new BinlogAsyncFile(vertxFilesystem, path, file))
-				.single();
+				.single()
+				.publishOn(Schedulers.boundedElastic());
 	}
 
 	public static Mono<Void> saveBinlog(BinlogAsyncFile binlog, byte[] data) {
@@ -66,7 +68,8 @@
 						.then(retrieveBinlog(vertxFilesystem, binlogPath))
 				)
 				.single()
-				.then();
+				.then()
+				.publishOn(Schedulers.boundedElastic());
 	}
 
 	public static Mono<Void> cleanSessionPath(FileSystem vertxFilesystem,
@@ -86,7 +89,8 @@
 						.flatMap(file -> vertxFilesystem.rxDeleteRecursive(file, true).as(MonoUtils::toMono))
 						.onErrorResume(ex -> Mono.empty())
 						.then()
-				);
+				)
+				.publishOn(Schedulers.boundedElastic());
 	}
 
 	public static String humanReadableByteCountBin(long bytes) {
@@ -123,6 +127,7 @@
 					var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis());
 					tuple.getT1().reply(new EndSessionMessage(botId, tuple.getT2()), opts);
 				})
-				.then();
+				.then()
+				.publishOn(Schedulers.boundedElastic());
 	}
 }
diff --git a/src/main/java/it/tdlight/utils/MonoUtils.java b/src/main/java/it/tdlight/utils/MonoUtils.java
index 137609a..0c9dca3 100644
--- a/src/main/java/it/tdlight/utils/MonoUtils.java
+++ b/src/main/java/it/tdlight/utils/MonoUtils.java
@@ -31,7 +31,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.warp.commonutils.concurrency.future.CompletableFutureUtils;
 import reactor.core.CoreSubscriber;
-import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.MonoSink;
@@ -303,7 +302,7 @@
 		return fromEmitResultFuture(sink.tryEmitEmpty());
 	}
 
-	public static <T> SinkRWStream<T> unicastBackpressureSinkStreak() {
+	public static <T> Mono<SinkRWStream<T>> unicastBackpressureSinkStream() {
 		Many<T> sink = Sinks.many().unicast().onBackpressureBuffer();
 		return asStream(sink, null, null, 1);
 	}
@@ -311,7 +310,7 @@
 	/**
	 * Create a sink that can be written from a writeStream
	 */
-	public static <T> SinkRWStream<T> unicastBackpressureStream(int maxBackpressureQueueSize) {
+	public static <T> Mono<SinkRWStream<T>> unicastBackpressureStream(int maxBackpressureQueueSize) {
 		Queue<T> boundedQueue = Queues.<T>get(maxBackpressureQueueSize).get();
 		var queueSize = Flux
 				.interval(Duration.ZERO, Duration.ofMillis(500))
@@ -321,16 +320,16 @@
 		return asStream(sink, queueSize, termination, maxBackpressureQueueSize);
 	}
 
-	public static <T> SinkRWStream<T> unicastBackpressureErrorStream() {
+	public static <T> Mono<SinkRWStream<T>> unicastBackpressureErrorStream() {
 		Many<T> sink = Sinks.many().unicast().onBackpressureError();
 		return asStream(sink, null, null, 1);
 	}
 
-	public static <T> SinkRWStream<T> asStream(Many<T> sink,
+	public static <T> Mono<SinkRWStream<T>> asStream(Many<T> sink,
 			@Nullable Flux<Integer> backpressureSize,
 			@Nullable Empty<Void> termination,
 			int maxBackpressureQueueSize) {
-		return new SinkRWStream<>(sink, backpressureSize, termination, maxBackpressureQueueSize);
+		return SinkRWStream.create(sink, backpressureSize, termination, maxBackpressureQueueSize);
 	}
 
 	private static <T> Future<T> toVertxFuture(Mono<T> toTransform) {
@@ -347,46 +346,64 @@
 	public static class SinkRWStream<T> implements io.vertx.core.streams.WriteStream<T>, io.vertx.core.streams.ReadStream<T> {
 
 		private final Many<T> sink;
-		private final @Nullable Disposable drainSubscription;
+		private final Flux<Integer> backpressureSize;
+		private final Empty<Void> termination;
 		private Handler<Throwable> exceptionHandler = e -> {};
 		private Handler<Void> drainHandler = h -> {};
 		private final int maxBackpressureQueueSize;
 		private volatile int writeQueueMaxSize;
 		private volatile boolean writeQueueFull = false;
 
-		public SinkRWStream(Many<T> sink,
+		private SinkRWStream(Many<T> sink,
 				@Nullable Flux<Integer> backpressureSize,
 				@Nullable Empty<Void> termination,
 				int maxBackpressureQueueSize) {
 			this.maxBackpressureQueueSize = maxBackpressureQueueSize;
 			this.writeQueueMaxSize = this.maxBackpressureQueueSize;
+			this.backpressureSize = backpressureSize;
+			this.termination = termination;
 			this.sink = sink;
+		}
 
-			if (backpressureSize != null) {
-				AtomicBoolean drained = new AtomicBoolean(true);
-				this.drainSubscription = backpressureSize
-						.subscribeOn(Schedulers.single())
-						.subscribe(size -> {
-							writeQueueFull = size >= this.writeQueueMaxSize;
+		public Mono<SinkRWStream<T>> initialize() {
+			return Mono.fromCallable(() -> {
+				if (backpressureSize != null) {
+					AtomicBoolean drained = new AtomicBoolean(true);
+					var drainSubscription = backpressureSize
+							.publishOn(Schedulers.boundedElastic())
+							.subscribe(size -> {
+								writeQueueFull = size >= this.writeQueueMaxSize;
 
-							boolean newDrained = size <= this.writeQueueMaxSize / 2;
-							boolean oldDrained = drained.getAndSet(newDrained);
-							if (newDrained && !oldDrained) {
-								drainHandler.handle(null);
-							}
-						}, ex -> {
-							exceptionHandler.handle(ex);
-						}, () -> {
-							if (!drained.get()) {
-								drainHandler.handle(null);
-							}
-						});
-				if (termination != null) {
-					termination.asMono().subscribeOn(Schedulers.single()).doOnTerminate(drainSubscription::dispose).subscribe();
+								boolean newDrained = size <= this.writeQueueMaxSize / 2;
+								boolean oldDrained = drained.getAndSet(newDrained);
+								if (newDrained && !oldDrained) {
+									drainHandler.handle(null);
+								}
+							}, ex -> {
+								exceptionHandler.handle(ex);
+							}, () -> {
+								if (!drained.get()) {
+									drainHandler.handle(null);
+								}
+							});
+					if (termination != null) {
+						termination
+								.asMono()
+								.doOnTerminate(drainSubscription::dispose)
+								.publishOn(Schedulers.boundedElastic())
+								.subscribe();
+					}
 				}
-			} else {
-				this.drainSubscription = null;
-			}
+
+				return this;
+			}).publishOn(Schedulers.boundedElastic());
+		}
+
+		public static <T> Mono<SinkRWStream<T>> create(Many<T> sink,
+				@Nullable Flux<Integer> backpressureSize,
+				@Nullable Empty<Void> termination,
+				int maxBackpressureQueueSize) {
+			return new SinkRWStream<T>(sink, backpressureSize, termination, maxBackpressureQueueSize).initialize();
+		}
 
 		public Flux<T> readAsFlux() {
@@ -423,7 +440,7 @@
 
 		@Override
 		public io.vertx.core.streams.ReadStream<T> handler(@io.vertx.codegen.annotations.Nullable Handler<T> handler) {
-			sink.asFlux().subscribeWith(new CoreSubscriber<T>() {
+			sink.asFlux().publishOn(Schedulers.boundedElastic()).subscribeWith(new CoreSubscriber<T>() {
 
 				@Override
 				public void onSubscribe(@NotNull Subscription s) {
@@ -500,24 +517,6 @@
 
 		@Override
 		public void end(Handler<AsyncResult<Void>> handler) {
-			/*
-			MonoUtils.emitCompleteFuture(sink).recover(error -> {
-				if (error instanceof EmissionException) {
-					var sinkError = (EmissionException) error;
-					switch (sinkError.getReason()) {
-						case FAIL_CANCELLED:
-						case FAIL_ZERO_SUBSCRIBER:
-						case FAIL_TERMINATED:
-							return Future.succeededFuture();
-					}
-				}
-				return Future.failedFuture(error);
-			}).onComplete(h -> {
-				if (drainSubscription != null) {
-					drainSubscription.dispose();
-				}
-			}).onComplete(handler);
-			*/
 			MonoUtils.emitCompleteFuture(sink).onComplete(handler);
 		}
 
@@ -578,7 +577,7 @@
 
 		@Override
 		public io.vertx.core.streams.ReadStream<T> handler(@io.vertx.codegen.annotations.Nullable Handler<T> handler) {
-			flux.subscribeWith(new CoreSubscriber<T>() {
+			flux.publishOn(Schedulers.boundedElastic()).subscribeWith(new CoreSubscriber<T>() {
 
 				@Override
 				public void onSubscribe(@NotNull Subscription s) {
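Note (review sketch, not part of the patch): the recurring fix above is to keep
potentially blocking Vert.x/Hazelcast calls off Netty event loops and Reactor
parallel threads, either by deferring the call into Mono.fromCallable on the
boundedElastic scheduler or by adding publishOn(Schedulers.boundedElastic())
hops after interop points. Below is a minimal sketch of that pattern in
isolation, assuming Reactor 3 plus the Vert.x RxJava2 bindings this project
uses; fromBlockingMaybe here is a stand-in for the MonoUtils.fromBlockingMaybe
helper the patch calls (its implementation is not shown in these hunks), and
the class name, payload, and address string are illustrative only:

	import java.util.concurrent.Callable;

	import io.vertx.reactivex.core.eventbus.EventBus;
	import reactor.core.publisher.Mono;
	import reactor.core.scheduler.Schedulers;

	public final class BlockingBridgeSketch {

		// Stand-in for MonoUtils.fromBlockingMaybe: defer a possibly blocking
		// call and run it on boundedElastic, never on the caller's thread.
		public static <T> Mono<T> fromBlockingMaybe(Callable<T> callable) {
			return Mono.fromCallable(callable).subscribeOn(Schedulers.boundedElastic());
		}

		// Mirrors the client's ready-to-receive send: the event-bus call may
		// block while cluster membership is resolved, so it is deferred
		// instead of running inline inside doOnSubscribe.
		public static Mono<Void> sendReadyToReceive(EventBus eventBus, String botAddress) {
			return fromBlockingMaybe(() -> {
				eventBus.send(botAddress + ".ready-to-receive", new byte[0]);
				return null; // fromCallable treats a null result as empty completion
			}).then();
		}
	}

The subscribeOn variant moves the blocking call itself; the patch's many
publishOn(Schedulers.boundedElastic()) hops instead move the downstream
continuations, which is the right tool when the upstream emission comes from a
Vert.x handler thread that must not be parked.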