diff --git a/pom.xml b/pom.xml
index d4b778e..1cd56db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,12 +78,12 @@
org.apache.logging.log4j
log4j-core
- 2.14.1
+ 2.15.0
org.apache.logging.log4j
log4j-slf4j-impl
- 2.14.1
+ 2.15.0
com.fasterxml.jackson.dataformat
@@ -98,7 +98,7 @@
it.tdlight
tdlight-java-bom
- 2.7.10.6
+ 2.7.10.7
pom
import
diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java
index 78e04a1..a6c8438 100644
--- a/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java
+++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java
@@ -21,6 +21,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -239,11 +240,12 @@ public class ReactiveApi {
// Register the session instance to the distributed nodes map
return Mono
- .fromCompletionStage(() -> userIdToNodeId.put(userId, nodeId))
+ .fromCompletionStage(() -> userIdToNodeId.put(userId, nodeId).thenApply(Optional::ofNullable))
.flatMap(prevDistributed -> {
- if (prevDistributed != null && prevDistributed.value() != null &&
- !Objects.equals(this.nodeId, prevDistributed.value())) {
- LOG.error("Session id \"{}\" is already registered in the node \"{}\"!", liveId, prevDistributed.value());
+ if (prevDistributed.isPresent() && prevDistributed.get().value() != null &&
+ !Objects.equals(this.nodeId, prevDistributed.get().value())) {
+ LOG.error("Session id \"{}\" is already registered in the node \"{}\"!", liveId,
+ prevDistributed.get().value());
}
var saveToDiskMono = Mono
@@ -264,7 +266,8 @@ public class ReactiveApi {
return Mono
.fromCallable(() -> {
synchronized (diskSessions) {
- return Paths.get(diskSessions.getSettings().path);
+ return Objects.requireNonNull(Paths.get(diskSessions.getSettings().path),
+ "Session " + userId + " path is missing");
}
})
.subscribeOn(Schedulers.boundedElastic())
@@ -285,7 +288,9 @@ public class ReactiveApi {
return Mono.just(sessionPath);
}
})
- .doOnNext(reactiveApiPublisher::start)
+ .doOnNext(path -> reactiveApiPublisher.start(path,
+ () -> ReactiveApi.this.onPublisherClosed(userId)
+ ))
.thenReturn(new CreateSessionResponse(liveId));
});
});
@@ -303,6 +308,16 @@ public class ReactiveApi {
.doOnError(ex -> LOG.debug("Handled session request {}, the response is: error", req, ex));
}
+ private void onPublisherClosed(long userId) {
+ this.destroySession(userId, nodeId).whenComplete((result, ex) -> {
+ if (ex != null) {
+ LOG.error("Failed to close the session for user {} after it was closed itself", userId);
+ } else {
+ LOG.debug("Closed the session for user {} after it was closed itself", userId);
+ }
+ });
+ }
+
public Mono nextFreeLiveId() {
return Mono.fromCompletionStage(nextSessionLiveId::nextId);
}
diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
index c2167b7..13f56cb 100644
--- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
+++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
@@ -26,6 +26,8 @@ import it.tdlight.reactiveapi.Event.OnUpdateError;
import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested;
import it.tdlight.reactiveapi.Event.Request;
import it.tdlight.reactiveapi.ResultingEvent.ClientBoundResultingEvent;
+import it.tdlight.reactiveapi.ResultingEvent.ClusterBoundResultingEvent;
+import it.tdlight.reactiveapi.ResultingEvent.ResultingEventPublisherClosed;
import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent;
import it.tdlight.tdlight.ClientManager;
import java.io.ByteArrayInputStream;
@@ -90,13 +92,16 @@ public abstract class ReactiveApiPublisher {
return new ReactiveApiPublisherPhoneNumber(atomix, liveId, userId, phoneNumber);
}
- public void start(Path path) {
+ public void start(Path path, @Nullable Runnable onClose) {
this.path.set(path);
LOG.info("Starting session \"{}\" in path \"{}\"", this, path);
var publishedResultingEvents = telegramClient
.subscribeOn(Schedulers.parallel())
// Handle signals, then return a ResultingEvent
.mapNotNull(this::onSignal)
+ .doFinally(s -> {
+ LOG.trace("Finalized telegram client events");
+ })
.publish();
publishedResultingEvents
@@ -143,6 +148,23 @@ public abstract class ReactiveApiPublisher {
.subscribe(clientBoundEvent -> eventService.broadcast("session-" + liveId + "-client-bound-events",
clientBoundEvent, ReactiveApiPublisher::serializeEvent));
+ publishedResultingEvents
+ // Obtain only cluster-bound events
+ .filter(s -> s instanceof ClusterBoundResultingEvent)
+ .cast(ClusterBoundResultingEvent.class)
+
+ // Send events to the cluster
+ .subscribeOn(Schedulers.parallel())
+ .subscribe(clusterBoundEvent -> {
+ if (clusterBoundEvent instanceof ResultingEventPublisherClosed) {
+ if (onClose != null) {
+ onClose.run();
+ }
+ } else {
+ LOG.error("Unknown cluster-bound event: {}", clusterBoundEvent);
+ }
+ });
+
var prev = this.disposable.getAndSet(publishedResultingEvents.connect());
if (prev != null) {
@@ -175,7 +197,7 @@ public abstract class ReactiveApiPublisher {
if (signal.isClosed()) {
signal.getClosed();
LOG.info("Received a closed signal");
- return null;
+ return new ResultingEventPublisherClosed();
}
if (signal.isUpdate() && signal.getUpdate().getConstructor() == TdApi.Error.CONSTRUCTOR) {
var error = ((TdApi.Error) signal.getUpdate());
@@ -195,25 +217,8 @@ public abstract class ReactiveApiPublisher {
var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update;
switch (updateAuthorizationState.authorizationState.getConstructor()) {
case TdApi.AuthorizationStateWaitTdlibParameters.CONSTRUCTOR -> {
- var tdlibParameters = new TdlibParameters();
- var path = requireNonNull(this.path.get(), "Path must not be null");
- tdlibParameters.databaseDirectory = path.resolve("database").toString();
- tdlibParameters.apiId = 94575;
- tdlibParameters.apiHash = "a3406de8d171bb422bb6ddf3bbd800e2";
- tdlibParameters.filesDirectory = path.resolve("files").toString();
- tdlibParameters.applicationVersion = it.tdlight.reactiveapi.generated.LibraryVersion.VERSION;
- tdlibParameters.deviceModel = System.getProperty("os.name");
- tdlibParameters.systemVersion = System.getProperty("os.version");
- tdlibParameters.enableStorageOptimizer = true;
- tdlibParameters.ignoreFileNames = true;
- tdlibParameters.useTestDc = false;
- tdlibParameters.useSecretChats = false;
-
- tdlibParameters.useMessageDatabase = true;
- tdlibParameters.useFileDatabase = true;
- tdlibParameters.useChatInfoDatabase = true;
- tdlibParameters.systemLanguageCode = System.getProperty("user.language", "en");
- return new TDLibBoundResultingEvent<>(new SetTdlibParameters(tdlibParameters));
+ TdlibParameters parameters = generateTDLibParameters();
+ return new TDLibBoundResultingEvent<>(new SetTdlibParameters(parameters));
}
}
}
@@ -256,6 +261,28 @@ public abstract class ReactiveApiPublisher {
return null;
}
+ private TdlibParameters generateTDLibParameters() {
+ var tdlibParameters = new TdlibParameters();
+ var path = requireNonNull(this.path.get(), "Path must not be null");
+ tdlibParameters.databaseDirectory = path.resolve("database").toString();
+ tdlibParameters.apiId = 94575;
+ tdlibParameters.apiHash = "a3406de8d171bb422bb6ddf3bbd800e2";
+ tdlibParameters.filesDirectory = path.resolve("files").toString();
+ tdlibParameters.applicationVersion = it.tdlight.reactiveapi.generated.LibraryVersion.VERSION;
+ tdlibParameters.deviceModel = System.getProperty("os.name");
+ tdlibParameters.systemVersion = System.getProperty("os.version");
+ tdlibParameters.enableStorageOptimizer = true;
+ tdlibParameters.ignoreFileNames = true;
+ tdlibParameters.useTestDc = false;
+ tdlibParameters.useSecretChats = false;
+
+ tdlibParameters.useMessageDatabase = true;
+ tdlibParameters.useFileDatabase = true;
+ tdlibParameters.useChatInfoDatabase = true;
+ tdlibParameters.systemLanguageCode = System.getProperty("user.language", "en");
+ return tdlibParameters;
+ }
+
protected abstract ResultingEvent onWaitToken();
protected ResultingEvent onWaitCode() {
diff --git a/src/main/java/it/tdlight/reactiveapi/ResultingEvent.java b/src/main/java/it/tdlight/reactiveapi/ResultingEvent.java
index 146b1e8..95af6d7 100644
--- a/src/main/java/it/tdlight/reactiveapi/ResultingEvent.java
+++ b/src/main/java/it/tdlight/reactiveapi/ResultingEvent.java
@@ -3,11 +3,17 @@ package it.tdlight.reactiveapi;
import it.tdlight.jni.TdApi;
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import it.tdlight.reactiveapi.ResultingEvent.ClientBoundResultingEvent;
+import it.tdlight.reactiveapi.ResultingEvent.ClusterBoundResultingEvent;
import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent;
-public sealed interface ResultingEvent permits ClientBoundResultingEvent, TDLibBoundResultingEvent {
+public sealed interface ResultingEvent permits ClientBoundResultingEvent, TDLibBoundResultingEvent,
+ ClusterBoundResultingEvent {
record ClientBoundResultingEvent(ClientBoundEvent event) implements ResultingEvent {}
record TDLibBoundResultingEvent(TdApi.Function action) implements ResultingEvent {}
+
+ sealed interface ClusterBoundResultingEvent extends ResultingEvent permits ResultingEventPublisherClosed {}
+
+ record ResultingEventPublisherClosed() implements ClusterBoundResultingEvent {}
}