From ee19a97b00ff1b1d530a0b2765256ad4d9cc5971 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 10 Dec 2021 02:23:19 +0100 Subject: [PATCH] Complete login phase --- .../it/tdlight/reactiveapi/ReactiveApi.java | 2 +- .../reactiveapi/ReactiveApiPublisher.java | 155 +++++++++++++----- 2 files changed, 118 insertions(+), 39 deletions(-) diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java index d488ecf..78e04a1 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java @@ -118,7 +118,7 @@ public class ReactiveApi { LOG.warn("The user id {} has been deleted from the disk configuration file", user); } - return new DiskChanges(unmodifiableSet(normalUsers), unmodifiableSet(addedUsers), unmodifiableSet(remoteSet)); + return new DiskChanges(unmodifiableSet(normalUsers), unmodifiableSet(addedUsers), unmodifiableSet(deletedUsers)); }).cache(); var removeObsoleteDiskSessions = diskChangesMono diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index 3155d7d..f30137e 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -1,21 +1,22 @@ package it.tdlight.reactiveapi; import static it.tdlight.reactiveapi.AuthPhase.*; +import static java.util.Objects.requireNonNull; import io.atomix.cluster.messaging.ClusterEventService; import io.atomix.core.Atomix; import it.tdlight.common.ReactiveTelegramClient; import it.tdlight.common.Response; import it.tdlight.common.Signal; +import it.tdlight.common.utils.LibraryVersion; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.CheckAuthenticationBotToken; import it.tdlight.jni.TdApi.CheckDatabaseEncryptionKey; -import it.tdlight.jni.TdApi.Close; -import it.tdlight.jni.TdApi.Function; import it.tdlight.jni.TdApi.Object; import it.tdlight.jni.TdApi.PhoneNumberAuthenticationSettings; import it.tdlight.jni.TdApi.SetAuthenticationPhoneNumber; import it.tdlight.jni.TdApi.SetTdlibParameters; +import it.tdlight.jni.TdApi.TdlibParameters; import it.tdlight.reactiveapi.Event.ClientBoundEvent; import it.tdlight.reactiveapi.Event.OnBotLoginCodeRequested; import it.tdlight.reactiveapi.Event.OnOtherDeviceLoginRequested; @@ -42,14 +43,13 @@ import javax.annotation.Nullable; import org.apache.commons.lang3.SerializationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.warp.commonutils.metrics.AtomicDetailedTimeAbsoluteSamples; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; -import reactor.core.publisher.Sinks.Many; import reactor.core.scheduler.Schedulers; -public class ReactiveApiPublisher { +public abstract class ReactiveApiPublisher { private static final Logger LOG = LoggerFactory.getLogger(ReactiveApiPublisher.class); @@ -60,24 +60,21 @@ public class ReactiveApiPublisher { private final Flux telegramClient; private final AtomicReference state = new AtomicReference<>(new State(LOGGED_OUT)); - private final long userId; - private final long liveId; - private final String botToken; - private final Long phoneNumber; + protected final long userId; + protected final long liveId; private final AtomicReference disposable = new AtomicReference<>(); + private final AtomicReference path = new AtomicReference<>(); - private ReactiveApiPublisher(Atomix atomix, long liveId, long userId, String botToken, Long phoneNumber) { + private ReactiveApiPublisher(Atomix atomix, long liveId, long userId) { this.userId = userId; this.liveId = liveId; - this.botToken = botToken; - this.phoneNumber = phoneNumber; this.rawTelegramClient = ClientManager.createReactive(); this.telegramClient = Flux.create(sink -> { + rawTelegramClient.createAndRegisterClient(); rawTelegramClient.setListener(sink::next); sink.onCancel(rawTelegramClient::cancel); sink.onDispose(rawTelegramClient::dispose); - rawTelegramClient.createAndRegisterClient(); this.registerTopics(); }).share(); @@ -85,14 +82,15 @@ public class ReactiveApiPublisher { } public static ReactiveApiPublisher fromToken(Atomix atomix, Long liveId, long userId, String token) { - return new ReactiveApiPublisher(atomix, liveId, userId, token, null); + return new ReactiveApiPublisherToken(atomix, liveId, userId, token); } public static ReactiveApiPublisher fromPhoneNumber(Atomix atomix, Long liveId, long userId, long phoneNumber) { - return new ReactiveApiPublisher(atomix, liveId, userId, null, phoneNumber); + return new ReactiveApiPublisherPhoneNumber(atomix, liveId, userId, phoneNumber); } public void start(Path path) { + this.path.set(path); LOG.info("Starting session \"{}\" in path \"{}\"", this, path); var publishedResultingEvents = telegramClient .subscribeOn(Schedulers.parallel()) @@ -102,25 +100,27 @@ public class ReactiveApiPublisher { publishedResultingEvents .filter(s -> s instanceof TDLibBoundResultingEvent) .map(s -> ((TDLibBoundResultingEvent) s).action()) - .flatMap(function -> Mono + .flatMapSequential(function -> Mono .from(rawTelegramClient.send(function, SPECIAL_RAW_TIMEOUT_DURATION)) - .doOnNext(resp -> { + .mapNotNull(resp -> { if (resp.getConstructor() == TdApi.Error.CONSTRUCTOR) { LOG.error("Received error for special request {}: {}", function, resp); + return new OnUpdateError(liveId, userId, (TdApi.Error) resp); + } else { + return null; } }) .doOnError(ex -> LOG.error("Failed to receive the response for special request {}", function, ex)) + .onErrorResume(ex -> Mono.just(new OnUpdateError(liveId, userId, new TdApi.Error(500, ex.getMessage())))) ) .subscribeOn(Schedulers.parallel()) - .subscribe(); + .subscribe(this::sendClientBoundEvent); publishedResultingEvents .filter(s -> s instanceof ClientBoundResultingEvent) .cast(ClientBoundResultingEvent.class) + .map(ClientBoundResultingEvent::event) .subscribeOn(Schedulers.parallel()) - .subscribe(clientBoundResultingEvent -> eventService.broadcast("session-" + liveId + "-client-bound-events", - clientBoundResultingEvent.event(), - ReactiveApiPublisher::serializeEvent - )); + .subscribe(this::sendClientBoundEvent); var prev = this.disposable.getAndSet(publishedResultingEvents.connect()); @@ -130,6 +130,13 @@ public class ReactiveApiPublisher { } } + private void sendClientBoundEvent(ClientBoundEvent clientBoundResultingEvent) { + eventService.broadcast("session-" + liveId + "-client-bound-events", + clientBoundResultingEvent, + ReactiveApiPublisher::serializeEvent + ); + } + @Nullable private ResultingEvent onSignal(Signal signal) { // Update the state @@ -169,7 +176,25 @@ public class ReactiveApiPublisher { var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; switch (updateAuthorizationState.authorizationState.getConstructor()) { case TdApi.AuthorizationStateWaitTdlibParameters.CONSTRUCTOR -> { - return new TDLibBoundResultingEvent<>(new SetTdlibParameters()); + var tdlibParameters = new TdlibParameters(); + var path = requireNonNull(this.path.get(), "Path must not be null"); + tdlibParameters.databaseDirectory = path.resolve("database").toString(); + tdlibParameters.apiId = 94575; + tdlibParameters.apiHash = "a3406de8d171bb422bb6ddf3bbd800e2"; + tdlibParameters.filesDirectory = path.resolve("files").toString(); + tdlibParameters.applicationVersion = it.tdlight.reactiveapi.generated.LibraryVersion.VERSION; + tdlibParameters.deviceModel = System.getProperty("os.name"); + tdlibParameters.systemVersion = System.getProperty("os.version"); + tdlibParameters.enableStorageOptimizer = true; + tdlibParameters.ignoreFileNames = true; + tdlibParameters.useTestDc = false; + tdlibParameters.useSecretChats = false; + + tdlibParameters.useMessageDatabase = true; + tdlibParameters.useFileDatabase = true; + tdlibParameters.useChatInfoDatabase = true; + tdlibParameters.systemLanguageCode = System.getProperty("user.language", "en"); + return new TDLibBoundResultingEvent<>(new SetTdlibParameters(tdlibParameters)); } } } @@ -180,7 +205,7 @@ public class ReactiveApiPublisher { case TdApi.UpdateAuthorizationState.CONSTRUCTOR -> { var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; switch (updateAuthorizationState.authorizationState.getConstructor()) { - case TdApi.AuthorizationStateWaitTdlibParameters.CONSTRUCTOR -> { + case TdApi.AuthorizationStateWaitEncryptionKey.CONSTRUCTOR -> { return new TDLibBoundResultingEvent<>(new CheckDatabaseEncryptionKey()); } } @@ -193,7 +218,7 @@ public class ReactiveApiPublisher { var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; switch (updateAuthorizationState.authorizationState.getConstructor()) { case TdApi.AuthorizationStateWaitCode.CONSTRUCTOR -> { - return new ClientBoundResultingEvent(new OnUserLoginCodeRequested(liveId, userId, phoneNumber)); + return onWaitCode(); } case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR -> { return new ClientBoundResultingEvent(new OnOtherDeviceLoginRequested(liveId, userId)); @@ -202,17 +227,7 @@ public class ReactiveApiPublisher { return new ClientBoundResultingEvent(new OnPasswordRequested(liveId, userId)); } case TdApi.AuthorizationStateWaitPhoneNumber.CONSTRUCTOR -> { - if (botToken != null) { - return new TDLibBoundResultingEvent<>(new CheckAuthenticationBotToken(botToken)); - } else { - var authSettings = new PhoneNumberAuthenticationSettings(); - authSettings.allowFlashCall = false; - authSettings.allowSmsRetrieverApi = false; - authSettings.isCurrentPhoneNumber = false; - return new TDLibBoundResultingEvent<>(new SetAuthenticationPhoneNumber("+" + phoneNumber, - authSettings - )); - } + return onWaitToken(); } } } @@ -222,6 +237,13 @@ public class ReactiveApiPublisher { return null; } + protected abstract ResultingEvent onWaitToken(); + + protected ResultingEvent onWaitCode() { + LOG.error("Wait code event is not supported"); + return null; + } + private static byte[] serializeEvent(ClientBoundEvent clientBoundEvent) { try (var byteArrayOutputStream = new ByteArrayOutputStream()) { try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) { @@ -309,10 +331,67 @@ public class ReactiveApiPublisher { return new StringJoiner(", ", ReactiveApiPublisher.class.getSimpleName() + "[", "]") .add("userId=" + userId) .add("liveId=" + liveId) - .add("botToken='" + botToken + "'") - .add("phoneNumber=" + phoneNumber) .toString(); } private record RequestWithTimeoutInstant(TdApi.Function request, Instant timeout) {} + + private static class ReactiveApiPublisherToken extends ReactiveApiPublisher { + + private final String botToken; + + public ReactiveApiPublisherToken(Atomix atomix, Long liveId, long userId, String botToken) { + super(atomix, liveId, userId); + this.botToken = botToken; + } + + @Override + protected ResultingEvent onWaitToken() { + return new TDLibBoundResultingEvent<>(new CheckAuthenticationBotToken(botToken)); + } + + @Override + public String toString() { + return new StringJoiner(", ", ReactiveApiPublisherToken.class.getSimpleName() + "[", "]") + .add("userId=" + userId) + .add("liveId=" + liveId) + .add("token='" + botToken + "'") + .toString(); + } + } + + private static class ReactiveApiPublisherPhoneNumber extends ReactiveApiPublisher { + + private final long phoneNumber; + + public ReactiveApiPublisherPhoneNumber(Atomix atomix, Long liveId, long userId, long phoneNumber) { + super(atomix, liveId, userId); + this.phoneNumber = phoneNumber; + } + + @Override + protected ResultingEvent onWaitToken() { + var authSettings = new PhoneNumberAuthenticationSettings(); + authSettings.allowFlashCall = false; + authSettings.allowSmsRetrieverApi = false; + authSettings.isCurrentPhoneNumber = false; + return new TDLibBoundResultingEvent<>(new SetAuthenticationPhoneNumber("+" + phoneNumber, + authSettings + )); + } + + @Override + public ClientBoundResultingEvent onWaitCode() { + return new ClientBoundResultingEvent(new OnUserLoginCodeRequested(liveId, userId, phoneNumber)); + } + + @Override + public String toString() { + return new StringJoiner(", ", ReactiveApiPublisherPhoneNumber.class.getSimpleName() + "[", "]") + .add("userId=" + userId) + .add("liveId=" + liveId) + .add("phoneNumber=" + phoneNumber) + .toString(); + } + } }