diff --git a/pom.xml b/pom.xml index 430f275..3dff675 100644 --- a/pom.xml +++ b/pom.xml @@ -11,6 +11,7 @@ 0-SNAPSHOT 3.0.6 + 29 @@ -177,6 +178,12 @@ net.minecrell terminalconsoleappender + + io.soabase.record-builder + record-builder-processor + ${record.builder.version} + provided + @@ -210,10 +217,20 @@ org.apache.maven.plugins maven-compiler-plugin 3.8.1 - - 17 - false - + + 17 + 17 + + + io.soabase.record-builder + record-builder-processor + ${record.builder.version} + + + + io.soabase.recordbuilder.processor.RecordBuilderProcessor + + org.apache.maven.plugins diff --git a/src/main/java/it/tdlight/reactiveapi/AuthPhase.java b/src/main/java/it/tdlight/reactiveapi/AuthPhase.java new file mode 100644 index 0000000..81d771c --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/AuthPhase.java @@ -0,0 +1,14 @@ +package it.tdlight.reactiveapi; + +public enum AuthPhase { + LOGGED_OUT, + PARAMETERS_PHASE, + ENCRYPTION_PHASE, + AUTH_PHASE, + LOGGED_IN, + LOGGING_OUT, + /** + * Similar to {@link #LOGGED_OUT}, but it can't be recovered + */ + BROKEN +} diff --git a/src/main/java/it/tdlight/reactiveapi/Entrypoint.java b/src/main/java/it/tdlight/reactiveapi/Entrypoint.java index 3dc481c..b3f1c96 100644 --- a/src/main/java/it/tdlight/reactiveapi/Entrypoint.java +++ b/src/main/java/it/tdlight/reactiveapi/Entrypoint.java @@ -100,15 +100,6 @@ public class Entrypoint { .withDataDirectory(Paths.get(".data-" + instanceSettings.id).toFile()) .build()); - /*atomixBuilder.withPartitionGroups(RaftPartitionGroup - .builder("raft") - .withNumPartitions(3) - .withFlushOnCommit() - .withStorageLevel(StorageLevel.MAPPED) - .withDataDirectory(Paths.get(".data-" + instanceSettings.id).toFile()) - .build()); - */ - atomixBuilder.withShutdownHook(false); atomixBuilder.withTypeRegistrationRequired(); @@ -119,8 +110,7 @@ public class Entrypoint { var profCfg = (ConsensusProfileConfig) prof.config(); //profCfg.setDataGroup("raft"); profCfg.setDataPath(".data-" + instanceSettings.id); - //profCfg.setPartitions(3); - //profCfg.setManagementGroup("system"); + profCfg.setPartitions(3); atomixBuilder.addProfile(prof); //atomixBuilder.addProfile(Profile.dataGrid(32)); } diff --git a/src/main/java/it/tdlight/reactiveapi/Event.java b/src/main/java/it/tdlight/reactiveapi/Event.java index dde248c..a8477b8 100644 --- a/src/main/java/it/tdlight/reactiveapi/Event.java +++ b/src/main/java/it/tdlight/reactiveapi/Event.java @@ -1,23 +1,31 @@ package it.tdlight.reactiveapi; import it.tdlight.jni.TdApi; -import it.tdlight.reactiveapi.Event.AuthenticatedEvent; +import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import it.tdlight.reactiveapi.Event.ServerBoundEvent; +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import org.apache.commons.lang3.SerializationException; /** * Any event received from a session */ -public sealed interface Event permits AuthenticatedEvent { +public sealed interface Event permits ClientBoundEvent, ServerBoundEvent { /** * * @return temporary unique identifier of the session */ - long sessionId(); + long liveId(); /** * Event received after choosing the user id of the session */ - sealed interface AuthenticatedEvent extends Event permits OnLoginCodeRequested, OnUpdate { + sealed interface ClientBoundEvent extends Event permits OnLoginCodeRequested, OnOtherDeviceLoginRequested, + OnPasswordRequested, OnUpdate { /** * @@ -26,22 +34,45 @@ public sealed interface Event permits AuthenticatedEvent { long userId(); } + sealed interface ServerBoundEvent extends Event permits Request {} + /** * TDLib is asking for an authorization code */ - sealed interface OnLoginCodeRequested extends AuthenticatedEvent + sealed interface OnLoginCodeRequested extends ClientBoundEvent permits OnBotLoginCodeRequested, OnUserLoginCodeRequested {} - final record OnUserLoginCodeRequested(long sessionId, long userId, long phoneNumber) implements OnLoginCodeRequested {} + record OnUserLoginCodeRequested(long liveId, long userId, long phoneNumber) implements OnLoginCodeRequested {} - final record OnBotLoginCodeRequested(long sessionId, long userId, String token) implements OnLoginCodeRequested {} + record OnBotLoginCodeRequested(long liveId, long userId, String token) implements OnLoginCodeRequested {} + + record OnOtherDeviceLoginRequested(long liveId, long userId) implements ClientBoundEvent {} + + record OnPasswordRequested(long liveId, long userId) implements ClientBoundEvent {} /** * Event received from TDLib */ - sealed interface OnUpdate extends AuthenticatedEvent permits OnUpdateData, OnUpdateError {} + sealed interface OnUpdate extends ClientBoundEvent permits OnUpdateData, OnUpdateError {} - final record OnUpdateData(long sessionId, long userId, TdApi.Update update) implements OnUpdate {} + record OnUpdateData(long liveId, long userId, TdApi.Update update) implements OnUpdate {} - final record OnUpdateError(long sessionId, long userId, TdApi.Error error) implements OnUpdate {} + record OnUpdateError(long liveId, long userId, TdApi.Error error) implements OnUpdate {} + + record Request(long liveId, TdApi.Function request, Instant timeout) implements + ServerBoundEvent { + + public static Request deserialize(DataInput dataInput) { + try { + var liveId = dataInput.readLong(); + @SuppressWarnings("unchecked") + TdApi.Function request = (TdApi.Function) TdApi.Deserializer.deserialize(dataInput); + long millis = dataInput.readLong(); + var timeout = Instant.ofEpochMilli(millis); + return new Request<>(liveId, request, timeout); + } catch (IOException e) { + throw new SerializationException(e); + } + } + } } diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index 8a72f17..e49e941 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -1,19 +1,58 @@ package it.tdlight.reactiveapi; +import static it.tdlight.reactiveapi.AuthPhase.*; + +import io.atomix.cluster.messaging.ClusterEventService; import io.atomix.core.Atomix; +import it.tdlight.common.ReactiveTelegramClient; +import it.tdlight.common.Response; +import it.tdlight.common.Signal; +import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.CheckAuthenticationBotToken; +import it.tdlight.jni.TdApi.CheckDatabaseEncryptionKey; +import it.tdlight.jni.TdApi.Close; +import it.tdlight.jni.TdApi.Object; +import it.tdlight.jni.TdApi.PhoneNumberAuthenticationSettings; +import it.tdlight.jni.TdApi.SetAuthenticationPhoneNumber; +import it.tdlight.jni.TdApi.SetTdlibParameters; +import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import it.tdlight.reactiveapi.Event.OnBotLoginCodeRequested; +import it.tdlight.reactiveapi.Event.OnOtherDeviceLoginRequested; +import it.tdlight.reactiveapi.Event.OnPasswordRequested; +import it.tdlight.reactiveapi.Event.OnUpdateData; +import it.tdlight.reactiveapi.Event.OnUpdateError; +import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested; +import it.tdlight.reactiveapi.Event.Request; +import it.tdlight.tdlight.ClientManager; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.nio.file.Path; +import java.time.Duration; +import java.time.Instant; import java.util.StringJoiner; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.SerializationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; public class ReactiveApiPublisher { private static final Logger LOG = LoggerFactory.getLogger(ReactiveApiPublisher.class); - private static final SchedulerExecutor SCHEDULER_EXECUTOR = new SchedulerExecutor(Schedulers.boundedElastic()); + private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofSeconds(10); private final Atomix atomix; + private final ClusterEventService eventService; + private final ReactiveTelegramClient rawTelegramClient; + private final Flux telegramClient; + private final AtomicReference state = new AtomicReference<>(new State(LOGGED_OUT)); private final long userId; private final long liveId; private final String botToken; @@ -25,6 +64,16 @@ public class ReactiveApiPublisher { this.liveId = liveId; this.botToken = botToken; this.phoneNumber = phoneNumber; + this.rawTelegramClient = ClientManager.createReactive(); + this.telegramClient = Flux.create(sink -> { + rawTelegramClient.setListener(sink::next); + sink.onCancel(rawTelegramClient::cancel); + sink.onDispose(rawTelegramClient::dispose); + rawTelegramClient.createAndRegisterClient(); + + this.registerTopics(); + }).share(); + this.eventService = atomix.getEventService(); } public static ReactiveApiPublisher fromToken(Atomix atomix, Long liveId, long userId, String token) { @@ -37,6 +86,188 @@ public class ReactiveApiPublisher { public void start(Path path) { LOG.info("Starting session \"{}\" in path \"{}\"", this, path); + telegramClient.subscribeOn(Schedulers.parallel()).subscribe(this::onSignal); + } + + private void onSignal(Signal signal) { + // Update the state + var state = this.state.updateAndGet(oldState -> oldState.withSignal(signal)); + + if (state.authPhase() == LOGGED_IN) { + var update = (TdApi.Update) signal.getUpdate(); + var event = new OnUpdateData(liveId, userId, update); + sendEvent(event); + } else { + LOG.trace("Signal has not been broadcasted because the session {} is not logged in: {}", userId, signal); + this.handleSpecialSignal(state, signal); + } + } + + private void handleSpecialSignal(State state, Signal signal) { + if (signal.isException()) { + LOG.error("Received an error signal", signal.getException()); + return; + } + if (signal.isUpdate() && signal.getUpdate().getConstructor() == TdApi.Error.CONSTRUCTOR) { + var error = ((TdApi.Error) signal.getUpdate()); + LOG.error("Received a TDLib error signal! Error {}: {}", error.code, error.message); + return; + } + if (!signal.isUpdate()) { + LOG.error("Received a signal that's not an update: {}", signal); + return; + } + var update = signal.getUpdate(); + switch (state.authPhase()) { + case BROKEN -> {} + case PARAMETERS_PHASE -> { + switch (update.getConstructor()) { + case TdApi.UpdateAuthorizationState.CONSTRUCTOR -> { + var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; + switch (updateAuthorizationState.authorizationState.getConstructor()) { + case TdApi.AuthorizationStateWaitTdlibParameters.CONSTRUCTOR -> sendSpecialRaw(new SetTdlibParameters()); + } + } + } + } + case ENCRYPTION_PHASE -> { + switch (update.getConstructor()) { + case TdApi.UpdateAuthorizationState.CONSTRUCTOR -> { + var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; + switch (updateAuthorizationState.authorizationState.getConstructor()) { + case TdApi.AuthorizationStateWaitTdlibParameters.CONSTRUCTOR -> + sendSpecialRaw(new CheckDatabaseEncryptionKey()); + } + } + } + } + case AUTH_PHASE -> { + switch (update.getConstructor()) { + case TdApi.UpdateAuthorizationState.CONSTRUCTOR -> { + var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; + switch (updateAuthorizationState.authorizationState.getConstructor()) { + case TdApi.AuthorizationStateWaitCode.CONSTRUCTOR -> + sendEvent(new OnUserLoginCodeRequested(liveId, userId, phoneNumber)); + case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR -> + sendEvent(new OnOtherDeviceLoginRequested(liveId, userId)); + case TdApi.AuthorizationStateWaitPassword.CONSTRUCTOR -> + sendEvent(new OnPasswordRequested(liveId, userId)); + case TdApi.AuthorizationStateWaitPhoneNumber.CONSTRUCTOR -> { + if (botToken != null) { + sendSpecialRaw(new CheckAuthenticationBotToken(botToken)); + } else { + var authSettings = new PhoneNumberAuthenticationSettings(); + authSettings.allowFlashCall = false; + authSettings.allowSmsRetrieverApi = false; + authSettings.isCurrentPhoneNumber = false; + sendSpecialRaw(new SetAuthenticationPhoneNumber("+" + phoneNumber, authSettings)); + } + } + } + } + } + } + } + } + + private void sendEvent(ClientBoundEvent clientBoundEvent) { + eventService.broadcast("session-" + liveId + "-clientbound-events", + clientBoundEvent, + ReactiveApiPublisher::serializeEvent + ); + } + + private void sendSpecialRaw(TdApi.Function function) { + Mono + .from(rawTelegramClient.send(function, SPECIAL_RAW_TIMEOUT_DURATION)) + .subscribeOn(Schedulers.parallel()) + .subscribe(resp -> { + if (resp.getConstructor() == TdApi.Error.CONSTRUCTOR) { + LOG.error("Received error for special request {}: {}", function, resp); + } + }, ex -> LOG.error("Failed to receive the response for special request {}", function, ex)); + } + + private static byte[] serializeEvent(ClientBoundEvent clientBoundEvent) { + try (var baos = new ByteArrayOutputStream()) { + try (var daos = new DataOutputStream(baos)) { + if (clientBoundEvent instanceof OnUpdateData onUpdateData) { + daos.write(0x1); + onUpdateData.update().serialize(daos); + } else if (clientBoundEvent instanceof OnUpdateError onUpdateError) { + daos.write(0x2); + onUpdateError.error().serialize(daos); + } else if (clientBoundEvent instanceof OnUserLoginCodeRequested onUserLoginCodeRequested) { + daos.write(0x3); + daos.writeLong(onUserLoginCodeRequested.phoneNumber()); + } else if (clientBoundEvent instanceof OnBotLoginCodeRequested onBotLoginCodeRequested) { + daos.write(0x4); + daos.writeUTF(onBotLoginCodeRequested.token()); + } else { + throw new UnsupportedOperationException("Unexpected value: " + clientBoundEvent); + } + return baos.toByteArray(); + } + } catch (IOException ex) { + throw new SerializationException(ex); + } + } + + private void registerTopics() { + // Start receiving requests + eventService.subscribe("session-" + liveId + "-requests", + ReactiveApiPublisher::deserializeRequest, + this::handleRequest, + ReactiveApiPublisher::serializeResponse); + } + + private static byte[] serializeResponse(Response response) { + var id = response.getId(); + var object = response.getObject(); + try (var baos = new ByteArrayOutputStream()) { + try (var daos = new DataOutputStream(baos)) { + daos.writeLong(id); + object.serialize(daos); + return baos.toByteArray(); + } + } catch (IOException ex) { + throw new SerializationException(ex); + } + } + + private CompletableFuture handleRequest(Request requestObj) { + return Mono + .just(requestObj) + .filter(req -> { + if (userId != req.liveId()) { + LOG.error("Received a request for another session!"); + return false; + } else { + return true; + } + }) + .map(req -> new RequestWithTimeoutInstant<>(req.request(), req.timeout())) + .flatMap(requestWithTimeoutInstant -> { + var state = this.state.get(); + if (state.authPhase() == LOGGED_IN) { + var request = requestWithTimeoutInstant.request(); + var timeoutDuration = Duration.between(Instant.now(), requestWithTimeoutInstant.timeout()); + if (timeoutDuration.isZero() || timeoutDuration.isNegative()) { + LOG.error("Received an expired request. Expiration: {}", requestWithTimeoutInstant.timeout()); + } + + return Mono.from(rawTelegramClient.send(request, timeoutDuration)); + } else { + LOG.error("Ignored a request because the current state is {}", state); + return Mono.empty(); + } + }) + .map(responseObj -> new Response(liveId, responseObj)) + .toFuture(); + } + + private static Request deserializeRequest(byte[] bytes) { + return Request.deserialize(new DataInputStream(new ByteArrayInputStream(bytes))); } @Override @@ -48,4 +279,6 @@ public class ReactiveApiPublisher { .add("phoneNumber=" + phoneNumber) .toString(); } + + private record RequestWithTimeoutInstant(TdApi.Function request, Instant timeout) {} } diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiUpdate.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiUpdate.java deleted file mode 100644 index 914c1d4..0000000 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiUpdate.java +++ /dev/null @@ -1,8 +0,0 @@ -package it.tdlight.reactiveapi; - -import it.tdlight.jni.TdApi; - -/** - * {@link #sessionUuid} changes every time a session is restarted - */ -public record ReactiveApiUpdate(long sessionUuid, TdApi.Object update) {} diff --git a/src/main/java/it/tdlight/reactiveapi/State.java b/src/main/java/it/tdlight/reactiveapi/State.java new file mode 100644 index 0000000..a961c3c --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/State.java @@ -0,0 +1,138 @@ +package it.tdlight.reactiveapi; + +import static it.tdlight.reactiveapi.AuthPhase.AUTH_PHASE; +import static it.tdlight.reactiveapi.AuthPhase.BROKEN; +import static it.tdlight.reactiveapi.AuthPhase.ENCRYPTION_PHASE; +import static it.tdlight.reactiveapi.AuthPhase.LOGGED_IN; +import static it.tdlight.reactiveapi.AuthPhase.LOGGED_OUT; +import static it.tdlight.reactiveapi.AuthPhase.LOGGING_OUT; +import static it.tdlight.reactiveapi.AuthPhase.PARAMETERS_PHASE; + +import io.soabase.recordbuilder.core.RecordBuilder; +import it.tdlight.common.Signal; +import it.tdlight.jni.TdApi; +import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RecordBuilder +public record State(AuthPhase authPhase) implements StateBuilder.With { + + private static final Logger LOG = LoggerFactory.getLogger(State.class); + + public State withSignal(Signal signal) { + var newState = this; + + // Mark state as broken if the connection is errored unexpectedly + if (signal.isException()) { + newState = newState.withAuthPhase(BROKEN); + } + + newState = switch (newState.authPhase) { + // Mark state as broken if the connection is terminated unexpectedly + case PARAMETERS_PHASE, ENCRYPTION_PHASE, AUTH_PHASE, LOGGED_IN -> { + if (signal.isClosed()) { + yield newState.withAuthPhase(BROKEN); + } else { + yield newState; + } + } + case LOGGING_OUT -> { + // Mark state as logged out if the connection is terminated successfully + if (signal.isClosed()) { + yield newState.withAuthPhase(LOGGED_OUT); + } else { + yield newState; + } + } + default -> newState; + }; + + if (newState.authPhase != BROKEN && signal.isUpdate()) { + var update = signal.getUpdate(); + newState = switch (update.getConstructor()) { + // Forcefully logout if the update stream fails + case TdApi.Error.CONSTRUCTOR -> newState.withAuthPhase(LOGGED_OUT); + case TdApi.UpdateAuthorizationState.CONSTRUCTOR -> { + var updateAuthState = (TdApi.UpdateAuthorizationState) update; + yield switch (updateAuthState.authorizationState.getConstructor()) { + case TdApi.AuthorizationStateClosing.CONSTRUCTOR -> { + if (newState.authPhase != LOGGED_IN) { + LOG.warn("Logging out, but the current auth phase is {} instead of {}", + newState.authPhase, + Set.of(LOGGED_IN) + ); + } + yield newState.withAuthPhase(LOGGING_OUT); + } + case TdApi.AuthorizationStateClosed.CONSTRUCTOR -> { + if (newState.authPhase != LOGGING_OUT) { + LOG.warn("Logged out, but the current auth phase is {} instead of {}", + newState.authPhase, + Set.of(LOGGING_OUT) + ); + } + yield newState.withAuthPhase(LOGGED_OUT); + } + case TdApi.AuthorizationStateWaitTdlibParameters.CONSTRUCTOR -> { + if (newState.authPhase != LOGGED_OUT) { + LOG.warn("Waiting parameters, but the current auth phase is {} instead of {}", + newState.authPhase, + Set.of(LOGGED_OUT) + ); + } + yield newState.withAuthPhase(PARAMETERS_PHASE); + } + case TdApi.AuthorizationStateWaitEncryptionKey.CONSTRUCTOR -> { + if (newState.authPhase != PARAMETERS_PHASE) { + LOG.warn("Waiting parameters, but the current auth phase is {} instead of {}", + newState.authPhase, + Set.of(PARAMETERS_PHASE) + ); + } + yield newState.withAuthPhase(ENCRYPTION_PHASE); + } + case TdApi.AuthorizationStateWaitPhoneNumber.CONSTRUCTOR, + TdApi.AuthorizationStateWaitRegistration.CONSTRUCTOR, + TdApi.AuthorizationStateWaitCode.CONSTRUCTOR, + TdApi.AuthorizationStateWaitPassword.CONSTRUCTOR, + TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR -> { + if (newState.authPhase != ENCRYPTION_PHASE && newState.authPhase != AUTH_PHASE) { + LOG.warn( + "Waiting for authentication, but the current auth phase is {} instead of {}", + newState.authPhase, + Set.of(ENCRYPTION_PHASE, AUTH_PHASE) + ); + } + yield newState.withAuthPhase(AUTH_PHASE); + } + case TdApi.AuthorizationStateReady.CONSTRUCTOR -> { + if (newState.authPhase != ENCRYPTION_PHASE && newState.authPhase != AUTH_PHASE) { + LOG.warn("Logged in, but the current auth phase is {} instead of {}", + newState.authPhase, + Set.of(ENCRYPTION_PHASE, AUTH_PHASE) + ); + } + yield newState.withAuthPhase(LOGGED_IN); + } + case TdApi.AuthorizationStateLoggingOut.CONSTRUCTOR -> { + if (newState.authPhase != LOGGED_IN) { + LOG.warn("Logged in, but the current auth phase is {} instead of {}", + newState.authPhase, + Set.of(LOGGED_IN) + ); + } + yield newState.withAuthPhase(LOGGING_OUT); + } + default -> { + LOG.error("Unknown authorization state: {}", updateAuthState.authorizationState); + yield newState; + } + }; + } + default -> newState; + }; + } + return newState; + } +}