systemPartitionGroupMembers = new ArrayList<>();
+ for (NodeSettings node : clusterSettings.nodes) {
+ bootstrapDiscoveryProviderNodes.add(Node.builder().withId(node.id).withAddress(node.address).build());
+ systemPartitionGroupMembers.add(node.id);
+ }
+
+ var bootstrapDiscoveryProviderBuilder = BootstrapDiscoveryProvider.builder();
+ bootstrapDiscoveryProviderBuilder.withNodes(bootstrapDiscoveryProviderNodes).build();
+
+ atomixBuilder.withMembershipProvider(bootstrapDiscoveryProviderBuilder.build());
+
+ atomixBuilder.withManagementGroup(RaftPartitionGroup
+ .builder("system")
+ .withNumPartitions(1)
+ .withMembers(systemPartitionGroupMembers)
+ .build());
+
+ atomixBuilder.withPartitionGroups(PrimaryBackupPartitionGroup.builder("data").withNumPartitions(32).build());
+
+ atomixBuilder.withShutdownHook(false);
+ atomixBuilder.withTypeRegistrationRequired();
+
+ if (instanceSettings.client) {
+ atomixBuilder.addProfile(Profile.consensus(systemPartitionGroupMembers));
+ atomixBuilder.addProfile(Profile.dataGrid(32));
+ } else {
+ atomixBuilder.addProfile(Profile.client());
+ }
+
+ atomixBuilder.withCompatibleSerialization(false);
+
+ var atomix = atomixBuilder.build();
+
+ TdSerializer.register(atomix.getSerializationService());
+
+ atomix.start().join();
+
+ var api = new ReactiveApi(atomix, diskSessions);
+
+ LOG.info("Starting ReactiveApi...");
+
+ api.start();
+
+ return api;
+ }
+
+ public static void main(String[] args) throws IOException {
+ var validArgs = parseArguments(args);
+ var atomixBuilder = Atomix.builder().withShutdownHookEnabled();
+ start(validArgs, atomixBuilder);
+ }
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/Event.java b/src/main/java/it/tdlight/reactiveapi/Event.java
new file mode 100644
index 0000000..dde248c
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/Event.java
@@ -0,0 +1,47 @@
+package it.tdlight.reactiveapi;
+
+import it.tdlight.jni.TdApi;
+import it.tdlight.reactiveapi.Event.AuthenticatedEvent;
+
+/**
+ * Any event received from a session
+ */
+public sealed interface Event permits AuthenticatedEvent {
+
+ /**
+ *
+ * @return temporary unique identifier of the session
+ */
+ long sessionId();
+
+ /**
+ * Event received after choosing the user id of the session
+ */
+ sealed interface AuthenticatedEvent extends Event permits OnLoginCodeRequested, OnUpdate {
+
+ /**
+ *
+ * @return telegram user id of the session
+ */
+ long userId();
+ }
+
+ /**
+ * TDLib is asking for an authorization code
+ */
+ sealed interface OnLoginCodeRequested extends AuthenticatedEvent
+ permits OnBotLoginCodeRequested, OnUserLoginCodeRequested {}
+
+ final record OnUserLoginCodeRequested(long sessionId, long userId, long phoneNumber) implements OnLoginCodeRequested {}
+
+ final record OnBotLoginCodeRequested(long sessionId, long userId, String token) implements OnLoginCodeRequested {}
+
+ /**
+ * Event received from TDLib
+ */
+ sealed interface OnUpdate extends AuthenticatedEvent permits OnUpdateData, OnUpdateError {}
+
+ final record OnUpdateData(long sessionId, long userId, TdApi.Update update) implements OnUpdate {}
+
+ final record OnUpdateError(long sessionId, long userId, TdApi.Error error) implements OnUpdate {}
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/InstanceSettings.java b/src/main/java/it/tdlight/reactiveapi/InstanceSettings.java
new file mode 100644
index 0000000..ac95702
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/InstanceSettings.java
@@ -0,0 +1,33 @@
+package it.tdlight.reactiveapi;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+public class InstanceSettings {
+
+ @NotNull
+ public String id;
+
+ /**
+ * True if this is just a client, false if this is a complete node
+ *
+ * A client is a lightweight node
+ */
+ public boolean client;
+
+ /**
+ * If {@link #client} is true, this will be the address of this client
+ */
+ public @Nullable String clientAddress;
+
+ @JsonCreator
+ public InstanceSettings(@JsonProperty(required = true, value = "id") @NotNull String id,
+ @JsonProperty(required = true, value = "client") boolean client,
+ @JsonProperty("clientAddress") @Nullable String clientAddress) {
+ this.id = id;
+ this.client = client;
+ this.clientAddress = clientAddress;
+ }
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/NodeSettings.java b/src/main/java/it/tdlight/reactiveapi/NodeSettings.java
new file mode 100644
index 0000000..bc1dc67
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/NodeSettings.java
@@ -0,0 +1,18 @@
+package it.tdlight.reactiveapi;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonProperty.Access;
+import org.jetbrains.annotations.NotNull;
+
+public class NodeSettings {
+
+ public @NotNull String id;
+ public @NotNull String address;
+
+ public NodeSettings(@JsonProperty(required = true, value = "id") @NotNull String id,
+ @JsonProperty(required = true, value = "address") @NotNull String address) {
+ this.id = id;
+ this.address = address;
+ }
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java
new file mode 100644
index 0000000..ee23a38
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java
@@ -0,0 +1,199 @@
+package it.tdlight.reactiveapi;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import io.atomix.core.Atomix;
+import io.atomix.core.idgenerator.AsyncAtomicIdGenerator;
+import io.atomix.core.lock.AsyncAtomicLock;
+import io.atomix.core.map.AsyncAtomicMap;
+import it.tdlight.reactiveapi.CreateSessionRequest.CreateBotSessionRequest;
+import it.tdlight.reactiveapi.CreateSessionRequest.CreateUserSessionRequest;
+import it.tdlight.reactiveapi.CreateSessionRequest.LoadSessionFromDiskRequest;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.scheduler.Schedulers;
+
+public class ReactiveApi {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReactiveApi.class);
+
+ private final Atomix atomix;
+ private static final SchedulerExecutor SCHEDULER_EXECUTOR = new SchedulerExecutor(Schedulers.parallel());
+ private static final SchedulerExecutor BOUNDED_ELASTIC_EXECUTOR = new SchedulerExecutor(Schedulers.boundedElastic());
+ private final AsyncAtomicIdGenerator nextSessionId;
+
+ private final AsyncAtomicLock sessionModificationLock;
+ private final AsyncAtomicMap sessionIdToUserId;
+ private final ConcurrentMap localNodeSessions = new ConcurrentHashMap<>();
+ private final DiskSessionsManager diskSessions;
+
+ public ReactiveApi(Atomix atomix, DiskSessionsManager diskSessions) {
+ this.atomix = atomix;
+ this.nextSessionId = atomix.getAtomicIdGenerator("session-id").async();
+ this.sessionIdToUserId = atomix.getAtomicMap("session-id-to-user-id").async();
+ this.sessionModificationLock = atomix.getAtomicLock("session-modification").async();
+ this.diskSessions = diskSessions;
+ }
+
+ public void start() {
+ CompletableFuture.runAsync(() -> {
+ List> requests = new ArrayList<>();
+ synchronized (diskSessions) {
+ for (Entry entry : diskSessions.getSettings().sessions.entrySet()) {
+ try {
+ entry.getValue().validate();
+ } catch (Throwable ex) {
+ LOG.error("Failed to load disk session {}", entry.getKey(), ex);
+ }
+ var sessionFolderName = entry.getKey();
+ var diskSession = entry.getValue();
+ requests.add(createSession(new LoadSessionFromDiskRequest(diskSession.userId,
+ sessionFolderName,
+ diskSession.token,
+ diskSession.phoneNumber
+ )));
+ }
+ }
+ CompletableFuture
+ .allOf(requests.toArray(CompletableFuture>[]::new))
+ .thenAccept(responses -> LOG.info("Loaded all saved sessions from disk"));
+ }, BOUNDED_ELASTIC_EXECUTOR);
+
+ // Listen for create-session signals
+ atomix.getEventService().subscribe("create-session", CreateSessionRequest::deserializeBytes, req -> {
+ if (req instanceof LoadSessionFromDiskRequest) {
+ return failedFuture(new IllegalArgumentException("Can't pass a local request through the cluster"));
+ } else {
+ return createSession(req);
+ }
+ }, CreateSessionResponse::serializeBytes);
+ }
+
+ public CompletableFuture createSession(CreateSessionRequest req) {
+ // Lock sessions creation
+ return sessionModificationLock.lock().thenCompose(lockVersion -> {
+ // Generate session id
+ return this.nextFreeId().thenCompose(sessionId -> {
+ // Create the session instance
+ ReactiveApiPublisher reactiveApiPublisher;
+ boolean loadedFromDisk;
+ long userId;
+ String botToken;
+ Long phoneNumber;
+ if (req instanceof CreateBotSessionRequest createBotSessionRequest) {
+ loadedFromDisk = false;
+ userId = createBotSessionRequest.userId();
+ botToken = createBotSessionRequest.token();
+ phoneNumber = null;
+ reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, sessionId, userId, botToken);
+ } else if (req instanceof CreateUserSessionRequest createUserSessionRequest) {
+ loadedFromDisk = false;
+ userId = createUserSessionRequest.userId();
+ botToken = null;
+ phoneNumber = createUserSessionRequest.phoneNumber();
+ reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, sessionId, userId, phoneNumber);
+ } else if (req instanceof LoadSessionFromDiskRequest loadSessionFromDiskRequest) {
+ loadedFromDisk = true;
+ userId = loadSessionFromDiskRequest.userId();
+ botToken = loadSessionFromDiskRequest.token();
+ phoneNumber = loadSessionFromDiskRequest.phoneNumber();
+ if (loadSessionFromDiskRequest.phoneNumber() != null) {
+ reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, sessionId, userId, phoneNumber);
+ } else {
+ reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, sessionId, userId, botToken);
+ }
+ } else {
+ return failedFuture(new UnsupportedOperationException("Unexpected value: " + req));
+ }
+
+ // Register the session instance to the local nodes map
+ var prev = localNodeSessions.put(sessionId, reactiveApiPublisher);
+ if (prev != null) {
+ LOG.error("Session id \"{}\" was already registered locally!", sessionId);
+ }
+
+ // Register the session instance to the distributed nodes map
+ return sessionIdToUserId.put(sessionId, req.userId()).thenComposeAsync(prevDistributed -> {
+ if (prevDistributed != null) {
+ LOG.error("Session id \"{}\" was already registered in the cluster!", sessionId);
+ }
+
+ CompletableFuture> saveToDiskFuture;
+ if (!loadedFromDisk) {
+ // Load existing session paths
+ HashSet alreadyExistingPaths = new HashSet<>();
+ synchronized (diskSessions) {
+ for (var entry : diskSessions.getSettings().sessions.entrySet()) {
+ var path = entry.getKey();
+ var diskSessionSettings = entry.getValue();
+ if (diskSessionSettings.userId == userId) {
+ LOG.warn("User id \"{}\" session already exists in path: \"{}\"", userId, path);
+ }
+ alreadyExistingPaths.add(entry.getKey());
+ }
+ }
+
+ // Get a new disk session folder name
+ String diskSessionFolderName;
+ do {
+ diskSessionFolderName = UUID.randomUUID().toString();
+ } while (alreadyExistingPaths.contains(diskSessionFolderName));
+
+ // Create the disk session configuration
+ var diskSession = new DiskSession(userId, botToken, phoneNumber);
+ Path path;
+ synchronized (diskSessions) {
+ diskSessions.getSettings().sessions.put(diskSessionFolderName, diskSession);
+ path = Paths.get(diskSessions.getSettings().path).resolve(diskSessionFolderName);
+ }
+
+ // Start the session instance
+ reactiveApiPublisher.start(path);
+
+ saveToDiskFuture = CompletableFuture.runAsync(() -> {
+ // Save updated sessions configuration to disk
+ try {
+ synchronized (diskSessions) {
+ diskSessions.save();
+ }
+ } catch (IOException e) {
+ throw new CompletionException("Failed to save disk sessions configuration", e);
+ }
+ }, BOUNDED_ELASTIC_EXECUTOR);
+ } else {
+ saveToDiskFuture = completedFuture(null);
+ }
+
+ return saveToDiskFuture.thenApply(ignored -> new CreateSessionResponse(sessionId));
+ }, BOUNDED_ELASTIC_EXECUTOR);
+ });
+ });
+ }
+
+ public CompletableFuture nextFreeId() {
+ return nextSessionId.nextId().thenCompose(id -> sessionIdToUserId.containsKey(id).thenCompose(exists -> {
+ if (exists) {
+ return nextFreeId();
+ } else {
+ return completedFuture(id);
+ }
+ }));
+ }
+
+ public Atomix getAtomix() {
+ return atomix;
+ }
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
new file mode 100644
index 0000000..c081cf9
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
@@ -0,0 +1,44 @@
+package it.tdlight.reactiveapi;
+
+import io.atomix.core.Atomix;
+import it.tdlight.jni.TdApi;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.concurrent.Executors;
+import org.apache.commons.lang3.SerializationException;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink.OverflowStrategy;
+import reactor.core.scheduler.Schedulers;
+
+public class ReactiveApiPublisher {
+
+ private static final SchedulerExecutor SCHEDULER_EXECUTOR = new SchedulerExecutor(Schedulers.boundedElastic());
+
+ private final Atomix atomix;
+ private final long userId;
+ private final long sessionId;
+ private final String botToken;
+ private final Long phoneNumber;
+
+ private ReactiveApiPublisher(Atomix atomix, long sessionId, long userId, String botToken, Long phoneNumber) {
+ this.atomix = atomix;
+ this.userId = userId;
+ this.sessionId = sessionId;
+ this.botToken = botToken;
+ this.phoneNumber = phoneNumber;
+ }
+
+ public static ReactiveApiPublisher fromToken(Atomix atomix, Long sessionId, long userId, String token) {
+ return new ReactiveApiPublisher(atomix, sessionId, userId, token, null);
+ }
+
+ public static ReactiveApiPublisher fromPhoneNumber(Atomix atomix, Long sessionId, long userId, long phoneNumber) {
+ return new ReactiveApiPublisher(atomix, sessionId, userId, null, phoneNumber);
+ }
+
+ public void start(Path path) {
+
+ }
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiSubscriber.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiSubscriber.java
new file mode 100644
index 0000000..88045d9
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiSubscriber.java
@@ -0,0 +1,5 @@
+package it.tdlight.reactiveapi;
+
+public class ReactiveApiSubscriber {
+
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiUpdate.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiUpdate.java
new file mode 100644
index 0000000..914c1d4
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiUpdate.java
@@ -0,0 +1,8 @@
+package it.tdlight.reactiveapi;
+
+import it.tdlight.jni.TdApi;
+
+/**
+ * {@link #sessionUuid} changes every time a session is restarted
+ */
+public record ReactiveApiUpdate(long sessionUuid, TdApi.Object update) {}
diff --git a/src/main/java/it/tdlight/reactiveapi/SchedulerExecutor.java b/src/main/java/it/tdlight/reactiveapi/SchedulerExecutor.java
new file mode 100644
index 0000000..1bc8140
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/SchedulerExecutor.java
@@ -0,0 +1,19 @@
+package it.tdlight.reactiveapi;
+
+import java.util.concurrent.Executor;
+import org.jetbrains.annotations.NotNull;
+import reactor.core.scheduler.Scheduler;
+
+public class SchedulerExecutor implements Executor {
+
+ private final Scheduler scheduler;
+
+ public SchedulerExecutor(Scheduler scheduler) {
+ this.scheduler = scheduler;
+ }
+
+ @Override
+ public void execute(@NotNull Runnable command) {
+ scheduler.schedule(command);
+ }
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/TdSerializer.java b/src/main/java/it/tdlight/reactiveapi/TdSerializer.java
new file mode 100644
index 0000000..e3a5118
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/TdSerializer.java
@@ -0,0 +1,63 @@
+package it.tdlight.reactiveapi;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.KryoDataInput;
+import com.esotericsoftware.kryo.io.KryoDataOutput;
+import com.esotericsoftware.kryo.io.Output;
+import io.atomix.primitive.serialization.SerializationService;
+import it.tdlight.common.ConstructorDetector;
+import it.tdlight.jni.TdApi;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.lang.reflect.Modifier;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.SerializationException;
+
+public class TdSerializer extends Serializer {
+
+ private TdSerializer() {
+
+ }
+
+ public static void register(SerializationService serializationService) {
+ var serializerBuilder = serializationService.newBuilder("TdApi");
+ var tdApiClasses = TdApi.class.getDeclaredClasses();
+ // Add types
+ Class>[] classes = Stream
+ .of(tdApiClasses)
+ .filter(clazz -> clazz.isAssignableFrom(TdApi.Object.class))
+ .toArray(Class>[]::new);
+
+ serializerBuilder.addSerializer(new TdSerializer(), classes);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, TdApi.Object object) {
+ try {
+ object.serialize(new KryoDataOutput(output));
+ } catch (IOException e) {
+ throw new SerializationException(e);
+ }
+ }
+
+ @Override
+ public TdApi.Object read(Kryo kryo, Input input, Class type) {
+ try {
+ return TdApi.Deserializer.deserialize(new KryoDataInput(input));
+ } catch (IOException e) {
+ throw new SerializationException(e);
+ }
+ }
+
+ public static TdApi.Object deserializeBytes(byte[] bytes) {
+ var din = new DataInputStream(new ByteArrayInputStream(bytes));
+ try {
+ return TdApi.Deserializer.deserialize(din);
+ } catch (IOException e) {
+ throw new SerializationException(e);
+ }
+ }
+}
diff --git a/src/main/java/it/tdlight/tdlibsession/FatalErrorType.java b/src/main/java/it/tdlight/tdlibsession/FatalErrorType.java
deleted file mode 100644
index 9bb2bdf..0000000
--- a/src/main/java/it/tdlight/tdlibsession/FatalErrorType.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package it.tdlight.tdlibsession;
-
-public enum FatalErrorType {
- ACCESS_TOKEN_INVALID, PHONE_NUMBER_INVALID, CONNECTION_KILLED, INVALID_UPDATE, PHONE_NUMBER_BANNED
-}
diff --git a/src/main/java/it/tdlight/tdlibsession/SignalMessage.java b/src/main/java/it/tdlight/tdlibsession/SignalMessage.java
deleted file mode 100644
index 78d31b9..0000000
--- a/src/main/java/it/tdlight/tdlibsession/SignalMessage.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package it.tdlight.tdlibsession;
-
-import java.util.Objects;
-import java.util.StringJoiner;
-
-class SignalMessage {
-
- private final SignalType signalType;
- private final T item;
- private final String errorMessage;
-
- private SignalMessage(SignalType signalType, T item, String errorMessage) {
- this.signalType = signalType;
- this.item = item;
- this.errorMessage = errorMessage;
- }
-
- public static SignalMessage onNext(T item) {
- return new SignalMessage<>(SignalType.ITEM, Objects.requireNonNull(item), null);
- }
-
- public static SignalMessage onError(Throwable throwable) {
- return new SignalMessage(SignalType.ERROR, null, Objects.requireNonNull(throwable.getMessage()));
- }
-
- static SignalMessage onDecodedError(String throwable) {
- return new SignalMessage(SignalType.ERROR, null, Objects.requireNonNull(throwable));
- }
-
- public static SignalMessage onComplete() {
- return new SignalMessage(SignalType.COMPLETE, null, null);
- }
-
- public SignalType getSignalType() {
- return signalType;
- }
-
- public String getErrorMessage() {
- return Objects.requireNonNull(errorMessage);
- }
-
- public T getItem() {
- return Objects.requireNonNull(item);
- }
-
- @Override
- public String toString() {
- return new StringJoiner(", ", SignalMessage.class.getSimpleName() + "[", "]")
- .add("signalType=" + signalType)
- .add("item=" + item)
- .add("errorMessage='" + errorMessage + "'")
- .toString();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- SignalMessage> that = (SignalMessage>) o;
-
- if (signalType != that.signalType) {
- return false;
- }
- if (!Objects.equals(item, that.item)) {
- return false;
- }
- return Objects.equals(errorMessage, that.errorMessage);
- }
-
- @Override
- public int hashCode() {
- int result = signalType != null ? signalType.hashCode() : 0;
- result = 31 * result + (item != null ? item.hashCode() : 0);
- result = 31 * result + (errorMessage != null ? errorMessage.hashCode() : 0);
- return result;
- }
-}
diff --git a/src/main/java/it/tdlight/tdlibsession/SignalMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/SignalMessageCodec.java
deleted file mode 100644
index 7af66f6..0000000
--- a/src/main/java/it/tdlight/tdlibsession/SignalMessageCodec.java
+++ /dev/null
@@ -1,89 +0,0 @@
-package it.tdlight.tdlibsession;
-
-import io.vertx.core.buffer.Buffer;
-import io.vertx.core.eventbus.MessageCodec;
-import it.tdlight.utils.VertxBufferInputStream;
-import it.tdlight.utils.VertxBufferOutputStream;
-import java.nio.charset.StandardCharsets;
-import org.warp.commonutils.stream.SafeDataInputStream;
-import org.warp.commonutils.stream.SafeDataOutputStream;
-
-public class SignalMessageCodec implements MessageCodec, SignalMessage> {
-
- private final String codecName;
- private final MessageCodec typeCodec;
-
- public SignalMessageCodec(MessageCodec typeCodec) {
- super();
- this.codecName = "SignalCodec-" + typeCodec.name();
- this.typeCodec = typeCodec;
- }
-
- @Override
- public void encodeToWire(Buffer buffer, SignalMessage t) {
- try (var bos = new VertxBufferOutputStream(buffer)) {
- try (var dos = new SafeDataOutputStream(bos)) {
- switch (t.getSignalType()) {
- case ITEM:
- dos.writeByte(0x01);
- break;
- case ERROR:
- dos.writeByte(0x02);
- break;
- case COMPLETE:
- dos.writeByte(0x03);
- break;
- default:
- throw new IllegalStateException("Unexpected value: " + t.getSignalType());
- }
- }
- switch (t.getSignalType()) {
- case ITEM:
- typeCodec.encodeToWire(buffer, t.getItem());
- break;
- case ERROR:
- var stringBytes = t.getErrorMessage().getBytes(StandardCharsets.UTF_8);
- buffer.appendInt(stringBytes.length);
- buffer.appendBytes(stringBytes);
- break;
- }
- }
- }
-
- @Override
- public SignalMessage decodeFromWire(int pos, Buffer buffer) {
- try (var fis = new VertxBufferInputStream(buffer, pos)) {
- try (var dis = new SafeDataInputStream(fis)) {
- switch (dis.readByte()) {
- case 0x01:
- return SignalMessage.onNext(typeCodec.decodeFromWire(pos + 1, buffer));
- case 0x02:
- var size = dis.readInt();
- return SignalMessage.onDecodedError(new String(dis.readNBytes(size), StandardCharsets.UTF_8));
- case 0x03:
- return SignalMessage.onComplete();
- default:
- throw new IllegalStateException("Unexpected value: " + dis.readByte());
- }
- }
- }
- }
-
- @Override
- public SignalMessage transform(SignalMessage t) {
- // If a message is sent *locally* across the event bus.
- // This sends message just as is
- return t;
- }
-
- @Override
- public String name() {
- return codecName;
- }
-
- @Override
- public byte systemCodecID() {
- // Always -1
- return -1;
- }
-}
diff --git a/src/main/java/it/tdlight/tdlibsession/SignalType.java b/src/main/java/it/tdlight/tdlibsession/SignalType.java
deleted file mode 100644
index 3492115..0000000
--- a/src/main/java/it/tdlight/tdlibsession/SignalType.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package it.tdlight.tdlibsession;
-
-enum SignalType {
- COMPLETE, ERROR, ITEM
-}
diff --git a/src/main/java/it/tdlight/tdlibsession/VariableWrapper.java b/src/main/java/it/tdlight/tdlibsession/VariableWrapper.java
deleted file mode 100644
index 1d510c1..0000000
--- a/src/main/java/it/tdlight/tdlibsession/VariableWrapper.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package it.tdlight.tdlibsession;
-
-public class VariableWrapper {
-
- public volatile T var;
-
- public VariableWrapper(T value) {
- this.var = value;
- }
-}
diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/BinlogManager.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/BinlogManager.java
deleted file mode 100644
index ca23615..0000000
--- a/src/main/java/it/tdlight/tdlibsession/remoteclient/BinlogManager.java
+++ /dev/null
@@ -1,3 +0,0 @@
-package it.tdlight.tdlibsession.remoteclient;
-
-public class BinlogManager {}
diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/DeployClientResult.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/DeployClientResult.java
deleted file mode 100644
index b4fd4d7..0000000
--- a/src/main/java/it/tdlight/tdlibsession/remoteclient/DeployClientResult.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package it.tdlight.tdlibsession.remoteclient;
-
-public enum DeployClientResult {
- DEPLOYED,
- IGNORED,
- FAILED
-}
diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/RemoteClientBotAddresses.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/RemoteClientBotAddresses.java
deleted file mode 100644
index af1ded3..0000000
--- a/src/main/java/it/tdlight/tdlibsession/remoteclient/RemoteClientBotAddresses.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package it.tdlight.tdlibsession.remoteclient;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-public class RemoteClientBotAddresses {
-
- private final LinkedHashSet addresses;
- private final LinkedHashSet tempAddresses;
- private final Path addressesFilePath;
-
- public RemoteClientBotAddresses(Path addressesFilePath) throws IOException {
- this.addressesFilePath = addressesFilePath;
- if (Files.notExists(addressesFilePath)) {
- Files.createFile(addressesFilePath);
- }
- tempAddresses = new LinkedHashSet<>();
- addresses = Files
- .readAllLines(addressesFilePath, StandardCharsets.UTF_8)
- .stream()
- .filter(address -> !address.isBlank())
- .collect(Collectors.toCollection(LinkedHashSet::new));
- }
-
- public synchronized void putAddress(String address) throws IOException {
- tempAddresses.remove(address);
- addresses.add(address);
- Files.write(addressesFilePath, addresses, StandardCharsets.UTF_8, StandardOpenOption.CREATE, StandardOpenOption.SYNC);
- }
-
- public synchronized void putTempAddress(String address) {
- tempAddresses.add(address);
- }
-
- public synchronized void removeAddress(String address) throws IOException {
- tempAddresses.remove(address);
- addresses.remove(address);
- Files.write(addressesFilePath, addresses, StandardCharsets.UTF_8, StandardOpenOption.CREATE, StandardOpenOption.SYNC);
- }
-
- public synchronized boolean has(String botAddress) {
- return addresses.contains(botAddress) || tempAddresses.contains(botAddress);
- }
-
- public synchronized Set values() {
- return new HashSet<>(addresses);
- }
-}
diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/SecurityInfo.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/SecurityInfo.java
deleted file mode 100644
index bc5a4ea..0000000
--- a/src/main/java/it/tdlight/tdlibsession/remoteclient/SecurityInfo.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package it.tdlight.tdlibsession.remoteclient;
-
-import io.vertx.core.file.FileSystemException;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.NoSuchElementException;
-import java.util.StringJoiner;
-
-public class SecurityInfo {
-
- private final Path keyStorePath;
- private final Path keyStorePasswordPath;
- private final Path trustStorePath;
- private final Path trustStorePasswordPath;
-
- public SecurityInfo(Path keyStorePath, Path keyStorePasswordPath, Path trustStorePath, Path trustStorePasswordPath) {
- this.keyStorePath = keyStorePath;
- this.keyStorePasswordPath = keyStorePasswordPath;
- this.trustStorePath = trustStorePath;
- this.trustStorePasswordPath = trustStorePasswordPath;
- }
-
- public Path getKeyStorePath() {
- return keyStorePath;
- }
-
- public Path getKeyStorePasswordPath() {
- return keyStorePasswordPath;
- }
-
- public String getKeyStorePassword(boolean required) {
- try {
- if (Files.isReadable(keyStorePasswordPath) && Files.size(keyStorePasswordPath) >= 6) {
- return Files.readString(keyStorePasswordPath, StandardCharsets.UTF_8).split("\n")[0];
- } else if (required) {
- throw new NoSuchElementException("No keystore password is set on '" + keyStorePasswordPath.toString() + "'");
- }
- } catch (IOException ex) {
- throw new FileSystemException(ex);
- }
- return null;
- }
-
- public Path getTrustStorePath() {
- return trustStorePath;
- }
-
- public Path getTrustStorePasswordPath() {
- return trustStorePasswordPath;
- }
-
- public String getTrustStorePassword(boolean required) {
- try {
- if (Files.isReadable(trustStorePasswordPath) && Files.size(trustStorePasswordPath) >= 6) {
- return Files.readString(trustStorePasswordPath, StandardCharsets.UTF_8).split("\n")[0];
- } else if (required) {
- throw new NoSuchElementException("No truststore password is set on '" + trustStorePasswordPath.toString() + "'");
- }
- } catch (IOException ex) {
- throw new FileSystemException(ex);
- }
- return null;
- }
-
- @Override
- public String toString() {
- return new StringJoiner(", ", SecurityInfo.class.getSimpleName() + "[", "]")
- .add("keyStorePath=" + keyStorePath)
- .add("keyStorePasswordPath=" + keyStorePasswordPath)
- .add("trustStorePath=" + trustStorePath)
- .add("trustStorePasswordPath=" + trustStorePasswordPath)
- .toString();
- }
-}
diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java
deleted file mode 100644
index cedd4c4..0000000
--- a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java
+++ /dev/null
@@ -1,233 +0,0 @@
-package it.tdlight.tdlibsession.remoteclient;
-
-import com.akaita.java.rxjava2debug.RxJava2Debug;
-import io.vertx.core.DeploymentOptions;
-import io.vertx.core.json.JsonObject;
-import io.vertx.core.net.JksOptions;
-import io.vertx.reactivex.core.eventbus.Message;
-import io.vertx.reactivex.core.eventbus.MessageConsumer;
-import it.tdlight.common.Init;
-import it.tdlight.common.Log;
-import it.tdlight.common.utils.CantLoadLibrary;
-import it.tdlight.tdlibsession.td.middle.StartSessionMessage;
-import it.tdlight.tdlibsession.td.middle.TdClusterManager;
-import it.tdlight.tdlibsession.td.middle.server.AsyncTdMiddleEventBusServer;
-import it.tdlight.tdnative.NativeLog;
-import it.tdlight.utils.BinlogUtils;
-import it.tdlight.utils.MonoUtils;
-import java.net.URISyntaxException;
-import java.nio.file.FileAlreadyExistsException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.logging.log4j.LogManager;
-import org.jetbrains.annotations.Nullable;
-import org.warp.commonutils.log.Logger;
-import org.warp.commonutils.log.LoggerFactory;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.publisher.Sinks;
-import reactor.core.publisher.Sinks.One;
-import reactor.core.scheduler.Schedulers;
-import reactor.tools.agent.ReactorDebugAgent;
-
-public class TDLibRemoteClient implements AutoCloseable {
-
- private static final Logger logger = LoggerFactory.getLogger(TDLibRemoteClient.class);
-
- @Nullable
- private final SecurityInfo securityInfo;
- private final String masterHostname;
- private final String netInterface;
- private final int port;
- private final Set membersAddresses;
- private final AtomicReference clusterManager = new AtomicReference<>();
-
- public TDLibRemoteClient(@Nullable SecurityInfo securityInfo,
- String masterHostname,
- String netInterface,
- int port,
- Set membersAddresses,
- boolean enableAsyncStacktraces,
- boolean enableFullAsyncStacktraces) {
- this.securityInfo = securityInfo;
- this.masterHostname = masterHostname;
- this.netInterface = netInterface;
- this.port = port;
- this.membersAddresses = membersAddresses;
-
- if (enableAsyncStacktraces && enableFullAsyncStacktraces) {
- RxJava2Debug.enableRxJava2AssemblyTracking(new String[]{"it.tdlight.utils", "it.tdlight.tdlibsession"});
- }
-
- try {
- Init.start();
- //noinspection deprecation
- Log.setVerbosityLevel(2);
- } catch (CantLoadLibrary ex) {
- throw new RuntimeException(ex);
- }
- }
-
- public static void main(String[] args) throws URISyntaxException {
- if (args.length < 1) {
- return;
- }
-
- String masterHostname = args[0];
-
- String[] interfaceAndPort = args[1].split(":", 2);
-
- String netInterface = interfaceAndPort[0];
-
- int port = Integer.parseInt(interfaceAndPort[1]);
-
- Set membersAddresses = Set.of(args[2].split(","));
-
- Path keyStorePath = Paths.get(args[3]);
- Path keyStorePasswordPath = Paths.get(args[4]);
- Path trustStorePath = Paths.get(args[5]);
- Path trustStorePasswordPath = Paths.get(args[6]);
- boolean enableAsyncStacktraces = Boolean.parseBoolean(args[7]);
- boolean enableFullAsyncStacktraces = Boolean.parseBoolean(args[8]);
-
- var loggerContext = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false);
- loggerContext.setConfigLocation(Objects
- .requireNonNull(TDLibRemoteClient.class.getResource("/tdlib-session-container-log4j2.xml"),
- "tdlib-session-container-log4j2.xml doesn't exist")
- .toURI());
-
- var securityInfo = new SecurityInfo(keyStorePath, keyStorePasswordPath, trustStorePath, trustStorePasswordPath);
-
- var client = new TDLibRemoteClient(securityInfo,
- masterHostname,
- netInterface,
- port,
- membersAddresses,
- enableAsyncStacktraces,
- enableFullAsyncStacktraces
- );
-
- client
- .start()
- .block();
-
- // Close vert.x on shutdown
- var vertxMono = Mono.fromCallable(() -> client.clusterManager.get().getVertx());
- Runtime
- .getRuntime()
- .addShutdownHook(new Thread(() -> vertxMono
- .flatMap(vertx -> MonoUtils.toMono(vertx.rxClose()))
- .blockOptional())
- );
- }
-
- public Mono start() {
- var ksp = securityInfo == null ? null : securityInfo.getKeyStorePassword(false);
- var keyStoreOptions = securityInfo == null || ksp == null ? null : new JksOptions()
- .setPath(securityInfo.getKeyStorePath().toAbsolutePath().toString())
- .setPassword(ksp);
-
- var tsp = securityInfo == null ? null : securityInfo.getTrustStorePassword(false);
- var trustStoreOptions = securityInfo == null || tsp == null ? null : new JksOptions()
- .setPath(securityInfo.getTrustStorePath().toAbsolutePath().toString())
- .setPassword(tsp);
-
- return MonoUtils
- .fromBlockingEmpty(() -> {
- // Set verbosity level here, before creating the bots
- if (Files.notExists(Paths.get("logs"))) {
- try {
- Files.createDirectory(Paths.get("logs"));
- } catch (FileAlreadyExistsException ignored) {
- }
- }
-
- logger.info(
- "TDLib remote client is being hosted on" + netInterface + ":" + port + ". Master: " + masterHostname);
- logger.info(
- "TDLib remote client SSL enabled: " + (keyStoreOptions != null && trustStoreOptions != null));
- })
- .then(TdClusterManager.ofNodes(keyStoreOptions,
- trustStoreOptions,
- false,
- masterHostname,
- netInterface,
- port,
- membersAddresses
- ))
- .doOnSuccess(clusterManager::set)
- .single()
- .doOnError(ex -> logger.error("Failed to set cluster manager", ex))
- .flatMap(clusterManager -> {
- MessageConsumer startBotConsumer
- = clusterManager.getEventBus().consumer("bots.start-bot");
-
- return this.listenForStartBotsCommand(
- clusterManager,
- MonoUtils.fromReplyableMessageConsumer(Mono.empty(), startBotConsumer)
- );
- })
- .then();
- }
-
- private Mono listenForStartBotsCommand(TdClusterManager clusterManager,
- Flux> messages) {
- return MonoUtils
- .fromBlockingEmpty(() -> messages
- .flatMapSequential(msg -> {
- StartSessionMessage req = msg.body();
- DeploymentOptions deploymentOptions = clusterManager
- .newDeploymentOpts()
- .setConfig(new JsonObject()
- .put("botId", req.id())
- .put("botAlias", req.alias())
- .put("local", false)
- .put("implementationDetails", req.implementationDetails()));
- var verticle = new AsyncTdMiddleEventBusServer();
-
- // Binlog path
- var sessPath = getSessionDirectory(req.id());
- var mediaPath = getMediaDirectory(req.id());
- var blPath = getSessionBinlogDirectory(req.id());
-
- return BinlogUtils
- .chooseBinlog(clusterManager.getVertx().fileSystem(), blPath, req.binlog(), req.binlogDate())
- .then(BinlogUtils.cleanSessionPath(clusterManager.getVertx().fileSystem(), blPath, sessPath, mediaPath))
- .then(clusterManager.getVertx().rxDeployVerticle(verticle, deploymentOptions).as(MonoUtils::toMono))
- .then(MonoUtils.fromBlockingEmpty(() -> msg.reply(new byte[0])))
- .onErrorResume(ex -> {
- msg.fail(500, "Failed to deploy bot verticle: " + ex.getMessage());
- logger.error("Failed to deploy bot verticle", ex);
- return Mono.empty();
- });
- })
- .subscribeOn(Schedulers.parallel())
- .subscribe(
- v -> {},
- ex -> logger.error("Bots starter activity crashed. From now on, no new bots can be started anymore", ex)
- )
- );
- }
-
- public static Path getSessionDirectory(long botId) {
- return Paths.get(".sessions-cache").resolve("id" + botId);
- }
-
- public static Path getMediaDirectory(long botId) {
- return Paths.get(".cache").resolve("media").resolve("id" + botId);
- }
-
- public static Path getSessionBinlogDirectory(long botId) {
- return getSessionDirectory(botId).resolve("td.binlog");
- }
-
- @Override
- public void close() {
- this.clusterManager.get().getVertx().rxClose().blockingAwait();
- }
-}
diff --git a/src/main/java/it/tdlight/tdlibsession/td/ReactorTelegramClient.java b/src/main/java/it/tdlight/tdlibsession/td/ReactorTelegramClient.java
deleted file mode 100644
index a4f1612..0000000
--- a/src/main/java/it/tdlight/tdlibsession/td/ReactorTelegramClient.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package it.tdlight.tdlibsession.td;
-
-import it.tdlight.jni.TdApi;
-import it.tdlight.jni.TdApi.Object;
-import java.time.Duration;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-public interface ReactorTelegramClient {
-
- Mono initialize();
-
- Flux