From 351979fe0b5c2b0d4b778a418248bc612a4e4dc6 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 19 Oct 2020 16:00:16 +0200 Subject: [PATCH] Update log4j2.xml, LogServiceImpl.java, and 4 more files... --- log4j2.xml | 38 +++++++++++++++++++ .../java/it/cavallium/LogServiceImpl.java | 2 +- .../java/it/cavallium/TransferClient.java | 2 +- .../it/cavallium/TransferServiceImpl.java | 10 +++-- src/main/java/it/cavallium/TransferUtils.java | 26 +++++++++---- tdlib-session-container | 2 +- 6 files changed, 65 insertions(+), 15 deletions(-) create mode 100644 log4j2.xml diff --git a/log4j2.xml b/log4j2.xml new file mode 100644 index 0000000..c50acc8 --- /dev/null +++ b/log4j2.xml @@ -0,0 +1,38 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/main/java/it/cavallium/LogServiceImpl.java b/src/main/java/it/cavallium/LogServiceImpl.java index 1624714..15f9845 100644 --- a/src/main/java/it/cavallium/LogServiceImpl.java +++ b/src/main/java/it/cavallium/LogServiceImpl.java @@ -21,7 +21,7 @@ public class LogServiceImpl implements LogService { @Override public void append(Level level, String message) { - logs.onNext(new LogEntry(System.currentTimeMillis(), level, message)); + logs.onNext(new LogEntry(System.currentTimeMillis(), level, message.length() > 512 ? message.substring(0, 512) : message)); } @Override diff --git a/src/main/java/it/cavallium/TransferClient.java b/src/main/java/it/cavallium/TransferClient.java index a23b9ac..348770a 100644 --- a/src/main/java/it/cavallium/TransferClient.java +++ b/src/main/java/it/cavallium/TransferClient.java @@ -35,7 +35,7 @@ public class TransferClient { this.client = client; this.client.getIncomingUpdates() - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.newParallel("bot_updates")) .flatMap(this::onUpdate) .subscribe(); } diff --git a/src/main/java/it/cavallium/TransferServiceImpl.java b/src/main/java/it/cavallium/TransferServiceImpl.java index 4b49445..02fcee6 100644 --- a/src/main/java/it/cavallium/TransferServiceImpl.java +++ b/src/main/java/it/cavallium/TransferServiceImpl.java @@ -128,7 +128,7 @@ public class TransferServiceImpl implements TransferService { }) .build()) .then(Mono.defer(() -> { - var clientStateFlux = client.getState().publish().autoConnect(2); + var clientStateFlux = client.getState().skip(1).publish().autoConnect(2); clientStateFlux .filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR @@ -140,12 +140,14 @@ public class TransferServiceImpl implements TransferService { .materialize() .flatMap(signal -> onUserbotClosed.get().thenReturn(signal)) .dematerialize() - .subscribe(state -> System.out.println("State: " + state)); + .subscribe(state -> System.out.println("Userbot closed with state: " + state)); + + clientStateFlux.subscribe(state -> System.out.println("state: " + state)); client .getIncomingUpdates() - .flatMap(this::onClientUpdate) - .subscribe(u -> System.out.println(u), e -> System.err.println(e)); + .flatMap(update -> onClientUpdate(update)) + .subscribe(_v -> {}, e -> System.err.println(e)); return clientStateFlux .filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR diff --git a/src/main/java/it/cavallium/TransferUtils.java b/src/main/java/it/cavallium/TransferUtils.java index 44eda10..486c809 100644 --- a/src/main/java/it/cavallium/TransferUtils.java +++ b/src/main/java/it/cavallium/TransferUtils.java @@ -15,14 +15,13 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; public class TransferUtils { - private static final Logger logger = LoggerFactory.getLogger(TransferUtils.class); - public static int chatIdToChatEntityId(long id) { if (id <= -1000000000000L) { return (int) (Math.abs(id) - 1000000000000L); @@ -49,6 +48,7 @@ public class TransferUtils { private static class ChatIdAndOrderOffsets { + private final long chatIdOffset; private final long orderOffset; @@ -98,7 +98,7 @@ public class TransferUtils { * @return flux of home chats. They can repeat themselves */ public static Flux getAllHomeChats(TransferClient client) { - logger.debug("Getting the full chat list"); + App.getLogService().append(Level.DEBUG, "Getting the full chat list"); var singleScheduler = Schedulers.newSingle("getallchats"); @@ -106,20 +106,27 @@ public class TransferUtils { .deferWithContext((context) -> { var offsets = Objects.requireNonNull(context.>get("offsets")); var offsetsValue = offsets.get(); - return client.send(new GetChats(new ChatListMain(), - offsetsValue.getOrderOffset(), - offsetsValue.getChatIdOffset(), - 100)) + App.getLogService().append(Level.TRACE, "Requesting GetChats"); + return client.send(new GetChats(new ChatListMain(), + offsetsValue.getOrderOffset(), + offsetsValue.getChatIdOffset(), + 100 + )) .flatMap(MonoUtils::orElseThrow) .publishOn(singleScheduler) .flatMapMany(chats -> Flux.fromStream(Arrays.stream(chats.chatIds).boxed())) .flatMap(chatId -> { + App.getLogService().append(Level.TRACE, "Received ChatId: " + chatId); return client.send(new GetChat(chatId)) .publishOn(singleScheduler) .flatMap(MonoUtils::orElseThrow); }) + .doOnNext(chat -> { + App.getLogService().append(Level.TRACE, "Received Chat: " + chat.toString().replace('\n', ' ').replace(" ", "").replace("\t", "")); + }) .collectList() .doOnNext(chats -> { + App.getLogService().append(Level.TRACE, "Received Chats: " + chats.toString().replace('\n', ' ').replace(" ", "").replace("\t", "")); if (!chats.isEmpty()) { var lastChat = chats.get(chats.size() - 1); getMainChatListPosition(lastChat.positions).ifPresentOrElse(lastChatPosition -> { @@ -136,7 +143,10 @@ public class TransferUtils { }) .repeatWhen(nFlux -> nFlux.takeWhile(n -> n > 0)) .flatMap(Flux::fromIterable) - .subscriberContext(ctx -> ctx.put("offsets", new AtomicReference<>(new ChatIdAndOrderOffsets(0L, 9223372036854775807L)))); + .subscriberContext(ctx -> ctx.put("offsets", + new AtomicReference<>(new ChatIdAndOrderOffsets(0L, 9223372036854775807L)) + )) + .doOnTerminate(() -> App.getLogService().append(Level.DEBUG, "Home chats retrieved")); } private static Optional getMainChatListPosition(ChatPosition[] positions) { diff --git a/tdlib-session-container b/tdlib-session-container index bc40e2b..a7223c4 160000 --- a/tdlib-session-container +++ b/tdlib-session-container @@ -1 +1 @@ -Subproject commit bc40e2b9e60857e3b84850d11ba399a2afc51073 +Subproject commit a7223c4d834c91829b2eace051c0ab5526720aed