From 824b362710d9eaa865ab9b7aaafcf4c9b3ac4f44 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 17 Oct 2020 13:44:59 +0200 Subject: [PATCH] Get chats on boot --- .../java/it/cavallium/TransferClient.java | 14 ++ .../it/cavallium/TransferServiceImpl.java | 167 +++++++++--------- src/main/java/it/cavallium/TransferUtils.java | 122 +++++++++++++ 3 files changed, 220 insertions(+), 83 deletions(-) diff --git a/src/main/java/it/cavallium/TransferClient.java b/src/main/java/it/cavallium/TransferClient.java index a4bd72d..a23b9ac 100644 --- a/src/main/java/it/cavallium/TransferClient.java +++ b/src/main/java/it/cavallium/TransferClient.java @@ -1,6 +1,8 @@ package it.cavallium; import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.AuthorizationState; +import it.tdlight.jni.TdApi.AuthorizationStateReady; import it.tdlight.jni.TdApi.Chat; import it.tdlight.jni.TdApi.ChatMemberStatusAdministrator; import it.tdlight.jni.TdApi.GetSupergroupFullInfo; @@ -8,6 +10,7 @@ import it.tdlight.jni.TdApi.Object; import it.tdlight.jni.TdApi.Supergroup; import it.tdlight.jni.TdApi.SupergroupFullInfo; import it.tdlight.jni.TdApi.Update; +import it.tdlight.jni.TdApi.UpdateAuthorizationState; import it.tdlight.jni.TdApi.UpdateNewChat; import it.tdlight.jni.TdApi.UpdateSupergroup; import it.tdlight.jni.TdApi.UpdateSupergroupFullInfo; @@ -47,6 +50,17 @@ public class TransferClient { ((UpdateSupergroupFullInfo) update).supergroupFullInfo); case UpdateNewChat.CONSTRUCTOR: return this.onChat(((UpdateNewChat) update).chat); + case UpdateAuthorizationState.CONSTRUCTOR: + return this.onUpdateAuthorizationState(((UpdateAuthorizationState) update).authorizationState); + default: + return Mono.empty(); + } + } + + private Mono onUpdateAuthorizationState(AuthorizationState authorizationState) { + switch (authorizationState.getConstructor()) { + case AuthorizationStateReady.CONSTRUCTOR: + return TransferUtils.getAllHomeChats(this).flatMap(this::onChat).then(); default: return Mono.empty(); } diff --git a/src/main/java/it/cavallium/TransferServiceImpl.java b/src/main/java/it/cavallium/TransferServiceImpl.java index 5b4ef08..810d274 100644 --- a/src/main/java/it/cavallium/TransferServiceImpl.java +++ b/src/main/java/it/cavallium/TransferServiceImpl.java @@ -76,93 +76,94 @@ public class TransferServiceImpl implements TransferService { }) .subscribeOn(Schedulers.boundedElastic()) .flatMap(v -> v) - .map(middle -> new AsyncTdEasy(middle, alias)) - .flatMap(client -> { + .map(middle -> new AsyncTdEasy(middle, alias)).flatMap(client -> { return client - .create(TdEasySettings - .newBuilder() - .setUseMessageDatabase(false) - .setUseFileDatabase(false) - .setUseChatInfoDatabase(false) - .setApiId(apiId) - .setApiHash(apiHash) - .setEnableStorageOptimizer(false) - .setApplicationVersion(App.VERSION) - .setDatabaseDirectory("sessions" + File.separator + "userbot_" + phoneNumberLong) - .setIgnoreFileNames(true) - .setPhoneNumber(phoneNumberLong) - .setSystemLanguageCode("en") - .setDeviceModel(System.getProperty("os.name")) - .setSystemVersion(System.getProperty("os.version")) - .setParameterRequestHandler((parameter, parameterInfo) -> { - switch (parameter) { - case ASK_FIRST_NAME: - return Mono.just("FirstName"); - case ASK_LAST_NAME: - return Mono.just("LastName"); - case ASK_CODE: - return codeSupplier - .apply(client) - .map(i -> "" + i) - .switchIfEmpty(client - .send(new TdApi.Close()) - .materialize() - .flatMap(signal -> onUserbotClosed.get().thenReturn(signal)) - .dematerialize() - .then(Mono.empty())); - case ASK_PASSWORD: - return otpSupplier - .apply(client, ((ParameterInfoPasswordHint) parameterInfo).getHint()) - .switchIfEmpty(client - .send(new TdApi.Close()) - .materialize() - .flatMap(signal -> onUserbotClosed.get().thenReturn(signal)) - .dematerialize() - .then(Mono.empty())); - case NOTIFY_LINK: - default: - return Mono.empty(); - } - }) - .build()) - .then(Mono.defer(() -> { - var clientStateFlux = client.getState().skip(1).publish().autoConnect(2); + .execute(new TdApi.SetLogVerbosityLevel(0)) + .then(client + .create(TdEasySettings + .newBuilder() + .setUseMessageDatabase(false) + .setUseFileDatabase(false) + .setUseChatInfoDatabase(false) + .setApiId(apiId) + .setApiHash(apiHash) + .setEnableStorageOptimizer(false) + .setApplicationVersion(App.VERSION) + .setDatabaseDirectory("sessions" + File.separator + "userbot_" + phoneNumberLong) + .setIgnoreFileNames(true) + .setPhoneNumber(phoneNumberLong) + .setSystemLanguageCode("en") + .setDeviceModel(System.getProperty("os.name")) + .setSystemVersion(System.getProperty("os.version")) + .setParameterRequestHandler((parameter, parameterInfo) -> { + switch (parameter) { + case ASK_FIRST_NAME: + return Mono.just("FirstName"); + case ASK_LAST_NAME: + return Mono.just("LastName"); + case ASK_CODE: + return codeSupplier + .apply(client) + .map(i -> "" + i) + .switchIfEmpty(client + .send(new TdApi.Close()) + .materialize() + .flatMap(signal -> onUserbotClosed.get().thenReturn(signal)) + .dematerialize() + .then(Mono.empty())); + case ASK_PASSWORD: + return otpSupplier + .apply(client, ((ParameterInfoPasswordHint) parameterInfo).getHint()) + .switchIfEmpty(client + .send(new TdApi.Close()) + .materialize() + .flatMap(signal -> onUserbotClosed.get().thenReturn(signal)) + .dematerialize() + .then(Mono.empty())); + case NOTIFY_LINK: + default: + return Mono.empty(); + } + }) + .build()) + .then(Mono.defer(() -> { + var clientStateFlux = client.getState().skip(1).publish().autoConnect(2); - clientStateFlux - .filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR - || state.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR - || state.getConstructor() == AuthorizationStateLoggingOut.CONSTRUCTOR) - .take(1) - .singleOrEmpty() - .then() - .materialize() - .flatMap(signal -> onUserbotClosed.get().thenReturn(signal)) - .dematerialize() - .subscribe(state -> System.out.println("State: " + state)); + clientStateFlux + .filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR + || state.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR + || state.getConstructor() == AuthorizationStateLoggingOut.CONSTRUCTOR) + .take(1) + .singleOrEmpty() + .then() + .materialize() + .flatMap(signal -> onUserbotClosed.get().thenReturn(signal)) + .dematerialize() + .subscribe(state -> System.out.println("State: " + state)); - client.getIncomingUpdates() - .flatMap(this::onClientUpdate) - .subscribe(u -> System.out.println(u), e-> System.err.println(e)); - - return clientStateFlux - .filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR - || state.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR - || state.getConstructor() == AuthorizationStateLoggingOut.CONSTRUCTOR - || state.getConstructor() == AuthorizationStateReady.CONSTRUCTOR) - .take(1) - .singleOrEmpty() - .doOnNext(state -> System.out.println("aState: " + state)) - .handle((state, sink) -> { - if (state.getConstructor() == AuthorizationStateReady.CONSTRUCTOR) { - sink.complete(); - } else { - sink.error(new Exception(state.getClass().getSimpleName())); - } - }) - .then(); - })) - .doOnSuccess((_v) -> clients.put(phoneNumberLong, new TransferClient(client))); + client + .getIncomingUpdates() + .flatMap(this::onClientUpdate) + .subscribe(u -> System.out.println(u), e -> System.err.println(e)); + return clientStateFlux + .filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR + || state.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR + || state.getConstructor() == AuthorizationStateLoggingOut.CONSTRUCTOR + || state.getConstructor() == AuthorizationStateReady.CONSTRUCTOR) + .take(1) + .singleOrEmpty() + .doOnNext(state -> System.out.println("aState: " + state)) + .handle((state, sink) -> { + if (state.getConstructor() == AuthorizationStateReady.CONSTRUCTOR) { + sink.complete(); + } else { + sink.error(new Exception(state.getClass().getSimpleName())); + } + }) + .then(); + })) + .doOnSuccess((_v) -> clients.put(phoneNumberLong, new TransferClient(client)))); }) .map(_v -> AddUserBotResult.newSuccess()); } diff --git a/src/main/java/it/cavallium/TransferUtils.java b/src/main/java/it/cavallium/TransferUtils.java index 892c4d0..44eda10 100644 --- a/src/main/java/it/cavallium/TransferUtils.java +++ b/src/main/java/it/cavallium/TransferUtils.java @@ -1,7 +1,28 @@ package it.cavallium; +import it.tdlight.jni.TdApi.Chat; +import it.tdlight.jni.TdApi.ChatListMain; +import it.tdlight.jni.TdApi.ChatPosition; +import it.tdlight.jni.TdApi.Chats; +import it.tdlight.jni.TdApi.GetChat; +import it.tdlight.jni.TdApi.GetChats; +import it.tdlight.utils.MonoUtils; +import java.util.Arrays; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + public class TransferUtils { + private static final Logger logger = LoggerFactory.getLogger(TransferUtils.class); + public static int chatIdToChatEntityId(long id) { if (id <= -1000000000000L) { return (int) (Math.abs(id) - 1000000000000L); @@ -25,4 +46,105 @@ public class TransferUtils { throw new UnsupportedOperationException("Unsupported chat id type: " + chatEntityId); } } + + + private static class ChatIdAndOrderOffsets { + private final long chatIdOffset; + private final long orderOffset; + + private ChatIdAndOrderOffsets(long chatIdOffset, long orderOffset) { + this.chatIdOffset = chatIdOffset; + this.orderOffset = orderOffset; + } + + public long getChatIdOffset() { + return chatIdOffset; + } + + public long getOrderOffset() { + return orderOffset; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ChatIdAndOrderOffsets that = (ChatIdAndOrderOffsets) o; + + if (chatIdOffset != that.chatIdOffset) { + return false; + } + return orderOffset == that.orderOffset; + } + + @Override + public int hashCode() { + int result = (int) (chatIdOffset ^ (chatIdOffset >>> 32)); + result = 31 * result + (int) (orderOffset ^ (orderOffset >>> 32)); + return result; + } + } + + public static Mono> getAllHomeChatsSet(TransferClient client) { + return getAllHomeChats(client).collect(Collectors.toSet()); + } + + /** + * @return flux of home chats. They can repeat themselves + */ + public static Flux getAllHomeChats(TransferClient client) { + logger.debug("Getting the full chat list"); + + var singleScheduler = Schedulers.newSingle("getallchats"); + + return Mono + .deferWithContext((context) -> { + var offsets = Objects.requireNonNull(context.>get("offsets")); + var offsetsValue = offsets.get(); + return client.send(new GetChats(new ChatListMain(), + offsetsValue.getOrderOffset(), + offsetsValue.getChatIdOffset(), + 100)) + .flatMap(MonoUtils::orElseThrow) + .publishOn(singleScheduler) + .flatMapMany(chats -> Flux.fromStream(Arrays.stream(chats.chatIds).boxed())) + .flatMap(chatId -> { + return client.send(new GetChat(chatId)) + .publishOn(singleScheduler) + .flatMap(MonoUtils::orElseThrow); + }) + .collectList() + .doOnNext(chats -> { + if (!chats.isEmpty()) { + var lastChat = chats.get(chats.size() - 1); + getMainChatListPosition(lastChat.positions).ifPresentOrElse(lastChatPosition -> { + offsets.set(new ChatIdAndOrderOffsets(lastChat.id, lastChatPosition.order)); + }, () -> { + offsets.set(new ChatIdAndOrderOffsets(lastChat.id, 0L)); + }); + } else { + offsets.set(new ChatIdAndOrderOffsets(9223372036854775807L, 0L)); + } + }) + .filter(chats1 -> !chats1.isEmpty()) + .subscriberContext(context); + }) + .repeatWhen(nFlux -> nFlux.takeWhile(n -> n > 0)) + .flatMap(Flux::fromIterable) + .subscriberContext(ctx -> ctx.put("offsets", new AtomicReference<>(new ChatIdAndOrderOffsets(0L, 9223372036854775807L)))); + } + + private static Optional getMainChatListPosition(ChatPosition[] positions) { + for (ChatPosition position : positions) { + if (position.list.getConstructor() == ChatListMain.CONSTRUCTOR) { + return Optional.of(position); + } + } + return Optional.empty(); + } }