package it.cavallium;

import static it.cavallium.PrimaryController.getUserbotPhoneNumber;
import static it.cavallium.StaticSettings.REMOVE_MEMBERS_FROM_SOURCE_SUPERGROUP;
import static it.cavallium.StaticSettings.requiresAdminPrivilegesOnSourceSupergroup;

import com.google.i18n.phonenumbers.NumberParseException;
import com.google.i18n.phonenumbers.PhoneNumberUtil;
import com.google.i18n.phonenumbers.PhoneNumberUtil.PhoneNumberFormat;
import com.google.i18n.phonenumbers.Phonenumber.PhoneNumber;
import com.hazelcast.cp.internal.util.Tuple2;
import com.hazelcast.cp.internal.util.Tuple3;
import io.vertx.core.impl.ConcurrentHashSet;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.AuthorizationStateClosed;
import it.tdlight.jni.TdApi.AuthorizationStateClosing;
import it.tdlight.jni.TdApi.AuthorizationStateLoggingOut;
import it.tdlight.jni.TdApi.AuthorizationStateReady;
import it.tdlight.jni.TdApi.ChatMemberStatusAdministrator;
import it.tdlight.jni.TdApi.ChatMemberStatusCreator;
import it.tdlight.jni.TdApi.ChatMemberStatusLeft;
import it.tdlight.jni.TdApi.ChatMemberStatusMember;
import it.tdlight.jni.TdApi.GetMe;
import it.tdlight.jni.TdApi.GetUserPrivacySettingRules;
import it.tdlight.jni.TdApi.Ok;
import it.tdlight.jni.TdApi.Supergroup;
import it.tdlight.jni.TdApi.SupergroupFullInfo;
import it.tdlight.jni.TdApi.Update;
import it.tdlight.jni.TdApi.User;
import it.tdlight.jni.TdApi.UserFullInfo;
import it.tdlight.jni.TdApi.UserTypeRegular;
import it.tdlight.tdlibsession.td.TdError;
import it.tdlight.tdlibsession.td.easy.AsyncTdEasy;
import it.tdlight.tdlibsession.td.easy.ParameterInfoPasswordHint;
import it.tdlight.tdlibsession.td.easy.TdEasySettings;
import it.tdlight.tdlibsession.td.middle.TdClusterManager;
import it.tdlight.tdlibsession.td.middle.direct.AsyncTdMiddleDirect;
import it.tdlight.utils.MonoUtils;
import it.tdlight.utils.TdLightUtils;
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
import it.unimi.dsi.fastutil.objects.ObjectSets;
import java.io.File;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.event.Level;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * {@link TransferService} implementation that keeps a registry of logged-in userbots and uses them
 * to move the members of one supergroup into another.
 *
 * <p>Userbots are indexed by their phone number (digits only, as a {@code long}); a second index
 * maps each supergroup id to the userbots that reported it through
 * {@code TransferClient.subscribeAdminSupergroups()}.
 *
 * <p>NOTE(review): the generic type arguments in this file were reconstructed from how each value
 * is used (they were not readable in the reviewed copy). In particular {@code SupergroupInfo},
 * the {@code Mono<Integer>} produced by the code supplier and the {@code Mono<Void>} callback
 * results are inferred and must be checked against {@code TransferService}.
 */
public class TransferServiceImpl implements TransferService {

  private final TdClusterManager clusterManager;
  /** Registered userbots, keyed by phone number (see {@link #getLongPhoneNumber}). */
  private final ConcurrentHashMap<Long, TransferClient> clients = new ConcurrentHashMap<>();
  /** Userbots known for each supergroup id, fed by each client's admin-supergroup updates. */
  private final ConcurrentHashMap<Integer, Set<TransferClient>> supergroupClients = new ConcurrentHashMap<>();
  /** Stream of userbot additions ({@code removed == false}) and removals ({@code removed == true}). */
  private final EmitterProcessor<ItemUpdate<TransferClient>> newClients = EmitterProcessor.create();
  private final Scheduler updatesScheduler;
  private int apiId;
  private String apiHash;

  /**
   * Creates the service.
   *
   * @param clusterManager cluster manager handed to {@code AsyncTdMiddleDirect} when a userbot is deployed
   */
  public TransferServiceImpl(TdClusterManager clusterManager) {
    this.clusterManager = clusterManager;
    this.updatesScheduler = Schedulers.boundedElastic();
  }

  @Override
  public void setApiId(int apiId) {
    this.apiId = apiId;
  }

  @Override
  public void setApiHash(String apiHash) {
    this.apiHash = apiHash;
  }

  /**
   * Deploys a TDLib session for the given phone number, logs it in and registers it as a userbot.
   *
   * <p>NOTE(review): everything upstream of the final {@code map(...)} ends in {@code then()}, so
   * it completes without a value; the {@code map} lambda therefore does not appear to run and a
   * successful login seems to be signalled by an empty completion. Confirm what callers expect
   * before changing it.
   *
   * @param phoneNumber phone number of the account to log in
   * @param codeSupplier asked for the login code; an empty result closes the client
   * @param otpSupplier asked for the password, given the client and the password hint; an empty
   *     result closes the client
   * @param onUserbotClosed invoked when the client reports a closing, closed or logging-out state
   * @return a failed result if the number is already registered; otherwise a Mono that errors when
   *     the session reaches a terminal state before becoming ready
   */
  @Override
  public Mono<AddUserBotResult> addUserbot(PhoneNumber phoneNumber,
      Function<AsyncTdEasy, Mono<Integer>> codeSupplier,
      BiFunction<AsyncTdEasy, String, Mono<String>> otpSupplier,
      Supplier<Mono<Void>> onUserbotClosed) {
    long phoneNumberLong = getLongPhoneNumber(phoneNumber);
    if (clients.containsKey(phoneNumberLong)) {
      return Mono.just(AddUserBotResult.newFailed("Userbot already added!"));
    }

    String alias = PhoneNumberUtil.getInstance().format(phoneNumber, PhoneNumberFormat.INTERNATIONAL);

    return Mono
        .fromCallable(() -> {
          return AsyncTdMiddleDirect.getAndDeployInstance(clusterManager, alias, "" + phoneNumberLong);
        })
        .subscribeOn(Schedulers.boundedElastic())
        .flatMap(v -> v)
        .map(middle -> new AsyncTdEasy(middle, alias))
        .flatMap(client -> {
          return client
              .execute(new TdApi.SetLogVerbosityLevel(0))
              .then(client
                  .create(TdEasySettings
                      .newBuilder()
                      .setUseMessageDatabase(false)
                      .setUseFileDatabase(false)
                      .setUseChatInfoDatabase(false)
                      .setApiId(apiId)
                      .setApiHash(apiHash)
                      .setEnableStorageOptimizer(false)
                      .setApplicationVersion(App.VERSION)
                      .setDatabaseDirectory("sessions" + File.separator + "userbot_" + phoneNumberLong)
                      .setIgnoreFileNames(true)
                      .setPhoneNumber(phoneNumberLong)
                      .setSystemLanguageCode("en")
                      .setDeviceModel(System.getProperty("os.name"))
                      .setSystemVersion(System.getProperty("os.version"))
                      .setParameterRequestHandler((parameter, parameterInfo) -> {
                        switch (parameter) {
                          case ASK_FIRST_NAME:
                            return Mono.just("FirstName");
                          case ASK_LAST_NAME:
                            return Mono.just("LastName");
                          case ASK_CODE:
                            return codeSupplier
                                .apply(client)
                                .map(i -> "" + i)
                                .switchIfEmpty(closeAndNotify(client, onUserbotClosed));
                          case ASK_PASSWORD:
                            return otpSupplier
                                .apply(client, ((ParameterInfoPasswordHint) parameterInfo).getHint())
                                .switchIfEmpty(closeAndNotify(client, onUserbotClosed));
                          case NOTIFY_LINK:
                          default:
                            return Mono.empty();
                        }
                      })
                      .build())
                  .then(Mono.defer(() -> {
                    // The state flux is shared by exactly three subscribers (close watcher,
                    // state logger, readiness check); it connects once all three have subscribed.
                    var clientStateFlux = client.getState().publish().autoConnect(3);

                    // Subscriber 1: notify the caller as soon as the session starts shutting down.
                    clientStateFlux
                        .filter(TransferServiceImpl::isTerminalState)
                        .take(1)
                        .singleOrEmpty()
                        .then()
                        .materialize()
                        .flatMap(signal -> onUserbotClosed.get().thenReturn(signal))
                        .dematerialize()
                        .subscribeOn(updatesScheduler)
                        .subscribe(state -> System.out.println("Userbot closed with state: " + state));

                    // Subscriber 2: trace every state change.
                    clientStateFlux
                        .subscribeOn(updatesScheduler)
                        .subscribe(state -> System.out.println("state: " + state));

                    client
                        .getIncomingUpdates()
                        .subscribeOn(updatesScheduler)
                        .flatMap(update -> onClientUpdate(update))
                        .subscribe(_v -> {}, e -> System.err.println(e));

                    // Subscriber 3: complete on "ready", fail on any terminal state.
                    return clientStateFlux
                        .subscribeOn(updatesScheduler)
                        .filter(state -> isTerminalState(state)
                            || state.getConstructor() == AuthorizationStateReady.CONSTRUCTOR)
                        .take(1)
                        .singleOrEmpty()
                        .doOnNext(state -> System.out.println("aState: " + state))
                        .handle((state, sink) -> {
                          if (state.getConstructor() == AuthorizationStateReady.CONSTRUCTOR) {
                            sink.complete();
                          } else {
                            sink.error(new Exception(state.getClass().getSimpleName()));
                          }
                        })
                        .then();
                  }))
                  .doOnSuccess((clientUser) -> {
                    var newClient = new TransferClient(alias, client);
                    clients.put(phoneNumberLong, newClient);
                    // Keep the supergroup -> userbots index in sync with this client's updates.
                    newClient.subscribeAdminSupergroups().doOnNext(supergroupInfoItemUpdate -> {
                      var supergroupId = supergroupInfoItemUpdate.getItem().getBaseChatInfo().getSupergroupIdInt();
                      if (supergroupInfoItemUpdate.isRemoved()) {
                        supergroupClients.compute(supergroupId, (sgId, members) -> {
                          if (members != null) {
                            members.remove(newClient);
                          }
                          return members;
                        });
                      } else {
                        supergroupClients.compute(supergroupId, (sgId, members) -> {
                          if (members == null) {
                            members = ObjectSets.synchronize(new ObjectOpenHashSet<TransferClient>());
                          }
                          members.add(newClient);
                          return members;
                        });
                      }
                    }).subscribe();
                    newClients.onNext(new ItemUpdate<>(false, newClient));
                  }));
        })
        .map(_v -> AddUserBotResult.newSuccess());
  }

  /**
   * Sends {@code Close} to the client, invokes the closed callback whatever the outcome, re-raises
   * any error from the close request and otherwise completes empty.
   */
  private static <T> Mono<T> closeAndNotify(AsyncTdEasy client, Supplier<? extends Mono<?>> onUserbotClosed) {
    return client
        .send(new TdApi.Close())
        .materialize()
        .flatMap(signal -> onUserbotClosed.get().thenReturn(signal))
        .dematerialize()
        .then(Mono.empty());
  }

  /** Returns true for the closing, closed and logging-out authorization states. */
  private static boolean isTerminalState(TdApi.Object state) {
    int constructor = state.getConstructor();
    return constructor == AuthorizationStateClosing.CONSTRUCTOR
        || constructor == AuthorizationStateClosed.CONSTRUCTOR
        || constructor == AuthorizationStateLoggingOut.CONSTRUCTOR;
  }

  /** Hook for incoming TDLib updates; currently ignores every update. */
  private Mono<Void> onClientUpdate(Update update) {
    return Mono.empty();
  }

  /** Formats the number as E.164, drops the leading {@code +} and parses the digits as a long. */
  private static long getLongPhoneNumber(PhoneNumber phoneNumber) {
    return Long.parseLong(PhoneNumberUtil
        .getInstance()
        .format(phoneNumber, PhoneNumberFormat.E164)
        .replace("+", ""));
  }

  /**
   * Unregisters the userbot with the given phone number and closes its TDLib session.
   *
   * @param phoneNumber phone number of the userbot to close
   * @return a Mono that errors if no userbot is registered for that number
   */
  @Override
  public Mono<Void> closeUserbot(PhoneNumber phoneNumber) {
    var client = clients.remove(getLongPhoneNumber(phoneNumber));
    if (client == null) {
      // Nothing was removed: do not publish a removal update carrying a null client.
      return Mono.error(new Exception("Userbot " + phoneNumber + " was not found!"));
    }
    newClients.onNext(new ItemUpdate<>(true, client));
    return MonoUtils.thenOrError(client.send(new TdApi.Close()));
  }

  /** Returns the phone numbers of all registered userbots. */
  @Override
  public Set<PhoneNumber> getPhoneNumbers() {
    var phonenumbers = new HashSet<PhoneNumber>();
    clients.forEach((phoneNumberLong, client) -> {
      try {
        phonenumbers.add(getUserbotPhoneNumber("+" + phoneNumberLong));
      } catch (NumberParseException e) {
        // Can't happen
        e.printStackTrace();
      }
    });
    return phonenumbers;
  }

  /**
   * Returns the union of the admin supergroups reported by every registered userbot.
   *
   * <p>NOTE(review): element type {@code SupergroupInfo} is inferred, confirm against the interface.
   */
  @Override
  public Set<SupergroupInfo> getAdminSupergroups(boolean canRestrictMembers, boolean canInviteUsers) {
    var adminSupergroups = clients
        .values()
        .stream()
        .flatMap((TransferClient transferClient) -> transferClient
            .getAdminSupergroups(canRestrictMembers, canInviteUsers)
            .stream())
        .collect(Collectors.toSet());
    return adminSupergroups;
  }

  /**
   * Merges the admin-supergroup updates of the userbots already registered with those of every
   * userbot added later.
   */
  @Override
  public Flux<ItemUpdate<SupergroupInfo>> subscribeAdminSupergroups() {
    // fromIterable instead of fromStream: a java.util.stream.Stream can be consumed only once,
    // which made the returned Flux fail on a second subscription.
    return Flux
        .merge(this.newClients, Flux
            .fromIterable(clients.values())
            .map(client -> new ItemUpdate<TransferClient>(false, client)))
        .filter(itemClient -> !itemClient.isRemoved())
        .map(ItemUpdate::getItem)
        .map(TransferClient::subscribeAdminSupergroups)
        .flatMap(f -> f);
  }

  /**
   * Transfers the members of {@code sourceGroup} to {@code destGroup}.
   *
   * <p>Phases, each announced through {@code phaseDescriptionConsumer}: select the userbots usable
   * on both groups, fetch the source members, resolve them to users, filter out unsuitable users,
   * then add each remaining user to the destination and, when
   * {@code REMOVE_MEMBERS_FROM_SOURCE_SUPERGROUP} is set, remove them from the source.
   *
   * @param sourceGroup group the members are read (and optionally removed) from
   * @param destGroup group the members are added to
   * @param userStatusConsumer receives a status for each user as it moves through the phases
   * @param percentageConsumer receives the overall progress (0, 5, 10, 15, 20..100)
   * @param phaseDescriptionConsumer receives a description of the current phase
   * @return a Mono that completes when the transfer is over, or errors when no userbot is usable
   *     or the source members cannot be obtained
   */
  @Override
  public Mono<Void> transferMembers(BaseChatInfo sourceGroup,
      BaseChatInfo destGroup,
      Function<UserStatus, Mono<Void>> userStatusConsumer,
      Function<Integer, Mono<Void>> percentageConsumer,
      Function<String, Mono<Void>> phaseDescriptionConsumer) {
    var sourceSupergroupClients = this.supergroupClients
        .getOrDefault(sourceGroup.getSupergroupIdInt(), Set.of())
        .stream()
        .filter(clients::containsValue)
        .collect(Collectors.toSet());
    if (sourceSupergroupClients.isEmpty()) {
      return Mono.error(new Exception("No userbot can remove members from the source group"));
    }

    var destSupergroupClients = this.supergroupClients
        .getOrDefault(destGroup.getSupergroupIdInt(), Set.of())
        .stream()
        .filter(clients::containsValue)
        .collect(Collectors.toSet());
    if (destSupergroupClients.isEmpty()) {
      return Mono.error(new Exception("No userbot can add members to the destination group"));
    }

    AtomicInteger transferredSuccessfullyUsersStats = new AtomicInteger(0);

    return percentageConsumer
        .apply(0)
        .then(phaseDescriptionConsumer.apply("Transfer from " + sourceGroup.getTitle() + " to " + destGroup.getTitle()))

        // Check and get the set of userbots that can transfer users from group X to group Y
        .then(phaseDescriptionConsumer.apply("Checking available userbots for "
            + (REMOVE_MEMBERS_FROM_SOURCE_SUPERGROUP ? "removing" : "managing") + " users in the source group"))
        .thenMany(Flux.fromIterable(sourceSupergroupClients))
        .flatMap(client -> client
            .send(new TdApi.GetMe())
            .timeout(Duration.ofSeconds(5))
            .flatMap(MonoUtils::orElseThrow)
            .flatMap(_v -> client.send(new TdApi.GetSupergroupFullInfo(sourceGroup.getSupergroupIdInt())))
            .flatMap(MonoUtils::orElseThrow)
            .timeout(Duration.ofSeconds(5))
            .flatMap(_v -> client.send(new TdApi.GetSupergroup(sourceGroup.getSupergroupIdInt())))
            .flatMap(MonoUtils::orElseThrow)
            .timeout(Duration.ofSeconds(5))
            .filter(sourceSupergroup -> {
              int status = sourceSupergroup.status.getConstructor();
              if (requiresAdminPrivilegesOnSourceSupergroup()) {
                if (status == ChatMemberStatusAdministrator.CONSTRUCTOR) {
                  var statusAdmin = (ChatMemberStatusAdministrator) sourceSupergroup.status;
                  if (statusAdmin.canRestrictMembers) {
                    return true;
                  } else {
                    App.getLogService().append(Level.WARN, "Userbot " + client + " failed: Can't restrict members of group " + sourceGroup.getTitle());
                    return false;
                  }
                } else if (status == ChatMemberStatusCreator.CONSTRUCTOR) {
                  return true;
                } else {
                  App.getLogService().append(Level.WARN, "Userbot " + client + " failed: Can't administer group " + sourceGroup.getTitle());
                  return false;
                }
              } else {
                switch (status) {
                  case ChatMemberStatusAdministrator.CONSTRUCTOR:
                  case ChatMemberStatusCreator.CONSTRUCTOR:
                  case ChatMemberStatusMember.CONSTRUCTOR:
                    return true;
                  default:
                    App.getLogService().append(Level.WARN, "Userbot " + client + " failed: Can't access group " + sourceGroup.getTitle() + " (probably restricted, left or banned)");
                    return false;
                }
              }
            })
            .map(_v -> client)
            // A userbot that fails the check is skipped; it must not abort the whole transfer.
            .onErrorResume(e -> {
              App.getLogService().append(Level.WARN, "Userbot " + client + " failed: " + e.getLocalizedMessage());
              return Mono.empty();
            }))
        .collect(Collectors.toSet())
        .flatMap(transferSourceClients -> {
          return phaseDescriptionConsumer.apply("Checking available userbots for adding users in the destination group")
              .thenMany(Flux.fromIterable(destSupergroupClients))
              .flatMap(client -> client
                  .send(new TdApi.GetMe())
                  .flatMap(MonoUtils::orElseThrow)
                  .timeout(Duration.ofSeconds(5))
                  .flatMap(_v -> client.send(new TdApi.GetSupergroup(destGroup.getSupergroupIdInt())))
                  .flatMap(MonoUtils::orElseThrow)
                  .timeout(Duration.ofSeconds(5))
                  .filter(destSupergroup -> {
                    int status = destSupergroup.status.getConstructor();
                    if (status == ChatMemberStatusAdministrator.CONSTRUCTOR) {
                      var statusAdmin = (ChatMemberStatusAdministrator) destSupergroup.status;
                      if (statusAdmin.canInviteUsers) {
                        return true;
                      } else {
                        App.getLogService().append(Level.WARN, "Userbot " + client + " failed: Can't invite members to group " + destGroup.getTitle());
                      }
                    } else if (status == ChatMemberStatusCreator.CONSTRUCTOR) {
                      return true;
                    } else {
                      App.getLogService().append(Level.WARN, "Userbot " + client + " failed: Can't administer group " + destGroup.getTitle());
                    }
                    return false;
                  })
                  .map(_v -> client)
                  .onErrorResume(e -> {
                    App.getLogService().append(Level.WARN, "Userbot " + client + " failed: " + e.getLocalizedMessage());
                    return Mono.empty();
                  }))
              .collect(Collectors.toSet())
              .map(transferDestClients -> Tuple2.of(transferSourceClients, transferDestClients));
        })
        .map(clientsTuple -> {
          var sourceClients = clientsTuple.element1;
          var destClients = clientsTuple.element2;
          App.getLogService().append(Level.INFO, "Found source userbots: " + sourceClients.stream().map(TransferClient::toString).collect(Collectors.joining(", ")));
          App.getLogService().append(Level.INFO, "Found destination userbots: " + destClients.stream().map(TransferClient::toString).collect(Collectors.joining(", ")));
          // Only userbots usable on both groups can carry out the transfer.
          var chosenClients = new HashSet<TransferClient>(sourceClients);
          chosenClients.retainAll(destClients);
          return chosenClients;
        })
        .filter(chosenClients -> !chosenClients.isEmpty())
        .doOnNext(chosenClients -> {
          App.getLogService().append(Level.INFO, "Chosen userbots: " + chosenClients.stream().map(TransferClient::toString).collect(Collectors.joining(", ")));
        })
        .switchIfEmpty(Mono.defer(() -> {
          App.getLogService().append(Level.ERROR, "No userbots are admin in both groups!");
          return Mono.error(new Exception("No userbots are admin in both groups!"));
        }))

        // Now we have a set of userbots that can transfer the users

        // Get the list of members of the first group from a bot
        .flatMap(chosenClients -> {
          return phaseDescriptionConsumer.apply("Obtaining group members")
              .then(percentageConsumer.apply(5)).thenReturn(chosenClients);
        })
        .flatMap(chosenClients -> {
          // Get the members of the source group
          return Flux
              .fromIterable(chosenClients)
              .flatMap(client -> {
                return Mono.fromCallable(() -> {
                  var members = TransferUtils.getSupergroupMembers(client, sourceGroup.getSupergroupIdInt());
                  App.getLogService().append(Level.INFO, "Source group has " + members.size() + " members.");
                  return members;
                });
              })
              .last()
              .onErrorMap(error -> {
                error.printStackTrace();
                return new Exception("Error while obtaining source group members: " + error.getLocalizedMessage(), error);
              })
              .subscribeOn(Schedulers.boundedElastic())
              .map(members -> Tuple2.of(chosenClients, members));
        })
        // Finished getting the list of members of the source group

        // Resolve users
        .flatMap(context -> {
          return phaseDescriptionConsumer.apply("Resolving users")
              .then(percentageConsumer.apply(10)).thenReturn(context);
        })
        .flatMap(context -> {
          var chosenClients = context.element1;
          var unresolvedUsers = context.element2;
          return Flux
              .fromIterable(chosenClients)
              .flatMap(client -> Flux
                  .fromIterable(unresolvedUsers)
                  .flatMap(userId -> client.send(new TdApi.GetUser(userId)))
                  .flatMap(MonoFxUtils::orElseLogSkipError)
                  .timeout(Duration.ofMinutes(2))
                  .onErrorResume(error -> {
                    App.getLogService().append(Level.WARN, "Error while resolving an user: " + error);
                    return Mono.empty();
                  })
                  .collect(Collectors.toSet())
              )
              .last()
              .map(resolvedUsers -> Tuple2.of(chosenClients, resolvedUsers));
        })
        // Finished resolving users

        // Filter out unsuitable users
        .flatMap(context -> {
          return phaseDescriptionConsumer.apply("Filtering users")
              .then(percentageConsumer.apply(15)).thenReturn(context);
        })
        .flatMap(context -> {
          var chosenClients = context.element1;
          var unfilteredUsers = context.element2;
          return Flux
              .fromIterable(unfilteredUsers)
              .flatMap(user -> {
                // Skip the accounts of the registered userbots themselves.
                if (this.clients.values().stream().map(TransferClient::getClientUser).noneMatch(clientUser -> clientUser.id == user.id)) {
                  return Mono.just(user);
                }
                return reportAndSkip(userStatusConsumer, user, UserStatusType.NOT_REGULAR_USER, "");
              })
              .flatMap(user -> {
                if (user.haveAccess) {
                  return Mono.just(user);
                }
                return reportAndSkip(userStatusConsumer, user, UserStatusType.NO_ACCESS_HASH, "");
              })
              .flatMap(user -> {
                if (user.type.getConstructor() == UserTypeRegular.CONSTRUCTOR) {
                  return Mono.just(user);
                }
                return reportAndSkip(userStatusConsumer, user, UserStatusType.NOT_REGULAR_USER, "");
              })
              .flatMap(user -> {
                if (!user.isScam) {
                  return Mono.just(user);
                }
                return reportAndSkip(userStatusConsumer, user, UserStatusType.SCAM_USER, "");
              })
              .flatMap(user -> {
                if (user.restrictionReason == null || user.restrictionReason.isEmpty()) {
                  return Mono.just(user);
                }
                return reportAndSkip(userStatusConsumer, user, UserStatusType.RESTRICTED_USER, "Restricted user: " + user.restrictionReason);
              })
              .flatMap(user -> {
                return userStatusConsumer
                    .apply(new UserStatus(getName(user), user.id, UserStatusType.JUST_FOUND, ""))
                    .thenReturn(user);
              })
              .collect(Collectors.toSet())
              .map(resolvedUsers -> Tuple3.of(chosenClients, resolvedUsers, unfilteredUsers.size()));
        })
        // Finished filtering unsuitable users

        // Transfer users
        .flatMap(context -> {
          return phaseDescriptionConsumer.apply("Transferring users")
              .then(percentageConsumer.apply(20)).thenReturn(context);
        })
        .flatMap(context -> {
          var chosenClients = context.element1;
          var users = context.element2;
          var totalUsersCount = context.element3;
          // Pick one of the usable userbots at random to perform the whole transfer.
          var client = chosenClients.stream().skip(ThreadLocalRandom.current().nextInt(chosenClients.size())).findFirst().orElseThrow();
          AtomicInteger processedUsersStats = new AtomicInteger(0);
          return Flux
              .fromIterable(users)
              .flatMap(user -> {
                // Multiply before dividing: "processed / size" is an integer division that is
                // always 0 here, which pinned the reported progress at 20%.
                int processed = processedUsersStats.getAndIncrement();
                return percentageConsumer
                    .apply(20 + processed * (100 - 20) / users.size())
                    .thenReturn(user);
              })
              .flatMap(user -> {
                return client.send(new TdApi.AddChatMember(destGroup.getSupergroupId(), user.id, 0))
                    .flatMap(result -> {
                      if (result.failed()) {
                        if (TdLightUtils.errorEquals(new TdError(result.cause().code, result.cause().message), 403, "USER_PRIVACY_RESTRICTED")) {
                          return reportAndSkip(userStatusConsumer, user, UserStatusType.USER_PRIVACY_RESTRICTED, "");
                        } else if (TdLightUtils.errorEquals(new TdError(result.cause().code, result.cause().message), 400, "USER_NOT_MUTUAL_CONTACT")) {
                          return reportAndSkip(userStatusConsumer, user, UserStatusType.USER_NOT_MUTUAL_CONTACT, "");
                        }
                      }
                      return Mono.just(result);
                    })
                    .flatMap(MonoUtils::orElseThrow)
                    .timeout(Duration.ofMinutes(2))
                    .onErrorResume((error) -> {
                      App.getLogService().append(Level.WARN, "Can't add user \"" + getName(user) + "\" to supergroup \"" + destGroup.getSupergroupIdInt() + " " + destGroup.getTitle() + "\"");
                      return reportAndSkip(userStatusConsumer, user, UserStatusType.CANT_ADD, "Can't add to destination supergroup: " + error.getLocalizedMessage());
                    })
                    .flatMap(_v -> {
                      return userStatusConsumer
                          .apply(new UserStatus(getName(user), user.id, UserStatusType.ADDED_AND_WAITING_TO_BE_REMOVED, ""))
                          .thenReturn(user);
                    });
              })
              .flatMap(user -> {
                if (REMOVE_MEMBERS_FROM_SOURCE_SUPERGROUP) {
                  // Remove the user from the source supergroup
                  return client.send(new TdApi.SetChatMemberStatus(sourceGroup.getSupergroupId(), user.id, new ChatMemberStatusLeft()))
                      .flatMap(result -> {
                        if (result.failed()) {
                          if (TdLightUtils.errorEquals(new TdError(result.cause().code, result.cause().message), 3, "Can't remove chat owner")) {
                            return reportAndSkip(userStatusConsumer, user, UserStatusType.CANT_REMOVE_CHAT_OWNER, "");
                          }
                        }
                        return Mono.just(result);
                      })
                      .flatMap(MonoUtils::orElseThrow)
                      .timeout(Duration.ofMinutes(2))
                      .onErrorResume((error) -> {
                        App.getLogService().append(Level.WARN, "Can't remove user \"" + getName(user) + "\" from supergroup \"" + sourceGroup.getSupergroupIdInt() + " " + sourceGroup.getTitle() + "\"");
                        return reportAndSkip(userStatusConsumer, user, UserStatusType.CANT_REMOVE, "Can't remove from source supergroup: " + error.getLocalizedMessage());
                      })
                      .flatMap(_v -> {
                        transferredSuccessfullyUsersStats.incrementAndGet();
                        return userStatusConsumer
                            .apply(new UserStatus(getName(user), user.id, UserStatusType.DONE, ""))
                            .thenReturn(user);
                      });
                } else {
                  // Don't remove the user from the source supergroup.
                  // The add succeeded, so count it; otherwise the summary always reported 0.
                  transferredSuccessfullyUsersStats.incrementAndGet();
                  return Mono.just(user);
                }
              })
              .delayElements(App.getSettingsService().getDelayBetweenAdds())
              .collect(Collectors.toSet())
              .map(resolvedUsers -> Tuple3.of(chosenClients, resolvedUsers, totalUsersCount));
        })
        // Finished transferring users

        .doOnNext(context -> {
          App.getLogService().append(Level.INFO, "Transfer done. Transferred " + transferredSuccessfullyUsersStats.get() + "/" + context.element3 + " users");
        })
        .then(percentageConsumer.apply(100))
        .then(phaseDescriptionConsumer.apply("Done"))
        .then(Mono.delay(Duration.ofMillis(500)))
        .then();
  }

  /** Reports the given status for the user, then completes empty so the user is dropped from the pipeline. */
  private static <T> Mono<T> reportAndSkip(Function<UserStatus, ? extends Mono<?>> userStatusConsumer,
      User user,
      UserStatusType statusType,
      String description) {
    return userStatusConsumer
        .apply(new UserStatus(getName(user), user.id, statusType, description))
        .then(Mono.empty());
  }

  /** Builds the display name "id firstName lastName". */
  private static String getName(User user) {
    // Varargs String.join gives the same text as joining List.of(...) for non-null components
    // and does not throw when one of them is null.
    return String.join(" ", "" + user.id, user.firstName, user.lastName);
  }

  /**
   * Sends {@code Close} to every registered userbot.
   *
   * @return a Mono that completes once every close request has finished or failed
   */
  @Override
  public Mono<Void> quit() {
    return Flux
        .fromIterable(clients.values())
        .flatMap(client -> client
            .send(new TdApi.Close())
            // Handled per client so that one failing instance does not cancel the others.
            .onErrorResume(error -> {
              App.getLogService().append(Level.ERROR, "Can't close a tdlib instance: " + error.getLocalizedMessage());
              return Mono.empty();
            }))
        .collectList()
        .then();
  }
}