diff --git a/install4j.install4j b/install4j.install4j index bc77b0d..da28210 100644 --- a/install4j.install4j +++ b/install4j.install4j @@ -9,18 +9,18 @@ - - - - + + + + + - diff --git a/src/main/java/it/cavallium/TransferClient.java b/src/main/java/it/cavallium/TransferClient.java index 6374961..69056a2 100644 --- a/src/main/java/it/cavallium/TransferClient.java +++ b/src/main/java/it/cavallium/TransferClient.java @@ -15,12 +15,17 @@ import it.tdlight.jni.TdApi.UpdateAuthorizationState; import it.tdlight.jni.TdApi.UpdateNewChat; import it.tdlight.jni.TdApi.UpdateSupergroup; import it.tdlight.jni.TdApi.UpdateSupergroupFullInfo; +import it.tdlight.jni.TdApi.User; import it.tdlight.tdlibsession.td.TdResult; import it.tdlight.tdlibsession.td.easy.AsyncTdEasy; +import it.tdlight.utils.MonoUtils; +import java.time.Duration; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; @@ -32,6 +37,7 @@ public class TransferClient { private final String alias; private final AsyncTdEasy client; + private final AtomicReference clientUser = new AtomicReference<>(null); private final ConcurrentHashMap supergroupInfos = new ConcurrentHashMap<>(); private final ConcurrentHashMap supergroupFullInfos = new ConcurrentHashMap<>(); private final ConcurrentHashMap chats = new ConcurrentHashMap<>(); @@ -51,6 +57,10 @@ public class TransferClient { .subscribe(); } + public User getClientUser() { + return Objects.requireNonNull(clientUser.get(), "Userbot details not already received!"); + } + private Mono onUpdate(Update update) { switch (update.getConstructor()) { case UpdateSupergroup.CONSTRUCTOR: @@ -71,7 +81,15 @@ public class TransferClient { private Mono onUpdateAuthorizationState(AuthorizationState authorizationState) { switch (authorizationState.getConstructor()) { case AuthorizationStateReady.CONSTRUCTOR: - return TransferUtils.getAllHomeChats(this).flatMap(this::onChat).then(); + return TransferUtils + .getAllHomeChats(this) + .timeout(Duration.ofMinutes(2)) + .flatMap(this::onChat) + .then(client.send(new TdApi.GetMe()) + .flatMap(MonoUtils::orElseThrow) + .timeout(Duration.ofSeconds(30)) + .doOnSuccess(this.clientUser::set)) + .then(); default: return Mono.empty(); } diff --git a/src/main/java/it/cavallium/TransferServiceImpl.java b/src/main/java/it/cavallium/TransferServiceImpl.java index df2d8fc..f10548a 100644 --- a/src/main/java/it/cavallium/TransferServiceImpl.java +++ b/src/main/java/it/cavallium/TransferServiceImpl.java @@ -8,6 +8,7 @@ import com.google.i18n.phonenumbers.PhoneNumberUtil.PhoneNumberFormat; import com.google.i18n.phonenumbers.Phonenumber.PhoneNumber; import com.hazelcast.cp.internal.util.Tuple2; import com.hazelcast.cp.internal.util.Tuple3; +import io.vertx.core.impl.ConcurrentHashSet; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.AuthorizationStateClosed; import it.tdlight.jni.TdApi.AuthorizationStateClosing; @@ -16,6 +17,7 @@ import it.tdlight.jni.TdApi.AuthorizationStateReady; import it.tdlight.jni.TdApi.ChatMemberStatusAdministrator; import it.tdlight.jni.TdApi.ChatMemberStatusCreator; import it.tdlight.jni.TdApi.ChatMemberStatusLeft; +import it.tdlight.jni.TdApi.GetMe; import it.tdlight.jni.TdApi.GetUserPrivacySettingRules; import it.tdlight.jni.TdApi.Ok; import it.tdlight.jni.TdApi.Supergroup; @@ -189,7 +191,7 @@ public class TransferServiceImpl implements TransferService { }) .then(); })) - .doOnSuccess((_v) -> { + .doOnSuccess((clientUser) -> { var newClient = new TransferClient(alias, client); clients.put(phoneNumberLong, newClient); @@ -313,10 +315,10 @@ public class TransferServiceImpl implements TransferService { .send(new TdApi.GetMe()) .timeout(Duration.ofSeconds(5)) .flatMap(MonoUtils::orElseThrow) - .then(client.send(new TdApi.GetSupergroupFullInfo(sourceGroup.getSupergroupIdInt()))) + .flatMap(_v -> client.send(new TdApi.GetSupergroupFullInfo(sourceGroup.getSupergroupIdInt()))) .flatMap(MonoUtils::orElseThrow) .timeout(Duration.ofSeconds(5)) - .then(client.send(new TdApi.GetSupergroup(sourceGroup.getSupergroupIdInt()))) + .flatMap(_v -> client.send(new TdApi.GetSupergroup(sourceGroup.getSupergroupIdInt()))) .flatMap(MonoUtils::orElseThrow) .timeout(Duration.ofSeconds(5)) .filter(sourceGroupFullInfo -> { @@ -347,7 +349,7 @@ public class TransferServiceImpl implements TransferService { .send(new TdApi.GetMe()) .flatMap(MonoUtils::orElseThrow) .timeout(Duration.ofSeconds(5)) - .then(client.send(new TdApi.GetSupergroup(destGroup.getSupergroupIdInt()))) + .flatMap(_v -> client.send(new TdApi.GetSupergroup(destGroup.getSupergroupIdInt()))) .flatMap(MonoUtils::orElseThrow) .timeout(Duration.ofSeconds(5)) .filter(destGroupFullInfo -> { @@ -397,15 +399,20 @@ public class TransferServiceImpl implements TransferService { return phaseDescriptionConsumer.apply("Obtaining group members") .then(percentageConsumer.apply(5)).thenReturn(clients); }).flatMap(clients -> { - // Get the first userbot - var client = clients.stream().findAny().orElseThrow(() -> new NullPointerException("No userbots found")); - // Get the members of the source group - return Mono.fromCallable(() -> { - var members = TransferUtils.getSupergroupMembers(client, sourceGroup.getSupergroupIdInt()); - App.getLogService().append(Level.INFO, "Source group has " + members.size() + " members."); - return members; - }).subscribeOn(Schedulers.boundedElastic()).map(members -> Tuple3.of(client, clients, members)); + return Flux + .fromIterable(clients) + .flatMap(client -> { + return Mono.fromCallable(() -> { + var members = TransferUtils.getSupergroupMembers(client, sourceGroup.getSupergroupIdInt()); + App.getLogService().append(Level.INFO, "Source group has " + members.size() + " members."); + return members; + }); + } + ) + .last() + .subscribeOn(Schedulers.boundedElastic()) + .map(members -> Tuple2.of(clients, members)); }) // Finished getting the list of members of the source group @@ -415,20 +422,23 @@ public class TransferServiceImpl implements TransferService { .then(percentageConsumer.apply(10)).thenReturn(context); }) .flatMap(context -> { - var client = context.element1; - var clients = context.element2; - var unresolvedUsers = context.element3; + var clients = context.element1; + var unresolvedUsers = context.element2; return Flux - .fromIterable(unresolvedUsers) - .flatMap(userId -> client.send(new TdApi.GetUser(userId))) - .flatMap(MonoFxUtils::orElseLogSkipError) - .timeout(Duration.ofMinutes(2)) - .onErrorResume(error -> { - App.getLogService().append(Level.WARN, "Error while resolving an user: " + error); - return Mono.empty(); - }) - .collect(Collectors.toSet()) - .map(resolvedUsers -> Tuple3.of(client, clients, resolvedUsers)); + .fromIterable(clients) + .flatMap(client -> Flux + .fromIterable(unresolvedUsers) + .flatMap(userId -> client.send(new TdApi.GetUser(userId))) + .flatMap(MonoFxUtils::orElseLogSkipError) + .timeout(Duration.ofMinutes(2)) + .onErrorResume(error -> { + App.getLogService().append(Level.WARN, "Error while resolving an user: " + error); + return Mono.empty(); + }) + .collect(Collectors.toSet()) + ) + .last() + .map(resolvedUsers -> Tuple2.of(clients, resolvedUsers)); }) // Finished resolving users @@ -438,11 +448,16 @@ public class TransferServiceImpl implements TransferService { .then(percentageConsumer.apply(15)).thenReturn(context); }) .flatMap(context -> { - var client = context.element1; - var clients = context.element2; - var unfilteredUsers = context.element3; + var clients = context.element1; + var unfilteredUsers = context.element2; return Flux .fromIterable(unfilteredUsers) + .flatMap(user -> { + if (this.clients.values().stream().map(TransferClient::getClientUser).noneMatch(clientUser -> clientUser.id == user.id)) { + return Mono.just(user); + } + return userStatusConsumer.apply(new UserStatus(getName(user), user.id, UserStatusType.NOT_REGULAR_USER, "")).then(Mono.empty()); + }) .flatMap(user -> { if (user.haveAccess) { return Mono.just(user); @@ -473,7 +488,7 @@ public class TransferServiceImpl implements TransferService { .thenReturn(user); }) .collect(Collectors.toSet()) - .map(resolvedUsers -> Tuple2.of(clients, resolvedUsers)); + .map(resolvedUsers -> Tuple3.of(clients, resolvedUsers, unfilteredUsers.size())); }) // Finished filtering unsuitable users @@ -485,6 +500,7 @@ public class TransferServiceImpl implements TransferService { .flatMap(context -> { var clients = context.element1; var users = context.element2; + var totalUsersCount = context.element3; var client = clients.stream().skip(ThreadLocalRandom.current().nextInt(clients.size())).findFirst().orElseThrow(); AtomicInteger processedUsersStats = new AtomicInteger(0); return Flux @@ -539,13 +555,13 @@ public class TransferServiceImpl implements TransferService { }) .delayElements(App.getSettingsService().getDelayBetweenAdds()) .collect(Collectors.toSet()) - .map(resolvedUsers -> Tuple2.of(clients, resolvedUsers)); + .map(resolvedUsers -> Tuple3.of(clients, resolvedUsers, totalUsersCount)); }) // Finished transferring users .doOnNext(context -> { - App.getLogService().append(Level.INFO, "Transfer done. Transferred " + transferredSuccessfullyUsersStats.get() + "/" + context.element2.size() + " users"); + App.getLogService().append(Level.INFO, "Transfer done. Transferred " + transferredSuccessfullyUsersStats.get() + "/" + context.element3 + " users"); }) .then(percentageConsumer.apply(100))