diff --git a/log4j2.xml b/log4j2.xml
new file mode 100644
index 0000000..c50acc8
--- /dev/null
+++ b/log4j2.xml
@@ -0,0 +1,38 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/it/cavallium/LogServiceImpl.java b/src/main/java/it/cavallium/LogServiceImpl.java
index 1624714..15f9845 100644
--- a/src/main/java/it/cavallium/LogServiceImpl.java
+++ b/src/main/java/it/cavallium/LogServiceImpl.java
@@ -21,7 +21,7 @@ public class LogServiceImpl implements LogService {
@Override
public void append(Level level, String message) {
- logs.onNext(new LogEntry(System.currentTimeMillis(), level, message));
+ logs.onNext(new LogEntry(System.currentTimeMillis(), level, message.length() > 512 ? message.substring(0, 512) : message));
}
@Override
diff --git a/src/main/java/it/cavallium/TransferClient.java b/src/main/java/it/cavallium/TransferClient.java
index a23b9ac..348770a 100644
--- a/src/main/java/it/cavallium/TransferClient.java
+++ b/src/main/java/it/cavallium/TransferClient.java
@@ -35,7 +35,7 @@ public class TransferClient {
this.client = client;
this.client.getIncomingUpdates()
- .subscribeOn(Schedulers.boundedElastic())
+ .subscribeOn(Schedulers.newParallel("bot_updates"))
.flatMap(this::onUpdate)
.subscribe();
}
diff --git a/src/main/java/it/cavallium/TransferServiceImpl.java b/src/main/java/it/cavallium/TransferServiceImpl.java
index 4b49445..02fcee6 100644
--- a/src/main/java/it/cavallium/TransferServiceImpl.java
+++ b/src/main/java/it/cavallium/TransferServiceImpl.java
@@ -128,7 +128,7 @@ public class TransferServiceImpl implements TransferService {
})
.build())
.then(Mono.defer(() -> {
- var clientStateFlux = client.getState().publish().autoConnect(2);
+ var clientStateFlux = client.getState().skip(1).publish().autoConnect(2);
clientStateFlux
.filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR
@@ -140,12 +140,14 @@ public class TransferServiceImpl implements TransferService {
.materialize()
.flatMap(signal -> onUserbotClosed.get().thenReturn(signal))
.dematerialize()
- .subscribe(state -> System.out.println("State: " + state));
+ .subscribe(state -> System.out.println("Userbot closed with state: " + state));
+
+ clientStateFlux.subscribe(state -> System.out.println("state: " + state));
client
.getIncomingUpdates()
- .flatMap(this::onClientUpdate)
- .subscribe(u -> System.out.println(u), e -> System.err.println(e));
+ .flatMap(update -> onClientUpdate(update))
+ .subscribe(_v -> {}, e -> System.err.println(e));
return clientStateFlux
.filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR
diff --git a/src/main/java/it/cavallium/TransferUtils.java b/src/main/java/it/cavallium/TransferUtils.java
index 44eda10..486c809 100644
--- a/src/main/java/it/cavallium/TransferUtils.java
+++ b/src/main/java/it/cavallium/TransferUtils.java
@@ -15,14 +15,13 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class TransferUtils {
- private static final Logger logger = LoggerFactory.getLogger(TransferUtils.class);
-
public static int chatIdToChatEntityId(long id) {
if (id <= -1000000000000L) {
return (int) (Math.abs(id) - 1000000000000L);
@@ -49,6 +48,7 @@ public class TransferUtils {
private static class ChatIdAndOrderOffsets {
+
private final long chatIdOffset;
private final long orderOffset;
@@ -98,7 +98,7 @@ public class TransferUtils {
* @return flux of home chats. They can repeat themselves
*/
public static Flux getAllHomeChats(TransferClient client) {
- logger.debug("Getting the full chat list");
+ App.getLogService().append(Level.DEBUG, "Getting the full chat list");
var singleScheduler = Schedulers.newSingle("getallchats");
@@ -106,20 +106,27 @@ public class TransferUtils {
.deferWithContext((context) -> {
var offsets = Objects.requireNonNull(context.>get("offsets"));
var offsetsValue = offsets.get();
- return client.send(new GetChats(new ChatListMain(),
- offsetsValue.getOrderOffset(),
- offsetsValue.getChatIdOffset(),
- 100))
+ App.getLogService().append(Level.TRACE, "Requesting GetChats");
+ return client.send(new GetChats(new ChatListMain(),
+ offsetsValue.getOrderOffset(),
+ offsetsValue.getChatIdOffset(),
+ 100
+ ))
.flatMap(MonoUtils::orElseThrow)
.publishOn(singleScheduler)
.flatMapMany(chats -> Flux.fromStream(Arrays.stream(chats.chatIds).boxed()))
.flatMap(chatId -> {
+ App.getLogService().append(Level.TRACE, "Received ChatId: " + chatId);
return client.send(new GetChat(chatId))
.publishOn(singleScheduler)
.flatMap(MonoUtils::orElseThrow);
})
+ .doOnNext(chat -> {
+ App.getLogService().append(Level.TRACE, "Received Chat: " + chat.toString().replace('\n', ' ').replace(" ", "").replace("\t", ""));
+ })
.collectList()
.doOnNext(chats -> {
+ App.getLogService().append(Level.TRACE, "Received Chats: " + chats.toString().replace('\n', ' ').replace(" ", "").replace("\t", ""));
if (!chats.isEmpty()) {
var lastChat = chats.get(chats.size() - 1);
getMainChatListPosition(lastChat.positions).ifPresentOrElse(lastChatPosition -> {
@@ -136,7 +143,10 @@ public class TransferUtils {
})
.repeatWhen(nFlux -> nFlux.takeWhile(n -> n > 0))
.flatMap(Flux::fromIterable)
- .subscriberContext(ctx -> ctx.put("offsets", new AtomicReference<>(new ChatIdAndOrderOffsets(0L, 9223372036854775807L))));
+ .subscriberContext(ctx -> ctx.put("offsets",
+ new AtomicReference<>(new ChatIdAndOrderOffsets(0L, 9223372036854775807L))
+ ))
+ .doOnTerminate(() -> App.getLogService().append(Level.DEBUG, "Home chats retrieved"));
}
private static Optional getMainChatListPosition(ChatPosition[] positions) {
diff --git a/tdlib-session-container b/tdlib-session-container
index bc40e2b..a7223c4 160000
--- a/tdlib-session-container
+++ b/tdlib-session-container
@@ -1 +1 @@
-Subproject commit bc40e2b9e60857e3b84850d11ba399a2afc51073
+Subproject commit a7223c4d834c91829b2eace051c0ab5526720aed