package it.cavallium; import static it.cavallium.PrimaryController.getUserbotPhoneNumber; import com.google.i18n.phonenumbers.NumberParseException; import com.google.i18n.phonenumbers.PhoneNumberUtil; import com.google.i18n.phonenumbers.PhoneNumberUtil.PhoneNumberFormat; import com.google.i18n.phonenumbers.Phonenumber.PhoneNumber; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.AuthorizationStateClosed; import it.tdlight.jni.TdApi.AuthorizationStateClosing; import it.tdlight.jni.TdApi.AuthorizationStateLoggingOut; import it.tdlight.jni.TdApi.AuthorizationStateReady; import it.tdlight.jni.TdApi.Update; import it.tdlight.jni.TdApi.UpdateSupergroup; import it.tdlight.tdlibsession.td.easy.AsyncTdEasy; import it.tdlight.tdlibsession.td.easy.ParameterInfoPasswordHint; import it.tdlight.tdlibsession.td.easy.TdEasySettings; import it.tdlight.tdlibsession.td.middle.TdClusterManager; import it.tdlight.tdlibsession.td.middle.direct.AsyncTdMiddleDirect; import it.tdlight.utils.MonoUtils; import java.io.File; import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.reactivestreams.Publisher; import org.slf4j.LoggerFactory; import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; public class TransferServiceImpl implements TransferService { private final TdClusterManager clusterManager; private final ConcurrentHashMap clients = new ConcurrentHashMap<>(); private int apiId; private String apiHash; public TransferServiceImpl(TdClusterManager clusterManager) { this.clusterManager = clusterManager; } @Override public void setApiId(int apiId) { this.apiId = apiId; } @Override public void setApiHash(String apiHash) { this.apiHash = apiHash; } @Override public Mono addUserbot(PhoneNumber phoneNumber, Function> codeSupplier, BiFunction> otpSupplier, Supplier> onUserbotClosed) { long phoneNumberLong = getLongPhoneNumber(phoneNumber); if (clients.containsKey(phoneNumberLong)) { return Mono.just(AddUserBotResult.newFailed("Userbot already added!")); } String alias = PhoneNumberUtil.getInstance().format(phoneNumber, PhoneNumberFormat.INTERNATIONAL); return Mono .fromCallable(() -> { return AsyncTdMiddleDirect.getAndDeployInstance(clusterManager, alias, "" + phoneNumberLong); }) .subscribeOn(Schedulers.boundedElastic()) .flatMap(v -> v) .map(middle -> new AsyncTdEasy(middle, alias)).flatMap(client -> { return client .execute(new TdApi.SetLogVerbosityLevel(0)) .then(client .create(TdEasySettings .newBuilder() .setUseMessageDatabase(false) .setUseFileDatabase(false) .setUseChatInfoDatabase(false) .setApiId(apiId) .setApiHash(apiHash) .setEnableStorageOptimizer(false) .setApplicationVersion(App.VERSION) .setDatabaseDirectory("sessions" + File.separator + "userbot_" + phoneNumberLong) .setIgnoreFileNames(true) .setPhoneNumber(phoneNumberLong) .setSystemLanguageCode("en") .setDeviceModel(System.getProperty("os.name")) .setSystemVersion(System.getProperty("os.version")) .setParameterRequestHandler((parameter, parameterInfo) -> { switch (parameter) { case ASK_FIRST_NAME: return Mono.just("FirstName"); case ASK_LAST_NAME: return Mono.just("LastName"); case ASK_CODE: return codeSupplier .apply(client) .map(i -> "" + i) .switchIfEmpty(client .send(new TdApi.Close()) .materialize() .flatMap(signal -> onUserbotClosed.get().thenReturn(signal)) .dematerialize() .then(Mono.empty())); case ASK_PASSWORD: return otpSupplier .apply(client, ((ParameterInfoPasswordHint) parameterInfo).getHint()) .switchIfEmpty(client .send(new TdApi.Close()) .materialize() .flatMap(signal -> onUserbotClosed.get().thenReturn(signal)) .dematerialize() .then(Mono.empty())); case NOTIFY_LINK: default: return Mono.empty(); } }) .build()) .then(Mono.defer(() -> { var clientStateFlux = client.getState().skip(1).publish().autoConnect(2); clientStateFlux .filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR || state.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR || state.getConstructor() == AuthorizationStateLoggingOut.CONSTRUCTOR) .take(1) .singleOrEmpty() .then() .materialize() .flatMap(signal -> onUserbotClosed.get().thenReturn(signal)) .dematerialize() .subscribe(state -> System.out.println("Userbot closed with state: " + state)); clientStateFlux.subscribe(state -> System.out.println("state: " + state)); client .getIncomingUpdates() .flatMap(update -> onClientUpdate(update)) .subscribe(_v -> {}, e -> System.err.println(e)); return clientStateFlux .filter(state -> state.getConstructor() == AuthorizationStateClosing.CONSTRUCTOR || state.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR || state.getConstructor() == AuthorizationStateLoggingOut.CONSTRUCTOR || state.getConstructor() == AuthorizationStateReady.CONSTRUCTOR) .take(1) .singleOrEmpty() .doOnNext(state -> System.out.println("aState: " + state)) .handle((state, sink) -> { if (state.getConstructor() == AuthorizationStateReady.CONSTRUCTOR) { sink.complete(); } else { sink.error(new Exception(state.getClass().getSimpleName())); } }) .then(); })) .doOnSuccess((_v) -> clients.put(phoneNumberLong, new TransferClient(client)))); }) .map(_v -> AddUserBotResult.newSuccess()); } private Mono onClientUpdate(Update update) { return Mono.empty(); } private static long getLongPhoneNumber(PhoneNumber phoneNumber) { return Long.parseLong(PhoneNumberUtil .getInstance() .format(phoneNumber, PhoneNumberFormat.E164) .replace("+", "")); } @Override public Mono closeUserbot(PhoneNumber phoneNumber) { var client = clients.remove(getLongPhoneNumber(phoneNumber)); if (client == null) { return Mono.error(new Exception("Userbot " + phoneNumber + " was not found!")); } else { return MonoUtils.thenOrError(client.send(new TdApi.Close())); } } @Override public Set getPhoneNumbers() { var phonenumbers = new HashSet(); clients.forEach((phoneNumberLong, client) -> { try { phonenumbers.add(getUserbotPhoneNumber("+" + phoneNumberLong)); } catch (NumberParseException e) { // Can't happen e.printStackTrace(); } }); return phonenumbers; } @Override public Set getAdminSupergroups(boolean canRestrictMembers, boolean canInviteUsers) { var adminSupergroups = clients .values() .stream() .flatMap((TransferClient transferClient) -> transferClient .getAdminSupergroups(canRestrictMembers, canInviteUsers) .stream()) .collect(Collectors.toSet()); return adminSupergroups; } @Override public Mono quit() { return Flux .fromIterable(clients.values()) .flatMap(client -> client.send(new TdApi.Close())) .log() .collectList() .then(); } }