2020-10-17 01:54:06 +02:00
package it.cavallium ;
import static it.cavallium.PrimaryController.getUserbotPhoneNumber ;
2020-11-03 22:48:10 +01:00
import static it.cavallium.StaticSettings.REMOVE_MEMBERS_FROM_SOURCE_SUPERGROUP ;
import static it.cavallium.StaticSettings.requiresAdminPrivilegesOnSourceSupergroup ;
2020-10-17 01:54:06 +02:00
import com.google.i18n.phonenumbers.NumberParseException ;
import com.google.i18n.phonenumbers.PhoneNumberUtil ;
import com.google.i18n.phonenumbers.PhoneNumberUtil.PhoneNumberFormat ;
import com.google.i18n.phonenumbers.Phonenumber.PhoneNumber ;
2020-10-20 00:31:11 +02:00
import com.hazelcast.cp.internal.util.Tuple2 ;
import com.hazelcast.cp.internal.util.Tuple3 ;
2020-10-20 20:09:29 +02:00
import io.vertx.core.impl.ConcurrentHashSet ;
2020-10-17 01:54:06 +02:00
import it.tdlight.jni.TdApi ;
import it.tdlight.jni.TdApi.AuthorizationStateClosed ;
import it.tdlight.jni.TdApi.AuthorizationStateClosing ;
import it.tdlight.jni.TdApi.AuthorizationStateLoggingOut ;
import it.tdlight.jni.TdApi.AuthorizationStateReady ;
2020-10-20 00:31:11 +02:00
import it.tdlight.jni.TdApi.ChatMemberStatusAdministrator ;
import it.tdlight.jni.TdApi.ChatMemberStatusCreator ;
2020-10-20 02:14:04 +02:00
import it.tdlight.jni.TdApi.ChatMemberStatusLeft ;
2020-11-03 22:48:10 +01:00
import it.tdlight.jni.TdApi.ChatMemberStatusMember ;
2020-10-20 20:09:29 +02:00
import it.tdlight.jni.TdApi.GetMe ;
2020-10-20 00:31:11 +02:00
import it.tdlight.jni.TdApi.GetUserPrivacySettingRules ;
2020-10-20 02:14:04 +02:00
import it.tdlight.jni.TdApi.Ok ;
2020-10-20 00:31:11 +02:00
import it.tdlight.jni.TdApi.Supergroup ;
import it.tdlight.jni.TdApi.SupergroupFullInfo ;
2020-10-17 01:54:06 +02:00
import it.tdlight.jni.TdApi.Update ;
2020-10-20 00:31:11 +02:00
import it.tdlight.jni.TdApi.User ;
import it.tdlight.jni.TdApi.UserFullInfo ;
import it.tdlight.jni.TdApi.UserTypeRegular ;
2020-10-20 02:14:04 +02:00
import it.tdlight.tdlibsession.td.TdError ;
2020-10-17 01:54:06 +02:00
import it.tdlight.tdlibsession.td.easy.AsyncTdEasy ;
import it.tdlight.tdlibsession.td.easy.ParameterInfoPasswordHint ;
import it.tdlight.tdlibsession.td.easy.TdEasySettings ;
import it.tdlight.tdlibsession.td.middle.TdClusterManager ;
import it.tdlight.tdlibsession.td.middle.direct.AsyncTdMiddleDirect ;
import it.tdlight.utils.MonoUtils ;
2020-10-20 02:14:04 +02:00
import it.tdlight.utils.TdLightUtils ;
2020-10-20 00:31:11 +02:00
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet ;
import it.unimi.dsi.fastutil.objects.ObjectSets ;
2020-10-17 01:54:06 +02:00
import java.io.File ;
2020-10-20 00:31:11 +02:00
import java.time.Duration ;
2020-10-17 01:54:06 +02:00
import java.util.HashSet ;
2020-10-20 00:31:11 +02:00
import java.util.List ;
2020-10-17 01:54:06 +02:00
import java.util.Set ;
import java.util.concurrent.ConcurrentHashMap ;
2020-10-20 02:14:04 +02:00
import java.util.concurrent.ThreadLocalRandom ;
import java.util.concurrent.atomic.AtomicInteger ;
2020-10-17 01:54:06 +02:00
import java.util.function.BiFunction ;
import java.util.function.Function ;
import java.util.function.Supplier ;
import java.util.stream.Collectors ;
2020-10-20 00:31:11 +02:00
import org.slf4j.event.Level ;
2020-10-17 01:54:06 +02:00
import reactor.core.publisher.EmitterProcessor ;
import reactor.core.publisher.Flux ;
import reactor.core.publisher.Mono ;
2020-10-19 17:47:45 +02:00
import reactor.core.scheduler.Scheduler ;
2020-10-17 01:54:06 +02:00
import reactor.core.scheduler.Schedulers ;
public class TransferServiceImpl implements TransferService {
private final TdClusterManager clusterManager ;
private final ConcurrentHashMap < Long , TransferClient > clients = new ConcurrentHashMap < > ( ) ;
2020-10-19 19:13:55 +02:00
private final ConcurrentHashMap < Integer , Set < TransferClient > > supergroupClients = new ConcurrentHashMap < > ( ) ;
private final EmitterProcessor < ItemUpdate < TransferClient > > newClients = EmitterProcessor . create ( ) ;
2020-10-19 17:47:45 +02:00
private final Scheduler updatesScheduler ;
2020-10-17 01:54:06 +02:00
private int apiId ;
private String apiHash ;
/**
 * Creates the service.
 *
 * @param clusterManager cluster manager used when deploying the TDLib middle instance of each userbot
 */
public TransferServiceImpl(TdClusterManager clusterManager) {
    // Client state/update subscriptions run on Reactor's shared bounded-elastic scheduler.
    this.updatesScheduler = Schedulers.boundedElastic();
    this.clusterManager = clusterManager;
}
/**
 * Stores the API id that {@code addUserbot} passes to the TDLib settings builder.
 *
 * @param apiId the API id to use for userbots created afterwards
 */
@Override
public void setApiId(int apiId) {
    this.apiId = apiId;
}
/**
 * Stores the API hash that {@code addUserbot} passes to the TDLib settings builder.
 *
 * @param apiHash the API hash to use for userbots created afterwards
 */
@Override
public void setApiHash(String apiHash) {
    this.apiHash = apiHash;
}
@Override
public Mono < AddUserBotResult > addUserbot ( PhoneNumber phoneNumber ,
Function < AsyncTdEasy , Mono < Integer > > codeSupplier ,
BiFunction < AsyncTdEasy , String , Mono < String > > otpSupplier ,
Supplier < Mono < Void > > onUserbotClosed ) {
long phoneNumberLong = getLongPhoneNumber ( phoneNumber ) ;
if ( clients . containsKey ( phoneNumberLong ) ) {
return Mono . just ( AddUserBotResult . newFailed ( " Userbot already added! " ) ) ;
}
String alias = PhoneNumberUtil . getInstance ( ) . format ( phoneNumber , PhoneNumberFormat . INTERNATIONAL ) ;
return Mono
. fromCallable ( ( ) - > {
return AsyncTdMiddleDirect . getAndDeployInstance ( clusterManager , alias , " " + phoneNumberLong ) ;
} )
. subscribeOn ( Schedulers . boundedElastic ( ) )
. flatMap ( v - > v )
2020-10-17 13:44:59 +02:00
. map ( middle - > new AsyncTdEasy ( middle , alias ) ) . flatMap ( client - > {
2020-10-17 01:54:06 +02:00
return client
2020-10-17 13:44:59 +02:00
. execute ( new TdApi . SetLogVerbosityLevel ( 0 ) )
. then ( client
. create ( TdEasySettings
. newBuilder ( )
. setUseMessageDatabase ( false )
. setUseFileDatabase ( false )
. setUseChatInfoDatabase ( false )
. setApiId ( apiId )
. setApiHash ( apiHash )
. setEnableStorageOptimizer ( false )
. setApplicationVersion ( App . VERSION )
. setDatabaseDirectory ( " sessions " + File . separator + " userbot_ " + phoneNumberLong )
. setIgnoreFileNames ( true )
. setPhoneNumber ( phoneNumberLong )
. setSystemLanguageCode ( " en " )
. setDeviceModel ( System . getProperty ( " os.name " ) )
. setSystemVersion ( System . getProperty ( " os.version " ) )
. setParameterRequestHandler ( ( parameter , parameterInfo ) - > {
switch ( parameter ) {
case ASK_FIRST_NAME :
return Mono . just ( " FirstName " ) ;
case ASK_LAST_NAME :
return Mono . just ( " LastName " ) ;
case ASK_CODE :
return codeSupplier
. apply ( client )
. map ( i - > " " + i )
. switchIfEmpty ( client
. send ( new TdApi . Close ( ) )
. materialize ( )
. flatMap ( signal - > onUserbotClosed . get ( ) . thenReturn ( signal ) )
. dematerialize ( )
. then ( Mono . empty ( ) ) ) ;
case ASK_PASSWORD :
return otpSupplier
. apply ( client , ( ( ParameterInfoPasswordHint ) parameterInfo ) . getHint ( ) )
. switchIfEmpty ( client
. send ( new TdApi . Close ( ) )
. materialize ( )
. flatMap ( signal - > onUserbotClosed . get ( ) . thenReturn ( signal ) )
. dematerialize ( )
. then ( Mono . empty ( ) ) ) ;
case NOTIFY_LINK :
default :
return Mono . empty ( ) ;
}
} )
. build ( ) )
. then ( Mono . defer ( ( ) - > {
2020-10-19 17:47:45 +02:00
var clientStateFlux = client . getState ( ) . publish ( ) . autoConnect ( 3 ) ;
2020-10-17 13:44:59 +02:00
clientStateFlux
. filter ( state - > state . getConstructor ( ) = = AuthorizationStateClosing . CONSTRUCTOR
| | state . getConstructor ( ) = = AuthorizationStateClosed . CONSTRUCTOR
| | state . getConstructor ( ) = = AuthorizationStateLoggingOut . CONSTRUCTOR )
. take ( 1 )
. singleOrEmpty ( )
. then ( )
. materialize ( )
. flatMap ( signal - > onUserbotClosed . get ( ) . thenReturn ( signal ) )
. dematerialize ( )
2020-10-19 17:47:45 +02:00
. subscribeOn ( updatesScheduler )
2020-10-19 16:00:16 +02:00
. subscribe ( state - > System . out . println ( " Userbot closed with state: " + state ) ) ;
2020-10-19 17:47:45 +02:00
clientStateFlux . subscribeOn ( updatesScheduler ) . subscribe ( state - > System . out . println ( " state: " + state ) ) ;
2020-10-17 13:44:59 +02:00
client
. getIncomingUpdates ( )
2020-10-19 17:47:45 +02:00
. subscribeOn ( updatesScheduler )
2020-10-19 16:00:16 +02:00
. flatMap ( update - > onClientUpdate ( update ) )
. subscribe ( _v - > { } , e - > System . err . println ( e ) ) ;
2020-10-17 13:44:59 +02:00
return clientStateFlux
2020-10-19 17:47:45 +02:00
. subscribeOn ( updatesScheduler )
2020-10-17 13:44:59 +02:00
. filter ( state - > state . getConstructor ( ) = = AuthorizationStateClosing . CONSTRUCTOR
| | state . getConstructor ( ) = = AuthorizationStateClosed . CONSTRUCTOR
| | state . getConstructor ( ) = = AuthorizationStateLoggingOut . CONSTRUCTOR
| | state . getConstructor ( ) = = AuthorizationStateReady . CONSTRUCTOR )
. take ( 1 )
. singleOrEmpty ( )
. doOnNext ( state - > System . out . println ( " aState: " + state ) )
. handle ( ( state , sink ) - > {
if ( state . getConstructor ( ) = = AuthorizationStateReady . CONSTRUCTOR ) {
sink . complete ( ) ;
} else {
sink . error ( new Exception ( state . getClass ( ) . getSimpleName ( ) ) ) ;
}
} )
. then ( ) ;
} ) )
2020-10-20 20:09:29 +02:00
. doOnSuccess ( ( clientUser ) - > {
2020-10-20 00:31:11 +02:00
var newClient = new TransferClient ( alias , client ) ;
2020-10-19 19:13:55 +02:00
clients . put ( phoneNumberLong , newClient ) ;
2020-10-20 00:31:11 +02:00
newClient . subscribeAdminSupergroups ( ) . doOnNext ( supergroupInfoItemUpdate - > {
if ( supergroupInfoItemUpdate . isRemoved ( ) ) {
supergroupClients . compute ( supergroupInfoItemUpdate . getItem ( ) . getBaseChatInfo ( ) . getSupergroupIdInt ( ) , ( sgId , clients ) - > {
if ( clients ! = null ) {
clients . remove ( newClient ) ;
}
return clients ;
} ) ;
} else {
supergroupClients . compute ( supergroupInfoItemUpdate . getItem ( ) . getBaseChatInfo ( ) . getSupergroupIdInt ( ) , ( sgId , clients ) - > {
if ( clients = = null ) {
clients = ObjectSets . synchronize ( new ObjectOpenHashSet < > ( ) ) ;
}
clients . add ( newClient ) ;
return clients ;
} ) ;
}
} ) . subscribe ( ) ;
2020-10-19 19:13:55 +02:00
newClients . onNext ( new ItemUpdate < > ( false , newClient ) ) ;
} ) ) ;
2020-10-17 01:54:06 +02:00
} )
. map ( _v - > AddUserBotResult . newSuccess ( ) ) ;
}
/**
 * Handles one incoming update of a userbot client. The update is currently ignored.
 *
 * @param update the incoming update (unused)
 * @return an empty Mono
 */
private Mono<Void> onClientUpdate(Update update) {
    return Mono.empty();
}
/**
 * Converts a phone number to the {@code long} used as key of the {@code clients} map.
 *
 * @param phoneNumber the phone number to convert
 * @return the E164 representation of the number, after the replacement below, parsed as a long
 * @throws NumberFormatException if the resulting text is not a valid long
 */
private static long getLongPhoneNumber(PhoneNumber phoneNumber) {
    String e164 = PhoneNumberUtil.getInstance().format(phoneNumber, PhoneNumberFormat.E164);
    String numericText = e164.replace(" + ", " ");
    return Long.parseLong(numericText);
}
@Override
public Mono < Void > closeUserbot ( PhoneNumber phoneNumber ) {
var client = clients . remove ( getLongPhoneNumber ( phoneNumber ) ) ;
2020-10-19 19:13:55 +02:00
newClients . onNext ( new ItemUpdate < > ( true , client ) ) ;
2020-10-17 01:54:06 +02:00
if ( client = = null ) {
return Mono . error ( new Exception ( " Userbot " + phoneNumber + " was not found! " ) ) ;
} else {
return MonoUtils . thenOrError ( client . send ( new TdApi . Close ( ) ) ) ;
}
}
@Override
public Set < PhoneNumber > getPhoneNumbers ( ) {
var phonenumbers = new HashSet < PhoneNumber > ( ) ;
clients . forEach ( ( phoneNumberLong , client ) - > {
try {
phonenumbers . add ( getUserbotPhoneNumber ( " + " + phoneNumberLong ) ) ;
} catch ( NumberParseException e ) {
// Can't happen
e . printStackTrace ( ) ;
}
} ) ;
return phonenumbers ;
}
@Override
public Set < BaseChatInfo > getAdminSupergroups ( boolean canRestrictMembers , boolean canInviteUsers ) {
var adminSupergroups = clients
. values ( )
. stream ( )
. flatMap ( ( TransferClient transferClient ) - > transferClient
. getAdminSupergroups ( canRestrictMembers , canInviteUsers )
. stream ( ) )
. collect ( Collectors . toSet ( ) ) ;
return adminSupergroups ;
}
2020-10-19 19:13:55 +02:00
@Override
public Flux < ItemUpdate < SupergroupInfo > > subscribeAdminSupergroups ( ) {
return Flux . merge ( this . newClients , Flux
. fromStream ( clients . values ( ) . stream ( ) . map ( client - > new ItemUpdate < > ( false , client ) ) ) )
. filter ( itemClient - > ! itemClient . isRemoved ( ) )
. map ( ItemUpdate : : getItem )
2020-10-20 00:31:11 +02:00
. map ( TransferClient : : subscribeAdminSupergroups )
2020-10-19 19:13:55 +02:00
. flatMap ( f - > f ) ;
}
@Override
public Mono < Void > transferMembers ( BaseChatInfo sourceGroup ,
BaseChatInfo destGroup ,
2020-10-20 00:31:11 +02:00
Function < UserStatus , Mono < Void > > userStatusConsumer ,
Function < Integer , Mono < Void > > percentageConsumer ,
Function < String , Mono < Void > > phaseDescriptionConsumer ) {
var sourceSupergroupClients = this . supergroupClients
. getOrDefault ( sourceGroup . getSupergroupIdInt ( ) , Set . of ( ) )
. stream ( )
. filter ( clients : : containsValue )
. collect ( Collectors . toSet ( ) ) ;
if ( sourceSupergroupClients . isEmpty ( ) ) {
return Mono . error ( new Exception ( " No userbot can remove members from the source group " ) ) ;
}
var destSupergroupClients = this . supergroupClients
. getOrDefault ( destGroup . getSupergroupIdInt ( ) , Set . of ( ) )
. stream ( )
. filter ( clients : : containsValue )
. collect ( Collectors . toSet ( ) ) ;
if ( destSupergroupClients . isEmpty ( ) ) {
return Mono . error ( new Exception ( " No userbot can add members to the destination group " ) ) ;
}
2020-10-20 02:14:04 +02:00
AtomicInteger transferredSuccessfullyUsersStats = new AtomicInteger ( 0 ) ;
2020-10-20 00:31:11 +02:00
return percentageConsumer
. apply ( 0 )
. then ( phaseDescriptionConsumer . apply ( " Transfer from " + sourceGroup . getTitle ( ) + " to " + destGroup . getTitle ( ) ) )
// Check and get the set of userbots that can transfer users from group X to group Y
2020-11-03 22:48:10 +01:00
. then ( phaseDescriptionConsumer . apply ( " Checking available userbots for " + ( REMOVE_MEMBERS_FROM_SOURCE_SUPERGROUP ? " removing " : " managing " ) + " users in the source group " ) )
2020-10-20 00:31:11 +02:00
. thenMany ( Flux . fromIterable ( sourceSupergroupClients ) )
. flatMap ( client - > client
. send ( new TdApi . GetMe ( ) )
. timeout ( Duration . ofSeconds ( 5 ) )
2020-10-20 02:14:04 +02:00
. flatMap ( MonoUtils : : orElseThrow )
2020-10-20 20:09:29 +02:00
. flatMap ( _v - > client . < SupergroupFullInfo > send ( new TdApi . GetSupergroupFullInfo ( sourceGroup . getSupergroupIdInt ( ) ) ) )
2020-10-20 02:14:04 +02:00
. flatMap ( MonoUtils : : orElseThrow )
2020-10-20 00:31:11 +02:00
. timeout ( Duration . ofSeconds ( 5 ) )
2020-10-20 20:09:29 +02:00
. flatMap ( _v - > client . < Supergroup > send ( new TdApi . GetSupergroup ( sourceGroup . getSupergroupIdInt ( ) ) ) )
2020-10-20 02:14:04 +02:00
. flatMap ( MonoUtils : : orElseThrow )
2020-10-20 00:31:11 +02:00
. timeout ( Duration . ofSeconds ( 5 ) )
. filter ( sourceGroupFullInfo - > {
2020-11-03 22:48:10 +01:00
if ( requiresAdminPrivilegesOnSourceSupergroup ( ) ) {
if ( sourceGroupFullInfo . status . getConstructor ( ) = = ChatMemberStatusAdministrator . CONSTRUCTOR ) {
var statusAdmin = ( ChatMemberStatusAdministrator ) sourceGroupFullInfo . status ;
if ( statusAdmin . canRestrictMembers ) {
return true ;
} else {
App . getLogService ( ) . append ( Level . WARN , " Userbot " + client + " failed: Can't restrict members of group " + sourceGroup . getTitle ( ) ) ;
return false ;
}
} else if ( sourceGroupFullInfo . status . getConstructor ( ) = = ChatMemberStatusCreator . CONSTRUCTOR ) {
2020-10-20 00:31:11 +02:00
return true ;
} else {
2020-11-03 22:48:10 +01:00
App . getLogService ( ) . append ( Level . WARN , " Userbot " + client + " failed: Can't administer group " + sourceGroup . getTitle ( ) ) ;
return false ;
2020-10-20 00:31:11 +02:00
}
} else {
2020-11-03 22:48:10 +01:00
switch ( sourceGroupFullInfo . status . getConstructor ( ) ) {
case ChatMemberStatusAdministrator . CONSTRUCTOR :
case ChatMemberStatusCreator . CONSTRUCTOR :
case ChatMemberStatusMember . CONSTRUCTOR :
return true ;
default :
App . getLogService ( ) . append ( Level . WARN , " Userbot " + client + " failed: Can't access group " + sourceGroup . getTitle ( ) + " (probably restricted, left or banned) " ) ;
return false ;
}
2020-10-20 00:31:11 +02:00
}
} )
. map ( _v - > client )
. onErrorResume ( e - > {
App . getLogService ( ) . append ( Level . WARN , " Userbot " + client + " failed: " + e . getLocalizedMessage ( ) ) ;
return Mono . empty ( ) ;
} ) )
. collect ( Collectors . toSet ( ) )
. flatMap ( transferSourceClients - > {
return phaseDescriptionConsumer . apply ( " Checking available userbots for adding users in the destination group " )
. thenMany ( Flux . fromIterable ( destSupergroupClients ) )
. flatMap ( client - > client
. send ( new TdApi . GetMe ( ) )
2020-10-20 02:14:04 +02:00
. flatMap ( MonoUtils : : orElseThrow )
2020-10-20 00:31:11 +02:00
. timeout ( Duration . ofSeconds ( 5 ) )
2020-10-20 20:09:29 +02:00
. flatMap ( _v - > client . < Supergroup > send ( new TdApi . GetSupergroup ( destGroup . getSupergroupIdInt ( ) ) ) )
2020-10-20 02:14:04 +02:00
. flatMap ( MonoUtils : : orElseThrow )
2020-10-20 00:31:11 +02:00
. timeout ( Duration . ofSeconds ( 5 ) )
. filter ( destGroupFullInfo - > {
2020-10-20 02:14:04 +02:00
if ( destGroupFullInfo . status . getConstructor ( ) = = ChatMemberStatusAdministrator . CONSTRUCTOR ) {
var statusAdmin = ( ChatMemberStatusAdministrator ) destGroupFullInfo . status ;
if ( statusAdmin . canInviteUsers ) {
2020-10-20 00:31:11 +02:00
return true ;
} else {
2020-10-20 02:14:04 +02:00
App . getLogService ( ) . append ( Level . WARN , " Userbot " + client + " failed: Can't invite members to group " + destGroup . getTitle ( ) ) ;
2020-10-20 00:31:11 +02:00
}
2020-10-20 02:14:04 +02:00
} else if ( destGroupFullInfo . status . getConstructor ( ) = = ChatMemberStatusCreator . CONSTRUCTOR ) {
return true ;
2020-10-20 00:31:11 +02:00
} else {
2020-10-20 02:14:04 +02:00
App . getLogService ( ) . append ( Level . WARN , " Userbot " + client + " failed: Can't administer group " + destGroup . getTitle ( ) ) ;
2020-10-20 00:31:11 +02:00
}
return false ;
} )
. map ( _v - > client )
. onErrorResume ( e - > {
App . getLogService ( ) . append ( Level . WARN , " Userbot " + client + " failed: " + e . getLocalizedMessage ( ) ) ;
return Mono . empty ( ) ;
} ) )
. collect ( Collectors . toSet ( ) )
. map ( transferDestClients - > Tuple2 . of ( transferSourceClients , transferDestClients ) ) ;
} )
. map ( clientsTuple - > {
var sourceClients = clientsTuple . element1 ;
var destClients = clientsTuple . element2 ;
App . getLogService ( ) . append ( Level . INFO , " Found source userbots: " + sourceClients . stream ( ) . map ( TransferClient : : toString ) . collect ( Collectors . joining ( " , " ) ) ) ;
App . getLogService ( ) . append ( Level . INFO , " Found destination userbots: " + destClients . stream ( ) . map ( TransferClient : : toString ) . collect ( Collectors . joining ( " , " ) ) ) ;
var chosenClients = new HashSet < TransferClient > ( sourceClients ) ;
chosenClients . retainAll ( destClients ) ;
return chosenClients ;
} )
. filter ( chosenClients - > ! chosenClients . isEmpty ( ) )
. doOnNext ( chosenClients - > {
App . getLogService ( ) . append ( Level . INFO , " Chosen userbots: " + chosenClients . stream ( ) . map ( TransferClient : : toString ) . collect ( Collectors . joining ( " , " ) ) ) ;
} )
. switchIfEmpty ( Mono . defer ( ( ) - > {
App . getLogService ( ) . append ( Level . ERROR , " No userbots are admin in both groups! " ) ;
return Mono . error ( new Exception ( " No userbots are admin in both groups! " ) ) ;
} ) )
// Now we have a set of userbots that can transfer the users
// Get the list of members of the first group from a bot
. flatMap ( clients - > {
return phaseDescriptionConsumer . apply ( " Obtaining group members " )
. then ( percentageConsumer . apply ( 5 ) ) . thenReturn ( clients ) ;
} ) . flatMap ( clients - > {
// Get the members of the source group
2020-10-20 20:09:29 +02:00
return Flux
. fromIterable ( clients )
. flatMap ( client - > {
return Mono . fromCallable ( ( ) - > {
var members = TransferUtils . getSupergroupMembers ( client , sourceGroup . getSupergroupIdInt ( ) ) ;
App . getLogService ( ) . append ( Level . INFO , " Source group has " + members . size ( ) + " members. " ) ;
return members ;
} ) ;
}
)
. last ( )
2020-10-20 22:25:49 +02:00
. onErrorMap ( error - > {
error . printStackTrace ( ) ;
return new Exception ( " Error while obtaining source group members: " + error . getLocalizedMessage ( ) , error ) ;
} )
2020-10-20 20:09:29 +02:00
. subscribeOn ( Schedulers . boundedElastic ( ) )
. map ( members - > Tuple2 . of ( clients , members ) ) ;
2020-10-20 00:31:11 +02:00
} )
// Finished getting the list of members of the source group
// Resolve users
. flatMap ( context - > {
return phaseDescriptionConsumer . apply ( " Resolving users " )
. then ( percentageConsumer . apply ( 10 ) ) . thenReturn ( context ) ;
} )
. flatMap ( context - > {
2020-10-20 20:09:29 +02:00
var clients = context . element1 ;
var unresolvedUsers = context . element2 ;
2020-10-20 00:31:11 +02:00
return Flux
2020-10-20 20:09:29 +02:00
. fromIterable ( clients )
. flatMap ( client - > Flux
. fromIterable ( unresolvedUsers )
. flatMap ( userId - > client . < User > send ( new TdApi . GetUser ( userId ) ) )
. flatMap ( MonoFxUtils : : orElseLogSkipError )
. timeout ( Duration . ofMinutes ( 2 ) )
. onErrorResume ( error - > {
App . getLogService ( ) . append ( Level . WARN , " Error while resolving an user: " + error ) ;
return Mono . empty ( ) ;
} )
. collect ( Collectors . toSet ( ) )
)
. last ( )
. map ( resolvedUsers - > Tuple2 . of ( clients , resolvedUsers ) ) ;
2020-10-20 00:31:11 +02:00
} )
// Finished resolving users
// Filter out unsuitable users
. flatMap ( context - > {
return phaseDescriptionConsumer . apply ( " Filtering users " )
. then ( percentageConsumer . apply ( 15 ) ) . thenReturn ( context ) ;
} )
. flatMap ( context - > {
2020-10-20 20:09:29 +02:00
var clients = context . element1 ;
var unfilteredUsers = context . element2 ;
2020-10-20 00:31:11 +02:00
return Flux
. fromIterable ( unfilteredUsers )
2020-10-20 20:09:29 +02:00
. < User > flatMap ( user - > {
if ( this . clients . values ( ) . stream ( ) . map ( TransferClient : : getClientUser ) . noneMatch ( clientUser - > clientUser . id = = user . id ) ) {
return Mono . just ( user ) ;
}
return userStatusConsumer . apply ( new UserStatus ( getName ( user ) , user . id , UserStatusType . NOT_REGULAR_USER , " " ) ) . then ( Mono . empty ( ) ) ;
} )
2020-10-20 02:14:04 +02:00
. < User > flatMap ( user - > {
2020-10-20 00:31:11 +02:00
if ( user . haveAccess ) {
2020-10-20 02:14:04 +02:00
return Mono . just ( user ) ;
2020-10-20 00:31:11 +02:00
}
2020-10-20 02:14:04 +02:00
return userStatusConsumer . apply ( new UserStatus ( getName ( user ) , user . id , UserStatusType . NO_ACCESS_HASH , " " ) ) . then ( Mono . empty ( ) ) ;
2020-10-20 00:31:11 +02:00
} )
2020-10-20 02:14:04 +02:00
. flatMap ( user - > {
2020-10-20 00:31:11 +02:00
if ( user . type . getConstructor ( ) = = UserTypeRegular . CONSTRUCTOR ) {
2020-10-20 02:14:04 +02:00
return Mono . just ( user ) ;
2020-10-20 00:31:11 +02:00
}
2020-10-20 02:14:04 +02:00
return userStatusConsumer . apply ( new UserStatus ( getName ( user ) , user . id , UserStatusType . NOT_REGULAR_USER , " " ) ) . then ( Mono . empty ( ) ) ;
2020-10-20 00:31:11 +02:00
} )
2020-10-20 02:14:04 +02:00
. flatMap ( user - > {
2020-10-20 00:31:11 +02:00
if ( ! user . isScam ) {
2020-10-20 02:14:04 +02:00
return Mono . just ( user ) ;
2020-10-20 00:31:11 +02:00
}
2020-10-20 02:14:04 +02:00
return userStatusConsumer . apply ( new UserStatus ( getName ( user ) , user . id , UserStatusType . SCAM_USER , " " ) ) . then ( Mono . empty ( ) ) ;
2020-10-20 00:31:11 +02:00
} )
2020-10-20 02:14:04 +02:00
. flatMap ( user - > {
2020-10-20 00:31:11 +02:00
if ( user . restrictionReason = = null | | user . restrictionReason . isEmpty ( ) ) {
2020-10-20 02:14:04 +02:00
return Mono . just ( user ) ;
2020-10-20 00:31:11 +02:00
}
2020-10-20 02:14:04 +02:00
return userStatusConsumer . apply ( new UserStatus ( getName ( user ) , user . id , UserStatusType . RESTRICTED_USER , " Restricted user: " + user . restrictionReason ) ) . then ( Mono . empty ( ) ) ;
2020-10-20 00:31:11 +02:00
} )
. flatMap ( user - > {
return userStatusConsumer
. apply ( new UserStatus ( getName ( user ) , user . id , UserStatusType . JUST_FOUND , " " ) )
. thenReturn ( user ) ;
} )
. collect ( Collectors . toSet ( ) )
2020-10-20 20:09:29 +02:00
. map ( resolvedUsers - > Tuple3 . of ( clients , resolvedUsers , unfilteredUsers . size ( ) ) ) ;
2020-10-20 00:31:11 +02:00
} )
// Finished filtering unsuitable users
2020-10-20 02:14:04 +02:00
// Transfer users
. flatMap ( context - > {
return phaseDescriptionConsumer . apply ( " Transferring users " )
. then ( percentageConsumer . apply ( 20 ) ) . thenReturn ( context ) ;
} )
. flatMap ( context - > {
var clients = context . element1 ;
var users = context . element2 ;
2020-10-20 20:09:29 +02:00
var totalUsersCount = context . element3 ;
2020-10-20 02:14:04 +02:00
var client = clients . stream ( ) . skip ( ThreadLocalRandom . current ( ) . nextInt ( clients . size ( ) ) ) . findFirst ( ) . orElseThrow ( ) ;
AtomicInteger processedUsersStats = new AtomicInteger ( 0 ) ;
return Flux
. fromIterable ( users )
. flatMap ( user - > {
return percentageConsumer
. apply ( 20 + processedUsersStats . getAndIncrement ( ) / users . size ( ) * ( 100 - 20 ) )
. thenReturn ( user ) ;
} )
. < User > flatMap ( user - > {
return client . < Ok > send ( new TdApi . AddChatMember ( destGroup . getSupergroupId ( ) , user . id , 0 ) )
. flatMap ( result - > {
if ( result . failed ( ) ) {
if ( TdLightUtils . errorEquals ( new TdError ( result . cause ( ) . code , result . cause ( ) . message ) , 403 , " USER_PRIVACY_RESTRICTED " ) ) {
return userStatusConsumer . apply ( new UserStatus ( getName ( user ) , user . id , UserStatusType . USER_PRIVACY_RESTRICTED , " " ) ) . then ( Mono . empty ( ) ) ;
} else if ( TdLightUtils . errorEquals ( new TdError ( result . cause ( ) . code , result . cause ( ) . message ) , 400 , " USER_NOT_MUTUAL_CONTACT " ) ) {
return userStatusConsumer . apply ( new UserStatus ( getName ( user ) , user . id , UserStatusType . USER_NOT_MUTUAL_CONTACT , " " ) ) . then ( Mono . empty ( ) ) ;
}
}
return Mono . just ( result ) ;
} )
. flatMap ( MonoUtils : : orElseThrow )
. timeout ( Duration . ofMinutes ( 2 ) )
. onErrorResume ( ( error ) - > {
App . getLogService ( ) . append ( Level . WARN , " Can't add user \" " + getName ( user ) + " \" to supergroup \" " + destGroup . getSupergroupIdInt ( ) + " " + destGroup . getTitle ( ) + " \" " ) ;
return userStatusConsumer . apply ( new UserStatus ( getName ( user ) , user . id , UserStatusType . CANT_ADD , " Can't add to destination supergroup: " + error . getLocalizedMessage ( ) ) ) . then ( Mono . empty ( ) ) ;
} )
. flatMap ( _v - > {
return userStatusConsumer . apply ( new UserStatus ( getName ( user ) , user . id , UserStatusType . ADDED_AND_WAITING_TO_BE_REMOVED , " " ) ) . then ( Mono . empty ( ) ) . thenReturn ( user ) ;
} ) ;
} )
. < User > flatMap ( user - > {
2020-11-03 22:48:10 +01:00
if ( REMOVE_MEMBERS_FROM_SOURCE_SUPERGROUP ) {
// Remove the user from the source supergroup
return client . < Ok > send ( new TdApi . SetChatMemberStatus ( sourceGroup . getSupergroupId ( ) , user . id , new ChatMemberStatusLeft ( ) ) )
. flatMap ( result - > {
if ( result . failed ( ) ) {
if ( TdLightUtils . errorEquals ( new TdError ( result . cause ( ) . code , result . cause ( ) . message ) , 3 , " Can't remove chat owner " ) ) {
return userStatusConsumer . apply ( new UserStatus ( getName ( user ) , user . id , UserStatusType . CANT_REMOVE_CHAT_OWNER , " " ) ) . then ( Mono . empty ( ) ) ;
}
2020-10-20 02:14:04 +02:00
}
2020-11-03 22:48:10 +01:00
return Mono . just ( result ) ;
} )
. flatMap ( MonoUtils : : orElseThrow )
. timeout ( Duration . ofMinutes ( 2 ) )
. onErrorResume ( ( error ) - > {
App . getLogService ( ) . append ( Level . WARN , " Can't remove user \" " + getName ( user ) + " \" from supergroup \" " + sourceGroup . getSupergroupIdInt ( ) + " " + sourceGroup . getTitle ( ) + " \" " ) ;
return userStatusConsumer . apply ( new UserStatus ( getName ( user ) , user . id , UserStatusType . CANT_REMOVE , " Can't remove from source supergroup: " + error . getLocalizedMessage ( ) ) ) . then ( Mono . empty ( ) ) ;
} )
. flatMap ( _v - > {
transferredSuccessfullyUsersStats . incrementAndGet ( ) ;
return userStatusConsumer . apply ( new UserStatus ( getName ( user ) , user . id , UserStatusType . DONE , " " ) ) . then ( Mono . empty ( ) ) . thenReturn ( user ) ;
} ) ;
} else {
// Don't remove the user from the source supergroup
return Mono . just ( user ) ;
}
2020-10-20 02:14:04 +02:00
} )
2020-10-20 02:54:32 +02:00
. delayElements ( App . getSettingsService ( ) . getDelayBetweenAdds ( ) )
2020-10-20 02:14:04 +02:00
. collect ( Collectors . toSet ( ) )
2020-10-20 20:09:29 +02:00
. map ( resolvedUsers - > Tuple3 . of ( clients , resolvedUsers , totalUsersCount ) ) ;
2020-10-20 02:14:04 +02:00
} )
// Finished transferring users
. doOnNext ( context - > {
2020-10-20 20:09:29 +02:00
App . getLogService ( ) . append ( Level . INFO , " Transfer done. Transferred " + transferredSuccessfullyUsersStats . get ( ) + " / " + context . element3 + " users " ) ;
2020-10-20 02:14:04 +02:00
} )
2020-10-20 00:31:11 +02:00
. then ( percentageConsumer . apply ( 100 ) )
. then ( phaseDescriptionConsumer . apply ( " Done " ) )
. then ( Mono . delay ( Duration . ofMillis ( 500 ) ) )
. then ( ) ;
}
private static String getName ( User user ) {
return String . join ( " " , List . of ( " " + user . id , user . firstName , user . lastName ) ) ;
2020-10-19 19:13:55 +02:00
}
2020-10-17 01:54:06 +02:00
@Override
public Mono < Void > quit ( ) {
return Flux
. fromIterable ( clients . values ( ) )
. flatMap ( client - > client . send ( new TdApi . Close ( ) ) )
2020-10-20 02:14:04 +02:00
. onErrorResume ( error - > {
App . getLogService ( ) . append ( Level . ERROR , " Can't close a tdlib instance: " + error . getLocalizedMessage ( ) ) ;
return Mono . empty ( ) ;
} )
2020-10-17 01:54:06 +02:00
. collectList ( )
. then ( ) ;
}
}