package it.cavallium; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.AuthorizationState; import it.tdlight.jni.TdApi.AuthorizationStateReady; import it.tdlight.jni.TdApi.Chat; import it.tdlight.jni.TdApi.ChatMemberStatusAdministrator; import it.tdlight.jni.TdApi.ChatMemberStatusCreator; import it.tdlight.jni.TdApi.ChatMemberStatusMember; import it.tdlight.jni.TdApi.GetSupergroupFullInfo; import it.tdlight.jni.TdApi.Object; import it.tdlight.jni.TdApi.Supergroup; import it.tdlight.jni.TdApi.SupergroupFullInfo; import it.tdlight.jni.TdApi.Update; import it.tdlight.jni.TdApi.UpdateAuthorizationState; import it.tdlight.jni.TdApi.UpdateNewChat; import it.tdlight.jni.TdApi.UpdateSupergroup; import it.tdlight.jni.TdApi.UpdateSupergroupFullInfo; import it.tdlight.jni.TdApi.User; import it.tdlight.tdlibsession.td.TdResult; import it.tdlight.tdlibsession.td.easy.AsyncTdEasy; import it.tdlight.utils.MonoUtils; import java.time.Duration; import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; public class TransferClient { private final String alias; private final AsyncTdEasy client; private final AtomicReference clientUser = new AtomicReference<>(null); private final ConcurrentHashMap supergroupInfos = new ConcurrentHashMap<>(); private final ConcurrentHashMap supergroupFullInfos = new ConcurrentHashMap<>(); private final ConcurrentHashMap chats = new ConcurrentHashMap<>(); private final EmitterProcessor> usefulSupergroups = EmitterProcessor.create(); private final Scheduler scheduler; public TransferClient(String alias, AsyncTdEasy client) { this.alias = alias; this.client = client; this.scheduler = Schedulers.boundedElastic(); this.client.getIncomingUpdates() .subscribeOn(scheduler) .publishOn(scheduler) .flatMap(this::onUpdate) .subscribe(); } public User getClientUser() { return Objects.requireNonNull(clientUser.get(), "Userbot details not already received!"); } private Mono onUpdate(Update update) { switch (update.getConstructor()) { case UpdateSupergroup.CONSTRUCTOR: return this.onUpdateSupergroup(((UpdateSupergroup) update).supergroup); case UpdateSupergroupFullInfo.CONSTRUCTOR: return this.onUpdateSupergroupFullInfo( ((UpdateSupergroupFullInfo) update).supergroupId, ((UpdateSupergroupFullInfo) update).supergroupFullInfo); case UpdateNewChat.CONSTRUCTOR: return this.onChat(((UpdateNewChat) update).chat); case UpdateAuthorizationState.CONSTRUCTOR: return this.onUpdateAuthorizationState(((UpdateAuthorizationState) update).authorizationState); default: return Mono.empty(); } } private Mono onUpdateAuthorizationState(AuthorizationState authorizationState) { switch (authorizationState.getConstructor()) { case AuthorizationStateReady.CONSTRUCTOR: return TransferUtils .getAllHomeChats(this) .timeout(Duration.ofMinutes(2)) .flatMap(this::onChat) .then(client.send(new TdApi.GetMe()) .flatMap(MonoUtils::orElseThrow) .timeout(Duration.ofSeconds(30)) .doOnSuccess(this.clientUser::set)) .then(); default: return Mono.empty(); } } private Mono onChat(Chat chat) { if (chats.put(chat.id, chat) == null) { // ok } return Mono.empty(); } private Mono onUpdateSupergroup(Supergroup supergroup) { // Fast checks to ignore most unwanted infos if (!supergroup.isChannel) { boolean isRelevantSupergroup = false; if (supergroup.status.getConstructor() == ChatMemberStatusAdministrator.CONSTRUCTOR || supergroup.status.getConstructor() == ChatMemberStatusCreator.CONSTRUCTOR) { isRelevantSupergroup = true; } else if (!StaticSettings.requiresAdminPrivilegesOnSourceSupergroup()) { switch (supergroup.status.getConstructor()) { case ChatMemberStatusCreator.CONSTRUCTOR: case ChatMemberStatusAdministrator.CONSTRUCTOR: case ChatMemberStatusMember.CONSTRUCTOR: isRelevantSupergroup = true; break; default: isRelevantSupergroup = false; break; } } if (isRelevantSupergroup) { if (supergroupInfos.put(supergroup.id, supergroup) == null) { return this.send(new GetSupergroupFullInfo(supergroup.id)) .filter(TdResult::succeeded) .map(TdResult::result) .flatMap(supergroupFullInfo -> onUpdateSupergroupFullInfo(supergroup.id, supergroupFullInfo)); } } } return Mono.empty(); } private Mono onUpdateSupergroupFullInfo(int id, SupergroupFullInfo supergroupFullInfo) { if (supergroupFullInfos.put(id, supergroupFullInfo) == null) { var sgInfo = getSupergroupBaseChatInfo(id); sgInfo.ifPresent(t -> this.usefulSupergroups.onNext(new ItemUpdate<>(false, t))); // ok } return Mono.empty(); } /** * Sends request to TDLib. * @return The response or {@link TdApi.Error}. */ public Mono> send(TdApi.Function request) { return client.send(request).publishOn(scheduler); } public Set getAdminSupergroups(boolean requireCanRestrictMembers, boolean requireCanInviteUsers) { return supergroupFullInfos.entrySet().stream().flatMap(entry -> { int id = entry.getKey(); return getSupergroupBaseChatInfo(id).filter(sgInfo -> { if (!sgInfo.canRestrictMembers() && requireCanRestrictMembers) { return false; } if (!sgInfo.canInviteUsers() && requireCanInviteUsers) { return false; } return true; }).map(SupergroupInfo::getBaseChatInfo).stream(); }).collect(Collectors.toSet()); } private Optional getSupergroupBaseChatInfo(int id) { var chatInfo = chats.get(TransferUtils.chatEntityIdToChatId(id, TChatType.SUPERGROUP)); var baseInfo = supergroupInfos.get(id); if (chatInfo == null || baseInfo == null) { return Optional.empty(); } else { var baseChatInfo = new BaseChatInfo(TransferUtils.chatEntityIdToChatId(id, TChatType.SUPERGROUP), chatInfo.title); if (baseInfo.status.getConstructor() == ChatMemberStatusCreator.CONSTRUCTOR) { var sgInfo = new SupergroupInfo(baseChatInfo, true, true); return Optional.of(sgInfo); } else if (baseInfo.status.getConstructor() == ChatMemberStatusAdministrator.CONSTRUCTOR) { var adminStatus = (ChatMemberStatusAdministrator) baseInfo.status; var sgInfo = new SupergroupInfo(baseChatInfo, adminStatus.canRestrictMembers, adminStatus.canInviteUsers); return Optional.of(sgInfo); } else if (!StaticSettings.requiresAdminPrivilegesOnSourceSupergroup()) { switch (baseInfo.status.getConstructor()) { case ChatMemberStatusCreator.CONSTRUCTOR: case ChatMemberStatusAdministrator.CONSTRUCTOR: case ChatMemberStatusMember.CONSTRUCTOR: var sgInfo = new SupergroupInfo(baseChatInfo, false, false); return Optional.of(sgInfo); default: return Optional.empty(); } } else { return Optional.empty(); } } } public Flux> subscribeAdminSupergroups() { return usefulSupergroups.hide(); } @Override public String toString() { return alias; } }