Update log4j, tdlib, lucene

This commit is contained in:
Andrea Cavalli 2021-12-11 13:21:09 +01:00
parent 430dbeb261
commit e76a596b85
4 changed files with 79 additions and 31 deletions

View File

@ -78,12 +78,12 @@
<dependency> <dependency>
<groupId>org.apache.logging.log4j</groupId> <groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId> <artifactId>log4j-core</artifactId>
<version>2.14.1</version> <version>2.15.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.logging.log4j</groupId> <groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId> <artifactId>log4j-slf4j-impl</artifactId>
<version>2.14.1</version> <version>2.15.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId> <groupId>com.fasterxml.jackson.dataformat</groupId>
@ -98,7 +98,7 @@
<dependency> <dependency>
<groupId>it.tdlight</groupId> <groupId>it.tdlight</groupId>
<artifactId>tdlight-java-bom</artifactId> <artifactId>tdlight-java-bom</artifactId>
<version>2.7.10.6</version> <version>2.7.10.7</version>
<type>pom</type> <type>pom</type>
<scope>import</scope> <scope>import</scope>
</dependency> </dependency>

View File

@ -21,6 +21,7 @@ import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
@ -239,11 +240,12 @@ public class ReactiveApi {
// Register the session instance to the distributed nodes map // Register the session instance to the distributed nodes map
return Mono return Mono
.fromCompletionStage(() -> userIdToNodeId.put(userId, nodeId)) .fromCompletionStage(() -> userIdToNodeId.put(userId, nodeId).thenApply(Optional::ofNullable))
.flatMap(prevDistributed -> { .flatMap(prevDistributed -> {
if (prevDistributed != null && prevDistributed.value() != null && if (prevDistributed.isPresent() && prevDistributed.get().value() != null &&
!Objects.equals(this.nodeId, prevDistributed.value())) { !Objects.equals(this.nodeId, prevDistributed.get().value())) {
LOG.error("Session id \"{}\" is already registered in the node \"{}\"!", liveId, prevDistributed.value()); LOG.error("Session id \"{}\" is already registered in the node \"{}\"!", liveId,
prevDistributed.get().value());
} }
var saveToDiskMono = Mono var saveToDiskMono = Mono
@ -264,7 +266,8 @@ public class ReactiveApi {
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {
synchronized (diskSessions) { synchronized (diskSessions) {
return Paths.get(diskSessions.getSettings().path); return Objects.requireNonNull(Paths.get(diskSessions.getSettings().path),
"Session " + userId + " path is missing");
} }
}) })
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(Schedulers.boundedElastic())
@ -285,7 +288,9 @@ public class ReactiveApi {
return Mono.just(sessionPath); return Mono.just(sessionPath);
} }
}) })
.doOnNext(reactiveApiPublisher::start) .doOnNext(path -> reactiveApiPublisher.start(path,
() -> ReactiveApi.this.onPublisherClosed(userId)
))
.thenReturn(new CreateSessionResponse(liveId)); .thenReturn(new CreateSessionResponse(liveId));
}); });
}); });
@ -303,6 +308,16 @@ public class ReactiveApi {
.doOnError(ex -> LOG.debug("Handled session request {}, the response is: error", req, ex)); .doOnError(ex -> LOG.debug("Handled session request {}, the response is: error", req, ex));
} }
private void onPublisherClosed(long userId) {
this.destroySession(userId, nodeId).whenComplete((result, ex) -> {
if (ex != null) {
LOG.error("Failed to close the session for user {} after it was closed itself", userId);
} else {
LOG.debug("Closed the session for user {} after it was closed itself", userId);
}
});
}
public Mono<Long> nextFreeLiveId() { public Mono<Long> nextFreeLiveId() {
return Mono.fromCompletionStage(nextSessionLiveId::nextId); return Mono.fromCompletionStage(nextSessionLiveId::nextId);
} }

View File

@ -26,6 +26,8 @@ import it.tdlight.reactiveapi.Event.OnUpdateError;
import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested; import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested;
import it.tdlight.reactiveapi.Event.Request; import it.tdlight.reactiveapi.Event.Request;
import it.tdlight.reactiveapi.ResultingEvent.ClientBoundResultingEvent; import it.tdlight.reactiveapi.ResultingEvent.ClientBoundResultingEvent;
import it.tdlight.reactiveapi.ResultingEvent.ClusterBoundResultingEvent;
import it.tdlight.reactiveapi.ResultingEvent.ResultingEventPublisherClosed;
import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent; import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent;
import it.tdlight.tdlight.ClientManager; import it.tdlight.tdlight.ClientManager;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
@ -90,13 +92,16 @@ public abstract class ReactiveApiPublisher {
return new ReactiveApiPublisherPhoneNumber(atomix, liveId, userId, phoneNumber); return new ReactiveApiPublisherPhoneNumber(atomix, liveId, userId, phoneNumber);
} }
public void start(Path path) { public void start(Path path, @Nullable Runnable onClose) {
this.path.set(path); this.path.set(path);
LOG.info("Starting session \"{}\" in path \"{}\"", this, path); LOG.info("Starting session \"{}\" in path \"{}\"", this, path);
var publishedResultingEvents = telegramClient var publishedResultingEvents = telegramClient
.subscribeOn(Schedulers.parallel()) .subscribeOn(Schedulers.parallel())
// Handle signals, then return a ResultingEvent // Handle signals, then return a ResultingEvent
.mapNotNull(this::onSignal) .mapNotNull(this::onSignal)
.doFinally(s -> {
LOG.trace("Finalized telegram client events");
})
.publish(); .publish();
publishedResultingEvents publishedResultingEvents
@ -143,6 +148,23 @@ public abstract class ReactiveApiPublisher {
.subscribe(clientBoundEvent -> eventService.broadcast("session-" + liveId + "-client-bound-events", .subscribe(clientBoundEvent -> eventService.broadcast("session-" + liveId + "-client-bound-events",
clientBoundEvent, ReactiveApiPublisher::serializeEvent)); clientBoundEvent, ReactiveApiPublisher::serializeEvent));
publishedResultingEvents
// Obtain only cluster-bound events
.filter(s -> s instanceof ClusterBoundResultingEvent)
.cast(ClusterBoundResultingEvent.class)
// Send events to the cluster
.subscribeOn(Schedulers.parallel())
.subscribe(clusterBoundEvent -> {
if (clusterBoundEvent instanceof ResultingEventPublisherClosed) {
if (onClose != null) {
onClose.run();
}
} else {
LOG.error("Unknown cluster-bound event: {}", clusterBoundEvent);
}
});
var prev = this.disposable.getAndSet(publishedResultingEvents.connect()); var prev = this.disposable.getAndSet(publishedResultingEvents.connect());
if (prev != null) { if (prev != null) {
@ -175,7 +197,7 @@ public abstract class ReactiveApiPublisher {
if (signal.isClosed()) { if (signal.isClosed()) {
signal.getClosed(); signal.getClosed();
LOG.info("Received a closed signal"); LOG.info("Received a closed signal");
return null; return new ResultingEventPublisherClosed();
} }
if (signal.isUpdate() && signal.getUpdate().getConstructor() == TdApi.Error.CONSTRUCTOR) { if (signal.isUpdate() && signal.getUpdate().getConstructor() == TdApi.Error.CONSTRUCTOR) {
var error = ((TdApi.Error) signal.getUpdate()); var error = ((TdApi.Error) signal.getUpdate());
@ -195,25 +217,8 @@ public abstract class ReactiveApiPublisher {
var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update;
switch (updateAuthorizationState.authorizationState.getConstructor()) { switch (updateAuthorizationState.authorizationState.getConstructor()) {
case TdApi.AuthorizationStateWaitTdlibParameters.CONSTRUCTOR -> { case TdApi.AuthorizationStateWaitTdlibParameters.CONSTRUCTOR -> {
var tdlibParameters = new TdlibParameters(); TdlibParameters parameters = generateTDLibParameters();
var path = requireNonNull(this.path.get(), "Path must not be null"); return new TDLibBoundResultingEvent<>(new SetTdlibParameters(parameters));
tdlibParameters.databaseDirectory = path.resolve("database").toString();
tdlibParameters.apiId = 94575;
tdlibParameters.apiHash = "a3406de8d171bb422bb6ddf3bbd800e2";
tdlibParameters.filesDirectory = path.resolve("files").toString();
tdlibParameters.applicationVersion = it.tdlight.reactiveapi.generated.LibraryVersion.VERSION;
tdlibParameters.deviceModel = System.getProperty("os.name");
tdlibParameters.systemVersion = System.getProperty("os.version");
tdlibParameters.enableStorageOptimizer = true;
tdlibParameters.ignoreFileNames = true;
tdlibParameters.useTestDc = false;
tdlibParameters.useSecretChats = false;
tdlibParameters.useMessageDatabase = true;
tdlibParameters.useFileDatabase = true;
tdlibParameters.useChatInfoDatabase = true;
tdlibParameters.systemLanguageCode = System.getProperty("user.language", "en");
return new TDLibBoundResultingEvent<>(new SetTdlibParameters(tdlibParameters));
} }
} }
} }
@ -256,6 +261,28 @@ public abstract class ReactiveApiPublisher {
return null; return null;
} }
private TdlibParameters generateTDLibParameters() {
var tdlibParameters = new TdlibParameters();
var path = requireNonNull(this.path.get(), "Path must not be null");
tdlibParameters.databaseDirectory = path.resolve("database").toString();
tdlibParameters.apiId = 94575;
tdlibParameters.apiHash = "a3406de8d171bb422bb6ddf3bbd800e2";
tdlibParameters.filesDirectory = path.resolve("files").toString();
tdlibParameters.applicationVersion = it.tdlight.reactiveapi.generated.LibraryVersion.VERSION;
tdlibParameters.deviceModel = System.getProperty("os.name");
tdlibParameters.systemVersion = System.getProperty("os.version");
tdlibParameters.enableStorageOptimizer = true;
tdlibParameters.ignoreFileNames = true;
tdlibParameters.useTestDc = false;
tdlibParameters.useSecretChats = false;
tdlibParameters.useMessageDatabase = true;
tdlibParameters.useFileDatabase = true;
tdlibParameters.useChatInfoDatabase = true;
tdlibParameters.systemLanguageCode = System.getProperty("user.language", "en");
return tdlibParameters;
}
protected abstract ResultingEvent onWaitToken(); protected abstract ResultingEvent onWaitToken();
protected ResultingEvent onWaitCode() { protected ResultingEvent onWaitCode() {

View File

@ -3,11 +3,17 @@ package it.tdlight.reactiveapi;
import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi;
import it.tdlight.reactiveapi.Event.ClientBoundEvent; import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import it.tdlight.reactiveapi.ResultingEvent.ClientBoundResultingEvent; import it.tdlight.reactiveapi.ResultingEvent.ClientBoundResultingEvent;
import it.tdlight.reactiveapi.ResultingEvent.ClusterBoundResultingEvent;
import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent; import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent;
public sealed interface ResultingEvent permits ClientBoundResultingEvent, TDLibBoundResultingEvent { public sealed interface ResultingEvent permits ClientBoundResultingEvent, TDLibBoundResultingEvent,
ClusterBoundResultingEvent {
record ClientBoundResultingEvent(ClientBoundEvent event) implements ResultingEvent {} record ClientBoundResultingEvent(ClientBoundEvent event) implements ResultingEvent {}
record TDLibBoundResultingEvent<T extends TdApi.Object>(TdApi.Function<T> action) implements ResultingEvent {} record TDLibBoundResultingEvent<T extends TdApi.Object>(TdApi.Function<T> action) implements ResultingEvent {}
sealed interface ClusterBoundResultingEvent extends ResultingEvent permits ResultingEventPublisherClosed {}
record ResultingEventPublisherClosed() implements ClusterBoundResultingEvent {}
} }