From 5b9fec980ec0637625b90dedf6695893feca120f Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sun, 9 Jan 2022 18:27:14 +0100 Subject: [PATCH] Implement event transformers --- .../java/it/tdlight/reactiveapi/Address.java | 27 +++ .../reactiveapi/AtomixReactiveApi.java | 32 +++- src/main/java/it/tdlight/reactiveapi/Cli.java | 26 +++ .../it/tdlight/reactiveapi/Entrypoint.java | 37 +++- .../java/it/tdlight/reactiveapi/Event.java | 5 +- .../tdlight/reactiveapi/InstanceSettings.java | 11 +- .../LiveAtomixReactiveApiClient.java | 22 +-- .../reactiveapi/ReactiveApiPublisher.java | 165 +++++++++++++----- .../ResultingEventTransformer.java | 9 + .../transformer/CommandLineAuth.java | 54 ++++++ .../transformer/DefaultOptions.java | 69 ++++++++ .../transformer/DisableChatDatabase.java | 25 +++ .../transformer/DisableFileDatabase.java | 25 +++ .../transformer/DisableMessageDatabase.java | 25 +++ .../DummyResultingEventTransformer.java | 13 ++ .../transformer/TdlightDefaultOptions.java | 64 +++++++ src/main/resources/log4j2.xml | 4 +- 17 files changed, 548 insertions(+), 65 deletions(-) create mode 100644 src/main/java/it/tdlight/reactiveapi/Address.java create mode 100644 src/main/java/it/tdlight/reactiveapi/ResultingEventTransformer.java create mode 100644 src/main/java/it/tdlight/reactiveapi/transformer/CommandLineAuth.java create mode 100644 src/main/java/it/tdlight/reactiveapi/transformer/DefaultOptions.java create mode 100644 src/main/java/it/tdlight/reactiveapi/transformer/DisableChatDatabase.java create mode 100644 src/main/java/it/tdlight/reactiveapi/transformer/DisableFileDatabase.java create mode 100644 src/main/java/it/tdlight/reactiveapi/transformer/DisableMessageDatabase.java create mode 100644 src/main/java/it/tdlight/reactiveapi/transformer/DummyResultingEventTransformer.java create mode 100644 src/main/java/it/tdlight/reactiveapi/transformer/TdlightDefaultOptions.java diff --git a/src/main/java/it/tdlight/reactiveapi/Address.java b/src/main/java/it/tdlight/reactiveapi/Address.java new file mode 100644 index 0000000..91f45d2 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/Address.java @@ -0,0 +1,27 @@ +package it.tdlight.reactiveapi; + +import java.util.Arrays; + +public record Address(String host, int port) { + public Address { + if (host.isBlank()) { + throw new IllegalArgumentException("Host is blank"); + } + if (port < 0) { + throw new IndexOutOfBoundsException(port); + } + if (port >= 65536) { + throw new IndexOutOfBoundsException(port); + } + } + + public static Address fromString(String address) { + var parts = address.split(":"); + if (parts.length < 2) { + throw new IllegalArgumentException("Malformed client address, it must have a port (host:port)"); + } + var host = String.join(":", Arrays.copyOf(parts, parts.length - 1)); + var port = Integer.parseUnsignedInt(parts[parts.length - 1]); + return new Address(host, port); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java index 77f05d5..e3142ac 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java @@ -31,6 +31,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +49,7 @@ public class AtomixReactiveApi implements ReactiveApi { @Nullable private final String nodeId; private final Atomix atomix; + private final Set resultingEventTransformerSet; private final AsyncAtomicIdGenerator nextSessionLiveId; private final AsyncAtomicLock sessionModificationLock; @@ -62,9 +64,13 @@ public class AtomixReactiveApi implements ReactiveApi { @Nullable private final DiskSessionsManager diskSessions; - public AtomixReactiveApi(@Nullable String nodeId, Atomix atomix, @Nullable DiskSessionsManager diskSessions) { + public AtomixReactiveApi(@Nullable String nodeId, + Atomix atomix, + @Nullable DiskSessionsManager diskSessions, + @NotNull Set resultingEventTransformerSet) { this.nodeId = nodeId; this.atomix = atomix; + this.resultingEventTransformerSet = resultingEventTransformerSet; if (nodeId == null) { if (diskSessions != null) { @@ -248,22 +254,38 @@ public class AtomixReactiveApi implements ReactiveApi { userId = createBotSessionRequest.userId(); botToken = createBotSessionRequest.token(); phoneNumber = null; - reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, liveId, userId, botToken); + reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, resultingEventTransformerSet, + liveId, + userId, + botToken + ); } else if (req instanceof CreateUserSessionRequest createUserSessionRequest) { loadedFromDisk = false; userId = createUserSessionRequest.userId(); botToken = null; phoneNumber = createUserSessionRequest.phoneNumber(); - reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, liveId, userId, phoneNumber); + reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, resultingEventTransformerSet, + liveId, + userId, + phoneNumber + ); } else if (req instanceof LoadSessionFromDiskRequest loadSessionFromDiskRequest) { loadedFromDisk = true; userId = loadSessionFromDiskRequest.userId(); botToken = loadSessionFromDiskRequest.token(); phoneNumber = loadSessionFromDiskRequest.phoneNumber(); if (loadSessionFromDiskRequest.phoneNumber() != null) { - reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, liveId, userId, phoneNumber); + reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, resultingEventTransformerSet, + liveId, + userId, + phoneNumber + ); } else { - reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, liveId, userId, botToken); + reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, resultingEventTransformerSet, + liveId, + userId, + botToken + ); } } else { return Mono.error(new UnsupportedOperationException("Unexpected value: " + req)); diff --git a/src/main/java/it/tdlight/reactiveapi/Cli.java b/src/main/java/it/tdlight/reactiveapi/Cli.java index f28cd86..c55ad67 100644 --- a/src/main/java/it/tdlight/reactiveapi/Cli.java +++ b/src/main/java/it/tdlight/reactiveapi/Cli.java @@ -6,8 +6,11 @@ import it.tdlight.reactiveapi.CreateSessionRequest.CreateUserSessionRequest; import java.io.IOException; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; import net.minecrell.terminalconsole.SimpleTerminalConsole; import org.jline.reader.LineReader; import org.jline.reader.LineReaderBuilder; @@ -18,6 +21,10 @@ public class Cli { private static final Logger LOG = LoggerFactory.getLogger(Cli.class); + private static final Object parameterLock = new Object(); + private static boolean askedParameter = false; + private static CompletableFuture askedParameterResult = null; + public static void main(String[] args) throws IOException { var validArgs = Entrypoint.parseArguments(args); var atomixBuilder = Atomix.builder(); @@ -57,6 +64,15 @@ public class Cli { @Override protected void runCommand(String command) { + synchronized (parameterLock) { + if (askedParameter) { + askedParameterResult.complete(command); + askedParameterResult = null; + askedParameter = false; + return; + } + } + var parts = command.split(" ", 2); var commandName = parts[0].trim().toLowerCase(); String commandArgs; @@ -132,4 +148,14 @@ public class Cli { LOG.error("Syntax: CreateSession <\"bot\"|\"user\"> "); } } + + public static String askParameter(String question) { + var cf = new CompletableFuture(); + synchronized (parameterLock) { + LOG.info(question); + askedParameter = true; + askedParameterResult = cf; + } + return cf.join(); + } } diff --git a/src/main/java/it/tdlight/reactiveapi/Entrypoint.java b/src/main/java/it/tdlight/reactiveapi/Entrypoint.java index 79b0c04..26c618c 100644 --- a/src/main/java/it/tdlight/reactiveapi/Entrypoint.java +++ b/src/main/java/it/tdlight/reactiveapi/Entrypoint.java @@ -1,5 +1,7 @@ package it.tdlight.reactiveapi; +import static java.util.Collections.unmodifiableSet; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import io.atomix.cluster.Node; @@ -10,10 +12,13 @@ import io.atomix.core.profile.ConsensusProfileConfig; import io.atomix.core.profile.Profile; import io.atomix.protocols.raft.partition.RaftPartitionGroup; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,13 +75,19 @@ public class Entrypoint { atomixBuilder.withCompatibleSerialization(false); atomixBuilder.withClusterId(clusterSettings.id); + Set resultingEventTransformerSet; String nodeId; if (instanceSettings.client) { if (diskSessions != null) { throw new IllegalArgumentException("A client instance can't have a session manager!"); } - atomixBuilder.withMemberId(instanceSettings.id).withAddress(instanceSettings.clientAddress); + if (instanceSettings.clientAddress == null) { + throw new IllegalArgumentException("A client instance must have an address (host:port)"); + } + var address = Address.fromString(instanceSettings.clientAddress); + atomixBuilder.withMemberId(instanceSettings.id).withHost(address.host()).withPort(address.port()); nodeId = null; + resultingEventTransformerSet = Set.of(); } else { if (diskSessions == null) { throw new IllegalArgumentException("A full instance must have a session manager!"); @@ -95,8 +106,28 @@ public class Entrypoint { var nodeSettings = nodeSettingsOptional.get(); - atomixBuilder.withMemberId(instanceSettings.id).withAddress(nodeSettings.address); + var address = Address.fromString(nodeSettings.address); + atomixBuilder.withMemberId(instanceSettings.id).withHost(address.host()).withPort(address.port()); + + resultingEventTransformerSet = new HashSet<>(); + if (instanceSettings.resultingEventTransformers != null) { + for (var resultingEventTransformer: instanceSettings.resultingEventTransformers) { + try { + var instance = resultingEventTransformer.getConstructor().newInstance(); + resultingEventTransformerSet.add(instance); + LOG.info("Loaded and applied resulting event transformer: " + resultingEventTransformer.getName()); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new IllegalArgumentException("Failed to load resulting event transformer: " + + resultingEventTransformer.getName()); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException("The client transformer must declare an empty constructor: " + + resultingEventTransformer.getName()); + } + } + } + nodeId = nodeSettings.id; + resultingEventTransformerSet = unmodifiableSet(resultingEventTransformerSet); } var bootstrapDiscoveryProviderNodes = new ArrayList(); @@ -139,7 +170,7 @@ public class Entrypoint { atomix.start().join(); - var api = new AtomixReactiveApi(nodeId, atomix, diskSessions); + var api = new AtomixReactiveApi(nodeId, atomix, diskSessions, resultingEventTransformerSet); LOG.info("Starting ReactiveApi..."); diff --git a/src/main/java/it/tdlight/reactiveapi/Event.java b/src/main/java/it/tdlight/reactiveapi/Event.java index 659df71..f15c6b2 100644 --- a/src/main/java/it/tdlight/reactiveapi/Event.java +++ b/src/main/java/it/tdlight/reactiveapi/Event.java @@ -3,10 +3,8 @@ package it.tdlight.reactiveapi; import it.tdlight.jni.TdApi; import it.tdlight.reactiveapi.Event.ClientBoundEvent; import it.tdlight.reactiveapi.Event.ServerBoundEvent; -import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.IOException; -import java.time.Duration; import java.time.Instant; import org.apache.commons.lang3.SerializationException; @@ -48,7 +46,8 @@ public sealed interface Event permits ClientBoundEvent, ServerBoundEvent { record OnOtherDeviceLoginRequested(long liveId, long userId, String link) implements ClientBoundEvent {} - record OnPasswordRequested(long liveId, long userId) implements ClientBoundEvent {} + record OnPasswordRequested(long liveId, long userId, String passwordHint, boolean hasRecoveryEmail, + String recoveryEmailPattern) implements ClientBoundEvent {} /** * Event received from TDLib diff --git a/src/main/java/it/tdlight/reactiveapi/InstanceSettings.java b/src/main/java/it/tdlight/reactiveapi/InstanceSettings.java index ac95702..f715f67 100644 --- a/src/main/java/it/tdlight/reactiveapi/InstanceSettings.java +++ b/src/main/java/it/tdlight/reactiveapi/InstanceSettings.java @@ -2,6 +2,7 @@ package it.tdlight.reactiveapi; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Set; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -22,12 +23,20 @@ public class InstanceSettings { */ public @Nullable String clientAddress; + /** + * If {@link #client} is false, this will transform resulting events before being sent + */ + public @Nullable Set> resultingEventTransformers; + @JsonCreator public InstanceSettings(@JsonProperty(required = true, value = "id") @NotNull String id, @JsonProperty(required = true, value = "client") boolean client, - @JsonProperty("clientAddress") @Nullable String clientAddress) { + @JsonProperty("clientAddress") @Nullable String clientAddress, + @JsonProperty("resultingEventTransformers") @Nullable + Set> resultingEventTransformers) { this.id = id; this.client = client; this.clientAddress = clientAddress; + this.resultingEventTransformers = resultingEventTransformers; } } diff --git a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java index 5a9638f..46f762b 100644 --- a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java @@ -7,6 +7,7 @@ import it.tdlight.jni.TdApi; import it.tdlight.reactiveapi.Event.ClientBoundEvent; import it.tdlight.reactiveapi.Event.OnBotLoginCodeRequested; import it.tdlight.reactiveapi.Event.OnOtherDeviceLoginRequested; +import it.tdlight.reactiveapi.Event.OnPasswordRequested; import it.tdlight.reactiveapi.Event.OnUpdateData; import it.tdlight.reactiveapi.Event.OnUpdateError; import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested; @@ -101,16 +102,17 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient { static ClientBoundEvent deserializeEvent(byte[] bytes) { try (var byteArrayInputStream = new ByteArrayInputStream(bytes)) { - try (var dataInputStream = new DataInputStream(byteArrayInputStream)) { - var liveId = dataInputStream.readLong(); - var userId = dataInputStream.readLong(); - return switch (dataInputStream.readByte()) { - case 0x01 -> new OnUpdateData(liveId, userId, (TdApi.Update) TdApi.Deserializer.deserialize(dataInputStream)); - case 0x02 -> new OnUpdateError(liveId, userId, (TdApi.Error) TdApi.Deserializer.deserialize(dataInputStream)); - case 0x03 -> new OnUserLoginCodeRequested(liveId, userId, dataInputStream.readLong()); - case 0x04 -> new OnBotLoginCodeRequested(liveId, userId, dataInputStream.readUTF()); - case 0x05 -> new OnOtherDeviceLoginRequested(liveId, userId, dataInputStream.readUTF()); - default -> throw new IllegalStateException("Unexpected value: " + dataInputStream.readByte()); + try (var is = new DataInputStream(byteArrayInputStream)) { + var liveId = is.readLong(); + var userId = is.readLong(); + return switch (is.readByte()) { + case 0x01 -> new OnUpdateData(liveId, userId, (TdApi.Update) TdApi.Deserializer.deserialize(is)); + case 0x02 -> new OnUpdateError(liveId, userId, (TdApi.Error) TdApi.Deserializer.deserialize(is)); + case 0x03 -> new OnUserLoginCodeRequested(liveId, userId, is.readLong()); + case 0x04 -> new OnBotLoginCodeRequested(liveId, userId, is.readUTF()); + case 0x05 -> new OnOtherDeviceLoginRequested(liveId, userId, is.readUTF()); + case 0x06 -> new OnPasswordRequested(liveId, userId, is.readUTF(), is.readBoolean(), is.readUTF()); + default -> throw new IllegalStateException("Unexpected value: " + is.readByte()); }; } } catch (IOException ex) { diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index 418dd41..ae5490a 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -13,6 +13,7 @@ import it.tdlight.common.Response; import it.tdlight.common.Signal; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.AuthorizationStateWaitOtherDeviceConfirmation; +import it.tdlight.jni.TdApi.AuthorizationStateWaitPassword; import it.tdlight.jni.TdApi.CheckAuthenticationBotToken; import it.tdlight.jni.TdApi.CheckDatabaseEncryptionKey; import it.tdlight.jni.TdApi.Object; @@ -41,15 +42,18 @@ import java.io.IOException; import java.nio.file.Path; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; import java.util.StringJoiner; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import org.apache.commons.lang3.SerializationException; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; -import reactor.core.publisher.BufferOverflowStrategy; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -61,6 +65,7 @@ public abstract class ReactiveApiPublisher { private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofSeconds(10); private final ClusterEventService eventService; + private final Set resultingEventTransformerSet; private final ReactiveTelegramClient rawTelegramClient; private final Flux telegramClient; @@ -72,8 +77,12 @@ public abstract class ReactiveApiPublisher { private final AtomicReference disposable = new AtomicReference<>(); private final AtomicReference path = new AtomicReference<>(); - private ReactiveApiPublisher(Atomix atomix, long liveId, long userId) { + private ReactiveApiPublisher(Atomix atomix, + Set resultingEventTransformerSet, + long liveId, + long userId) { this.eventService = atomix.getEventService(); + this.resultingEventTransformerSet = resultingEventTransformerSet; this.userId = userId; this.liveId = liveId; this.dynamicIdResolveSubject = SubjectNaming.getDynamicIdResolveSubject(userId); @@ -89,12 +98,20 @@ public abstract class ReactiveApiPublisher { })).share(); } - public static ReactiveApiPublisher fromToken(Atomix atomix, Long liveId, long userId, String token) { - return new ReactiveApiPublisherToken(atomix, liveId, userId, token); + public static ReactiveApiPublisher fromToken(Atomix atomix, + Set resultingEventTransformerSet, + Long liveId, + long userId, + String token) { + return new ReactiveApiPublisherToken(atomix, resultingEventTransformerSet, liveId, userId, token); } - public static ReactiveApiPublisher fromPhoneNumber(Atomix atomix, Long liveId, long userId, long phoneNumber) { - return new ReactiveApiPublisherPhoneNumber(atomix, liveId, userId, phoneNumber); + public static ReactiveApiPublisher fromPhoneNumber(Atomix atomix, + Set resultingEventTransformerSet, + Long liveId, + long userId, + long phoneNumber) { + return new ReactiveApiPublisherPhoneNumber(atomix, resultingEventTransformerSet, liveId, userId, phoneNumber); } public void start(Path path, @Nullable Runnable onClose) { @@ -103,19 +120,32 @@ public abstract class ReactiveApiPublisher { var publishedResultingEvents = telegramClient .subscribeOn(Schedulers.parallel()) // Handle signals, then return a ResultingEvent - .mapNotNull(this::onSignal) + .flatMapIterable(this::onSignal) .doFinally(s -> LOG.trace("Finalized telegram client events")) + + // Transform resulting events using all the registered resulting event transformers + .transform(flux -> { + Flux transformedFlux = flux; + for (ResultingEventTransformer resultingEventTransformer : resultingEventTransformerSet) { + transformedFlux = resultingEventTransformer.transform(isBot(), transformedFlux); + } + return transformedFlux; + }) + .publish(); publishedResultingEvents // Obtain only TDLib-bound events .filter(s -> s instanceof TDLibBoundResultingEvent) .map(s -> ((TDLibBoundResultingEvent) s).action()) + + //.limitRate(4) + .onBackpressureBuffer() // Buffer up to 64 requests to avoid halting the event loop, throw an error if too many requests are buffered - .limitRate(4) - .onBackpressureBuffer(64, BufferOverflowStrategy.ERROR) + //.onBackpressureBuffer(64, BufferOverflowStrategy.ERROR) + // Send requests to tdlib - .concatMap(function -> Mono + .flatMap(function -> Mono .from(rawTelegramClient.send(function, SPECIAL_RAW_TIMEOUT_DURATION)) .mapNotNull(resp -> { if (resp.getConstructor() == TdApi.Error.CONSTRUCTOR) { @@ -146,6 +176,9 @@ public abstract class ReactiveApiPublisher { .cast(ClientBoundResultingEvent.class) .map(ClientBoundResultingEvent::event) + // Buffer requests + .onBackpressureBuffer() + // Send events to the client .subscribeOn(Schedulers.parallel()) .subscribe(clientBoundEvent -> eventService.broadcast("session-client-bound-events", @@ -156,6 +189,9 @@ public abstract class ReactiveApiPublisher { .filter(s -> s instanceof ClusterBoundResultingEvent) .cast(ClusterBoundResultingEvent.class) + // Buffer requests + .onBackpressureBuffer() + // Send events to the cluster .subscribeOn(Schedulers.parallel()) .subscribe(clusterBoundEvent -> { @@ -176,14 +212,28 @@ public abstract class ReactiveApiPublisher { } } - @Nullable - private ResultingEvent onSignal(Signal signal) { + protected abstract boolean isBot(); + + private ResultingEvent wrapUpdateSignal(Signal signal) { + var update = (TdApi.Update) signal.getUpdate(); + return new ClientBoundResultingEvent(new OnUpdateData(liveId, userId, update)); + } + + private List withUpdateSignal(Signal signal, List list) { + var result = new ArrayList(list.size() + 1); + result.add(wrapUpdateSignal(signal)); + result.addAll(list); + return result; + } + + @NotNull + private List<@NotNull ResultingEvent> onSignal(Signal signal) { // Update the state var state = this.state.updateAndGet(oldState -> oldState.withSignal(signal)); if (state.authPhase() == LOGGED_IN) { - var update = (TdApi.Update) signal.getUpdate(); - return new ClientBoundResultingEvent(new OnUpdateData(liveId, userId, update)); + ResultingEvent resultingEvent = wrapUpdateSignal(signal); + return List.of(resultingEvent); } else { LOG.trace("Signal has not been broadcast because the session {} is not logged in: {}", userId, signal); return this.handleSpecialSignal(state, signal); @@ -191,27 +241,28 @@ public abstract class ReactiveApiPublisher { } @SuppressWarnings("SwitchStatementWithTooFewBranches") - @Nullable - private ResultingEvent handleSpecialSignal(State state, Signal signal) { + @NotNull + private List<@NotNull ResultingEvent> handleSpecialSignal(State state, Signal signal) { if (signal.isException()) { LOG.error("Received an error signal", signal.getException()); - return null; + return List.of(); } if (signal.isClosed()) { signal.getClosed(); LOG.info("Received a closed signal"); - return new ResultingEventPublisherClosed(); + return List.of(new ResultingEventPublisherClosed()); } if (signal.isUpdate() && signal.getUpdate().getConstructor() == TdApi.Error.CONSTRUCTOR) { var error = ((TdApi.Error) signal.getUpdate()); LOG.error("Received a TDLib error signal! Error {}: {}", error.code, error.message); - return null; + return List.of(); } if (!signal.isUpdate()) { LOG.error("Received a signal that's not an update: {}", signal); - return null; + return List.of(); } var update = signal.getUpdate(); + var updateResult = wrapUpdateSignal(signal); switch (state.authPhase()) { case BROKEN -> {} case PARAMETERS_PHASE -> { @@ -221,7 +272,7 @@ public abstract class ReactiveApiPublisher { switch (updateAuthorizationState.authorizationState.getConstructor()) { case TdApi.AuthorizationStateWaitTdlibParameters.CONSTRUCTOR -> { TdlibParameters parameters = generateTDLibParameters(); - return new TDLibBoundResultingEvent<>(new SetTdlibParameters(parameters)); + return List.of(updateResult, new TDLibBoundResultingEvent<>(new SetTdlibParameters(parameters))); } } } @@ -233,7 +284,7 @@ public abstract class ReactiveApiPublisher { var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; switch (updateAuthorizationState.authorizationState.getConstructor()) { case TdApi.AuthorizationStateWaitEncryptionKey.CONSTRUCTOR -> { - return new TDLibBoundResultingEvent<>(new CheckDatabaseEncryptionKey()); + return List.of(updateResult, new TDLibBoundResultingEvent<>(new CheckDatabaseEncryptionKey())); } } } @@ -245,24 +296,33 @@ public abstract class ReactiveApiPublisher { var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; switch (updateAuthorizationState.authorizationState.getConstructor()) { case TdApi.AuthorizationStateWaitCode.CONSTRUCTOR -> { - return onWaitCode(); + return withUpdateSignal(signal, onWaitCode()); } case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR -> { var link = ((AuthorizationStateWaitOtherDeviceConfirmation) updateAuthorizationState.authorizationState).link; - return new ClientBoundResultingEvent(new OnOtherDeviceLoginRequested(liveId, userId, link)); + return List.of(updateResult, + new ClientBoundResultingEvent(new OnOtherDeviceLoginRequested(liveId, userId, link))); } case TdApi.AuthorizationStateWaitPassword.CONSTRUCTOR -> { - return new ClientBoundResultingEvent(new OnPasswordRequested(liveId, userId)); + var authorizationStateWaitPassword = ((AuthorizationStateWaitPassword) updateAuthorizationState.authorizationState); + return List.of(updateResult, + new ClientBoundResultingEvent(new OnPasswordRequested(liveId, + userId, + authorizationStateWaitPassword.passwordHint, + authorizationStateWaitPassword.hasRecoveryEmailAddress, + authorizationStateWaitPassword.recoveryEmailAddressPattern + )) + ); } case TdApi.AuthorizationStateWaitPhoneNumber.CONSTRUCTOR -> { - return onWaitToken(); + return withUpdateSignal(signal, onWaitToken()); } } } } } } - return null; + return List.of(); } private TdlibParameters generateTDLibParameters() { @@ -287,11 +347,11 @@ public abstract class ReactiveApiPublisher { return tdlibParameters; } - protected abstract ResultingEvent onWaitToken(); + protected abstract List onWaitToken(); - protected ResultingEvent onWaitCode() { + protected List onWaitCode() { LOG.error("Wait code event is not supported"); - return null; + return List.of(); } private static byte[] serializeEvent(ClientBoundEvent clientBoundEvent) { @@ -314,6 +374,11 @@ public abstract class ReactiveApiPublisher { } else if (clientBoundEvent instanceof OnOtherDeviceLoginRequested onOtherDeviceLoginRequested) { dataOutputStream.writeByte(0x5); dataOutputStream.writeUTF(onOtherDeviceLoginRequested.link()); + } else if (clientBoundEvent instanceof OnPasswordRequested onPasswordRequested) { + dataOutputStream.writeByte(0x6); + dataOutputStream.writeUTF(onPasswordRequested.passwordHint()); + dataOutputStream.writeBoolean(onPasswordRequested.hasRecoveryEmail()); + dataOutputStream.writeUTF(onPasswordRequested.recoveryEmailPattern()); } else { throw new UnsupportedOperationException("Unexpected value: " + clientBoundEvent); } @@ -402,14 +467,23 @@ public abstract class ReactiveApiPublisher { private final String botToken; - public ReactiveApiPublisherToken(Atomix atomix, Long liveId, long userId, String botToken) { - super(atomix, liveId, userId); + public ReactiveApiPublisherToken(Atomix atomix, + Set resultingEventTransformerSet, + Long liveId, + long userId, + String botToken) { + super(atomix, resultingEventTransformerSet, liveId, userId); this.botToken = botToken; } @Override - protected ResultingEvent onWaitToken() { - return new TDLibBoundResultingEvent<>(new CheckAuthenticationBotToken(botToken)); + protected boolean isBot() { + return true; + } + + @Override + protected List onWaitToken() { + return List.of(new TDLibBoundResultingEvent<>(new CheckAuthenticationBotToken(botToken))); } @Override @@ -426,25 +500,34 @@ public abstract class ReactiveApiPublisher { private final long phoneNumber; - public ReactiveApiPublisherPhoneNumber(Atomix atomix, Long liveId, long userId, long phoneNumber) { - super(atomix, liveId, userId); + public ReactiveApiPublisherPhoneNumber(Atomix atomix, + Set resultingEventTransformerSet, + Long liveId, + long userId, + long phoneNumber) { + super(atomix, resultingEventTransformerSet, liveId, userId); this.phoneNumber = phoneNumber; } @Override - protected ResultingEvent onWaitToken() { + protected boolean isBot() { + return false; + } + + @Override + protected List onWaitToken() { var authSettings = new PhoneNumberAuthenticationSettings(); authSettings.allowFlashCall = false; authSettings.allowSmsRetrieverApi = false; authSettings.isCurrentPhoneNumber = false; - return new TDLibBoundResultingEvent<>(new SetAuthenticationPhoneNumber("+" + phoneNumber, + return List.of(new TDLibBoundResultingEvent<>(new SetAuthenticationPhoneNumber("+" + phoneNumber, authSettings - )); + ))); } @Override - public ClientBoundResultingEvent onWaitCode() { - return new ClientBoundResultingEvent(new OnUserLoginCodeRequested(liveId, userId, phoneNumber)); + public List onWaitCode() { + return List.of(new ClientBoundResultingEvent(new OnUserLoginCodeRequested(liveId, userId, phoneNumber))); } @Override diff --git a/src/main/java/it/tdlight/reactiveapi/ResultingEventTransformer.java b/src/main/java/it/tdlight/reactiveapi/ResultingEventTransformer.java new file mode 100644 index 0000000..b2c391e --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/ResultingEventTransformer.java @@ -0,0 +1,9 @@ +package it.tdlight.reactiveapi; + +import reactor.core.publisher.Flux; + + +public interface ResultingEventTransformer { + + Flux transform(boolean isBot, Flux events); +} diff --git a/src/main/java/it/tdlight/reactiveapi/transformer/CommandLineAuth.java b/src/main/java/it/tdlight/reactiveapi/transformer/CommandLineAuth.java new file mode 100644 index 0000000..9d69582 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/transformer/CommandLineAuth.java @@ -0,0 +1,54 @@ +package it.tdlight.reactiveapi.transformer; + +import it.tdlight.jni.TdApi.CheckAuthenticationCode; +import it.tdlight.jni.TdApi.CheckAuthenticationPassword; +import it.tdlight.reactiveapi.Cli; +import it.tdlight.reactiveapi.Event.OnBotLoginCodeRequested; +import it.tdlight.reactiveapi.Event.OnOtherDeviceLoginRequested; +import it.tdlight.reactiveapi.Event.OnPasswordRequested; +import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested; +import it.tdlight.reactiveapi.ResultingEvent; +import it.tdlight.reactiveapi.ResultingEvent.ClientBoundResultingEvent; +import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent; +import it.tdlight.reactiveapi.ResultingEventTransformer; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +public class CommandLineAuth implements ResultingEventTransformer { + + @Override + public Flux transform(boolean isBot, Flux events) { + return events.flatMapSequential(event -> { + if (event instanceof ClientBoundResultingEvent clientBoundResultingEvent) { + if (clientBoundResultingEvent.event() instanceof OnUserLoginCodeRequested requested) { + return this + .askParam("Please type the login code of " + requested.phoneNumber(), requested.userId()) + .map(code -> new TDLibBoundResultingEvent<>(new CheckAuthenticationCode(code))); + } else if (clientBoundResultingEvent.event() instanceof OnBotLoginCodeRequested requested) { + return this + .askParam("Please type the login code of " + requested.token(), requested.userId()) + .map(code -> new TDLibBoundResultingEvent<>(new CheckAuthenticationCode(code))); + } else if (clientBoundResultingEvent.event() instanceof OnPasswordRequested onPasswordRequested) { + return this + .askParam("Please type the password. Hint: " + onPasswordRequested.passwordHint() + ( + onPasswordRequested.hasRecoveryEmail() ? " Recovery e-mail: " + + onPasswordRequested.recoveryEmailPattern() : ""), onPasswordRequested.userId()) + .map(password -> new TDLibBoundResultingEvent<>(new CheckAuthenticationPassword(password))); + } else if (clientBoundResultingEvent.event() instanceof OnOtherDeviceLoginRequested onOtherDeviceLoginRequested) { + return this + .askParam("Please confirm the login on another other device, (after copying the link press enter): " + + onOtherDeviceLoginRequested.link(), onOtherDeviceLoginRequested.userId()) + .then(Mono.empty()); + } + } + return Mono.just(event); + }); + } + + private Mono askParam(String text, long userId) { + return Mono + .fromCallable(() -> Cli.askParameter("[#IDU" + userId + "]" + text)) + .subscribeOn(Schedulers.boundedElastic()); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/transformer/DefaultOptions.java b/src/main/java/it/tdlight/reactiveapi/transformer/DefaultOptions.java new file mode 100644 index 0000000..2c94e44 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/transformer/DefaultOptions.java @@ -0,0 +1,69 @@ +package it.tdlight.reactiveapi.transformer; + +import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.OptionValueBoolean; +import it.tdlight.jni.TdApi.OptionValueInteger; +import it.tdlight.reactiveapi.Event.OnUpdateData; +import it.tdlight.reactiveapi.ResultingEvent; +import it.tdlight.reactiveapi.ResultingEvent.ClientBoundResultingEvent; +import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent; +import it.tdlight.reactiveapi.ResultingEventTransformer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import reactor.core.publisher.Flux; + +public class DefaultOptions implements ResultingEventTransformer { + + private static final Collection DEFAULT_OPTIONS = List.of( + setInt("message_unload_delay", 1800), + setBoolean("disable_persistent_network_statistics", true), + setBoolean("disable_time_adjustment_protection", true), + setBoolean("ignore_inline_thumbnails", true), + setBoolean("ignore_platform_restrictions", true), + setBoolean("use_storage_optimizer", true) + ); + + private static final Collection DEFAULT_USER_OPTIONS = List.of( + setBoolean("disable_animated_emoji", true), + setBoolean("disable_contact_registered_notifications", true), + setBoolean("disable_top_chats", true), + setInt("notification_group_count_max", 0), + setInt("notification_group_size_max", 1) + ); + + @Override + public Flux transform(boolean isBot, Flux events) { + return events.flatMapIterable(event -> { + + // Append the options if the initial auth state is intercepted + if (event instanceof ClientBoundResultingEvent clientBoundResultingEvent + && clientBoundResultingEvent.event() instanceof OnUpdateData onUpdate + && onUpdate.update() instanceof TdApi.UpdateAuthorizationState authorizationState + && authorizationState.authorizationState instanceof TdApi.AuthorizationStateWaitEncryptionKey) { + + var resultingEvent = new ArrayList(1 + DEFAULT_OPTIONS.size() + DEFAULT_USER_OPTIONS.size()); + // Add the intercepted event + resultingEvent.add(event); + // Add the default options + resultingEvent.addAll(DEFAULT_OPTIONS); + // Add user-only default options + if (!isBot) { + resultingEvent.addAll(DEFAULT_USER_OPTIONS); + } + return resultingEvent; + } else { + // Return just the intercepted event as-is + return List.of(event); + } + }); + } + + private static ResultingEvent setBoolean(String optionName, boolean value) { + return new TDLibBoundResultingEvent<>(new TdApi.SetOption(optionName, new OptionValueBoolean(value))); + } + + private static ResultingEvent setInt(String optionName, int value) { + return new TDLibBoundResultingEvent<>(new TdApi.SetOption(optionName, new OptionValueInteger(value))); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/transformer/DisableChatDatabase.java b/src/main/java/it/tdlight/reactiveapi/transformer/DisableChatDatabase.java new file mode 100644 index 0000000..f7889bb --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/transformer/DisableChatDatabase.java @@ -0,0 +1,25 @@ +package it.tdlight.reactiveapi.transformer; + +import it.tdlight.jni.TdApi; +import it.tdlight.reactiveapi.ResultingEvent; +import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent; +import it.tdlight.reactiveapi.ResultingEventTransformer; +import java.util.List; +import reactor.core.publisher.Flux; + +public class DisableChatDatabase implements ResultingEventTransformer { + + @Override + public Flux transform(boolean isBot, Flux events) { + return events.flatMapIterable(event -> { + + // Change option + if (event instanceof TDLibBoundResultingEvent tdLibBoundResultingEvent + && tdLibBoundResultingEvent.action() instanceof TdApi.SetTdlibParameters setTdlibParameters) { + setTdlibParameters.parameters.useChatInfoDatabase = false; + } + + return List.of(event); + }); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/transformer/DisableFileDatabase.java b/src/main/java/it/tdlight/reactiveapi/transformer/DisableFileDatabase.java new file mode 100644 index 0000000..d93985b --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/transformer/DisableFileDatabase.java @@ -0,0 +1,25 @@ +package it.tdlight.reactiveapi.transformer; + +import it.tdlight.jni.TdApi; +import it.tdlight.reactiveapi.ResultingEvent; +import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent; +import it.tdlight.reactiveapi.ResultingEventTransformer; +import java.util.List; +import reactor.core.publisher.Flux; + +public class DisableFileDatabase implements ResultingEventTransformer { + + @Override + public Flux transform(boolean isBot, Flux events) { + return events.flatMapIterable(event -> { + + // Change option + if (event instanceof TDLibBoundResultingEvent tdLibBoundResultingEvent + && tdLibBoundResultingEvent.action() instanceof TdApi.SetTdlibParameters setTdlibParameters) { + setTdlibParameters.parameters.useFileDatabase = false; + } + + return List.of(event); + }); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/transformer/DisableMessageDatabase.java b/src/main/java/it/tdlight/reactiveapi/transformer/DisableMessageDatabase.java new file mode 100644 index 0000000..aeff675 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/transformer/DisableMessageDatabase.java @@ -0,0 +1,25 @@ +package it.tdlight.reactiveapi.transformer; + +import it.tdlight.jni.TdApi; +import it.tdlight.reactiveapi.ResultingEvent; +import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent; +import it.tdlight.reactiveapi.ResultingEventTransformer; +import java.util.List; +import reactor.core.publisher.Flux; + +public class DisableMessageDatabase implements ResultingEventTransformer { + + @Override + public Flux transform(boolean isBot, Flux events) { + return events.flatMapIterable(event -> { + + // Change option + if (event instanceof TDLibBoundResultingEvent tdLibBoundResultingEvent + && tdLibBoundResultingEvent.action() instanceof TdApi.SetTdlibParameters setTdlibParameters) { + setTdlibParameters.parameters.useMessageDatabase = false; + } + + return List.of(event); + }); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/transformer/DummyResultingEventTransformer.java b/src/main/java/it/tdlight/reactiveapi/transformer/DummyResultingEventTransformer.java new file mode 100644 index 0000000..2598386 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/transformer/DummyResultingEventTransformer.java @@ -0,0 +1,13 @@ +package it.tdlight.reactiveapi.transformer; + +import it.tdlight.reactiveapi.ResultingEvent; +import it.tdlight.reactiveapi.ResultingEventTransformer; +import reactor.core.publisher.Flux; + +public class DummyResultingEventTransformer implements ResultingEventTransformer { + + @Override + public Flux transform(boolean isBot, Flux events) { + return events; + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/transformer/TdlightDefaultOptions.java b/src/main/java/it/tdlight/reactiveapi/transformer/TdlightDefaultOptions.java new file mode 100644 index 0000000..7aa2593 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/transformer/TdlightDefaultOptions.java @@ -0,0 +1,64 @@ +package it.tdlight.reactiveapi.transformer; + +import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.OptionValueBoolean; +import it.tdlight.jni.TdApi.OptionValueInteger; +import it.tdlight.reactiveapi.Event.OnUpdateData; +import it.tdlight.reactiveapi.ResultingEvent; +import it.tdlight.reactiveapi.ResultingEvent.ClientBoundResultingEvent; +import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent; +import it.tdlight.reactiveapi.ResultingEventTransformer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import reactor.core.publisher.Flux; + +public class TdlightDefaultOptions implements ResultingEventTransformer { + + private static final Collection DEFAULT_OPTIONS = List.of( + setBoolean("disable_document_filenames", true), + setBoolean("disable_minithumbnails", true), + setBoolean("disable_notifications", true), + setBoolean("ignore_update_chat_last_message", true), + setBoolean("ignore_update_chat_read_inbox", true), + setBoolean("ignore_update_user_chat_action", true), + setBoolean("ignore_server_deletes_and_reads", true) + ); + + private static final Collection DEFAULT_USER_OPTIONS = List.of(); + + @Override + public Flux transform(boolean isBot, Flux events) { + return events.flatMapIterable(event -> { + + // Append the options if the initial auth state is intercepted + if (event instanceof ClientBoundResultingEvent clientBoundResultingEvent + && clientBoundResultingEvent.event() instanceof OnUpdateData onUpdate + && onUpdate.update() instanceof TdApi.UpdateAuthorizationState authorizationState + && authorizationState.authorizationState instanceof TdApi.AuthorizationStateWaitEncryptionKey) { + + var resultingEvent = new ArrayList(1 + DEFAULT_OPTIONS.size() + DEFAULT_USER_OPTIONS.size()); + // Add the intercepted event + resultingEvent.add(event); + // Add the default options + resultingEvent.addAll(DEFAULT_OPTIONS); + // Add user-only default options + if (!isBot) { + resultingEvent.addAll(DEFAULT_USER_OPTIONS); + } + return resultingEvent; + } else { + // Return just the intercepted event as-is + return List.of(event); + } + }); + } + + private static ResultingEvent setBoolean(String optionName, boolean value) { + return new TDLibBoundResultingEvent<>(new TdApi.SetOption(optionName, new OptionValueBoolean(value))); + } + + private static ResultingEvent setInt(String optionName, int value) { + return new TDLibBoundResultingEvent<>(new TdApi.SetOption(optionName, new OptionValueInteger(value))); + } +} diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml index 049acfa..3adab6f 100644 --- a/src/main/resources/log4j2.xml +++ b/src/main/resources/log4j2.xml @@ -1,5 +1,5 @@ - + - +