Add reactor telegram client

This commit is contained in:
Andrea Cavalli 2021-02-13 17:42:22 +01:00
parent 800187b9e2
commit faabbc59e8
5 changed files with 174 additions and 118 deletions

View File

@ -0,0 +1,15 @@
package it.tdlight.tdlibsession.td;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Object;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface ReactorTelegramClient {
Flux<Object> initialize();
Mono<Object> send(TdApi.Function query);
Object execute(TdApi.Function query);
}

View File

@ -0,0 +1,54 @@
package it.tdlight.tdlibsession.td;
import it.tdlight.common.ReactiveTelegramClient;
import it.tdlight.jni.TdApi;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class WrappedReactorTelegramClient implements ReactorTelegramClient {
private final ReactiveTelegramClient reactiveTelegramClient;
public WrappedReactorTelegramClient(ReactiveTelegramClient reactiveTelegramClient) {
this.reactiveTelegramClient = reactiveTelegramClient;
}
@Override
public Flux<TdApi.Object> initialize() {
return Flux.from(reactiveTelegramClient).concatMap(item -> {
if (item.isUpdate()) {
return Mono.just(item.getUpdate());
} else if (item.isHandleException()) {
return Mono.error(item.getHandleException());
} else if (item.isUpdateException()) {
return Mono.error(item.getUpdateException());
} else {
return Mono.error(new IllegalStateException("This shouldn't happen. Received unknown ReactiveItem type"));
}
});
}
/**
* Sends a request to the TDLib.
*
* @param query Object representing a query to the TDLib.
* @throws NullPointerException if query is null.
* @return a publisher that will emit exactly one item, or an error
*/
@Override
public Mono<TdApi.Object> send(TdApi.Function query) {
return Mono.from(reactiveTelegramClient.send(query));
}
/**
* Synchronously executes a TDLib request. Only a few marked accordingly requests can be executed synchronously.
*
* @param query Object representing a query to the TDLib.
* @return request result or {@link TdApi.Error}.
* @throws NullPointerException if query is null.
*/
@Override
public TdApi.Object execute(TdApi.Function query) {
return reactiveTelegramClient.execute(query);
}
}

View File

@ -1,13 +1,13 @@
package it.tdlight.tdlibsession.td.direct;
import io.vertx.core.json.JsonObject;
import it.tdlight.common.TelegramClient;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.AuthorizationStateClosed;
import it.tdlight.jni.TdApi.Close;
import it.tdlight.jni.TdApi.Function;
import it.tdlight.jni.TdApi.Ok;
import it.tdlight.jni.TdApi.UpdateAuthorizationState;
import it.tdlight.tdlibsession.td.ReactorTelegramClient;
import it.tdlight.tdlibsession.td.TdError;
import it.tdlight.tdlibsession.td.TdResult;
import it.tdlight.utils.MonoUtils;
@ -27,7 +27,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
private final JsonObject implementationDetails;
private final String botAlias;
private final One<TelegramClient> td = Sinks.one();
private final One<ReactorTelegramClient> td = Sinks.one();
public AsyncTdDirectImpl(TelegramClientFactory telegramClientFactory,
JsonObject implementationDetails,
@ -42,10 +42,10 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
if (synchronous) {
return td.asMono().single().flatMap(td -> MonoUtils.fromBlockingSingle(() -> {
if (td != null) {
return TdResult.<T>of(td.execute(request));
return TdResult.of(td.execute(request));
} else {
if (request.getConstructor() == Close.CONSTRUCTOR) {
return TdResult.<T>of(new Ok());
return TdResult.of(new Ok());
}
throw new IllegalStateException("TDLib client is destroyed");
}
@ -53,11 +53,14 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
} else {
return td.asMono().single().flatMap(td -> Mono.<TdResult<T>>create(sink -> {
if (td != null) {
td.send(request, v -> sink.success(TdResult.of(v)), sink::error);
Mono
.from(td.send(request))
.subscribeOn(Schedulers.single())
.subscribe(v -> sink.success(TdResult.of(v)), sink::error);
} else {
if (request.getConstructor() == Close.CONSTRUCTOR) {
logger.trace("Sending close success to sink " + sink.toString());
sink.success(TdResult.<T>of(new Ok()));
sink.success(TdResult.of(new Ok()));
} else {
logger.trace("Sending close error to sink " + sink.toString());
sink.error(new IllegalStateException("TDLib client is destroyed"));
@ -71,44 +74,29 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
public Flux<TdApi.Object> receive(AsyncTdDirectOptions options) {
// If closed it will be either true or false
final One<Boolean> closedFromTd = Sinks.one();
return telegramClientFactory.create(implementationDetails)
.flatMapMany(client -> Flux
.<TdApi.Object>create(updatesSink -> {
client.execute(new TdApi.SetLogVerbosityLevel(1));
client.initialize((TdApi.Object object) -> {
updatesSink.next(object);
// Close the emitter if receive closed state
if (object.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR
&& ((UpdateAuthorizationState) object).authorizationState.getConstructor()
== AuthorizationStateClosed.CONSTRUCTOR) {
logger.debug("Received closed status from tdlib");
closedFromTd.tryEmitValue(true);
updatesSink.complete();
}
}, updatesSink::error, updatesSink::error);
if (td.tryEmitValue(client).isFailure()) {
updatesSink.error(new TdError(500, "Failed to emit td client"));
}
// Send close if the stream is disposed before tdlib is closed
updatesSink.onDispose(() -> {
Mono.firstWithValue(closedFromTd.asMono(), Mono.empty()).switchIfEmpty(Mono.defer(() -> Mono.fromRunnable(() -> {
// Try to emit false, so that if it has not been closed from tdlib, now it is explicitly false.
closedFromTd.tryEmitValue(false);
}))).filter(isClosedFromTd -> !isClosedFromTd).doOnNext(x -> {
logger.warn("The stream has been disposed without closing tdlib. Sending TdApi.Close()...");
client.send(new Close(),
result -> logger.warn("Close result: {}", result),
ex -> logger.error("Error when disposing td client", ex)
);
}).subscribeOn(Schedulers.parallel()).subscribe();
});
})
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.parallel())
)
return telegramClientFactory
.create(implementationDetails)
.doOnNext(client -> client.execute(new TdApi.SetLogVerbosityLevel(1)))
.flatMap(client -> {
if (td.tryEmitValue(client).isFailure()) {
return Mono.error(new TdError(500, "Failed to emit td client"));
}
return Mono.just(client);
})
.flatMapMany(ReactorTelegramClient::initialize)
.doOnNext(update -> {
// Close the emitter if receive closed state
if (update.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR
&& ((UpdateAuthorizationState) update).authorizationState.getConstructor()
== AuthorizationStateClosed.CONSTRUCTOR) {
logger.debug("Received closed status from tdlib");
closedFromTd.tryEmitValue(true);
}
})
.doOnCancel(() -> {
// Try to emit false, so that if it has not been closed from tdlib, now it is explicitly false.
closedFromTd.tryEmitValue(false);
})
.subscribeOn(Schedulers.parallel());
}
}

View File

@ -1,7 +1,8 @@
package it.tdlight.tdlibsession.td.direct;
import io.vertx.core.json.JsonObject;
import it.tdlight.common.TelegramClient;
import it.tdlight.tdlibsession.td.ReactorTelegramClient;
import it.tdlight.tdlibsession.td.WrappedReactorTelegramClient;
import it.tdlight.tdlight.ClientManager;
import it.tdlight.utils.MonoUtils;
import reactor.core.publisher.Mono;
@ -12,12 +13,12 @@ public class TelegramClientFactory {
}
public Mono<TelegramClient> create(JsonObject implementationDetails) {
public Mono<ReactorTelegramClient> create(JsonObject implementationDetails) {
return MonoUtils.fromBlockingSingle(() -> {
var implementationName = implementationDetails.getString("name", "native-client");
switch (implementationName) {
case "native-client":
return ClientManager.create();
return new WrappedReactorTelegramClient(ClientManager.createReactive());
case "test-client":
return new TestClient(implementationDetails.getJsonObject("test-client-settings"));
default:

View File

@ -2,12 +2,11 @@ package it.tdlight.tdlibsession.td.direct;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import it.tdlight.common.ExceptionHandler;
import it.tdlight.common.ResultHandler;
import it.tdlight.common.TelegramClient;
import it.tdlight.common.UpdatesHandler;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.AuthorizationStateClosed;
import it.tdlight.jni.TdApi.AuthorizationStateClosing;
import it.tdlight.jni.TdApi.AuthorizationStateReady;
import it.tdlight.jni.TdApi.Close;
import it.tdlight.jni.TdApi.ConnectionStateReady;
import it.tdlight.jni.TdApi.Error;
import it.tdlight.jni.TdApi.FormattedText;
@ -25,29 +24,27 @@ import it.tdlight.jni.TdApi.TextEntity;
import it.tdlight.jni.TdApi.UpdateAuthorizationState;
import it.tdlight.jni.TdApi.UpdateConnectionState;
import it.tdlight.jni.TdApi.UpdateNewMessage;
import it.tdlight.tdlibsession.td.ReactorTelegramClient;
import it.tdlight.tdlibsession.td.TdError;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.core.publisher.Sinks.Empty;
public class TestClient implements TelegramClient {
public class TestClient implements ReactorTelegramClient {
private static final Logger logger = LoggerFactory.getLogger(TestClient.class);
private static final AtomicLong incrementalMessageId = new AtomicLong(1);
private final Many<Object> updates = Sinks.many().unicast().onBackpressureError();
private final Scheduler testClientScheduler = Schedulers.newSingle("test-client", true);
private final List<String> features;
private UpdatesHandler updatesHandler;
private ExceptionHandler updateExceptionHandler;
private ExceptionHandler defaultExceptionHandler;
private final Empty<java.lang.Object> closedSink = Sinks.empty();
public TestClient(JsonObject testClientSettings) {
JsonArray features = testClientSettings.getJsonArray("features", new JsonArray());
@ -58,52 +55,6 @@ public class TestClient implements TelegramClient {
}
}
@Override
public void initialize(UpdatesHandler updatesHandler,
ExceptionHandler updateExceptionHandler,
ExceptionHandler defaultExceptionHandler) {
this.updatesHandler = updatesHandler;
this.updateExceptionHandler = updateExceptionHandler;
this.defaultExceptionHandler = defaultExceptionHandler;
updates
.asFlux()
.buffer(50)
.doOnNext(ub -> logger.trace("Received update block of size {}", ub.size()))
.subscribeOn(testClientScheduler)
.subscribe(updatesHandler::onUpdates, updateExceptionHandler::onException);
for (String featureName : features) {
switch (featureName) {
case "status-update":
Mono
.<List<TdApi.Object>>just(List.of(
new UpdateAuthorizationState(new AuthorizationStateReady()),
new UpdateConnectionState(new ConnectionStateReady()))
)
.doOnNext(updatesHandler::onUpdates)
.subscribeOn(testClientScheduler)
.subscribe();
break;
case "infinite-messages":
Mono
.<TdApi.Object>fromSupplier(() -> new UpdateNewMessage(generateRandomMessage(
features.contains("random-senders"),
features.contains("random-chats"),
features.contains("random-text")))
)
.repeat()
.buffer(100)
.doOnNext(updatesHandler::onUpdates)
.subscribeOn(testClientScheduler)
.subscribe();
break;
default:
throw new IllegalArgumentException("Unknown feature name: " + featureName);
}
}
}
private static Message generateRandomMessage(boolean randomSender, boolean randomChat, boolean randomText) {
var msg = new Message();
msg.sender = new MessageSenderUser(312042);
@ -117,27 +68,74 @@ public class TestClient implements TelegramClient {
}
@Override
public void send(Function query, ResultHandler resultHandler, ExceptionHandler exceptionHandler) {
switch (query.getConstructor()) {
case SetLogVerbosityLevel.CONSTRUCTOR:
case SetLogTagVerbosityLevel.CONSTRUCTOR:
case SetTdlibParameters.CONSTRUCTOR:
case SetOption.CONSTRUCTOR:
resultHandler.onResult(new Ok());
return;
}
exceptionHandler.onException(new TdError(500, "Unsupported"));
public Flux<TdApi.Object> initialize() {
return Flux.fromIterable(features).flatMap(featureName -> {
switch (featureName) {
case "status-update":
return Flux.<TdApi.Object>just(
new UpdateAuthorizationState(new AuthorizationStateReady()),
new UpdateConnectionState(new ConnectionStateReady())
).mergeWith(closedSink
.asMono()
.thenMany(Flux.just(new UpdateAuthorizationState(new AuthorizationStateClosing()),
new UpdateAuthorizationState(new AuthorizationStateClosed())
)));
case "infinite-messages":
var randomSenders = features.contains("random-senders");
var randomChats = features.contains("random-chats");
var randomTexts = features.contains("random-text");
return Flux
.<TdApi.Object>fromIterable(() -> new Iterator<>() {
@Override
public boolean hasNext() {
return true;
}
@Override
public TdApi.Object next() {
return new UpdateNewMessage(generateRandomMessage(randomSenders, randomChats, randomTexts));
}
}).takeUntilOther(this.closedSink.asMono());
default:
return Mono.fromCallable(() -> {
throw new IllegalArgumentException("Unknown feature name: " + featureName);
});
}
});
}
@Override
public Object execute(Function query) {
public Mono<Object> send(Function query) {
return Mono.fromCallable(() -> {
TdApi.Object result = executeCommon(query);
if (result != null) {
return result;
}
throw new TdError(500, "Unsupported");
});
}
@Override
public TdApi.Object execute(Function query) {
TdApi.Object result = executeCommon(query);
if (result != null) {
return result;
}
return new Error(500, "Unsupported");
}
@Nullable
public TdApi.Object executeCommon(Function query) {
switch (query.getConstructor()) {
case SetLogVerbosityLevel.CONSTRUCTOR:
case SetLogTagVerbosityLevel.CONSTRUCTOR:
case SetTdlibParameters.CONSTRUCTOR:
case SetOption.CONSTRUCTOR:
return new Ok();
case Close.CONSTRUCTOR:
closedSink.tryEmitEmpty();
return new Ok();
}
return new Error(500, "Unsupported");
return null;
}
}