Complete login phase

This commit is contained in:
Andrea Cavalli 2021-12-10 02:23:19 +01:00
parent 8b0220ccfc
commit ee19a97b00
2 changed files with 118 additions and 39 deletions

View File

@ -118,7 +118,7 @@ public class ReactiveApi {
LOG.warn("The user id {} has been deleted from the disk configuration file", user); LOG.warn("The user id {} has been deleted from the disk configuration file", user);
} }
return new DiskChanges(unmodifiableSet(normalUsers), unmodifiableSet(addedUsers), unmodifiableSet(remoteSet)); return new DiskChanges(unmodifiableSet(normalUsers), unmodifiableSet(addedUsers), unmodifiableSet(deletedUsers));
}).cache(); }).cache();
var removeObsoleteDiskSessions = diskChangesMono var removeObsoleteDiskSessions = diskChangesMono

View File

@ -1,21 +1,22 @@
package it.tdlight.reactiveapi; package it.tdlight.reactiveapi;
import static it.tdlight.reactiveapi.AuthPhase.*; import static it.tdlight.reactiveapi.AuthPhase.*;
import static java.util.Objects.requireNonNull;
import io.atomix.cluster.messaging.ClusterEventService; import io.atomix.cluster.messaging.ClusterEventService;
import io.atomix.core.Atomix; import io.atomix.core.Atomix;
import it.tdlight.common.ReactiveTelegramClient; import it.tdlight.common.ReactiveTelegramClient;
import it.tdlight.common.Response; import it.tdlight.common.Response;
import it.tdlight.common.Signal; import it.tdlight.common.Signal;
import it.tdlight.common.utils.LibraryVersion;
import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.CheckAuthenticationBotToken; import it.tdlight.jni.TdApi.CheckAuthenticationBotToken;
import it.tdlight.jni.TdApi.CheckDatabaseEncryptionKey; import it.tdlight.jni.TdApi.CheckDatabaseEncryptionKey;
import it.tdlight.jni.TdApi.Close;
import it.tdlight.jni.TdApi.Function;
import it.tdlight.jni.TdApi.Object; import it.tdlight.jni.TdApi.Object;
import it.tdlight.jni.TdApi.PhoneNumberAuthenticationSettings; import it.tdlight.jni.TdApi.PhoneNumberAuthenticationSettings;
import it.tdlight.jni.TdApi.SetAuthenticationPhoneNumber; import it.tdlight.jni.TdApi.SetAuthenticationPhoneNumber;
import it.tdlight.jni.TdApi.SetTdlibParameters; import it.tdlight.jni.TdApi.SetTdlibParameters;
import it.tdlight.jni.TdApi.TdlibParameters;
import it.tdlight.reactiveapi.Event.ClientBoundEvent; import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import it.tdlight.reactiveapi.Event.OnBotLoginCodeRequested; import it.tdlight.reactiveapi.Event.OnBotLoginCodeRequested;
import it.tdlight.reactiveapi.Event.OnOtherDeviceLoginRequested; import it.tdlight.reactiveapi.Event.OnOtherDeviceLoginRequested;
@ -42,14 +43,13 @@ import javax.annotation.Nullable;
import org.apache.commons.lang3.SerializationException; import org.apache.commons.lang3.SerializationException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.warp.commonutils.metrics.AtomicDetailedTimeAbsoluteSamples;
import reactor.core.Disposable; import reactor.core.Disposable;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
public class ReactiveApiPublisher { public abstract class ReactiveApiPublisher {
private static final Logger LOG = LoggerFactory.getLogger(ReactiveApiPublisher.class); private static final Logger LOG = LoggerFactory.getLogger(ReactiveApiPublisher.class);
@ -60,24 +60,21 @@ public class ReactiveApiPublisher {
private final Flux<Signal> telegramClient; private final Flux<Signal> telegramClient;
private final AtomicReference<State> state = new AtomicReference<>(new State(LOGGED_OUT)); private final AtomicReference<State> state = new AtomicReference<>(new State(LOGGED_OUT));
private final long userId; protected final long userId;
private final long liveId; protected final long liveId;
private final String botToken;
private final Long phoneNumber;
private final AtomicReference<Disposable> disposable = new AtomicReference<>(); private final AtomicReference<Disposable> disposable = new AtomicReference<>();
private final AtomicReference<Path> path = new AtomicReference<>();
private ReactiveApiPublisher(Atomix atomix, long liveId, long userId, String botToken, Long phoneNumber) { private ReactiveApiPublisher(Atomix atomix, long liveId, long userId) {
this.userId = userId; this.userId = userId;
this.liveId = liveId; this.liveId = liveId;
this.botToken = botToken;
this.phoneNumber = phoneNumber;
this.rawTelegramClient = ClientManager.createReactive(); this.rawTelegramClient = ClientManager.createReactive();
this.telegramClient = Flux.<Signal>create(sink -> { this.telegramClient = Flux.<Signal>create(sink -> {
rawTelegramClient.createAndRegisterClient();
rawTelegramClient.setListener(sink::next); rawTelegramClient.setListener(sink::next);
sink.onCancel(rawTelegramClient::cancel); sink.onCancel(rawTelegramClient::cancel);
sink.onDispose(rawTelegramClient::dispose); sink.onDispose(rawTelegramClient::dispose);
rawTelegramClient.createAndRegisterClient();
this.registerTopics(); this.registerTopics();
}).share(); }).share();
@ -85,14 +82,15 @@ public class ReactiveApiPublisher {
} }
public static ReactiveApiPublisher fromToken(Atomix atomix, Long liveId, long userId, String token) { public static ReactiveApiPublisher fromToken(Atomix atomix, Long liveId, long userId, String token) {
return new ReactiveApiPublisher(atomix, liveId, userId, token, null); return new ReactiveApiPublisherToken(atomix, liveId, userId, token);
} }
public static ReactiveApiPublisher fromPhoneNumber(Atomix atomix, Long liveId, long userId, long phoneNumber) { public static ReactiveApiPublisher fromPhoneNumber(Atomix atomix, Long liveId, long userId, long phoneNumber) {
return new ReactiveApiPublisher(atomix, liveId, userId, null, phoneNumber); return new ReactiveApiPublisherPhoneNumber(atomix, liveId, userId, phoneNumber);
} }
public void start(Path path) { public void start(Path path) {
this.path.set(path);
LOG.info("Starting session \"{}\" in path \"{}\"", this, path); LOG.info("Starting session \"{}\" in path \"{}\"", this, path);
var publishedResultingEvents = telegramClient var publishedResultingEvents = telegramClient
.subscribeOn(Schedulers.parallel()) .subscribeOn(Schedulers.parallel())
@ -102,25 +100,27 @@ public class ReactiveApiPublisher {
publishedResultingEvents publishedResultingEvents
.filter(s -> s instanceof TDLibBoundResultingEvent<?>) .filter(s -> s instanceof TDLibBoundResultingEvent<?>)
.map(s -> ((TDLibBoundResultingEvent<?>) s).action()) .map(s -> ((TDLibBoundResultingEvent<?>) s).action())
.flatMap(function -> Mono .flatMapSequential(function -> Mono
.from(rawTelegramClient.send(function, SPECIAL_RAW_TIMEOUT_DURATION)) .from(rawTelegramClient.send(function, SPECIAL_RAW_TIMEOUT_DURATION))
.doOnNext(resp -> { .mapNotNull(resp -> {
if (resp.getConstructor() == TdApi.Error.CONSTRUCTOR) { if (resp.getConstructor() == TdApi.Error.CONSTRUCTOR) {
LOG.error("Received error for special request {}: {}", function, resp); LOG.error("Received error for special request {}: {}", function, resp);
return new OnUpdateError(liveId, userId, (TdApi.Error) resp);
} else {
return null;
} }
}) })
.doOnError(ex -> LOG.error("Failed to receive the response for special request {}", function, ex)) .doOnError(ex -> LOG.error("Failed to receive the response for special request {}", function, ex))
.onErrorResume(ex -> Mono.just(new OnUpdateError(liveId, userId, new TdApi.Error(500, ex.getMessage()))))
) )
.subscribeOn(Schedulers.parallel()) .subscribeOn(Schedulers.parallel())
.subscribe(); .subscribe(this::sendClientBoundEvent);
publishedResultingEvents publishedResultingEvents
.filter(s -> s instanceof ClientBoundResultingEvent) .filter(s -> s instanceof ClientBoundResultingEvent)
.cast(ClientBoundResultingEvent.class) .cast(ClientBoundResultingEvent.class)
.map(ClientBoundResultingEvent::event)
.subscribeOn(Schedulers.parallel()) .subscribeOn(Schedulers.parallel())
.subscribe(clientBoundResultingEvent -> eventService.broadcast("session-" + liveId + "-client-bound-events", .subscribe(this::sendClientBoundEvent);
clientBoundResultingEvent.event(),
ReactiveApiPublisher::serializeEvent
));
var prev = this.disposable.getAndSet(publishedResultingEvents.connect()); var prev = this.disposable.getAndSet(publishedResultingEvents.connect());
@ -130,6 +130,13 @@ public class ReactiveApiPublisher {
} }
} }
private void sendClientBoundEvent(ClientBoundEvent clientBoundResultingEvent) {
eventService.broadcast("session-" + liveId + "-client-bound-events",
clientBoundResultingEvent,
ReactiveApiPublisher::serializeEvent
);
}
@Nullable @Nullable
private ResultingEvent onSignal(Signal signal) { private ResultingEvent onSignal(Signal signal) {
// Update the state // Update the state
@ -169,7 +176,25 @@ public class ReactiveApiPublisher {
var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update;
switch (updateAuthorizationState.authorizationState.getConstructor()) { switch (updateAuthorizationState.authorizationState.getConstructor()) {
case TdApi.AuthorizationStateWaitTdlibParameters.CONSTRUCTOR -> { case TdApi.AuthorizationStateWaitTdlibParameters.CONSTRUCTOR -> {
return new TDLibBoundResultingEvent<>(new SetTdlibParameters()); var tdlibParameters = new TdlibParameters();
var path = requireNonNull(this.path.get(), "Path must not be null");
tdlibParameters.databaseDirectory = path.resolve("database").toString();
tdlibParameters.apiId = 94575;
tdlibParameters.apiHash = "a3406de8d171bb422bb6ddf3bbd800e2";
tdlibParameters.filesDirectory = path.resolve("files").toString();
tdlibParameters.applicationVersion = it.tdlight.reactiveapi.generated.LibraryVersion.VERSION;
tdlibParameters.deviceModel = System.getProperty("os.name");
tdlibParameters.systemVersion = System.getProperty("os.version");
tdlibParameters.enableStorageOptimizer = true;
tdlibParameters.ignoreFileNames = true;
tdlibParameters.useTestDc = false;
tdlibParameters.useSecretChats = false;
tdlibParameters.useMessageDatabase = true;
tdlibParameters.useFileDatabase = true;
tdlibParameters.useChatInfoDatabase = true;
tdlibParameters.systemLanguageCode = System.getProperty("user.language", "en");
return new TDLibBoundResultingEvent<>(new SetTdlibParameters(tdlibParameters));
} }
} }
} }
@ -180,7 +205,7 @@ public class ReactiveApiPublisher {
case TdApi.UpdateAuthorizationState.CONSTRUCTOR -> { case TdApi.UpdateAuthorizationState.CONSTRUCTOR -> {
var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update;
switch (updateAuthorizationState.authorizationState.getConstructor()) { switch (updateAuthorizationState.authorizationState.getConstructor()) {
case TdApi.AuthorizationStateWaitTdlibParameters.CONSTRUCTOR -> { case TdApi.AuthorizationStateWaitEncryptionKey.CONSTRUCTOR -> {
return new TDLibBoundResultingEvent<>(new CheckDatabaseEncryptionKey()); return new TDLibBoundResultingEvent<>(new CheckDatabaseEncryptionKey());
} }
} }
@ -193,7 +218,7 @@ public class ReactiveApiPublisher {
var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update;
switch (updateAuthorizationState.authorizationState.getConstructor()) { switch (updateAuthorizationState.authorizationState.getConstructor()) {
case TdApi.AuthorizationStateWaitCode.CONSTRUCTOR -> { case TdApi.AuthorizationStateWaitCode.CONSTRUCTOR -> {
return new ClientBoundResultingEvent(new OnUserLoginCodeRequested(liveId, userId, phoneNumber)); return onWaitCode();
} }
case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR -> { case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR -> {
return new ClientBoundResultingEvent(new OnOtherDeviceLoginRequested(liveId, userId)); return new ClientBoundResultingEvent(new OnOtherDeviceLoginRequested(liveId, userId));
@ -202,17 +227,7 @@ public class ReactiveApiPublisher {
return new ClientBoundResultingEvent(new OnPasswordRequested(liveId, userId)); return new ClientBoundResultingEvent(new OnPasswordRequested(liveId, userId));
} }
case TdApi.AuthorizationStateWaitPhoneNumber.CONSTRUCTOR -> { case TdApi.AuthorizationStateWaitPhoneNumber.CONSTRUCTOR -> {
if (botToken != null) { return onWaitToken();
return new TDLibBoundResultingEvent<>(new CheckAuthenticationBotToken(botToken));
} else {
var authSettings = new PhoneNumberAuthenticationSettings();
authSettings.allowFlashCall = false;
authSettings.allowSmsRetrieverApi = false;
authSettings.isCurrentPhoneNumber = false;
return new TDLibBoundResultingEvent<>(new SetAuthenticationPhoneNumber("+" + phoneNumber,
authSettings
));
}
} }
} }
} }
@ -222,6 +237,13 @@ public class ReactiveApiPublisher {
return null; return null;
} }
protected abstract ResultingEvent onWaitToken();
protected ResultingEvent onWaitCode() {
LOG.error("Wait code event is not supported");
return null;
}
private static byte[] serializeEvent(ClientBoundEvent clientBoundEvent) { private static byte[] serializeEvent(ClientBoundEvent clientBoundEvent) {
try (var byteArrayOutputStream = new ByteArrayOutputStream()) { try (var byteArrayOutputStream = new ByteArrayOutputStream()) {
try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) { try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
@ -309,10 +331,67 @@ public class ReactiveApiPublisher {
return new StringJoiner(", ", ReactiveApiPublisher.class.getSimpleName() + "[", "]") return new StringJoiner(", ", ReactiveApiPublisher.class.getSimpleName() + "[", "]")
.add("userId=" + userId) .add("userId=" + userId)
.add("liveId=" + liveId) .add("liveId=" + liveId)
.add("botToken='" + botToken + "'")
.add("phoneNumber=" + phoneNumber)
.toString(); .toString();
} }
private record RequestWithTimeoutInstant<T extends TdApi.Object>(TdApi.Function<T> request, Instant timeout) {} private record RequestWithTimeoutInstant<T extends TdApi.Object>(TdApi.Function<T> request, Instant timeout) {}
private static class ReactiveApiPublisherToken extends ReactiveApiPublisher {
private final String botToken;
public ReactiveApiPublisherToken(Atomix atomix, Long liveId, long userId, String botToken) {
super(atomix, liveId, userId);
this.botToken = botToken;
}
@Override
protected ResultingEvent onWaitToken() {
return new TDLibBoundResultingEvent<>(new CheckAuthenticationBotToken(botToken));
}
@Override
public String toString() {
return new StringJoiner(", ", ReactiveApiPublisherToken.class.getSimpleName() + "[", "]")
.add("userId=" + userId)
.add("liveId=" + liveId)
.add("token='" + botToken + "'")
.toString();
}
}
private static class ReactiveApiPublisherPhoneNumber extends ReactiveApiPublisher {
private final long phoneNumber;
public ReactiveApiPublisherPhoneNumber(Atomix atomix, Long liveId, long userId, long phoneNumber) {
super(atomix, liveId, userId);
this.phoneNumber = phoneNumber;
}
@Override
protected ResultingEvent onWaitToken() {
var authSettings = new PhoneNumberAuthenticationSettings();
authSettings.allowFlashCall = false;
authSettings.allowSmsRetrieverApi = false;
authSettings.isCurrentPhoneNumber = false;
return new TDLibBoundResultingEvent<>(new SetAuthenticationPhoneNumber("+" + phoneNumber,
authSettings
));
}
@Override
public ClientBoundResultingEvent onWaitCode() {
return new ClientBoundResultingEvent(new OnUserLoginCodeRequested(liveId, userId, phoneNumber));
}
@Override
public String toString() {
return new StringJoiner(", ", ReactiveApiPublisherPhoneNumber.class.getSimpleName() + "[", "]")
.add("userId=" + userId)
.add("liveId=" + liveId)
.add("phoneNumber=" + phoneNumber)
.toString();
}
}
} }