From df116331d7e14b15216386394e90968fca790b15 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sun, 24 Jan 2021 03:15:45 +0100 Subject: [PATCH] Bugfixes --- .../remoteclient/TDLibRemoteClient.java | 12 ++- .../td/middle/TdClusterManager.java | 28 +++--- .../client/AsyncTdMiddleEventBusClient.java | 13 +-- .../server/AsyncTdMiddleEventBusServer.java | 89 +++++++++++++++---- .../java/it/tdlight/utils/BinlogUtils.java | 30 +++++++ src/main/java/it/tdlight/utils/MonoUtils.java | 13 ++- 6 files changed, 145 insertions(+), 40 deletions(-) diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java index 4a70a31..68a6a0f 100644 --- a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java @@ -80,17 +80,23 @@ public class TDLibRemoteClient implements AutoCloseable { var securityInfo = new SecurityInfo(keyStorePath, keyStorePasswordPath, trustStorePath, trustStorePasswordPath); - new TDLibRemoteClient(securityInfo, masterHostname, netInterface, port, membersAddresses) + var client = new TDLibRemoteClient(securityInfo, masterHostname, netInterface, port, membersAddresses); + + client .start() .block(); + + // Close vert.x on shutdown + var vertx = client.clusterManager.asMono().block().getVertx(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> MonoUtils.toMono(vertx.rxClose()).blockOptional())); } public Mono start() { - var keyStoreOptions = new JksOptions() + var keyStoreOptions = securityInfo == null ? null : new JksOptions() .setPath(securityInfo.getKeyStorePath().toAbsolutePath().toString()) .setPassword(securityInfo.getKeyStorePassword()); - var trustStoreOptions = new JksOptions() + var trustStoreOptions = securityInfo == null ? null : new JksOptions() .setPath(securityInfo.getTrustStorePath().toAbsolutePath().toString()) .setPassword(securityInfo.getTrustStorePassword()); diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java index 0aec231..3328ac9 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java @@ -84,20 +84,22 @@ public class TdClusterManager { } public static Mono ofNodes(JksOptions keyStoreOptions, JksOptions trustStoreOptions, boolean onlyLocal, String masterHostname, String netInterface, int port, Set nodesAddresses) { - if (definedNodesCluster.compareAndSet(false, true)) { - var vertxOptions = new VertxOptions(); - netInterface = onlyLocal ? "127.0.0.1" : netInterface; - Config cfg; - if (!onlyLocal) { - cfg = new Config(); - cfg.setInstanceName("Node-" + new Random().nextLong()); + return Mono.defer(() -> { + if (definedNodesCluster.compareAndSet(false, true)) { + var vertxOptions = new VertxOptions(); + var netInterfaceF = onlyLocal ? "127.0.0.1" : netInterface; + Config cfg; + if (!onlyLocal) { + cfg = new Config(); + cfg.setInstanceName("Node-" + new Random().nextLong()); + } else { + cfg = null; + } + return of(cfg, vertxOptions, keyStoreOptions, trustStoreOptions, masterHostname, netInterfaceF, port, nodesAddresses); } else { - cfg = null; + return Mono.error(new AlreadyBoundException()); } - return of(cfg, vertxOptions, keyStoreOptions, trustStoreOptions, masterHostname, netInterface, port, nodesAddresses); - } else { - return Mono.error(new AlreadyBoundException()); - } + }); } public static Mono of(@Nullable Config cfg, @@ -157,6 +159,8 @@ public class TdClusterManager { vertxOptions.setClusterManager(null); } + vertxOptions.setPreferNativeTransport(true); + return Mono .create(sink -> { if (mgr != null) { diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 0ec858b..9f72cb9 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -21,6 +21,7 @@ import it.tdlight.utils.BinlogUtils; import it.tdlight.utils.MonoUtils; import it.tdlight.utils.MonoUtils.SinkRWStream; import java.nio.file.Path; +import java.time.Duration; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +44,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { private final One binlog = Sinks.one(); - SinkRWStream> updates = MonoUtils.unicastBackpressureStream(1000); + SinkRWStream> updates = MonoUtils.unicastBackpressureStream(10000); // This will only result in a successful completion, never completes in other ways private final Empty updatesStreamEnd = Sinks.one(); // This will only result in a crash, never completes in other ways @@ -137,10 +138,12 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { @Override public Flux receive() { // Here the updates will be received - return updates - .readAsFlux() - .subscribeOn(Schedulers.single()) + return cluster.getEventBus().rxRequest(botAddress + ".ready-to-receive", EMPTY).as(MonoUtils::toMono) + .thenMany(updates.readAsFlux()) .cast(io.vertx.core.eventbus.Message.class) + .timeout(Duration.ofSeconds(20), Mono.fromCallable(() -> { + throw new IllegalStateException("Server did not respond to 4 pings after 20 seconds (5 seconds per ping)"); + })) .flatMap(updates -> Flux.fromIterable(((TdResultList) updates.body()).getValues())) .flatMap(update -> Mono.fromCallable(update::orElseThrow)) .flatMap(this::interceptUpdate) @@ -174,7 +177,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { .firstWithSignal( MonoUtils.castVoid(crash.asMono()), cluster.getEventBus() - .rxRequest(botAddress + ".execute", req, deliveryOptionsWithTimeout).as(MonoUtils::toMono) + .rxRequest(botAddress + ".execute", req, deliveryOptions).as(MonoUtils::toMono) .onErrorMap(ex -> ResponseError.newResponseError(request, botAlias, ex)) .>flatMap(resp -> Mono.fromCallable(() -> { if (resp.body() == null) { diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index bee0610..688f58b 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -17,11 +17,9 @@ import it.tdlight.tdlibsession.remoteclient.TDLibRemoteClient; import it.tdlight.tdlibsession.td.TdResultMessage; import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl; import it.tdlight.tdlibsession.td.direct.AsyncTdDirectOptions; -import it.tdlight.tdlibsession.td.middle.EndSessionMessage; import it.tdlight.tdlibsession.td.middle.ExecuteObject; import it.tdlight.tdlibsession.td.middle.TdResultList; import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec; -import it.tdlight.utils.BinlogAsyncFile; import it.tdlight.utils.BinlogUtils; import it.tdlight.utils.MonoUtils; import java.time.Duration; @@ -55,6 +53,9 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { private final One td = Sinks.one(); private final One> executeConsumer = Sinks.one(); private final One> readBinlogConsumer = Sinks.one(); + private final One> readyToReceiveConsumer = Sinks.one(); + private final One> pingConsumer = Sinks.one(); + private final One> pipeFlux = Sinks.one(); public AsyncTdMiddleEventBusServer() { this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 100); @@ -118,7 +119,6 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { registrationSink.error(new IllegalStateException("Failed to set executeConsumer")); return; } - Flux .>create(sink -> { executeConsumer.handler(sink::next); @@ -152,31 +152,64 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { registrationSink.error(new IllegalStateException("Failed to set readBinlogConsumer")); return; } + BinlogUtils + .readBinlogConsumer(vertx, readBinlogConsumer, botId, local) + .subscribeOn(Schedulers.single()) + .subscribe(v -> {}, ex -> logger.error("Error when processing a read-binlog request", ex)); + MessageConsumer readyToReceiveConsumer = vertx.eventBus().consumer(botAddress + ".ready-to-receive"); + if (this.readyToReceiveConsumer.tryEmitValue(readyToReceiveConsumer).isFailure()) { + registrationSink.error(new IllegalStateException("Failed to set readyToReceiveConsumer")); + return; + } Flux .>create(sink -> { - readBinlogConsumer.handler(sink::next); - readBinlogConsumer.endHandler(h -> sink.complete()); + readyToReceiveConsumer.handler(sink::next); + readyToReceiveConsumer.endHandler(h -> sink.complete()); }) - .flatMap(req -> BinlogUtils - .retrieveBinlog(vertx.fileSystem(), TDLibRemoteClient.getSessionBinlogDirectory(botId)) - .flatMap(BinlogAsyncFile::readFullyBytes) - .single() - .map(binlog -> Tuples.of(req, binlog)) - ) + .flatMap(msg -> this.pipeFlux + .asMono() + .timeout(Duration.ofSeconds(5)) + .map(pipeFlux -> Tuples.of(msg, pipeFlux))) .doOnNext(tuple -> { var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis()); - tuple.getT1().reply(new EndSessionMessage(botId, tuple.getT2()), opts); + tuple.getT1().reply(EMPTY, opts); + + // Start piping the data + //noinspection CallingSubscribeInNonBlockingScope + tuple.getT2() + .subscribeOn(Schedulers.single()) + .subscribe(); }) .then() .subscribeOn(Schedulers.single()) - .subscribe(v -> {}, ex -> logger.error("Error when processing a read-binlog request", ex)); + .subscribe(v -> {}, ex -> logger.error("Error when processing a ready-to-receive request", ex)); + + MessageConsumer pingConsumer = vertx.eventBus().consumer(botAddress + ".ping"); + if (this.pingConsumer.tryEmitValue(pingConsumer).isFailure()) { + registrationSink.error(new IllegalStateException("Failed to set pingConsumer")); + return; + } + Flux + .>create(sink -> { + pingConsumer.handler(sink::next); + pingConsumer.endHandler(h -> sink.complete()); + }) + .doOnNext(msg -> { + var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis()); + msg.reply(EMPTY, opts); + }) + .then() + .subscribeOn(Schedulers.single()) + .subscribe(v -> {}, ex -> logger.error("Error when processing a ping request", ex)); //noinspection ResultOfMethodCallIgnored executeConsumer .rxCompletionHandler() .andThen(readBinlogConsumer.rxCompletionHandler()) + .andThen(readyToReceiveConsumer.rxCompletionHandler()) + .andThen(pingConsumer.rxCompletionHandler()) .subscribeOn(io.reactivex.schedulers.Schedulers.single()) .subscribe(registrationSink::success, registrationSink::error); }); @@ -209,6 +242,26 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .asMono() .timeout(Duration.ofSeconds(5), Mono.empty()) .flatMap(ec -> ec.rxUnregister().as(MonoUtils::toMono))) + .then(readBinlogConsumer + .asMono() + .timeout(Duration.ofSeconds(10), Mono.empty()) + .doOnNext(ec -> Mono + // ReadBinLog will live for another 30 minutes. + // Since every consumer of ReadBinLog is identical, this should not pose a problem. + .delay(Duration.ofMinutes(30)) + .then(ec.rxUnregister().as(MonoUtils::toMono)) + .subscribeOn(Schedulers.single()) + .subscribe() + ) + ) + .then(readyToReceiveConsumer + .asMono() + .timeout(Duration.ofSeconds(5), Mono.empty()) + .flatMap(ec -> ec.rxUnregister().as(MonoUtils::toMono))) + .then(pingConsumer + .asMono() + .timeout(Duration.ofSeconds(5), Mono.empty()) + .flatMap(ec -> ec.rxUnregister().as(MonoUtils::toMono))) .doOnError(ex -> logger.error("Undeploy of bot \"" + botAlias + "\": stop failed", ex)) .doOnTerminate(() -> logger.info("Undeploy of bot \"" + botAlias + "\": stopped")))); } @@ -254,9 +307,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .eventBus() .sender(botAddress + ".updates", opts); - //noinspection CallingSubscribeInNonBlockingScope - updatesFlux - .flatMap(update -> updatesSender.rxWrite(update).as(MonoUtils::toMono)) + var pipeFlux = updatesFlux + .flatMap(update -> updatesSender.rxWrite(update).as(MonoUtils::toMono).then()) .doOnTerminate(() -> updatesSender.close(h -> { if (h.failed()) { logger.error("Failed to close \"updates\" message sender"); @@ -267,9 +319,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { return td.execute(new TdApi.Close(), false) .doOnError(ex2 -> logger.error("Unexpected error", ex2)) .then(); - }) - .subscribeOn(Schedulers.single()) - .subscribe(); + }); + MonoUtils.emitValue(this.pipeFlux, pipeFlux); return Mono.empty(); } } diff --git a/src/main/java/it/tdlight/utils/BinlogUtils.java b/src/main/java/it/tdlight/utils/BinlogUtils.java index 6af9e4d..1dc33ba 100644 --- a/src/main/java/it/tdlight/utils/BinlogUtils.java +++ b/src/main/java/it/tdlight/utils/BinlogUtils.java @@ -1,11 +1,18 @@ package it.tdlight.utils; +import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.file.OpenOptions; +import io.vertx.reactivex.core.Vertx; import io.vertx.reactivex.core.buffer.Buffer; +import io.vertx.reactivex.core.eventbus.Message; +import io.vertx.reactivex.core.eventbus.MessageConsumer; import io.vertx.reactivex.core.file.FileSystem; +import it.tdlight.tdlibsession.remoteclient.TDLibRemoteClient; +import it.tdlight.tdlibsession.td.middle.EndSessionMessage; import java.nio.file.Path; import java.text.CharacterIterator; import java.text.StringCharacterIterator; +import java.time.Duration; import java.time.Instant; import java.time.ZoneOffset; import org.slf4j.Logger; @@ -13,6 +20,7 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; public class BinlogUtils { @@ -88,4 +96,26 @@ public class BinlogUtils { value *= Long.signum(bytes); return String.format("%.1f %ciB", value / 1024.0, ci.current()); } + + public static Mono readBinlogConsumer(Vertx vertx, + MessageConsumer readBinlogConsumer, + int botId, + boolean local) { + return Flux + .>create(sink -> { + readBinlogConsumer.handler(sink::next); + readBinlogConsumer.endHandler(h -> sink.complete()); + }) + .flatMap(req -> BinlogUtils + .retrieveBinlog(vertx.fileSystem(), TDLibRemoteClient.getSessionBinlogDirectory(botId)) + .flatMap(BinlogAsyncFile::readFullyBytes) + .single() + .map(binlog -> Tuples.of(req, binlog)) + ) + .doOnNext(tuple -> { + var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis()); + tuple.getT1().reply(new EndSessionMessage(botId, tuple.getT2()), opts); + }) + .then(); + } } diff --git a/src/main/java/it/tdlight/utils/MonoUtils.java b/src/main/java/it/tdlight/utils/MonoUtils.java index c11cf81..6307b9b 100644 --- a/src/main/java/it/tdlight/utils/MonoUtils.java +++ b/src/main/java/it/tdlight/utils/MonoUtils.java @@ -498,7 +498,18 @@ public class MonoUtils { @Override public void end(Handler> handler) { - MonoUtils.emitCompleteFuture(sink).onComplete(h -> { + MonoUtils.emitCompleteFuture(sink).recover(error -> { + if (error instanceof EmissionException) { + var sinkError = (EmissionException) error; + switch (sinkError.getReason()) { + case FAIL_CANCELLED: + case FAIL_ZERO_SUBSCRIBER: + case FAIL_TERMINATED: + return Future.succeededFuture(); + } + } + return Future.failedFuture(error); + }).onComplete(h -> { if (drainSubscription != null) { drainSubscription.dispose(); }