diff --git a/src/main/java/it/tdlight/tdlibsession/td/ReactorTelegramClient.java b/src/main/java/it/tdlight/tdlibsession/td/ReactorTelegramClient.java new file mode 100644 index 0000000..3921c8c --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/ReactorTelegramClient.java @@ -0,0 +1,15 @@ +package it.tdlight.tdlibsession.td; + +import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.Object; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface ReactorTelegramClient { + + Flux initialize(); + + Mono send(TdApi.Function query); + + Object execute(TdApi.Function query); +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java b/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java new file mode 100644 index 0000000..4af92fb --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java @@ -0,0 +1,54 @@ +package it.tdlight.tdlibsession.td; + +import it.tdlight.common.ReactiveTelegramClient; +import it.tdlight.jni.TdApi; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class WrappedReactorTelegramClient implements ReactorTelegramClient { + + private final ReactiveTelegramClient reactiveTelegramClient; + + public WrappedReactorTelegramClient(ReactiveTelegramClient reactiveTelegramClient) { + this.reactiveTelegramClient = reactiveTelegramClient; + } + + @Override + public Flux initialize() { + return Flux.from(reactiveTelegramClient).concatMap(item -> { + if (item.isUpdate()) { + return Mono.just(item.getUpdate()); + } else if (item.isHandleException()) { + return Mono.error(item.getHandleException()); + } else if (item.isUpdateException()) { + return Mono.error(item.getUpdateException()); + } else { + return Mono.error(new IllegalStateException("This shouldn't happen. Received unknown ReactiveItem type")); + } + }); + } + + /** + * Sends a request to the TDLib. + * + * @param query Object representing a query to the TDLib. + * @throws NullPointerException if query is null. + * @return a publisher that will emit exactly one item, or an error + */ + @Override + public Mono send(TdApi.Function query) { + return Mono.from(reactiveTelegramClient.send(query)); + } + + /** + * Synchronously executes a TDLib request. Only a few marked accordingly requests can be executed synchronously. + * + * @param query Object representing a query to the TDLib. + * @return request result or {@link TdApi.Error}. + * @throws NullPointerException if query is null. + */ + @Override + public TdApi.Object execute(TdApi.Function query) { + return reactiveTelegramClient.execute(query); + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java index e51b519..03982bf 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -1,13 +1,13 @@ package it.tdlight.tdlibsession.td.direct; import io.vertx.core.json.JsonObject; -import it.tdlight.common.TelegramClient; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.AuthorizationStateClosed; import it.tdlight.jni.TdApi.Close; import it.tdlight.jni.TdApi.Function; import it.tdlight.jni.TdApi.Ok; import it.tdlight.jni.TdApi.UpdateAuthorizationState; +import it.tdlight.tdlibsession.td.ReactorTelegramClient; import it.tdlight.tdlibsession.td.TdError; import it.tdlight.tdlibsession.td.TdResult; import it.tdlight.utils.MonoUtils; @@ -27,7 +27,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { private final JsonObject implementationDetails; private final String botAlias; - private final One td = Sinks.one(); + private final One td = Sinks.one(); public AsyncTdDirectImpl(TelegramClientFactory telegramClientFactory, JsonObject implementationDetails, @@ -42,10 +42,10 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { if (synchronous) { return td.asMono().single().flatMap(td -> MonoUtils.fromBlockingSingle(() -> { if (td != null) { - return TdResult.of(td.execute(request)); + return TdResult.of(td.execute(request)); } else { if (request.getConstructor() == Close.CONSTRUCTOR) { - return TdResult.of(new Ok()); + return TdResult.of(new Ok()); } throw new IllegalStateException("TDLib client is destroyed"); } @@ -53,11 +53,14 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { } else { return td.asMono().single().flatMap(td -> Mono.>create(sink -> { if (td != null) { - td.send(request, v -> sink.success(TdResult.of(v)), sink::error); + Mono + .from(td.send(request)) + .subscribeOn(Schedulers.single()) + .subscribe(v -> sink.success(TdResult.of(v)), sink::error); } else { if (request.getConstructor() == Close.CONSTRUCTOR) { logger.trace("Sending close success to sink " + sink.toString()); - sink.success(TdResult.of(new Ok())); + sink.success(TdResult.of(new Ok())); } else { logger.trace("Sending close error to sink " + sink.toString()); sink.error(new IllegalStateException("TDLib client is destroyed")); @@ -71,44 +74,29 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { public Flux receive(AsyncTdDirectOptions options) { // If closed it will be either true or false final One closedFromTd = Sinks.one(); - return telegramClientFactory.create(implementationDetails) - .flatMapMany(client -> Flux - .create(updatesSink -> { - client.execute(new TdApi.SetLogVerbosityLevel(1)); - client.initialize((TdApi.Object object) -> { - updatesSink.next(object); - // Close the emitter if receive closed state - if (object.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR - && ((UpdateAuthorizationState) object).authorizationState.getConstructor() - == AuthorizationStateClosed.CONSTRUCTOR) { - logger.debug("Received closed status from tdlib"); - closedFromTd.tryEmitValue(true); - updatesSink.complete(); - } - }, updatesSink::error, updatesSink::error); - - if (td.tryEmitValue(client).isFailure()) { - updatesSink.error(new TdError(500, "Failed to emit td client")); - } - - // Send close if the stream is disposed before tdlib is closed - updatesSink.onDispose(() -> { - - Mono.firstWithValue(closedFromTd.asMono(), Mono.empty()).switchIfEmpty(Mono.defer(() -> Mono.fromRunnable(() -> { - // Try to emit false, so that if it has not been closed from tdlib, now it is explicitly false. - closedFromTd.tryEmitValue(false); - }))).filter(isClosedFromTd -> !isClosedFromTd).doOnNext(x -> { - logger.warn("The stream has been disposed without closing tdlib. Sending TdApi.Close()..."); - client.send(new Close(), - result -> logger.warn("Close result: {}", result), - ex -> logger.error("Error when disposing td client", ex) - ); - }).subscribeOn(Schedulers.parallel()).subscribe(); - }); - }) - .subscribeOn(Schedulers.boundedElastic()) - .publishOn(Schedulers.parallel()) - ) + return telegramClientFactory + .create(implementationDetails) + .doOnNext(client -> client.execute(new TdApi.SetLogVerbosityLevel(1))) + .flatMap(client -> { + if (td.tryEmitValue(client).isFailure()) { + return Mono.error(new TdError(500, "Failed to emit td client")); + } + return Mono.just(client); + }) + .flatMapMany(ReactorTelegramClient::initialize) + .doOnNext(update -> { + // Close the emitter if receive closed state + if (update.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR + && ((UpdateAuthorizationState) update).authorizationState.getConstructor() + == AuthorizationStateClosed.CONSTRUCTOR) { + logger.debug("Received closed status from tdlib"); + closedFromTd.tryEmitValue(true); + } + }) + .doOnCancel(() -> { + // Try to emit false, so that if it has not been closed from tdlib, now it is explicitly false. + closedFromTd.tryEmitValue(false); + }) .subscribeOn(Schedulers.parallel()); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/TelegramClientFactory.java b/src/main/java/it/tdlight/tdlibsession/td/direct/TelegramClientFactory.java index e1b3f8f..0c2a95a 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/TelegramClientFactory.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/TelegramClientFactory.java @@ -1,7 +1,8 @@ package it.tdlight.tdlibsession.td.direct; import io.vertx.core.json.JsonObject; -import it.tdlight.common.TelegramClient; +import it.tdlight.tdlibsession.td.ReactorTelegramClient; +import it.tdlight.tdlibsession.td.WrappedReactorTelegramClient; import it.tdlight.tdlight.ClientManager; import it.tdlight.utils.MonoUtils; import reactor.core.publisher.Mono; @@ -12,12 +13,12 @@ public class TelegramClientFactory { } - public Mono create(JsonObject implementationDetails) { + public Mono create(JsonObject implementationDetails) { return MonoUtils.fromBlockingSingle(() -> { var implementationName = implementationDetails.getString("name", "native-client"); switch (implementationName) { case "native-client": - return ClientManager.create(); + return new WrappedReactorTelegramClient(ClientManager.createReactive()); case "test-client": return new TestClient(implementationDetails.getJsonObject("test-client-settings")); default: diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java b/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java index 1e8837d..07233fb 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java @@ -2,12 +2,11 @@ package it.tdlight.tdlibsession.td.direct; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; -import it.tdlight.common.ExceptionHandler; -import it.tdlight.common.ResultHandler; -import it.tdlight.common.TelegramClient; -import it.tdlight.common.UpdatesHandler; import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.AuthorizationStateClosed; +import it.tdlight.jni.TdApi.AuthorizationStateClosing; import it.tdlight.jni.TdApi.AuthorizationStateReady; +import it.tdlight.jni.TdApi.Close; import it.tdlight.jni.TdApi.ConnectionStateReady; import it.tdlight.jni.TdApi.Error; import it.tdlight.jni.TdApi.FormattedText; @@ -25,29 +24,27 @@ import it.tdlight.jni.TdApi.TextEntity; import it.tdlight.jni.TdApi.UpdateAuthorizationState; import it.tdlight.jni.TdApi.UpdateConnectionState; import it.tdlight.jni.TdApi.UpdateNewMessage; +import it.tdlight.tdlibsession.td.ReactorTelegramClient; import it.tdlight.tdlibsession.td.TdError; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicLong; +import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; -import reactor.core.publisher.Sinks.Many; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; +import reactor.core.publisher.Sinks.Empty; -public class TestClient implements TelegramClient { +public class TestClient implements ReactorTelegramClient { private static final Logger logger = LoggerFactory.getLogger(TestClient.class); private static final AtomicLong incrementalMessageId = new AtomicLong(1); - private final Many updates = Sinks.many().unicast().onBackpressureError(); - private final Scheduler testClientScheduler = Schedulers.newSingle("test-client", true); private final List features; - private UpdatesHandler updatesHandler; - private ExceptionHandler updateExceptionHandler; - private ExceptionHandler defaultExceptionHandler; + private final Empty closedSink = Sinks.empty(); public TestClient(JsonObject testClientSettings) { JsonArray features = testClientSettings.getJsonArray("features", new JsonArray()); @@ -58,52 +55,6 @@ public class TestClient implements TelegramClient { } } - @Override - public void initialize(UpdatesHandler updatesHandler, - ExceptionHandler updateExceptionHandler, - ExceptionHandler defaultExceptionHandler) { - this.updatesHandler = updatesHandler; - this.updateExceptionHandler = updateExceptionHandler; - this.defaultExceptionHandler = defaultExceptionHandler; - - updates - .asFlux() - .buffer(50) - .doOnNext(ub -> logger.trace("Received update block of size {}", ub.size())) - .subscribeOn(testClientScheduler) - .subscribe(updatesHandler::onUpdates, updateExceptionHandler::onException); - - for (String featureName : features) { - switch (featureName) { - case "status-update": - Mono - .>just(List.of( - new UpdateAuthorizationState(new AuthorizationStateReady()), - new UpdateConnectionState(new ConnectionStateReady())) - ) - .doOnNext(updatesHandler::onUpdates) - .subscribeOn(testClientScheduler) - .subscribe(); - break; - case "infinite-messages": - Mono - .fromSupplier(() -> new UpdateNewMessage(generateRandomMessage( - features.contains("random-senders"), - features.contains("random-chats"), - features.contains("random-text"))) - ) - .repeat() - .buffer(100) - .doOnNext(updatesHandler::onUpdates) - .subscribeOn(testClientScheduler) - .subscribe(); - break; - default: - throw new IllegalArgumentException("Unknown feature name: " + featureName); - } - } - } - private static Message generateRandomMessage(boolean randomSender, boolean randomChat, boolean randomText) { var msg = new Message(); msg.sender = new MessageSenderUser(312042); @@ -117,27 +68,74 @@ public class TestClient implements TelegramClient { } @Override - public void send(Function query, ResultHandler resultHandler, ExceptionHandler exceptionHandler) { - switch (query.getConstructor()) { - case SetLogVerbosityLevel.CONSTRUCTOR: - case SetLogTagVerbosityLevel.CONSTRUCTOR: - case SetTdlibParameters.CONSTRUCTOR: - case SetOption.CONSTRUCTOR: - resultHandler.onResult(new Ok()); - return; - } - exceptionHandler.onException(new TdError(500, "Unsupported")); + public Flux initialize() { + return Flux.fromIterable(features).flatMap(featureName -> { + switch (featureName) { + case "status-update": + return Flux.just( + new UpdateAuthorizationState(new AuthorizationStateReady()), + new UpdateConnectionState(new ConnectionStateReady()) + ).mergeWith(closedSink + .asMono() + .thenMany(Flux.just(new UpdateAuthorizationState(new AuthorizationStateClosing()), + new UpdateAuthorizationState(new AuthorizationStateClosed()) + ))); + case "infinite-messages": + var randomSenders = features.contains("random-senders"); + var randomChats = features.contains("random-chats"); + var randomTexts = features.contains("random-text"); + return Flux + .fromIterable(() -> new Iterator<>() { + @Override + public boolean hasNext() { + return true; + } + + @Override + public TdApi.Object next() { + return new UpdateNewMessage(generateRandomMessage(randomSenders, randomChats, randomTexts)); + } + }).takeUntilOther(this.closedSink.asMono()); + default: + return Mono.fromCallable(() -> { + throw new IllegalArgumentException("Unknown feature name: " + featureName); + }); + } + }); } @Override - public Object execute(Function query) { + public Mono send(Function query) { + return Mono.fromCallable(() -> { + TdApi.Object result = executeCommon(query); + if (result != null) { + return result; + } + throw new TdError(500, "Unsupported"); + }); + } + + @Override + public TdApi.Object execute(Function query) { + TdApi.Object result = executeCommon(query); + if (result != null) { + return result; + } + return new Error(500, "Unsupported"); + } + + @Nullable + public TdApi.Object executeCommon(Function query) { switch (query.getConstructor()) { case SetLogVerbosityLevel.CONSTRUCTOR: case SetLogTagVerbosityLevel.CONSTRUCTOR: case SetTdlibParameters.CONSTRUCTOR: case SetOption.CONSTRUCTOR: return new Ok(); + case Close.CONSTRUCTOR: + closedSink.tryEmitEmpty(); + return new Ok(); } - return new Error(500, "Unsupported"); + return null; } }