TransferBot/src/main/java/it/cavallium/TransferServiceImpl.java
2020-10-20 00:31:11 +02:00

496 lines
20 KiB
Java

package it.cavallium;
import static it.cavallium.PrimaryController.getUserbotPhoneNumber;
import com.google.i18n.phonenumbers.NumberParseException;
import com.google.i18n.phonenumbers.PhoneNumberUtil;
import com.google.i18n.phonenumbers.PhoneNumberUtil.PhoneNumberFormat;
import com.google.i18n.phonenumbers.Phonenumber.PhoneNumber;
import com.hazelcast.cp.internal.util.Tuple2;
import com.hazelcast.cp.internal.util.Tuple3;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.AuthorizationStateClosed;
import it.tdlight.jni.TdApi.AuthorizationStateClosing;
import it.tdlight.jni.TdApi.AuthorizationStateLoggingOut;
import it.tdlight.jni.TdApi.AuthorizationStateReady;
import it.tdlight.jni.TdApi.ChatMemberStatusAdministrator;
import it.tdlight.jni.TdApi.ChatMemberStatusCreator;
import it.tdlight.jni.TdApi.GetUserPrivacySettingRules;
import it.tdlight.jni.TdApi.Supergroup;
import it.tdlight.jni.TdApi.SupergroupFullInfo;
import it.tdlight.jni.TdApi.Update;
import it.tdlight.jni.TdApi.User;
import it.tdlight.jni.TdApi.UserFullInfo;
import it.tdlight.jni.TdApi.UserTypeRegular;
import it.tdlight.tdlibsession.td.easy.AsyncTdEasy;
import it.tdlight.tdlibsession.td.easy.ParameterInfoPasswordHint;
import it.tdlight.tdlibsession.td.easy.TdEasySettings;
import it.tdlight.tdlibsession.td.middle.TdClusterManager;
import it.tdlight.tdlibsession.td.middle.direct.AsyncTdMiddleDirect;
import it.tdlight.utils.MonoUtils;
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
import it.unimi.dsi.fastutil.objects.ObjectSets;
import java.io.File;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.event.Level;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class TransferServiceImpl implements TransferService {
private final TdClusterManager clusterManager;
private final ConcurrentHashMap<Long, TransferClient> clients = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, Set<TransferClient>> supergroupClients = new ConcurrentHashMap<>();
private final EmitterProcessor<ItemUpdate<TransferClient>> newClients = EmitterProcessor.create();
private final Scheduler updatesScheduler;
private int apiId;
private String apiHash;
/**
 * Creates a transfer service with no registered userbots.
 *
 * @param clusterManager cluster manager handed to {@code AsyncTdMiddleDirect} whenever a userbot
 *                       session is deployed
 */
public TransferServiceImpl(TdClusterManager clusterManager) {
    // State and update subscriptions of every userbot are moved onto this scheduler.
    this.updatesScheduler = Schedulers.boundedElastic();
    this.clusterManager = clusterManager;
}
/**
 * Stores the API id that is written into the {@code TdEasySettings} of userbots added afterwards.
 *
 * @param apiId API id to use for new userbot sessions
 */
@Override
public void setApiId(int apiId) {
    this.apiId = apiId;
}
/**
 * Stores the API hash that is written into the {@code TdEasySettings} of userbots added afterwards.
 *
 * @param apiHash API hash to use for new userbot sessions
 */
@Override
public void setApiHash(String apiHash) {
    this.apiHash = apiHash;
}
/**
 * Deploys a TDLib session for the given phone number, logs it in and registers it as a userbot.
 *
 * <p>Once the session reaches {@code AuthorizationStateReady} the client is stored in
 * {@code clients}, its admin-supergroup updates start feeding {@code supergroupClients}, and it is
 * announced on {@code newClients}.
 *
 * @param phoneNumber     phone number of the account to log in
 * @param codeSupplier    asked for the login code; an empty result closes the session
 * @param otpSupplier     asked for the password (receives the password hint); an empty result
 *                        closes the session
 * @param onUserbotClosed invoked when the session starts closing, is closed or is logging out
 * @return a Mono emitting a failed result when the phone number is already registered, emitting a
 *         success result once the session is ready, or failing with an {@code Exception} named
 *         after the authorization state when the session closes before becoming ready
 */
@Override
public Mono<AddUserBotResult> addUserbot(PhoneNumber phoneNumber,
        Function<AsyncTdEasy, Mono<Integer>> codeSupplier,
        BiFunction<AsyncTdEasy, String, Mono<String>> otpSupplier,
        Supplier<Mono<Void>> onUserbotClosed) {
    long phoneNumberLong = getLongPhoneNumber(phoneNumber);
    if (clients.containsKey(phoneNumberLong)) {
        return Mono.just(AddUserBotResult.newFailed("Userbot already added!"));
    }
    String alias = PhoneNumberUtil.getInstance().format(phoneNumber, PhoneNumberFormat.INTERNATIONAL);
    return Mono
            .fromCallable(() -> {
                return AsyncTdMiddleDirect.getAndDeployInstance(clusterManager, alias, "" + phoneNumberLong);
            })
            .subscribeOn(Schedulers.boundedElastic())
            // The callable itself yields a Mono; flatten it.
            .flatMap(v -> v)
            .map(middle -> new AsyncTdEasy(middle, alias)).flatMap(client -> {
                return client
                        .execute(new TdApi.SetLogVerbosityLevel(0))
                        .then(client
                                .create(TdEasySettings
                                        .newBuilder()
                                        .setUseMessageDatabase(false)
                                        .setUseFileDatabase(false)
                                        .setUseChatInfoDatabase(false)
                                        .setApiId(apiId)
                                        .setApiHash(apiHash)
                                        .setEnableStorageOptimizer(false)
                                        .setApplicationVersion(App.VERSION)
                                        .setDatabaseDirectory("sessions" + File.separator + "userbot_" + phoneNumberLong)
                                        .setIgnoreFileNames(true)
                                        .setPhoneNumber(phoneNumberLong)
                                        .setSystemLanguageCode("en")
                                        .setDeviceModel(System.getProperty("os.name"))
                                        .setSystemVersion(System.getProperty("os.version"))
                                        .setParameterRequestHandler((parameter, parameterInfo) -> {
                                            switch (parameter) {
                                                case ASK_FIRST_NAME:
                                                    return Mono.just("FirstName");
                                                case ASK_LAST_NAME:
                                                    return Mono.just("LastName");
                                                case ASK_CODE:
                                                    // No code supplied: close the session, notify the
                                                    // caller (even if closing failed) and answer empty.
                                                    return codeSupplier
                                                            .apply(client)
                                                            .map(i -> "" + i)
                                                            .switchIfEmpty(client
                                                                    .send(new TdApi.Close())
                                                                    .materialize()
                                                                    .flatMap(signal -> onUserbotClosed.get().thenReturn(signal))
                                                                    .dematerialize()
                                                                    .then(Mono.empty()));
                                                case ASK_PASSWORD:
                                                    // Same fallback as ASK_CODE when no password is supplied.
                                                    return otpSupplier
                                                            .apply(client, ((ParameterInfoPasswordHint) parameterInfo).getHint())
                                                            .switchIfEmpty(client
                                                                    .send(new TdApi.Close())
                                                                    .materialize()
                                                                    .flatMap(signal -> onUserbotClosed.get().thenReturn(signal))
                                                                    .dematerialize()
                                                                    .then(Mono.empty()));
                                                case NOTIFY_LINK:
                                                default:
                                                    return Mono.empty();
                                            }
                                        })
                                        .build())
                                .then(Mono.defer(() -> {
                                    // Shared state stream; it connects upstream only once all three
                                    // subscribers below (close watcher, state logger, readiness check)
                                    // have subscribed.
                                    var clientStateFlux = client.getState().publish().autoConnect(3);
                                    // 1) Close watcher: run onUserbotClosed on the first closing state.
                                    clientStateFlux
                                            .filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR
                                                    || state.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR
                                                    || state.getConstructor() == AuthorizationStateLoggingOut.CONSTRUCTOR)
                                            .take(1)
                                            .singleOrEmpty()
                                            .then()
                                            .materialize()
                                            .flatMap(signal -> onUserbotClosed.get().thenReturn(signal))
                                            .dematerialize()
                                            .subscribeOn(updatesScheduler)
                                            .subscribe(state -> System.out.println("Userbot closed with state: " + state));
                                    // 2) State logger.
                                    clientStateFlux.subscribeOn(updatesScheduler).subscribe(state -> System.out.println("state: " + state));
                                    client
                                            .getIncomingUpdates()
                                            .subscribeOn(updatesScheduler)
                                            .flatMap(update -> onClientUpdate(update))
                                            .subscribe(_v -> {}, e -> System.err.println(e));
                                    // 3) Readiness check: complete on Ready, fail on any closing state.
                                    return clientStateFlux
                                            .subscribeOn(updatesScheduler)
                                            .filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR
                                                    || state.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR
                                                    || state.getConstructor() == AuthorizationStateLoggingOut.CONSTRUCTOR
                                                    || state.getConstructor() == AuthorizationStateReady.CONSTRUCTOR)
                                            .take(1)
                                            .singleOrEmpty()
                                            .doOnNext(state -> System.out.println("aState: " + state))
                                            .handle((state, sink) -> {
                                                if (state.getConstructor() == AuthorizationStateReady.CONSTRUCTOR) {
                                                    sink.complete();
                                                } else {
                                                    sink.error(new Exception(state.getClass().getSimpleName()));
                                                }
                                            })
                                            .then();
                                }))
                                .doOnSuccess((_v) -> {
                                    // The session is ready: register the client and track which
                                    // supergroups it administers.
                                    var newClient = new TransferClient(alias, client);
                                    clients.put(phoneNumberLong, newClient);
                                    newClient.subscribeAdminSupergroups().doOnNext(supergroupInfoItemUpdate -> {
                                        if (supergroupInfoItemUpdate.isRemoved()) {
                                            supergroupClients.compute(supergroupInfoItemUpdate.getItem().getBaseChatInfo().getSupergroupIdInt(), (sgId, clients) -> {
                                                if (clients != null) {
                                                    clients.remove(newClient);
                                                }
                                                return clients;
                                            });
                                        } else {
                                            supergroupClients.compute(supergroupInfoItemUpdate.getItem().getBaseChatInfo().getSupergroupIdInt(), (sgId, clients) -> {
                                                if (clients == null) {
                                                    clients = ObjectSets.synchronize(new ObjectOpenHashSet<>());
                                                }
                                                clients.add(newClient);
                                                return clients;
                                            });
                                        }
                                    }).subscribe();
                                    newClients.onNext(new ItemUpdate<>(false, newClient));
                                }));
            })
            // The login chain is a Mono<Void> and never emits a value, so a map() here would never
            // run; thenReturn emits the success result when the chain completes.
            .thenReturn(AddUserBotResult.newSuccess());
}
/**
 * Handles one incoming update of a userbot; currently every update is ignored.
 *
 * @param update update received from a userbot session (unused)
 * @return an empty Mono
 */
private Mono<Void> onClientUpdate(Update update) {
    return Mono.empty();
}
/**
 * Converts a phone number to the numeric key used by {@code clients}: its E.164 form with the
 * {@code +} removed, parsed as a {@code long}.
 *
 * @param phoneNumber phone number to convert
 * @return the E.164 digits as a long
 */
private static long getLongPhoneNumber(PhoneNumber phoneNumber) {
    String e164 = PhoneNumberUtil.getInstance().format(phoneNumber, PhoneNumberFormat.E164);
    String digitsOnly = e164.replace("+", "");
    return Long.parseLong(digitsOnly);
}
/**
 * Unregisters the userbot with the given phone number and closes its session.
 *
 * @param phoneNumber phone number of the userbot to close
 * @return a Mono that follows the outcome of the {@code Close} request, or fails with an
 *         {@code Exception} when no userbot is registered for the phone number
 */
@Override
public Mono<Void> closeUserbot(PhoneNumber phoneNumber) {
    var client = clients.remove(getLongPhoneNumber(phoneNumber));
    if (client == null) {
        // Nothing was removed, so no removal event must be published.
        return Mono.error(new Exception("Userbot " + phoneNumber + " was not found!"));
    }
    // Announce the removal only for a client that actually existed.
    newClients.onNext(new ItemUpdate<>(true, client));
    return MonoUtils.thenOrError(client.send(new TdApi.Close()));
}
/**
 * Returns the phone numbers of all currently registered userbots.
 *
 * @return a new set with one parsed phone number per registered userbot
 */
@Override
public Set<PhoneNumber> getPhoneNumbers() {
    Set<PhoneNumber> registeredNumbers = new HashSet<>();
    for (Long phoneNumberLong : clients.keySet()) {
        try {
            // Keys are E.164 digits without the leading plus; restore it before parsing.
            registeredNumbers.add(getUserbotPhoneNumber("+" + phoneNumberLong));
        } catch (NumberParseException e) {
            // Can't happen
            e.printStackTrace();
        }
    }
    return registeredNumbers;
}
/**
 * Collects the admin supergroups reported by every registered userbot into one set.
 *
 * @param canRestrictMembers forwarded to each userbot's {@code getAdminSupergroups}
 * @param canInviteUsers     forwarded to each userbot's {@code getAdminSupergroups}
 * @return the union of the supergroups returned by all userbots
 */
@Override
public Set<BaseChatInfo> getAdminSupergroups(boolean canRestrictMembers, boolean canInviteUsers) {
    return clients
            .values()
            .stream()
            .map(transferClient -> transferClient.getAdminSupergroups(canRestrictMembers, canInviteUsers))
            .flatMap(groups -> groups.stream())
            .collect(Collectors.toSet());
}
/**
 * Streams the admin-supergroup updates of every userbot: those already registered plus those
 * announced later on {@code newClients}.
 *
 * @return the merged admin-supergroup updates of all non-removed userbots
 */
@Override
public Flux<ItemUpdate<SupergroupInfo>> subscribeAdminSupergroups() {
    // fromIterable is used instead of fromStream: a java.util.stream.Stream can be consumed only
    // once, which would make a second subscription to the returned Flux fail.
    Flux<ItemUpdate<TransferClient>> registeredClients = Flux
            .fromIterable(clients.values())
            .map(client -> new ItemUpdate<TransferClient>(false, client));
    return Flux
            .merge(this.newClients, registeredClients)
            // Removal events carry no supergroup stream to subscribe to.
            .filter(itemClient -> !itemClient.isRemoved())
            .map(ItemUpdate::getItem)
            .map(TransferClient::subscribeAdminSupergroups)
            .flatMap(f -> f);
}
/**
 * Prepares a member transfer from {@code sourceGroup} to {@code destGroup}.
 *
 * <p>Phases, as reported through the consumers:
 * <ol>
 *   <li>0%: find the registered userbots that can restrict members in the source group and the
 *       ones that can invite users to the destination group, and keep those able to do both;</li>
 *   <li>5%: load the member list of the source group with one of the chosen userbots;</li>
 *   <li>10%: resolve each member id to a {@code User};</li>
 *   <li>15%: drop users without access, non-regular users, scam users and restricted users,
 *       reporting every user through {@code userStatusConsumer};</li>
 *   <li>100%: report "Done".</li>
 * </ol>
 * The set of filtered users is discarded at the end: as written, this method moves no member.
 *
 * @param sourceGroup              group whose members are read
 * @param destGroup                group that should receive the members
 * @param userStatusConsumer       receives a status for every rejected or accepted user
 * @param percentageConsumer       receives the progress percentage
 * @param phaseDescriptionConsumer receives a description of the current phase
 * @return a Mono completing after the last phase, or failing with an {@code Exception} when no
 *         registered userbot is known for one of the groups or none is admin in both
 */
@Override
public Mono<Void> transferMembers(BaseChatInfo sourceGroup,
        BaseChatInfo destGroup,
        Function<UserStatus, Mono<Void>> userStatusConsumer,
        Function<Integer, Mono<Void>> percentageConsumer,
        Function<String, Mono<Void>> phaseDescriptionConsumer) {
    // Userbots recorded for the source group, restricted to those still registered.
    var sourceSupergroupClients = this.supergroupClients
            .getOrDefault(sourceGroup.getSupergroupIdInt(), Set.of())
            .stream()
            .filter(clients::containsValue)
            .collect(Collectors.toSet());
    if (sourceSupergroupClients.isEmpty()) {
        return Mono.error(new Exception("No userbot can remove members from the source group"));
    }
    // Same lookup for the destination group.
    var destSupergroupClients = this.supergroupClients
            .getOrDefault(destGroup.getSupergroupIdInt(), Set.of())
            .stream()
            .filter(clients::containsValue)
            .collect(Collectors.toSet());
    if (destSupergroupClients.isEmpty()) {
        return Mono.error(new Exception("No userbot can add members to the destination group"));
    }
    return percentageConsumer
            .apply(0)
            .then(phaseDescriptionConsumer.apply("Transfer from " + sourceGroup.getTitle() + " to " + destGroup.getTitle()))
            // Check and get the set of userbots that can transfer users from group X to group Y
            .then(phaseDescriptionConsumer.apply("Checking available userbots for removing users in the source group"))
            .thenMany(Flux.fromIterable(sourceSupergroupClients))
            .flatMap(client -> client
                    .send(new TdApi.GetMe())
                    .timeout(Duration.ofSeconds(5))
                    // The full-info result is not inspected; only the GetSupergroup result below is.
                    .then(client.<SupergroupFullInfo>send(new TdApi.GetSupergroupFullInfo(sourceGroup.getSupergroupIdInt())))
                    .timeout(Duration.ofSeconds(5))
                    .then(client.<Supergroup>send(new TdApi.GetSupergroup(sourceGroup.getSupergroupIdInt())))
                    .timeout(Duration.ofSeconds(5))
                    // Keep the userbot only if it is creator, or admin allowed to restrict members.
                    .filter(sourceGroupResult -> {
                        if (sourceGroupResult.succeeded()) {
                            if (sourceGroupResult.result().status.getConstructor() == ChatMemberStatusAdministrator.CONSTRUCTOR) {
                                var statusAdmin = (ChatMemberStatusAdministrator) sourceGroupResult.result().status;
                                if (statusAdmin.canRestrictMembers) {
                                    return true;
                                } else {
                                    App.getLogService().append(Level.WARN, "Userbot " + client + " failed: Can't restrict members of group " + sourceGroup.getTitle());
                                }
                            } else if (sourceGroupResult.result().status.getConstructor() == ChatMemberStatusCreator.CONSTRUCTOR) {
                                return true;
                            } else {
                                App.getLogService().append(Level.WARN, "Userbot " + client + " failed: Can't administer group " + sourceGroup.getTitle());
                            }
                        } else {
                            App.getLogService().append(Level.WARN, "Userbot " + client + " failed: " + sourceGroupResult.cause());
                        }
                        return false;
                    })
                    .map(_v -> client)
                    // A failing or timed-out userbot is logged and skipped, not fatal.
                    .onErrorResume(e -> {
                        App.getLogService().append(Level.WARN, "Userbot " + client + " failed: " + e.getLocalizedMessage());
                        return Mono.empty();
                    }))
            .collect(Collectors.toSet())
            .flatMap(transferSourceClients -> {
                return phaseDescriptionConsumer.apply("Checking available userbots for adding users in the destination group")
                        .thenMany(Flux.fromIterable(destSupergroupClients))
                        .flatMap(client -> client
                                .send(new TdApi.GetMe())
                                .timeout(Duration.ofSeconds(5))
                                .then(client.<Supergroup>send(new TdApi.GetSupergroup(destGroup.getSupergroupIdInt())))
                                .timeout(Duration.ofSeconds(5))
                                // Keep the userbot only if it is creator, or admin allowed to invite users.
                                .filter(destGroupResult -> {
                                    if (destGroupResult.succeeded()) {
                                        if (destGroupResult.result().status.getConstructor() == ChatMemberStatusAdministrator.CONSTRUCTOR) {
                                            var statusAdmin = (ChatMemberStatusAdministrator) destGroupResult.result().status;
                                            if (statusAdmin.canInviteUsers) {
                                                return true;
                                            } else {
                                                App.getLogService().append(Level.WARN, "Userbot " + client + " failed: Can't invite members to group " + destGroup.getTitle());
                                            }
                                        } else if (destGroupResult.result().status.getConstructor() == ChatMemberStatusCreator.CONSTRUCTOR) {
                                            return true;
                                        } else {
                                            App.getLogService().append(Level.WARN, "Userbot " + client + " failed: Can't administer group " + destGroup.getTitle());
                                        }
                                    } else {
                                        App.getLogService().append(Level.WARN, "Userbot " + client + " failed: " + destGroupResult.cause());
                                    }
                                    return false;
                                })
                                .map(_v -> client)
                                .onErrorResume(e -> {
                                    App.getLogService().append(Level.WARN, "Userbot " + client + " failed: " + e.getLocalizedMessage());
                                    return Mono.empty();
                                }))
                        .collect(Collectors.toSet())
                        .map(transferDestClients -> Tuple2.of(transferSourceClients, transferDestClients));
            })
            .map(clientsTuple -> {
                var sourceClients = clientsTuple.element1;
                var destClients = clientsTuple.element2;
                App.getLogService().append(Level.INFO, "Found source userbots: " + sourceClients.stream().map(TransferClient::toString).collect(Collectors.joining(", ")));
                App.getLogService().append(Level.INFO, "Found destination userbots: " + destClients.stream().map(TransferClient::toString).collect(Collectors.joining(", ")));
                // Only userbots qualified in both groups can perform the transfer.
                var chosenClients = new HashSet<TransferClient>(sourceClients);
                chosenClients.retainAll(destClients);
                return chosenClients;
            })
            .filter(chosenClients -> !chosenClients.isEmpty())
            .doOnNext(chosenClients -> {
                App.getLogService().append(Level.INFO, "Chosen userbots: " + chosenClients.stream().map(TransferClient::toString).collect(Collectors.joining(", ")));
            })
            .switchIfEmpty(Mono.defer(() -> {
                App.getLogService().append(Level.ERROR, "No userbots are admin in both groups!");
                return Mono.error(new Exception("No userbots are admin in both groups!"));
            }))
            // Now we have a set of userbots that can transfer the users
            // Get the list of members of the first group from a bot
            .flatMap(clients -> {
                return phaseDescriptionConsumer.apply("Obtaining group members")
                        .then(percentageConsumer.apply(5)).thenReturn(clients);
            }).flatMap(clients -> {
                // Get the first userbot
                var client = clients.stream().findAny().orElseThrow(() -> new NullPointerException("No userbots found"));
                // Get the members of the source group
                return Mono.fromCallable(() -> {
                    var members = TransferUtils.getSupergroupMembers(client, sourceGroup.getSupergroupIdInt());
                    App.getLogService().append(Level.INFO, "Source group has " + members.size() + " members.");
                    return members;
                }).subscribeOn(Schedulers.boundedElastic()).map(members -> Tuple3.of(client, clients, members));
            })
            // Finished getting the list of members of the source group
            // Resolve users
            .flatMap(context -> {
                return phaseDescriptionConsumer.apply("Resolving users")
                        .then(percentageConsumer.apply(10)).thenReturn(context);
            })
            .flatMap(context -> {
                var client = context.element1;
                var clients = context.element2;
                var unresolvedUsers = context.element3;
                return Flux
                        .fromIterable(unresolvedUsers)
                        .flatMap(userId -> client.<User>send(new TdApi.GetUser(userId)))
                        .timeout(Duration.ofMinutes(2))
                        .flatMap(MonoFxUtils::orElseLogSkipError)
                        .collect(Collectors.toSet())
                        .map(resolvedUsers -> Tuple3.of(client, clients, resolvedUsers));
            })
            // Finished resolving users
            // Filter out unsuitable users
            .flatMap(context -> {
                return phaseDescriptionConsumer.apply("Filtering users")
                        .then(percentageConsumer.apply(15)).thenReturn(context);
            })
            .flatMap(context -> {
                var client = context.element1;
                var clients = context.element2;
                var unfilteredUsers = context.element3;
                // filterWhen (not filter) is used so that the Mono returned by userStatusConsumer is
                // actually subscribed: a Mono that is only created never delivers its status.
                return Flux
                        .fromIterable(unfilteredUsers)
                        .filterWhen(user -> {
                            if (user.haveAccess) {
                                return Mono.just(true);
                            }
                            return userStatusConsumer
                                    .apply(new UserStatus(getName(user), user.id, UserStatusType.NO_ACCESS_HASH, ""))
                                    .thenReturn(false);
                        })
                        .filterWhen(user -> {
                            if (user.type.getConstructor() == UserTypeRegular.CONSTRUCTOR) {
                                return Mono.just(true);
                            }
                            return userStatusConsumer
                                    .apply(new UserStatus(getName(user), user.id, UserStatusType.NOT_REGULAR_USER, ""))
                                    .thenReturn(false);
                        })
                        .filterWhen(user -> {
                            if (!user.isScam) {
                                return Mono.just(true);
                            }
                            return userStatusConsumer
                                    .apply(new UserStatus(getName(user), user.id, UserStatusType.SCAM_USER, ""))
                                    .thenReturn(false);
                        })
                        .filterWhen(user -> {
                            if (user.restrictionReason == null || user.restrictionReason.isEmpty()) {
                                return Mono.just(true);
                            }
                            return userStatusConsumer
                                    .apply(new UserStatus(getName(user), user.id, UserStatusType.RESTRICTED_USER, "Restricted user: " + user.restrictionReason))
                                    .thenReturn(false);
                        })
                        .flatMap(user -> {
                            return userStatusConsumer
                                    .apply(new UserStatus(getName(user), user.id, UserStatusType.JUST_FOUND, ""))
                                    .thenReturn(user);
                        })
                        .timeout(Duration.ofMinutes(2))
                        .collect(Collectors.toSet())
                        .map(resolvedUsers -> Tuple3.of(client, clients, resolvedUsers));
            })
            // Finished filtering unsuitable users
            .then(percentageConsumer.apply(100))
            .then(phaseDescriptionConsumer.apply("Done"))
            .then(Mono.delay(Duration.ofMillis(500)))
            .then();
}
/**
 * Builds the display name reported for a user: its id, first name and last name separated by
 * single spaces.
 *
 * @param user user to describe
 * @return {@code "<id> <firstName> <lastName>"}
 */
private static String getName(User user) {
    List<String> nameParts = List.of("" + user.id, user.firstName, user.lastName);
    return String.join(" ", nameParts);
}
/**
 * Sends a {@code Close} request to every registered userbot.
 *
 * @return a Mono completing once all close requests have completed
 */
@Override
public Mono<Void> quit() {
    var closeRequests = Flux
            .fromIterable(clients.values())
            .flatMap(client -> client.send(new TdApi.Close()));
    // Wait for every request; the individual results are not needed.
    return closeRequests.collectList().then();
}
}