Big rewrite, failover and automatic bot deploying

This commit is contained in:
Andrea Cavalli 2020-10-28 12:04:42 +01:00
parent 7f8300fec5
commit aaf6d79b2b
15 changed files with 528 additions and 184 deletions

View File

@ -1,5 +1,5 @@
package it.tdlight.tdlibsession;
public enum FatalErrorType {
ACCESS_TOKEN_INVALID, PHONE_NUMBER_INVALID, CONNECTION_KILLED
ACCESS_TOKEN_INVALID, PHONE_NUMBER_INVALID, CONNECTION_KILLED, INVALID_UPDATE
}

View File

@ -0,0 +1,7 @@
package it.tdlight.tdlibsession.remoteclient;
public enum DeployClientResult {
DEPLOYED,
IGNORED,
FAILED
}

View File

@ -0,0 +1,42 @@
package it.tdlight.tdlibsession.remoteclient;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
public class RemoteClientBotAddresses {
private final Set<String> addresses;
private final Path addressesFilePath;
public RemoteClientBotAddresses(Path addressesFilePath) throws IOException {
this.addressesFilePath = addressesFilePath;
if (Files.notExists(addressesFilePath)) {
Files.createFile(addressesFilePath);
}
addresses = Files.readAllLines(addressesFilePath, StandardCharsets.UTF_8).stream().filter(address -> !address.isBlank()).collect(Collectors.toSet());
}
public synchronized void putAddress(String address) throws IOException {
addresses.add(address);
Files.write(addressesFilePath, addresses, StandardCharsets.UTF_8, StandardOpenOption.CREATE, StandardOpenOption.SYNC);
}
public synchronized void removeAddress(String address) throws IOException {
addresses.remove(address);
Files.write(addressesFilePath, addresses, StandardCharsets.UTF_8, StandardOpenOption.CREATE, StandardOpenOption.SYNC);
}
public synchronized boolean has(String botAddress) {
return addresses.contains(botAddress);
}
public synchronized Set<String> values() {
return new HashSet<>(addresses);
}
}

View File

@ -1,28 +1,31 @@
package it.tdlight.tdlibsession.remoteclient;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.JksOptions;
import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.Lock;
import it.tdlight.common.Init;
import it.tdlight.common.utils.CantLoadLibrary;
import it.tdlight.tdlibsession.td.middle.TdClusterManager;
import it.tdlight.tdlibsession.td.middle.server.AsyncTdMiddleEventBusServer;
import it.tdlight.utils.MonoUtils;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
public class TDLibRemoteClient implements AutoCloseable {
@ -33,16 +36,14 @@ public class TDLibRemoteClient implements AutoCloseable {
private final String netInterface;
private final int port;
private final Set<String> membersAddresses;
private final LinkedHashSet<String> botIds;
private final ReplayProcessor<TdClusterManager> clusterManager = ReplayProcessor.cacheLast();
private final Many<TdClusterManager> clusterManager = Sinks.many().replay().latest();
public TDLibRemoteClient(SecurityInfo securityInfo, String masterHostname, String netInterface, int port, Set<String> membersAddresses, Set<String> botIds) {
public TDLibRemoteClient(SecurityInfo securityInfo, String masterHostname, String netInterface, int port, Set<String> membersAddresses) {
this.securityInfo = securityInfo;
this.masterHostname = masterHostname;
this.netInterface = netInterface;
this.port = port;
this.membersAddresses = membersAddresses;
this.botIds = new LinkedHashSet<>(botIds);
try {
Init.start();
@ -66,19 +67,17 @@ public class TDLibRemoteClient implements AutoCloseable {
Set<String> membersAddresses = Set.of(args[2].split(","));
Set<String> botIds = Set.of(args[3].split(","));
Path keyStorePath = Paths.get(args[4]);
Path keyStorePasswordPath = Paths.get(args[5]);
Path trustStorePath = Paths.get(args[6]);
Path trustStorePasswordPath = Paths.get(args[7]);
Path keyStorePath = Paths.get(args[3]);
Path keyStorePasswordPath = Paths.get(args[4]);
Path trustStorePath = Paths.get(args[5]);
Path trustStorePasswordPath = Paths.get(args[6]);
var loggerContext = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false);
loggerContext.setConfigLocation(TDLibRemoteClient.class.getResource("/tdlib-session-container-log4j2.xml").toURI());
var securityInfo = new SecurityInfo(keyStorePath, keyStorePasswordPath, trustStorePath, trustStorePasswordPath);
new TDLibRemoteClient(securityInfo, masterHostname, netInterface, port, membersAddresses, botIds).run(x -> {});
new TDLibRemoteClient(securityInfo, masterHostname, netInterface, port, membersAddresses).run(x -> {});
}
public void start(Handler<Void> startedEventHandler) throws IllegalStateException {
@ -97,6 +96,9 @@ public class TDLibRemoteClient implements AutoCloseable {
logger.info("TDLib remote client is being hosted on" + netInterface + ":" + port + ". Master: " + masterHostname);
var botAddresses = new RemoteClientBotAddresses(Paths.get("remote_client_bot_addresses.txt"));
botAddresses.values().forEach(botAddress -> logger.info("Bot address is registered on this cluster:" + botAddress));
var keyStoreOptions = new JksOptions()
.setPath(securityInfo.getKeyStorePath().toAbsolutePath().toString())
.setPassword(securityInfo.getKeyStorePassword());
@ -105,40 +107,93 @@ public class TDLibRemoteClient implements AutoCloseable {
.setPath(securityInfo.getTrustStorePath().toAbsolutePath().toString())
.setPassword(securityInfo.getTrustStorePassword());
Mono<TdClusterManager> flux;
if (!botIds.isEmpty()) {
flux = TdClusterManager.ofNodes(keyStoreOptions,
TdClusterManager.ofNodes(keyStoreOptions,
trustStoreOptions,
false,
masterHostname,
netInterface,
port,
membersAddresses
);
} else {
flux = Mono.empty();
}
)
.doOnNext(clusterManager::tryEmitNext)
.doOnTerminate(clusterManager::tryEmitComplete)
.doOnError(clusterManager::tryEmitError)
.flatMapMany(clusterManager -> {
return Flux.create(sink -> {
var sharedData = clusterManager.getSharedData();
sharedData.getClusterWideMap("deployableBotAddresses", mapResult -> {
if (mapResult.succeeded()) {
var deployableBotAddresses = mapResult.result();
flux
.doOnNext(clusterManager::onNext)
.doOnTerminate(clusterManager::onComplete)
.doOnError(clusterManager::onError)
.flatMapIterable(clusterManager -> botIds
.stream()
.map(id -> Map.entry(clusterManager, id))
.collect(Collectors.toList()))
.flatMap(entry -> Mono.<String>create(sink -> {
entry
.getKey()
.getVertx()
.deployVerticle(new AsyncTdMiddleEventBusServer(entry.getKey()),
entry.getKey().newDeploymentOpts().setConfig(new JsonObject()
.put("botAddress", entry.getValue())
.put("botAlias", entry.getValue())
.put("local", false)),
MonoUtils.toHandler(sink)
);
}))
sharedData.getLockWithTimeout("deployment", 15000, lockAcquisitionResult -> {
if (lockAcquisitionResult.succeeded()) {
var deploymentLock = lockAcquisitionResult.result();
putAllAsync(deployableBotAddresses, botAddresses.values(), (AsyncResult<Void> putAllResult) -> {
if (putAllResult.succeeded()) {
clusterManager
.getEventBus()
.consumer("tdlib.remoteclient.clients.deploy", (Message<String> msg) -> {
var botAddress = msg.body();
if (botAddresses.has(botAddress)) {
deployBot(clusterManager, botAddress, deploymentResult -> {
if (deploymentResult.failed()) {
msg.fail(500, "Failed to deploy existing bot \"" + botAddress + "\": " + deploymentResult.cause().getLocalizedMessage());
sink.error(deploymentResult.cause());
} else {
sink.next(botAddress);
}
deploymentLock.release();
});
} else {
logger.info("Deploying new bot at address \"" + botAddress + "\"");
deployableBotAddresses.putIfAbsent(botAddress, netInterface, putResult -> {
if (putResult.succeeded()) {
if (putResult.result() == null) {
try {
botAddresses.putAddress(botAddress);
} catch (IOException e) {
logger.error("Can't save bot address \"" + botAddress + "\" to addresses file", e);
}
deployBot(clusterManager, botAddress, deploymentResult -> {
if (deploymentResult.failed()) {
msg.fail(500, "Failed to deploy new bot \"" + botAddress + "\": " + deploymentResult.cause().getLocalizedMessage());
sink.error(deploymentResult.cause());
} else {
sink.next(botAddress);
}
deploymentLock.release();
});
} else {
logger.error("Can't add new bot address \"" + botAddress + "\" because it's already present! Value: \"" + putResult.result() + "\"");
sink.error(new UnsupportedOperationException("Can't add new bot address \"" + botAddress + "\" because it's already present! Value: \"" + putResult.result() + "\""));
deploymentLock.release();
}
} else {
logger.error("Can't update shared map", putResult.cause());
sink.error(putResult.cause());
deploymentLock.release();
}
});
}
});
} else {
logger.error("Can't update shared map", putAllResult.cause());
sink.error(putAllResult.cause());
deploymentLock.release();
}
});
} else {
logger.error("Can't obtain deployment lock", lockAcquisitionResult.cause());
sink.error(lockAcquisitionResult.cause());
}
});
} else {
logger.error("Can't get shared map", mapResult.cause());
sink.error(mapResult.cause());
}
});
});
})
.doOnError(ex -> {
logger.error(ex.getLocalizedMessage(), ex);
}).subscribe(i -> {}, e -> {}, () -> startedEventHandler.handle(null));
@ -147,8 +202,85 @@ public class TDLibRemoteClient implements AutoCloseable {
}
}
private void deployBot(TdClusterManager clusterManager, String botAddress, Handler<AsyncResult<String>> deploymentHandler) {
AsyncTdMiddleEventBusServer verticle = new AsyncTdMiddleEventBusServer(clusterManager);
AtomicReference<Lock> deploymentLock = new AtomicReference<>();
verticle.onBeforeStop(handler -> {
clusterManager.getSharedData().getLockWithTimeout("deployment", 15000, lockAcquisitionResult -> {
if (lockAcquisitionResult.succeeded()) {
deploymentLock.set(lockAcquisitionResult.result());
var sharedData = clusterManager.getSharedData();
sharedData.getClusterWideMap("deployableBotAddresses", (AsyncResult<AsyncMap<String, String>> mapResult) -> {
if (mapResult.succeeded()) {
var deployableBotAddresses = mapResult.result();
deployableBotAddresses.removeIfPresent(botAddress, netInterface, putResult -> {
if (putResult.succeeded()) {
if (putResult.result() != null) {
handler.complete();
} else {
handler.fail("Can't destroy bot with address \"" + botAddress + "\" because it has been already destroyed");
}
} else {
handler.fail(putResult.cause());
}
});
} else {
handler.fail(mapResult.cause());
}
});
} else {
handler.fail(lockAcquisitionResult.cause());
}
});
});
verticle.onAfterStop(handler -> {
if (deploymentLock.get() != null) {
deploymentLock.get().release();
}
handler.complete();
});
clusterManager
.getVertx()
.deployVerticle(verticle,
clusterManager
.newDeploymentOpts()
.setConfig(new JsonObject()
.put("botAddress", botAddress)
.put("botAlias", botAddress)
.put("local", false)),
(deployed) -> {
if (deployed.failed()) {
logger.error("Can't deploy bot \"" + botAddress + "\"", deployed.cause());
}
deploymentHandler.handle(deployed);
}
);
}
private void putAllAsync(AsyncMap<Object, Object> sharedMap,
Set<String> valuesToAdd,
Handler<AsyncResult<Void>> resultHandler) {
if (valuesToAdd.isEmpty()) {
resultHandler.handle(Future.succeededFuture());
} else {
var valueToAdd = valuesToAdd.stream().findFirst().get();
valuesToAdd.remove(valueToAdd);
sharedMap.putIfAbsent(valueToAdd, netInterface, result -> {
if (result.succeeded()) {
if (result.result() == null || result.result().equals(netInterface)) {
putAllAsync(sharedMap, valuesToAdd, resultHandler);
} else {
resultHandler.handle(Future.failedFuture(new UnsupportedOperationException("Key already present! Key: \"" + valueToAdd + "\", Value: \"" + result.result() + "\"")));
}
} else {
resultHandler.handle(Future.failedFuture(result.cause()));
}
});
}
}
@Override
public void close() {
clusterManager.blockFirst();
clusterManager.asFlux().blockFirst();
}
}

View File

@ -3,7 +3,6 @@ package it.tdlight.tdlibsession.td.direct;
import io.vertx.core.AsyncResult;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Function;
import it.tdlight.jni.TdApi.Update;
import it.tdlight.tdlibsession.td.TdResult;
import java.time.Duration;
import reactor.core.publisher.Flux;
@ -20,7 +19,7 @@ public interface AsyncTdDirect {
* @return An incoming update or request response list. The object returned in the response may be
* an empty list if the timeout expires.
*/
Flux<AsyncResult<TdResult<Update>>> getUpdates(Duration receiveDuration, int eventsSize);
Flux<AsyncResult<TdResult<TdApi.Object>>> getUpdates(Duration receiveDuration, int eventsSize);
/**
* Sends request to TDLib. May be called from any thread.

View File

@ -5,9 +5,10 @@ import io.vertx.core.Future;
import it.tdlight.common.TelegramClient;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.AuthorizationStateClosed;
import it.tdlight.jni.TdApi.Close;
import it.tdlight.jni.TdApi.Function;
import it.tdlight.jni.TdApi.Object;
import it.tdlight.jni.TdApi.Update;
import it.tdlight.jni.TdApi.Ok;
import it.tdlight.jni.TdApi.UpdateAuthorizationState;
import it.tdlight.tdlibsession.td.TdResult;
import it.tdlight.tdlight.ClientManager;
@ -31,7 +32,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
private final Scheduler tdExecScheduler = Schedulers.newSingle("TdExec");
private final Scheduler tdResponsesOutputScheduler = Schedulers.boundedElastic();
private Flux<AsyncResult<TdResult<Update>>> updatesProcessor;
private Flux<AsyncResult<TdResult<TdApi.Object>>> updatesProcessor;
private final String botAlias;
public AsyncTdDirectImpl(String botAlias) {
@ -42,15 +43,32 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
public <T extends TdApi.Object> Mono<TdResult<T>> execute(Function request, boolean synchronous) {
if (synchronous) {
return Mono
.fromCallable(() -> TdResult.<T>of(this.td.get().execute(request)))
.fromCallable(() -> {
var td = this.td.get();
if (td == null) {
if (request.getConstructor() == Close.CONSTRUCTOR) {
return TdResult.<T>of(new Ok());
}
throw new IllegalStateException("TDLib client is destroyed");
}
return TdResult.<T>of(td.execute(request));
})
.subscribeOn(tdResponsesScheduler)
.publishOn(tdExecScheduler);
} else {
return Mono.<TdResult<T>>create(sink -> {
try {
this.td.get().send(request, v -> {
sink.success(TdResult.of(v));
}, sink::error);
var td = this.td.get();
if (td == null) {
if (request.getConstructor() == Close.CONSTRUCTOR) {
sink.success(TdResult.<T>of(new Ok()));
}
sink.error(new IllegalStateException("TDLib client is destroyed"));
} else {
td.send(request, v -> {
sink.success(TdResult.of(v));
}, sink::error);
}
} catch (Throwable t) {
sink.error(t);
}
@ -59,14 +77,14 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
}
@Override
public Flux<AsyncResult<TdResult<Update>>> getUpdates(Duration receiveDuration, int eventsSize) {
public Flux<AsyncResult<TdResult<TdApi.Object>>> getUpdates(Duration receiveDuration, int eventsSize) {
return updatesProcessor;
}
@Override
public Mono<Void> initializeClient() {
return Mono.<Boolean>create(sink -> {
var updatesConnectableFlux = Flux.<AsyncResult<TdResult<Update>>>create(emitter -> {
var updatesConnectableFlux = Flux.<AsyncResult<TdResult<TdApi.Object>>>create(emitter -> {
var client = ClientManager.create((Object object) -> {
emitter.next(Future.succeededFuture(TdResult.of(object)));
// Close the emitter if receive closed state
@ -100,9 +118,10 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
@Override
public Mono<Void> destroyClient() {
return Mono.fromCallable(() -> {
// do nothing
return (Void) null;
}).single().subscribeOn(tdScheduler).publishOn(tdResponsesOutputScheduler);
return this
.execute(new TdApi.Close(), false)
.then()
.subscribeOn(tdScheduler)
.publishOn(tdResponsesOutputScheduler);
}
}

View File

@ -138,7 +138,7 @@ public class AsyncTdEasy {
* Receives fatal errors from TDLib.
*/
public Flux<FatalErrorType> getFatalErrors() {
return Flux.from(fatalErrors);
return Flux.from(fatalErrors).publishOn(Schedulers.boundedElastic());
}
/**
@ -149,8 +149,8 @@ public class AsyncTdEasy {
return td.<T>execute(request, false);
}
private <T extends TdApi.Object> Mono<TdResult<T>> sendDirectly(TdApi.Function obj) {
return td.execute(obj, false);
private <T extends TdApi.Object> Mono<TdResult<T>> sendDirectly(TdApi.Function obj, boolean synchronous) {
return td.execute(obj, synchronous);
}
/**
@ -158,7 +158,7 @@ public class AsyncTdEasy {
* @param i level
*/
public Mono<Void> setVerbosityLevel(int i) {
return MonoUtils.thenOrError(sendDirectly(new TdApi.SetLogVerbosityLevel(i)));
return MonoUtils.thenOrError(sendDirectly(new TdApi.SetLogVerbosityLevel(i), true));
}
/**
@ -166,7 +166,7 @@ public class AsyncTdEasy {
* @param name option name
*/
public Mono<Void> clearOption(String name) {
return MonoUtils.thenOrError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty())));
return MonoUtils.thenOrError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()), false));
}
/**
@ -175,7 +175,7 @@ public class AsyncTdEasy {
* @param value option value
*/
public Mono<Void> setOptionString(String name, String value) {
return MonoUtils.thenOrError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value))));
return MonoUtils.thenOrError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)), false));
}
/**
@ -184,7 +184,7 @@ public class AsyncTdEasy {
* @param value option value
*/
public Mono<Void> setOptionInteger(String name, long value) {
return MonoUtils.thenOrError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value))));
return MonoUtils.thenOrError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)), false));
}
/**
@ -193,7 +193,7 @@ public class AsyncTdEasy {
* @param value option value
*/
public Mono<Void> setOptionBoolean(String name, boolean value) {
return MonoUtils.thenOrError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value))));
return MonoUtils.thenOrError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)), false));
}
/**
@ -202,7 +202,7 @@ public class AsyncTdEasy {
* @return The value or nothing
*/
public Mono<String> getOptionString(String name) {
return this.<TdApi.OptionValue>sendDirectly(new TdApi.GetOption(name)).<OptionValue>flatMap(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> {
return this.<TdApi.OptionValue>sendDirectly(new TdApi.GetOption(name), false).<OptionValue>flatMap(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> {
switch (value.getConstructor()) {
case OptionValueString.CONSTRUCTOR:
return Mono.just(((OptionValueString) value).value);
@ -221,7 +221,7 @@ public class AsyncTdEasy {
* @return The value or nothing
*/
public Mono<Long> getOptionInteger(String name) {
return this.<TdApi.OptionValue>sendDirectly(new TdApi.GetOption(name)).<TdApi.OptionValue>flatMap(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> {
return this.<TdApi.OptionValue>sendDirectly(new TdApi.GetOption(name), false).<TdApi.OptionValue>flatMap(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> {
switch (value.getConstructor()) {
case OptionValueInteger.CONSTRUCTOR:
return Mono.just(((OptionValueInteger) value).value);
@ -240,7 +240,7 @@ public class AsyncTdEasy {
* @return The value or nothing
*/
public Mono<Boolean> getOptionBoolean(String name) {
return this.<TdApi.OptionValue>sendDirectly(new TdApi.GetOption(name)).<TdApi.OptionValue>flatMap(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> {
return this.<TdApi.OptionValue>sendDirectly(new TdApi.GetOption(name), false).<TdApi.OptionValue>flatMap(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> {
switch (value.getConstructor()) {
case OptionValueBoolean.CONSTRUCTOR:
return Mono.just(((OptionValueBoolean) value).value);
@ -289,6 +289,16 @@ public class AsyncTdEasy {
.filter(closeRequested -> !closeRequested)
.doOnSuccess(v -> requestedDefinitiveExit.onNext(true))
.then(td.execute(new TdApi.Close(), false))
.doOnNext(ok -> {
logger.debug("Received Ok after TdApi.Close");
})
.then(authState
.filter(authorizationState -> authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR)
.take(1)
.singleOrEmpty())
.doOnNext(ok -> {
logger.info("Received AuthorizationStateClosed after TdApi.Close");
})
.then();
}
@ -308,17 +318,17 @@ public class AsyncTdEasy {
//todo: do this
}
private Mono<? extends Object> catchErrors(Object obj) {
private Mono<Update> catchErrors(Object obj) {
if (obj.getConstructor() == Error.CONSTRUCTOR) {
var error = (Error) obj;
switch (error.message) {
case "PHONE_CODE_INVALID":
globalErrors.onNext(error);
return Mono.just(new AuthorizationStateWaitCode());
return Mono.just(new UpdateAuthorizationState(new AuthorizationStateWaitCode()));
case "PASSWORD_HASH_INVALID":
globalErrors.onNext(error);
return Mono.just(new AuthorizationStateWaitPassword());
return Mono.just(new UpdateAuthorizationState(new AuthorizationStateWaitPassword()));
case "PHONE_NUMBER_INVALID":
fatalErrors.onNext(FatalErrorType.PHONE_NUMBER_INVALID);
break;
@ -328,20 +338,24 @@ public class AsyncTdEasy {
case "CONNECTION_KILLED":
fatalErrors.onNext(FatalErrorType.CONNECTION_KILLED);
break;
case "INVALID_UPDATE":
fatalErrors.onNext(FatalErrorType.INVALID_UPDATE);
break;
default:
globalErrors.onNext(error);
break;
}
return Mono.empty();
} else {
return Mono.just((Update) obj);
}
return Mono.just(obj);
}
public Mono<Boolean> isBot() {
return Mono.from(settings).single().map(TdEasySettings::isBotTokenSet);
}
private Publisher<Update> preprocessUpdates(Update updateObj) {
private Publisher<TdApi.Update> preprocessUpdates(TdApi.Object updateObj) {
return Mono
.just(updateObj)
.flatMap(this::catchErrors)
@ -368,28 +382,28 @@ public class AsyncTdEasy {
parameters.enableStorageOptimizer = settings.enableStorageOptimizer;
parameters.ignoreFileNames = settings.ignoreFileNames;
return new SetTdlibParameters(parameters);
}).flatMap(this::sendDirectly));
}).flatMap((SetTdlibParameters obj1) -> sendDirectly(obj1, false)));
case AuthorizationStateWaitEncryptionKey.CONSTRUCTOR:
return MonoUtils
.thenOrError(sendDirectly(new CheckDatabaseEncryptionKey()))
.thenOrError(sendDirectly(new CheckDatabaseEncryptionKey(), false))
.onErrorResume((error) -> {
logger.error("Error while checking TDLib encryption key", error);
return sendDirectly(new TdApi.Close()).then();
return sendDirectly(new TdApi.Close(), false).then();
});
case AuthorizationStateWaitPhoneNumber.CONSTRUCTOR:
return MonoUtils.thenOrError(Mono.from(this.settings).flatMap(settings -> {
if (settings.isPhoneNumberSet()) {
return sendDirectly(new SetAuthenticationPhoneNumber(String.valueOf(settings.getPhoneNumber()),
new PhoneNumberAuthenticationSettings(false, false, false)
));
), false);
} else if (settings.isBotTokenSet()) {
return sendDirectly(new CheckAuthenticationBotToken(settings.getBotToken()));
return sendDirectly(new CheckAuthenticationBotToken(settings.getBotToken()), false);
} else {
return Mono.error(new IllegalArgumentException("A bot is neither an user or a bot"));
}
})).onErrorResume((error) -> {
logger.error("Error while waiting for phone number", error);
return sendDirectly(new TdApi.Close()).then();
return sendDirectly(new TdApi.Close(), false).then();
});
case AuthorizationStateWaitRegistration.CONSTRUCTOR:
var authorizationStateWaitRegistration = (AuthorizationStateWaitRegistration) obj;
@ -421,7 +435,7 @@ public class AsyncTdEasy {
.defaultIfEmpty("")
.doOnNext(lastName -> registerUser.lastName = lastName)
)
.then(sendDirectly(registerUser)));
.then(sendDirectly(registerUser, false)));
});
case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR:
var authorizationStateWaitOtherDeviceConfirmation = (AuthorizationStateWaitOtherDeviceConfirmation) obj;
@ -445,7 +459,7 @@ public class AsyncTdEasy {
authorizationStateWaitCode.codeInfo.timeout,
authorizationStateWaitCode.codeInfo.type
)
).flatMap(code -> sendDirectly(new CheckAuthenticationCode(code))));
).flatMap(code -> sendDirectly(new CheckAuthenticationCode(code), false)));
});
case AuthorizationStateWaitPassword.CONSTRUCTOR:
var authorizationStateWaitPassword = (AuthorizationStateWaitPassword) obj;
@ -455,7 +469,7 @@ public class AsyncTdEasy {
.flatMap(handler -> {
return MonoUtils.thenOrLogRepeatError(() -> handler.onParameterRequest(Parameter.ASK_PASSWORD,
new ParameterInfoPasswordHint(authorizationStateWaitPassword.passwordHint)
).flatMap(password -> sendDirectly(new CheckAuthenticationPassword(password))));
).flatMap(password -> sendDirectly(new CheckAuthenticationPassword(password), false)));
});
case AuthorizationStateReady.CONSTRUCTOR: {
this.authState.onNext(new AuthorizationStateReady());
@ -504,6 +518,6 @@ public class AsyncTdEasy {
return Mono.empty();
}
})
.thenReturn(updateObj);
.then(Mono.justOrEmpty(updateObj.getConstructor() == Error.CONSTRUCTOR ? null : (Update) updateObj));
}
}

View File

@ -10,9 +10,9 @@ public interface AsyncTdMiddle {
/**
* Receives incoming updates from TDLib.
*
* @return Updates
* @return Updates (or Error if received a fatal error. A fatal error means that the client is no longer working)
*/
Flux<TdApi.Update> getUpdates();
Flux<TdApi.Object> getUpdates();
/**
* Sends request to TDLib. May be called from any thread.

View File

@ -19,8 +19,10 @@ import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.http.ClientAuth;
import io.vertx.core.net.JksOptions;
import io.vertx.core.shareddata.SharedData;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;
import it.tdlight.utils.MonoUtils;
import java.nio.channels.AlreadyBoundException;
import java.util.ArrayList;
import java.util.Collections;
@ -30,7 +32,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jetbrains.annotations.Nullable;
import it.tdlight.utils.MonoUtils;
import reactor.core.publisher.Mono;
public class TdClusterManager {
@ -229,4 +230,8 @@ public class TdClusterManager {
public DeploymentOptions newDeploymentOpts() {
return new DeploymentOptions().setWorkerPoolName("td-main-pool");
}
public SharedData getSharedData() {
return vertx.sharedData();
}
}

View File

@ -4,7 +4,6 @@ import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Error;
import it.tdlight.jni.TdApi.Update;
import it.tdlight.tdlibsession.td.TdResult;
import it.unimi.dsi.fastutil.io.FastByteArrayInputStream;
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream;
@ -27,7 +26,7 @@ public class TdOptListMessageCodec implements MessageCodec<TdOptionalList, TdOpt
if (ts.isSet()) {
var t = ts.getValues();
dos.writeInt(t.size());
for (TdResult<Update> t1 : t) {
for (TdResult<TdApi.Object> t1 : t) {
if (t1.succeeded()) {
dos.writeBoolean(true);
t1.result().serialize(dos);
@ -55,10 +54,10 @@ public class TdOptListMessageCodec implements MessageCodec<TdOptionalList, TdOpt
if (size < 0) {
return new TdOptionalList(false, Collections.emptyList());
} else {
ArrayList<TdResult<TdApi.Update>> list = new ArrayList<>();
ArrayList<TdResult<TdApi.Object>> list = new ArrayList<>();
for (int i = 0; i < size; i++) {
if (dis.readBoolean()) {
list.add(TdResult.succeeded((Update) TdApi.Deserializer.deserialize(dis)));
list.add(TdResult.succeeded((TdApi.Object) TdApi.Deserializer.deserialize(dis)));
} else {
list.add(TdResult.failed((Error) TdApi.Deserializer.deserialize(dis)));
}

View File

@ -8,9 +8,9 @@ import java.util.StringJoiner;
public class TdOptionalList {
private final boolean isSet;
private final List<TdResult<TdApi.Update>> values;
private final List<TdResult<TdApi.Object>> values;
public TdOptionalList(boolean isSet, List<TdResult<TdApi.Update>> values) {
public TdOptionalList(boolean isSet, List<TdResult<TdApi.Object>> values) {
this.isSet = isSet;
this.values = values;
}
@ -19,7 +19,7 @@ public class TdOptionalList {
return isSet;
}
public List<TdResult<TdApi.Update>> getValues() {
public List<TdResult<TdApi.Object>> getValues() {
return values;
}

View File

@ -7,12 +7,13 @@ import io.vertx.core.AsyncResult;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonObject;
import it.tdlight.common.ConstructorDetector;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.AuthorizationStateClosed;
import it.tdlight.jni.TdApi.Error;
import it.tdlight.jni.TdApi.Function;
import it.tdlight.jni.TdApi.Update;
import it.tdlight.jni.TdApi.UpdateAuthorizationState;
import it.tdlight.tdlibsession.td.ResponseError;
import it.tdlight.tdlibsession.td.TdResult;
@ -30,6 +31,7 @@ import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
@ -41,7 +43,8 @@ import reactor.core.publisher.ReplayProcessor;
public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements AsyncTdMiddle {
private static final Logger logger = LoggerFactory.getLogger(AsyncTdMiddleEventBusClient.class);
private static final Logger logger = LoggerFactory.getLogger(AsyncTdMiddleEventBusClient.class );
public static final boolean OUTPUT_REQUESTS = false;
public static final byte[] EMPTY = new byte[0];
@ -49,7 +52,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
private final DeliveryOptions deliveryOptions;
private final DeliveryOptions deliveryOptionsWithTimeout;
private ReplayProcessor<Flux<Update>> incomingUpdatesCo = ReplayProcessor.cacheLast();
private ReplayProcessor<Flux<TdApi.Object>> incomingUpdatesCo = ReplayProcessor.cacheLast();
private TdClusterManager cluster;
@ -57,6 +60,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
private String botAlias;
private boolean local;
private long initTime;
private MessageConsumer<byte[]> readyToStartConsumer;
@SuppressWarnings({"unchecked", "rawtypes"})
public AsyncTdMiddleEventBusClient(TdClusterManager clusterManager) {
@ -121,37 +125,41 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
startBreaker.execute(future -> {
try {
logger.error("Requesting " + botAddress + ".ping");
cluster
.getEventBus()
.request(botAddress + ".ping", EMPTY, deliveryOptions, pingMsg -> {
if (pingMsg.succeeded()) {
logger.error("Received ping reply (succeeded)");
logger.error("Requesting " + botAddress + ".start");
logger.debug("Requesting tdlib.remoteclient.clients.deploy.existing");
cluster.getEventBus().publish("tdlib.remoteclient.clients.deploy", botAddress, deliveryOptions);
logger.debug("Waiting for " + botAddress + ".readyToStart");
AtomicBoolean alreadyReceived = new AtomicBoolean(false);
this.readyToStartConsumer = cluster.getEventBus().consumer(botAddress + ".readyToStart", (Message<byte[]> pingMsg) -> {
// Reply instantly
pingMsg.reply(new byte[0]);
if (!alreadyReceived.getAndSet(true)) {
logger.debug("Received ping reply (succeeded)");
logger.debug("Requesting " + botAddress + ".start");
cluster
.getEventBus()
.request(botAddress + ".start", EMPTY, deliveryOptionsWithTimeout, startMsg -> {
if (startMsg.succeeded()) {
logger.debug("Requesting " + botAddress + ".isWorking");
cluster
.getEventBus()
.request(botAddress + ".start", EMPTY, deliveryOptionsWithTimeout, startMsg -> {
if (startMsg.succeeded()) {
logger.error("Requesting " + botAddress + ".isWorking");
cluster
.getEventBus()
.request(botAddress + ".isWorking", EMPTY, deliveryOptionsWithTimeout, msg -> {
if (msg.succeeded()) {
this.listen().then(this.pipe()).timeout(Duration.ofSeconds(10)).subscribe(v -> {}, future::fail, future::complete);
} else {
future.fail(msg.cause());
}
});
.request(botAddress + ".isWorking", EMPTY, deliveryOptionsWithTimeout, msg -> {
if (msg.succeeded()) {
this.listen().then(this.pipe()).timeout(Duration.ofSeconds(10)).subscribe(v -> {}, future::fail, future::complete);
} else {
future.fail(startMsg.cause());
future.fail(msg.cause());
}
});
} else {
logger.error("Received ping reply (failed) (local=" + local + ")", pingMsg.cause());
future.fail(pingMsg.cause());
future.fail(startMsg.cause());
}
}
);
});
} else {
// Already received
}
});
} catch (Exception ex) {
future.fail(ex);
}
@ -165,8 +173,10 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
@Override
public void stop(Promise<Void> stopPromise) {
tdClosed.onNext(true);
stopPromise.complete();
readyToStartConsumer.unregister(result -> {
tdClosed.onNext(true);
stopPromise.complete();
});
}
private Mono<Void> listen() {
@ -178,7 +188,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
var updates = this.requestUpdatesBatchFromNetwork()
.repeatWhen(nFlux -> {
return Flux.push(emitter -> {
var dispos = Flux.combineLatest(nFlux, tdClosed, Pair::of).subscribe(val -> {
var dispos = Flux.combineLatest(nFlux, tdClosed.distinct(), Pair::of).subscribe(val -> {
//noinspection PointlessBooleanExpression
if (val.getRight() == true) {
emitter.complete();
@ -194,12 +204,19 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
});
}) // Repeat when there is one batch with a flux of updates
.flatMap(batch -> batch)
.onErrorResume(error -> {
logger.error("Bot updates request failed! Marking as closed.", error);
if (error.getMessage().contains("Timed out")) {
return Flux.just(new Error(444, "CONNECTION_KILLED"));
} else {
return Flux.just(new Error(406, "INVALID_UPDATE"));
}
})
.flatMap(update -> {
return Mono.<Update>create(sink -> {
return Mono.<TdApi.Object>create(sink -> {
if (update.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) {
var state = (UpdateAuthorizationState) update;
if (state.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) {
tdClosed.onNext(true);
this.getVertx().undeploy(this.deploymentID(), undeployed -> {
if (undeployed.failed()) {
logger.error("Error when undeploying td verticle", undeployed.cause());
@ -225,10 +242,10 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
}
private static class UpdatesBatchResult {
public final Flux<Update> updatesFlux;
public final Flux<TdApi.Object> updatesFlux;
public final boolean completed;
private UpdatesBatchResult(Flux<Update> updatesFlux, boolean completed) {
private UpdatesBatchResult(Flux<TdApi.Object> updatesFlux, boolean completed) {
this.updatesFlux = updatesFlux;
this.completed = completed;
}
@ -242,15 +259,15 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
}
}
private Mono<Flux<TdApi.Update>> requestUpdatesBatchFromNetwork() {
private Mono<Flux<TdApi.Object>> requestUpdatesBatchFromNetwork() {
return Mono
.from(tdClosed)
.from(tdClosed.distinct())
.single()
.filter(tdClosed -> !tdClosed)
.flatMap(_x -> Mono.<Flux<TdApi.Update>>create(sink -> {
.flatMap(_x -> Mono.<Flux<TdApi.Object>>create(sink -> {
cluster.getEventBus().<TdOptionalList>request(botAddress + ".getNextUpdatesBlock",
EMPTY,
deliveryOptions,
deliveryOptionsWithTimeout,
msg -> {
if (msg.failed()) {
//if (System.currentTimeMillis() - initTime <= 30000) {
@ -267,8 +284,8 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
} else {
var resultBody = msg.result().body();
if (resultBody.isSet()) {
List<TdResult<Update>> updates = resultBody.getValues();
for (TdResult<Update> updateObj : updates) {
List<TdResult<TdApi.Object>> updates = resultBody.getValues();
for (TdResult<TdApi.Object> updateObj : updates) {
if (updateObj.succeeded()) {
if (OUTPUT_REQUESTS) {
System.out.println(" <- " + updateObj.result()
@ -297,7 +314,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
}
@Override
public Flux<Update> getUpdates() {
public Flux<TdApi.Object> getUpdates() {
return incomingUpdatesCo.filter(Objects::nonNull).flatMap(v -> v);
}
@ -313,7 +330,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
.replace(" = ", "="));
}
return Mono.from(tdClosed).single().filter(tdClosed -> !tdClosed).<TdResult<T>>flatMap((_x) -> Mono.create(sink -> {
return Mono.from(tdClosed.distinct()).single().filter(tdClosed -> !tdClosed).<TdResult<T>>flatMap((_x) -> Mono.create(sink -> {
try {
cluster
.getEventBus()

View File

@ -5,20 +5,18 @@ import static it.tdlight.tdlibsession.td.middle.server.AsyncTdMiddleEventBusServ
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Function;
import it.tdlight.jni.TdApi.Object;
import it.tdlight.jni.TdApi.Update;
import it.tdlight.tdlibsession.td.ResponseError;
import it.tdlight.tdlibsession.td.TdResult;
import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl;
import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle;
import it.tdlight.tdlibsession.td.middle.TdClusterManager;
import it.tdlight.utils.MonoUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.warp.commonutils.error.InitializationException;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
@ -31,7 +29,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd
protected AsyncTdDirectImpl td;
private String botAddress;
private String botAlias;
private Flux<Update> updatesFluxCo;
private Flux<TdApi.Object> updatesFluxCo;
public AsyncTdMiddleDirect() {
}
@ -78,11 +76,11 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd
logger.error("Received an errored update",
ResponseError.newResponseError("incoming update", botAlias, result.result().cause())
);
return Mono.<Update>empty();
return Mono.<TdApi.Object>empty();
}
} else {
logger.error("Received an errored update", result.cause());
return Mono.<Update>empty();
return Mono.<TdApi.Object>empty();
}
})).publish().refCount(1);
startPromise.complete();
@ -105,7 +103,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd
}
@Override
public Flux<Update> getUpdates() {
public Flux<TdApi.Object> getUpdates() {
return Flux.from(updatesFluxCo);
}

View File

@ -1,19 +1,18 @@
package it.tdlight.tdlibsession.td.middle.direct;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.json.JsonObject;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Function;
import it.tdlight.jni.TdApi.Object;
import it.tdlight.jni.TdApi.Update;
import it.tdlight.tdlibsession.td.TdResult;
import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl;
import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle;
import it.tdlight.tdlibsession.td.middle.TdClusterManager;
import it.tdlight.tdlibsession.td.middle.client.AsyncTdMiddleEventBusClient;
import it.tdlight.tdlibsession.td.middle.server.AsyncTdMiddleEventBusServer;
import it.tdlight.utils.MonoUtils;
import java.util.Objects;
import org.warp.commonutils.error.InitializationException;
import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl;
import it.tdlight.tdlibsession.td.middle.TdClusterManager;
import it.tdlight.utils.MonoUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
@ -56,7 +55,7 @@ public class AsyncTdMiddleLocal implements AsyncTdMiddle {
}
@Override
public Flux<Update> getUpdates() {
public Flux<TdApi.Object> getUpdates() {
return cli.filter(Objects::nonNull).single().flatMapMany(AsyncTdMiddleEventBusClient::getUpdates);
}

View File

@ -4,12 +4,14 @@ import static it.tdlight.tdlibsession.td.middle.client.AsyncTdMiddleEventBusClie
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import it.tdlight.common.ConstructorDetector;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.AuthorizationStateClosed;
import it.tdlight.jni.TdApi.Update;
import it.tdlight.jni.TdApi.UpdateAuthorizationState;
import it.tdlight.tdlibsession.td.TdResult;
import it.tdlight.tdlibsession.td.TdResultMessage;
@ -26,9 +28,11 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
@ -40,6 +44,7 @@ import reactor.core.scheduler.Schedulers;
public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(AsyncTdMiddleEventBusServer.class);
private static final byte[] EMPTY = new byte[0];
// todo: restore duration to 2 seconds instead of 10 millis, when the bug of tdlight double queue wait is fixed
public static final Duration WAIT_DURATION = Duration.ofSeconds(1);// Duration.ofMillis(10);
@ -54,8 +59,14 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
protected final ReplayProcessor<Boolean> tdClosed = ReplayProcessor.cacheLastOrDefault(false);
protected AsyncTdDirectImpl td;
protected final LinkedBlockingQueue<AsyncResult<TdResult<Update>>> queue = new LinkedBlockingQueue<>();
protected final LinkedBlockingQueue<AsyncResult<TdResult<TdApi.Object>>> queue = new LinkedBlockingQueue<>();
private final Scheduler tdSrvPoll;
private List<Consumer<Promise<Void>>> onBeforeStopListeners = new CopyOnWriteArrayList<>();
private List<Consumer<Promise<Void>>> onAfterStopListeners = new CopyOnWriteArrayList<>();
private MessageConsumer<?> startConsumer;
private MessageConsumer<byte[]> isWorkingConsumer;
private MessageConsumer<byte[]> getNextUpdatesBlockConsumer;
private MessageConsumer<ExecuteObject> executeConsumer;
@SuppressWarnings({"unchecked", "rawtypes"})
public AsyncTdMiddleEventBusServer(TdClusterManager clusterManager) {
@ -89,23 +100,18 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
this.local = local;
this.td = new AsyncTdDirectImpl(botAlias);
cluster.getEventBus().consumer(botAddress + ".ping", (Message<byte[]> msg) -> {
logger.error("Received ping. Replying...");
msg.reply(EMPTY);
logger.error("Replied.");
});
AtomicBoolean alreadyDeployed = new AtomicBoolean(false);
cluster.getEventBus().consumer(botAddress + ".start", (Message<byte[]> msg) -> {
this.startConsumer = cluster.getEventBus().consumer(botAddress + ".start", (Message<byte[]> msg) -> {
if (alreadyDeployed.compareAndSet(false, true)) {
td.initializeClient()
.then(this.listen())
.then(this.pipe())
.then(Mono.<Void>create(registrationSink -> {
cluster.getEventBus().consumer(botAddress + ".isWorking", (Message<byte[]> workingMsg) -> {
this.isWorkingConsumer = cluster.getEventBus().consumer(botAddress + ".isWorking", (Message<byte[]> workingMsg) -> {
workingMsg.reply(EMPTY, cluster.newDeliveryOpts().setLocalOnly(local));
}).completionHandler(MonoUtils.toHandler(registrationSink));
});
this.isWorkingConsumer.completionHandler(MonoUtils.toHandler(registrationSink));
}))
.subscribe(v -> {}, ex -> {
@ -119,7 +125,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
} else {
msg.reply(EMPTY);
}
}).completionHandler(h -> {
});
startConsumer.completionHandler(h -> {
logger.info(botAddress + " server deployed. succeeded: " + h.succeeded());
if (h.succeeded()) {
startPromise.complete(h.result());
@ -127,30 +134,131 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
startPromise.fail(h.cause());
}
});
logger.debug("Sending " + botAddress + ".readyToStart");
cluster.getEventBus().send(botAddress + ".readyToStart", EMPTY, cluster.newDeliveryOpts().setSendTimeout(10000));
var clientDeadCheckThread = new Thread(() -> {
Throwable ex = null;
try {
while (!Thread.interrupted()) {
Thread.sleep(5000);
Promise<Void> promise = Promise.promise();
cluster
.getEventBus()
.request(botAddress + ".readyToStart",
EMPTY,
cluster.newDeliveryOpts().setSendTimeout(10000),
r -> promise.handle(r.mapEmpty())
);
promise.future().toCompletionStage().toCompletableFuture().join();
}
} catch (Throwable e) {
ex = e;
}
var closed = tdClosed.blockFirst();
if (closed == null || !closed) {
if (ex != null && !ex.getMessage().contains("NO_HANDLERS")) {
logger.error(ex.getLocalizedMessage(), ex);
}
logger.error("TDLib client disconnected unexpectedly! Closing the server...");
undeploy(() -> {});
}
});
clientDeadCheckThread.setName("Client " + botAddress + " dead check");
clientDeadCheckThread.setDaemon(true);
clientDeadCheckThread.start();
}
public void onBeforeStop(Consumer<Promise<Void>> r) {
this.onBeforeStopListeners.add(r);
}
public void onAfterStop(Consumer<Promise<Void>> r) {
this.onAfterStopListeners.add(r);
}
@Override
public void stop(Promise<Void> stopPromise) {
tdClosed.onNext(true);
td.destroyClient().onErrorResume(ex -> {
logger.error("Can't destroy client", ex);
return Mono.empty();
}).doOnTerminate(() -> {
logger.debug("TdMiddle verticle stopped");
}).subscribe(MonoUtils.toSubscriber(stopPromise));
runAll(onBeforeStopListeners, onBeforeStopHandler -> {
if (onBeforeStopHandler.failed()) {
logger.error("A beforeStop listener failed: "+ onBeforeStopHandler.cause());
}
td.destroyClient().onErrorResume(ex -> {
logger.error("Can't destroy client", ex);
return Mono.empty();
}).doOnError(err -> {
logger.error("TdMiddle verticle failed during stop", err);
}).then(Mono.create(sink -> {
this.isWorkingConsumer.unregister(result -> {
if (result.failed()) {
logger.error("Can't unregister consumer", result.cause());
}
this.startConsumer.unregister(result2 -> {
if (result2.failed()) {
logger.error("Can't unregister consumer", result2.cause());
}
tdClosed.onNext(true);
this.getNextUpdatesBlockConsumer.unregister(result3 -> {
if (result3.failed()) {
logger.error("Can't unregister consumer", result3.cause());
}
this.executeConsumer.unregister(result4 -> {
if (result4.failed()) {
logger.error("Can't unregister consumer", result4.cause());
}
sink.success();
});
});
});
});
})).doFinally(signalType -> {
logger.info("TdMiddle verticle \"" + botAddress + "\" stopped");
runAll(onAfterStopListeners, onAfterStopHandler -> {
if (onAfterStopHandler.failed()) {
logger.error("An afterStop listener failed: " + onAfterStopHandler.cause());
}
stopPromise.complete();
});
}).subscribe();
});
}
private void runAll(List<Consumer<Promise<Void>>> actions, Handler<AsyncResult<Void>> resultHandler) {
if (actions.isEmpty()) {
resultHandler.handle(Future.succeededFuture());
} else {
var firstAction = actions.remove(0);
Promise<Void> promise = Promise.promise();
firstAction.accept(promise);
promise.future().onComplete(handler -> {
if (handler.succeeded()) {
runAll(new ArrayList<>(actions), resultHandler);
} else {
resultHandler.handle(Future.failedFuture(handler.cause()));
}
});
}
}
private Mono<Void> listen() {
return Mono.<Void>create(registrationSink -> {
cluster.getEventBus().consumer(botAddress + ".getNextUpdatesBlock", (Message<byte[]> msg) -> {
this.getNextUpdatesBlockConsumer = cluster.getEventBus().consumer(botAddress + ".getNextUpdatesBlock", (Message<byte[]> msg) -> {
// Run only if tdlib is not closed
Mono.from(tdClosed).single().filter(tdClosedVal -> !tdClosedVal)
// Get a list of updates
.flatMap(_v -> Mono
.<List<AsyncResult<TdResult<Update>>>>fromSupplier(() -> {
.<List<AsyncResult<TdResult<TdApi.Object>>>>fromSupplier(() -> {
// When a request is asked, read up to 1000 available updates in the queue
long requestTime = System.currentTimeMillis();
ArrayList<AsyncResult<TdResult<Update>>> updatesBatch = new ArrayList<>();
ArrayList<AsyncResult<TdResult<TdApi.Object>>> updatesBatch = new ArrayList<>();
try {
// Block until an update is found or 5 seconds passed
var item = queue.poll(5, TimeUnit.SECONDS);
@ -207,20 +315,14 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
if (received.succeeded() && received.result().getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) {
var authState = (UpdateAuthorizationState) received.result();
if (authState.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) {
tdClosed.onNext(true);
vertx.undeploy(deploymentID(), undeployed -> {
if (undeployed.failed()) {
logger.error("Error when undeploying td verticle", undeployed.cause());
}
sink.success();
});
undeploy(sink::success);
} else {
sink.success();
}
} else {
sink.success();
}
}).then(Mono.<TdResult<Update>>create(sink -> {
}).then(Mono.<TdResult<TdApi.Object>>create(sink -> {
sink.success(received);
}));
} else {
@ -237,11 +339,12 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
logger.error("Error when processing a 'receiveUpdates' request", ex);
msg.fail(500, ex.getLocalizedMessage());
}, () -> {});
}).completionHandler(MonoUtils.toHandler(registrationSink));
});
getNextUpdatesBlockConsumer.completionHandler(MonoUtils.toHandler(registrationSink));
}).then(Mono.<Void>create(registrationSink -> {
cluster.getEventBus().<ExecuteObject>consumer(botAddress + ".execute", (Message<ExecuteObject> msg) -> {
this.executeConsumer = cluster.getEventBus().<ExecuteObject>consumer(botAddress + ".execute", (Message<ExecuteObject> msg) -> {
try {
if (OUTPUT_REQUESTS) {
System.out.println(":=> " + msg
@ -276,11 +379,21 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
logger.error("Error when deserializing a request", ex);
msg.fail(500, ex.getMessage());
}
}).completionHandler(MonoUtils.toHandler(registrationSink));
});
executeConsumer.completionHandler(MonoUtils.toHandler(registrationSink));
}));
}
private void undeploy(Runnable whenUndeployed) {
vertx.undeploy(deploymentID(), undeployed -> {
if (undeployed.failed()) {
logger.error("Error when undeploying td verticle", undeployed.cause());
}
whenUndeployed.run();
});
}
private Mono<Void> pipe() {
return Mono.fromCallable(() -> {
td