From 82ba0bcd39ad26de191a2dae9b8e762388f3951b Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 23 Jan 2021 22:33:52 +0100 Subject: [PATCH] Bugfixes --- .../remoteclient/TDLibRemoteClient.java | 21 ++++- .../td/middle/EndSessionMessage.java | 2 +- .../client/AsyncTdMiddleEventBusClient.java | 46 ++++++++-- .../server/AsyncTdMiddleEventBusServer.java | 90 +++++++++++++++---- .../it/tdlight/utils/BinlogAsyncFile.java | 28 ++++-- .../java/it/tdlight/utils/BinlogUtils.java | 57 +++++++++--- 6 files changed, 202 insertions(+), 42 deletions(-) diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java index e77a21b..4a70a31 100644 --- a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java @@ -95,7 +95,7 @@ public class TDLibRemoteClient implements AutoCloseable { .setPassword(securityInfo.getTrustStorePassword()); return MonoUtils - .fromBlockingSingle(() -> { + .fromBlockingMaybe(() -> { // Set verbosity level here, before creating the bots if (Files.notExists(Paths.get("logs"))) { try { @@ -123,6 +123,7 @@ public class TDLibRemoteClient implements AutoCloseable { clusterManager.tryEmitEmpty(); } }) + .single() .flatMap(clusterManager -> { MessageConsumer startBotConsumer = clusterManager.getEventBus().consumer("bots.start-bot"); startBotConsumer.handler(msg -> { @@ -137,10 +138,12 @@ public class TDLibRemoteClient implements AutoCloseable { var verticle = new AsyncTdMiddleEventBusServer(); // Binlog path - var blPath = Paths.get(".sessions-cache").resolve("id" + req.id()).resolve("td.binlog"); + var sessPath = getSessionDirectory(req.id()); + var blPath = getSessionBinlogDirectory(req.id()); BinlogUtils .chooseBinlog(clusterManager.getVertx().fileSystem(), blPath, req.binlog(), req.binlogDate()) + .then(BinlogUtils.cleanSessionPath(clusterManager.getVertx().fileSystem(), blPath, sessPath)) .then(clusterManager.getVertx().rxDeployVerticle(verticle, deploymentOptions).as(MonoUtils::toMono)) .subscribeOn(Schedulers.single()) .subscribe( @@ -152,11 +155,23 @@ public class TDLibRemoteClient implements AutoCloseable { () -> msg.reply(new byte[0]) ); }); - return Mono.empty(); + return startBotConsumer.rxCompletionHandler().as(MonoUtils::toMono); }) .then(); } + public static Path getSessionDirectory(int botId) { + return Paths.get(".sessions-cache").resolve("id" + botId); + } + + public static Path getMediaDirectory(int botId) { + return Paths.get(".cache").resolve("media").resolve("id" + botId); + } + + public static Path getSessionBinlogDirectory(int botId) { + return getSessionDirectory(botId).resolve("td.binlog"); + } + @Override public void close() { this.clusterManager diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/EndSessionMessage.java b/src/main/java/it/tdlight/tdlibsession/td/middle/EndSessionMessage.java index 946dae5..470b18d 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/EndSessionMessage.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/EndSessionMessage.java @@ -8,7 +8,7 @@ public final class EndSessionMessage { private final int id; private final byte[] binlog; - EndSessionMessage(int id, byte[] binlog) { + public EndSessionMessage(int id, byte[] binlog) { this.id = id; this.binlog = binlog; } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index af4a4bb..0ec858b 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -3,6 +3,7 @@ package it.tdlight.tdlibsession.td.middle.client; import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.reactivex.core.Vertx; import io.vertx.reactivex.core.eventbus.Message; +import io.vertx.reactivex.core.eventbus.MessageConsumer; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Function; import it.tdlight.tdlibsession.td.ResponseError; @@ -10,6 +11,7 @@ import it.tdlight.tdlibsession.td.TdError; import it.tdlight.tdlibsession.td.TdResult; import it.tdlight.tdlibsession.td.TdResultMessage; import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle; +import it.tdlight.tdlibsession.td.middle.EndSessionMessage; import it.tdlight.tdlibsession.td.middle.ExecuteObject; import it.tdlight.tdlibsession.td.middle.StartSessionMessage; import it.tdlight.tdlibsession.td.middle.TdClusterManager; @@ -19,6 +21,7 @@ import it.tdlight.utils.BinlogUtils; import it.tdlight.utils.MonoUtils; import it.tdlight.utils.MonoUtils.SinkRWStream; import java.nio.file.Path; +import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; @@ -65,11 +68,19 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { Path binlogsArchiveDirectory) { var instance = new AsyncTdMiddleEventBusClient(clusterManager); return retrieveBinlog(clusterManager.getVertx(), binlogsArchiveDirectory, botId) - .single() - .flatMap(binlog -> instance + .flatMap(binlog -> binlog + .getLastModifiedTime() + .filter(modTime -> modTime == 0) + .doOnNext(v -> LoggerFactory + .getLogger(AsyncTdMiddleEventBusClient.class) + .error("Can't retrieve binlog of bot " + botId + " " + botAlias + ". Creating a new one...")) + .thenReturn(binlog) + ) + .flatMap(binlog -> instance .start(botId, botAlias, local, binlog) .thenReturn(instance) - ); + ) + .single(); } /** @@ -108,7 +119,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { @SuppressWarnings("CallingSubscribeInNonBlockingScope") private Mono setupUpdatesListener() { - var updateConsumer = cluster.getEventBus().consumer(botAddress + ".update"); + MessageConsumer updateConsumer = MessageConsumer.newInstance(cluster.getEventBus().consumer(botAddress + ".updates").getDelegate()); updateConsumer.endHandler(h -> { logger.error("<<<<<<<<<<<<<<<>>>>>>>>>>>>"); }); @@ -129,12 +140,33 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { return updates .readAsFlux() .subscribeOn(Schedulers.single()) - .flatMap(updates -> Flux.fromIterable(updates.body().getValues())) + .cast(io.vertx.core.eventbus.Message.class) + .flatMap(updates -> Flux.fromIterable(((TdResultList) updates.body()).getValues())) .flatMap(update -> Mono.fromCallable(update::orElseThrow)) + .flatMap(this::interceptUpdate) .doOnError(crash::tryEmitError) .doOnTerminate(updatesStreamEnd::tryEmitEmpty); } + private Publisher interceptUpdate(TdApi.Object update) { + switch (update.getConstructor()) { + case TdApi.UpdateAuthorizationState.CONSTRUCTOR: + var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; + switch (updateAuthorizationState.authorizationState.getConstructor()) { + case TdApi.AuthorizationStateClosed.CONSTRUCTOR: + return cluster + .getEventBus() + .rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono) + .doOnNext(l -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(l.body().binlog().length))) + .flatMap(latestBinlog -> this.saveBinlog(latestBinlog.body().binlog())) + .doOnSuccess(s -> logger.info("Overwritten binlog from server")) + .thenReturn(update); + } + break; + } + return Mono.just(update); + } + @Override public Mono> execute(Function request, boolean executeDirectly) { var req = new ExecuteObject(executeDirectly, request); @@ -152,8 +184,8 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { } })) ) - .switchIfEmpty(Mono.fromCallable(() -> { + .switchIfEmpty(Mono.defer(() -> Mono.fromCallable(() -> { throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Client is closed or response is empty")); - })); + }))); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index c88bab3..bee0610 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -9,14 +9,20 @@ import io.vertx.reactivex.core.eventbus.MessageProducer; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.AuthorizationStateClosed; import it.tdlight.jni.TdApi.Error; +import it.tdlight.jni.TdApi.Function; +import it.tdlight.jni.TdApi.SetTdlibParameters; import it.tdlight.jni.TdApi.Update; import it.tdlight.jni.TdApi.UpdateAuthorizationState; +import it.tdlight.tdlibsession.remoteclient.TDLibRemoteClient; import it.tdlight.tdlibsession.td.TdResultMessage; import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl; import it.tdlight.tdlibsession.td.direct.AsyncTdDirectOptions; +import it.tdlight.tdlibsession.td.middle.EndSessionMessage; import it.tdlight.tdlibsession.td.middle.ExecuteObject; import it.tdlight.tdlibsession.td.middle.TdResultList; import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec; +import it.tdlight.utils.BinlogAsyncFile; +import it.tdlight.utils.BinlogUtils; import it.tdlight.utils.MonoUtils; import java.time.Duration; import java.util.Collections; @@ -48,6 +54,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { // Variables configured at startup private final One td = Sinks.one(); private final One> executeConsumer = Sinks.one(); + private final One> readBinlogConsumer = Sinks.one(); public AsyncTdMiddleEventBusServer() { this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 100); @@ -87,15 +94,15 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { if (this.td.tryEmitValue(td).isFailure()) { throw new IllegalStateException("Failed to set td instance"); } - return onSuccessfulStartRequest(td, botAddress, botAlias, local); + return onSuccessfulStartRequest(td, botAddress, botAlias, botId, local); }) .flatMap(Mono::hide)); } - private Mono onSuccessfulStartRequest(AsyncTdDirectImpl td, String botAddress, String botAlias, boolean local) { + private Mono onSuccessfulStartRequest(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) { return this - .listen(td, botAddress, botAlias, local) - .then(this.pipe(td, botAddress, botAlias, local)) + .listen(td, botAddress, botAlias, botId, local) + .then(this.pipe(td, botAddress, botAlias, botId, local)) .doOnSuccess(s -> { logger.info("Deploy and start of bot \"" + botAlias + "\": ✅ Succeeded"); }) @@ -104,7 +111,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { }); } - private Mono listen(AsyncTdDirectImpl td, String botAddress, String botAlias, boolean local) { + private Mono listen(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) { return Mono.create(registrationSink -> { MessageConsumer executeConsumer = vertx.eventBus().consumer(botAddress + ".execute"); if (this.executeConsumer.tryEmitValue(executeConsumer).isFailure()) { @@ -112,13 +119,17 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { return; } - Flux.>create(sink -> { - executeConsumer.handler(sink::next); - executeConsumer.completionHandler(MonoUtils.toHandler(registrationSink)); - }) - .flatMap(msg -> td - .execute(msg.body().getRequest(), msg.body().isExecuteDirectly()) - .map(result -> Tuples.of(msg, result))) + Flux + .>create(sink -> { + executeConsumer.handler(sink::next); + executeConsumer.endHandler(h -> sink.complete()); + }) + .flatMap(msg -> { + var request = overrideRequest(msg.body().getRequest(), botId); + return td + .execute(request, msg.body().isExecuteDirectly()) + .map(result -> Tuples.of(msg, result)); + }) .handle((tuple, sink) -> { var msg = tuple.getT1(); var response = tuple.getT2(); @@ -133,13 +144,60 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } }) .then() - .doOnError(ex -> { - logger.error("Error when processing a request", ex); + .subscribeOn(Schedulers.single()) + .subscribe(v -> {}, ex -> logger.error("Error when processing an execute request", ex)); + + MessageConsumer readBinlogConsumer = vertx.eventBus().consumer(botAddress + ".read-binlog"); + if (this.readBinlogConsumer.tryEmitValue(readBinlogConsumer).isFailure()) { + registrationSink.error(new IllegalStateException("Failed to set readBinlogConsumer")); + return; + } + + Flux + .>create(sink -> { + readBinlogConsumer.handler(sink::next); + readBinlogConsumer.endHandler(h -> sink.complete()); }) - .subscribe(); + .flatMap(req -> BinlogUtils + .retrieveBinlog(vertx.fileSystem(), TDLibRemoteClient.getSessionBinlogDirectory(botId)) + .flatMap(BinlogAsyncFile::readFullyBytes) + .single() + .map(binlog -> Tuples.of(req, binlog)) + ) + .doOnNext(tuple -> { + var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis()); + tuple.getT1().reply(new EndSessionMessage(botId, tuple.getT2()), opts); + }) + .then() + .subscribeOn(Schedulers.single()) + .subscribe(v -> {}, ex -> logger.error("Error when processing a read-binlog request", ex)); + + + //noinspection ResultOfMethodCallIgnored + executeConsumer + .rxCompletionHandler() + .andThen(readBinlogConsumer.rxCompletionHandler()) + .subscribeOn(io.reactivex.schedulers.Schedulers.single()) + .subscribe(registrationSink::success, registrationSink::error); }); } + /** + * Override some requests + */ + private Function overrideRequest(Function request, int botId) { + switch (request.getConstructor()) { + case SetTdlibParameters.CONSTRUCTOR: + // Fix session directory locations + var setTdlibParamsObj = (SetTdlibParameters) request; + setTdlibParamsObj.parameters.databaseDirectory = TDLibRemoteClient.getSessionDirectory(botId).toString(); + setTdlibParamsObj.parameters.filesDirectory = TDLibRemoteClient.getMediaDirectory(botId).toString(); + return request; + default: + return request; + } + } + @Override public Completable rxStop() { return MonoUtils.toCompletable(botAlias @@ -155,7 +213,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .doOnTerminate(() -> logger.info("Undeploy of bot \"" + botAlias + "\": stopped")))); } - private Mono pipe(AsyncTdDirectImpl td, String botAddress, String botAlias, boolean local) { + private Mono pipe(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) { Flux updatesFlux = td .receive(tdOptions) .flatMap(item -> Mono.defer(() -> { diff --git a/src/main/java/it/tdlight/utils/BinlogAsyncFile.java b/src/main/java/it/tdlight/utils/BinlogAsyncFile.java index 1a5311b..65158fb 100644 --- a/src/main/java/it/tdlight/utils/BinlogAsyncFile.java +++ b/src/main/java/it/tdlight/utils/BinlogAsyncFile.java @@ -4,10 +4,14 @@ import io.vertx.reactivex.core.buffer.Buffer; import io.vertx.reactivex.core.file.AsyncFile; import io.vertx.reactivex.core.file.FileProps; import io.vertx.reactivex.core.file.FileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; public class BinlogAsyncFile { + private static final Logger logger = LoggerFactory.getLogger(BinlogAsyncFile.class); + private final FileSystem filesystem; private final String path; private final AsyncFile file; @@ -25,6 +29,7 @@ public class BinlogAsyncFile { .as(MonoUtils::toMono) .flatMap(size -> { var buf = Buffer.buffer(size); + logger.debug("Reading binlog from disk. Size: " + BinlogUtils.humanReadableByteCountBin(size)); return file.rxRead(buf, 0, 0, size).as(MonoUtils::toMono).thenReturn(buf); }); } @@ -38,10 +43,16 @@ public class BinlogAsyncFile { } public Mono overwrite(Buffer newData) { - return file.rxWrite(newData, 0) - .andThen(file.rxFlush()) - .andThen(filesystem.rxTruncate(path, newData.length())) - .as(MonoUtils::toMono); + return getSize() + .doOnNext(size -> logger.debug("Preparing to overwrite binlog. Initial size: " + BinlogUtils.humanReadableByteCountBin(size))) + .then(file.rxWrite(newData, 0) + .andThen(file.rxFlush()) + .andThen(filesystem.rxTruncate(path, newData.length())) + .as(MonoUtils::toMono) + ) + .then(getSize()) + .doOnNext(size -> logger.debug("Overwritten binlog. Final size: " + BinlogUtils.humanReadableByteCountBin(size))) + .then(); } public Mono overwrite(byte[] newData) { @@ -59,7 +70,14 @@ public class BinlogAsyncFile { public Mono getLastModifiedTime() { return filesystem .rxProps(path) - .map(FileProps::lastModifiedTime) + .map(fileProps -> fileProps.size() == 0 ? 0 : fileProps.lastModifiedTime()) + .as(MonoUtils::toMono); + } + + public Mono getSize() { + return filesystem + .rxProps(path) + .map(FileProps::size) .as(MonoUtils::toMono); } } diff --git a/src/main/java/it/tdlight/utils/BinlogUtils.java b/src/main/java/it/tdlight/utils/BinlogUtils.java index 1d4bc24..6af9e4d 100644 --- a/src/main/java/it/tdlight/utils/BinlogUtils.java +++ b/src/main/java/it/tdlight/utils/BinlogUtils.java @@ -4,8 +4,13 @@ import io.vertx.core.file.OpenOptions; import io.vertx.reactivex.core.buffer.Buffer; import io.vertx.reactivex.core.file.FileSystem; import java.nio.file.Path; +import java.text.CharacterIterator; +import java.text.StringCharacterIterator; +import java.time.Instant; +import java.time.ZoneOffset; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; @@ -13,17 +18,20 @@ public class BinlogUtils { private static final Logger logger = LoggerFactory.getLogger(BinlogUtils.class); - /** - * - * @return optional result - */ public static Mono retrieveBinlog(FileSystem vertxFilesystem, Path binlogPath) { var path = binlogPath.toString(); var openOptions = new OpenOptions().setWrite(true).setRead(true).setCreate(false).setDsync(true); - return vertxFilesystem.rxExists(path).filter(exists -> exists) - .flatMapSingle(x -> vertxFilesystem.rxOpen(path, openOptions)) + return vertxFilesystem + // Create file if not exist to avoid errors + .rxExists(path).filter(exists -> exists).as(MonoUtils::toMono) + .switchIfEmpty(Mono.defer(() -> vertxFilesystem.rxMkdirs(binlogPath.getParent().toString()).as(MonoUtils::toMono)) + .then(vertxFilesystem.rxCreateFile(path).as(MonoUtils::toMono)) + .thenReturn(true) + ) + // Open file + .flatMap(x -> vertxFilesystem.rxOpen(path, openOptions).as(MonoUtils::toMono)) .map(file -> new BinlogAsyncFile(vertxFilesystem, path, file)) - .as(MonoUtils::toMono); + .single(); } public static Mono saveBinlog(BinlogAsyncFile binlog, byte[] data) { @@ -40,15 +48,44 @@ public class BinlogUtils { .just(binlog) .zipWith(binlog.getLastModifiedTime()) ) + .doOnSuccess(s -> logger.info("Local binlog: " + binlogPath + ". Local date: " + Instant.ofEpochMilli(s == null ? 0 : s.getT2()).atZone(ZoneOffset.UTC).toString() + " Remote date: " + Instant.ofEpochMilli(remoteBinlogDate).atZone(ZoneOffset.UTC).toString())) // Files older than the remote file will be overwritten - .filter(tuple -> tuple.getT2() < remoteBinlogDate) + .filter(tuple -> tuple.getT2() >= remoteBinlogDate) + .doOnNext(v -> logger.info("Using local binlog: " + binlogPath)) .map(Tuple2::getT1) - .switchIfEmpty(Mono - .fromRunnable(() -> logger.info("Overwriting local binlog: " + binlogPath)) + .switchIfEmpty(Mono.defer(() -> Mono.fromRunnable(() -> logger.info("Using remote binlog. Overwriting " + binlogPath))) .then(vertxFilesystem.rxWriteFile(path, Buffer.buffer(remoteBinlog)).as(MonoUtils::toMono)) .then(retrieveBinlog(vertxFilesystem, binlogPath)) ) .single() .then(); } + + public static Mono cleanSessionPath(FileSystem vertxFilesystem, Path binlogPath, Path sessionPath) { + return vertxFilesystem + .rxReadFile(binlogPath.toString()).as(MonoUtils::toMono) + .flatMap(buffer -> vertxFilesystem + .rxReadDir(sessionPath.toString(), "^(?!td.binlog$).*").as(MonoUtils::toMono) + .flatMapMany(Flux::fromIterable) + .doOnNext(file -> logger.debug("Deleting file {}", file)) + .flatMap(file -> vertxFilesystem.rxDeleteRecursive(file, true).as(MonoUtils::toMono)) + .onErrorResume(ex -> Mono.empty()) + .then() + ); + } + + public static String humanReadableByteCountBin(long bytes) { + long absB = bytes == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(bytes); + if (absB < 1024) { + return bytes + " B"; + } + long value = absB; + CharacterIterator ci = new StringCharacterIterator("KMGTPE"); + for (int i = 40; i >= 0 && absB > 0xfffccccccccccccL >> i; i -= 10) { + value >>= 10; + ci.next(); + } + value *= Long.signum(bytes); + return String.format("%.1f %ciB", value / 1024.0, ci.current()); + } }