tdlib-session-container/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient...

375 lines
14 KiB
Java
Raw Normal View History

package it.tdlight.tdlibsession.td.middle.client;
2020-10-17 18:28:54 +02:00
import io.vertx.core.eventbus.DeliveryOptions;
2021-01-24 19:13:46 +01:00
import io.vertx.core.json.JsonObject;
2021-01-23 18:49:21 +01:00
import io.vertx.reactivex.core.Vertx;
2021-01-27 00:58:07 +01:00
import io.vertx.reactivex.core.buffer.Buffer;
2021-10-01 10:58:14 +02:00
import io.vertx.reactivex.core.eventbus.Message;
2021-01-23 22:33:52 +01:00
import io.vertx.reactivex.core.eventbus.MessageConsumer;
import it.tdlight.jni.TdApi;
2021-03-06 13:35:11 +01:00
import it.tdlight.jni.TdApi.AuthorizationStateClosed;
import it.tdlight.jni.TdApi.Function;
import it.tdlight.jni.TdApi.Object;
2021-03-06 13:35:11 +01:00
import it.tdlight.jni.TdApi.UpdateAuthorizationState;
import it.tdlight.tdlibsession.td.ResponseError;
2021-01-23 18:49:21 +01:00
import it.tdlight.tdlibsession.td.TdError;
import it.tdlight.tdlibsession.td.TdResult;
2021-01-26 16:29:45 +01:00
import it.tdlight.tdlibsession.td.middle.TdResultMessage;
import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle;
2021-01-23 22:33:52 +01:00
import it.tdlight.tdlibsession.td.middle.EndSessionMessage;
import it.tdlight.tdlibsession.td.middle.ExecuteObject;
2021-01-23 18:49:21 +01:00
import it.tdlight.tdlibsession.td.middle.StartSessionMessage;
import it.tdlight.tdlibsession.td.middle.TdClusterManager;
import it.tdlight.tdlibsession.td.middle.TdResultList;
2021-01-23 18:49:21 +01:00
import it.tdlight.utils.BinlogAsyncFile;
import it.tdlight.utils.BinlogUtils;
import it.tdlight.utils.MonoUtils;
2021-01-24 23:08:14 +01:00
import java.net.ConnectException;
2021-01-23 18:49:21 +01:00
import java.nio.file.Path;
2021-01-24 03:15:45 +01:00
import java.time.Duration;
2021-09-30 19:18:25 +02:00
import java.util.concurrent.atomic.AtomicReference;
2021-09-30 18:22:50 +02:00
import java.util.concurrent.locks.LockSupport;
import org.warp.commonutils.locks.LockUtils;
2021-02-20 21:25:11 +01:00
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
2021-09-30 18:22:50 +02:00
import reactor.core.publisher.Sinks.EmitResult;
2021-01-22 17:31:09 +01:00
import reactor.core.publisher.Sinks.Empty;
2021-01-22 12:25:04 +01:00
import reactor.core.publisher.Sinks.One;
2021-01-19 03:18:00 +01:00
import reactor.core.scheduler.Schedulers;
2021-01-23 18:49:21 +01:00
public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
2021-01-23 18:49:21 +01:00
private Logger logger;
public static final byte[] EMPTY = new byte[0];
2021-01-22 17:31:09 +01:00
private final TdClusterManager cluster;
2020-10-17 18:28:54 +02:00
private final DeliveryOptions deliveryOptions;
private final DeliveryOptions deliveryOptionsWithTimeout;
2021-02-25 23:37:41 +01:00
private final DeliveryOptions pingDeliveryOptions;
2021-01-23 18:49:21 +01:00
private final One<BinlogAsyncFile> binlog = Sinks.one();
2021-01-26 16:29:45 +01:00
private final One<MessageConsumer<TdResultList>> updates = Sinks.one();
2021-01-23 18:49:21 +01:00
// This will only result in a successful completion, never completes in other ways
private final Empty<Void> updatesStreamEnd = Sinks.one();
// This will only result in a crash, never completes in other ways
private final Empty<Void> crash = Sinks.one();
2021-01-26 12:34:59 +01:00
// This will only result in a successful completion, never completes in other ways
private final Empty<Void> pingFail = Sinks.one();
2021-03-06 13:35:11 +01:00
// This will only result in a successful completion, never completes in other ways.
// It will be called when UpdateAuthorizationStateClosing is intercepted.
// If it's completed stop checking if the ping works or not
private final Empty<Void> authStateClosing = Sinks.one();
2021-01-23 18:49:21 +01:00
2021-03-10 12:35:56 +01:00
private long botId;
private String botAddress;
private String botAlias;
private boolean local;
public AsyncTdMiddleEventBusClient(TdClusterManager clusterManager) {
2021-01-23 18:49:21 +01:00
this.logger = LoggerFactory.getLogger(AsyncTdMiddleEventBusClient.class);
this.cluster = clusterManager;
2020-10-17 18:28:54 +02:00
this.deliveryOptions = cluster.newDeliveryOpts().setLocalOnly(local);
this.deliveryOptionsWithTimeout = cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(30000);
2021-02-25 23:37:41 +01:00
this.pingDeliveryOptions = cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(60000);
}
2021-02-25 11:21:03 +01:00
private Mono<AsyncTdMiddleEventBusClient> initializeEb() {
2021-01-25 16:06:05 +01:00
return Mono.just(this);
}
2021-02-25 11:21:03 +01:00
@Override
public Mono<Void> initialize() {
2021-02-25 23:37:41 +01:00
// Do nothing here.
2021-02-25 11:21:03 +01:00
return Mono.empty();
}
2021-01-22 17:31:09 +01:00
public static Mono<AsyncTdMiddle> getAndDeployInstance(TdClusterManager clusterManager,
2021-03-10 12:35:56 +01:00
long botId,
2021-01-22 17:31:09 +01:00
String botAlias,
2021-01-23 18:49:21 +01:00
boolean local,
2021-01-24 19:13:46 +01:00
JsonObject implementationDetails,
2021-01-23 18:49:21 +01:00
Path binlogsArchiveDirectory) {
return new AsyncTdMiddleEventBusClient(clusterManager)
2021-02-25 11:21:03 +01:00
.initializeEb()
.flatMap(instance -> retrieveBinlog(clusterManager.getVertx(), binlogsArchiveDirectory, botId)
.flatMap(binlog -> binlog
.getLastModifiedTime()
.filter(modTime -> modTime == 0)
.doOnNext(v -> LoggerFactory
.getLogger(AsyncTdMiddleEventBusClient.class)
.error("Can't retrieve binlog of bot " + botId + " " + botAlias + ". Creating a new one..."))
.thenReturn(binlog)).<AsyncTdMiddle>flatMap(binlog -> instance
.start(botId, botAlias, local, implementationDetails, binlog)
.thenReturn(instance)
)
.single()
2021-01-23 22:33:52 +01:00
)
.single();
}
2021-01-23 18:49:21 +01:00
/**
*
* @return optional result
*/
2021-03-10 12:35:56 +01:00
public static Mono<BinlogAsyncFile> retrieveBinlog(Vertx vertx, Path binlogsArchiveDirectory, long botId) {
2021-01-23 18:49:21 +01:00
return BinlogUtils.retrieveBinlog(vertx.fileSystem(), binlogsArchiveDirectory.resolve(botId + ".binlog"));
}
2021-01-27 00:58:07 +01:00
private Mono<Void> saveBinlog(Buffer data) {
2021-01-23 18:49:21 +01:00
return this.binlog.asMono().flatMap(binlog -> BinlogUtils.saveBinlog(binlog, data));
}
2021-03-10 12:35:56 +01:00
public Mono<Void> start(long botId,
2021-01-24 19:13:46 +01:00
String botAlias,
boolean local,
JsonObject implementationDetails,
BinlogAsyncFile binlog) {
2021-01-23 18:49:21 +01:00
this.botId = botId;
this.botAlias = botAlias;
this.botAddress = "bots.bot." + this.botId;
this.local = local;
this.logger = LoggerFactory.getLogger(this.botId + " " + botAlias);
return MonoUtils
2021-09-30 18:22:50 +02:00
.fromBlockingEmpty(() -> {
EmitResult result;
while ((result = this.binlog.tryEmitValue(binlog)) == EmitResult.FAIL_NON_SERIALIZED) {
// 10ms
LockSupport.parkNanos(10000000);
}
result.orThrow();
})
2021-01-23 18:49:21 +01:00
.then(binlog.getLastModifiedTime())
2021-01-27 00:58:07 +01:00
.zipWith(binlog.readFully().map(Buffer::getDelegate))
2021-01-23 18:49:21 +01:00
.single()
.flatMap(tuple -> {
var binlogLastModifiedTime = tuple.getT1();
var binlogData = tuple.getT2();
2021-01-24 19:13:46 +01:00
var msg = new StartSessionMessage(this.botId,
this.botAlias,
2021-01-27 00:58:07 +01:00
Buffer.newInstance(binlogData),
2021-01-24 19:13:46 +01:00
binlogLastModifiedTime,
implementationDetails
);
2021-01-23 18:49:21 +01:00
return setupUpdatesListener()
.then(Mono.defer(() -> {
if (local) {
return Mono.empty();
}
2021-01-26 12:34:59 +01:00
logger.trace("Requesting bots.start-bot");
return cluster.getEventBus()
.<byte[]>rxRequest("bots.start-bot", msg).as(MonoUtils::toMono)
2021-01-26 12:34:59 +01:00
.doOnSuccess(s -> logger.trace("bots.start-bot returned successfully"))
.subscribeOn(Schedulers.boundedElastic());
}))
2021-01-26 12:34:59 +01:00
.then(setupPing());
2021-02-14 22:59:20 +01:00
});
}
2021-01-26 12:34:59 +01:00
private Mono<Void> setupPing() {
return Mono.<Void>fromCallable(() -> {
logger.trace("Setting up ping");
// Disable ping on local servers
if (!local) {
Mono
.defer(() -> {
logger.trace("Requesting ping...");
2021-01-26 19:22:55 +01:00
return cluster.getEventBus()
2021-02-25 23:37:41 +01:00
.<byte[]>rxRequest(botAddress + ".ping", EMPTY, pingDeliveryOptions)
.as(MonoUtils::toMono);
})
2021-07-01 20:08:57 +02:00
.flatMap(msg -> Mono.fromCallable(msg::body).subscribeOn(Schedulers.boundedElastic()))
2021-01-26 12:34:59 +01:00
.repeatWhen(l -> l.delayElements(Duration.ofSeconds(10)).takeWhile(x -> true))
2021-09-30 18:22:50 +02:00
.takeUntilOther(Mono.firstWithSignal(
this.updatesStreamEnd
.asMono()
.doOnTerminate(() -> logger.trace("About to kill pinger because updates stream ended")),
this.crash
.asMono()
.onErrorResume(ex -> Mono.empty())
.doOnTerminate(() -> logger.trace("About to kill pinger because it has seen a crash signal"))
))
2021-01-26 19:22:55 +01:00
.doOnNext(s -> logger.trace("PING"))
2021-01-26 12:34:59 +01:00
.then()
2021-01-26 19:22:55 +01:00
.onErrorResume(ex -> {
2021-03-06 13:35:11 +01:00
logger.warn("Ping failed: {}", ex.getMessage());
2021-01-26 19:22:55 +01:00
return Mono.empty();
})
.doOnNext(s -> logger.debug("END PING"))
2021-09-30 18:22:50 +02:00
.then(MonoUtils.fromBlockingEmpty(() -> {
while (this.pingFail.tryEmitEmpty() == EmitResult.FAIL_NON_SERIALIZED) {
// 10ms
LockSupport.parkNanos(10000000);
}
}))
2021-01-27 01:09:22 +01:00
.subscribeOn(Schedulers.parallel())
2021-01-26 12:34:59 +01:00
.subscribe();
}
logger.trace("Ping setup success");
return null;
}).subscribeOn(Schedulers.boundedElastic());
}
2021-01-26 12:34:59 +01:00
private Mono<Void> setupUpdatesListener() {
return Mono
.fromRunnable(() -> logger.trace("Setting up updates listener..."))
.then(MonoUtils.<MessageConsumer<TdResultList>>fromBlockingSingle(() -> MessageConsumer
.newInstance(cluster.getEventBus().<TdResultList>consumer(botAddress + ".updates")
.setMaxBufferedMessages(5000)
.getDelegate()
))
)
2021-01-26 12:34:59 +01:00
.flatMap(updateConsumer -> {
2021-01-25 16:06:05 +01:00
// Return when the registration of all the consumers has been done across the cluster
2021-01-26 12:34:59 +01:00
return Mono
.fromRunnable(() -> logger.trace("Emitting updates flux to sink"))
2021-09-30 18:22:50 +02:00
.then(MonoUtils.fromBlockingEmpty(() -> {
EmitResult result;
while ((result = this.updates.tryEmitValue(updateConsumer)) == EmitResult.FAIL_NON_SERIALIZED) {
// 10ms
LockSupport.parkNanos(10000000);
}
result.orThrow();
}))
2021-01-26 12:34:59 +01:00
.doOnSuccess(s -> logger.trace("Emitted updates flux to sink"))
.doOnSuccess(s -> logger.trace("Waiting to register update consumer across the cluster"))
.doOnSuccess(s -> logger.trace("Registered update consumer across the cluster"));
})
2021-01-26 12:34:59 +01:00
.doOnSuccess(s ->logger.trace("Set up updates listener"))
2021-01-25 16:06:05 +01:00
.then();
}
2021-02-25 23:37:41 +01:00
@SuppressWarnings("Convert2MethodRef")
2021-01-23 18:49:21 +01:00
@Override
public Flux<TdApi.Object> receive() {
// Here the updates will be received
2021-01-26 16:29:45 +01:00
2021-01-24 19:13:46 +01:00
return Mono
.fromRunnable(() -> logger.trace("Called receive() from parent"))
2021-02-14 22:59:20 +01:00
.then(updates.asMono())
2021-01-27 01:09:22 +01:00
.publishOn(Schedulers.parallel())
2021-02-25 23:37:41 +01:00
.timeout(Duration.ofSeconds(30))
2021-03-31 12:02:49 +02:00
.doOnSuccess(s -> logger.trace("Registering updates flux"))
.flatMapMany(updatesMessageConsumer -> MonoUtils.fromMessageConsumer(Mono
.empty()
.doOnSuccess(s -> logger.trace("Sending ready-to-receive"))
.then(cluster.getEventBus().<byte[]>rxRequest(botAddress + ".ready-to-receive",
EMPTY,
deliveryOptionsWithTimeout
).as(MonoUtils::toMono))
.doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply"))
.doOnSuccess(s -> logger.trace("About to read updates flux"))
.then(), updatesMessageConsumer)
)
.takeUntilOther(Flux
.merge(
crash.asMono()
.onErrorResume(ex -> {
logger.error("TDLib crashed", ex);
return Mono.empty();
}),
pingFail.asMono()
.then(Mono.fromCallable(() -> {
var ex = new ConnectException("Server did not respond to ping");
ex.setStackTrace(new StackTraceElement[0]);
throw ex;
2021-09-30 18:22:50 +02:00
})).onErrorResume(ex -> MonoUtils.fromBlockingSingle(() -> {
EmitResult result;
while ((result = this.crash.tryEmitError(ex)) == EmitResult.FAIL_NON_SERIALIZED) {
// 10ms
LockSupport.parkNanos(10000000);
}
return result;
}))
2021-03-31 12:02:49 +02:00
.takeUntilOther(Mono
.firstWithSignal(crash.asMono(), authStateClosing.asMono())
.onErrorResume(e -> Mono.empty())
)
)
2021-03-31 12:02:49 +02:00
.doOnTerminate(() -> logger.trace("TakeUntilOther has been trigghered, the receive() flux will end"))
)
.takeUntil(a -> a.succeeded() && a.value().stream().anyMatch(item -> {
if (item.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) {
return ((UpdateAuthorizationState) item).authorizationState.getConstructor()
== AuthorizationStateClosed.CONSTRUCTOR;
}
return false;
}))
.flatMapSequential(updates -> {
if (updates.succeeded()) {
return Flux.fromIterable(updates.value());
} else {
return Mono.fromCallable(() -> TdResult.failed(updates.error()).orElseThrow());
}
})
.concatMap(update -> interceptUpdate(update))
// Redirect errors to crash sink
.doOnError(error -> crash.tryEmitError(error))
.onErrorResume(ex -> {
logger.trace("Absorbing the error, the error has been published using the crash sink", ex);
return Mono.empty();
})
2021-01-26 12:34:59 +01:00
2021-03-31 12:02:49 +02:00
.doOnTerminate(updatesStreamEnd::tryEmitEmpty);
}
2021-03-31 12:02:49 +02:00
private Mono<TdApi.Object> interceptUpdate(Object update) {
2021-01-26 16:29:45 +01:00
logger.trace("Received update {}", update.getClass().getSimpleName());
2021-09-30 18:22:50 +02:00
if (update.getConstructor() == TdApi.UpdateAuthorizationState.CONSTRUCTOR) {
var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update;
switch (updateAuthorizationState.authorizationState.getConstructor()) {
case TdApi.AuthorizationStateClosing.CONSTRUCTOR:
authStateClosing.tryEmitEmpty();
break;
case TdApi.AuthorizationStateClosed.CONSTRUCTOR:
2021-10-01 10:58:14 +02:00
logger.info("Received AuthorizationStateClosed from tdlib");
return cluster.getEventBus()
.<EndSessionMessage>rxRequest(this.botAddress + ".read-binlog", EMPTY)
.as(MonoUtils::toMono)
.mapNotNull(Message::body)
.doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: {}",
BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length())))
2021-09-30 18:22:50 +02:00
.flatMap(latestBinlog -> this.saveBinlog(latestBinlog.binlog()))
.doOnSuccess(s -> logger.info("Overwritten binlog from server"))
.thenReturn(update);
}
2021-01-23 22:33:52 +01:00
}
return Mono.just(update);
}
@Override
public <T extends TdApi.Object> Mono<TdResult<T>> execute(Function request, boolean executeDirectly) {
var req = new ExecuteObject(executeDirectly, request);
2021-09-30 19:18:25 +02:00
var crashMono = crash.asMono()
.doOnSuccess(s -> logger.debug("Failed request {} because the TDLib session was already crashed", request))
.then(Mono.<TdResult<T>>empty());
2021-10-01 10:58:14 +02:00
var executionMono = cluster.getEventBus()
.<TdResultMessage>rxRequest(botAddress + ".execute", req, deliveryOptions)
.as(MonoUtils::toMono)
2021-09-30 19:18:25 +02:00
.onErrorMap(ex -> ResponseError.newResponseError(request, botAlias, ex))
.<TdResult<T>>handle((resp, sink) -> {
if (resp.body() == null) {
var tdError = new TdError(500, "Response is empty");
sink.error(ResponseError.newResponseError(request, botAlias, tdError));
} else {
sink.next(resp.body().toTdResult());
}
})
2021-10-01 10:58:14 +02:00
.doFirst(() -> logger.trace("Executing request {}", request))
2021-09-30 19:18:25 +02:00
.doOnSuccess(s -> logger.trace("Executed request {}", request))
.doOnError(ex -> logger.debug("Failed request {}: {}", req, ex));
2021-01-22 17:31:09 +01:00
return Mono
2021-09-30 19:18:25 +02:00
.firstWithSignal(crashMono, executionMono)
.switchIfEmpty(Mono.error(() -> ResponseError.newResponseError(request, botAlias,
new TdError(500, "Client is closed or response is empty"))));
}
}