SequenceDispatcher: hide usage into NetQueryDispatcher

This commit is contained in:
Arseny Smirnov 2022-02-01 18:51:20 +03:00
parent 950876b496
commit ad38f712e8
20 changed files with 103 additions and 99 deletions

View File

@ -3034,7 +3034,7 @@ class GetMegagroupStatsQuery final : public Td::ResultHandler {
flags |= telegram_api::stats_getMegagroupStats::DARK_MASK;
}
send_query(G()->net_query_creator().create(
telegram_api::stats_getMegagroupStats(flags, false /*ignored*/, std::move(input_channel)), dc_id));
telegram_api::stats_getMegagroupStats(flags, false /*ignored*/, std::move(input_channel)), {}, dc_id));
}
void on_result(BufferSlice packet) final {
@ -3072,7 +3072,7 @@ class GetBroadcastStatsQuery final : public Td::ResultHandler {
flags |= telegram_api::stats_getBroadcastStats::DARK_MASK;
}
send_query(G()->net_query_creator().create(
telegram_api::stats_getBroadcastStats(flags, false /*ignored*/, std::move(input_channel)), dc_id));
telegram_api::stats_getBroadcastStats(flags, false /*ignored*/, std::move(input_channel)), {}, dc_id));
}
void on_result(BufferSlice packet) final {
@ -3123,7 +3123,7 @@ class GetMessageStatsQuery final : public Td::ResultHandler {
send_query(G()->net_query_creator().create(
telegram_api::stats_getMessageStats(flags, false /*ignored*/, std::move(input_channel),
message_id.get_server_message_id().get()),
dc_id));
{}, dc_id));
}
void on_result(BufferSlice packet) final {
@ -3154,7 +3154,7 @@ class LoadAsyncGraphQuery final : public Td::ResultHandler {
if (x != 0) {
flags |= telegram_api::stats_loadAsyncGraph::X_MASK;
}
send_query(G()->net_query_creator().create(telegram_api::stats_loadAsyncGraph(flags, token, x), dc_id));
send_query(G()->net_query_creator().create(telegram_api::stats_loadAsyncGraph(flags, token, x), {}, dc_id));
}
void on_result(BufferSlice packet) final {

View File

@ -58,11 +58,8 @@ class SetGameScoreActor final : public NetActorOnce {
CHECK(input_user != nullptr);
auto query = G()->net_query_creator().create(
telegram_api::messages_setGameScore(flags, false /*ignored*/, false /*ignored*/, std::move(input_peer),
message_id.get_server_message_id().get(), std::move(input_user), score));
query->debug("send to MultiSequenceDispatcher");
send_closure(td_->messages_manager_->sequence_dispatcher_, &MultiSequenceDispatcher::send_with_callback,
std::move(query), actor_shared(this), sequence_dispatcher_id);
message_id.get_server_message_id().get(), std::move(input_user), score), {sequence_dispatcher_id});
send_query(std::move(query));
}
void on_result(BufferSlice packet) final {
@ -107,7 +104,7 @@ class SetInlineGameScoreQuery final : public Td::ResultHandler {
send_query(G()->net_query_creator().create(
telegram_api::messages_setInlineGameScore(flags, false /*ignored*/, false /*ignored*/,
std::move(input_bot_inline_message_id), std::move(input_user), score),
dc_id));
{}, dc_id));
}
void on_result(BufferSlice packet) final {
@ -178,7 +175,7 @@ class GetInlineGameHighScoresQuery final : public Td::ResultHandler {
auto dc_id = DcId::internal(InlineQueriesManager::get_inline_message_dc_id(input_bot_inline_message_id));
send_query(G()->net_query_creator().create(
telegram_api::messages_getInlineGameHighScores(std::move(input_bot_inline_message_id), std::move(input_user)),
dc_id));
{}, dc_id));
}
void on_result(BufferSlice packet) final {

View File

@ -51,7 +51,7 @@ class GetGroupCallStreamQuery final : public Td::ResultHandler {
int32 flags = 0;
auto query = G()->net_query_creator().create(
telegram_api::upload_getFile(flags, false /*ignored*/, false /*ignored*/, std::move(input_stream), 0, 1 << 20),
stream_dc_id, NetQuery::Type::DownloadSmall);
{}, stream_dc_id, NetQuery::Type::DownloadSmall);
query->total_timeout_limit_ = 0;
send_query(std::move(query));
}

View File

@ -338,10 +338,9 @@ class GetPinnedDialogsActor final : public NetActorOnce {
NetQueryRef send(FolderId folder_id, uint64 sequence_id) {
folder_id_ = folder_id;
auto query = G()->net_query_creator().create(telegram_api::messages_getPinnedDialogs(folder_id.get()));
auto query = G()->net_query_creator().create(telegram_api::messages_getPinnedDialogs(folder_id.get()), {sequence_id});
auto result = query.get_weak();
send_closure(td_->messages_manager_->sequence_dispatcher_, &MultiSequenceDispatcher::send_with_callback,
std::move(query), actor_shared(this), ChainIds{sequence_id});
send_query(std::move(query));
return result;
}
@ -794,9 +793,8 @@ class GetDialogListActor final : public NetActorOnce {
telegram_api::messages_getDialogs::EXCLUDE_PINNED_MASK | telegram_api::messages_getDialogs::FOLDER_ID_MASK;
auto query = G()->net_query_creator().create(
telegram_api::messages_getDialogs(flags, false /*ignored*/, folder_id.get(), offset_date,
offset_message_id.get(), std::move(input_peer), limit, 0));
send_closure(td_->messages_manager_->sequence_dispatcher_, &MultiSequenceDispatcher::send_with_callback,
std::move(query), actor_shared(this), ChainIds{sequence_id});
offset_message_id.get(), std::move(input_peer), limit, 0), {sequence_id});
send_query(std::move(query));
}
void on_result(BufferSlice packet) final {
@ -1852,8 +1850,8 @@ class ToggleDialogIsBlockedActor final : public NetActorOnce {
CHECK(input_peer != nullptr && input_peer->get_id() != telegram_api::inputPeerEmpty::ID);
auto query = is_blocked ? G()->net_query_creator().create(telegram_api::contacts_block(std::move(input_peer)))
: G()->net_query_creator().create(telegram_api::contacts_unblock(std::move(input_peer)));
send_closure(td_->messages_manager_->sequence_dispatcher_, &MultiSequenceDispatcher::send_with_callback,
std::move(query), actor_shared(this), ChainIds{sequence_dispatcher_id});
query->set_chains({sequence_dispatcher_id});
send_query(std::move(query));
}
void on_result(BufferSlice packet) final {
@ -2709,7 +2707,7 @@ class GetMessagePublicForwardsQuery final : public Td::ResultHandler {
td_->contacts_manager_->get_input_channel(dialog_id_.get_channel_id()),
full_message_id.get_message_id().get_server_message_id().get(), offset_date,
std::move(input_peer), offset_message_id.get(), limit),
dc_id));
{}, dc_id));
}
void on_result(BufferSlice packet) final {
@ -3093,10 +3091,8 @@ class SaveDefaultSendAsActor final : public NetActorOnce {
CHECK(send_as_input_peer != nullptr);
auto query = G()->net_query_creator().create(
telegram_api::messages_saveDefaultSendAs(std::move(input_peer), std::move(send_as_input_peer)));
query->debug("send to MessagesManager::MultiSequenceDispatcher");
send_closure(td_->messages_manager_->sequence_dispatcher_, &MultiSequenceDispatcher::send_with_callback,
std::move(query), actor_shared(this), ChainIds{sequence_dispatcher_id});
telegram_api::messages_saveDefaultSendAs(std::move(input_peer), std::move(send_as_input_peer)), {sequence_dispatcher_id});
send_query(std::move(query));
}
void on_result(BufferSlice packet) final {
@ -3197,7 +3193,7 @@ class SendMessageActor final : public NetActorOnce {
auto query = G()->net_query_creator().create(telegram_api::messages_sendMessage(
flags, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/,
std::move(input_peer), reply_to_message_id.get_server_message_id().get(), text, random_id,
std::move(reply_markup), std::move(entities), schedule_date, std::move(as_input_peer)));
std::move(reply_markup), std::move(entities), schedule_date, std::move(as_input_peer)), {sequence_dispatcher_id});
if (G()->shared_config().get_option_boolean("use_quick_ack")) {
query->quick_ack_promise_ = PromiseCreator::lambda(
[random_id](Unit) {
@ -3206,9 +3202,7 @@ class SendMessageActor final : public NetActorOnce {
PromiseCreator::Ignore());
}
*send_query_ref = query.get_weak();
query->debug("send to MessagesManager::MultiSequenceDispatcher");
send_closure(td_->messages_manager_->sequence_dispatcher_, &MultiSequenceDispatcher::send_with_callback,
std::move(query), actor_shared(this), ChainIds{sequence_dispatcher_id});
send_query(std::move(query));
}
void on_result(BufferSlice packet) final {
@ -3389,11 +3383,9 @@ class SendMultiMediaActor final : public NetActorOnce {
auto query = G()->net_query_creator().create(telegram_api::messages_sendMultiMedia(
flags, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, std::move(input_peer),
reply_to_message_id.get_server_message_id().get(), std::move(input_single_media), schedule_date,
std::move(as_input_peer)));
std::move(as_input_peer)), {sequence_dispatcher_id});
// no quick ack, because file reference errors are very likely to happen
query->debug("send to MessagesManager::MultiSequenceDispatcher");
send_closure(td_->messages_manager_->sequence_dispatcher_, &MultiSequenceDispatcher::send_with_callback,
std::move(query), actor_shared(this), ChainIds{sequence_dispatcher_id});
send_query(std::move(query));
}
void on_result(BufferSlice packet) final {
@ -3508,7 +3500,7 @@ class SendMediaActor final : public NetActorOnce {
auto query = G()->net_query_creator().create(telegram_api::messages_sendMedia(
flags, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, std::move(input_peer),
reply_to_message_id.get_server_message_id().get(), std::move(input_media), text, random_id,
std::move(reply_markup), std::move(entities), schedule_date, std::move(as_input_peer)));
std::move(reply_markup), std::move(entities), schedule_date, std::move(as_input_peer)), {sequence_dispatcher_id});
if (G()->shared_config().get_option_boolean("use_quick_ack") && was_uploaded_) {
query->quick_ack_promise_ = PromiseCreator::lambda(
[random_id](Unit) {
@ -3517,9 +3509,7 @@ class SendMediaActor final : public NetActorOnce {
PromiseCreator::Ignore());
}
*send_query_ref = query.get_weak();
query->debug("send to MessagesManager::MultiSequenceDispatcher");
send_closure(td_->messages_manager_->sequence_dispatcher_, &MultiSequenceDispatcher::send_with_callback,
std::move(query), actor_shared(this), ChainIds{sequence_dispatcher_id});
send_query(std::move(query));
}
void on_result(BufferSlice packet) final {
@ -3680,11 +3670,9 @@ class SendScheduledMessageActor final : public NetActorOnce {
int32 server_message_id = message_id.get_scheduled_server_message_id().get();
auto query = G()->net_query_creator().create(
telegram_api::messages_sendScheduledMessages(std::move(input_peer), {server_message_id}));
telegram_api::messages_sendScheduledMessages(std::move(input_peer), {server_message_id}), {sequence_dispatcher_id});
query->debug("send to MessagesManager::MultiSequenceDispatcher");
send_closure(td_->messages_manager_->sequence_dispatcher_, &MultiSequenceDispatcher::send_with_callback,
std::move(query), actor_shared(this), ChainIds{sequence_dispatcher_id});
send_query(std::move(query));
}
void on_result(BufferSlice packet) final {
@ -3762,11 +3750,8 @@ class EditMessageActor final : public NetActorOnce {
: message_id.get_server_message_id().get();
auto query = G()->net_query_creator().create(telegram_api::messages_editMessage(
flags, false /*ignored*/, std::move(input_peer), server_message_id, text, std::move(input_media),
std::move(reply_markup), std::move(entities), schedule_date));
query->debug("send to MessagesManager::MultiSequenceDispatcher");
send_closure(td_->messages_manager_->sequence_dispatcher_, &MultiSequenceDispatcher::send_with_callback,
std::move(query), actor_shared(this), ChainIds{sequence_dispatcher_id});
std::move(reply_markup), std::move(entities), schedule_date), {sequence_dispatcher_id});
send_query(std::move(query));
}
void on_result(BufferSlice packet) final {
@ -3828,7 +3813,7 @@ class EditInlineMessageQuery final : public Td::ResultHandler {
telegram_api::messages_editInlineBotMessage(flags, false /*ignored*/, std::move(input_bot_inline_message_id),
text, std::move(input_media), std::move(reply_markup),
std::move(entities)),
dc_id));
{}, dc_id));
}
void on_result(BufferSlice packet) final {
@ -3886,7 +3871,7 @@ class ForwardMessagesActor final : public NetActorOnce {
auto query = G()->net_query_creator().create(telegram_api::messages_forwardMessages(
flags, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/,
false /*ignored*/, std::move(from_input_peer), MessagesManager::get_server_message_ids(message_ids),
std::move(random_ids), std::move(to_input_peer), schedule_date, std::move(as_input_peer)));
std::move(random_ids), std::move(to_input_peer), schedule_date, std::move(as_input_peer)), {sequence_dispatcher_id});
if (G()->shared_config().get_option_boolean("use_quick_ack")) {
query->quick_ack_promise_ = PromiseCreator::lambda(
[random_ids = random_ids_](Unit) {
@ -3896,8 +3881,7 @@ class ForwardMessagesActor final : public NetActorOnce {
},
PromiseCreator::Ignore());
}
send_closure(td_->messages_manager_->sequence_dispatcher_, &MultiSequenceDispatcher::send_with_callback,
std::move(query), actor_shared(this), ChainIds{sequence_dispatcher_id});
send_query(std::move(query));
}
void on_result(BufferSlice packet) final {
@ -6045,8 +6029,6 @@ MessagesManager::MessagesManager(Td *td, ActorShared<> parent)
preload_folder_dialog_list_timeout_.set_callback(on_preload_folder_dialog_list_timeout_callback);
preload_folder_dialog_list_timeout_.set_callback_data(static_cast<void *>(this));
sequence_dispatcher_ = MultiSequenceDispatcher::create("multi sequence dispatcher");
}
MessagesManager::~MessagesManager() = default;

View File

@ -52,7 +52,6 @@
#include "td/telegram/secret_api.h"
#include "td/telegram/SecretChatId.h"
#include "td/telegram/SecretInputMedia.h"
#include "td/telegram/SequenceDispatcher.h"
#include "td/telegram/ServerMessageId.h"
#include "td/telegram/td_api.h"
#include "td/telegram/telegram_api.h"
@ -974,8 +973,6 @@ class MessagesManager final : public Actor {
void get_current_state(vector<td_api::object_ptr<td_api::Update>> &updates) const;
ActorOwn<MultiSequenceDispatcher> sequence_dispatcher_;
static uint64 get_sequence_dispatcher_id(DialogId dialog_id, MessageContentType message_content_type);
private:

View File

@ -568,7 +568,7 @@ class GetBankCardInfoQuery final : public Td::ResultHandler {
void send(const string &bank_card_number) {
send_query(G()->net_query_creator().create(telegram_api::payments_getBankCardData(bank_card_number),
G()->get_webfile_dc_id()));
{}, G()->get_webfile_dc_id()));
}
void on_result(BufferSlice packet) final {

View File

@ -155,12 +155,11 @@ class SetPollAnswerActor final : public NetActorOnce {
}
auto message_id = full_message_id.get_message_id().get_server_message_id().get();
auto sequence_id = static_cast<uint64>(-1);
auto query = G()->net_query_creator().create(
telegram_api::messages_sendVote(std::move(input_peer), message_id, std::move(options)));
telegram_api::messages_sendVote(std::move(input_peer), message_id, std::move(options)), {sequence_id});
*query_ref = query.get_weak();
auto sequence_id = -1;
send_closure(td_->messages_manager_->sequence_dispatcher_, &MultiSequenceDispatcher::send_with_callback,
std::move(query), actor_shared(this), sequence_id);
send_query(std::move(query));
}
void on_result(BufferSlice packet) final {
@ -210,13 +209,11 @@ class StopPollActor final : public NetActorOnce {
auto query = G()->net_query_creator().create(telegram_api::messages_editMessage(
flags, false /*ignored*/, std::move(input_peer), message_id, string(), std::move(input_media),
std::move(input_reply_markup), vector<tl_object_ptr<telegram_api::MessageEntity>>(), 0));
if (td_->auth_manager_->is_bot()) {
send_query(std::move(query));
} else {
auto sequence_id = -1;
send_closure(td_->messages_manager_->sequence_dispatcher_, &MultiSequenceDispatcher::send_with_callback,
std::move(query), actor_shared(this), sequence_id);
if (!td_->auth_manager_->is_bot()) {
auto sequence_id = static_cast<uint64>(-1);
query->set_chains({sequence_id});
}
send_query(std::move(query));
}
void on_result(BufferSlice packet) final {

View File

@ -70,7 +70,7 @@ SecretChatActor::SecretChatActor(int32 id, unique_ptr<Context> context, bool can
template <class T>
NetQueryPtr SecretChatActor::create_net_query(QueryType type, const T &function) {
return context_->net_query_creator().create(UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(type)),
function, DcId::main(), NetQuery::Type::Common, NetQuery::AuthFlag::On);
function, {}, DcId::main(), NetQuery::Type::Common, NetQuery::AuthFlag::On);
}
void SecretChatActor::update_chat(telegram_api::object_ptr<telegram_api::EncryptedChat> chat) {

View File

@ -180,7 +180,6 @@ void SequenceDispatcher::loop() {
VLOG(net_query) << "Send " << data_[next_i_].query_;
data_[next_i_].query_->debug("send to Td::send_with_callback");
data_[next_i_].query_->set_session_rand(session_rand_);
G()->net_query_dispatcher().dispatch_with_callback(std::move(data_[next_i_].query_),
actor_shared(this, next_i_ + id_offset_));
data_[next_i_].state_ = State::Wait;
@ -248,8 +247,11 @@ void SequenceDispatcher::close_silent() {
}
/*** MultiSequenceDispatcher ***/
void MultiSequenceDispatcherOld::send_with_callback(NetQueryPtr query, ActorShared<NetQueryCallback> callback,
Span<uint64> chains) {
void MultiSequenceDispatcherOld::send(NetQueryPtr query) {
auto callback = query->move_callback();
CHECK(!callback.empty());
Span<ChainId> chains = query->chains();
query->set_in_sequence_dispatcher(true);
CHECK(all_of(chains, [](auto chain_id) { return chain_id != 0; }));
CHECK(!chains.empty());
auto sequence_id = chains[0];
@ -282,11 +284,12 @@ void MultiSequenceDispatcherOld::ready_to_close() {
class MultiSequenceDispatcherNewImpl final : public MultiSequenceDispatcherNew {
public:
void send_with_callback(NetQueryPtr query, ActorShared<NetQueryCallback> callback, Span<uint64> chains) final {
void send(NetQueryPtr query) final {
auto callback = query->move_callback();
CHECK(!callback.empty());
Span<ChainId> chains = query->chains();
query->set_in_sequence_dispatcher(true);
CHECK(all_of(chains, [](auto chain_id) { return chain_id != 0; }));
if (!chains.empty()) {
query->set_session_rand(static_cast<uint32>(chains[0] >> 10));
}
Node node;
node.net_query = std::move(query);
node.net_query->debug("Waiting at SequenceDispatcher");

View File

@ -77,7 +77,7 @@ class SequenceDispatcher final : public NetQueryCallback {
class MultiSequenceDispatcherOld final : public SequenceDispatcher::Parent {
public:
void send_with_callback(NetQueryPtr query, ActorShared<NetQueryCallback> callback, Span<uint64> chains);
void send(NetQueryPtr query);
static ActorOwn<MultiSequenceDispatcherOld> create(Slice name) {
return create_actor<MultiSequenceDispatcherOld>(name);
}
@ -96,7 +96,7 @@ using ChainId = uint64;
using ChainIds = vector<ChainId>;
class MultiSequenceDispatcherNew : public NetQueryCallback {
public:
virtual void send_with_callback(NetQueryPtr query, ActorShared<NetQueryCallback> callback, Span<uint64> chains) = 0;
virtual void send(NetQueryPtr query) = 0;
static ActorOwn<MultiSequenceDispatcherNew> create(Slice name);
};

View File

@ -252,13 +252,13 @@ Result<std::pair<NetQueryPtr, bool>> FileDownloader::start_part(Part part, int32
id,
telegram_api::upload_getWebFile(remote_.as_input_web_file_location(),
static_cast<int32>(part.offset), static_cast<int32>(size)),
dc_id, net_query_type, NetQuery::AuthFlag::On)
{}, dc_id, net_query_type, NetQuery::AuthFlag::On)
: G()->net_query_creator().create(
id,
telegram_api::upload_getFile(flags, false /*ignored*/, false /*ignored*/,
remote_.as_input_file_location(),
static_cast<int32>(part.offset), static_cast<int32>(size)),
dc_id, net_query_type, NetQuery::AuthFlag::On);
{}, dc_id, net_query_type, NetQuery::AuthFlag::On);
} else {
if (remote_.is_web()) {
return Status::Error("Can't download web file from CDN");
@ -271,13 +271,13 @@ Result<std::pair<NetQueryPtr, bool>> FileDownloader::start_part(Part part, int32
LOG(DEBUG) << part.id << " " << to_string(query);
net_query =
G()->net_query_creator().create(UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::CDN)),
query, cdn_dc_id_, net_query_type, NetQuery::AuthFlag::Off);
query, {}, cdn_dc_id_, net_query_type, NetQuery::AuthFlag::Off);
} else {
auto query = telegram_api::upload_reuploadCdnFile(BufferSlice(cdn_file_token_), BufferSlice(it->second));
LOG(DEBUG) << part.id << " " << to_string(query);
net_query = G()->net_query_creator().create(
UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::ReuploadCDN)), query,
remote_.get_dc_id(), net_query_type, NetQuery::AuthFlag::On);
{}, remote_.get_dc_id(), net_query_type, NetQuery::AuthFlag::On);
cdn_part_reupload_token_.erase(it);
}
}
@ -470,7 +470,7 @@ Result<FileLoader::CheckInfo> FileDownloader::check_loop(int64 checked_prefix_si
auto query =
telegram_api::upload_getFileHashes(remote_.as_input_file_location(), narrow_cast<int32>(checked_prefix_size));
auto net_query_type = is_small_ ? NetQuery::Type::DownloadSmall : NetQuery::Type::Download;
auto net_query = G()->net_query_creator().create(query, remote_.get_dc_id(), net_query_type);
auto net_query = G()->net_query_creator().create(query, {}, remote_.get_dc_id(), net_query_type);
info.queries.push_back(std::move(net_query));
break;
}

View File

@ -201,7 +201,7 @@ class MapDownloadGenerateActor final : public FileGenerateActor {
LOG(INFO) << "Download " << conversion_;
auto query =
G()->net_query_creator().create(telegram_api::upload_getWebFile(r_input_web_file.move_as_ok(), 0, 1 << 20),
G()->get_webfile_dc_id(), NetQuery::Type::DownloadSmall);
{}, G()->get_webfile_dc_id(), NetQuery::Type::DownloadSmall);
G()->net_query_dispatcher().dispatch_with_callback(std::move(query), {net_callback_.get(), 0});
}

View File

@ -278,10 +278,10 @@ Result<std::pair<NetQueryPtr, bool>> FileUploader::start_part(Part part, int32 p
if (big_flag_) {
auto query =
telegram_api::upload_saveBigFilePart(file_id_, part.id, local_is_ready_ ? part_count : -1, std::move(bytes));
net_query = G()->net_query_creator().create(query, DcId::main(), NetQuery::Type::Upload);
net_query = G()->net_query_creator().create(query, {}, DcId::main(), NetQuery::Type::Upload);
} else {
auto query = telegram_api::upload_saveFilePart(file_id_, part.id, std::move(bytes));
net_query = G()->net_query_creator().create(query, DcId::main(), NetQuery::Type::Upload);
net_query = G()->net_query_creator().create(query, {}, DcId::main(), NetQuery::Type::Upload);
}
net_query->file_type_ = narrow_cast<int32>(file_type_);
return std::make_pair(std::move(net_query), false);

View File

@ -166,7 +166,7 @@ void DcAuthManager::dc_loop(DcInfo &dc) {
VLOG(dc) << "Send exportAuthorization to " << dc.dc_id;
auto id = UniqueId::next();
auto query = G()->net_query_creator().create(id, telegram_api::auth_exportAuthorization(dc.dc_id.get_raw_id()),
DcId::main(), NetQuery::Type::Common, NetQuery::AuthFlag::On);
{}, DcId::main(), NetQuery::Type::Common, NetQuery::AuthFlag::On);
query->total_timeout_limit_ = 60 * 60 * 24;
G()->net_query_dispatcher().dispatch_with_callback(std::move(query), actor_shared(this, dc.dc_id.get_raw_id()));
dc.wait_id = id;
@ -182,7 +182,7 @@ void DcAuthManager::dc_loop(DcInfo &dc) {
uint64 id = UniqueId::next();
VLOG(dc) << "Send importAuthorization to " << dc.dc_id;
auto query = G()->net_query_creator().create(
id, telegram_api::auth_importAuthorization(dc.export_id, std::move(dc.export_bytes)), dc.dc_id,
id, telegram_api::auth_importAuthorization(dc.export_id, std::move(dc.export_bytes)), {}, dc.dc_id,
NetQuery::Type::Common, NetQuery::AuthFlag::Off);
query->total_timeout_limit_ = 60 * 60 * 24;
G()->net_query_dispatcher().dispatch_with_callback(std::move(query), actor_shared(this, dc.dc_id.get_raw_id()));

View File

@ -203,11 +203,11 @@ class NetQuery final : public TsListNode<NetQueryDebug> {
void set_invoke_after(std::vector<NetQueryRef> refs) {
invoke_after_ = std::move(refs);
}
void set_session_rand(uint32 session_rand) {
session_rand_ = session_rand;
}
uint32 session_rand() const {
return session_rand_;
if (in_sequence_dispacher_ && !chains_.empty()) {
return static_cast<uint32>(chains_[0] >> 10);
}
return 0;
}
void cancel(int32 cancellation_token) {
@ -276,6 +276,19 @@ class NetQuery final : public TsListNode<NetQueryDebug> {
priority_ = priority;
}
Span<uint64> chains() const {
return chains_;
}
void set_chains(std::vector<uint64> chains) {
chains_ = std::move(chains);
}
void set_in_sequence_dispatcher(bool flag) {
in_sequence_dispacher_ = flag;
}
bool in_sequence_dispatcher() const {
return in_sequence_dispacher_;
}
private:
State state_ = State::Empty;
Type type_ = Type::Common;
@ -291,8 +304,9 @@ class NetQuery final : public TsListNode<NetQueryDebug> {
int32 tl_constructor_ = 0;
std::vector<NetQueryRef> invoke_after_;
uint32 session_rand_ = 0;
std::vector<uint64> chains_;
bool in_sequence_dispacher_ = false;
bool may_be_lost_ = false;
int8 priority_{0};

View File

@ -25,11 +25,11 @@ NetQueryCreator::NetQueryCreator(std::shared_ptr<NetQueryStats> net_query_stats)
object_pool_.set_check_empty(true);
}
NetQueryPtr NetQueryCreator::create(const telegram_api::Function &function, DcId dc_id, NetQuery::Type type) {
return create(UniqueId::next(), function, dc_id, type, NetQuery::AuthFlag::On);
NetQueryPtr NetQueryCreator::create(const telegram_api::Function &function, std::vector<uint64> chains, DcId dc_id, NetQuery::Type type) {
return create(UniqueId::next(), function, std::move(chains), dc_id, type, NetQuery::AuthFlag::On);
}
NetQueryPtr NetQueryCreator::create(uint64 id, const telegram_api::Function &function, DcId dc_id, NetQuery::Type type,
NetQueryPtr NetQueryCreator::create(uint64 id, const telegram_api::Function &function, std::vector<uint64> chains, DcId dc_id, NetQuery::Type type,
NetQuery::AuthFlag auth_flag) {
LOG(INFO) << "Create query " << to_string(function);
auto storer = DefaultStorer<telegram_api::Function>(function);
@ -78,6 +78,7 @@ NetQueryPtr NetQueryCreator::create(uint64 id, const telegram_api::Function &fun
auto query = object_pool_.create(NetQuery::State::Query, id, std::move(slice), BufferSlice(), dc_id, type, auth_flag,
gzip_flag, tl_constructor, total_timeout_limit, net_query_stats_.get());
query->set_cancellation_token(query.generation());
query->set_chains(std::move(chains));
return query;
}

View File

@ -29,14 +29,14 @@ class NetQueryCreator {
object_pool_.set_check_empty(false);
}
NetQueryPtr create(const telegram_api::Function &function, DcId dc_id = DcId::main(),
NetQueryPtr create(const telegram_api::Function &function, std::vector<uint64> chains = {}, DcId dc_id = DcId::main(),
NetQuery::Type type = NetQuery::Type::Common);
NetQueryPtr create_unauth(const telegram_api::Function &function, DcId dc_id = DcId::main()) {
return create(UniqueId::next(), function, dc_id, NetQuery::Type::Common, NetQuery::AuthFlag::Off);
return create(UniqueId::next(), function, {}, dc_id, NetQuery::Type::Common, NetQuery::AuthFlag::Off);
}
NetQueryPtr create(uint64 id, const telegram_api::Function &function, DcId dc_id, NetQuery::Type type,
NetQueryPtr create(uint64 id, const telegram_api::Function &function, std::vector<uint64> chains, DcId dc_id, NetQuery::Type type,
NetQuery::AuthFlag auth_flag);
private:

View File

@ -49,12 +49,21 @@ void NetQueryDispatcher::dispatch(NetQueryPtr net_query) {
if (G()->shared_config().get_option_boolean("test_flood_wait")) {
net_query->set_error(Status::Error(429, "Too Many Requests: retry after 10"));
return complete_net_query(std::move(net_query));
// if (net_query->is_ok() && net_query->tl_constructor() == 0x0d9d75a4) {
// net_query->set_error(Status::Error(420, "FLOOD_WAIT_10"));
// }
}
if (net_query->tl_constructor() == telegram_api::account_getPassword::ID && false) {
net_query->set_error(Status::Error(429, "Too Many Requests: retry after 10"));
return complete_net_query(std::move(net_query));
}
if (!net_query->in_sequence_dispatcher() && !net_query->chains().empty()) {
net_query->debug("sent to main sequence dispatcher");
send_closure(sequence_dispatcher_, &MultiSequenceDispatcher::send, std::move(net_query));
return;
}
if (net_query->is_ready()) {
if (net_query->is_error()) {
auto code = net_query->error().code();
@ -208,6 +217,7 @@ void NetQueryDispatcher::stop() {
}
public_rsa_key_watchdog_.reset();
dc_auth_manager_.reset();
sequence_dispatcher_.reset();
}
void NetQueryDispatcher::update_session_count() {
@ -286,6 +296,7 @@ NetQueryDispatcher::NetQueryDispatcher(const std::function<ActorShared<>()> &cre
dc_auth_manager_ = create_actor<DcAuthManager>("DcAuthManager", create_reference());
common_public_rsa_key_ = std::make_shared<PublicRsaKeyShared>(DcId::empty(), G()->is_test_dc());
public_rsa_key_watchdog_ = create_actor<PublicRsaKeyWatchdog>("PublicRsaKeyWatchdog", create_reference());
sequence_dispatcher_ = MultiSequenceDispatcher::create("MultiSequenceDispatcher");
td_guard_ = create_shared_lambda_guard([actor = create_reference()] {});
}

View File

@ -15,6 +15,7 @@
#include "td/utils/common.h"
#include "td/utils/ScopeGuard.h"
#include "td/utils/Status.h"
#include "td/telegram/SequenceDispatcher.h"
#include <array>
#include <atomic>
@ -63,6 +64,7 @@ class NetQueryDispatcher {
bool need_destroy_auth_key_{false};
ActorOwn<NetQueryDelayer> delayer_;
ActorOwn<DcAuthManager> dc_auth_manager_;
ActorOwn<MultiSequenceDispatcher> sequence_dispatcher_;
struct Dc {
DcId id_;
std::atomic<bool> is_valid_{false};

View File

@ -1215,7 +1215,7 @@ bool Session::connection_send_check_main_key(ConnectionInfo *info) {
being_checked_main_auth_key_id_ = key_id;
last_check_query_id_ = UniqueId::next(UniqueId::BindKey);
NetQueryPtr query = G()->net_query_creator().create(last_check_query_id_, telegram_api::help_getNearestDc(),
DcId::main(), NetQuery::Type::Common, NetQuery::AuthFlag::On);
{}, DcId::main(), NetQuery::Type::Common, NetQuery::AuthFlag::On);
query->dispatch_ttl_ = 0;
query->set_callback(actor_shared(this));
connection_send_query(info, std::move(query));
@ -1252,7 +1252,7 @@ bool Session::connection_send_bind_key(ConnectionInfo *info) {
LOG(INFO) << "Bind key: " << tag("tmp", key_id) << tag("perm", static_cast<uint64>(perm_auth_key_id));
NetQueryPtr query = G()->net_query_creator().create(
last_bind_query_id_,
telegram_api::auth_bindTempAuthKey(perm_auth_key_id, nonce, expires_at, std::move(encrypted)), DcId::main(),
telegram_api::auth_bindTempAuthKey(perm_auth_key_id, nonce, expires_at, std::move(encrypted)), {}, DcId::main(),
NetQuery::Type::Common, NetQuery::AuthFlag::On);
query->dispatch_ttl_ = 0;
query->set_callback(actor_shared(this));