Implement event transformers
This commit is contained in:
parent
4bbb9cd762
commit
5b9fec980e
27
src/main/java/it/tdlight/reactiveapi/Address.java
Normal file
27
src/main/java/it/tdlight/reactiveapi/Address.java
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
package it.tdlight.reactiveapi;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
public record Address(String host, int port) {
|
||||||
|
public Address {
|
||||||
|
if (host.isBlank()) {
|
||||||
|
throw new IllegalArgumentException("Host is blank");
|
||||||
|
}
|
||||||
|
if (port < 0) {
|
||||||
|
throw new IndexOutOfBoundsException(port);
|
||||||
|
}
|
||||||
|
if (port >= 65536) {
|
||||||
|
throw new IndexOutOfBoundsException(port);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Address fromString(String address) {
|
||||||
|
var parts = address.split(":");
|
||||||
|
if (parts.length < 2) {
|
||||||
|
throw new IllegalArgumentException("Malformed client address, it must have a port (host:port)");
|
||||||
|
}
|
||||||
|
var host = String.join(":", Arrays.copyOf(parts, parts.length - 1));
|
||||||
|
var port = Integer.parseUnsignedInt(parts[parts.length - 1]);
|
||||||
|
return new Address(host, port);
|
||||||
|
}
|
||||||
|
}
|
@ -31,6 +31,7 @@ import java.util.concurrent.CompletionException;
|
|||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.jetbrains.annotations.Nullable;
|
import org.jetbrains.annotations.Nullable;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -48,6 +49,7 @@ public class AtomixReactiveApi implements ReactiveApi {
|
|||||||
@Nullable
|
@Nullable
|
||||||
private final String nodeId;
|
private final String nodeId;
|
||||||
private final Atomix atomix;
|
private final Atomix atomix;
|
||||||
|
private final Set<ResultingEventTransformer> resultingEventTransformerSet;
|
||||||
private final AsyncAtomicIdGenerator nextSessionLiveId;
|
private final AsyncAtomicIdGenerator nextSessionLiveId;
|
||||||
|
|
||||||
private final AsyncAtomicLock sessionModificationLock;
|
private final AsyncAtomicLock sessionModificationLock;
|
||||||
@ -62,9 +64,13 @@ public class AtomixReactiveApi implements ReactiveApi {
|
|||||||
@Nullable
|
@Nullable
|
||||||
private final DiskSessionsManager diskSessions;
|
private final DiskSessionsManager diskSessions;
|
||||||
|
|
||||||
public AtomixReactiveApi(@Nullable String nodeId, Atomix atomix, @Nullable DiskSessionsManager diskSessions) {
|
public AtomixReactiveApi(@Nullable String nodeId,
|
||||||
|
Atomix atomix,
|
||||||
|
@Nullable DiskSessionsManager diskSessions,
|
||||||
|
@NotNull Set<ResultingEventTransformer> resultingEventTransformerSet) {
|
||||||
this.nodeId = nodeId;
|
this.nodeId = nodeId;
|
||||||
this.atomix = atomix;
|
this.atomix = atomix;
|
||||||
|
this.resultingEventTransformerSet = resultingEventTransformerSet;
|
||||||
|
|
||||||
if (nodeId == null) {
|
if (nodeId == null) {
|
||||||
if (diskSessions != null) {
|
if (diskSessions != null) {
|
||||||
@ -248,22 +254,38 @@ public class AtomixReactiveApi implements ReactiveApi {
|
|||||||
userId = createBotSessionRequest.userId();
|
userId = createBotSessionRequest.userId();
|
||||||
botToken = createBotSessionRequest.token();
|
botToken = createBotSessionRequest.token();
|
||||||
phoneNumber = null;
|
phoneNumber = null;
|
||||||
reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, liveId, userId, botToken);
|
reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, resultingEventTransformerSet,
|
||||||
|
liveId,
|
||||||
|
userId,
|
||||||
|
botToken
|
||||||
|
);
|
||||||
} else if (req instanceof CreateUserSessionRequest createUserSessionRequest) {
|
} else if (req instanceof CreateUserSessionRequest createUserSessionRequest) {
|
||||||
loadedFromDisk = false;
|
loadedFromDisk = false;
|
||||||
userId = createUserSessionRequest.userId();
|
userId = createUserSessionRequest.userId();
|
||||||
botToken = null;
|
botToken = null;
|
||||||
phoneNumber = createUserSessionRequest.phoneNumber();
|
phoneNumber = createUserSessionRequest.phoneNumber();
|
||||||
reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, liveId, userId, phoneNumber);
|
reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, resultingEventTransformerSet,
|
||||||
|
liveId,
|
||||||
|
userId,
|
||||||
|
phoneNumber
|
||||||
|
);
|
||||||
} else if (req instanceof LoadSessionFromDiskRequest loadSessionFromDiskRequest) {
|
} else if (req instanceof LoadSessionFromDiskRequest loadSessionFromDiskRequest) {
|
||||||
loadedFromDisk = true;
|
loadedFromDisk = true;
|
||||||
userId = loadSessionFromDiskRequest.userId();
|
userId = loadSessionFromDiskRequest.userId();
|
||||||
botToken = loadSessionFromDiskRequest.token();
|
botToken = loadSessionFromDiskRequest.token();
|
||||||
phoneNumber = loadSessionFromDiskRequest.phoneNumber();
|
phoneNumber = loadSessionFromDiskRequest.phoneNumber();
|
||||||
if (loadSessionFromDiskRequest.phoneNumber() != null) {
|
if (loadSessionFromDiskRequest.phoneNumber() != null) {
|
||||||
reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, liveId, userId, phoneNumber);
|
reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, resultingEventTransformerSet,
|
||||||
|
liveId,
|
||||||
|
userId,
|
||||||
|
phoneNumber
|
||||||
|
);
|
||||||
} else {
|
} else {
|
||||||
reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, liveId, userId, botToken);
|
reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, resultingEventTransformerSet,
|
||||||
|
liveId,
|
||||||
|
userId,
|
||||||
|
botToken
|
||||||
|
);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return Mono.error(new UnsupportedOperationException("Unexpected value: " + req));
|
return Mono.error(new UnsupportedOperationException("Unexpected value: " + req));
|
||||||
|
@ -6,8 +6,11 @@ import it.tdlight.reactiveapi.CreateSessionRequest.CreateUserSessionRequest;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ForkJoinPool;
|
import java.util.concurrent.ForkJoinPool;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.concurrent.locks.LockSupport;
|
||||||
import net.minecrell.terminalconsole.SimpleTerminalConsole;
|
import net.minecrell.terminalconsole.SimpleTerminalConsole;
|
||||||
import org.jline.reader.LineReader;
|
import org.jline.reader.LineReader;
|
||||||
import org.jline.reader.LineReaderBuilder;
|
import org.jline.reader.LineReaderBuilder;
|
||||||
@ -18,6 +21,10 @@ public class Cli {
|
|||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(Cli.class);
|
private static final Logger LOG = LoggerFactory.getLogger(Cli.class);
|
||||||
|
|
||||||
|
private static final Object parameterLock = new Object();
|
||||||
|
private static boolean askedParameter = false;
|
||||||
|
private static CompletableFuture<String> askedParameterResult = null;
|
||||||
|
|
||||||
public static void main(String[] args) throws IOException {
|
public static void main(String[] args) throws IOException {
|
||||||
var validArgs = Entrypoint.parseArguments(args);
|
var validArgs = Entrypoint.parseArguments(args);
|
||||||
var atomixBuilder = Atomix.builder();
|
var atomixBuilder = Atomix.builder();
|
||||||
@ -57,6 +64,15 @@ public class Cli {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void runCommand(String command) {
|
protected void runCommand(String command) {
|
||||||
|
synchronized (parameterLock) {
|
||||||
|
if (askedParameter) {
|
||||||
|
askedParameterResult.complete(command);
|
||||||
|
askedParameterResult = null;
|
||||||
|
askedParameter = false;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var parts = command.split(" ", 2);
|
var parts = command.split(" ", 2);
|
||||||
var commandName = parts[0].trim().toLowerCase();
|
var commandName = parts[0].trim().toLowerCase();
|
||||||
String commandArgs;
|
String commandArgs;
|
||||||
@ -132,4 +148,14 @@ public class Cli {
|
|||||||
LOG.error("Syntax: CreateSession <\"bot\"|\"user\"> <userid> <token|phoneNumber>");
|
LOG.error("Syntax: CreateSession <\"bot\"|\"user\"> <userid> <token|phoneNumber>");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static String askParameter(String question) {
|
||||||
|
var cf = new CompletableFuture<String>();
|
||||||
|
synchronized (parameterLock) {
|
||||||
|
LOG.info(question);
|
||||||
|
askedParameter = true;
|
||||||
|
askedParameterResult = cf;
|
||||||
|
}
|
||||||
|
return cf.join();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
package it.tdlight.reactiveapi;
|
package it.tdlight.reactiveapi;
|
||||||
|
|
||||||
|
import static java.util.Collections.unmodifiableSet;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
|
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
|
||||||
import io.atomix.cluster.Node;
|
import io.atomix.cluster.Node;
|
||||||
@ -10,10 +12,13 @@ import io.atomix.core.profile.ConsensusProfileConfig;
|
|||||||
import io.atomix.core.profile.Profile;
|
import io.atomix.core.profile.Profile;
|
||||||
import io.atomix.protocols.raft.partition.RaftPartitionGroup;
|
import io.atomix.protocols.raft.partition.RaftPartitionGroup;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
import org.jetbrains.annotations.Nullable;
|
import org.jetbrains.annotations.Nullable;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -70,13 +75,19 @@ public class Entrypoint {
|
|||||||
atomixBuilder.withCompatibleSerialization(false);
|
atomixBuilder.withCompatibleSerialization(false);
|
||||||
atomixBuilder.withClusterId(clusterSettings.id);
|
atomixBuilder.withClusterId(clusterSettings.id);
|
||||||
|
|
||||||
|
Set<ResultingEventTransformer> resultingEventTransformerSet;
|
||||||
String nodeId;
|
String nodeId;
|
||||||
if (instanceSettings.client) {
|
if (instanceSettings.client) {
|
||||||
if (diskSessions != null) {
|
if (diskSessions != null) {
|
||||||
throw new IllegalArgumentException("A client instance can't have a session manager!");
|
throw new IllegalArgumentException("A client instance can't have a session manager!");
|
||||||
}
|
}
|
||||||
atomixBuilder.withMemberId(instanceSettings.id).withAddress(instanceSettings.clientAddress);
|
if (instanceSettings.clientAddress == null) {
|
||||||
|
throw new IllegalArgumentException("A client instance must have an address (host:port)");
|
||||||
|
}
|
||||||
|
var address = Address.fromString(instanceSettings.clientAddress);
|
||||||
|
atomixBuilder.withMemberId(instanceSettings.id).withHost(address.host()).withPort(address.port());
|
||||||
nodeId = null;
|
nodeId = null;
|
||||||
|
resultingEventTransformerSet = Set.of();
|
||||||
} else {
|
} else {
|
||||||
if (diskSessions == null) {
|
if (diskSessions == null) {
|
||||||
throw new IllegalArgumentException("A full instance must have a session manager!");
|
throw new IllegalArgumentException("A full instance must have a session manager!");
|
||||||
@ -95,8 +106,28 @@ public class Entrypoint {
|
|||||||
|
|
||||||
var nodeSettings = nodeSettingsOptional.get();
|
var nodeSettings = nodeSettingsOptional.get();
|
||||||
|
|
||||||
atomixBuilder.withMemberId(instanceSettings.id).withAddress(nodeSettings.address);
|
var address = Address.fromString(nodeSettings.address);
|
||||||
|
atomixBuilder.withMemberId(instanceSettings.id).withHost(address.host()).withPort(address.port());
|
||||||
|
|
||||||
|
resultingEventTransformerSet = new HashSet<>();
|
||||||
|
if (instanceSettings.resultingEventTransformers != null) {
|
||||||
|
for (var resultingEventTransformer: instanceSettings.resultingEventTransformers) {
|
||||||
|
try {
|
||||||
|
var instance = resultingEventTransformer.getConstructor().newInstance();
|
||||||
|
resultingEventTransformerSet.add(instance);
|
||||||
|
LOG.info("Loaded and applied resulting event transformer: " + resultingEventTransformer.getName());
|
||||||
|
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
|
||||||
|
throw new IllegalArgumentException("Failed to load resulting event transformer: "
|
||||||
|
+ resultingEventTransformer.getName());
|
||||||
|
} catch (NoSuchMethodException e) {
|
||||||
|
throw new IllegalArgumentException("The client transformer must declare an empty constructor: "
|
||||||
|
+ resultingEventTransformer.getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
nodeId = nodeSettings.id;
|
nodeId = nodeSettings.id;
|
||||||
|
resultingEventTransformerSet = unmodifiableSet(resultingEventTransformerSet);
|
||||||
}
|
}
|
||||||
|
|
||||||
var bootstrapDiscoveryProviderNodes = new ArrayList<Node>();
|
var bootstrapDiscoveryProviderNodes = new ArrayList<Node>();
|
||||||
@ -139,7 +170,7 @@ public class Entrypoint {
|
|||||||
|
|
||||||
atomix.start().join();
|
atomix.start().join();
|
||||||
|
|
||||||
var api = new AtomixReactiveApi(nodeId, atomix, diskSessions);
|
var api = new AtomixReactiveApi(nodeId, atomix, diskSessions, resultingEventTransformerSet);
|
||||||
|
|
||||||
LOG.info("Starting ReactiveApi...");
|
LOG.info("Starting ReactiveApi...");
|
||||||
|
|
||||||
|
@ -3,10 +3,8 @@ package it.tdlight.reactiveapi;
|
|||||||
import it.tdlight.jni.TdApi;
|
import it.tdlight.jni.TdApi;
|
||||||
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
|
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
|
||||||
import it.tdlight.reactiveapi.Event.ServerBoundEvent;
|
import it.tdlight.reactiveapi.Event.ServerBoundEvent;
|
||||||
import java.io.ByteArrayInputStream;
|
|
||||||
import java.io.DataInput;
|
import java.io.DataInput;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.time.Duration;
|
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import org.apache.commons.lang3.SerializationException;
|
import org.apache.commons.lang3.SerializationException;
|
||||||
|
|
||||||
@ -48,7 +46,8 @@ public sealed interface Event permits ClientBoundEvent, ServerBoundEvent {
|
|||||||
|
|
||||||
record OnOtherDeviceLoginRequested(long liveId, long userId, String link) implements ClientBoundEvent {}
|
record OnOtherDeviceLoginRequested(long liveId, long userId, String link) implements ClientBoundEvent {}
|
||||||
|
|
||||||
record OnPasswordRequested(long liveId, long userId) implements ClientBoundEvent {}
|
record OnPasswordRequested(long liveId, long userId, String passwordHint, boolean hasRecoveryEmail,
|
||||||
|
String recoveryEmailPattern) implements ClientBoundEvent {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Event received from TDLib
|
* Event received from TDLib
|
||||||
|
@ -2,6 +2,7 @@ package it.tdlight.reactiveapi;
|
|||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import java.util.Set;
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.jetbrains.annotations.Nullable;
|
import org.jetbrains.annotations.Nullable;
|
||||||
|
|
||||||
@ -22,12 +23,20 @@ public class InstanceSettings {
|
|||||||
*/
|
*/
|
||||||
public @Nullable String clientAddress;
|
public @Nullable String clientAddress;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If {@link #client} is false, this will transform resulting events <b>before</b> being sent
|
||||||
|
*/
|
||||||
|
public @Nullable Set<Class<? extends ResultingEventTransformer>> resultingEventTransformers;
|
||||||
|
|
||||||
@JsonCreator
|
@JsonCreator
|
||||||
public InstanceSettings(@JsonProperty(required = true, value = "id") @NotNull String id,
|
public InstanceSettings(@JsonProperty(required = true, value = "id") @NotNull String id,
|
||||||
@JsonProperty(required = true, value = "client") boolean client,
|
@JsonProperty(required = true, value = "client") boolean client,
|
||||||
@JsonProperty("clientAddress") @Nullable String clientAddress) {
|
@JsonProperty("clientAddress") @Nullable String clientAddress,
|
||||||
|
@JsonProperty("resultingEventTransformers") @Nullable
|
||||||
|
Set<Class<? extends ResultingEventTransformer>> resultingEventTransformers) {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.clientAddress = clientAddress;
|
this.clientAddress = clientAddress;
|
||||||
|
this.resultingEventTransformers = resultingEventTransformers;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -7,6 +7,7 @@ import it.tdlight.jni.TdApi;
|
|||||||
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
|
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
|
||||||
import it.tdlight.reactiveapi.Event.OnBotLoginCodeRequested;
|
import it.tdlight.reactiveapi.Event.OnBotLoginCodeRequested;
|
||||||
import it.tdlight.reactiveapi.Event.OnOtherDeviceLoginRequested;
|
import it.tdlight.reactiveapi.Event.OnOtherDeviceLoginRequested;
|
||||||
|
import it.tdlight.reactiveapi.Event.OnPasswordRequested;
|
||||||
import it.tdlight.reactiveapi.Event.OnUpdateData;
|
import it.tdlight.reactiveapi.Event.OnUpdateData;
|
||||||
import it.tdlight.reactiveapi.Event.OnUpdateError;
|
import it.tdlight.reactiveapi.Event.OnUpdateError;
|
||||||
import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested;
|
import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested;
|
||||||
@ -101,16 +102,17 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient {
|
|||||||
|
|
||||||
static ClientBoundEvent deserializeEvent(byte[] bytes) {
|
static ClientBoundEvent deserializeEvent(byte[] bytes) {
|
||||||
try (var byteArrayInputStream = new ByteArrayInputStream(bytes)) {
|
try (var byteArrayInputStream = new ByteArrayInputStream(bytes)) {
|
||||||
try (var dataInputStream = new DataInputStream(byteArrayInputStream)) {
|
try (var is = new DataInputStream(byteArrayInputStream)) {
|
||||||
var liveId = dataInputStream.readLong();
|
var liveId = is.readLong();
|
||||||
var userId = dataInputStream.readLong();
|
var userId = is.readLong();
|
||||||
return switch (dataInputStream.readByte()) {
|
return switch (is.readByte()) {
|
||||||
case 0x01 -> new OnUpdateData(liveId, userId, (TdApi.Update) TdApi.Deserializer.deserialize(dataInputStream));
|
case 0x01 -> new OnUpdateData(liveId, userId, (TdApi.Update) TdApi.Deserializer.deserialize(is));
|
||||||
case 0x02 -> new OnUpdateError(liveId, userId, (TdApi.Error) TdApi.Deserializer.deserialize(dataInputStream));
|
case 0x02 -> new OnUpdateError(liveId, userId, (TdApi.Error) TdApi.Deserializer.deserialize(is));
|
||||||
case 0x03 -> new OnUserLoginCodeRequested(liveId, userId, dataInputStream.readLong());
|
case 0x03 -> new OnUserLoginCodeRequested(liveId, userId, is.readLong());
|
||||||
case 0x04 -> new OnBotLoginCodeRequested(liveId, userId, dataInputStream.readUTF());
|
case 0x04 -> new OnBotLoginCodeRequested(liveId, userId, is.readUTF());
|
||||||
case 0x05 -> new OnOtherDeviceLoginRequested(liveId, userId, dataInputStream.readUTF());
|
case 0x05 -> new OnOtherDeviceLoginRequested(liveId, userId, is.readUTF());
|
||||||
default -> throw new IllegalStateException("Unexpected value: " + dataInputStream.readByte());
|
case 0x06 -> new OnPasswordRequested(liveId, userId, is.readUTF(), is.readBoolean(), is.readUTF());
|
||||||
|
default -> throw new IllegalStateException("Unexpected value: " + is.readByte());
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
|
@ -13,6 +13,7 @@ import it.tdlight.common.Response;
|
|||||||
import it.tdlight.common.Signal;
|
import it.tdlight.common.Signal;
|
||||||
import it.tdlight.jni.TdApi;
|
import it.tdlight.jni.TdApi;
|
||||||
import it.tdlight.jni.TdApi.AuthorizationStateWaitOtherDeviceConfirmation;
|
import it.tdlight.jni.TdApi.AuthorizationStateWaitOtherDeviceConfirmation;
|
||||||
|
import it.tdlight.jni.TdApi.AuthorizationStateWaitPassword;
|
||||||
import it.tdlight.jni.TdApi.CheckAuthenticationBotToken;
|
import it.tdlight.jni.TdApi.CheckAuthenticationBotToken;
|
||||||
import it.tdlight.jni.TdApi.CheckDatabaseEncryptionKey;
|
import it.tdlight.jni.TdApi.CheckDatabaseEncryptionKey;
|
||||||
import it.tdlight.jni.TdApi.Object;
|
import it.tdlight.jni.TdApi.Object;
|
||||||
@ -41,15 +42,18 @@ import java.io.IOException;
|
|||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.StringJoiner;
|
import java.util.StringJoiner;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import org.apache.commons.lang3.SerializationException;
|
import org.apache.commons.lang3.SerializationException;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import reactor.core.Disposable;
|
import reactor.core.Disposable;
|
||||||
import reactor.core.publisher.BufferOverflowStrategy;
|
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
import reactor.core.scheduler.Schedulers;
|
import reactor.core.scheduler.Schedulers;
|
||||||
@ -61,6 +65,7 @@ public abstract class ReactiveApiPublisher {
|
|||||||
private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofSeconds(10);
|
private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofSeconds(10);
|
||||||
|
|
||||||
private final ClusterEventService eventService;
|
private final ClusterEventService eventService;
|
||||||
|
private final Set<ResultingEventTransformer> resultingEventTransformerSet;
|
||||||
private final ReactiveTelegramClient rawTelegramClient;
|
private final ReactiveTelegramClient rawTelegramClient;
|
||||||
private final Flux<Signal> telegramClient;
|
private final Flux<Signal> telegramClient;
|
||||||
|
|
||||||
@ -72,8 +77,12 @@ public abstract class ReactiveApiPublisher {
|
|||||||
private final AtomicReference<Disposable> disposable = new AtomicReference<>();
|
private final AtomicReference<Disposable> disposable = new AtomicReference<>();
|
||||||
private final AtomicReference<Path> path = new AtomicReference<>();
|
private final AtomicReference<Path> path = new AtomicReference<>();
|
||||||
|
|
||||||
private ReactiveApiPublisher(Atomix atomix, long liveId, long userId) {
|
private ReactiveApiPublisher(Atomix atomix,
|
||||||
|
Set<ResultingEventTransformer> resultingEventTransformerSet,
|
||||||
|
long liveId,
|
||||||
|
long userId) {
|
||||||
this.eventService = atomix.getEventService();
|
this.eventService = atomix.getEventService();
|
||||||
|
this.resultingEventTransformerSet = resultingEventTransformerSet;
|
||||||
this.userId = userId;
|
this.userId = userId;
|
||||||
this.liveId = liveId;
|
this.liveId = liveId;
|
||||||
this.dynamicIdResolveSubject = SubjectNaming.getDynamicIdResolveSubject(userId);
|
this.dynamicIdResolveSubject = SubjectNaming.getDynamicIdResolveSubject(userId);
|
||||||
@ -89,12 +98,20 @@ public abstract class ReactiveApiPublisher {
|
|||||||
})).share();
|
})).share();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ReactiveApiPublisher fromToken(Atomix atomix, Long liveId, long userId, String token) {
|
public static ReactiveApiPublisher fromToken(Atomix atomix,
|
||||||
return new ReactiveApiPublisherToken(atomix, liveId, userId, token);
|
Set<ResultingEventTransformer> resultingEventTransformerSet,
|
||||||
|
Long liveId,
|
||||||
|
long userId,
|
||||||
|
String token) {
|
||||||
|
return new ReactiveApiPublisherToken(atomix, resultingEventTransformerSet, liveId, userId, token);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ReactiveApiPublisher fromPhoneNumber(Atomix atomix, Long liveId, long userId, long phoneNumber) {
|
public static ReactiveApiPublisher fromPhoneNumber(Atomix atomix,
|
||||||
return new ReactiveApiPublisherPhoneNumber(atomix, liveId, userId, phoneNumber);
|
Set<ResultingEventTransformer> resultingEventTransformerSet,
|
||||||
|
Long liveId,
|
||||||
|
long userId,
|
||||||
|
long phoneNumber) {
|
||||||
|
return new ReactiveApiPublisherPhoneNumber(atomix, resultingEventTransformerSet, liveId, userId, phoneNumber);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start(Path path, @Nullable Runnable onClose) {
|
public void start(Path path, @Nullable Runnable onClose) {
|
||||||
@ -103,19 +120,32 @@ public abstract class ReactiveApiPublisher {
|
|||||||
var publishedResultingEvents = telegramClient
|
var publishedResultingEvents = telegramClient
|
||||||
.subscribeOn(Schedulers.parallel())
|
.subscribeOn(Schedulers.parallel())
|
||||||
// Handle signals, then return a ResultingEvent
|
// Handle signals, then return a ResultingEvent
|
||||||
.mapNotNull(this::onSignal)
|
.flatMapIterable(this::onSignal)
|
||||||
.doFinally(s -> LOG.trace("Finalized telegram client events"))
|
.doFinally(s -> LOG.trace("Finalized telegram client events"))
|
||||||
|
|
||||||
|
// Transform resulting events using all the registered resulting event transformers
|
||||||
|
.transform(flux -> {
|
||||||
|
Flux<ResultingEvent> transformedFlux = flux;
|
||||||
|
for (ResultingEventTransformer resultingEventTransformer : resultingEventTransformerSet) {
|
||||||
|
transformedFlux = resultingEventTransformer.transform(isBot(), transformedFlux);
|
||||||
|
}
|
||||||
|
return transformedFlux;
|
||||||
|
})
|
||||||
|
|
||||||
.publish();
|
.publish();
|
||||||
|
|
||||||
publishedResultingEvents
|
publishedResultingEvents
|
||||||
// Obtain only TDLib-bound events
|
// Obtain only TDLib-bound events
|
||||||
.filter(s -> s instanceof TDLibBoundResultingEvent<?>)
|
.filter(s -> s instanceof TDLibBoundResultingEvent<?>)
|
||||||
.map(s -> ((TDLibBoundResultingEvent<?>) s).action())
|
.map(s -> ((TDLibBoundResultingEvent<?>) s).action())
|
||||||
|
|
||||||
|
//.limitRate(4)
|
||||||
|
.onBackpressureBuffer()
|
||||||
// Buffer up to 64 requests to avoid halting the event loop, throw an error if too many requests are buffered
|
// Buffer up to 64 requests to avoid halting the event loop, throw an error if too many requests are buffered
|
||||||
.limitRate(4)
|
//.onBackpressureBuffer(64, BufferOverflowStrategy.ERROR)
|
||||||
.onBackpressureBuffer(64, BufferOverflowStrategy.ERROR)
|
|
||||||
// Send requests to tdlib
|
// Send requests to tdlib
|
||||||
.concatMap(function -> Mono
|
.flatMap(function -> Mono
|
||||||
.from(rawTelegramClient.send(function, SPECIAL_RAW_TIMEOUT_DURATION))
|
.from(rawTelegramClient.send(function, SPECIAL_RAW_TIMEOUT_DURATION))
|
||||||
.mapNotNull(resp -> {
|
.mapNotNull(resp -> {
|
||||||
if (resp.getConstructor() == TdApi.Error.CONSTRUCTOR) {
|
if (resp.getConstructor() == TdApi.Error.CONSTRUCTOR) {
|
||||||
@ -146,6 +176,9 @@ public abstract class ReactiveApiPublisher {
|
|||||||
.cast(ClientBoundResultingEvent.class)
|
.cast(ClientBoundResultingEvent.class)
|
||||||
.map(ClientBoundResultingEvent::event)
|
.map(ClientBoundResultingEvent::event)
|
||||||
|
|
||||||
|
// Buffer requests
|
||||||
|
.onBackpressureBuffer()
|
||||||
|
|
||||||
// Send events to the client
|
// Send events to the client
|
||||||
.subscribeOn(Schedulers.parallel())
|
.subscribeOn(Schedulers.parallel())
|
||||||
.subscribe(clientBoundEvent -> eventService.broadcast("session-client-bound-events",
|
.subscribe(clientBoundEvent -> eventService.broadcast("session-client-bound-events",
|
||||||
@ -156,6 +189,9 @@ public abstract class ReactiveApiPublisher {
|
|||||||
.filter(s -> s instanceof ClusterBoundResultingEvent)
|
.filter(s -> s instanceof ClusterBoundResultingEvent)
|
||||||
.cast(ClusterBoundResultingEvent.class)
|
.cast(ClusterBoundResultingEvent.class)
|
||||||
|
|
||||||
|
// Buffer requests
|
||||||
|
.onBackpressureBuffer()
|
||||||
|
|
||||||
// Send events to the cluster
|
// Send events to the cluster
|
||||||
.subscribeOn(Schedulers.parallel())
|
.subscribeOn(Schedulers.parallel())
|
||||||
.subscribe(clusterBoundEvent -> {
|
.subscribe(clusterBoundEvent -> {
|
||||||
@ -176,14 +212,28 @@ public abstract class ReactiveApiPublisher {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nullable
|
protected abstract boolean isBot();
|
||||||
private ResultingEvent onSignal(Signal signal) {
|
|
||||||
|
private ResultingEvent wrapUpdateSignal(Signal signal) {
|
||||||
|
var update = (TdApi.Update) signal.getUpdate();
|
||||||
|
return new ClientBoundResultingEvent(new OnUpdateData(liveId, userId, update));
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<ResultingEvent> withUpdateSignal(Signal signal, List<ResultingEvent> list) {
|
||||||
|
var result = new ArrayList<ResultingEvent>(list.size() + 1);
|
||||||
|
result.add(wrapUpdateSignal(signal));
|
||||||
|
result.addAll(list);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@NotNull
|
||||||
|
private List<@NotNull ResultingEvent> onSignal(Signal signal) {
|
||||||
// Update the state
|
// Update the state
|
||||||
var state = this.state.updateAndGet(oldState -> oldState.withSignal(signal));
|
var state = this.state.updateAndGet(oldState -> oldState.withSignal(signal));
|
||||||
|
|
||||||
if (state.authPhase() == LOGGED_IN) {
|
if (state.authPhase() == LOGGED_IN) {
|
||||||
var update = (TdApi.Update) signal.getUpdate();
|
ResultingEvent resultingEvent = wrapUpdateSignal(signal);
|
||||||
return new ClientBoundResultingEvent(new OnUpdateData(liveId, userId, update));
|
return List.of(resultingEvent);
|
||||||
} else {
|
} else {
|
||||||
LOG.trace("Signal has not been broadcast because the session {} is not logged in: {}", userId, signal);
|
LOG.trace("Signal has not been broadcast because the session {} is not logged in: {}", userId, signal);
|
||||||
return this.handleSpecialSignal(state, signal);
|
return this.handleSpecialSignal(state, signal);
|
||||||
@ -191,27 +241,28 @@ public abstract class ReactiveApiPublisher {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("SwitchStatementWithTooFewBranches")
|
@SuppressWarnings("SwitchStatementWithTooFewBranches")
|
||||||
@Nullable
|
@NotNull
|
||||||
private ResultingEvent handleSpecialSignal(State state, Signal signal) {
|
private List<@NotNull ResultingEvent> handleSpecialSignal(State state, Signal signal) {
|
||||||
if (signal.isException()) {
|
if (signal.isException()) {
|
||||||
LOG.error("Received an error signal", signal.getException());
|
LOG.error("Received an error signal", signal.getException());
|
||||||
return null;
|
return List.of();
|
||||||
}
|
}
|
||||||
if (signal.isClosed()) {
|
if (signal.isClosed()) {
|
||||||
signal.getClosed();
|
signal.getClosed();
|
||||||
LOG.info("Received a closed signal");
|
LOG.info("Received a closed signal");
|
||||||
return new ResultingEventPublisherClosed();
|
return List.of(new ResultingEventPublisherClosed());
|
||||||
}
|
}
|
||||||
if (signal.isUpdate() && signal.getUpdate().getConstructor() == TdApi.Error.CONSTRUCTOR) {
|
if (signal.isUpdate() && signal.getUpdate().getConstructor() == TdApi.Error.CONSTRUCTOR) {
|
||||||
var error = ((TdApi.Error) signal.getUpdate());
|
var error = ((TdApi.Error) signal.getUpdate());
|
||||||
LOG.error("Received a TDLib error signal! Error {}: {}", error.code, error.message);
|
LOG.error("Received a TDLib error signal! Error {}: {}", error.code, error.message);
|
||||||
return null;
|
return List.of();
|
||||||
}
|
}
|
||||||
if (!signal.isUpdate()) {
|
if (!signal.isUpdate()) {
|
||||||
LOG.error("Received a signal that's not an update: {}", signal);
|
LOG.error("Received a signal that's not an update: {}", signal);
|
||||||
return null;
|
return List.of();
|
||||||
}
|
}
|
||||||
var update = signal.getUpdate();
|
var update = signal.getUpdate();
|
||||||
|
var updateResult = wrapUpdateSignal(signal);
|
||||||
switch (state.authPhase()) {
|
switch (state.authPhase()) {
|
||||||
case BROKEN -> {}
|
case BROKEN -> {}
|
||||||
case PARAMETERS_PHASE -> {
|
case PARAMETERS_PHASE -> {
|
||||||
@ -221,7 +272,7 @@ public abstract class ReactiveApiPublisher {
|
|||||||
switch (updateAuthorizationState.authorizationState.getConstructor()) {
|
switch (updateAuthorizationState.authorizationState.getConstructor()) {
|
||||||
case TdApi.AuthorizationStateWaitTdlibParameters.CONSTRUCTOR -> {
|
case TdApi.AuthorizationStateWaitTdlibParameters.CONSTRUCTOR -> {
|
||||||
TdlibParameters parameters = generateTDLibParameters();
|
TdlibParameters parameters = generateTDLibParameters();
|
||||||
return new TDLibBoundResultingEvent<>(new SetTdlibParameters(parameters));
|
return List.of(updateResult, new TDLibBoundResultingEvent<>(new SetTdlibParameters(parameters)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -233,7 +284,7 @@ public abstract class ReactiveApiPublisher {
|
|||||||
var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update;
|
var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update;
|
||||||
switch (updateAuthorizationState.authorizationState.getConstructor()) {
|
switch (updateAuthorizationState.authorizationState.getConstructor()) {
|
||||||
case TdApi.AuthorizationStateWaitEncryptionKey.CONSTRUCTOR -> {
|
case TdApi.AuthorizationStateWaitEncryptionKey.CONSTRUCTOR -> {
|
||||||
return new TDLibBoundResultingEvent<>(new CheckDatabaseEncryptionKey());
|
return List.of(updateResult, new TDLibBoundResultingEvent<>(new CheckDatabaseEncryptionKey()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -245,24 +296,33 @@ public abstract class ReactiveApiPublisher {
|
|||||||
var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update;
|
var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update;
|
||||||
switch (updateAuthorizationState.authorizationState.getConstructor()) {
|
switch (updateAuthorizationState.authorizationState.getConstructor()) {
|
||||||
case TdApi.AuthorizationStateWaitCode.CONSTRUCTOR -> {
|
case TdApi.AuthorizationStateWaitCode.CONSTRUCTOR -> {
|
||||||
return onWaitCode();
|
return withUpdateSignal(signal, onWaitCode());
|
||||||
}
|
}
|
||||||
case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR -> {
|
case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR -> {
|
||||||
var link = ((AuthorizationStateWaitOtherDeviceConfirmation) updateAuthorizationState.authorizationState).link;
|
var link = ((AuthorizationStateWaitOtherDeviceConfirmation) updateAuthorizationState.authorizationState).link;
|
||||||
return new ClientBoundResultingEvent(new OnOtherDeviceLoginRequested(liveId, userId, link));
|
return List.of(updateResult,
|
||||||
|
new ClientBoundResultingEvent(new OnOtherDeviceLoginRequested(liveId, userId, link)));
|
||||||
}
|
}
|
||||||
case TdApi.AuthorizationStateWaitPassword.CONSTRUCTOR -> {
|
case TdApi.AuthorizationStateWaitPassword.CONSTRUCTOR -> {
|
||||||
return new ClientBoundResultingEvent(new OnPasswordRequested(liveId, userId));
|
var authorizationStateWaitPassword = ((AuthorizationStateWaitPassword) updateAuthorizationState.authorizationState);
|
||||||
|
return List.of(updateResult,
|
||||||
|
new ClientBoundResultingEvent(new OnPasswordRequested(liveId,
|
||||||
|
userId,
|
||||||
|
authorizationStateWaitPassword.passwordHint,
|
||||||
|
authorizationStateWaitPassword.hasRecoveryEmailAddress,
|
||||||
|
authorizationStateWaitPassword.recoveryEmailAddressPattern
|
||||||
|
))
|
||||||
|
);
|
||||||
}
|
}
|
||||||
case TdApi.AuthorizationStateWaitPhoneNumber.CONSTRUCTOR -> {
|
case TdApi.AuthorizationStateWaitPhoneNumber.CONSTRUCTOR -> {
|
||||||
return onWaitToken();
|
return withUpdateSignal(signal, onWaitToken());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return List.of();
|
||||||
}
|
}
|
||||||
|
|
||||||
private TdlibParameters generateTDLibParameters() {
|
private TdlibParameters generateTDLibParameters() {
|
||||||
@ -287,11 +347,11 @@ public abstract class ReactiveApiPublisher {
|
|||||||
return tdlibParameters;
|
return tdlibParameters;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract ResultingEvent onWaitToken();
|
protected abstract List<ResultingEvent> onWaitToken();
|
||||||
|
|
||||||
protected ResultingEvent onWaitCode() {
|
protected List<ResultingEvent> onWaitCode() {
|
||||||
LOG.error("Wait code event is not supported");
|
LOG.error("Wait code event is not supported");
|
||||||
return null;
|
return List.of();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static byte[] serializeEvent(ClientBoundEvent clientBoundEvent) {
|
private static byte[] serializeEvent(ClientBoundEvent clientBoundEvent) {
|
||||||
@ -314,6 +374,11 @@ public abstract class ReactiveApiPublisher {
|
|||||||
} else if (clientBoundEvent instanceof OnOtherDeviceLoginRequested onOtherDeviceLoginRequested) {
|
} else if (clientBoundEvent instanceof OnOtherDeviceLoginRequested onOtherDeviceLoginRequested) {
|
||||||
dataOutputStream.writeByte(0x5);
|
dataOutputStream.writeByte(0x5);
|
||||||
dataOutputStream.writeUTF(onOtherDeviceLoginRequested.link());
|
dataOutputStream.writeUTF(onOtherDeviceLoginRequested.link());
|
||||||
|
} else if (clientBoundEvent instanceof OnPasswordRequested onPasswordRequested) {
|
||||||
|
dataOutputStream.writeByte(0x6);
|
||||||
|
dataOutputStream.writeUTF(onPasswordRequested.passwordHint());
|
||||||
|
dataOutputStream.writeBoolean(onPasswordRequested.hasRecoveryEmail());
|
||||||
|
dataOutputStream.writeUTF(onPasswordRequested.recoveryEmailPattern());
|
||||||
} else {
|
} else {
|
||||||
throw new UnsupportedOperationException("Unexpected value: " + clientBoundEvent);
|
throw new UnsupportedOperationException("Unexpected value: " + clientBoundEvent);
|
||||||
}
|
}
|
||||||
@ -402,14 +467,23 @@ public abstract class ReactiveApiPublisher {
|
|||||||
|
|
||||||
private final String botToken;
|
private final String botToken;
|
||||||
|
|
||||||
public ReactiveApiPublisherToken(Atomix atomix, Long liveId, long userId, String botToken) {
|
public ReactiveApiPublisherToken(Atomix atomix,
|
||||||
super(atomix, liveId, userId);
|
Set<ResultingEventTransformer> resultingEventTransformerSet,
|
||||||
|
Long liveId,
|
||||||
|
long userId,
|
||||||
|
String botToken) {
|
||||||
|
super(atomix, resultingEventTransformerSet, liveId, userId);
|
||||||
this.botToken = botToken;
|
this.botToken = botToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ResultingEvent onWaitToken() {
|
protected boolean isBot() {
|
||||||
return new TDLibBoundResultingEvent<>(new CheckAuthenticationBotToken(botToken));
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<ResultingEvent> onWaitToken() {
|
||||||
|
return List.of(new TDLibBoundResultingEvent<>(new CheckAuthenticationBotToken(botToken)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -426,25 +500,34 @@ public abstract class ReactiveApiPublisher {
|
|||||||
|
|
||||||
private final long phoneNumber;
|
private final long phoneNumber;
|
||||||
|
|
||||||
public ReactiveApiPublisherPhoneNumber(Atomix atomix, Long liveId, long userId, long phoneNumber) {
|
public ReactiveApiPublisherPhoneNumber(Atomix atomix,
|
||||||
super(atomix, liveId, userId);
|
Set<ResultingEventTransformer> resultingEventTransformerSet,
|
||||||
|
Long liveId,
|
||||||
|
long userId,
|
||||||
|
long phoneNumber) {
|
||||||
|
super(atomix, resultingEventTransformerSet, liveId, userId);
|
||||||
this.phoneNumber = phoneNumber;
|
this.phoneNumber = phoneNumber;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ResultingEvent onWaitToken() {
|
protected boolean isBot() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<ResultingEvent> onWaitToken() {
|
||||||
var authSettings = new PhoneNumberAuthenticationSettings();
|
var authSettings = new PhoneNumberAuthenticationSettings();
|
||||||
authSettings.allowFlashCall = false;
|
authSettings.allowFlashCall = false;
|
||||||
authSettings.allowSmsRetrieverApi = false;
|
authSettings.allowSmsRetrieverApi = false;
|
||||||
authSettings.isCurrentPhoneNumber = false;
|
authSettings.isCurrentPhoneNumber = false;
|
||||||
return new TDLibBoundResultingEvent<>(new SetAuthenticationPhoneNumber("+" + phoneNumber,
|
return List.of(new TDLibBoundResultingEvent<>(new SetAuthenticationPhoneNumber("+" + phoneNumber,
|
||||||
authSettings
|
authSettings
|
||||||
));
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ClientBoundResultingEvent onWaitCode() {
|
public List<ResultingEvent> onWaitCode() {
|
||||||
return new ClientBoundResultingEvent(new OnUserLoginCodeRequested(liveId, userId, phoneNumber));
|
return List.of(new ClientBoundResultingEvent(new OnUserLoginCodeRequested(liveId, userId, phoneNumber)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -0,0 +1,9 @@
|
|||||||
|
package it.tdlight.reactiveapi;
|
||||||
|
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
|
|
||||||
|
public interface ResultingEventTransformer {
|
||||||
|
|
||||||
|
Flux<ResultingEvent> transform(boolean isBot, Flux<ResultingEvent> events);
|
||||||
|
}
|
@ -0,0 +1,54 @@
|
|||||||
|
package it.tdlight.reactiveapi.transformer;
|
||||||
|
|
||||||
|
import it.tdlight.jni.TdApi.CheckAuthenticationCode;
|
||||||
|
import it.tdlight.jni.TdApi.CheckAuthenticationPassword;
|
||||||
|
import it.tdlight.reactiveapi.Cli;
|
||||||
|
import it.tdlight.reactiveapi.Event.OnBotLoginCodeRequested;
|
||||||
|
import it.tdlight.reactiveapi.Event.OnOtherDeviceLoginRequested;
|
||||||
|
import it.tdlight.reactiveapi.Event.OnPasswordRequested;
|
||||||
|
import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEvent;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEvent.ClientBoundResultingEvent;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEventTransformer;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
|
import reactor.core.publisher.Mono;
|
||||||
|
import reactor.core.scheduler.Schedulers;
|
||||||
|
|
||||||
|
public class CommandLineAuth implements ResultingEventTransformer {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Flux<ResultingEvent> transform(boolean isBot, Flux<ResultingEvent> events) {
|
||||||
|
return events.flatMapSequential(event -> {
|
||||||
|
if (event instanceof ClientBoundResultingEvent clientBoundResultingEvent) {
|
||||||
|
if (clientBoundResultingEvent.event() instanceof OnUserLoginCodeRequested requested) {
|
||||||
|
return this
|
||||||
|
.askParam("Please type the login code of " + requested.phoneNumber(), requested.userId())
|
||||||
|
.<ResultingEvent>map(code -> new TDLibBoundResultingEvent<>(new CheckAuthenticationCode(code)));
|
||||||
|
} else if (clientBoundResultingEvent.event() instanceof OnBotLoginCodeRequested requested) {
|
||||||
|
return this
|
||||||
|
.askParam("Please type the login code of " + requested.token(), requested.userId())
|
||||||
|
.<ResultingEvent>map(code -> new TDLibBoundResultingEvent<>(new CheckAuthenticationCode(code)));
|
||||||
|
} else if (clientBoundResultingEvent.event() instanceof OnPasswordRequested onPasswordRequested) {
|
||||||
|
return this
|
||||||
|
.askParam("Please type the password. Hint: " + onPasswordRequested.passwordHint() + (
|
||||||
|
onPasswordRequested.hasRecoveryEmail() ? " Recovery e-mail: "
|
||||||
|
+ onPasswordRequested.recoveryEmailPattern() : ""), onPasswordRequested.userId())
|
||||||
|
.<ResultingEvent>map(password -> new TDLibBoundResultingEvent<>(new CheckAuthenticationPassword(password)));
|
||||||
|
} else if (clientBoundResultingEvent.event() instanceof OnOtherDeviceLoginRequested onOtherDeviceLoginRequested) {
|
||||||
|
return this
|
||||||
|
.askParam("Please confirm the login on another other device, (after copying the link press enter): "
|
||||||
|
+ onOtherDeviceLoginRequested.link(), onOtherDeviceLoginRequested.userId())
|
||||||
|
.then(Mono.empty());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Mono.just(event);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private Mono<String> askParam(String text, long userId) {
|
||||||
|
return Mono
|
||||||
|
.fromCallable(() -> Cli.askParameter("[#IDU" + userId + "]" + text))
|
||||||
|
.subscribeOn(Schedulers.boundedElastic());
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,69 @@
|
|||||||
|
package it.tdlight.reactiveapi.transformer;
|
||||||
|
|
||||||
|
import it.tdlight.jni.TdApi;
|
||||||
|
import it.tdlight.jni.TdApi.OptionValueBoolean;
|
||||||
|
import it.tdlight.jni.TdApi.OptionValueInteger;
|
||||||
|
import it.tdlight.reactiveapi.Event.OnUpdateData;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEvent;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEvent.ClientBoundResultingEvent;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEventTransformer;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
|
public class DefaultOptions implements ResultingEventTransformer {
|
||||||
|
|
||||||
|
private static final Collection<ResultingEvent> DEFAULT_OPTIONS = List.of(
|
||||||
|
setInt("message_unload_delay", 1800),
|
||||||
|
setBoolean("disable_persistent_network_statistics", true),
|
||||||
|
setBoolean("disable_time_adjustment_protection", true),
|
||||||
|
setBoolean("ignore_inline_thumbnails", true),
|
||||||
|
setBoolean("ignore_platform_restrictions", true),
|
||||||
|
setBoolean("use_storage_optimizer", true)
|
||||||
|
);
|
||||||
|
|
||||||
|
private static final Collection<ResultingEvent> DEFAULT_USER_OPTIONS = List.of(
|
||||||
|
setBoolean("disable_animated_emoji", true),
|
||||||
|
setBoolean("disable_contact_registered_notifications", true),
|
||||||
|
setBoolean("disable_top_chats", true),
|
||||||
|
setInt("notification_group_count_max", 0),
|
||||||
|
setInt("notification_group_size_max", 1)
|
||||||
|
);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Flux<ResultingEvent> transform(boolean isBot, Flux<ResultingEvent> events) {
|
||||||
|
return events.flatMapIterable(event -> {
|
||||||
|
|
||||||
|
// Append the options if the initial auth state is intercepted
|
||||||
|
if (event instanceof ClientBoundResultingEvent clientBoundResultingEvent
|
||||||
|
&& clientBoundResultingEvent.event() instanceof OnUpdateData onUpdate
|
||||||
|
&& onUpdate.update() instanceof TdApi.UpdateAuthorizationState authorizationState
|
||||||
|
&& authorizationState.authorizationState instanceof TdApi.AuthorizationStateWaitEncryptionKey) {
|
||||||
|
|
||||||
|
var resultingEvent = new ArrayList<ResultingEvent>(1 + DEFAULT_OPTIONS.size() + DEFAULT_USER_OPTIONS.size());
|
||||||
|
// Add the intercepted event
|
||||||
|
resultingEvent.add(event);
|
||||||
|
// Add the default options
|
||||||
|
resultingEvent.addAll(DEFAULT_OPTIONS);
|
||||||
|
// Add user-only default options
|
||||||
|
if (!isBot) {
|
||||||
|
resultingEvent.addAll(DEFAULT_USER_OPTIONS);
|
||||||
|
}
|
||||||
|
return resultingEvent;
|
||||||
|
} else {
|
||||||
|
// Return just the intercepted event as-is
|
||||||
|
return List.of(event);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ResultingEvent setBoolean(String optionName, boolean value) {
|
||||||
|
return new TDLibBoundResultingEvent<>(new TdApi.SetOption(optionName, new OptionValueBoolean(value)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ResultingEvent setInt(String optionName, int value) {
|
||||||
|
return new TDLibBoundResultingEvent<>(new TdApi.SetOption(optionName, new OptionValueInteger(value)));
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,25 @@
|
|||||||
|
package it.tdlight.reactiveapi.transformer;
|
||||||
|
|
||||||
|
import it.tdlight.jni.TdApi;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEvent;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEventTransformer;
|
||||||
|
import java.util.List;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
|
public class DisableChatDatabase implements ResultingEventTransformer {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Flux<ResultingEvent> transform(boolean isBot, Flux<ResultingEvent> events) {
|
||||||
|
return events.flatMapIterable(event -> {
|
||||||
|
|
||||||
|
// Change option
|
||||||
|
if (event instanceof TDLibBoundResultingEvent tdLibBoundResultingEvent
|
||||||
|
&& tdLibBoundResultingEvent.action() instanceof TdApi.SetTdlibParameters setTdlibParameters) {
|
||||||
|
setTdlibParameters.parameters.useChatInfoDatabase = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return List.of(event);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,25 @@
|
|||||||
|
package it.tdlight.reactiveapi.transformer;
|
||||||
|
|
||||||
|
import it.tdlight.jni.TdApi;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEvent;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEventTransformer;
|
||||||
|
import java.util.List;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
|
public class DisableFileDatabase implements ResultingEventTransformer {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Flux<ResultingEvent> transform(boolean isBot, Flux<ResultingEvent> events) {
|
||||||
|
return events.flatMapIterable(event -> {
|
||||||
|
|
||||||
|
// Change option
|
||||||
|
if (event instanceof TDLibBoundResultingEvent tdLibBoundResultingEvent
|
||||||
|
&& tdLibBoundResultingEvent.action() instanceof TdApi.SetTdlibParameters setTdlibParameters) {
|
||||||
|
setTdlibParameters.parameters.useFileDatabase = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return List.of(event);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,25 @@
|
|||||||
|
package it.tdlight.reactiveapi.transformer;
|
||||||
|
|
||||||
|
import it.tdlight.jni.TdApi;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEvent;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEventTransformer;
|
||||||
|
import java.util.List;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
|
public class DisableMessageDatabase implements ResultingEventTransformer {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Flux<ResultingEvent> transform(boolean isBot, Flux<ResultingEvent> events) {
|
||||||
|
return events.flatMapIterable(event -> {
|
||||||
|
|
||||||
|
// Change option
|
||||||
|
if (event instanceof TDLibBoundResultingEvent tdLibBoundResultingEvent
|
||||||
|
&& tdLibBoundResultingEvent.action() instanceof TdApi.SetTdlibParameters setTdlibParameters) {
|
||||||
|
setTdlibParameters.parameters.useMessageDatabase = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return List.of(event);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,13 @@
|
|||||||
|
package it.tdlight.reactiveapi.transformer;
|
||||||
|
|
||||||
|
import it.tdlight.reactiveapi.ResultingEvent;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEventTransformer;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
|
public class DummyResultingEventTransformer implements ResultingEventTransformer {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Flux<ResultingEvent> transform(boolean isBot, Flux<ResultingEvent> events) {
|
||||||
|
return events;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,64 @@
|
|||||||
|
package it.tdlight.reactiveapi.transformer;
|
||||||
|
|
||||||
|
import it.tdlight.jni.TdApi;
|
||||||
|
import it.tdlight.jni.TdApi.OptionValueBoolean;
|
||||||
|
import it.tdlight.jni.TdApi.OptionValueInteger;
|
||||||
|
import it.tdlight.reactiveapi.Event.OnUpdateData;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEvent;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEvent.ClientBoundResultingEvent;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEventTransformer;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
|
public class TdlightDefaultOptions implements ResultingEventTransformer {
|
||||||
|
|
||||||
|
private static final Collection<ResultingEvent> DEFAULT_OPTIONS = List.of(
|
||||||
|
setBoolean("disable_document_filenames", true),
|
||||||
|
setBoolean("disable_minithumbnails", true),
|
||||||
|
setBoolean("disable_notifications", true),
|
||||||
|
setBoolean("ignore_update_chat_last_message", true),
|
||||||
|
setBoolean("ignore_update_chat_read_inbox", true),
|
||||||
|
setBoolean("ignore_update_user_chat_action", true),
|
||||||
|
setBoolean("ignore_server_deletes_and_reads", true)
|
||||||
|
);
|
||||||
|
|
||||||
|
private static final Collection<ResultingEvent> DEFAULT_USER_OPTIONS = List.of();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Flux<ResultingEvent> transform(boolean isBot, Flux<ResultingEvent> events) {
|
||||||
|
return events.flatMapIterable(event -> {
|
||||||
|
|
||||||
|
// Append the options if the initial auth state is intercepted
|
||||||
|
if (event instanceof ClientBoundResultingEvent clientBoundResultingEvent
|
||||||
|
&& clientBoundResultingEvent.event() instanceof OnUpdateData onUpdate
|
||||||
|
&& onUpdate.update() instanceof TdApi.UpdateAuthorizationState authorizationState
|
||||||
|
&& authorizationState.authorizationState instanceof TdApi.AuthorizationStateWaitEncryptionKey) {
|
||||||
|
|
||||||
|
var resultingEvent = new ArrayList<ResultingEvent>(1 + DEFAULT_OPTIONS.size() + DEFAULT_USER_OPTIONS.size());
|
||||||
|
// Add the intercepted event
|
||||||
|
resultingEvent.add(event);
|
||||||
|
// Add the default options
|
||||||
|
resultingEvent.addAll(DEFAULT_OPTIONS);
|
||||||
|
// Add user-only default options
|
||||||
|
if (!isBot) {
|
||||||
|
resultingEvent.addAll(DEFAULT_USER_OPTIONS);
|
||||||
|
}
|
||||||
|
return resultingEvent;
|
||||||
|
} else {
|
||||||
|
// Return just the intercepted event as-is
|
||||||
|
return List.of(event);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ResultingEvent setBoolean(String optionName, boolean value) {
|
||||||
|
return new TDLibBoundResultingEvent<>(new TdApi.SetOption(optionName, new OptionValueBoolean(value)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ResultingEvent setInt(String optionName, int value) {
|
||||||
|
return new TDLibBoundResultingEvent<>(new TdApi.SetOption(optionName, new OptionValueInteger(value)));
|
||||||
|
}
|
||||||
|
}
|
@ -1,5 +1,5 @@
|
|||||||
<?xml version="1.0" encoding="UTF-8"?>
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
<Configuration status="WARN">
|
<Configuration>
|
||||||
<Appenders>
|
<Appenders>
|
||||||
<TerminalConsole name="ConsoleAppender">
|
<TerminalConsole name="ConsoleAppender">
|
||||||
<PatternLayout disableAnsi="false"
|
<PatternLayout disableAnsi="false"
|
||||||
@ -18,7 +18,7 @@
|
|||||||
<!-- log only INFO, WARN, ERROR and FATAL logging by classes in this package -->
|
<!-- log only INFO, WARN, ERROR and FATAL logging by classes in this package -->
|
||||||
<Logger name="io.net5" level="INFO" />
|
<Logger name="io.net5" level="INFO" />
|
||||||
|
|
||||||
<Root level="ALL">
|
<Root level="INFO">
|
||||||
<filters>
|
<filters>
|
||||||
<MarkerFilter marker="NETWORK_PACKETS" onMatch="DENY" onMismatch="NEUTRAL"/>
|
<MarkerFilter marker="NETWORK_PACKETS" onMatch="DENY" onMismatch="NEUTRAL"/>
|
||||||
</filters>
|
</filters>
|
||||||
|
Loading…
x
Reference in New Issue
Block a user