Keep userbots in group

This commit is contained in:
Andrea Cavalli 2020-10-20 20:09:29 +02:00
parent e0a862bf40
commit 52019d505a
3 changed files with 71 additions and 37 deletions

View File

@ -9,18 +9,18 @@
<mountPoint id="157" />
</mountPoints>
<entries>
<dirEntry mountPoint="157" file="/home/ubuntu/IdeaProjects/TransferBot/target/lib" entryMode="subdir" subDirectory="lib" />
<fileEntry mountPoint="157" file="/home/ubuntu/IdeaProjects/TransferBot/target/TransferBot-1.0-SNAPSHOT.jar" />
</entries>
</files>
<launchers>
<launcher name="transferbot" id="58">
<executable name="transferbot" executableDir="." executableMode="gui" />
<java mainClass="it.cavallium/it.cavallium.App" mainMode="module">
<modulePath>
<directory location="classes" failOnError="false" />
<java mainClass="it.cavallium.Launcher">
<classPath>
<archive location="TransferBot-1.0-SNAPSHOT.jar" failOnError="false" />
</classPath>
<modulePath>
<archive location="TransferBot-1.0-SNAPSHOT.jar" failOnError="false" />
<directory location="lib" failOnError="false" />
</modulePath>
</java>
</launcher>

View File

@ -15,12 +15,17 @@ import it.tdlight.jni.TdApi.UpdateAuthorizationState;
import it.tdlight.jni.TdApi.UpdateNewChat;
import it.tdlight.jni.TdApi.UpdateSupergroup;
import it.tdlight.jni.TdApi.UpdateSupergroupFullInfo;
import it.tdlight.jni.TdApi.User;
import it.tdlight.tdlibsession.td.TdResult;
import it.tdlight.tdlibsession.td.easy.AsyncTdEasy;
import it.tdlight.utils.MonoUtils;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
@ -32,6 +37,7 @@ public class TransferClient {
private final String alias;
private final AsyncTdEasy client;
private final AtomicReference<User> clientUser = new AtomicReference<>(null);
private final ConcurrentHashMap<Integer, Supergroup> supergroupInfos = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, SupergroupFullInfo> supergroupFullInfos = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Long, Chat> chats = new ConcurrentHashMap<>();
@ -51,6 +57,10 @@ public class TransferClient {
.subscribe();
}
public User getClientUser() {
return Objects.requireNonNull(clientUser.get(), "Userbot details not already received!");
}
private Mono<Void> onUpdate(Update update) {
switch (update.getConstructor()) {
case UpdateSupergroup.CONSTRUCTOR:
@ -71,7 +81,15 @@ public class TransferClient {
private Mono<Void> onUpdateAuthorizationState(AuthorizationState authorizationState) {
switch (authorizationState.getConstructor()) {
case AuthorizationStateReady.CONSTRUCTOR:
return TransferUtils.getAllHomeChats(this).flatMap(this::onChat).then();
return TransferUtils
.getAllHomeChats(this)
.timeout(Duration.ofMinutes(2))
.flatMap(this::onChat)
.then(client.<User>send(new TdApi.GetMe())
.flatMap(MonoUtils::orElseThrow)
.timeout(Duration.ofSeconds(30))
.doOnSuccess(this.clientUser::set))
.then();
default:
return Mono.empty();
}

View File

@ -8,6 +8,7 @@ import com.google.i18n.phonenumbers.PhoneNumberUtil.PhoneNumberFormat;
import com.google.i18n.phonenumbers.Phonenumber.PhoneNumber;
import com.hazelcast.cp.internal.util.Tuple2;
import com.hazelcast.cp.internal.util.Tuple3;
import io.vertx.core.impl.ConcurrentHashSet;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.AuthorizationStateClosed;
import it.tdlight.jni.TdApi.AuthorizationStateClosing;
@ -16,6 +17,7 @@ import it.tdlight.jni.TdApi.AuthorizationStateReady;
import it.tdlight.jni.TdApi.ChatMemberStatusAdministrator;
import it.tdlight.jni.TdApi.ChatMemberStatusCreator;
import it.tdlight.jni.TdApi.ChatMemberStatusLeft;
import it.tdlight.jni.TdApi.GetMe;
import it.tdlight.jni.TdApi.GetUserPrivacySettingRules;
import it.tdlight.jni.TdApi.Ok;
import it.tdlight.jni.TdApi.Supergroup;
@ -189,7 +191,7 @@ public class TransferServiceImpl implements TransferService {
})
.then();
}))
.doOnSuccess((_v) -> {
.doOnSuccess((clientUser) -> {
var newClient = new TransferClient(alias, client);
clients.put(phoneNumberLong, newClient);
@ -313,10 +315,10 @@ public class TransferServiceImpl implements TransferService {
.send(new TdApi.GetMe())
.timeout(Duration.ofSeconds(5))
.flatMap(MonoUtils::orElseThrow)
.then(client.<SupergroupFullInfo>send(new TdApi.GetSupergroupFullInfo(sourceGroup.getSupergroupIdInt())))
.flatMap(_v -> client.<SupergroupFullInfo>send(new TdApi.GetSupergroupFullInfo(sourceGroup.getSupergroupIdInt())))
.flatMap(MonoUtils::orElseThrow)
.timeout(Duration.ofSeconds(5))
.then(client.<Supergroup>send(new TdApi.GetSupergroup(sourceGroup.getSupergroupIdInt())))
.flatMap(_v -> client.<Supergroup>send(new TdApi.GetSupergroup(sourceGroup.getSupergroupIdInt())))
.flatMap(MonoUtils::orElseThrow)
.timeout(Duration.ofSeconds(5))
.filter(sourceGroupFullInfo -> {
@ -347,7 +349,7 @@ public class TransferServiceImpl implements TransferService {
.send(new TdApi.GetMe())
.flatMap(MonoUtils::orElseThrow)
.timeout(Duration.ofSeconds(5))
.then(client.<Supergroup>send(new TdApi.GetSupergroup(destGroup.getSupergroupIdInt())))
.flatMap(_v -> client.<Supergroup>send(new TdApi.GetSupergroup(destGroup.getSupergroupIdInt())))
.flatMap(MonoUtils::orElseThrow)
.timeout(Duration.ofSeconds(5))
.filter(destGroupFullInfo -> {
@ -397,15 +399,20 @@ public class TransferServiceImpl implements TransferService {
return phaseDescriptionConsumer.apply("Obtaining group members")
.then(percentageConsumer.apply(5)).thenReturn(clients);
}).flatMap(clients -> {
// Get the first userbot
var client = clients.stream().findAny().orElseThrow(() -> new NullPointerException("No userbots found"));
// Get the members of the source group
return Mono.fromCallable(() -> {
var members = TransferUtils.getSupergroupMembers(client, sourceGroup.getSupergroupIdInt());
App.getLogService().append(Level.INFO, "Source group has " + members.size() + " members.");
return members;
}).subscribeOn(Schedulers.boundedElastic()).map(members -> Tuple3.of(client, clients, members));
return Flux
.fromIterable(clients)
.flatMap(client -> {
return Mono.fromCallable(() -> {
var members = TransferUtils.getSupergroupMembers(client, sourceGroup.getSupergroupIdInt());
App.getLogService().append(Level.INFO, "Source group has " + members.size() + " members.");
return members;
});
}
)
.last()
.subscribeOn(Schedulers.boundedElastic())
.map(members -> Tuple2.of(clients, members));
})
// Finished getting the list of members of the source group
@ -415,20 +422,23 @@ public class TransferServiceImpl implements TransferService {
.then(percentageConsumer.apply(10)).thenReturn(context);
})
.flatMap(context -> {
var client = context.element1;
var clients = context.element2;
var unresolvedUsers = context.element3;
var clients = context.element1;
var unresolvedUsers = context.element2;
return Flux
.fromIterable(unresolvedUsers)
.flatMap(userId -> client.<User>send(new TdApi.GetUser(userId)))
.flatMap(MonoFxUtils::orElseLogSkipError)
.timeout(Duration.ofMinutes(2))
.onErrorResume(error -> {
App.getLogService().append(Level.WARN, "Error while resolving an user: " + error);
return Mono.empty();
})
.collect(Collectors.toSet())
.map(resolvedUsers -> Tuple3.of(client, clients, resolvedUsers));
.fromIterable(clients)
.flatMap(client -> Flux
.fromIterable(unresolvedUsers)
.flatMap(userId -> client.<User>send(new TdApi.GetUser(userId)))
.flatMap(MonoFxUtils::orElseLogSkipError)
.timeout(Duration.ofMinutes(2))
.onErrorResume(error -> {
App.getLogService().append(Level.WARN, "Error while resolving an user: " + error);
return Mono.empty();
})
.collect(Collectors.toSet())
)
.last()
.map(resolvedUsers -> Tuple2.of(clients, resolvedUsers));
})
// Finished resolving users
@ -438,11 +448,16 @@ public class TransferServiceImpl implements TransferService {
.then(percentageConsumer.apply(15)).thenReturn(context);
})
.flatMap(context -> {
var client = context.element1;
var clients = context.element2;
var unfilteredUsers = context.element3;
var clients = context.element1;
var unfilteredUsers = context.element2;
return Flux
.fromIterable(unfilteredUsers)
.<User>flatMap(user -> {
if (this.clients.values().stream().map(TransferClient::getClientUser).noneMatch(clientUser -> clientUser.id == user.id)) {
return Mono.just(user);
}
return userStatusConsumer.apply(new UserStatus(getName(user), user.id, UserStatusType.NOT_REGULAR_USER, "")).then(Mono.empty());
})
.<User>flatMap(user -> {
if (user.haveAccess) {
return Mono.just(user);
@ -473,7 +488,7 @@ public class TransferServiceImpl implements TransferService {
.thenReturn(user);
})
.collect(Collectors.toSet())
.map(resolvedUsers -> Tuple2.of(clients, resolvedUsers));
.map(resolvedUsers -> Tuple3.of(clients, resolvedUsers, unfilteredUsers.size()));
})
// Finished filtering unsuitable users
@ -485,6 +500,7 @@ public class TransferServiceImpl implements TransferService {
.flatMap(context -> {
var clients = context.element1;
var users = context.element2;
var totalUsersCount = context.element3;
var client = clients.stream().skip(ThreadLocalRandom.current().nextInt(clients.size())).findFirst().orElseThrow();
AtomicInteger processedUsersStats = new AtomicInteger(0);
return Flux
@ -539,13 +555,13 @@ public class TransferServiceImpl implements TransferService {
})
.delayElements(App.getSettingsService().getDelayBetweenAdds())
.collect(Collectors.toSet())
.map(resolvedUsers -> Tuple2.of(clients, resolvedUsers));
.map(resolvedUsers -> Tuple3.of(clients, resolvedUsers, totalUsersCount));
})
// Finished transferring users
.doOnNext(context -> {
App.getLogService().append(Level.INFO, "Transfer done. Transferred " + transferredSuccessfullyUsersStats.get() + "/" + context.element2.size() + " users");
App.getLogService().append(Level.INFO, "Transfer done. Transferred " + transferredSuccessfullyUsersStats.get() + "/" + context.element3 + " users");
})
.then(percentageConsumer.apply(100))