616 lines
26 KiB
Java
616 lines
26 KiB
Java
package it.cavallium;
|
|
|
|
import static it.cavallium.PrimaryController.getUserbotPhoneNumber;
|
|
import static it.cavallium.StaticSettings.REMOVE_MEMBERS_FROM_SOURCE_SUPERGROUP;
|
|
import static it.cavallium.StaticSettings.requiresAdminPrivilegesOnSourceSupergroup;
|
|
|
|
import com.google.i18n.phonenumbers.NumberParseException;
|
|
import com.google.i18n.phonenumbers.PhoneNumberUtil;
|
|
import com.google.i18n.phonenumbers.PhoneNumberUtil.PhoneNumberFormat;
|
|
import com.google.i18n.phonenumbers.Phonenumber.PhoneNumber;
|
|
import com.hazelcast.cp.internal.util.Tuple2;
|
|
import com.hazelcast.cp.internal.util.Tuple3;
|
|
import io.vertx.core.impl.ConcurrentHashSet;
|
|
import it.tdlight.jni.TdApi;
|
|
import it.tdlight.jni.TdApi.AuthorizationStateClosed;
|
|
import it.tdlight.jni.TdApi.AuthorizationStateClosing;
|
|
import it.tdlight.jni.TdApi.AuthorizationStateLoggingOut;
|
|
import it.tdlight.jni.TdApi.AuthorizationStateReady;
|
|
import it.tdlight.jni.TdApi.ChatMemberStatusAdministrator;
|
|
import it.tdlight.jni.TdApi.ChatMemberStatusCreator;
|
|
import it.tdlight.jni.TdApi.ChatMemberStatusLeft;
|
|
import it.tdlight.jni.TdApi.ChatMemberStatusMember;
|
|
import it.tdlight.jni.TdApi.GetMe;
|
|
import it.tdlight.jni.TdApi.GetUserPrivacySettingRules;
|
|
import it.tdlight.jni.TdApi.Ok;
|
|
import it.tdlight.jni.TdApi.Supergroup;
|
|
import it.tdlight.jni.TdApi.SupergroupFullInfo;
|
|
import it.tdlight.jni.TdApi.Update;
|
|
import it.tdlight.jni.TdApi.User;
|
|
import it.tdlight.jni.TdApi.UserFullInfo;
|
|
import it.tdlight.jni.TdApi.UserTypeRegular;
|
|
import it.tdlight.tdlibsession.td.TdError;
|
|
import it.tdlight.tdlibsession.td.easy.AsyncTdEasy;
|
|
import it.tdlight.tdlibsession.td.easy.ParameterInfoPasswordHint;
|
|
import it.tdlight.tdlibsession.td.easy.TdEasySettings;
|
|
import it.tdlight.tdlibsession.td.middle.TdClusterManager;
|
|
import it.tdlight.tdlibsession.td.middle.direct.AsyncTdMiddleDirect;
|
|
import it.tdlight.utils.MonoUtils;
|
|
import it.tdlight.utils.TdLightUtils;
|
|
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
|
|
import it.unimi.dsi.fastutil.objects.ObjectSets;
|
|
import java.io.File;
|
|
import java.time.Duration;
|
|
import java.util.HashSet;
|
|
import java.util.List;
|
|
import java.util.Set;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.function.BiFunction;
|
|
import java.util.function.Function;
|
|
import java.util.function.Supplier;
|
|
import java.util.stream.Collectors;
|
|
import org.slf4j.event.Level;
|
|
import reactor.core.publisher.EmitterProcessor;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.core.scheduler.Scheduler;
|
|
import reactor.core.scheduler.Schedulers;
|
|
|
|
public class TransferServiceImpl implements TransferService {
|
|
|
|
private final TdClusterManager clusterManager;
|
|
private final ConcurrentHashMap<Long, TransferClient> clients = new ConcurrentHashMap<>();
|
|
private final ConcurrentHashMap<Integer, Set<TransferClient>> supergroupClients = new ConcurrentHashMap<>();
|
|
private final EmitterProcessor<ItemUpdate<TransferClient>> newClients = EmitterProcessor.create();
|
|
private final Scheduler updatesScheduler;
|
|
private int apiId;
|
|
private String apiHash;
|
|
|
|
/**
 * Creates the service.
 *
 * @param clusterManager cluster manager passed to the TDLib middle layer whenever a userbot
 *                       instance is deployed
 */
public TransferServiceImpl(TdClusterManager clusterManager) {
    // Every subscription to client state and incoming updates runs on this scheduler.
    this.updatesScheduler = Schedulers.boundedElastic();
    this.clusterManager = clusterManager;
}
|
|
|
|
@Override
|
|
public void setApiId(int apiId) {
|
|
this.apiId = apiId;
|
|
}
|
|
|
|
@Override
|
|
public void setApiHash(String apiHash) {
|
|
this.apiHash = apiHash;
|
|
}
|
|
|
|
@Override
|
|
public Mono<AddUserBotResult> addUserbot(PhoneNumber phoneNumber,
|
|
Function<AsyncTdEasy, Mono<Integer>> codeSupplier,
|
|
BiFunction<AsyncTdEasy, String, Mono<String>> otpSupplier,
|
|
Supplier<Mono<Void>> onUserbotClosed) {
|
|
|
|
long phoneNumberLong = getLongPhoneNumber(phoneNumber);
|
|
|
|
if (clients.containsKey(phoneNumberLong)) {
|
|
return Mono.just(AddUserBotResult.newFailed("Userbot already added!"));
|
|
}
|
|
|
|
String alias = PhoneNumberUtil.getInstance().format(phoneNumber, PhoneNumberFormat.INTERNATIONAL);
|
|
return Mono
|
|
.fromCallable(() -> {
|
|
return AsyncTdMiddleDirect.getAndDeployInstance(clusterManager, alias, "" + phoneNumberLong);
|
|
})
|
|
.subscribeOn(Schedulers.boundedElastic())
|
|
.flatMap(v -> v)
|
|
.map(middle -> new AsyncTdEasy(middle, alias)).flatMap(client -> {
|
|
return client
|
|
.execute(new TdApi.SetLogVerbosityLevel(0))
|
|
.then(client
|
|
.create(TdEasySettings
|
|
.newBuilder()
|
|
.setUseMessageDatabase(false)
|
|
.setUseFileDatabase(false)
|
|
.setUseChatInfoDatabase(false)
|
|
.setApiId(apiId)
|
|
.setApiHash(apiHash)
|
|
.setEnableStorageOptimizer(false)
|
|
.setApplicationVersion(App.VERSION)
|
|
.setDatabaseDirectory("sessions" + File.separator + "userbot_" + phoneNumberLong)
|
|
.setIgnoreFileNames(true)
|
|
.setPhoneNumber(phoneNumberLong)
|
|
.setSystemLanguageCode("en")
|
|
.setDeviceModel(System.getProperty("os.name"))
|
|
.setSystemVersion(System.getProperty("os.version"))
|
|
.setParameterRequestHandler((parameter, parameterInfo) -> {
|
|
switch (parameter) {
|
|
case ASK_FIRST_NAME:
|
|
return Mono.just("FirstName");
|
|
case ASK_LAST_NAME:
|
|
return Mono.just("LastName");
|
|
case ASK_CODE:
|
|
return codeSupplier
|
|
.apply(client)
|
|
.map(i -> "" + i)
|
|
.switchIfEmpty(client
|
|
.send(new TdApi.Close())
|
|
.materialize()
|
|
.flatMap(signal -> onUserbotClosed.get().thenReturn(signal))
|
|
.dematerialize()
|
|
.then(Mono.empty()));
|
|
case ASK_PASSWORD:
|
|
return otpSupplier
|
|
.apply(client, ((ParameterInfoPasswordHint) parameterInfo).getHint())
|
|
.switchIfEmpty(client
|
|
.send(new TdApi.Close())
|
|
.materialize()
|
|
.flatMap(signal -> onUserbotClosed.get().thenReturn(signal))
|
|
.dematerialize()
|
|
.then(Mono.empty()));
|
|
case NOTIFY_LINK:
|
|
default:
|
|
return Mono.empty();
|
|
}
|
|
})
|
|
.build())
|
|
.then(Mono.defer(() -> {
|
|
var clientStateFlux = client.getState().publish().autoConnect(3);
|
|
|
|
clientStateFlux
|
|
.filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR
|
|
|| state.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR
|
|
|| state.getConstructor() == AuthorizationStateLoggingOut.CONSTRUCTOR)
|
|
.take(1)
|
|
.singleOrEmpty()
|
|
.then()
|
|
.materialize()
|
|
.flatMap(signal -> onUserbotClosed.get().thenReturn(signal))
|
|
.dematerialize()
|
|
.subscribeOn(updatesScheduler)
|
|
.subscribe(state -> System.out.println("Userbot closed with state: " + state));
|
|
|
|
clientStateFlux.subscribeOn(updatesScheduler).subscribe(state -> System.out.println("state: " + state));
|
|
|
|
client
|
|
.getIncomingUpdates()
|
|
.subscribeOn(updatesScheduler)
|
|
.flatMap(update -> onClientUpdate(update))
|
|
.subscribe(_v -> {}, e -> System.err.println(e));
|
|
|
|
return clientStateFlux
|
|
.subscribeOn(updatesScheduler)
|
|
.filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR
|
|
|| state.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR
|
|
|| state.getConstructor() == AuthorizationStateLoggingOut.CONSTRUCTOR
|
|
|| state.getConstructor() == AuthorizationStateReady.CONSTRUCTOR)
|
|
.take(1)
|
|
.singleOrEmpty()
|
|
.doOnNext(state -> System.out.println("aState: " + state))
|
|
.handle((state, sink) -> {
|
|
if (state.getConstructor() == AuthorizationStateReady.CONSTRUCTOR) {
|
|
sink.complete();
|
|
} else {
|
|
sink.error(new Exception(state.getClass().getSimpleName()));
|
|
}
|
|
})
|
|
.then();
|
|
}))
|
|
.doOnSuccess((clientUser) -> {
|
|
var newClient = new TransferClient(alias, client);
|
|
clients.put(phoneNumberLong, newClient);
|
|
|
|
newClient.subscribeAdminSupergroups().doOnNext(supergroupInfoItemUpdate -> {
|
|
if (supergroupInfoItemUpdate.isRemoved()) {
|
|
supergroupClients.compute(supergroupInfoItemUpdate.getItem().getBaseChatInfo().getSupergroupIdInt(), (sgId, clients) -> {
|
|
if (clients != null) {
|
|
clients.remove(newClient);
|
|
}
|
|
return clients;
|
|
});
|
|
} else {
|
|
supergroupClients.compute(supergroupInfoItemUpdate.getItem().getBaseChatInfo().getSupergroupIdInt(), (sgId, clients) -> {
|
|
if (clients == null) {
|
|
clients = ObjectSets.synchronize(new ObjectOpenHashSet<>());
|
|
}
|
|
clients.add(newClient);
|
|
return clients;
|
|
});
|
|
}
|
|
}).subscribe();
|
|
|
|
newClients.onNext(new ItemUpdate<>(false, newClient));
|
|
}));
|
|
})
|
|
.map(_v -> AddUserBotResult.newSuccess());
|
|
}
|
|
|
|
private Mono<Void> onClientUpdate(Update update) {
|
|
return Mono.empty();
|
|
}
|
|
|
|
private static long getLongPhoneNumber(PhoneNumber phoneNumber) {
|
|
return Long.parseLong(PhoneNumberUtil
|
|
.getInstance()
|
|
.format(phoneNumber, PhoneNumberFormat.E164)
|
|
.replace("+", ""));
|
|
}
|
|
|
|
@Override
|
|
public Mono<Void> closeUserbot(PhoneNumber phoneNumber) {
|
|
var client = clients.remove(getLongPhoneNumber(phoneNumber));
|
|
newClients.onNext(new ItemUpdate<>(true, client));
|
|
if (client == null) {
|
|
return Mono.error(new Exception("Userbot " + phoneNumber + " was not found!"));
|
|
} else {
|
|
return MonoUtils.thenOrError(client.send(new TdApi.Close()));
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public Set<PhoneNumber> getPhoneNumbers() {
|
|
var phonenumbers = new HashSet<PhoneNumber>();
|
|
clients.forEach((phoneNumberLong, client) -> {
|
|
try {
|
|
phonenumbers.add(getUserbotPhoneNumber("+" + phoneNumberLong));
|
|
} catch (NumberParseException e) {
|
|
// Can't happen
|
|
e.printStackTrace();
|
|
}
|
|
});
|
|
return phonenumbers;
|
|
}
|
|
|
|
@Override
|
|
public Set<BaseChatInfo> getAdminSupergroups(boolean canRestrictMembers, boolean canInviteUsers) {
|
|
var adminSupergroups = clients
|
|
.values()
|
|
.stream()
|
|
.flatMap((TransferClient transferClient) -> transferClient
|
|
.getAdminSupergroups(canRestrictMembers, canInviteUsers)
|
|
.stream())
|
|
.collect(Collectors.toSet());
|
|
return adminSupergroups;
|
|
}
|
|
|
|
@Override
|
|
public Flux<ItemUpdate<SupergroupInfo>> subscribeAdminSupergroups() {
|
|
return Flux.merge(this.newClients, Flux
|
|
.fromStream(clients.values().stream().map(client -> new ItemUpdate<>(false, client))))
|
|
.filter(itemClient -> !itemClient.isRemoved())
|
|
.map(ItemUpdate::getItem)
|
|
.map(TransferClient::subscribeAdminSupergroups)
|
|
.flatMap(f -> f);
|
|
}
|
|
|
|
@Override
|
|
public Mono<Void> transferMembers(BaseChatInfo sourceGroup,
|
|
BaseChatInfo destGroup,
|
|
Function<UserStatus, Mono<Void>> userStatusConsumer,
|
|
Function<Integer, Mono<Void>> percentageConsumer,
|
|
Function<String, Mono<Void>> phaseDescriptionConsumer) {
|
|
var sourceSupergroupClients = this.supergroupClients
|
|
.getOrDefault(sourceGroup.getSupergroupIdInt(), Set.of())
|
|
.stream()
|
|
.filter(clients::containsValue)
|
|
.collect(Collectors.toSet());
|
|
if (sourceSupergroupClients.isEmpty()) {
|
|
return Mono.error(new Exception("No userbot can remove members from the source group"));
|
|
}
|
|
|
|
var destSupergroupClients = this.supergroupClients
|
|
.getOrDefault(destGroup.getSupergroupIdInt(), Set.of())
|
|
.stream()
|
|
.filter(clients::containsValue)
|
|
.collect(Collectors.toSet());
|
|
if (destSupergroupClients.isEmpty()) {
|
|
return Mono.error(new Exception("No userbot can add members to the destination group"));
|
|
}
|
|
|
|
AtomicInteger transferredSuccessfullyUsersStats = new AtomicInteger(0);
|
|
|
|
return percentageConsumer
|
|
.apply(0)
|
|
.then(phaseDescriptionConsumer.apply("Add users from " + sourceGroup.getTitle() + " to " + destGroup.getTitle()))
|
|
|
|
// Check and get the set of userbots that can transfer users from group X to group Y
|
|
.then(phaseDescriptionConsumer.apply("Checking available userbots for " + (REMOVE_MEMBERS_FROM_SOURCE_SUPERGROUP ? "removing" : "managing") + " users in the source group"))
|
|
.thenMany(Flux.fromIterable(sourceSupergroupClients))
|
|
.flatMap(client -> client
|
|
.send(new TdApi.GetMe())
|
|
.timeout(Duration.ofSeconds(5))
|
|
.flatMap(MonoUtils::orElseThrow)
|
|
.flatMap(_v -> client.<SupergroupFullInfo>send(new TdApi.GetSupergroupFullInfo(sourceGroup.getSupergroupIdInt())))
|
|
.flatMap(MonoUtils::orElseThrow)
|
|
.timeout(Duration.ofSeconds(5))
|
|
.flatMap(_v -> client.<Supergroup>send(new TdApi.GetSupergroup(sourceGroup.getSupergroupIdInt())))
|
|
.flatMap(MonoUtils::orElseThrow)
|
|
.timeout(Duration.ofSeconds(5))
|
|
.filter(sourceGroupFullInfo -> {
|
|
if (requiresAdminPrivilegesOnSourceSupergroup()) {
|
|
if (sourceGroupFullInfo.status.getConstructor() == ChatMemberStatusAdministrator.CONSTRUCTOR) {
|
|
var statusAdmin = (ChatMemberStatusAdministrator) sourceGroupFullInfo.status;
|
|
if (statusAdmin.canRestrictMembers) {
|
|
return true;
|
|
} else {
|
|
App.getLogService().append(Level.WARN, "Userbot " + client + " failed: Can't restrict members of group " + sourceGroup.getTitle());
|
|
return false;
|
|
}
|
|
} else if (sourceGroupFullInfo.status.getConstructor() == ChatMemberStatusCreator.CONSTRUCTOR) {
|
|
return true;
|
|
} else {
|
|
App.getLogService().append(Level.WARN, "Userbot " + client + " failed: Can't administer group " + sourceGroup.getTitle());
|
|
return false;
|
|
}
|
|
} else {
|
|
switch (sourceGroupFullInfo.status.getConstructor()) {
|
|
case ChatMemberStatusAdministrator.CONSTRUCTOR:
|
|
case ChatMemberStatusCreator.CONSTRUCTOR:
|
|
case ChatMemberStatusMember.CONSTRUCTOR:
|
|
return true;
|
|
default:
|
|
App.getLogService().append(Level.WARN, "Userbot " + client + " failed: Can't access group " + sourceGroup.getTitle() + " (probably restricted, left or banned)");
|
|
return false;
|
|
}
|
|
}
|
|
})
|
|
.map(_v -> client)
|
|
.onErrorResume(e -> {
|
|
App.getLogService().append(Level.WARN, "Userbot " + client + " failed: " + e.getLocalizedMessage());
|
|
return Mono.empty();
|
|
}))
|
|
.collect(Collectors.toSet())
|
|
.flatMap(transferSourceClients -> {
|
|
return phaseDescriptionConsumer.apply("Checking available userbots for adding users in the destination group")
|
|
.thenMany(Flux.fromIterable(destSupergroupClients))
|
|
.flatMap(client -> client
|
|
.send(new TdApi.GetMe())
|
|
.flatMap(MonoUtils::orElseThrow)
|
|
.timeout(Duration.ofSeconds(5))
|
|
.flatMap(_v -> client.<Supergroup>send(new TdApi.GetSupergroup(destGroup.getSupergroupIdInt())))
|
|
.flatMap(MonoUtils::orElseThrow)
|
|
.timeout(Duration.ofSeconds(5))
|
|
.filter(destGroupFullInfo -> {
|
|
if (destGroupFullInfo.status.getConstructor() == ChatMemberStatusAdministrator.CONSTRUCTOR) {
|
|
var statusAdmin = (ChatMemberStatusAdministrator) destGroupFullInfo.status;
|
|
if (statusAdmin.canInviteUsers) {
|
|
return true;
|
|
} else {
|
|
App.getLogService().append(Level.WARN, "Userbot " + client + " failed: Can't invite members to group " + destGroup.getTitle());
|
|
}
|
|
} else if (destGroupFullInfo.status.getConstructor() == ChatMemberStatusCreator.CONSTRUCTOR) {
|
|
return true;
|
|
} else {
|
|
App.getLogService().append(Level.WARN, "Userbot " + client + " failed: Can't administer group " + destGroup.getTitle());
|
|
}
|
|
return false;
|
|
})
|
|
.map(_v -> client)
|
|
.onErrorResume(e -> {
|
|
App.getLogService().append(Level.WARN, "Userbot " + client + " failed: " + e.getLocalizedMessage());
|
|
return Mono.empty();
|
|
}))
|
|
.collect(Collectors.toSet())
|
|
.map(transferDestClients -> Tuple2.of(transferSourceClients, transferDestClients));
|
|
})
|
|
.map(clientsTuple -> {
|
|
var sourceClients = clientsTuple.element1;
|
|
var destClients = clientsTuple.element2;
|
|
App.getLogService().append(Level.INFO, "Found source userbots: " + sourceClients.stream().map(TransferClient::toString).collect(Collectors.joining(", ")));
|
|
App.getLogService().append(Level.INFO, "Found destination userbots: " + destClients.stream().map(TransferClient::toString).collect(Collectors.joining(", ")));
|
|
var chosenClients = new HashSet<TransferClient>(sourceClients);
|
|
chosenClients.retainAll(destClients);
|
|
return chosenClients;
|
|
})
|
|
.filter(chosenClients -> !chosenClients.isEmpty())
|
|
.doOnNext(chosenClients -> {
|
|
App.getLogService().append(Level.INFO, "Chosen userbots: " + chosenClients.stream().map(TransferClient::toString).collect(Collectors.joining(", ")));
|
|
})
|
|
.switchIfEmpty(Mono.defer(() -> {
|
|
App.getLogService().append(Level.ERROR, "No userbots are admin in both groups!");
|
|
return Mono.error(new Exception("No userbots are admin in both groups!"));
|
|
}))
|
|
// Now we have a set of userbots that can transfer the users
|
|
|
|
// Get the list of members of the first group from a bot
|
|
.flatMap(clients -> {
|
|
return phaseDescriptionConsumer.apply("Obtaining group members")
|
|
.then(percentageConsumer.apply(5)).thenReturn(clients);
|
|
}).flatMap(clients -> {
|
|
// Get the members of the source group
|
|
return Flux
|
|
.fromIterable(clients)
|
|
.flatMap(client -> {
|
|
return Mono.fromCallable(() -> {
|
|
var members = TransferUtils.getSupergroupMembers(client, sourceGroup.getSupergroupIdInt());
|
|
App.getLogService().append(Level.INFO, "Source group has " + members.size() + " members.");
|
|
return members;
|
|
});
|
|
}
|
|
)
|
|
.last()
|
|
.onErrorMap(error -> {
|
|
error.printStackTrace();
|
|
return new Exception("Error while obtaining source group members: " + error.getLocalizedMessage(), error);
|
|
})
|
|
.subscribeOn(Schedulers.boundedElastic())
|
|
.map(members -> Tuple2.of(clients, members));
|
|
})
|
|
// Finished getting the list of members of the source group
|
|
|
|
// Resolve users
|
|
.flatMap(context -> {
|
|
return phaseDescriptionConsumer.apply("Resolving users")
|
|
.then(percentageConsumer.apply(10)).thenReturn(context);
|
|
})
|
|
.flatMap(context -> {
|
|
var clients = context.element1;
|
|
var unresolvedUsers = context.element2;
|
|
return Flux
|
|
.fromIterable(clients)
|
|
.flatMap(client -> Flux
|
|
.fromIterable(unresolvedUsers)
|
|
.flatMap(userId -> client.<User>send(new TdApi.GetUser(userId)))
|
|
.flatMap(MonoFxUtils::orElseLogSkipError)
|
|
.timeout(Duration.ofMinutes(2))
|
|
.onErrorResume(error -> {
|
|
App.getLogService().append(Level.WARN, "Error while resolving an user: " + error);
|
|
return Mono.empty();
|
|
})
|
|
.collect(Collectors.toSet())
|
|
)
|
|
.last()
|
|
.map(resolvedUsers -> Tuple2.of(clients, resolvedUsers));
|
|
})
|
|
// Finished resolving users
|
|
|
|
// Filter out unsuitable users
|
|
.flatMap(context -> {
|
|
return phaseDescriptionConsumer.apply("Filtering users")
|
|
.then(percentageConsumer.apply(15)).thenReturn(context);
|
|
})
|
|
.flatMap(context -> {
|
|
var clients = context.element1;
|
|
var unfilteredUsers = context.element2;
|
|
return Flux
|
|
.fromIterable(unfilteredUsers)
|
|
.<User>flatMap(user -> {
|
|
if (this.clients.values().stream().map(TransferClient::getClientUser).noneMatch(clientUser -> clientUser.id == user.id)) {
|
|
return Mono.just(user);
|
|
}
|
|
return userStatusConsumer.apply(new UserStatus(getName(user), user.id, UserStatusType.NOT_REGULAR_USER, "")).then(Mono.empty());
|
|
})
|
|
.<User>flatMap(user -> {
|
|
if (user.haveAccess) {
|
|
return Mono.just(user);
|
|
}
|
|
return userStatusConsumer.apply(new UserStatus(getName(user), user.id, UserStatusType.NO_ACCESS_HASH, "")).then(Mono.empty());
|
|
})
|
|
.flatMap(user -> {
|
|
if (user.type.getConstructor() == UserTypeRegular.CONSTRUCTOR) {
|
|
return Mono.just(user);
|
|
}
|
|
return userStatusConsumer.apply(new UserStatus(getName(user), user.id, UserStatusType.NOT_REGULAR_USER, "")).then(Mono.empty());
|
|
})
|
|
.flatMap(user -> {
|
|
if (!user.isScam) {
|
|
return Mono.just(user);
|
|
}
|
|
return userStatusConsumer.apply(new UserStatus(getName(user), user.id, UserStatusType.SCAM_USER, "")).then(Mono.empty());
|
|
})
|
|
.flatMap(user -> {
|
|
if (user.restrictionReason == null || user.restrictionReason.isEmpty()) {
|
|
return Mono.just(user);
|
|
}
|
|
return userStatusConsumer.apply(new UserStatus(getName(user), user.id, UserStatusType.RESTRICTED_USER, "Restricted user: " + user.restrictionReason)).then(Mono.empty());
|
|
})
|
|
.flatMap(user -> {
|
|
return userStatusConsumer
|
|
.apply(new UserStatus(getName(user), user.id, UserStatusType.JUST_FOUND, ""))
|
|
.thenReturn(user);
|
|
})
|
|
.collect(Collectors.toSet())
|
|
.map(resolvedUsers -> Tuple3.of(clients, resolvedUsers, unfilteredUsers.size()));
|
|
})
|
|
// Finished filtering unsuitable users
|
|
|
|
// Transfer users
|
|
.flatMap(context -> {
|
|
return phaseDescriptionConsumer.apply("Adding users")
|
|
.then(percentageConsumer.apply(20)).thenReturn(context);
|
|
})
|
|
.flatMap(context -> {
|
|
var clients = context.element1;
|
|
var users = context.element2;
|
|
var totalUsersCount = context.element3;
|
|
var client = clients.stream().skip(ThreadLocalRandom.current().nextInt(clients.size())).findFirst().orElseThrow();
|
|
AtomicInteger processedUsersStats = new AtomicInteger(0);
|
|
return Flux
|
|
.fromIterable(users)
|
|
.flatMap(user -> {
|
|
return percentageConsumer
|
|
.apply(20 + processedUsersStats.getAndIncrement() / users.size() * (100 - 20))
|
|
.thenReturn(user);
|
|
})
|
|
.<User>flatMap(user -> {
|
|
return client.<Ok>send(new TdApi.AddChatMember(destGroup.getSupergroupId(), user.id, 0))
|
|
.flatMap(result -> {
|
|
if (result.failed()) {
|
|
if (TdLightUtils.errorEquals(new TdError(result.cause().code, result.cause().message), 403, "USER_PRIVACY_RESTRICTED")) {
|
|
return userStatusConsumer.apply(new UserStatus(getName(user), user.id, UserStatusType.USER_PRIVACY_RESTRICTED, "")).then(Mono.empty());
|
|
} else if (TdLightUtils.errorEquals(new TdError(result.cause().code, result.cause().message), 400, "USER_NOT_MUTUAL_CONTACT")) {
|
|
return userStatusConsumer.apply(new UserStatus(getName(user), user.id, UserStatusType.USER_NOT_MUTUAL_CONTACT, "")).then(Mono.empty());
|
|
}
|
|
}
|
|
return Mono.just(result);
|
|
})
|
|
.flatMap(MonoUtils::orElseThrow)
|
|
.timeout(Duration.ofMinutes(2))
|
|
.onErrorResume((error) -> {
|
|
App.getLogService().append(Level.WARN, "Can't add user \"" + getName(user) + "\" to supergroup \"" + destGroup.getSupergroupIdInt() + " " + destGroup.getTitle() + "\"");
|
|
return userStatusConsumer.apply(new UserStatus(getName(user), user.id, UserStatusType.CANT_ADD, "Can't add to destination supergroup: " + error.getLocalizedMessage())).then(Mono.empty());
|
|
})
|
|
.flatMap(_v -> {
|
|
return userStatusConsumer.apply(new UserStatus(getName(user), user.id, UserStatusType.ADDED_AND_WAITING_TO_BE_REMOVED, "")).then(Mono.empty()).thenReturn(user);
|
|
});
|
|
})
|
|
.<User>flatMap(user -> {
|
|
if (REMOVE_MEMBERS_FROM_SOURCE_SUPERGROUP) {
|
|
// Remove the user from the source supergroup
|
|
return client.<Ok>send(new TdApi.SetChatMemberStatus(sourceGroup.getSupergroupId(), user.id, new ChatMemberStatusLeft()))
|
|
.flatMap(result -> {
|
|
if (result.failed()) {
|
|
if (TdLightUtils.errorEquals(new TdError(result.cause().code, result.cause().message), 3, "Can't remove chat owner")) {
|
|
return userStatusConsumer.apply(new UserStatus(getName(user), user.id, UserStatusType.CANT_REMOVE_CHAT_OWNER, "")).then(Mono.empty());
|
|
}
|
|
}
|
|
return Mono.just(result);
|
|
})
|
|
.flatMap(MonoUtils::orElseThrow)
|
|
.timeout(Duration.ofMinutes(2))
|
|
.onErrorResume((error) -> {
|
|
App.getLogService().append(Level.WARN, "Can't remove user \"" + getName(user) + "\" from supergroup \"" + sourceGroup.getSupergroupIdInt() + " " + sourceGroup.getTitle() + "\"");
|
|
return userStatusConsumer.apply(new UserStatus(getName(user), user.id, UserStatusType.CANT_REMOVE, "Can't remove from source supergroup: " + error.getLocalizedMessage())).then(Mono.empty());
|
|
})
|
|
.flatMap(_v -> {
|
|
transferredSuccessfullyUsersStats.incrementAndGet();
|
|
return userStatusConsumer.apply(new UserStatus(getName(user), user.id, UserStatusType.DONE, "")).then(Mono.empty()).thenReturn(user);
|
|
});
|
|
} else {
|
|
// Don't remove the user from the source supergroup
|
|
return Mono.just(user);
|
|
}
|
|
})
|
|
.delayElements(App.getSettingsService().getDelayBetweenAdds())
|
|
.collect(Collectors.toSet())
|
|
.map(resolvedUsers -> Tuple3.of(clients, resolvedUsers, totalUsersCount));
|
|
})
|
|
// Finished transferring users
|
|
|
|
|
|
.doOnNext(context -> {
|
|
App.getLogService().append(Level.INFO, "Users added. Added " + transferredSuccessfullyUsersStats.get() + "/" + context.element3 + " users");
|
|
})
|
|
|
|
.then(percentageConsumer.apply(100))
|
|
.then(phaseDescriptionConsumer.apply("Done"))
|
|
.then(Mono.delay(Duration.ofMillis(500)))
|
|
.then();
|
|
}
|
|
|
|
private static String getName(User user) {
|
|
return String.join(" ", List.of("" + user.id, user.firstName, user.lastName));
|
|
}
|
|
|
|
@Override
|
|
public Mono<Void> quit() {
|
|
return Flux
|
|
.fromIterable(clients.values())
|
|
.flatMap(client -> client.send(new TdApi.Close()))
|
|
.onErrorResume(error -> {
|
|
App.getLogService().append(Level.ERROR, "Can't close a tdlib instance: " + error.getLocalizedMessage());
|
|
return Mono.empty();
|
|
})
|
|
.collectList()
|
|
.then();
|
|
}
|
|
}
|