diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index e49e941..8e0c9a0 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -11,6 +11,7 @@ import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.CheckAuthenticationBotToken; import it.tdlight.jni.TdApi.CheckDatabaseEncryptionKey; import it.tdlight.jni.TdApi.Close; +import it.tdlight.jni.TdApi.Function; import it.tdlight.jni.TdApi.Object; import it.tdlight.jni.TdApi.PhoneNumberAuthenticationSettings; import it.tdlight.jni.TdApi.SetAuthenticationPhoneNumber; @@ -23,6 +24,8 @@ import it.tdlight.reactiveapi.Event.OnUpdateData; import it.tdlight.reactiveapi.Event.OnUpdateError; import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested; import it.tdlight.reactiveapi.Event.Request; +import it.tdlight.reactiveapi.ResultingEvent.ClientBoundResultingEvent; +import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent; import it.tdlight.tdlight.ClientManager; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -35,11 +38,15 @@ import java.time.Instant; import java.util.StringJoiner; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; import org.apache.commons.lang3.SerializationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.Many; import reactor.core.scheduler.Schedulers; public class ReactiveApiPublisher { @@ -52,12 +59,15 @@ public class ReactiveApiPublisher { private final ClusterEventService eventService; private final ReactiveTelegramClient rawTelegramClient; private final Flux telegramClient; + private final AtomicReference state = new AtomicReference<>(new State(LOGGED_OUT)); private final long userId; private final long liveId; private final String botToken; private final Long phoneNumber; + private AtomicReference disposable = new AtomicReference<>(); + private ReactiveApiPublisher(Atomix atomix, long liveId, long userId, String botToken, Long phoneNumber) { this.atomix = atomix; this.userId = userId; @@ -86,36 +96,70 @@ public class ReactiveApiPublisher { public void start(Path path) { LOG.info("Starting session \"{}\" in path \"{}\"", this, path); - telegramClient.subscribeOn(Schedulers.parallel()).subscribe(this::onSignal); + var publishedResultingEvents = telegramClient + .subscribeOn(Schedulers.parallel()) + .mapNotNull(this::onSignal) + .publish(); + + publishedResultingEvents + .filter(s -> s instanceof TDLibBoundResultingEvent) + .map(s -> ((TDLibBoundResultingEvent) s).action()) + .flatMap(function -> Mono + .from(rawTelegramClient.send(function, SPECIAL_RAW_TIMEOUT_DURATION)) + .doOnNext(resp -> { + if (resp.getConstructor() == TdApi.Error.CONSTRUCTOR) { + LOG.error("Received error for special request {}: {}", function, resp); + } + }) + .doOnError(ex -> LOG.error("Failed to receive the response for special request {}", function, ex)) + ) + .subscribeOn(Schedulers.parallel()) + .subscribe(); + publishedResultingEvents + .filter(s -> s instanceof ClientBoundResultingEvent) + .cast(ClientBoundResultingEvent.class) + .subscribeOn(Schedulers.parallel()) + .subscribe(clientBoundResultingEvent -> eventService.broadcast("session-" + liveId + "-clientbound-events", + clientBoundResultingEvent.event(), + ReactiveApiPublisher::serializeEvent + )); + + + var prev = this.disposable.getAndSet(publishedResultingEvents.connect()); + if (prev != null) { + LOG.error("The API started twice!"); + prev.dispose(); + } } - private void onSignal(Signal signal) { + @Nullable + private ResultingEvent onSignal(Signal signal) { // Update the state var state = this.state.updateAndGet(oldState -> oldState.withSignal(signal)); if (state.authPhase() == LOGGED_IN) { var update = (TdApi.Update) signal.getUpdate(); - var event = new OnUpdateData(liveId, userId, update); - sendEvent(event); + return new ClientBoundResultingEvent(new OnUpdateData(liveId, userId, update)); } else { LOG.trace("Signal has not been broadcasted because the session {} is not logged in: {}", userId, signal); - this.handleSpecialSignal(state, signal); + return this.handleSpecialSignal(state, signal); } } - private void handleSpecialSignal(State state, Signal signal) { + @Nullable + private ResultingEvent handleSpecialSignal(State state, Signal signal) { if (signal.isException()) { LOG.error("Received an error signal", signal.getException()); - return; + return null; } if (signal.isUpdate() && signal.getUpdate().getConstructor() == TdApi.Error.CONSTRUCTOR) { var error = ((TdApi.Error) signal.getUpdate()); LOG.error("Received a TDLib error signal! Error {}: {}", error.code, error.message); - return; + return null; } if (!signal.isUpdate()) { LOG.error("Received a signal that's not an update: {}", signal); - return; + return null; } var update = signal.getUpdate(); switch (state.authPhase()) { @@ -125,7 +169,9 @@ public class ReactiveApiPublisher { case TdApi.UpdateAuthorizationState.CONSTRUCTOR -> { var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; switch (updateAuthorizationState.authorizationState.getConstructor()) { - case TdApi.AuthorizationStateWaitTdlibParameters.CONSTRUCTOR -> sendSpecialRaw(new SetTdlibParameters()); + case TdApi.AuthorizationStateWaitTdlibParameters.CONSTRUCTOR -> { + return new TDLibBoundResultingEvent<>(new SetTdlibParameters()); + } } } } @@ -135,8 +181,9 @@ public class ReactiveApiPublisher { case TdApi.UpdateAuthorizationState.CONSTRUCTOR -> { var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; switch (updateAuthorizationState.authorizationState.getConstructor()) { - case TdApi.AuthorizationStateWaitTdlibParameters.CONSTRUCTOR -> - sendSpecialRaw(new CheckDatabaseEncryptionKey()); + case TdApi.AuthorizationStateWaitTdlibParameters.CONSTRUCTOR -> { + return new TDLibBoundResultingEvent<>(new CheckDatabaseEncryptionKey()); + } } } } @@ -146,21 +193,26 @@ public class ReactiveApiPublisher { case TdApi.UpdateAuthorizationState.CONSTRUCTOR -> { var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; switch (updateAuthorizationState.authorizationState.getConstructor()) { - case TdApi.AuthorizationStateWaitCode.CONSTRUCTOR -> - sendEvent(new OnUserLoginCodeRequested(liveId, userId, phoneNumber)); - case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR -> - sendEvent(new OnOtherDeviceLoginRequested(liveId, userId)); - case TdApi.AuthorizationStateWaitPassword.CONSTRUCTOR -> - sendEvent(new OnPasswordRequested(liveId, userId)); + case TdApi.AuthorizationStateWaitCode.CONSTRUCTOR -> { + return new ClientBoundResultingEvent(new OnUserLoginCodeRequested(liveId, userId, phoneNumber)); + } + case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR -> { + return new ClientBoundResultingEvent(new OnOtherDeviceLoginRequested(liveId, userId)); + } + case TdApi.AuthorizationStateWaitPassword.CONSTRUCTOR -> { + return new ClientBoundResultingEvent(new OnPasswordRequested(liveId, userId)); + } case TdApi.AuthorizationStateWaitPhoneNumber.CONSTRUCTOR -> { if (botToken != null) { - sendSpecialRaw(new CheckAuthenticationBotToken(botToken)); + return new TDLibBoundResultingEvent<>(new CheckAuthenticationBotToken(botToken)); } else { var authSettings = new PhoneNumberAuthenticationSettings(); authSettings.allowFlashCall = false; authSettings.allowSmsRetrieverApi = false; authSettings.isCurrentPhoneNumber = false; - sendSpecialRaw(new SetAuthenticationPhoneNumber("+" + phoneNumber, authSettings)); + return new TDLibBoundResultingEvent<>(new SetAuthenticationPhoneNumber("+" + phoneNumber, + authSettings + )); } } } @@ -168,24 +220,7 @@ public class ReactiveApiPublisher { } } } - } - - private void sendEvent(ClientBoundEvent clientBoundEvent) { - eventService.broadcast("session-" + liveId + "-clientbound-events", - clientBoundEvent, - ReactiveApiPublisher::serializeEvent - ); - } - - private void sendSpecialRaw(TdApi.Function function) { - Mono - .from(rawTelegramClient.send(function, SPECIAL_RAW_TIMEOUT_DURATION)) - .subscribeOn(Schedulers.parallel()) - .subscribe(resp -> { - if (resp.getConstructor() == TdApi.Error.CONSTRUCTOR) { - LOG.error("Received error for special request {}: {}", function, resp); - } - }, ex -> LOG.error("Failed to receive the response for special request {}", function, ex)); + return null; } private static byte[] serializeEvent(ClientBoundEvent clientBoundEvent) { diff --git a/src/main/java/it/tdlight/reactiveapi/ResultingEvent.java b/src/main/java/it/tdlight/reactiveapi/ResultingEvent.java new file mode 100644 index 0000000..146b1e8 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/ResultingEvent.java @@ -0,0 +1,13 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.jni.TdApi; +import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import it.tdlight.reactiveapi.ResultingEvent.ClientBoundResultingEvent; +import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent; + +public sealed interface ResultingEvent permits ClientBoundResultingEvent, TDLibBoundResultingEvent { + + record ClientBoundResultingEvent(ClientBoundEvent event) implements ResultingEvent {} + + record TDLibBoundResultingEvent(TdApi.Function action) implements ResultingEvent {} +}