Get chats on boot

This commit is contained in:
Andrea Cavalli 2020-10-17 13:44:59 +02:00
parent d152f1a5a1
commit 824b362710
3 changed files with 220 additions and 83 deletions

View File

@ -1,6 +1,8 @@
package it.cavallium; package it.cavallium;
import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.AuthorizationState;
import it.tdlight.jni.TdApi.AuthorizationStateReady;
import it.tdlight.jni.TdApi.Chat; import it.tdlight.jni.TdApi.Chat;
import it.tdlight.jni.TdApi.ChatMemberStatusAdministrator; import it.tdlight.jni.TdApi.ChatMemberStatusAdministrator;
import it.tdlight.jni.TdApi.GetSupergroupFullInfo; import it.tdlight.jni.TdApi.GetSupergroupFullInfo;
@ -8,6 +10,7 @@ import it.tdlight.jni.TdApi.Object;
import it.tdlight.jni.TdApi.Supergroup; import it.tdlight.jni.TdApi.Supergroup;
import it.tdlight.jni.TdApi.SupergroupFullInfo; import it.tdlight.jni.TdApi.SupergroupFullInfo;
import it.tdlight.jni.TdApi.Update; import it.tdlight.jni.TdApi.Update;
import it.tdlight.jni.TdApi.UpdateAuthorizationState;
import it.tdlight.jni.TdApi.UpdateNewChat; import it.tdlight.jni.TdApi.UpdateNewChat;
import it.tdlight.jni.TdApi.UpdateSupergroup; import it.tdlight.jni.TdApi.UpdateSupergroup;
import it.tdlight.jni.TdApi.UpdateSupergroupFullInfo; import it.tdlight.jni.TdApi.UpdateSupergroupFullInfo;
@ -47,6 +50,17 @@ public class TransferClient {
((UpdateSupergroupFullInfo) update).supergroupFullInfo); ((UpdateSupergroupFullInfo) update).supergroupFullInfo);
case UpdateNewChat.CONSTRUCTOR: case UpdateNewChat.CONSTRUCTOR:
return this.onChat(((UpdateNewChat) update).chat); return this.onChat(((UpdateNewChat) update).chat);
case UpdateAuthorizationState.CONSTRUCTOR:
return this.onUpdateAuthorizationState(((UpdateAuthorizationState) update).authorizationState);
default:
return Mono.empty();
}
}
private Mono<Void> onUpdateAuthorizationState(AuthorizationState authorizationState) {
switch (authorizationState.getConstructor()) {
case AuthorizationStateReady.CONSTRUCTOR:
return TransferUtils.getAllHomeChats(this).flatMap(this::onChat).then();
default: default:
return Mono.empty(); return Mono.empty();
} }

View File

@ -76,93 +76,94 @@ public class TransferServiceImpl implements TransferService {
}) })
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(Schedulers.boundedElastic())
.flatMap(v -> v) .flatMap(v -> v)
.map(middle -> new AsyncTdEasy(middle, alias)) .map(middle -> new AsyncTdEasy(middle, alias)).flatMap(client -> {
.flatMap(client -> {
return client return client
.create(TdEasySettings .execute(new TdApi.SetLogVerbosityLevel(0))
.newBuilder() .then(client
.setUseMessageDatabase(false) .create(TdEasySettings
.setUseFileDatabase(false) .newBuilder()
.setUseChatInfoDatabase(false) .setUseMessageDatabase(false)
.setApiId(apiId) .setUseFileDatabase(false)
.setApiHash(apiHash) .setUseChatInfoDatabase(false)
.setEnableStorageOptimizer(false) .setApiId(apiId)
.setApplicationVersion(App.VERSION) .setApiHash(apiHash)
.setDatabaseDirectory("sessions" + File.separator + "userbot_" + phoneNumberLong) .setEnableStorageOptimizer(false)
.setIgnoreFileNames(true) .setApplicationVersion(App.VERSION)
.setPhoneNumber(phoneNumberLong) .setDatabaseDirectory("sessions" + File.separator + "userbot_" + phoneNumberLong)
.setSystemLanguageCode("en") .setIgnoreFileNames(true)
.setDeviceModel(System.getProperty("os.name")) .setPhoneNumber(phoneNumberLong)
.setSystemVersion(System.getProperty("os.version")) .setSystemLanguageCode("en")
.setParameterRequestHandler((parameter, parameterInfo) -> { .setDeviceModel(System.getProperty("os.name"))
switch (parameter) { .setSystemVersion(System.getProperty("os.version"))
case ASK_FIRST_NAME: .setParameterRequestHandler((parameter, parameterInfo) -> {
return Mono.just("FirstName"); switch (parameter) {
case ASK_LAST_NAME: case ASK_FIRST_NAME:
return Mono.just("LastName"); return Mono.just("FirstName");
case ASK_CODE: case ASK_LAST_NAME:
return codeSupplier return Mono.just("LastName");
.apply(client) case ASK_CODE:
.map(i -> "" + i) return codeSupplier
.switchIfEmpty(client .apply(client)
.send(new TdApi.Close()) .map(i -> "" + i)
.materialize() .switchIfEmpty(client
.flatMap(signal -> onUserbotClosed.get().thenReturn(signal)) .send(new TdApi.Close())
.dematerialize() .materialize()
.then(Mono.empty())); .flatMap(signal -> onUserbotClosed.get().thenReturn(signal))
case ASK_PASSWORD: .dematerialize()
return otpSupplier .then(Mono.empty()));
.apply(client, ((ParameterInfoPasswordHint) parameterInfo).getHint()) case ASK_PASSWORD:
.switchIfEmpty(client return otpSupplier
.send(new TdApi.Close()) .apply(client, ((ParameterInfoPasswordHint) parameterInfo).getHint())
.materialize() .switchIfEmpty(client
.flatMap(signal -> onUserbotClosed.get().thenReturn(signal)) .send(new TdApi.Close())
.dematerialize() .materialize()
.then(Mono.empty())); .flatMap(signal -> onUserbotClosed.get().thenReturn(signal))
case NOTIFY_LINK: .dematerialize()
default: .then(Mono.empty()));
return Mono.empty(); case NOTIFY_LINK:
} default:
}) return Mono.empty();
.build()) }
.then(Mono.defer(() -> { })
var clientStateFlux = client.getState().skip(1).publish().autoConnect(2); .build())
.then(Mono.defer(() -> {
var clientStateFlux = client.getState().skip(1).publish().autoConnect(2);
clientStateFlux clientStateFlux
.filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR .filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR
|| state.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR || state.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR
|| state.getConstructor() == AuthorizationStateLoggingOut.CONSTRUCTOR) || state.getConstructor() == AuthorizationStateLoggingOut.CONSTRUCTOR)
.take(1) .take(1)
.singleOrEmpty() .singleOrEmpty()
.then() .then()
.materialize() .materialize()
.flatMap(signal -> onUserbotClosed.get().thenReturn(signal)) .flatMap(signal -> onUserbotClosed.get().thenReturn(signal))
.dematerialize() .dematerialize()
.subscribe(state -> System.out.println("State: " + state)); .subscribe(state -> System.out.println("State: " + state));
client.getIncomingUpdates() client
.flatMap(this::onClientUpdate) .getIncomingUpdates()
.subscribe(u -> System.out.println(u), e-> System.err.println(e)); .flatMap(this::onClientUpdate)
.subscribe(u -> System.out.println(u), e -> System.err.println(e));
return clientStateFlux
.filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR
|| state.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR
|| state.getConstructor() == AuthorizationStateLoggingOut.CONSTRUCTOR
|| state.getConstructor() == AuthorizationStateReady.CONSTRUCTOR)
.take(1)
.singleOrEmpty()
.doOnNext(state -> System.out.println("aState: " + state))
.handle((state, sink) -> {
if (state.getConstructor() == AuthorizationStateReady.CONSTRUCTOR) {
sink.complete();
} else {
sink.error(new Exception(state.getClass().getSimpleName()));
}
})
.then();
}))
.doOnSuccess((_v) -> clients.put(phoneNumberLong, new TransferClient(client)));
return clientStateFlux
.filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR
|| state.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR
|| state.getConstructor() == AuthorizationStateLoggingOut.CONSTRUCTOR
|| state.getConstructor() == AuthorizationStateReady.CONSTRUCTOR)
.take(1)
.singleOrEmpty()
.doOnNext(state -> System.out.println("aState: " + state))
.handle((state, sink) -> {
if (state.getConstructor() == AuthorizationStateReady.CONSTRUCTOR) {
sink.complete();
} else {
sink.error(new Exception(state.getClass().getSimpleName()));
}
})
.then();
}))
.doOnSuccess((_v) -> clients.put(phoneNumberLong, new TransferClient(client))));
}) })
.map(_v -> AddUserBotResult.newSuccess()); .map(_v -> AddUserBotResult.newSuccess());
} }

View File

@ -1,7 +1,28 @@
package it.cavallium; package it.cavallium;
import it.tdlight.jni.TdApi.Chat;
import it.tdlight.jni.TdApi.ChatListMain;
import it.tdlight.jni.TdApi.ChatPosition;
import it.tdlight.jni.TdApi.Chats;
import it.tdlight.jni.TdApi.GetChat;
import it.tdlight.jni.TdApi.GetChats;
import it.tdlight.utils.MonoUtils;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class TransferUtils { public class TransferUtils {
private static final Logger logger = LoggerFactory.getLogger(TransferUtils.class);
public static int chatIdToChatEntityId(long id) { public static int chatIdToChatEntityId(long id) {
if (id <= -1000000000000L) { if (id <= -1000000000000L) {
return (int) (Math.abs(id) - 1000000000000L); return (int) (Math.abs(id) - 1000000000000L);
@ -25,4 +46,105 @@ public class TransferUtils {
throw new UnsupportedOperationException("Unsupported chat id type: " + chatEntityId); throw new UnsupportedOperationException("Unsupported chat id type: " + chatEntityId);
} }
} }
private static class ChatIdAndOrderOffsets {
private final long chatIdOffset;
private final long orderOffset;
private ChatIdAndOrderOffsets(long chatIdOffset, long orderOffset) {
this.chatIdOffset = chatIdOffset;
this.orderOffset = orderOffset;
}
public long getChatIdOffset() {
return chatIdOffset;
}
public long getOrderOffset() {
return orderOffset;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ChatIdAndOrderOffsets that = (ChatIdAndOrderOffsets) o;
if (chatIdOffset != that.chatIdOffset) {
return false;
}
return orderOffset == that.orderOffset;
}
@Override
public int hashCode() {
int result = (int) (chatIdOffset ^ (chatIdOffset >>> 32));
result = 31 * result + (int) (orderOffset ^ (orderOffset >>> 32));
return result;
}
}
public static Mono<Set<Chat>> getAllHomeChatsSet(TransferClient client) {
return getAllHomeChats(client).collect(Collectors.toSet());
}
/**
* @return flux of home chats. They can repeat themselves
*/
public static Flux<Chat> getAllHomeChats(TransferClient client) {
logger.debug("Getting the full chat list");
var singleScheduler = Schedulers.newSingle("getallchats");
return Mono
.deferWithContext((context) -> {
var offsets = Objects.requireNonNull(context.<AtomicReference<ChatIdAndOrderOffsets>>get("offsets"));
var offsetsValue = offsets.get();
return client.<Chats>send(new GetChats(new ChatListMain(),
offsetsValue.getOrderOffset(),
offsetsValue.getChatIdOffset(),
100))
.flatMap(MonoUtils::orElseThrow)
.publishOn(singleScheduler)
.flatMapMany(chats -> Flux.fromStream(Arrays.stream(chats.chatIds).boxed()))
.flatMap(chatId -> {
return client.<Chat>send(new GetChat(chatId))
.publishOn(singleScheduler)
.flatMap(MonoUtils::orElseThrow);
})
.collectList()
.doOnNext(chats -> {
if (!chats.isEmpty()) {
var lastChat = chats.get(chats.size() - 1);
getMainChatListPosition(lastChat.positions).ifPresentOrElse(lastChatPosition -> {
offsets.set(new ChatIdAndOrderOffsets(lastChat.id, lastChatPosition.order));
}, () -> {
offsets.set(new ChatIdAndOrderOffsets(lastChat.id, 0L));
});
} else {
offsets.set(new ChatIdAndOrderOffsets(9223372036854775807L, 0L));
}
})
.filter(chats1 -> !chats1.isEmpty())
.subscriberContext(context);
})
.repeatWhen(nFlux -> nFlux.takeWhile(n -> n > 0))
.flatMap(Flux::fromIterable)
.subscriberContext(ctx -> ctx.put("offsets", new AtomicReference<>(new ChatIdAndOrderOffsets(0L, 9223372036854775807L))));
}
private static Optional<ChatPosition> getMainChatListPosition(ChatPosition[] positions) {
for (ChatPosition position : positions) {
if (position.list.getConstructor() == ChatListMain.CONSTRUCTOR) {
return Optional.of(position);
}
}
return Optional.empty();
}
} }