diff --git a/README.md b/README.md index e69de29..8ca5ac6 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,13 @@ +TDLib session container +======================= + +This software is a wrapper for [TDLight Java](https://github.com/tdlight-team/tdlight-java) + +Unlike TDLight java, this wrapper abstracts TDLib to make it possible to run remotely, +decoupling it into a client and a server. + +TDLib session container can be used in various ways: + - Clustered + - Remote + - Local + - Direct (The client process, as TDLight java) diff --git a/pom.xml b/pom.xml index 91eb8a0..5458462 100644 --- a/pom.xml +++ b/pom.xml @@ -139,7 +139,7 @@ it.tdlight tdlight-java - [3.171.36,) + [4.171.54,) it.cavallium diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java index 68a6a0f..95f7d53 100644 --- a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java @@ -25,6 +25,7 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.One; import reactor.core.scheduler.Schedulers; +import reactor.tools.agent.ReactorDebugAgent; public class TDLibRemoteClient implements AutoCloseable { @@ -41,13 +42,27 @@ public class TDLibRemoteClient implements AutoCloseable { */ private final AtomicInteger statsActiveDeployments = new AtomicInteger(); - public TDLibRemoteClient(SecurityInfo securityInfo, String masterHostname, String netInterface, int port, Set membersAddresses) { + public static boolean runningFromIntelliJ() { + return System.getProperty("java.class.path").contains("idea_rt.jar") + || System.getProperty("idea.test.cyclic.buffer.size") != null; + } + + public TDLibRemoteClient(SecurityInfo securityInfo, + String masterHostname, + String netInterface, + int port, + Set membersAddresses, + boolean enableStacktraces) { this.securityInfo = securityInfo; this.masterHostname = masterHostname; this.netInterface = netInterface; this.port = port; this.membersAddresses = membersAddresses; + if (enableStacktraces && !runningFromIntelliJ()) { + ReactorDebugAgent.init(); + } + try { Init.start(); } catch (CantLoadLibrary ex) { @@ -74,13 +89,14 @@ public class TDLibRemoteClient implements AutoCloseable { Path keyStorePasswordPath = Paths.get(args[4]); Path trustStorePath = Paths.get(args[5]); Path trustStorePasswordPath = Paths.get(args[6]); + boolean enableStacktraces = Boolean.parseBoolean(args[7]); var loggerContext = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false); loggerContext.setConfigLocation(TDLibRemoteClient.class.getResource("/tdlib-session-container-log4j2.xml").toURI()); var securityInfo = new SecurityInfo(keyStorePath, keyStorePasswordPath, trustStorePath, trustStorePasswordPath); - var client = new TDLibRemoteClient(securityInfo, masterHostname, netInterface, port, membersAddresses); + var client = new TDLibRemoteClient(securityInfo, masterHostname, netInterface, port, membersAddresses, enableStacktraces); client .start() @@ -140,16 +156,18 @@ public class TDLibRemoteClient implements AutoCloseable { .setConfig(new JsonObject() .put("botId", req.id()) .put("botAlias", req.alias()) - .put("local", false)); + .put("local", false) + .put("implementationDetails", req.implementationDetails())); var verticle = new AsyncTdMiddleEventBusServer(); // Binlog path var sessPath = getSessionDirectory(req.id()); + var mediaPath = getMediaDirectory(req.id()); var blPath = getSessionBinlogDirectory(req.id()); BinlogUtils .chooseBinlog(clusterManager.getVertx().fileSystem(), blPath, req.binlog(), req.binlogDate()) - .then(BinlogUtils.cleanSessionPath(clusterManager.getVertx().fileSystem(), blPath, sessPath)) + .then(BinlogUtils.cleanSessionPath(clusterManager.getVertx().fileSystem(), blPath, sessPath, mediaPath)) .then(clusterManager.getVertx().rxDeployVerticle(verticle, deploymentOptions).as(MonoUtils::toMono)) .subscribeOn(Schedulers.single()) .subscribe( diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java index 4c69fcb..ba572a4 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -1,15 +1,15 @@ package it.tdlight.tdlibsession.td.direct; +import io.vertx.core.json.JsonObject; import it.tdlight.common.TelegramClient; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.AuthorizationStateClosed; import it.tdlight.jni.TdApi.Close; import it.tdlight.jni.TdApi.Function; -import it.tdlight.jni.TdApi.Object; import it.tdlight.jni.TdApi.Ok; import it.tdlight.jni.TdApi.UpdateAuthorizationState; +import it.tdlight.tdlibsession.td.TdError; import it.tdlight.tdlibsession.td.TdResult; -import it.tdlight.tdlight.ClientManager; import it.tdlight.utils.MonoUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,11 +23,17 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { private static final Logger logger = LoggerFactory.getLogger(AsyncTdDirect.class); - private final One td = Sinks.one(); - + private final TelegramClientFactory telegramClientFactory; + private final JsonObject implementationDetails; private final String botAlias; - public AsyncTdDirectImpl(String botAlias) { + private final One td = Sinks.one(); + + public AsyncTdDirectImpl(TelegramClientFactory telegramClientFactory, + JsonObject implementationDetails, + String botAlias) { + this.telegramClientFactory = telegramClientFactory; + this.implementationDetails = implementationDetails; this.botAlias = botAlias; } @@ -65,41 +71,40 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { public Flux receive(AsyncTdDirectOptions options) { // If closed it will be either true or false final One closedFromTd = Sinks.one(); - return Flux.create(emitter -> { - var client = ClientManager.create((Object object) -> { - emitter.next(object); - // Close the emitter if receive closed state - if (object.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR - && ((UpdateAuthorizationState) object).authorizationState.getConstructor() - == AuthorizationStateClosed.CONSTRUCTOR) { - logger.debug("Received closed status from tdlib"); - closedFromTd.tryEmitValue(true); - emitter.complete(); - } - }, emitter::error, emitter::error); - try { - this.td.tryEmitValue(client).orThrow(); - } catch (Exception ex) { - emitter.error(ex); - } + return telegramClientFactory.create(implementationDetails) + .flatMapMany(client -> Flux + .create(updatesSink -> { + client.initialize((TdApi.Object object) -> { + updatesSink.next(object); + // Close the emitter if receive closed state + if (object.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR + && ((UpdateAuthorizationState) object).authorizationState.getConstructor() + == AuthorizationStateClosed.CONSTRUCTOR) { + logger.debug("Received closed status from tdlib"); + closedFromTd.tryEmitValue(true); + updatesSink.complete(); + } + }, updatesSink::error, updatesSink::error); - // Send close if the stream is disposed before tdlib is closed - emitter.onDispose(() -> { - // Try to emit false, so that if it has not been closed from tdlib, now it is explicitly false. - closedFromTd.tryEmitValue(false); + if (td.tryEmitValue(client).isFailure()) { + updatesSink.error(new TdError(500, "Failed to emit td client")); + } - closedFromTd.asMono() - .filter(isClosedFromTd -> !isClosedFromTd) - .doOnNext(x -> { - logger.warn("The stream has been disposed without closing tdlib. Sending TdApi.Close()..."); - client.send(new Close(), - result -> logger.warn("Close result: {}", result), - ex -> logger.error("Error when disposing td client", ex) - ); + // Send close if the stream is disposed before tdlib is closed + updatesSink.onDispose(() -> { + // Try to emit false, so that if it has not been closed from tdlib, now it is explicitly false. + closedFromTd.tryEmitValue(false); + + closedFromTd.asMono().filter(isClosedFromTd -> !isClosedFromTd).doOnNext(x -> { + logger.warn("The stream has been disposed without closing tdlib. Sending TdApi.Close()..."); + client.send(new Close(), + result -> logger.warn("Close result: {}", result), + ex -> logger.error("Error when disposing td client", ex) + ); + }).subscribeOn(Schedulers.single()).subscribe(); + }); }) - .subscribeOn(Schedulers.single()) - .subscribe(); - }); - }); + .subscribeOn(Schedulers.boundedElastic()) + ); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/TelegramClientFactory.java b/src/main/java/it/tdlight/tdlibsession/td/direct/TelegramClientFactory.java new file mode 100644 index 0000000..aff3f80 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/TelegramClientFactory.java @@ -0,0 +1,28 @@ +package it.tdlight.tdlibsession.td.direct; + +import io.vertx.core.json.JsonObject; +import it.tdlight.common.TelegramClient; +import it.tdlight.tdlight.ClientManager; +import it.tdlight.utils.MonoUtils; +import reactor.core.publisher.Mono; + +public class TelegramClientFactory { + + public TelegramClientFactory() { + + } + + public Mono create(JsonObject implementationDetails) { + return MonoUtils.fromBlockingSingle(() -> { + var implementationName = implementationDetails.getString("name", "native-client"); + switch (implementationName) { + case "native-client": + return ClientManager.create(); + case "test-client": + //todo: create a noop test client with optional behaviours + default: + return null; + } + }); + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java index 9fb0113..d8ebea8 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java @@ -57,7 +57,7 @@ import reactor.core.scheduler.Schedulers; public class AsyncTdEasy { - private static final Logger logger = LoggerFactory.getLogger(AsyncTdEasy.class); + private final Logger logger; private static final Scheduler scheduler = Schedulers.newSingle("AsyncTdEasy", false); private final ReplayProcessor authState = ReplayProcessor.create(1); @@ -72,6 +72,7 @@ public class AsyncTdEasy { public AsyncTdEasy(AsyncTdMiddle td, String logName) { this.td = td; this.logName = logName; + this.logger = LoggerFactory.getLogger("AsyncTdEasy " + logName); // todo: use Duration.ZERO instead of 10ms interval this.incomingUpdates = td.receive() @@ -93,8 +94,9 @@ public class AsyncTdEasy { } else { logger.error(ex.getLocalizedMessage(), ex); } - }).doOnComplete(() -> { - authState.asFlux().take(1).single().subscribe(authState -> { + }) + .doOnComplete(() -> { + authState.asFlux().take(1).single().subscribeOn(Schedulers.single()).subscribe(authState -> { onUpdatesTerminated(); if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) { logger.warn("Updates stream has closed while" @@ -104,7 +106,7 @@ public class AsyncTdEasy { } }); }).doOnError(ex -> { - authState.asFlux().take(1).single().subscribe(authState -> { + authState.asFlux().take(1).single().subscribeOn(Schedulers.single()).subscribe(authState -> { onUpdatesTerminated(); if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) { logger.warn("Updates stream has terminated with an error while" diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/StartSessionMessage.java b/src/main/java/it/tdlight/tdlibsession/td/middle/StartSessionMessage.java index a040dea..bfe3d5d 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/StartSessionMessage.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/StartSessionMessage.java @@ -1,5 +1,6 @@ package it.tdlight.tdlibsession.td.middle; +import io.vertx.core.json.JsonObject; import java.util.Arrays; import java.util.Objects; import java.util.StringJoiner; @@ -10,12 +11,14 @@ public final class StartSessionMessage { private final String alias; private final byte[] binlog; private final long binlogDate; + private final JsonObject implementationDetails; - public StartSessionMessage(int id, String alias, byte[] binlog, long binlogDate) { + public StartSessionMessage(int id, String alias, byte[] binlog, long binlogDate, JsonObject implementationDetails) { this.id = id; this.alias = alias; this.binlog = binlog; this.binlogDate = binlogDate; + this.implementationDetails = implementationDetails; } public int id() { @@ -34,6 +37,10 @@ public final class StartSessionMessage { return binlogDate; } + public JsonObject implementationDetails() { + return implementationDetails; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -54,7 +61,10 @@ public final class StartSessionMessage { if (!Objects.equals(alias, that.alias)) { return false; } - return Arrays.equals(binlog, that.binlog); + if (!Arrays.equals(binlog, that.binlog)) { + return false; + } + return Objects.equals(implementationDetails, that.implementationDetails); } @Override @@ -63,6 +73,7 @@ public final class StartSessionMessage { result = 31 * result + (alias != null ? alias.hashCode() : 0); result = 31 * result + Arrays.hashCode(binlog); result = 31 * result + (int) (binlogDate ^ (binlogDate >>> 32)); + result = 31 * result + (implementationDetails != null ? implementationDetails.hashCode() : 0); return result; } @@ -73,6 +84,7 @@ public final class StartSessionMessage { .add("alias='" + alias + "'") .add("binlog=" + Arrays.toString(binlog)) .add("binlogDate=" + binlogDate) + .add("implementationDetails=" + implementationDetails) .toString(); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/StartSessionMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/td/middle/StartSessionMessageCodec.java index d58fbd4..ca8d3b4 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/StartSessionMessageCodec.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/StartSessionMessageCodec.java @@ -2,6 +2,7 @@ package it.tdlight.tdlibsession.td.middle; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.MessageCodec; +import io.vertx.core.json.JsonObject; import it.tdlight.utils.VertxBufferInputStream; import it.tdlight.utils.VertxBufferOutputStream; import org.warp.commonutils.stream.SafeDataInputStream; @@ -25,6 +26,7 @@ public class StartSessionMessageCodec implements MessageCodeccreate(sink -> { diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index f32641e..9f53043 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -1,6 +1,7 @@ package it.tdlight.tdlibsession.td.middle.client; import io.vertx.core.eventbus.DeliveryOptions; +import io.vertx.core.json.JsonObject; import io.vertx.reactivex.core.Vertx; import io.vertx.reactivex.core.eventbus.Message; import io.vertx.reactivex.core.eventbus.MessageConsumer; @@ -22,7 +23,6 @@ import it.tdlight.utils.MonoUtils; import it.tdlight.utils.MonoUtils.SinkRWStream; import java.nio.file.Path; import java.time.Duration; -import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; @@ -66,6 +66,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { int botId, String botAlias, boolean local, + JsonObject implementationDetails, Path binlogsArchiveDirectory) { var instance = new AsyncTdMiddleEventBusClient(clusterManager); return retrieveBinlog(clusterManager.getVertx(), binlogsArchiveDirectory, botId) @@ -78,7 +79,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { .thenReturn(binlog) ) .flatMap(binlog -> instance - .start(botId, botAlias, local, binlog) + .start(botId, botAlias, local, implementationDetails, binlog) .thenReturn(instance) ) .single(); @@ -96,7 +97,11 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { return this.binlog.asMono().flatMap(binlog -> BinlogUtils.saveBinlog(binlog, data)); } - public Mono start(int botId, String botAlias, boolean local, BinlogAsyncFile binlog) { + public Mono start(int botId, + String botAlias, + boolean local, + JsonObject implementationDetails, + BinlogAsyncFile binlog) { this.botId = botId; this.botAlias = botAlias; this.botAddress = "bots.bot." + this.botId; @@ -111,16 +116,22 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { var binlogLastModifiedTime = tuple.getT1(); var binlogData = tuple.getT2(); - var msg = new StartSessionMessage(this.botId, this.botAlias, binlogData, binlogLastModifiedTime); + var msg = new StartSessionMessage(this.botId, + this.botAlias, + binlogData, + binlogLastModifiedTime, + implementationDetails + ); return setupUpdatesListener() - .then(cluster.getEventBus().rxRequest("bots.start-bot", msg).as(MonoUtils::toMono)) + .then(Mono.defer(() -> local ? Mono.empty() + : cluster.getEventBus().rxRequest("bots.start-bot", msg).as(MonoUtils::toMono))) .then(); }); } @SuppressWarnings("CallingSubscribeInNonBlockingScope") private Mono setupUpdatesListener() { - MessageConsumer updateConsumer = MessageConsumer.newInstance(cluster.getEventBus().consumer(botAddress + ".updates").getDelegate()); + MessageConsumer updateConsumer = MessageConsumer.newInstance(cluster.getEventBus().consumer(botAddress + ".updates").setMaxBufferedMessages(5000).getDelegate()); updateConsumer.endHandler(h -> { logger.error("<<<<<<<<<<<<<<<>>>>>>>>>>>>"); }); @@ -138,7 +149,12 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { @Override public Flux receive() { // Here the updates will be received - return cluster.getEventBus().rxRequest(botAddress + ".ready-to-receive", EMPTY).as(MonoUtils::toMono) + return Mono + .fromRunnable(() -> logger.trace("Called receive() from parent")) + .doOnSuccess(s -> logger.trace("Sending ready-to-receive")) + .then(cluster.getEventBus().rxRequest(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout).as(MonoUtils::toMono)) + .doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply")) + .doOnSuccess(s -> logger.trace("About to read updates flux")) .thenMany(updates.readAsFlux()) .cast(io.vertx.core.eventbus.Message.class) .timeout(Duration.ofSeconds(20), Mono.fromCallable(() -> { @@ -157,15 +173,14 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { .doOnTerminate(updatesStreamEnd::tryEmitEmpty); } - private Publisher interceptUpdate(TdApi.Object update) { + private Mono interceptUpdate(TdApi.Object update) { switch (update.getConstructor()) { case TdApi.UpdateAuthorizationState.CONSTRUCTOR: var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; switch (updateAuthorizationState.authorizationState.getConstructor()) { case TdApi.AuthorizationStateClosed.CONSTRUCTOR: - return cluster - .getEventBus() - .rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono) + return Mono.fromRunnable(() -> logger.trace("Received AuthorizationStateClosed from tdlib")) + .then(cluster.getEventBus().rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono)) .doOnNext(l -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(l.body().binlog().length))) .flatMap(latestBinlog -> this.saveBinlog(latestBinlog.body().binlog())) .doOnSuccess(s -> logger.info("Overwritten binlog from server")) @@ -182,16 +197,20 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { return Mono .firstWithSignal( MonoUtils.castVoid(crash.asMono()), - cluster.getEventBus() - .rxRequest(botAddress + ".execute", req, deliveryOptions).as(MonoUtils::toMono) + Mono + .fromRunnable(() -> logger.trace("Executing request {}", request)) + .then(cluster.getEventBus().rxRequest(botAddress + ".execute", req, deliveryOptions).as(MonoUtils::toMono)) .onErrorMap(ex -> ResponseError.newResponseError(request, botAlias, ex)) - .>flatMap(resp -> Mono.fromCallable(() -> { - if (resp.body() == null) { - throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Response is empty")); - } else { - return resp.body().toTdResult(); - } - })) + .>flatMap(resp -> Mono + .fromCallable(() -> { + if (resp.body() == null) { + throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Response is empty")); + } else { + return resp.body().toTdResult(); + } + }) + ) + .doOnSuccess(s -> logger.trace("Executed request")) ) .switchIfEmpty(Mono.defer(() -> Mono.fromCallable(() -> { throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Client is closed or response is empty")); diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java index 3afb7d8..a3ad09a 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java @@ -12,6 +12,7 @@ import it.tdlight.tdlibsession.td.ResponseError; import it.tdlight.tdlibsession.td.TdResult; import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl; import it.tdlight.tdlibsession.td.direct.AsyncTdDirectOptions; +import it.tdlight.tdlibsession.td.direct.TelegramClientFactory; import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle; import it.tdlight.tdlibsession.td.middle.TdClusterManager; import it.tdlight.utils.MonoUtils; @@ -26,21 +27,26 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd private static final Logger logger = LoggerFactory.getLogger(AsyncTdMiddleDirect.class); + private final TelegramClientFactory clientFactory; + protected AsyncTdDirectImpl td; private String botAddress; private String botAlias; private final Empty closeRequest = Sinks.empty(); public AsyncTdMiddleDirect() { + this.clientFactory = new TelegramClientFactory(); } public static Mono getAndDeployInstance(TdClusterManager clusterManager, String botAlias, - String botAddress) { + String botAddress, + JsonObject implementationDetails) { var instance = new AsyncTdMiddleDirect(); var options = clusterManager.newDeploymentOpts().setConfig(new JsonObject() .put("botAlias", botAlias) - .put("botAddress", botAddress)); + .put("botAddress", botAddress) + .put("implementationDetails", implementationDetails)); return clusterManager.getVertx() .rxDeployVerticle(instance, options) .as(MonoUtils::toMono) @@ -60,8 +66,12 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd throw new IllegalArgumentException("botAlias is not set!"); } this.botAlias = botAlias; + var implementationDetails = config().getJsonObject("implementationDetails"); + if (implementationDetails == null) { + throw new IllegalArgumentException("implementationDetails is not set!"); + } - this.td = new AsyncTdDirectImpl(botAlias); + this.td = new AsyncTdDirectImpl(clientFactory, implementationDetails, botAlias); startPromise.complete(); } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java index f6fa731..1e3321d 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java @@ -29,19 +29,26 @@ public class AsyncTdMiddleLocal implements AsyncTdMiddle { private final AsyncTdMiddleEventBusServer srv; private final One cli = Sinks.one(); + private final JsonObject implementationDetails; - public AsyncTdMiddleLocal(TdClusterManager masterClusterManager, String botAlias, int botId) { + public AsyncTdMiddleLocal(TdClusterManager masterClusterManager, + String botAlias, + int botId, + JsonObject implementationDetails) { this.masterClusterManager = masterClusterManager; this.botAlias = botAlias; this.botId = botId; + this.implementationDetails = implementationDetails; this.vertx = masterClusterManager.getVertx(); this.deploymentOptions = masterClusterManager .newDeploymentOpts() .setConfig(new JsonObject() .put("botId", botId) .put("botAlias", botAlias) - .put("local", true)); + .put("local", true) + .put("implementationDetails", implementationDetails) + ); this.srv = new AsyncTdMiddleEventBusServer(); } @@ -51,7 +58,10 @@ public class AsyncTdMiddleLocal implements AsyncTdMiddle { .single() .then(Mono.fromSupplier(() -> new AsyncTdMiddleEventBusClient(masterClusterManager))) .zipWith(AsyncTdMiddleEventBusClient.retrieveBinlog(vertx, Path.of("binlogs"), botId)) - .flatMap(tuple -> tuple.getT1().start(botId, botAlias, true, tuple.getT2()).thenReturn(tuple.getT1())) + .flatMap(tuple -> tuple + .getT1() + .start(botId, botAlias, true, implementationDetails, tuple.getT2()) + .thenReturn(tuple.getT1())) .onErrorMap(InitializationException::new) .doOnNext(this.cli::tryEmitValue) .doOnError(this.cli::tryEmitError) diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index 11daacc..419dbe4 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -18,6 +18,7 @@ import it.tdlight.tdlibsession.td.TdError; import it.tdlight.tdlibsession.td.TdResultMessage; import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl; import it.tdlight.tdlibsession.td.direct.AsyncTdDirectOptions; +import it.tdlight.tdlibsession.td.direct.TelegramClientFactory; import it.tdlight.tdlibsession.td.middle.ExecuteObject; import it.tdlight.tdlibsession.td.middle.TdResultList; import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec; @@ -30,6 +31,7 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.Empty; import reactor.core.publisher.Sinks.One; import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuples; @@ -43,6 +45,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { // Values configured from constructor private final AsyncTdDirectOptions tdOptions; + private final TelegramClientFactory clientFactory; // Variables configured by the user at startup private final One botId = Sinks.one(); @@ -57,48 +60,55 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { private final One> readyToReceiveConsumer = Sinks.one(); private final One> pingConsumer = Sinks.one(); private final One> pipeFlux = Sinks.one(); + private final Empty terminatePingOverPipeFlux = Sinks.empty(); public AsyncTdMiddleEventBusServer() { - this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 15); + this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 50); + this.clientFactory = new TelegramClientFactory(); } @Override public Completable rxStart() { - return MonoUtils.toCompletable(Mono - .fromCallable(() -> { - var botId = config().getInteger("botId"); - if (botId == null || botId <= 0) { - throw new IllegalArgumentException("botId is not set!"); - } - if (this.botId.tryEmitValue(botId).isFailure()) { - throw new IllegalStateException("Failed to set botId"); - } - var botAddress = "bots.bot." + botId; - if (this.botAddress.tryEmitValue(botAddress).isFailure()) { - throw new IllegalStateException("Failed to set botAddress"); - } - var botAlias = config().getString("botAlias"); - if (botAlias == null || botAlias.isEmpty()) { - throw new IllegalArgumentException("botAlias is not set!"); - } - if (this.botAlias.tryEmitValue(botAlias).isFailure()) { - throw new IllegalStateException("Failed to set botAlias"); - } - var local = config().getBoolean("local"); - if (local == null) { - throw new IllegalArgumentException("local is not set!"); - } - if (this.local.tryEmitValue(local).isFailure()) { - throw new IllegalStateException("Failed to set local"); - } + return MonoUtils + .toCompletable(MonoUtils + .fromBlockingMaybe(() -> { + logger.trace("Stating verticle"); + var botId = config().getInteger("botId"); + if (botId == null || botId <= 0) { + throw new IllegalArgumentException("botId is not set!"); + } + if (this.botId.tryEmitValue(botId).isFailure()) { + throw new IllegalStateException("Failed to set botId"); + } + var botAddress = "bots.bot." + botId; + if (this.botAddress.tryEmitValue(botAddress).isFailure()) { + throw new IllegalStateException("Failed to set botAddress"); + } + var botAlias = config().getString("botAlias"); + if (botAlias == null || botAlias.isEmpty()) { + throw new IllegalArgumentException("botAlias is not set!"); + } + if (this.botAlias.tryEmitValue(botAlias).isFailure()) { + throw new IllegalStateException("Failed to set botAlias"); + } + var local = config().getBoolean("local"); + if (local == null) { + throw new IllegalArgumentException("local is not set!"); + } + var implementationDetails = config().getJsonObject("implementationDetails"); + if (implementationDetails == null) { + throw new IllegalArgumentException("implementationDetails is not set!"); + } - var td = new AsyncTdDirectImpl(botAlias); - if (this.td.tryEmitValue(td).isFailure()) { - throw new IllegalStateException("Failed to set td instance"); - } - return onSuccessfulStartRequest(td, botAddress, botAlias, botId, local); - }) - .flatMap(Mono::hide)); + var td = new AsyncTdDirectImpl(clientFactory, implementationDetails, botAlias); + if (this.td.tryEmitValue(td).isFailure()) { + throw new IllegalStateException("Failed to set td instance"); + } + return onSuccessfulStartRequest(td, botAddress, botAlias, botId, local); + }) + .flatMap(Mono::hide) + .doOnSuccess(s -> logger.trace("Stated verticle")) + ); } private Mono onSuccessfulStartRequest(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) { @@ -115,6 +125,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { private Mono listen(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) { return Mono.create(registrationSink -> { + logger.trace("Preparing listeners"); + MessageConsumer executeConsumer = vertx.eventBus().consumer(botAddress + ".execute"); if (this.executeConsumer.tryEmitValue(executeConsumer).isFailure()) { registrationSink.error(new IllegalStateException("Failed to set executeConsumer")); @@ -126,10 +138,12 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { executeConsumer.endHandler(h -> sink.complete()); }) .flatMap(msg -> { + logger.trace("Received execute request {}", msg.body()); var request = overrideRequest(msg.body().getRequest(), botId); return td .execute(request, msg.body().isExecuteDirectly()) - .map(result -> Tuples.of(msg, result)); + .map(result -> Tuples.of(msg, result)) + .doOnSuccess(s -> logger.trace("Executed successfully")); }) .handle((tuple, sink) -> { var msg = tuple.getT1(); @@ -137,16 +151,21 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { var replyOpts = new DeliveryOptions().setLocalOnly(local); var replyValue = new TdResultMessage(response.result(), response.cause()); try { + logger.trace("Replying with success response"); msg.reply(replyValue, replyOpts); sink.next(response); } catch (Exception ex) { + logger.trace("Replying with error response: {}", ex.getLocalizedMessage()); msg.fail(500, ex.getLocalizedMessage()); sink.error(ex); } }) .then() .subscribeOn(Schedulers.single()) - .subscribe(v -> {}, ex -> logger.error("Error when processing an execute request", ex)); + .subscribe(v -> {}, + ex -> logger.error("Error when processing an execute request", ex), + () -> logger.trace("Finished handling execute requests") + ); MessageConsumer readBinlogConsumer = vertx.eventBus().consumer(botAddress + ".read-binlog"); if (this.readBinlogConsumer.tryEmitValue(readBinlogConsumer).isFailure()) { @@ -163,28 +182,37 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { registrationSink.error(new IllegalStateException("Failed to set readyToReceiveConsumer")); return; } - Flux + + // Pipe the data + var pipeSubscription = Flux .>create(sink -> { readyToReceiveConsumer.handler(sink::next); readyToReceiveConsumer.endHandler(h -> sink.complete()); }) + .take(1) + .limitRequest(1) + .single() .flatMap(msg -> this.pipeFlux .asMono() .timeout(Duration.ofSeconds(5)) .map(pipeFlux -> Tuples.of(msg, pipeFlux))) - .doOnNext(tuple -> { + .doOnError(ex -> logger.error("Error when processing a ready-to-receive request", ex)) + .flatMapMany(tuple -> { var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis()); + tuple.getT1().reply(EMPTY, opts); + logger.trace("Replied to ready-to-receive"); // Start piping the data - //noinspection CallingSubscribeInNonBlockingScope - tuple.getT2() - .subscribeOn(Schedulers.single()) - .subscribe(); + return tuple.getT2().doOnSubscribe(s -> { + logger.trace("Subscribed to updates pipe"); + }); }) .then() + .doOnSuccess(s -> logger.trace("Finished handling ready-to-receive requests (updates pipe ended)")) .subscribeOn(Schedulers.single()) - .subscribe(v -> {}, ex -> logger.error("Error when processing a ready-to-receive request", ex)); + // Don't handle errors here. Handle them in pipeFlux + .subscribe(v -> {}); MessageConsumer pingConsumer = vertx.eventBus().consumer(botAddress + ".ping"); if (this.pingConsumer.tryEmitValue(pingConsumer).isFailure()) { @@ -202,7 +230,10 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { }) .then() .subscribeOn(Schedulers.single()) - .subscribe(v -> {}, ex -> logger.error("Error when processing a ping request", ex)); + .subscribe(v -> {}, + ex -> logger.error("Error when processing a ping request", ex), + () -> logger.trace("Finished handling ping requests") + ); //noinspection ResultOfMethodCallIgnored @@ -212,6 +243,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .andThen(readyToReceiveConsumer.rxCompletionHandler()) .andThen(pingConsumer.rxCompletionHandler()) .subscribeOn(io.reactivex.schedulers.Schedulers.single()) + .doOnComplete(() -> logger.trace("Finished preparing listeners")) .subscribe(registrationSink::success, registrationSink::error); }); } @@ -268,37 +300,37 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } private Mono pipe(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) { + logger.trace("Preparing to pipe requests"); Flux updatesFlux = td .receive(tdOptions) - .flatMap(item -> Mono.defer(() -> { + .takeUntil(item -> { if (item instanceof Update) { var tdUpdate = (Update) item; if (tdUpdate.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { - var tdUpdateAuthorizationState = (UpdateAuthorizationState) tdUpdate; - if (tdUpdateAuthorizationState.authorizationState.getConstructor() - == AuthorizationStateClosed.CONSTRUCTOR) { - logger.debug("Undeploying after receiving AuthorizationStateClosed"); - return rxStop().as(MonoUtils::toMono).thenReturn(item); + var updateAuthorizationState = (UpdateAuthorizationState) tdUpdate; + if (updateAuthorizationState.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { + return true; } } } else if (item instanceof Error) { - // An error in updates means that a fatal error occurred - logger.debug("Undeploying after receiving a fatal error"); - return rxStop().as(MonoUtils::toMono).thenReturn(item); + return true; } - return Mono.just(item); - })) - .flatMap(item -> Mono.fromCallable(() -> { - if (item.getConstructor() == TdApi.Error.CONSTRUCTOR) { - var error = (Error) item; + return false; + }) + .flatMap(update -> Mono.fromCallable(() -> { + if (update.getConstructor() == TdApi.Error.CONSTRUCTOR) { + var error = (Error) update; throw new TdError(error.code, error.message); } else { - return item; + return update; } })) .bufferTimeout(tdOptions.getEventsSize(), local ? Duration.ofMillis(1) : Duration.ofMillis(100)) - .windowTimeout(1, Duration.ofSeconds(5)) - .flatMap(w -> w.defaultIfEmpty(Collections.emptyList())) + .doFinally(signalType -> terminatePingOverPipeFlux.tryEmitEmpty()) + .mergeWith(Flux + .interval(Duration.ofSeconds(5)) + .map(l -> Collections.emptyList()) + .takeUntilOther(terminatePingOverPipeFlux.asMono())) .map(TdResultList::new); var fluxCodec = new TdResultListMessageCodec(); @@ -312,7 +344,33 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .sender(botAddress + ".updates", opts); var pipeFlux = updatesFlux - .flatMap(update -> updatesSender.rxWrite(update).as(MonoUtils::toMono).then()) + .flatMap(updatesList -> updatesSender + .rxWrite(updatesList) + .as(MonoUtils::toMono) + .thenReturn(updatesList) + ) + .flatMap(updatesList -> Flux + .fromIterable(updatesList.value()) + .flatMap(item -> { + if (item instanceof Update) { + var tdUpdate = (Update) item; + if (tdUpdate.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { + var tdUpdateAuthorizationState = (UpdateAuthorizationState) tdUpdate; + if (tdUpdateAuthorizationState.authorizationState.getConstructor() + == AuthorizationStateClosed.CONSTRUCTOR) { + logger.debug("Undeploying after receiving AuthorizationStateClosed"); + return rxStop().as(MonoUtils::toMono).thenReturn(item); + } + } + } else if (item instanceof Error) { + // An error in updates means that a fatal error occurred + logger.debug("Undeploying after receiving a fatal error"); + return rxStop().as(MonoUtils::toMono).thenReturn(item); + } + return Mono.just(item); + }) + .then() + ) .doOnTerminate(() -> updatesSender.close(h -> { if (h.failed()) { logger.error("Failed to close \"updates\" message sender"); @@ -324,7 +382,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .doOnError(ex2 -> logger.error("Unexpected error", ex2)) .then(); }); - MonoUtils.emitValue(this.pipeFlux, pipeFlux); - return Mono.empty(); + return MonoUtils.emitValue(this.pipeFlux, pipeFlux) + .doOnSuccess(s -> logger.trace("Prepared piping requests successfully")); } } diff --git a/src/main/java/it/tdlight/utils/BinlogUtils.java b/src/main/java/it/tdlight/utils/BinlogUtils.java index 1dc33ba..29a15b2 100644 --- a/src/main/java/it/tdlight/utils/BinlogUtils.java +++ b/src/main/java/it/tdlight/utils/BinlogUtils.java @@ -69,13 +69,20 @@ public class BinlogUtils { .then(); } - public static Mono cleanSessionPath(FileSystem vertxFilesystem, Path binlogPath, Path sessionPath) { + public static Mono cleanSessionPath(FileSystem vertxFilesystem, + Path binlogPath, + Path sessionPath, + Path mediaPath) { return vertxFilesystem .rxReadFile(binlogPath.toString()).as(MonoUtils::toMono) .flatMap(buffer -> vertxFilesystem .rxReadDir(sessionPath.toString(), "^(?!td.binlog$).*").as(MonoUtils::toMono) .flatMapMany(Flux::fromIterable) - .doOnNext(file -> logger.debug("Deleting file {}", file)) + .doOnNext(file -> logger.debug("Deleting session file {}", file)) + .flatMap(file -> vertxFilesystem.rxDeleteRecursive(file, true).as(MonoUtils::toMono)) + .then(vertxFilesystem.rxReadDir(mediaPath.toString(), "^(?!td.binlog$).*").as(MonoUtils::toMono)) + .flatMapMany(Flux::fromIterable) + .doOnNext(file -> logger.debug("Deleting media file {}", file)) .flatMap(file -> vertxFilesystem.rxDeleteRecursive(file, true).as(MonoUtils::toMono)) .onErrorResume(ex -> Mono.empty()) .then() diff --git a/src/main/java/it/tdlight/utils/MonoUtils.java b/src/main/java/it/tdlight/utils/MonoUtils.java index 6307b9b..137609a 100644 --- a/src/main/java/it/tdlight/utils/MonoUtils.java +++ b/src/main/java/it/tdlight/utils/MonoUtils.java @@ -99,7 +99,7 @@ public class MonoUtils { } public static Mono fromBlockingMaybe(Callable callable) { - return Mono.fromCallable(callable).subscribeOn(Schedulers.boundedElastic()); + return Mono.fromCallable(callable).publishOn(Schedulers.boundedElastic()); } public static Mono fromBlockingSingle(Callable callable) { @@ -471,7 +471,9 @@ public class MonoUtils { @Override public io.vertx.core.streams.ReadStream fetch(long amount) { if (fetchMode.get()) { - readCoreSubscription.request(amount); + if (amount > 0) { + readCoreSubscription.request(amount); + } } return this; } @@ -498,6 +500,7 @@ public class MonoUtils { @Override public void end(Handler> handler) { + /* MonoUtils.emitCompleteFuture(sink).recover(error -> { if (error instanceof EmissionException) { var sinkError = (EmissionException) error; @@ -514,6 +517,8 @@ public class MonoUtils { drainSubscription.dispose(); } }).onComplete(handler); + */ + MonoUtils.emitCompleteFuture(sink).onComplete(handler); } @Override @@ -621,7 +626,9 @@ public class MonoUtils { @Override public io.vertx.core.streams.ReadStream fetch(long amount) { if (fetchMode.get()) { - readCoreSubscription.request(amount); + if (amount > 0) { + readCoreSubscription.request(amount); + } } return this; }