diff --git a/src/main/java/it/tdlight/tdlibsession/FatalErrorType.java b/src/main/java/it/tdlight/tdlibsession/FatalErrorType.java index 5c65983..4356c58 100644 --- a/src/main/java/it/tdlight/tdlibsession/FatalErrorType.java +++ b/src/main/java/it/tdlight/tdlibsession/FatalErrorType.java @@ -1,5 +1,5 @@ package it.tdlight.tdlibsession; public enum FatalErrorType { - ACCESS_TOKEN_INVALID, PHONE_NUMBER_INVALID, CONNECTION_KILLED + ACCESS_TOKEN_INVALID, PHONE_NUMBER_INVALID, CONNECTION_KILLED, INVALID_UPDATE } diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/DeployClientResult.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/DeployClientResult.java new file mode 100644 index 0000000..b4fd4d7 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/DeployClientResult.java @@ -0,0 +1,7 @@ +package it.tdlight.tdlibsession.remoteclient; + +public enum DeployClientResult { + DEPLOYED, + IGNORED, + FAILED +} diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/RemoteClientBotAddresses.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/RemoteClientBotAddresses.java new file mode 100644 index 0000000..94facfa --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/RemoteClientBotAddresses.java @@ -0,0 +1,42 @@ +package it.tdlight.tdlibsession.remoteclient; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + +public class RemoteClientBotAddresses { + + private final Set addresses; + private final Path addressesFilePath; + + public RemoteClientBotAddresses(Path addressesFilePath) throws IOException { + this.addressesFilePath = addressesFilePath; + if (Files.notExists(addressesFilePath)) { + Files.createFile(addressesFilePath); + } + addresses = Files.readAllLines(addressesFilePath, StandardCharsets.UTF_8).stream().filter(address -> !address.isBlank()).collect(Collectors.toSet()); + } + + public synchronized void putAddress(String address) throws IOException { + addresses.add(address); + Files.write(addressesFilePath, addresses, StandardCharsets.UTF_8, StandardOpenOption.CREATE, StandardOpenOption.SYNC); + } + + public synchronized void removeAddress(String address) throws IOException { + addresses.remove(address); + Files.write(addressesFilePath, addresses, StandardCharsets.UTF_8, StandardOpenOption.CREATE, StandardOpenOption.SYNC); + } + + public synchronized boolean has(String botAddress) { + return addresses.contains(botAddress); + } + + public synchronized Set values() { + return new HashSet<>(addresses); + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java index a5b7ca2..a0dbf48 100644 --- a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java @@ -1,28 +1,31 @@ package it.tdlight.tdlibsession.remoteclient; +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.eventbus.Message; import io.vertx.core.json.JsonObject; import io.vertx.core.net.JksOptions; +import io.vertx.core.shareddata.AsyncMap; +import io.vertx.core.shareddata.Lock; import it.tdlight.common.Init; import it.tdlight.common.utils.CantLoadLibrary; import it.tdlight.tdlibsession.td.middle.TdClusterManager; import it.tdlight.tdlibsession.td.middle.server.AsyncTdMiddleEventBusServer; -import it.tdlight.utils.MonoUtils; import java.io.IOException; import java.net.URISyntaxException; import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.LinkedHashSet; -import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; +import java.util.concurrent.atomic.AtomicReference; import org.apache.logging.log4j.LogManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Mono; -import reactor.core.publisher.ReplayProcessor; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.Many; public class TDLibRemoteClient implements AutoCloseable { @@ -33,16 +36,14 @@ public class TDLibRemoteClient implements AutoCloseable { private final String netInterface; private final int port; private final Set membersAddresses; - private final LinkedHashSet botIds; - private final ReplayProcessor clusterManager = ReplayProcessor.cacheLast(); + private final Many clusterManager = Sinks.many().replay().latest(); - public TDLibRemoteClient(SecurityInfo securityInfo, String masterHostname, String netInterface, int port, Set membersAddresses, Set botIds) { + public TDLibRemoteClient(SecurityInfo securityInfo, String masterHostname, String netInterface, int port, Set membersAddresses) { this.securityInfo = securityInfo; this.masterHostname = masterHostname; this.netInterface = netInterface; this.port = port; this.membersAddresses = membersAddresses; - this.botIds = new LinkedHashSet<>(botIds); try { Init.start(); @@ -66,19 +67,17 @@ public class TDLibRemoteClient implements AutoCloseable { Set membersAddresses = Set.of(args[2].split(",")); - Set botIds = Set.of(args[3].split(",")); - - Path keyStorePath = Paths.get(args[4]); - Path keyStorePasswordPath = Paths.get(args[5]); - Path trustStorePath = Paths.get(args[6]); - Path trustStorePasswordPath = Paths.get(args[7]); + Path keyStorePath = Paths.get(args[3]); + Path keyStorePasswordPath = Paths.get(args[4]); + Path trustStorePath = Paths.get(args[5]); + Path trustStorePasswordPath = Paths.get(args[6]); var loggerContext = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false); loggerContext.setConfigLocation(TDLibRemoteClient.class.getResource("/tdlib-session-container-log4j2.xml").toURI()); var securityInfo = new SecurityInfo(keyStorePath, keyStorePasswordPath, trustStorePath, trustStorePasswordPath); - new TDLibRemoteClient(securityInfo, masterHostname, netInterface, port, membersAddresses, botIds).run(x -> {}); + new TDLibRemoteClient(securityInfo, masterHostname, netInterface, port, membersAddresses).run(x -> {}); } public void start(Handler startedEventHandler) throws IllegalStateException { @@ -97,6 +96,9 @@ public class TDLibRemoteClient implements AutoCloseable { logger.info("TDLib remote client is being hosted on" + netInterface + ":" + port + ". Master: " + masterHostname); + var botAddresses = new RemoteClientBotAddresses(Paths.get("remote_client_bot_addresses.txt")); + botAddresses.values().forEach(botAddress -> logger.info("Bot address is registered on this cluster:" + botAddress)); + var keyStoreOptions = new JksOptions() .setPath(securityInfo.getKeyStorePath().toAbsolutePath().toString()) .setPassword(securityInfo.getKeyStorePassword()); @@ -105,40 +107,93 @@ public class TDLibRemoteClient implements AutoCloseable { .setPath(securityInfo.getTrustStorePath().toAbsolutePath().toString()) .setPassword(securityInfo.getTrustStorePassword()); - Mono flux; - if (!botIds.isEmpty()) { - flux = TdClusterManager.ofNodes(keyStoreOptions, + TdClusterManager.ofNodes(keyStoreOptions, trustStoreOptions, false, masterHostname, netInterface, port, membersAddresses - ); - } else { - flux = Mono.empty(); - } + ) + .doOnNext(clusterManager::tryEmitNext) + .doOnTerminate(clusterManager::tryEmitComplete) + .doOnError(clusterManager::tryEmitError) + .flatMapMany(clusterManager -> { + return Flux.create(sink -> { + var sharedData = clusterManager.getSharedData(); + sharedData.getClusterWideMap("deployableBotAddresses", mapResult -> { + if (mapResult.succeeded()) { + var deployableBotAddresses = mapResult.result(); - flux - .doOnNext(clusterManager::onNext) - .doOnTerminate(clusterManager::onComplete) - .doOnError(clusterManager::onError) - .flatMapIterable(clusterManager -> botIds - .stream() - .map(id -> Map.entry(clusterManager, id)) - .collect(Collectors.toList())) - .flatMap(entry -> Mono.create(sink -> { - entry - .getKey() - .getVertx() - .deployVerticle(new AsyncTdMiddleEventBusServer(entry.getKey()), - entry.getKey().newDeploymentOpts().setConfig(new JsonObject() - .put("botAddress", entry.getValue()) - .put("botAlias", entry.getValue()) - .put("local", false)), - MonoUtils.toHandler(sink) - ); - })) + sharedData.getLockWithTimeout("deployment", 15000, lockAcquisitionResult -> { + if (lockAcquisitionResult.succeeded()) { + var deploymentLock = lockAcquisitionResult.result(); + putAllAsync(deployableBotAddresses, botAddresses.values(), (AsyncResult putAllResult) -> { + if (putAllResult.succeeded()) { + clusterManager + .getEventBus() + .consumer("tdlib.remoteclient.clients.deploy", (Message msg) -> { + var botAddress = msg.body(); + if (botAddresses.has(botAddress)) { + deployBot(clusterManager, botAddress, deploymentResult -> { + if (deploymentResult.failed()) { + msg.fail(500, "Failed to deploy existing bot \"" + botAddress + "\": " + deploymentResult.cause().getLocalizedMessage()); + sink.error(deploymentResult.cause()); + } else { + sink.next(botAddress); + } + deploymentLock.release(); + }); + } else { + logger.info("Deploying new bot at address \"" + botAddress + "\""); + deployableBotAddresses.putIfAbsent(botAddress, netInterface, putResult -> { + if (putResult.succeeded()) { + if (putResult.result() == null) { + try { + botAddresses.putAddress(botAddress); + } catch (IOException e) { + logger.error("Can't save bot address \"" + botAddress + "\" to addresses file", e); + } + deployBot(clusterManager, botAddress, deploymentResult -> { + if (deploymentResult.failed()) { + msg.fail(500, "Failed to deploy new bot \"" + botAddress + "\": " + deploymentResult.cause().getLocalizedMessage()); + sink.error(deploymentResult.cause()); + } else { + sink.next(botAddress); + } + deploymentLock.release(); + }); + } else { + logger.error("Can't add new bot address \"" + botAddress + "\" because it's already present! Value: \"" + putResult.result() + "\""); + sink.error(new UnsupportedOperationException("Can't add new bot address \"" + botAddress + "\" because it's already present! Value: \"" + putResult.result() + "\"")); + deploymentLock.release(); + } + } else { + logger.error("Can't update shared map", putResult.cause()); + sink.error(putResult.cause()); + deploymentLock.release(); + } + }); + } + }); + } else { + logger.error("Can't update shared map", putAllResult.cause()); + sink.error(putAllResult.cause()); + deploymentLock.release(); + } + }); + } else { + logger.error("Can't obtain deployment lock", lockAcquisitionResult.cause()); + sink.error(lockAcquisitionResult.cause()); + } + }); + } else { + logger.error("Can't get shared map", mapResult.cause()); + sink.error(mapResult.cause()); + } + }); + }); + }) .doOnError(ex -> { logger.error(ex.getLocalizedMessage(), ex); }).subscribe(i -> {}, e -> {}, () -> startedEventHandler.handle(null)); @@ -147,8 +202,85 @@ public class TDLibRemoteClient implements AutoCloseable { } } + private void deployBot(TdClusterManager clusterManager, String botAddress, Handler> deploymentHandler) { + AsyncTdMiddleEventBusServer verticle = new AsyncTdMiddleEventBusServer(clusterManager); + AtomicReference deploymentLock = new AtomicReference<>(); + verticle.onBeforeStop(handler -> { + clusterManager.getSharedData().getLockWithTimeout("deployment", 15000, lockAcquisitionResult -> { + if (lockAcquisitionResult.succeeded()) { + deploymentLock.set(lockAcquisitionResult.result()); + var sharedData = clusterManager.getSharedData(); + sharedData.getClusterWideMap("deployableBotAddresses", (AsyncResult> mapResult) -> { + if (mapResult.succeeded()) { + var deployableBotAddresses = mapResult.result(); + deployableBotAddresses.removeIfPresent(botAddress, netInterface, putResult -> { + if (putResult.succeeded()) { + if (putResult.result() != null) { + handler.complete(); + } else { + handler.fail("Can't destroy bot with address \"" + botAddress + "\" because it has been already destroyed"); + } + } else { + handler.fail(putResult.cause()); + } + }); + } else { + handler.fail(mapResult.cause()); + } + }); + } else { + handler.fail(lockAcquisitionResult.cause()); + } + }); + }); + verticle.onAfterStop(handler -> { + if (deploymentLock.get() != null) { + deploymentLock.get().release(); + } + handler.complete(); + }); + clusterManager + .getVertx() + .deployVerticle(verticle, + clusterManager + .newDeploymentOpts() + .setConfig(new JsonObject() + .put("botAddress", botAddress) + .put("botAlias", botAddress) + .put("local", false)), + (deployed) -> { + if (deployed.failed()) { + logger.error("Can't deploy bot \"" + botAddress + "\"", deployed.cause()); + } + deploymentHandler.handle(deployed); + } + ); + } + + private void putAllAsync(AsyncMap sharedMap, + Set valuesToAdd, + Handler> resultHandler) { + if (valuesToAdd.isEmpty()) { + resultHandler.handle(Future.succeededFuture()); + } else { + var valueToAdd = valuesToAdd.stream().findFirst().get(); + valuesToAdd.remove(valueToAdd); + sharedMap.putIfAbsent(valueToAdd, netInterface, result -> { + if (result.succeeded()) { + if (result.result() == null || result.result().equals(netInterface)) { + putAllAsync(sharedMap, valuesToAdd, resultHandler); + } else { + resultHandler.handle(Future.failedFuture(new UnsupportedOperationException("Key already present! Key: \"" + valueToAdd + "\", Value: \"" + result.result() + "\""))); + } + } else { + resultHandler.handle(Future.failedFuture(result.cause())); + } + }); + } + } + @Override public void close() { - clusterManager.blockFirst(); + clusterManager.asFlux().blockFirst(); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java index fe9c0f4..752817e 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java @@ -3,7 +3,6 @@ package it.tdlight.tdlibsession.td.direct; import io.vertx.core.AsyncResult; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Function; -import it.tdlight.jni.TdApi.Update; import it.tdlight.tdlibsession.td.TdResult; import java.time.Duration; import reactor.core.publisher.Flux; @@ -20,7 +19,7 @@ public interface AsyncTdDirect { * @return An incoming update or request response list. The object returned in the response may be * an empty list if the timeout expires. */ - Flux>> getUpdates(Duration receiveDuration, int eventsSize); + Flux>> getUpdates(Duration receiveDuration, int eventsSize); /** * Sends request to TDLib. May be called from any thread. diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java index b35279f..7a4a300 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -5,9 +5,10 @@ import io.vertx.core.Future; import it.tdlight.common.TelegramClient; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.AuthorizationStateClosed; +import it.tdlight.jni.TdApi.Close; import it.tdlight.jni.TdApi.Function; import it.tdlight.jni.TdApi.Object; -import it.tdlight.jni.TdApi.Update; +import it.tdlight.jni.TdApi.Ok; import it.tdlight.jni.TdApi.UpdateAuthorizationState; import it.tdlight.tdlibsession.td.TdResult; import it.tdlight.tdlight.ClientManager; @@ -31,7 +32,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { private final Scheduler tdExecScheduler = Schedulers.newSingle("TdExec"); private final Scheduler tdResponsesOutputScheduler = Schedulers.boundedElastic(); - private Flux>> updatesProcessor; + private Flux>> updatesProcessor; private final String botAlias; public AsyncTdDirectImpl(String botAlias) { @@ -42,15 +43,32 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { public Mono> execute(Function request, boolean synchronous) { if (synchronous) { return Mono - .fromCallable(() -> TdResult.of(this.td.get().execute(request))) + .fromCallable(() -> { + var td = this.td.get(); + if (td == null) { + if (request.getConstructor() == Close.CONSTRUCTOR) { + return TdResult.of(new Ok()); + } + throw new IllegalStateException("TDLib client is destroyed"); + } + return TdResult.of(td.execute(request)); + }) .subscribeOn(tdResponsesScheduler) .publishOn(tdExecScheduler); } else { return Mono.>create(sink -> { try { - this.td.get().send(request, v -> { - sink.success(TdResult.of(v)); - }, sink::error); + var td = this.td.get(); + if (td == null) { + if (request.getConstructor() == Close.CONSTRUCTOR) { + sink.success(TdResult.of(new Ok())); + } + sink.error(new IllegalStateException("TDLib client is destroyed")); + } else { + td.send(request, v -> { + sink.success(TdResult.of(v)); + }, sink::error); + } } catch (Throwable t) { sink.error(t); } @@ -59,14 +77,14 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { } @Override - public Flux>> getUpdates(Duration receiveDuration, int eventsSize) { + public Flux>> getUpdates(Duration receiveDuration, int eventsSize) { return updatesProcessor; } @Override public Mono initializeClient() { return Mono.create(sink -> { - var updatesConnectableFlux = Flux.>>create(emitter -> { + var updatesConnectableFlux = Flux.>>create(emitter -> { var client = ClientManager.create((Object object) -> { emitter.next(Future.succeededFuture(TdResult.of(object))); // Close the emitter if receive closed state @@ -100,9 +118,10 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { @Override public Mono destroyClient() { - return Mono.fromCallable(() -> { - // do nothing - return (Void) null; - }).single().subscribeOn(tdScheduler).publishOn(tdResponsesOutputScheduler); + return this + .execute(new TdApi.Close(), false) + .then() + .subscribeOn(tdScheduler) + .publishOn(tdResponsesOutputScheduler); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java index 24ea6c4..c078474 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java @@ -138,7 +138,7 @@ public class AsyncTdEasy { * Receives fatal errors from TDLib. */ public Flux getFatalErrors() { - return Flux.from(fatalErrors); + return Flux.from(fatalErrors).publishOn(Schedulers.boundedElastic()); } /** @@ -149,8 +149,8 @@ public class AsyncTdEasy { return td.execute(request, false); } - private Mono> sendDirectly(TdApi.Function obj) { - return td.execute(obj, false); + private Mono> sendDirectly(TdApi.Function obj, boolean synchronous) { + return td.execute(obj, synchronous); } /** @@ -158,7 +158,7 @@ public class AsyncTdEasy { * @param i level */ public Mono setVerbosityLevel(int i) { - return MonoUtils.thenOrError(sendDirectly(new TdApi.SetLogVerbosityLevel(i))); + return MonoUtils.thenOrError(sendDirectly(new TdApi.SetLogVerbosityLevel(i), true)); } /** @@ -166,7 +166,7 @@ public class AsyncTdEasy { * @param name option name */ public Mono clearOption(String name) { - return MonoUtils.thenOrError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()))); + return MonoUtils.thenOrError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()), false)); } /** @@ -175,7 +175,7 @@ public class AsyncTdEasy { * @param value option value */ public Mono setOptionString(String name, String value) { - return MonoUtils.thenOrError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)))); + return MonoUtils.thenOrError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)), false)); } /** @@ -184,7 +184,7 @@ public class AsyncTdEasy { * @param value option value */ public Mono setOptionInteger(String name, long value) { - return MonoUtils.thenOrError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)))); + return MonoUtils.thenOrError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)), false)); } /** @@ -193,7 +193,7 @@ public class AsyncTdEasy { * @param value option value */ public Mono setOptionBoolean(String name, boolean value) { - return MonoUtils.thenOrError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)))); + return MonoUtils.thenOrError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)), false)); } /** @@ -202,7 +202,7 @@ public class AsyncTdEasy { * @return The value or nothing */ public Mono getOptionString(String name) { - return this.sendDirectly(new TdApi.GetOption(name)).flatMap(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> { + return this.sendDirectly(new TdApi.GetOption(name), false).flatMap(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> { switch (value.getConstructor()) { case OptionValueString.CONSTRUCTOR: return Mono.just(((OptionValueString) value).value); @@ -221,7 +221,7 @@ public class AsyncTdEasy { * @return The value or nothing */ public Mono getOptionInteger(String name) { - return this.sendDirectly(new TdApi.GetOption(name)).flatMap(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> { + return this.sendDirectly(new TdApi.GetOption(name), false).flatMap(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> { switch (value.getConstructor()) { case OptionValueInteger.CONSTRUCTOR: return Mono.just(((OptionValueInteger) value).value); @@ -240,7 +240,7 @@ public class AsyncTdEasy { * @return The value or nothing */ public Mono getOptionBoolean(String name) { - return this.sendDirectly(new TdApi.GetOption(name)).flatMap(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> { + return this.sendDirectly(new TdApi.GetOption(name), false).flatMap(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> { switch (value.getConstructor()) { case OptionValueBoolean.CONSTRUCTOR: return Mono.just(((OptionValueBoolean) value).value); @@ -289,6 +289,16 @@ public class AsyncTdEasy { .filter(closeRequested -> !closeRequested) .doOnSuccess(v -> requestedDefinitiveExit.onNext(true)) .then(td.execute(new TdApi.Close(), false)) + .doOnNext(ok -> { + logger.debug("Received Ok after TdApi.Close"); + }) + .then(authState + .filter(authorizationState -> authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) + .take(1) + .singleOrEmpty()) + .doOnNext(ok -> { + logger.info("Received AuthorizationStateClosed after TdApi.Close"); + }) .then(); } @@ -308,17 +318,17 @@ public class AsyncTdEasy { //todo: do this } - private Mono catchErrors(Object obj) { + private Mono catchErrors(Object obj) { if (obj.getConstructor() == Error.CONSTRUCTOR) { var error = (Error) obj; switch (error.message) { case "PHONE_CODE_INVALID": globalErrors.onNext(error); - return Mono.just(new AuthorizationStateWaitCode()); + return Mono.just(new UpdateAuthorizationState(new AuthorizationStateWaitCode())); case "PASSWORD_HASH_INVALID": globalErrors.onNext(error); - return Mono.just(new AuthorizationStateWaitPassword()); + return Mono.just(new UpdateAuthorizationState(new AuthorizationStateWaitPassword())); case "PHONE_NUMBER_INVALID": fatalErrors.onNext(FatalErrorType.PHONE_NUMBER_INVALID); break; @@ -328,20 +338,24 @@ public class AsyncTdEasy { case "CONNECTION_KILLED": fatalErrors.onNext(FatalErrorType.CONNECTION_KILLED); break; + case "INVALID_UPDATE": + fatalErrors.onNext(FatalErrorType.INVALID_UPDATE); + break; default: globalErrors.onNext(error); break; } return Mono.empty(); + } else { + return Mono.just((Update) obj); } - return Mono.just(obj); } public Mono isBot() { return Mono.from(settings).single().map(TdEasySettings::isBotTokenSet); } - private Publisher preprocessUpdates(Update updateObj) { + private Publisher preprocessUpdates(TdApi.Object updateObj) { return Mono .just(updateObj) .flatMap(this::catchErrors) @@ -368,28 +382,28 @@ public class AsyncTdEasy { parameters.enableStorageOptimizer = settings.enableStorageOptimizer; parameters.ignoreFileNames = settings.ignoreFileNames; return new SetTdlibParameters(parameters); - }).flatMap(this::sendDirectly)); + }).flatMap((SetTdlibParameters obj1) -> sendDirectly(obj1, false))); case AuthorizationStateWaitEncryptionKey.CONSTRUCTOR: return MonoUtils - .thenOrError(sendDirectly(new CheckDatabaseEncryptionKey())) + .thenOrError(sendDirectly(new CheckDatabaseEncryptionKey(), false)) .onErrorResume((error) -> { logger.error("Error while checking TDLib encryption key", error); - return sendDirectly(new TdApi.Close()).then(); + return sendDirectly(new TdApi.Close(), false).then(); }); case AuthorizationStateWaitPhoneNumber.CONSTRUCTOR: return MonoUtils.thenOrError(Mono.from(this.settings).flatMap(settings -> { if (settings.isPhoneNumberSet()) { return sendDirectly(new SetAuthenticationPhoneNumber(String.valueOf(settings.getPhoneNumber()), new PhoneNumberAuthenticationSettings(false, false, false) - )); + ), false); } else if (settings.isBotTokenSet()) { - return sendDirectly(new CheckAuthenticationBotToken(settings.getBotToken())); + return sendDirectly(new CheckAuthenticationBotToken(settings.getBotToken()), false); } else { return Mono.error(new IllegalArgumentException("A bot is neither an user or a bot")); } })).onErrorResume((error) -> { logger.error("Error while waiting for phone number", error); - return sendDirectly(new TdApi.Close()).then(); + return sendDirectly(new TdApi.Close(), false).then(); }); case AuthorizationStateWaitRegistration.CONSTRUCTOR: var authorizationStateWaitRegistration = (AuthorizationStateWaitRegistration) obj; @@ -421,7 +435,7 @@ public class AsyncTdEasy { .defaultIfEmpty("") .doOnNext(lastName -> registerUser.lastName = lastName) ) - .then(sendDirectly(registerUser))); + .then(sendDirectly(registerUser, false))); }); case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR: var authorizationStateWaitOtherDeviceConfirmation = (AuthorizationStateWaitOtherDeviceConfirmation) obj; @@ -445,7 +459,7 @@ public class AsyncTdEasy { authorizationStateWaitCode.codeInfo.timeout, authorizationStateWaitCode.codeInfo.type ) - ).flatMap(code -> sendDirectly(new CheckAuthenticationCode(code)))); + ).flatMap(code -> sendDirectly(new CheckAuthenticationCode(code), false))); }); case AuthorizationStateWaitPassword.CONSTRUCTOR: var authorizationStateWaitPassword = (AuthorizationStateWaitPassword) obj; @@ -455,7 +469,7 @@ public class AsyncTdEasy { .flatMap(handler -> { return MonoUtils.thenOrLogRepeatError(() -> handler.onParameterRequest(Parameter.ASK_PASSWORD, new ParameterInfoPasswordHint(authorizationStateWaitPassword.passwordHint) - ).flatMap(password -> sendDirectly(new CheckAuthenticationPassword(password)))); + ).flatMap(password -> sendDirectly(new CheckAuthenticationPassword(password), false))); }); case AuthorizationStateReady.CONSTRUCTOR: { this.authState.onNext(new AuthorizationStateReady()); @@ -504,6 +518,6 @@ public class AsyncTdEasy { return Mono.empty(); } }) - .thenReturn(updateObj); + .then(Mono.justOrEmpty(updateObj.getConstructor() == Error.CONSTRUCTOR ? null : (Update) updateObj)); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/AsyncTdMiddle.java b/src/main/java/it/tdlight/tdlibsession/td/middle/AsyncTdMiddle.java index 82aae3c..4590459 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/AsyncTdMiddle.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/AsyncTdMiddle.java @@ -10,9 +10,9 @@ public interface AsyncTdMiddle { /** * Receives incoming updates from TDLib. * - * @return Updates + * @return Updates (or Error if received a fatal error. A fatal error means that the client is no longer working) */ - Flux getUpdates(); + Flux getUpdates(); /** * Sends request to TDLib. May be called from any thread. diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java index b030159..b6e22c3 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java @@ -19,8 +19,10 @@ import io.vertx.core.eventbus.MessageCodec; import io.vertx.core.eventbus.MessageConsumer; import io.vertx.core.http.ClientAuth; import io.vertx.core.net.JksOptions; +import io.vertx.core.shareddata.SharedData; import io.vertx.core.spi.cluster.ClusterManager; import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager; +import it.tdlight.utils.MonoUtils; import java.nio.channels.AlreadyBoundException; import java.util.ArrayList; import java.util.Collections; @@ -30,7 +32,6 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.jetbrains.annotations.Nullable; -import it.tdlight.utils.MonoUtils; import reactor.core.publisher.Mono; public class TdClusterManager { @@ -229,4 +230,8 @@ public class TdClusterManager { public DeploymentOptions newDeploymentOpts() { return new DeploymentOptions().setWorkerPoolName("td-main-pool"); } + + public SharedData getSharedData() { + return vertx.sharedData(); + } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdOptListMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdOptListMessageCodec.java index 2a9d265..3bf8fc6 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdOptListMessageCodec.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdOptListMessageCodec.java @@ -4,7 +4,6 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.MessageCodec; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Error; -import it.tdlight.jni.TdApi.Update; import it.tdlight.tdlibsession.td.TdResult; import it.unimi.dsi.fastutil.io.FastByteArrayInputStream; import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream; @@ -27,7 +26,7 @@ public class TdOptListMessageCodec implements MessageCodec t1 : t) { + for (TdResult t1 : t) { if (t1.succeeded()) { dos.writeBoolean(true); t1.result().serialize(dos); @@ -55,10 +54,10 @@ public class TdOptListMessageCodec implements MessageCodec> list = new ArrayList<>(); + ArrayList> list = new ArrayList<>(); for (int i = 0; i < size; i++) { if (dis.readBoolean()) { - list.add(TdResult.succeeded((Update) TdApi.Deserializer.deserialize(dis))); + list.add(TdResult.succeeded((TdApi.Object) TdApi.Deserializer.deserialize(dis))); } else { list.add(TdResult.failed((Error) TdApi.Deserializer.deserialize(dis))); } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdOptionalList.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdOptionalList.java index 73206d3..260bf26 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdOptionalList.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdOptionalList.java @@ -8,9 +8,9 @@ import java.util.StringJoiner; public class TdOptionalList { private final boolean isSet; - private final List> values; + private final List> values; - public TdOptionalList(boolean isSet, List> values) { + public TdOptionalList(boolean isSet, List> values) { this.isSet = isSet; this.values = values; } @@ -19,7 +19,7 @@ public class TdOptionalList { return isSet; } - public List> getValues() { + public List> getValues() { return values; } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 8848ba1..3273b18 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -7,12 +7,13 @@ import io.vertx.core.AsyncResult; import io.vertx.core.Promise; import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageConsumer; import io.vertx.core.json.JsonObject; import it.tdlight.common.ConstructorDetector; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.AuthorizationStateClosed; +import it.tdlight.jni.TdApi.Error; import it.tdlight.jni.TdApi.Function; -import it.tdlight.jni.TdApi.Update; import it.tdlight.jni.TdApi.UpdateAuthorizationState; import it.tdlight.tdlibsession.td.ResponseError; import it.tdlight.tdlibsession.td.TdResult; @@ -30,6 +31,7 @@ import java.time.Duration; import java.util.List; import java.util.Objects; import java.util.StringJoiner; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; @@ -41,7 +43,8 @@ import reactor.core.publisher.ReplayProcessor; public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements AsyncTdMiddle { - private static final Logger logger = LoggerFactory.getLogger(AsyncTdMiddleEventBusClient.class); + private static final Logger logger = LoggerFactory.getLogger(AsyncTdMiddleEventBusClient.class ); + public static final boolean OUTPUT_REQUESTS = false; public static final byte[] EMPTY = new byte[0]; @@ -49,7 +52,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy private final DeliveryOptions deliveryOptions; private final DeliveryOptions deliveryOptionsWithTimeout; - private ReplayProcessor> incomingUpdatesCo = ReplayProcessor.cacheLast(); + private ReplayProcessor> incomingUpdatesCo = ReplayProcessor.cacheLast(); private TdClusterManager cluster; @@ -57,6 +60,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy private String botAlias; private boolean local; private long initTime; + private MessageConsumer readyToStartConsumer; @SuppressWarnings({"unchecked", "rawtypes"}) public AsyncTdMiddleEventBusClient(TdClusterManager clusterManager) { @@ -121,37 +125,41 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy startBreaker.execute(future -> { try { - logger.error("Requesting " + botAddress + ".ping"); - cluster - .getEventBus() - .request(botAddress + ".ping", EMPTY, deliveryOptions, pingMsg -> { - if (pingMsg.succeeded()) { - logger.error("Received ping reply (succeeded)"); - logger.error("Requesting " + botAddress + ".start"); + logger.debug("Requesting tdlib.remoteclient.clients.deploy.existing"); + cluster.getEventBus().publish("tdlib.remoteclient.clients.deploy", botAddress, deliveryOptions); + + + logger.debug("Waiting for " + botAddress + ".readyToStart"); + AtomicBoolean alreadyReceived = new AtomicBoolean(false); + this.readyToStartConsumer = cluster.getEventBus().consumer(botAddress + ".readyToStart", (Message pingMsg) -> { + // Reply instantly + pingMsg.reply(new byte[0]); + + if (!alreadyReceived.getAndSet(true)) { + logger.debug("Received ping reply (succeeded)"); + logger.debug("Requesting " + botAddress + ".start"); + cluster + .getEventBus() + .request(botAddress + ".start", EMPTY, deliveryOptionsWithTimeout, startMsg -> { + if (startMsg.succeeded()) { + logger.debug("Requesting " + botAddress + ".isWorking"); cluster .getEventBus() - .request(botAddress + ".start", EMPTY, deliveryOptionsWithTimeout, startMsg -> { - if (startMsg.succeeded()) { - logger.error("Requesting " + botAddress + ".isWorking"); - cluster - .getEventBus() - .request(botAddress + ".isWorking", EMPTY, deliveryOptionsWithTimeout, msg -> { - if (msg.succeeded()) { - this.listen().then(this.pipe()).timeout(Duration.ofSeconds(10)).subscribe(v -> {}, future::fail, future::complete); - } else { - future.fail(msg.cause()); - } - }); + .request(botAddress + ".isWorking", EMPTY, deliveryOptionsWithTimeout, msg -> { + if (msg.succeeded()) { + this.listen().then(this.pipe()).timeout(Duration.ofSeconds(10)).subscribe(v -> {}, future::fail, future::complete); } else { - future.fail(startMsg.cause()); + future.fail(msg.cause()); } }); } else { - logger.error("Received ping reply (failed) (local=" + local + ")", pingMsg.cause()); - future.fail(pingMsg.cause()); + future.fail(startMsg.cause()); } - } - ); + }); + } else { + // Already received + } + }); } catch (Exception ex) { future.fail(ex); } @@ -165,8 +173,10 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy @Override public void stop(Promise stopPromise) { - tdClosed.onNext(true); - stopPromise.complete(); + readyToStartConsumer.unregister(result -> { + tdClosed.onNext(true); + stopPromise.complete(); + }); } private Mono listen() { @@ -178,7 +188,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy var updates = this.requestUpdatesBatchFromNetwork() .repeatWhen(nFlux -> { return Flux.push(emitter -> { - var dispos = Flux.combineLatest(nFlux, tdClosed, Pair::of).subscribe(val -> { + var dispos = Flux.combineLatest(nFlux, tdClosed.distinct(), Pair::of).subscribe(val -> { //noinspection PointlessBooleanExpression if (val.getRight() == true) { emitter.complete(); @@ -194,12 +204,19 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy }); }) // Repeat when there is one batch with a flux of updates .flatMap(batch -> batch) + .onErrorResume(error -> { + logger.error("Bot updates request failed! Marking as closed.", error); + if (error.getMessage().contains("Timed out")) { + return Flux.just(new Error(444, "CONNECTION_KILLED")); + } else { + return Flux.just(new Error(406, "INVALID_UPDATE")); + } + }) .flatMap(update -> { - return Mono.create(sink -> { + return Mono.create(sink -> { if (update.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { var state = (UpdateAuthorizationState) update; if (state.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { - tdClosed.onNext(true); this.getVertx().undeploy(this.deploymentID(), undeployed -> { if (undeployed.failed()) { logger.error("Error when undeploying td verticle", undeployed.cause()); @@ -225,10 +242,10 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy } private static class UpdatesBatchResult { - public final Flux updatesFlux; + public final Flux updatesFlux; public final boolean completed; - private UpdatesBatchResult(Flux updatesFlux, boolean completed) { + private UpdatesBatchResult(Flux updatesFlux, boolean completed) { this.updatesFlux = updatesFlux; this.completed = completed; } @@ -242,15 +259,15 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy } } - private Mono> requestUpdatesBatchFromNetwork() { + private Mono> requestUpdatesBatchFromNetwork() { return Mono - .from(tdClosed) + .from(tdClosed.distinct()) .single() .filter(tdClosed -> !tdClosed) - .flatMap(_x -> Mono.>create(sink -> { + .flatMap(_x -> Mono.>create(sink -> { cluster.getEventBus().request(botAddress + ".getNextUpdatesBlock", EMPTY, - deliveryOptions, + deliveryOptionsWithTimeout, msg -> { if (msg.failed()) { //if (System.currentTimeMillis() - initTime <= 30000) { @@ -267,8 +284,8 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy } else { var resultBody = msg.result().body(); if (resultBody.isSet()) { - List> updates = resultBody.getValues(); - for (TdResult updateObj : updates) { + List> updates = resultBody.getValues(); + for (TdResult updateObj : updates) { if (updateObj.succeeded()) { if (OUTPUT_REQUESTS) { System.out.println(" <- " + updateObj.result() @@ -297,7 +314,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy } @Override - public Flux getUpdates() { + public Flux getUpdates() { return incomingUpdatesCo.filter(Objects::nonNull).flatMap(v -> v); } @@ -313,7 +330,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy .replace(" = ", "=")); } - return Mono.from(tdClosed).single().filter(tdClosed -> !tdClosed).>flatMap((_x) -> Mono.create(sink -> { + return Mono.from(tdClosed.distinct()).single().filter(tdClosed -> !tdClosed).>flatMap((_x) -> Mono.create(sink -> { try { cluster .getEventBus() diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java index 346f817..3829bbb 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java @@ -5,20 +5,18 @@ import static it.tdlight.tdlibsession.td.middle.server.AsyncTdMiddleEventBusServ import io.vertx.core.AbstractVerticle; import io.vertx.core.Promise; import io.vertx.core.json.JsonObject; +import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Function; import it.tdlight.jni.TdApi.Object; -import it.tdlight.jni.TdApi.Update; import it.tdlight.tdlibsession.td.ResponseError; import it.tdlight.tdlibsession.td.TdResult; import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl; import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle; import it.tdlight.tdlibsession.td.middle.TdClusterManager; import it.tdlight.utils.MonoUtils; -import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.warp.commonutils.error.InitializationException; -import reactor.core.publisher.ConnectableFlux; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.ReplayProcessor; @@ -31,7 +29,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd protected AsyncTdDirectImpl td; private String botAddress; private String botAlias; - private Flux updatesFluxCo; + private Flux updatesFluxCo; public AsyncTdMiddleDirect() { } @@ -78,11 +76,11 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd logger.error("Received an errored update", ResponseError.newResponseError("incoming update", botAlias, result.result().cause()) ); - return Mono.empty(); + return Mono.empty(); } } else { logger.error("Received an errored update", result.cause()); - return Mono.empty(); + return Mono.empty(); } })).publish().refCount(1); startPromise.complete(); @@ -105,7 +103,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd } @Override - public Flux getUpdates() { + public Flux getUpdates() { return Flux.from(updatesFluxCo); } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java index db13000..bf4c132 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java @@ -1,19 +1,18 @@ package it.tdlight.tdlibsession.td.middle.direct; -import io.vertx.core.DeploymentOptions; import io.vertx.core.json.JsonObject; +import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Function; import it.tdlight.jni.TdApi.Object; -import it.tdlight.jni.TdApi.Update; import it.tdlight.tdlibsession.td.TdResult; +import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl; import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle; +import it.tdlight.tdlibsession.td.middle.TdClusterManager; import it.tdlight.tdlibsession.td.middle.client.AsyncTdMiddleEventBusClient; import it.tdlight.tdlibsession.td.middle.server.AsyncTdMiddleEventBusServer; +import it.tdlight.utils.MonoUtils; import java.util.Objects; import org.warp.commonutils.error.InitializationException; -import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl; -import it.tdlight.tdlibsession.td.middle.TdClusterManager; -import it.tdlight.utils.MonoUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.ReplayProcessor; @@ -56,7 +55,7 @@ public class AsyncTdMiddleLocal implements AsyncTdMiddle { } @Override - public Flux getUpdates() { + public Flux getUpdates() { return cli.filter(Objects::nonNull).single().flatMapMany(AsyncTdMiddleEventBusClient::getUpdates); } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index 1c7b0af..082eceb 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -4,12 +4,14 @@ import static it.tdlight.tdlibsession.td.middle.client.AsyncTdMiddleEventBusClie import io.vertx.core.AbstractVerticle; import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageConsumer; import it.tdlight.common.ConstructorDetector; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.AuthorizationStateClosed; -import it.tdlight.jni.TdApi.Update; import it.tdlight.jni.TdApi.UpdateAuthorizationState; import it.tdlight.tdlibsession.td.TdResult; import it.tdlight.tdlibsession.td.TdResultMessage; @@ -26,9 +28,11 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; @@ -40,6 +44,7 @@ import reactor.core.scheduler.Schedulers; public class AsyncTdMiddleEventBusServer extends AbstractVerticle { private static final Logger logger = LoggerFactory.getLogger(AsyncTdMiddleEventBusServer.class); + private static final byte[] EMPTY = new byte[0]; // todo: restore duration to 2 seconds instead of 10 millis, when the bug of tdlight double queue wait is fixed public static final Duration WAIT_DURATION = Duration.ofSeconds(1);// Duration.ofMillis(10); @@ -54,8 +59,14 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { protected final ReplayProcessor tdClosed = ReplayProcessor.cacheLastOrDefault(false); protected AsyncTdDirectImpl td; - protected final LinkedBlockingQueue>> queue = new LinkedBlockingQueue<>(); + protected final LinkedBlockingQueue>> queue = new LinkedBlockingQueue<>(); private final Scheduler tdSrvPoll; + private List>> onBeforeStopListeners = new CopyOnWriteArrayList<>(); + private List>> onAfterStopListeners = new CopyOnWriteArrayList<>(); + private MessageConsumer startConsumer; + private MessageConsumer isWorkingConsumer; + private MessageConsumer getNextUpdatesBlockConsumer; + private MessageConsumer executeConsumer; @SuppressWarnings({"unchecked", "rawtypes"}) public AsyncTdMiddleEventBusServer(TdClusterManager clusterManager) { @@ -89,23 +100,18 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { this.local = local; this.td = new AsyncTdDirectImpl(botAlias); - cluster.getEventBus().consumer(botAddress + ".ping", (Message msg) -> { - logger.error("Received ping. Replying..."); - msg.reply(EMPTY); - logger.error("Replied."); - }); - AtomicBoolean alreadyDeployed = new AtomicBoolean(false); - cluster.getEventBus().consumer(botAddress + ".start", (Message msg) -> { + this.startConsumer = cluster.getEventBus().consumer(botAddress + ".start", (Message msg) -> { if (alreadyDeployed.compareAndSet(false, true)) { td.initializeClient() .then(this.listen()) .then(this.pipe()) .then(Mono.create(registrationSink -> { - cluster.getEventBus().consumer(botAddress + ".isWorking", (Message workingMsg) -> { + this.isWorkingConsumer = cluster.getEventBus().consumer(botAddress + ".isWorking", (Message workingMsg) -> { workingMsg.reply(EMPTY, cluster.newDeliveryOpts().setLocalOnly(local)); - }).completionHandler(MonoUtils.toHandler(registrationSink)); + }); + this.isWorkingConsumer.completionHandler(MonoUtils.toHandler(registrationSink)); })) .subscribe(v -> {}, ex -> { @@ -119,7 +125,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } else { msg.reply(EMPTY); } - }).completionHandler(h -> { + }); + startConsumer.completionHandler(h -> { logger.info(botAddress + " server deployed. succeeded: " + h.succeeded()); if (h.succeeded()) { startPromise.complete(h.result()); @@ -127,30 +134,131 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { startPromise.fail(h.cause()); } }); + + logger.debug("Sending " + botAddress + ".readyToStart"); + cluster.getEventBus().send(botAddress + ".readyToStart", EMPTY, cluster.newDeliveryOpts().setSendTimeout(10000)); + + var clientDeadCheckThread = new Thread(() -> { + Throwable ex = null; + try { + while (!Thread.interrupted()) { + Thread.sleep(5000); + Promise promise = Promise.promise(); + cluster + .getEventBus() + .request(botAddress + ".readyToStart", + EMPTY, + cluster.newDeliveryOpts().setSendTimeout(10000), + r -> promise.handle(r.mapEmpty()) + ); + promise.future().toCompletionStage().toCompletableFuture().join(); + } + } catch (Throwable e) { + ex = e; + } + var closed = tdClosed.blockFirst(); + if (closed == null || !closed) { + if (ex != null && !ex.getMessage().contains("NO_HANDLERS")) { + logger.error(ex.getLocalizedMessage(), ex); + } + logger.error("TDLib client disconnected unexpectedly! Closing the server..."); + undeploy(() -> {}); + } + }); + clientDeadCheckThread.setName("Client " + botAddress + " dead check"); + clientDeadCheckThread.setDaemon(true); + clientDeadCheckThread.start(); + } + + public void onBeforeStop(Consumer> r) { + this.onBeforeStopListeners.add(r); + } + + public void onAfterStop(Consumer> r) { + this.onAfterStopListeners.add(r); } @Override public void stop(Promise stopPromise) { - tdClosed.onNext(true); - td.destroyClient().onErrorResume(ex -> { - logger.error("Can't destroy client", ex); - return Mono.empty(); - }).doOnTerminate(() -> { - logger.debug("TdMiddle verticle stopped"); - }).subscribe(MonoUtils.toSubscriber(stopPromise)); + runAll(onBeforeStopListeners, onBeforeStopHandler -> { + if (onBeforeStopHandler.failed()) { + logger.error("A beforeStop listener failed: "+ onBeforeStopHandler.cause()); + } + + td.destroyClient().onErrorResume(ex -> { + logger.error("Can't destroy client", ex); + return Mono.empty(); + }).doOnError(err -> { + logger.error("TdMiddle verticle failed during stop", err); + }).then(Mono.create(sink -> { + this.isWorkingConsumer.unregister(result -> { + if (result.failed()) { + logger.error("Can't unregister consumer", result.cause()); + } + this.startConsumer.unregister(result2 -> { + if (result2.failed()) { + logger.error("Can't unregister consumer", result2.cause()); + } + + tdClosed.onNext(true); + + this.getNextUpdatesBlockConsumer.unregister(result3 -> { + if (result3.failed()) { + logger.error("Can't unregister consumer", result3.cause()); + } + + this.executeConsumer.unregister(result4 -> { + if (result4.failed()) { + logger.error("Can't unregister consumer", result4.cause()); + } + + sink.success(); + }); + }); + }); + }); + })).doFinally(signalType -> { + logger.info("TdMiddle verticle \"" + botAddress + "\" stopped"); + + runAll(onAfterStopListeners, onAfterStopHandler -> { + if (onAfterStopHandler.failed()) { + logger.error("An afterStop listener failed: " + onAfterStopHandler.cause()); + } + + stopPromise.complete(); + }); + }).subscribe(); + }); + } + + private void runAll(List>> actions, Handler> resultHandler) { + if (actions.isEmpty()) { + resultHandler.handle(Future.succeededFuture()); + } else { + var firstAction = actions.remove(0); + Promise promise = Promise.promise(); + firstAction.accept(promise); + promise.future().onComplete(handler -> { + if (handler.succeeded()) { + runAll(new ArrayList<>(actions), resultHandler); + } else { + resultHandler.handle(Future.failedFuture(handler.cause())); + } + }); + } } private Mono listen() { return Mono.create(registrationSink -> { - cluster.getEventBus().consumer(botAddress + ".getNextUpdatesBlock", (Message msg) -> { + this.getNextUpdatesBlockConsumer = cluster.getEventBus().consumer(botAddress + ".getNextUpdatesBlock", (Message msg) -> { // Run only if tdlib is not closed Mono.from(tdClosed).single().filter(tdClosedVal -> !tdClosedVal) // Get a list of updates .flatMap(_v -> Mono - .>>>fromSupplier(() -> { + .>>>fromSupplier(() -> { // When a request is asked, read up to 1000 available updates in the queue long requestTime = System.currentTimeMillis(); - ArrayList>> updatesBatch = new ArrayList<>(); + ArrayList>> updatesBatch = new ArrayList<>(); try { // Block until an update is found or 5 seconds passed var item = queue.poll(5, TimeUnit.SECONDS); @@ -207,20 +315,14 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { if (received.succeeded() && received.result().getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { var authState = (UpdateAuthorizationState) received.result(); if (authState.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { - tdClosed.onNext(true); - vertx.undeploy(deploymentID(), undeployed -> { - if (undeployed.failed()) { - logger.error("Error when undeploying td verticle", undeployed.cause()); - } - sink.success(); - }); + undeploy(sink::success); } else { sink.success(); } } else { sink.success(); } - }).then(Mono.>create(sink -> { + }).then(Mono.>create(sink -> { sink.success(received); })); } else { @@ -237,11 +339,12 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { logger.error("Error when processing a 'receiveUpdates' request", ex); msg.fail(500, ex.getLocalizedMessage()); }, () -> {}); - }).completionHandler(MonoUtils.toHandler(registrationSink)); + }); + getNextUpdatesBlockConsumer.completionHandler(MonoUtils.toHandler(registrationSink)); }).then(Mono.create(registrationSink -> { - cluster.getEventBus().consumer(botAddress + ".execute", (Message msg) -> { + this.executeConsumer = cluster.getEventBus().consumer(botAddress + ".execute", (Message msg) -> { try { if (OUTPUT_REQUESTS) { System.out.println(":=> " + msg @@ -276,11 +379,21 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { logger.error("Error when deserializing a request", ex); msg.fail(500, ex.getMessage()); } - }).completionHandler(MonoUtils.toHandler(registrationSink)); + }); + executeConsumer.completionHandler(MonoUtils.toHandler(registrationSink)); })); } + private void undeploy(Runnable whenUndeployed) { + vertx.undeploy(deploymentID(), undeployed -> { + if (undeployed.failed()) { + logger.error("Error when undeploying td verticle", undeployed.cause()); + } + whenUndeployed.run(); + }); + } + private Mono pipe() { return Mono.fromCallable(() -> { td